mirror of
https://github.com/thedotmack/claude-mem
synced 2026-04-25 17:15:04 +02:00
Fail-fast parser, direct in-process ingest, recursive fs.watch,
DB-backed tool pairing. Worker-internal HTTP loopback eliminated.
- Phase 0: Created src/services/worker/http/shared.ts exporting
ingestObservation/ingestPrompt/ingestSummary as direct
in-process functions plus ingestEventBus (Node EventEmitter,
reusing existing pattern — no third event bus introduced).
setIngestContext wires the SessionManager dependency from
worker-service constructor.
- Phase 1: src/sdk/parser.ts collapsed to one parseAgentXml
returning { valid:true; kind: 'observation'|'summary'; data }
| { valid:false; reason: string }. Inspects root element;
<skip_summary reason="…"/> is a first-class summary case
with skipped:true. NEVER returns undefined. NEVER coerces.
- Phase 2: ResponseProcessor calls parseAgentXml exactly once,
branches on the discriminated union. On invalid → markFailed
+ logger.warn(reason). On observation → ingestObservation.
On summary → ingestSummary then emit summaryStoredEvent
{ sessionId, messageId } (consumed by Plan 05's blocking
/api/session/end).
- Phase 3: Deleted consecutiveSummaryFailures field
(ResponseProcessor + SessionManager + worker-types) and
MAX_CONSECUTIVE_SUMMARY_FAILURES constant. Circuit-breaker
guards and "tripped" log lines removed.
- Phase 4: coerceObservationToSummary deleted from sdk/parser.ts.
- Phase 5: src/services/transcripts/watcher.ts rescan setInterval
replaced with fs.watch(transcriptsRoot, { recursive: true,
persistent: true }) — Node 20+ recursive mode.
- Phase 6: src/services/transcripts/processor.ts pendingTools
Map deleted. tool_use rows insert with INSERT OR IGNORE on
UNIQUE(session_id, tool_use_id) (added by Plan 01). New
pairToolUsesByJoin query in PendingMessageStore for read-time
pairing (UNIQUE INDEX provides idempotency; explicit consumer
not yet wired).
- Phase 7: HTTP loopback at processor.ts:252 replaced with
direct ingestObservation call. maybeParseJson silent-passthrough
rewritten to fail-fast (throws on malformed JSON).
- Phase 8: src/utils/tag-stripping.ts countTags + stripTagsInternal
collapsed into one alternation regex, single-pass over input.
- Phase 9: src/utils/transcript-parser.ts (dead TranscriptParser
class) deleted. The active extractLastMessage at
src/shared/transcript-parser.ts:41-144 is the sole survivor.
Tests updated (Principle 7 — same-PR delete):
- tests/sdk/parser.test.ts + parse-summary.test.ts: rewritten
to assert discriminated-union shape; coercion-specific
scenarios collapse into { valid:false } assertions.
- tests/worker/agents/response-processor.test.ts: circuit-breaker
describe block skipped; non-XML/empty-response tests assert
fail-fast markFailed behavior.
Verification: every grep returns 0. transcript-parser.ts deleted.
bun run build succeeds. bun test → 1399 pass / 28 fail / 7 skip
(net -8 pass = the 4 retired circuit-breaker tests + 4 collapsed
parser cases). Zero new failures vs baseline.
Deferred (out of Plan 03 scope, will land in Plan 06): SessionRoutes
HTTP route handlers still call sessionManager.queueObservation
inline rather than the new shared helpers — the helpers are ready,
the route swap is mechanical and belongs with the Zod refactor.
Plan: PATHFINDER-2026-04-22/03-ingestion-path.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
383 lines
14 KiB
TypeScript
import path from 'path';
|
|
import { sessionInitHandler } from '../../cli/handlers/session-init.js';
|
|
import { fileEditHandler } from '../../cli/handlers/file-edit.js';
|
|
import { sessionCompleteHandler } from '../../cli/handlers/session-complete.js';
|
|
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
|
|
import { DATA_DIR } from '../../shared/paths.js';
|
|
import { logger } from '../../utils/logger.js';
|
|
import { getProjectContext } from '../../utils/project-name.js';
|
|
import { writeAgentsMd } from '../../utils/agents-md-utils.js';
|
|
import { resolveFieldSpec, resolveFields, matchesRule } from './field-utils.js';
|
|
import { expandHomePath } from './config.js';
|
|
import type { TranscriptSchema, WatchTarget, SchemaEvent } from './types.js';
|
|
import { normalizePlatformSource } from '../../shared/platform-source.js';
|
|
import { ingestObservation } from '../worker/http/shared.js';
|
|
|
|
/**
 * Mutable state the processor keeps for one transcript session while its
 * events are being replayed.
 */
