diff --git a/src/services/sqlite/SessionStore.ts b/src/services/sqlite/SessionStore.ts index 3202351c..adb849b6 100644 --- a/src/services/sqlite/SessionStore.ts +++ b/src/services/sqlite/SessionStore.ts @@ -1006,8 +1006,10 @@ export class SessionStore { * * @param sessionDbId - The database ID of the session * @param memorySessionId - The memory session ID to ensure is registered + * @returns true if the memory_session_id was updated (or already matched), + * false if the update was skipped because the old ID has child rows. */ - ensureMemorySessionIdRegistered(sessionDbId: number, memorySessionId: string): void { + ensureMemorySessionIdRegistered(sessionDbId: number, memorySessionId: string): boolean { const session = this.db.prepare(` SELECT id, memory_session_id FROM sdk_sessions WHERE id = ? `).get(sessionDbId) as { id: number; memory_session_id: string | null } | undefined; @@ -1016,40 +1018,43 @@ export class SessionStore { throw new Error(`Session ${sessionDbId} not found in sdk_sessions`); } - if (session.memory_session_id !== memorySessionId) { - // If the old memory_session_id has child rows (observations/summaries), - // don't update in-place — ON UPDATE RESTRICT would reject it, and we - // shouldn't rewrite historical attribution anyway. Only update when - // transitioning from NULL or when there are no children. - if (session.memory_session_id !== null) { - const childCount = this.db.prepare(` - SELECT - (SELECT COUNT(*) FROM observations WHERE memory_session_id = ?) + - (SELECT COUNT(*) FROM session_summaries WHERE memory_session_id = ?) - AS total - `).get(session.memory_session_id, session.memory_session_id) as { total: number }; - - if (childCount.total > 0) { - logger.warn('DB', 'Skipping memory_session_id update: old ID has child rows (historical attribution preserved)', { - sessionDbId, - oldId: session.memory_session_id, - newId: memorySessionId, - childCount: childCount.total - }); - return; - } - } - - this.db.prepare(` - UPDATE sdk_sessions SET memory_session_id = ? WHERE id = ? - `).run(memorySessionId, sessionDbId); - - logger.info('DB', 'Registered memory_session_id before storage (FK fix)', { - sessionDbId, - oldId: session.memory_session_id, - newId: memorySessionId - }); + if (session.memory_session_id === memorySessionId) { + return true; // Already matches } + + // If the old memory_session_id has child rows (observations/summaries), + // don't update in-place — ON UPDATE RESTRICT would reject it, and we + // shouldn't rewrite historical attribution anyway. Only update when + // transitioning from NULL or when there are no children. + if (session.memory_session_id !== null) { + const childCount = this.db.prepare(` + SELECT + (SELECT COUNT(*) FROM observations WHERE memory_session_id = ?) + + (SELECT COUNT(*) FROM session_summaries WHERE memory_session_id = ?) + AS total + `).get(session.memory_session_id, session.memory_session_id) as { total: number }; + + if (childCount.total > 0) { + logger.warn('DB', 'Skipping memory_session_id update: old ID has child rows (historical attribution preserved)', { + sessionDbId, + oldId: session.memory_session_id, + newId: memorySessionId, + childCount: childCount.total + }); + return false; + } + } + + this.db.prepare(` + UPDATE sdk_sessions SET memory_session_id = ? WHERE id = ? + `).run(memorySessionId, sessionDbId); + + logger.info('DB', 'Registered memory_session_id before storage (FK fix)', { + sessionDbId, + oldId: session.memory_session_id, + newId: memorySessionId + }); + return true; } /** diff --git a/src/services/sqlite/migrations/runner.ts b/src/services/sqlite/migrations/runner.ts index 2a8bb9cd..6ec7bd39 100644 --- a/src/services/sqlite/migrations/runner.ts +++ b/src/services/sqlite/migrations/runner.ts @@ -971,14 +971,48 @@ export class MigrationRunner { logger.debug('DB', 'Replacing ON UPDATE CASCADE with ON UPDATE RESTRICT on FK constraints'); - // Get current column lists to ensure we copy all columns including any added by later migrations - const obsColumns = (this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[]) + // Check if the FK actually uses CASCADE before rebuilding. + // Fresh databases created with the new initializeSchema() already have RESTRICT, + // so the expensive table rebuild is unnecessary. + const obsFkInfo = this.db.query('PRAGMA foreign_key_list(observations)').all() as any[]; + const obsHasCascadeOnUpdate = obsFkInfo.some((fk: any) => fk.on_update === 'CASCADE'); + const sumFkInfo = this.db.query('PRAGMA foreign_key_list(session_summaries)').all() as any[]; + const sumHasCascadeOnUpdate = sumFkInfo.some((fk: any) => fk.on_update === 'CASCADE'); + + if (!obsHasCascadeOnUpdate && !sumHasCascadeOnUpdate) { + // Already using RESTRICT (or no FK at all), skip rebuild + logger.debug('DB', 'FK constraints already use RESTRICT, skipping table rebuild'); + this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(26, new Date().toISOString()); + return; + } + + // Get current column lists to ensure we copy all columns including any added by later migrations. + // Filter to only columns that exist in the new schema to prevent INSERT failures when + // the source table has columns not defined in the target (e.g. from a future migration). + const obsSourceColumns = (this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[]) .map(c => c.name) .filter(n => n !== undefined); - const sumColumns = (this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[]) + const sumSourceColumns = (this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[]) .map(c => c.name) .filter(n => n !== undefined); + // Canonical column sets for the new tables + const obsNewSchemaColumns = new Set([ + 'id', 'memory_session_id', 'project', 'text', 'type', 'title', 'subtitle', + 'facts', 'narrative', 'concepts', 'files_read', 'files_modified', + 'prompt_number', 'discovery_tokens', 'content_hash', + 'created_at', 'created_at_epoch', 'merged_into_project' + ]); + const sumNewSchemaColumns = new Set([ + 'id', 'memory_session_id', 'project', 'request', 'investigated', 'learned', + 'completed', 'next_steps', 'files_read', 'files_edited', 'notes', + 'prompt_number', 'discovery_tokens', + 'created_at', 'created_at_epoch', 'merged_into_project' + ]); + + const obsColumns = obsSourceColumns.filter(c => obsNewSchemaColumns.has(c)); + const sumColumns = sumSourceColumns.filter(c => sumNewSchemaColumns.has(c)); + this.db.run('PRAGMA foreign_keys = OFF'); this.db.run('BEGIN TRANSACTION'); diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 1e8bec2b..16590911 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -174,6 +174,7 @@ export class WorkerService { // Failed pending messages purge interval (Issue #1957) private failedMessagesPurgeInterval: ReturnType | null = null; + private failedMessagesPurgeInFlight = false; // AI interaction tracking for health endpoint private lastAiInteraction: { @@ -542,16 +543,20 @@ export class WorkerService { }, 2 * 60 * 1000); // Purge failed pending messages every 30 minutes (Issue #1957) + const { PendingMessageStore: PurgeMessageStore } = await import('./sqlite/PendingMessageStore.js'); + const purgeStore = new PurgeMessageStore(this.dbManager.getSessionStore().db, 3); this.failedMessagesPurgeInterval = setInterval(async () => { + if (this.failedMessagesPurgeInFlight || this.isShuttingDown) return; + this.failedMessagesPurgeInFlight = true; try { - const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js'); - const purgeStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3); const purged = purgeStore.clearFailed(); if (purged > 0) { logger.info('SYSTEM', `Purged ${purged} failed pending messages`); } } catch (e) { logger.error('SYSTEM', 'Failed message purge error', { error: e instanceof Error ? e.message : String(e) }); + } finally { + this.failedMessagesPurgeInFlight = false; } }, 30 * 60 * 1000); diff --git a/src/services/worker/agents/ResponseProcessor.ts b/src/services/worker/agents/ResponseProcessor.ts index d7f446c3..af87b1e1 100644 --- a/src/services/worker/agents/ResponseProcessor.ts +++ b/src/services/worker/agents/ResponseProcessor.ts @@ -99,7 +99,22 @@ export async function processAgentResponse( // in case the DB was somehow not updated (race condition, crash, etc.). // In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL // for each new generator, ensuring clean isolation. - sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId); + const memorySessionIdRegistered = sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId); + if (!memorySessionIdRegistered) { + // The DB still holds the old memory_session_id because child rows exist. + // Use the DB's current value so FK constraints are satisfied. + const currentSession = sessionStore.getSessionById(session.sessionDbId); + if (currentSession?.memory_session_id) { + logger.warn('DB', 'Using existing memory_session_id from DB for storage (child-row guard)', { + sessionDbId: session.sessionDbId, + requested: session.memorySessionId, + using: currentSession.memory_session_id + }); + session.memorySessionId = currentSession.memory_session_id; + } else { + throw new Error(`Cannot store observations: memory_session_id update was skipped and no existing ID found for session ${session.sessionDbId}`); + } + } // Log pre-storage with session ID chain for verification logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!summaryForStore}`, {