fix: address review - purge guard, migration guard, column safety

- Add in-flight boolean guard to prevent overlapping purge interval ticks
- Reuse PendingMessageStore instance instead of creating one per tick
- Guard migration 26 against fresh databases that already have RESTRICT
  FKs, avoiding unnecessary table rebuilds
- Filter dynamic column lists to only include columns present in the new
  schema, preventing INSERT failures from unexpected source columns
- Return boolean from ensureMemorySessionIdRegistered so callers know
  when the update was skipped due to child rows, and fall back to the
  existing DB value in ResponseProcessor

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-04-18 14:30:54 -07:00
parent 719c8ff8ac
commit 734a0c0ad4
4 changed files with 99 additions and 40 deletions

View File

@@ -1006,8 +1006,10 @@ export class SessionStore {
*
* @param sessionDbId - The database ID of the session
* @param memorySessionId - The memory session ID to ensure is registered
* @returns true if the memory_session_id was updated (or already matched),
* false if the update was skipped because the old ID has child rows.
*/
ensureMemorySessionIdRegistered(sessionDbId: number, memorySessionId: string): void {
ensureMemorySessionIdRegistered(sessionDbId: number, memorySessionId: string): boolean {
const session = this.db.prepare(`
SELECT id, memory_session_id FROM sdk_sessions WHERE id = ?
`).get(sessionDbId) as { id: number; memory_session_id: string | null } | undefined;
@@ -1016,40 +1018,43 @@ export class SessionStore {
throw new Error(`Session ${sessionDbId} not found in sdk_sessions`);
}
if (session.memory_session_id !== memorySessionId) {
// If the old memory_session_id has child rows (observations/summaries),
// don't update in-place — ON UPDATE RESTRICT would reject it, and we
// shouldn't rewrite historical attribution anyway. Only update when
// transitioning from NULL or when there are no children.
if (session.memory_session_id !== null) {
const childCount = this.db.prepare(`
SELECT
(SELECT COUNT(*) FROM observations WHERE memory_session_id = ?) +
(SELECT COUNT(*) FROM session_summaries WHERE memory_session_id = ?)
AS total
`).get(session.memory_session_id, session.memory_session_id) as { total: number };
if (childCount.total > 0) {
logger.warn('DB', 'Skipping memory_session_id update: old ID has child rows (historical attribution preserved)', {
sessionDbId,
oldId: session.memory_session_id,
newId: memorySessionId,
childCount: childCount.total
});
return;
}
}
this.db.prepare(`
UPDATE sdk_sessions SET memory_session_id = ? WHERE id = ?
`).run(memorySessionId, sessionDbId);
logger.info('DB', 'Registered memory_session_id before storage (FK fix)', {
sessionDbId,
oldId: session.memory_session_id,
newId: memorySessionId
});
if (session.memory_session_id === memorySessionId) {
return true; // Already matches
}
// If the old memory_session_id has child rows (observations/summaries),
// don't update in-place — ON UPDATE RESTRICT would reject it, and we
// shouldn't rewrite historical attribution anyway. Only update when
// transitioning from NULL or when there are no children.
if (session.memory_session_id !== null) {
const childCount = this.db.prepare(`
SELECT
(SELECT COUNT(*) FROM observations WHERE memory_session_id = ?) +
(SELECT COUNT(*) FROM session_summaries WHERE memory_session_id = ?)
AS total
`).get(session.memory_session_id, session.memory_session_id) as { total: number };
if (childCount.total > 0) {
logger.warn('DB', 'Skipping memory_session_id update: old ID has child rows (historical attribution preserved)', {
sessionDbId,
oldId: session.memory_session_id,
newId: memorySessionId,
childCount: childCount.total
});
return false;
}
}
this.db.prepare(`
UPDATE sdk_sessions SET memory_session_id = ? WHERE id = ?
`).run(memorySessionId, sessionDbId);
logger.info('DB', 'Registered memory_session_id before storage (FK fix)', {
sessionDbId,
oldId: session.memory_session_id,
newId: memorySessionId
});
return true;
}
/**

View File

@@ -971,14 +971,48 @@ export class MigrationRunner {
logger.debug('DB', 'Replacing ON UPDATE CASCADE with ON UPDATE RESTRICT on FK constraints');
// Get current column lists to ensure we copy all columns including any added by later migrations
const obsColumns = (this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[])
// Check if the FK actually uses CASCADE before rebuilding.
// Fresh databases created with the new initializeSchema() already have RESTRICT,
// so the expensive table rebuild is unnecessary.
const obsFkInfo = this.db.query('PRAGMA foreign_key_list(observations)').all() as any[];
const obsHasCascadeOnUpdate = obsFkInfo.some((fk: any) => fk.on_update === 'CASCADE');
const sumFkInfo = this.db.query('PRAGMA foreign_key_list(session_summaries)').all() as any[];
const sumHasCascadeOnUpdate = sumFkInfo.some((fk: any) => fk.on_update === 'CASCADE');
if (!obsHasCascadeOnUpdate && !sumHasCascadeOnUpdate) {
// No FK uses ON UPDATE CASCADE (already RESTRICT, some other action, or no FK at all), skip rebuild
logger.debug('DB', 'FK constraints already use RESTRICT, skipping table rebuild');
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(26, new Date().toISOString());
return;
}
// Get the current column lists of the source tables, then keep only the columns that also
// exist in the new schema. This prevents INSERT failures when the source table has columns
// not defined in the target (e.g. from a future migration); such columns are not copied.
const obsSourceColumns = (this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[])
.map(c => c.name)
.filter(n => n !== undefined);
const sumColumns = (this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[])
const sumSourceColumns = (this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[])
.map(c => c.name)
.filter(n => n !== undefined);
// Canonical column sets for the new tables
const obsNewSchemaColumns = new Set([
'id', 'memory_session_id', 'project', 'text', 'type', 'title', 'subtitle',
'facts', 'narrative', 'concepts', 'files_read', 'files_modified',
'prompt_number', 'discovery_tokens', 'content_hash',
'created_at', 'created_at_epoch', 'merged_into_project'
]);
const sumNewSchemaColumns = new Set([
'id', 'memory_session_id', 'project', 'request', 'investigated', 'learned',
'completed', 'next_steps', 'files_read', 'files_edited', 'notes',
'prompt_number', 'discovery_tokens',
'created_at', 'created_at_epoch', 'merged_into_project'
]);
const obsColumns = obsSourceColumns.filter(c => obsNewSchemaColumns.has(c));
const sumColumns = sumSourceColumns.filter(c => sumNewSchemaColumns.has(c));
this.db.run('PRAGMA foreign_keys = OFF');
this.db.run('BEGIN TRANSACTION');

View File

@@ -174,6 +174,7 @@ export class WorkerService {
// Failed pending messages purge interval (Issue #1957)
private failedMessagesPurgeInterval: ReturnType<typeof setInterval> | null = null;
private failedMessagesPurgeInFlight = false;
// AI interaction tracking for health endpoint
private lastAiInteraction: {
@@ -542,16 +543,20 @@ export class WorkerService {
}, 2 * 60 * 1000);
// Purge failed pending messages every 30 minutes (Issue #1957)
const { PendingMessageStore: PurgeMessageStore } = await import('./sqlite/PendingMessageStore.js');
const purgeStore = new PurgeMessageStore(this.dbManager.getSessionStore().db, 3);
this.failedMessagesPurgeInterval = setInterval(async () => {
if (this.failedMessagesPurgeInFlight || this.isShuttingDown) return;
this.failedMessagesPurgeInFlight = true;
try {
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
const purgeStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const purged = purgeStore.clearFailed();
if (purged > 0) {
logger.info('SYSTEM', `Purged ${purged} failed pending messages`);
}
} catch (e) {
logger.error('SYSTEM', 'Failed message purge error', { error: e instanceof Error ? e.message : String(e) });
} finally {
this.failedMessagesPurgeInFlight = false;
}
}, 30 * 60 * 1000);

View File

@@ -99,7 +99,22 @@ export async function processAgentResponse(
// in case the DB was somehow not updated (race condition, crash, etc.).
// In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL
// for each new generator, ensuring clean isolation.
sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId);
const memorySessionIdRegistered = sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId);
if (!memorySessionIdRegistered) {
// The DB still holds the old memory_session_id because child rows exist.
// Use the DB's current value so FK constraints are satisfied.
const currentSession = sessionStore.getSessionById(session.sessionDbId);
if (currentSession?.memory_session_id) {
logger.warn('DB', 'Using existing memory_session_id from DB for storage (child-row guard)', {
sessionDbId: session.sessionDbId,
requested: session.memorySessionId,
using: currentSession.memory_session_id
});
session.memorySessionId = currentSession.memory_session_id;
} else {
throw new Error(`Cannot store observations: memory_session_id update was skipped and no existing ID found for session ${session.sessionDbId}`);
}
}
// Log pre-storage with session ID chain for verification
logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!summaryForStore}`, {