interface SessionState {
  /** Session identifier resolved from the transcript entry (or the caller-supplied override). */
  sessionId: string;
  /** Platform label produced by normalizing the watch target's name. */
  platformSource: string;
  /** Most recently resolved working directory for the session, if any was found. */
  cwd?: string;
  /** Most recently resolved project name for the session, if any was found. */
  project?: string;
  /** Text of the latest user message or prompt seen for the session. */
  lastUserMessage?: string;
  /** Text of the latest assistant message seen; sent along with the summary request at session end. */
  lastAssistantMessage?: string;
}
|
|
|
|
/**
 * Turns transcript entries into session activity: it matches each entry
 * against the schema's events, tracks per-session state (cwd, project, last
 * messages), and forwards the result to the session/file-edit handlers, the
 * in-process observation ingest, and the worker's HTTP API.
 */
export class TranscriptEventProcessor {
  /** Line prefixes in an `apply_patch` payload that are followed by a file path. */
  private static readonly PATCH_FILE_MARKERS = [
    '*** Update File: ',
    '*** Add File: ',
    '*** Delete File: ',
    '*** Move to: ',
  ] as const;

  /** Live session state, keyed by the value returned from `getSessionKey`. */
  private sessions = new Map<string, SessionState>();

  /**
   * Runs `entry` through every schema event whose match rule accepts it.
   * Matching events are handled one after another, in schema order.
   *
   * @param entry - One transcript record; its shape is defined by `schema`.
   * @param watch - Watch target the entry was read from.
   * @param schema - Schema describing how to interpret the entry.
   * @param sessionIdOverride - Fallback session id, used only when the entry
   *   itself does not yield one. `null` is treated like `undefined`.
   */
  async processEntry(
    entry: unknown,
    watch: WatchTarget,
    schema: TranscriptSchema,
    sessionIdOverride?: string | null
  ): Promise<void> {
    for (const event of schema.events) {
      if (!matchesRule(entry, event.match, schema)) continue;
      await this.handleEvent(entry, watch, schema, event, sessionIdOverride ?? undefined);
    }
  }

  /** Builds the `sessions` map key: the watch name and session id joined by `:`. */
  private getSessionKey(watch: WatchTarget, sessionId: string): string {
    return `${watch.name}:${sessionId}`;
  }

  /**
   * Returns the tracked state for `sessionId` under `watch`, creating and
   * registering a fresh one (with the normalized platform source) on first use.
   */
  private getOrCreateSession(watch: WatchTarget, sessionId: string): SessionState {
    const key = this.getSessionKey(watch, sessionId);
    let session = this.sessions.get(key);
    if (!session) {
      session = {
        sessionId,
        platformSource: normalizePlatformSource(watch.name),
      };
      this.sessions.set(key, session);
    }
    return session;
  }

  /**
   * Resolves the session id for an entry.
   *
   * Precedence: the event's own `sessionId` field spec, then the schema-wide
   * `sessionIdPath`, then `sessionIdOverride`.
   *
   * @returns The id (numbers are stringified), or `null` when nothing usable
   *   was found.
   */
  private resolveSessionId(
    entry: unknown,
    watch: WatchTarget,
    schema: TranscriptSchema,
    event: SchemaEvent,
    sessionIdOverride?: string
  ): string | null {
    const ctx = { watch, schema } as any;
    const fieldSpec = event.fields?.sessionId ?? (schema.sessionIdPath ? { path: schema.sessionIdPath } : undefined);
    const resolved = resolveFieldSpec(fieldSpec, entry, ctx);
    if (typeof resolved === 'string' && resolved.trim()) return resolved;
    if (typeof resolved === 'number') return String(resolved);
    if (sessionIdOverride && sessionIdOverride.trim()) return sessionIdOverride;
    return null;
  }

