fix: address Greptile P2 findings (iter 2)

- PendingMessageStore.transitionMessagesTo: require sessionDbId (drop
  the unscoped-drain branch that would nuke every pending/processing
  row across all sessions if a future caller omitted the filter).
- IngestEventBus.takeRecentSummaryStored: make idempotent — keep the
  cached event until TTL eviction so a retried Stop hook's second
  /api/session/end returns immediately instead of hanging 30 s.
- TranscriptWatcher fs.watch callback: skip full glob scan for paths
  already tailed (JSONL appends fire on every line; only unknown
  paths warrant a rescan).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-04-24 14:26:18 -07:00
parent 34a7f2021f
commit 9a9ccc9da5
4 changed files with 74 additions and 78 deletions

File diff suppressed because one or more lines are too long

View File

@@ -230,27 +230,19 @@ export class PendingMessageStore {
* old per-status wrapper methods were deleted in the same PR).
*
* @param status `'failed'` (processing-only) or `'abandoned'` (pending+processing)
* @param filter `{ sessionDbId?: number }` — scope to one session's rows
* @param filter `{ sessionDbId: number }` — scope to one session's rows.
* Required: no unscoped path exists, to prevent accidental global drain.
* @returns Number of rows updated
*/
transitionMessagesTo(
status: 'failed' | 'abandoned',
filter: { sessionDbId?: number }
filter: { sessionDbId: number }
): number {
const now = Date.now();
const statusClause = status === 'failed'
? `status = 'processing'`
: `status IN ('pending', 'processing')`;
if (filter.sessionDbId === undefined) {
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?
WHERE ${statusClause}
`);
return stmt.run(now).changes;
}
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?

View File

@@ -134,9 +134,12 @@ export class TranscriptWatcher {
try {
const watcher = fsWatch(watchRoot, { recursive: true, persistent: true }, (event, name) => {
if (!name) return; // some events omit filename
// Re-resolve the configured path; new files surface here. Restricting
// to the configured pattern keeps unrelated edits in the watched root
// from triggering tailer churn.
// Skip the glob scan for paths we already tail — JSONL appends fire
// here on every line and a full resolveWatchFiles() per append is
// more expensive than the prior 5-s interval. Only unknown paths
// warrant a rescan (new transcript files surface here first).
const changed = resolvePath(watchRoot, name);
if (this.tailers.has(changed)) return;
const matches = this.resolveWatchFiles(resolvedPath);
for (const filePath of matches) {
if (!this.tailers.has(filePath)) {

View File

@@ -53,7 +53,10 @@ class IngestEventBus extends EventEmitter {
* happens between the two requests. The session-end handler consults this
* buffer before registering to catch any already-fired event.
*
* Entries are evicted after RECENT_EVENT_TTL_MS or when consumed.
* Entries are evicted only on TTL expiry (RECENT_EVENT_TTL_MS) so that a
* retried Stop hook — which opens a second `/api/session/end` — still sees
* the already-emitted event instead of hanging 30 s for a replay that will
* never come.
*/
private readonly recentStored = new Map<string, { event: SummaryStoredEvent; at: number }>();
private static readonly RECENT_EVENT_TTL_MS = 60_000;
@@ -70,12 +73,14 @@ class IngestEventBus extends EventEmitter {
});
}
/**
 * Read a recently-emitted summaryStoredEvent, if any (idempotent).
 *
 * The entry is intentionally NOT removed on read: a retried Stop hook may
 * open a second `/api/session/end`, and that request must still observe the
 * already-emitted event instead of blocking for a replay that will never
 * come. Entries are evicted lazily, only when found expired on access.
 *
 * @param sessionId Key the event was buffered under.
 * @returns The buffered event, or `undefined` when none exists or the entry
 *          has outlived `RECENT_EVENT_TTL_MS`.
 */
takeRecentSummaryStored(sessionId: string): SummaryStoredEvent | undefined {
  const entry = this.recentStored.get(sessionId);
  if (!entry) return undefined;
  // Lazy TTL eviction: drop the stale entry only when it is observed here.
  if (Date.now() - entry.at > IngestEventBus.RECENT_EVENT_TTL_MS) {
    this.recentStored.delete(sessionId);
    return undefined;
  }
  return entry.event;
}