fix: address Greptile iteration 3 (P1 + 2× P2)

- P1 worker-service.ts: wire ensureGeneratorRunning into the ingest
  context after SessionRoutes is constructed. setIngestContext runs
  before routes exist, so transcript-watcher observations queued via
  ingestObservation() had no way to auto-start the SDK generator.
  Added attachIngestGeneratorStarter() to patch the callback in.
- P2 shared.ts: IngestEventBus now sets maxListeners to 0. Concurrent
  /api/session/end calls register one listener each and clean up on
  completion, so the default-10 warning fires spuriously under normal
  load.
- P2 SessionRoutes.ts: handleObservationsByClaudeId now delegates to
  ingestObservation() instead of duplicating skip-tool / meta /
  privacy / queue logic. Single helper, matching the Plan 03 goal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-04-23 18:58:06 -07:00
parent f991271c74
commit 87e58db588
4 changed files with 214 additions and 248 deletions

File diff suppressed because one or more lines are too long

View File

@@ -87,7 +87,7 @@ import { SearchManager } from './worker/SearchManager.js';
import { FormattingService } from './worker/FormattingService.js';
import { TimelineService } from './worker/TimelineService.js';
import { SessionEventBroadcaster } from './worker/events/SessionEventBroadcaster.js';
import { setIngestContext } from './worker/http/shared.js';
import { setIngestContext, attachIngestGeneratorStarter } from './worker/http/shared.js';
import { DEFAULT_CONFIG_PATH, DEFAULT_STATE_PATH, expandHomePath, loadTranscriptWatchConfig, writeSampleConfig } from './transcripts/config.js';
import { TranscriptWatcher } from './transcripts/watcher.js';
@@ -311,7 +311,15 @@ export class WorkerService {
// Standard routes (registered AFTER guard middleware)
this.server.registerRoutes(new ViewerRoutes(this.sseBroadcaster, this.dbManager, this.sessionManager));
this.server.registerRoutes(new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this));
const sessionRoutes = new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this);
this.server.registerRoutes(sessionRoutes);
// Wire the generator-starter callback now that SessionRoutes exists.
// `setIngestContext` ran in the constructor before routes were
// constructed; transcript-watcher observations depend on this side-effect
// to auto-start the SDK generator after enqueue.
attachIngestGeneratorStarter((sessionDbId, source) =>
sessionRoutes.ensureGeneratorRunning(sessionDbId, source),
);
this.server.registerRoutes(new DataRoutes(this.paginationHelper, this.dbManager, this.sessionManager, this.sseBroadcaster, this, this.startTime));
this.server.registerRoutes(new SettingsRoutes(this.settingsManager));
this.server.registerRoutes(new LogsRoutes());

View File