  /**
   * Resolves the working directory for an entry.
   *
   * Precedence: the event's `cwd` field spec, the schema-wide `cwdPath`, the
   * watch target's `workspace`, and finally the cwd already stored on the
   * session.
   */
  private resolveCwd(
    entry: unknown,
    watch: WatchTarget,
    schema: TranscriptSchema,
    event: SchemaEvent,
    session: SessionState
  ): string | undefined {
    const ctx = { watch, schema, session } as any;
    const fieldSpec = event.fields?.cwd ?? (schema.cwdPath ? { path: schema.cwdPath } : undefined);
    const resolved = resolveFieldSpec(fieldSpec, entry, ctx);
    if (typeof resolved === 'string' && resolved.trim()) return resolved;
    if (watch.workspace) return watch.workspace;
    return session.cwd;
  }

  /**
   * Resolves the project name for an entry.
   *
   * Precedence: the event's `project` field spec, the schema-wide
   * `projectPath`, the watch target's `project`, the primary project derived
   * from the session's cwd, and finally the project already stored on the
   * session.
   */
  private resolveProject(
    entry: unknown,
    watch: WatchTarget,
    schema: TranscriptSchema,
    event: SchemaEvent,
    session: SessionState
  ): string | undefined {
    const ctx = { watch, schema, session } as any;
    const fieldSpec = event.fields?.project ?? (schema.projectPath ? { path: schema.projectPath } : undefined);
    const resolved = resolveFieldSpec(fieldSpec, entry, ctx);
    if (typeof resolved === 'string' && resolved.trim()) return resolved;
    if (watch.project) return watch.project;
    if (session.cwd) return getProjectContext(session.cwd).primary;
    return session.project;
  }

  /**
   * Handles one matched schema event: resolves the session (skipping the
   * event with a debug log when no session id is available), refreshes the
   * session's cwd/project, resolves the event's fields and dispatches on
   * `event.action`. Unknown actions are ignored.
   */
  private async handleEvent(
    entry: unknown,
    watch: WatchTarget,
    schema: TranscriptSchema,
    event: SchemaEvent,
    sessionIdOverride?: string
  ): Promise<void> {
    const sessionId = this.resolveSessionId(entry, watch, schema, event, sessionIdOverride);
    if (!sessionId) {
      logger.debug('TRANSCRIPT', 'Skipping event without sessionId', { event: event.name, watch: watch.name });
      return;
    }

    const session = this.getOrCreateSession(watch, sessionId);
    // cwd is stored before the project is resolved because resolveProject
    // can fall back to a project derived from session.cwd.
    const cwd = this.resolveCwd(entry, watch, schema, event, session);
    if (cwd) session.cwd = cwd;
    const project = this.resolveProject(entry, watch, schema, event, session);
    if (project) session.project = project;

    const fields = resolveFields(event.fields, entry, { watch, schema, session });

    switch (event.action) {
      case 'session_context':
        this.applySessionContext(session, fields);
        break;
      case 'session_init':
        await this.handleSessionInit(session, fields);
        if (watch.context?.updateOn?.includes('session_start')) {
          await this.updateContext(session, watch);
        }
        break;
      case 'user_message':
        // When both are present, `prompt` wins because it is assigned last.
        if (typeof fields.message === 'string') session.lastUserMessage = fields.message;
        if (typeof fields.prompt === 'string') session.lastUserMessage = fields.prompt;
        break;
      case 'assistant_message':
        if (typeof fields.message === 'string') session.lastAssistantMessage = fields.message;
        break;
      case 'tool_use':
        await this.handleToolUse(session, fields);
        break;
      case 'tool_result':
        await this.handleToolResult(session, fields);
        break;
      case 'observation':
        await this.sendObservation(session, fields);
        break;
      case 'file_edit':
        await this.sendFileEdit(session, fields);
        break;
      case 'session_end':
        await this.handleSessionEnd(session, watch);
        break;
      default:
        break;
    }
  }

