fix: stop spinner from spinning forever (#1440)

* fix: stop spinner from spinning forever due to orphaned DB messages

The activity spinner never stopped because isAnySessionProcessing() queried
ALL pending/processing messages in the database, including orphaned messages
from dead sessions that no generator would ever process.

Root cause: isAnySessionProcessing() used hasAnyPendingWork() which is a
global DB scan. Changed it to use getTotalQueueDepth() which only checks
sessions in the active in-memory Map.

Additional fixes:
- Add terminateSession() to enforce restart-or-terminate invariant
- Fix 3 zombie paths in .finally() handler that left sessions alive
- Clean up idle sessions from memory on successful completion
- Remove redundant bare isProcessing:true broadcast
- Replace inline require() with proper accessor
- Add 8 regression tests for session termination invariant

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address review findings — idle-timeout race, double broadcast, query amplification

- Move pendingCount check before idle-timeout termination to prevent
  abandoning fresh messages that arrive between idle abort and .finally()
- Move broadcastProcessingStatus() inside restart branch only — the else
  branch already broadcasts via removeSessionImmediate callback
- Compute queueDepth once in broadcastProcessingStatus() and derive
  isProcessing from it, eliminating redundant double iteration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-03-21 14:13:10 -07:00
committed by GitHub
parent 9f529a30f5
commit 4d7bec4d05
7 changed files with 1043 additions and 68213 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -653,30 +653,26 @@ export class WorkerService {
// Do NOT restart after unrecoverable errors - prevents infinite loops // Do NOT restart after unrecoverable errors - prevents infinite loops
if (hadUnrecoverableError) { if (hadUnrecoverableError) {
logger.warn('SYSTEM', 'Skipping restart due to unrecoverable error', { this.terminateSession(session.sessionDbId, 'unrecoverable_error');
sessionId: session.sessionDbId
});
this.broadcastProcessingStatus();
return; return;
} }
// Store for pending-count check below const pendingStore = this.sessionManager.getPendingMessageStore();
const { PendingMessageStore } = require('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
// Idle timeout means no new work arrived for 3 minutes - don't restart
// No need to reset stale processing messages here — claimNextMessage() self-heals
if (session.idleTimedOut) {
logger.info('SYSTEM', 'Generator exited due to idle timeout, not restarting', {
sessionId: session.sessionDbId
});
session.idleTimedOut = false; // Reset flag
this.broadcastProcessingStatus();
return;
}
// Check if there's pending work that needs processing with a fresh AbortController // Check if there's pending work that needs processing with a fresh AbortController
const pendingCount = pendingStore.getPendingCount(session.sessionDbId); const pendingCount = pendingStore.getPendingCount(session.sessionDbId);
// Idle timeout means no new work arrived for 3 minutes - don't restart
// But check pendingCount first: a message may have arrived between idle
// abort and .finally(), and we must not abandon it
if (session.idleTimedOut) {
session.idleTimedOut = false; // Reset flag
if (pendingCount === 0) {
this.terminateSession(session.sessionDbId, 'idle_timeout');
return;
}
// Fall through to pending-work restart below
}
const MAX_PENDING_RESTARTS = 3; const MAX_PENDING_RESTARTS = 3;
if (pendingCount > 0) { if (pendingCount > 0) {
@@ -690,7 +686,7 @@ export class WorkerService {
consecutiveRestarts: session.consecutiveRestarts consecutiveRestarts: session.consecutiveRestarts
}); });
session.consecutiveRestarts = 0; session.consecutiveRestarts = 0;
this.broadcastProcessingStatus(); this.terminateSession(session.sessionDbId, 'max_restarts_exceeded');
return; return;
} }
@@ -703,12 +699,13 @@ export class WorkerService {
session.abortController = new AbortController(); session.abortController = new AbortController();
// Restart processor // Restart processor
this.startSessionProcessor(session, 'pending-work-restart'); this.startSessionProcessor(session, 'pending-work-restart');
this.broadcastProcessingStatus();
} else { } else {
// Successful completion with no pending work — reset counter // Successful completion with no pending work — clean up session
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
session.consecutiveRestarts = 0; session.consecutiveRestarts = 0;
this.sessionManager.removeSessionImmediate(session.sessionDbId);
} }
this.broadcastProcessingStatus();
}); });
} }
@@ -784,6 +781,30 @@ export class WorkerService {
this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId); this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
} }
/**
* Terminate a session that will not restart.
* Enforces the restart-or-terminate invariant: every generator exit
* must either call startSessionProcessor() or terminateSession().
* No zombie sessions allowed.
*
* GENERATOR EXIT INVARIANT:
* .finally() → restart? → startSessionProcessor()
* no? → terminateSession()
*/
private terminateSession(sessionDbId: number, reason: string): void {
const pendingStore = this.sessionManager.getPendingMessageStore();
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
logger.info('SYSTEM', 'Session terminated', {
sessionId: sessionDbId,
reason,
abandonedMessages: abandoned
});
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
this.sessionManager.removeSessionImmediate(sessionDbId);
}
/** /**
* Process pending session queues * Process pending session queues
*/ */
@@ -907,8 +928,8 @@ export class WorkerService {
* Broadcast processing status change to SSE clients * Broadcast processing status change to SSE clients
*/ */
broadcastProcessingStatus(): void { broadcastProcessingStatus(): void {
const isProcessing = this.sessionManager.isAnySessionProcessing();
const queueDepth = this.sessionManager.getTotalActiveWork(); const queueDepth = this.sessionManager.getTotalActiveWork();
const isProcessing = queueDepth > 0;
const activeSessions = this.sessionManager.getActiveSessionCount(); const activeSessions = this.sessionManager.getActiveSessionCount();
logger.info('WORKER', 'Broadcasting processing status', { logger.info('WORKER', 'Broadcasting processing status', {

View File

@@ -350,7 +350,7 @@ export class SessionManager {
this.sessions.delete(sessionDbId); this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId); this.sessionQueues.delete(sessionDbId);
logger.info('SESSION', 'Session removed (orphaned after SDK termination)', { logger.info('SESSION', 'Session removed from active sessions', {
sessionId: sessionDbId, sessionId: sessionDbId,
project: session.project project: session.project
}); });
@@ -402,10 +402,11 @@ export class SessionManager {
} }
/** /**
* Check if any session has pending messages (for spinner tracking) * Check if any active session has pending messages (for spinner tracking).
* Scoped to in-memory sessions only.
*/ */
hasPendingMessages(): boolean { hasPendingMessages(): boolean {
return this.getPendingStore().hasAnyPendingWork(); return this.getTotalQueueDepth() > 0;
} }
/** /**
@@ -437,12 +438,12 @@ export class SessionManager {
} }
/** /**
* Check if any session is actively processing (has pending messages OR active generator) * Check if any active session has pending work.
* Used for activity indicator to prevent spinner from stopping while SDK is processing * Scoped to in-memory sessions only — orphaned DB messages from dead
* sessions must not keep the spinner spinning forever.
*/ */
isAnySessionProcessing(): boolean { isAnySessionProcessing(): boolean {
// hasAnyPendingWork checks for 'pending' OR 'processing' return this.getTotalQueueDepth() > 0;
return this.getPendingStore().hasAnyPendingWork();
} }
/** /**

View File

@@ -33,12 +33,6 @@ export class SessionEventBroadcaster {
prompt prompt
}); });
// Start activity indicator (work is about to begin)
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: true
});
// Update processing status based on queue depth // Update processing status based on queue depth
this.workerService.broadcastProcessingStatus(); this.workerService.broadcastProcessingStatus();
} }

View File

@@ -326,4 +326,152 @@ describe('Zombie Agent Prevention', () => {
session.generatorPromise = null; session.generatorPromise = null;
expect(session.generatorPromise).toBeNull(); expect(session.generatorPromise).toBeNull();
}); });
describe('Session Termination Invariant', () => {
// Tests the restart-or-terminate invariant:
// When a generator exits without restarting, its messages must be
// marked abandoned and the session removed from the active Map.
test('should mark messages abandoned when session is terminated', () => {
const sessionId = createDbSession('content-terminate-1');
enqueueTestMessage(sessionId, 'content-terminate-1');
enqueueTestMessage(sessionId, 'content-terminate-1');
// Verify messages exist
expect(pendingStore.getPendingCount(sessionId)).toBe(2);
expect(pendingStore.hasAnyPendingWork()).toBe(true);
// Terminate: mark abandoned (same as terminateSession does)
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(abandoned).toBe(2);
// Spinner should stop: no pending work remains
expect(pendingStore.hasAnyPendingWork()).toBe(false);
expect(pendingStore.getPendingCount(sessionId)).toBe(0);
});
test('should handle terminate with zero pending messages', () => {
const sessionId = createDbSession('content-terminate-empty');
// No messages enqueued
expect(pendingStore.getPendingCount(sessionId)).toBe(0);
// Terminate with nothing to abandon
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(abandoned).toBe(0);
// Still no pending work
expect(pendingStore.hasAnyPendingWork()).toBe(false);
});
test('should be idempotent — double terminate marks zero on second call', () => {
const sessionId = createDbSession('content-terminate-idempotent');
enqueueTestMessage(sessionId, 'content-terminate-idempotent');
// First terminate
const first = pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(first).toBe(1);
// Second terminate — already failed, nothing to mark
const second = pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(second).toBe(0);
expect(pendingStore.hasAnyPendingWork()).toBe(false);
});
test('should remove session from Map via removeSessionImmediate', () => {
const sessionId = createDbSession('content-terminate-map');
const session = createMockSession(sessionId, {
contentSessionId: 'content-terminate-map',
});
// Simulate the in-memory sessions Map
const sessions = new Map<number, ActiveSession>();
sessions.set(sessionId, session);
expect(sessions.has(sessionId)).toBe(true);
// Simulate removeSessionImmediate behavior
sessions.delete(sessionId);
expect(sessions.has(sessionId)).toBe(false);
});
test('should return hasAnyPendingWork false after all sessions terminated', () => {
// Create multiple sessions with messages
const sid1 = createDbSession('content-multi-term-1');
const sid2 = createDbSession('content-multi-term-2');
const sid3 = createDbSession('content-multi-term-3');
enqueueTestMessage(sid1, 'content-multi-term-1');
enqueueTestMessage(sid1, 'content-multi-term-1');
enqueueTestMessage(sid2, 'content-multi-term-2');
enqueueTestMessage(sid3, 'content-multi-term-3');
expect(pendingStore.hasAnyPendingWork()).toBe(true);
// Terminate all sessions
pendingStore.markAllSessionMessagesAbandoned(sid1);
pendingStore.markAllSessionMessagesAbandoned(sid2);
pendingStore.markAllSessionMessagesAbandoned(sid3);
// Spinner must stop
expect(pendingStore.hasAnyPendingWork()).toBe(false);
});
test('should not affect other sessions when terminating one', () => {
const sid1 = createDbSession('content-isolate-1');
const sid2 = createDbSession('content-isolate-2');
enqueueTestMessage(sid1, 'content-isolate-1');
enqueueTestMessage(sid2, 'content-isolate-2');
// Terminate only session 1
pendingStore.markAllSessionMessagesAbandoned(sid1);
// Session 2 still has work
expect(pendingStore.getPendingCount(sid1)).toBe(0);
expect(pendingStore.getPendingCount(sid2)).toBe(1);
expect(pendingStore.hasAnyPendingWork()).toBe(true);
});
test('should mark both pending and processing messages as abandoned', () => {
const sessionId = createDbSession('content-mixed-status');
// Enqueue two messages
const msgId1 = enqueueTestMessage(sessionId, 'content-mixed-status');
enqueueTestMessage(sessionId, 'content-mixed-status');
// Claim first message (transitions to 'processing')
const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(msgId1);
// Now we have 1 processing + 1 pending
expect(pendingStore.getPendingCount(sessionId)).toBe(2);
// Terminate should mark BOTH as failed
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(abandoned).toBe(2);
expect(pendingStore.hasAnyPendingWork()).toBe(false);
});
test('should enforce invariant: no pending work after terminate regardless of initial state', () => {
const sessionId = createDbSession('content-invariant');
// Create a complex initial state: some pending, some processing, some with stale timestamps
enqueueTestMessage(sessionId, 'content-invariant');
enqueueTestMessage(sessionId, 'content-invariant');
enqueueTestMessage(sessionId, 'content-invariant');
// Claim one (processing)
pendingStore.claimNextMessage(sessionId);
// Verify complex state
expect(pendingStore.getPendingCount(sessionId)).toBe(3);
// THE INVARIANT: after terminate, hasAnyPendingWork MUST be false
pendingStore.markAllSessionMessagesAbandoned(sessionId);
expect(pendingStore.hasAnyPendingWork()).toBe(false);
expect(pendingStore.getPendingCount(sessionId)).toBe(0);
});
});
}); });