@@ -7,7 +7,7 @@
import express, { Request, Response } from 'express';
import { z } from 'zod';
import { ingestEventBus, type SummaryStoredEvent } from '../shared.js';
import { ingestEventBus, ingestObservation, type SummaryStoredEvent } from '../shared.js';
import { validateBody } from '../middleware/validateBody.js';
import { getWorkerPort } from '../../../../shared/worker-utils.js';
import { logger } from '../../../../utils/logger.js';
@@ -100,7 +100,7 @@ export class SessionRoutes extends BaseRouteHandler {
private static readonly STALE_GENERATOR_THRESHOLD_MS = 30_000; // 30 seconds (#1099)
private static readonly MAX_SESSION_WALL_CLOCK_MS = 4 * 60 * 60 * 1000; // 4 hours (#1590)
private ensureGeneratorRunning(sessionDbId: number, source: string): void {
public ensureGeneratorRunning(sessionDbId: number, source: string): void {
const session = this.sessionManager.getSession(sessionDbId);
if (!session) return;
@@ -712,94 +712,28 @@ export class SessionRoutes extends BaseRouteHandler {
* Body: { contentSessionId, tool_name, tool_input, tool_response, cwd }
*/
private handleObservationsByClaudeId = this.wrapHandler((req: Request, res: Response): void => {
const { contentSessionId, tool_name, tool_input, tool_response, cwd, agentId, agentType } = req.body;
const platformSource = normalizePlatformSource(req.body.platformSource);
const project = typeof cwd === 'string' && cwd.trim() ? getProjectContext(cwd).primary : '';
const { contentSessionId, tool_name, tool_input, tool_response, cwd, platformSource, agentId, agentType } = req.body;
// Load skip tools from settings
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const skipTools = new Set(settings.CLAUDE_MEM_SKIP_TOOLS.split(',').map(t => t.trim()).filter(Boolean));
// Skip low-value or meta tools
if (skipTools.has(tool_name)) {
logger.debug('SESSION', 'Skipping observation for tool', { tool_name });
res.json({ status: 'skipped', reason: 'tool_excluded' });
return;
}
// Skip meta-observations: file operations on session-memory files
const fileOperationTools = new Set(['Edit', 'Write', 'Read', 'NotebookEdit']);
if (fileOperationTools.has(tool_name) && tool_input) {
const filePath = tool_input.file_path || tool_input.notebook_path;
if (filePath && filePath.includes('session-memory')) {
logger.debug('SESSION', 'Skipping meta-observation for session-memory file', {
tool_name,
file_path: filePath
});
res.json({ status: 'skipped', reason: 'session_memory_meta' });
return;
}
}
const store = this.dbManager.getSessionStore();
let sessionDbId: number;
let promptNumber: number;
try {
sessionDbId = store.createSDKSession(contentSessionId, project, '', undefined, platformSource);
promptNumber = store.getPromptNumberFromUserPrompts(contentSessionId);
} catch (error) {
const normalizedError = error instanceof Error ? error : new Error(String(error));
logger.error('HTTP', 'Observation storage failed', { contentSessionId, tool_name }, normalizedError);
res.json({ stored: false, reason: normalizedError.message });
return;
}
// Privacy check: skip if user prompt was entirely private
const userPrompt = PrivacyCheckValidator.checkUserPromptPrivacy(
store,
const result = ingestObservation({
contentSessionId,
promptNumber,
'observation',
sessionDbId,
{ tool_name }
);
if (!userPrompt) {
res.json({ status: 'skipped', reason: 'private' });
return;
}
// Strip memory tags from tool_input and tool_response
const cleanedToolInput = tool_input !== undefined
? stripMemoryTagsFromJson(JSON.stringify(tool_input))
: '{}';
const cleanedToolResponse = tool_response !== undefined
? stripMemoryTagsFromJson(JSON.stringify(tool_response))
: '{}';
// Queue observation
this.sessionManager.queueObservation(sessionDbId, {
tool_name,
tool_input: cleanedToolInput,
tool_response: cleanedToolResponse,
prompt_number: promptNumber,
cwd: cwd || (() => {
logger.error('SESSION', 'Missing cwd when queueing observation in SessionRoutes', {
sessionId: sessionDbId,
tool_name
});
return '';
})(),
agentId: typeof agentId === 'string' ? agentId : undefined,
agentType: typeof agentType === 'string' ? agentType : undefined,
toolName: tool_name,
toolInput: tool_input,
toolResponse: tool_response,
cwd,
platformSource,
agentId,
agentType,
});
// Ensure SDK agent is running
this.ensureGeneratorRunning(sessionDbId, 'observation');
if (!result.ok) {
res.status(result.status ?? 500).json({ stored: false, reason: result.reason });
return;
}
// Broadcast observation queued event
this.eventBroadcaster.broadcastObservationQueued(sessionDbId);
if ('status' in result && result.status === 'skipped') {
res.json({ status: 'skipped', reason: result.reason });
return;
}
res.json({ status: 'queued' });
});

View File

@@ -45,7 +45,15 @@ export interface SummaryStoredEvent {
messageId: number;
}
class IngestEventBus extends EventEmitter {}
class IngestEventBus extends EventEmitter {
constructor() {
super();
// Listener count is bounded by concurrent /api/session/end calls and they
// all clean up on completion. Disable the default 10-listener warning so
// normal load doesn't look like a leak in monitoring.
this.setMaxListeners(0);
}
}
/**
* Process-local event bus for ingestion lifecycle events.
@@ -81,6 +89,22 @@ export function setIngestContext(next: IngestContext): void {
ctx = next;
}
/**
* Attach the generator-running callback after `SessionRoutes` has been
* constructed. `setIngestContext` is called early in `WorkerService` startup
* (before routes exist), so the callback is wired in as a second step once
* `SessionRoutes.ensureGeneratorRunning` is available.
*
* Without this, transcript-watcher observations queue via
* `ingestObservation()` but the SDK generator never auto-starts to drain
* them.
*/
export function attachIngestGeneratorStarter(
ensureGeneratorRunning: (sessionDbId: number, source: string) => void,
): void {
requireContext().ensureGeneratorRunning = ensureGeneratorRunning;
}
function requireContext(): IngestContext {
if (!ctx) {
throw new Error('ingest helpers used before setIngestContext() — wiring bug');