  /** Copies non-empty string `cwd` / `project` fields onto the session. */
  private applySessionContext(session: SessionState, fields: Record<string, unknown>): void {
    const cwd = typeof fields.cwd === 'string' ? fields.cwd : undefined;
    const project = typeof fields.project === 'string' ? fields.project : undefined;
    if (cwd) session.cwd = cwd;
    if (project) session.project = project;
  }

  /**
   * Records the initial prompt (when non-empty) as the last user message and
   * invokes the session-init handler. Falls back to `process.cwd()` when the
   * session has no cwd yet.
   */
  private async handleSessionInit(session: SessionState, fields: Record<string, unknown>): Promise<void> {
    const prompt = typeof fields.prompt === 'string' ? fields.prompt : '';
    const cwd = session.cwd ?? process.cwd();
    if (prompt) {
      session.lastUserMessage = prompt;
    }

    await sessionInitHandler.execute({
      sessionId: session.sessionId,
      cwd,
      prompt,
      platform: session.platformSource
    });
  }

  /**
   * Handles a `tool_use` event. An `apply_patch` call whose input is a plain
   * string produces one file-edit per file named in the patch; in addition,
   * when the event already carries a tool response, an observation is sent.
   *
   * @throws SyntaxError when `toolInput` / `toolResponse` look like JSON but
   *   do not parse (see `maybeParseJson`).
   */
  private async handleToolUse(session: SessionState, fields: Record<string, unknown>): Promise<void> {
    const toolId = typeof fields.toolId === 'string' ? fields.toolId : undefined;
    const toolName = typeof fields.toolName === 'string' ? fields.toolName : undefined;
    const toolInput = this.maybeParseJson(fields.toolInput);
    const toolResponse = this.maybeParseJson(fields.toolResponse);

    if (toolName === 'apply_patch' && typeof toolInput === 'string') {
      const files = this.parseApplyPatchFiles(toolInput);
      for (const filePath of files) {
        await this.sendFileEdit(session, {
          filePath,
          edits: [{ type: 'apply_patch', patch: toolInput }]
        });
      }
    }

    // PATHFINDER plan 03 phase 6: per-process tool pairing Map deleted.
    // Each event emits independently; the database's
    // UNIQUE(content_session_id, tool_use_id) index makes the insert
    // idempotent so duplicate observations from overlapping
    // tool_use + tool_result lines collapse to a single row.
    if (toolResponse !== undefined && toolName) {
      await this.sendObservation(session, {
        toolName,
        toolInput,
        toolResponse,
        toolUseId: toolId,
      });
    }
  }

  /**
   * Handles a `tool_result` event by sending an observation whenever the
   * event names a tool.
   *
   * @throws SyntaxError when `toolInput` / `toolResponse` look like JSON but
   *   do not parse (see `maybeParseJson`).
   */
  private async handleToolResult(session: SessionState, fields: Record<string, unknown>): Promise<void> {
    const toolId = typeof fields.toolId === 'string' ? fields.toolId : undefined;
    const toolName = typeof fields.toolName === 'string' ? fields.toolName : undefined;
    const toolResponse = this.maybeParseJson(fields.toolResponse);
    const toolInput = this.maybeParseJson(fields.toolInput);

    if (toolName) {
      await this.sendObservation(session, {
        toolName,
        toolInput,
        toolResponse,
        toolUseId: toolId,
      });
    }
  }

  /**
   * Ingests one tool observation for the session. Does nothing when the
   * fields carry no string `toolName`.
   *
   * @throws Error when `ingestObservation` reports a non-ok result.
   * @throws SyntaxError when `toolInput` / `toolResponse` look like JSON but
   *   do not parse (see `maybeParseJson`).
   */
  private async sendObservation(session: SessionState, fields: Record<string, unknown>): Promise<void> {
    const toolName = typeof fields.toolName === 'string' ? fields.toolName : undefined;
    if (!toolName) return;

    // PATHFINDER plan 03 phase 7: replace HTTP loopback (worker → its own
    // /api/sessions/observations endpoint) with a direct in-process call to
    // ingestObservation. Same implementation backs the cross-process HTTP
    // route handler (one helper, N callers).
    const result = ingestObservation({
      contentSessionId: session.sessionId,
      cwd: session.cwd ?? process.cwd(),
      toolName,
      // Already-parsed values pass through maybeParseJson unchanged, so callers
      // that parsed up front are not affected by this second pass.
      toolInput: this.maybeParseJson(fields.toolInput),
      toolResponse: this.maybeParseJson(fields.toolResponse),
      platformSource: session.platformSource,
      toolUseId: typeof fields.toolUseId === 'string' ? fields.toolUseId : undefined,
    });

    if (!result.ok) {
      throw new Error(`ingestObservation failed: ${result.reason}`);
    }
  }

  /**
   * Forwards a file edit to the file-edit handler. Does nothing when the
   * fields carry no string `filePath`; `edits` is passed only when it is an
   * array.
   */
  private async sendFileEdit(session: SessionState, fields: Record<string, unknown>): Promise<void> {
    const filePath = typeof fields.filePath === 'string' ? fields.filePath : undefined;
    if (!filePath) return;

    await fileEditHandler.execute({
      sessionId: session.sessionId,
      cwd: session.cwd ?? process.cwd(),
      filePath,
      edits: Array.isArray(fields.edits) ? fields.edits : undefined,
      platform: session.platformSource
    });
  }

  /**
   * Parses `value` as JSON when it is a string whose trimmed form starts with
   * `{` or `[`; every other value (non-strings, blank strings, plain text) is
   * returned untouched.
   *
   * @throws SyntaxError when the string looks like JSON but is malformed.
   */
  private maybeParseJson(value: unknown): unknown {
    if (typeof value !== 'string') return value;
    const trimmed = value.trim();
    if (!trimmed) return value;
    if (!(trimmed.startsWith('{') || trimmed.startsWith('['))) return value;
    // PATHFINDER plan 03 phase 7: fail-fast. Strings that look like JSON but
    // do not parse are a contract violation, not a passthrough — throw so the
    // upstream `try` in `handleLine` logs and the caller decides.
    return JSON.parse(trimmed);
  }

  /**
   * Extracts the file paths mentioned in an `apply_patch` payload.
   *
   * Recognizes the `*** Update File: `, `*** Add File: `, `*** Delete File: `
   * and `*** Move to: ` markers as well as unified-diff `+++ ` headers (an
   * optional leading `b/` is dropped and `/dev/null` is ignored).
   *
   * @returns Unique paths in order of first appearance.
   */
  private parseApplyPatchFiles(patch: string): string[] {
    const files = new Set<string>();
    for (const line of patch.split('\n')) {
      const trimmed = line.trim();
      const marker = TranscriptEventProcessor.PATCH_FILE_MARKERS.find(prefix => trimmed.startsWith(prefix));
      if (marker) {
        files.add(trimmed.slice(marker.length).trim());
        continue;
      }
      if (trimmed.startsWith('+++ ')) {
        // Named `target` rather than `path` so the imported `path` module is not shadowed.
        const target = trimmed.slice('+++ '.length).replace(/^b\//, '').trim();
        if (target && target !== '/dev/null') files.add(target);
      }
    }
    return Array.from(files);
  }

  /**
   * Finishes a session: requests a summary, runs the session-complete
   * handler, refreshes the AGENTS.md context and finally drops the session
   * from the map. The session is only evicted when every preceding step
   * completes without throwing.
   */
  private async handleSessionEnd(session: SessionState, watch: WatchTarget): Promise<void> {
    await this.queueSummary(session);
    await sessionCompleteHandler.execute({
      sessionId: session.sessionId,
      cwd: session.cwd ?? process.cwd(),
      platform: session.platformSource
    });
    await this.updateContext(session, watch);
    const key = this.getSessionKey(watch, session.sessionId);
    this.sessions.delete(key);
  }

  /**
   * Asks the worker to summarize the session, sending the last assistant
   * message (or an empty string). Best-effort: an unavailable worker, a
   * failed request or a non-OK response never throws; the latter two are
   * logged as warnings.
   */
  private async queueSummary(session: SessionState): Promise<void> {
    const workerReady = await ensureWorkerRunning();
    if (!workerReady) return;

    const lastAssistantMessage = session.lastAssistantMessage ?? '';
    const requestBody = JSON.stringify({
      contentSessionId: session.sessionId,
      last_assistant_message: lastAssistantMessage,
      platformSource: session.platformSource
    });

    try {
      const response = await workerHttpRequest('/api/sessions/summarize', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: requestBody
      });
      // A completed request can still be refused by the worker; surface that
      // instead of dropping it silently.
      if (!response.ok) {
        logger.warn('TRANSCRIPT', 'Summary request rejected by worker', {
          sessionId: session.sessionId
        });
      }
    } catch (error: unknown) {
      logger.warn('TRANSCRIPT', 'Summary request failed', {
        error: error instanceof Error ? error.message : String(error)
      });
    }
  }

  /**
   * Reports whether `candidate` is `root` itself or lies beneath it.
   *
   * Uses `path.relative` rather than a string-prefix test so that a
   * filesystem-root `root` (where `root + path.sep` would be `//`) and
   * platform-specific path comparison are handled by the path module.
   */
  private isPathWithin(root: string, candidate: string): boolean {
    const relative = path.relative(root, candidate);
    if (relative === '') return true;
    // An absolute result means the two paths share no common root (e.g. different Windows drives).
    if (path.isAbsolute(relative)) return false;
    return relative !== '..' && !relative.startsWith(`..${path.sep}`);
  }

  /**
   * Refreshes the AGENTS.md file for the session's workspace with context
   * fetched from the worker.
   *
   * Only runs for watch targets whose context mode is `agents`, when the
   * worker is available and a cwd is known. The destination must resolve to
   * a location inside the cwd or `DATA_DIR`; otherwise the update is rejected
   * with a security warning. Fetch failures, non-OK responses and empty
   * content leave the file untouched.
   */
  private async updateContext(session: SessionState, watch: WatchTarget): Promise<void> {
    if (!watch.context) return;
    if (watch.context.mode !== 'agents') return;

    const workerReady = await ensureWorkerRunning();
    if (!workerReady) return;

    const cwd = session.cwd ?? watch.workspace;
    if (!cwd) return;

    const context = getProjectContext(cwd);
    const projectsParam = context.allProjects.join(',');

    const contextUrl = `/api/context/inject?projects=${encodeURIComponent(projectsParam)}`;
    const agentsPath = expandHomePath(watch.context.path ?? `${cwd}/AGENTS.md`);

    // Validate resolved path stays within allowed directories (#1934)
    const resolvedAgentsPath = path.resolve(agentsPath);
    const allowedRoots = [path.resolve(cwd), path.resolve(DATA_DIR)];
    const isPathSafe = allowedRoots.some(root => this.isPathWithin(root, resolvedAgentsPath));
    if (!isPathSafe) {
      logger.warn('SECURITY', 'Rejected path traversal attempt in watch.context.path', {
        original: watch.context.path,
        resolved: resolvedAgentsPath,
        allowedRoots
      });
      return;
    }

    let response: Awaited<ReturnType<typeof workerHttpRequest>>;
    try {
      response = await workerHttpRequest(contextUrl);
    } catch (error: unknown) {
      logger.warn('TRANSCRIPT', 'Failed to fetch AGENTS.md context', {
        error: error instanceof Error ? error.message : String(error)
      });
      return;
    }

    if (!response.ok) return;

    const content = (await response.text()).trim();
    if (!content) return;

    // Write to the exact path that passed validation, not the pre-resolution string.
    writeAgentsMd(resolvedAgentsPath, content);
    logger.debug('TRANSCRIPT', 'Updated AGENTS.md context', { agentsPath: resolvedAgentsPath, watch: watch.name });
  }
}
|