mirror of
https://github.com/thedotmack/claude-mem
synced 2026-04-25 17:15:04 +02:00
refactor: land PATHFINDER Plan 02 — process lifecycle
OS process groups replace hand-rolled reapers. Worker runs until
killed; orphans are prevented by detached spawn + kill(-pgid).
- Phase 1: src/services/worker/ProcessRegistry.ts DELETED. The
canonical registry at src/supervisor/process-registry.ts is the
sole survivor; SDK spawn site consolidated into it via new
createSdkSpawnFactory/spawnSdkProcess/getSdkProcessForSession/
ensureSdkProcessExit/waitForSlot helpers.
- Phase 2: SDK children spawn with detached:true + stdio:
['ignore','pipe','pipe']; pgid recorded on ManagedProcessInfo.
- Phase 3: shutdown.ts signalProcess teardown uses
process.kill(-pgid, signal) on Unix when pgid is recorded;
Windows path unchanged (tree-kill/taskkill).
- Phase 4: all reaper intervals deleted — startOrphanReaper call,
staleSessionReaperInterval setInterval (including the co-located
WAL checkpoint — SQLite's built-in wal_autocheckpoint handles
WAL growth without an app-level timer), killIdleDaemonChildren,
killSystemOrphans, reapOrphanedProcesses, reapStaleSessions, and
detectStaleGenerator. MAX_GENERATOR_IDLE_MS and MAX_SESSION_IDLE_MS
constants deleted.
- Phase 5: abandonedTimer — already 0 matches; primary-path cleanup
via generatorPromise.finally() already lives in worker-service
startSessionProcessor and SessionRoutes ensureGeneratorRunning.
- Phase 6: evictIdlestSession and its evict callback deleted from
SessionManager. Pool admission gates backpressure upstream.
- Phase 7: SDK-failure fallback — SessionManager has zero matches
for fallbackAgent/Gemini/OpenRouter. Failures surface to hooks
via exit code 2 through SessionRoutes error mapping.
- Phase 8: ensureWorkerRunning in worker-utils.ts rewritten to
lazy-spawn — consults isWorkerPortAlive (which gates
captureProcessStartToken for PID-reuse safety via commit
99060bac), then spawns detached with unref(), then
waitForWorkerPort({ attempts: 3, backoffMs: 250 }) hand-rolled
exponential backoff 250→500→1000ms. No respawn npm dep.
- Phase 9: idle self-shutdown — zero matches for
idleCheck/idleTimeout/IDLE_MAX_MS/idleShutdown. Worker exits
only on external SIGTERM via supervisor signal handlers.
Three test files that exercised deleted code removed:
tests/worker/process-registry.test.ts,
tests/worker/session-lifecycle-guard.test.ts,
tests/services/worker/reap-stale-sessions.test.ts.
Pass count: 1451 → 1407 (-44), all attributable to deleted test
files. Zero new failures. 31 pre-existing failures remain
(schema-repair suite, logger-usage-standards, environmental
openclaw / plugin-distribution) — none introduced by Plan 02.
All 10 verification greps return 0. bun run build succeeds.
Plan: PATHFINDER-2026-04-22/02-process-lifecycle.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -105,8 +105,11 @@ import { CorpusStore } from './worker/knowledge/CorpusStore.js';
|
||||
import { CorpusBuilder } from './worker/knowledge/CorpusBuilder.js';
|
||||
import { KnowledgeAgent } from './worker/knowledge/KnowledgeAgent.js';
|
||||
|
||||
// Process management for zombie cleanup (Issue #737)
|
||||
import { startOrphanReaper, reapOrphanedProcesses, getProcessBySession, ensureProcessExit } from './worker/ProcessRegistry.js';
|
||||
// Primary-path session lifecycle helpers — no reapers, no orphan sweeps.
|
||||
// The SDK subprocess is spawned in its own POSIX process group via
|
||||
// createSdkSpawnFactory; teardown via ensureSdkProcessExit kills the whole
|
||||
// group so no descendants leak (Principle 5).
|
||||
import { getSdkProcessForSession, ensureSdkProcessExit } from '../supervisor/process-registry.js';
|
||||
|
||||
/**
|
||||
* Build JSON status output for hook framework communication.
|
||||
@@ -167,12 +170,6 @@ export class WorkerService {
|
||||
private initializationComplete: Promise<void>;
|
||||
private resolveInitialization!: () => void;
|
||||
|
||||
// Orphan reaper cleanup function (Issue #737)
|
||||
private stopOrphanReaper: (() => void) | null = null;
|
||||
|
||||
// Stale session reaper interval (Issue #1168)
|
||||
private staleSessionReaperInterval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
// AI interaction tracking for health endpoint
|
||||
private lastAiInteraction: {
|
||||
timestamp: number;
|
||||
@@ -530,43 +527,15 @@ export class WorkerService {
|
||||
}
|
||||
logger.success('WORKER', 'MCP loopback self-check connected');
|
||||
|
||||
// Start orphan reaper to clean up zombie processes (Issue #737)
|
||||
this.stopOrphanReaper = startOrphanReaper(() => {
|
||||
const activeIds = new Set<number>();
|
||||
for (const [id] of this.sessionManager['sessions']) {
|
||||
activeIds.add(id);
|
||||
}
|
||||
return activeIds;
|
||||
});
|
||||
logger.info('SYSTEM', 'Started orphan reaper (runs every 30 seconds)');
|
||||
|
||||
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
|
||||
this.staleSessionReaperInterval = setInterval(async () => {
|
||||
try {
|
||||
const reaped = await this.sessionManager.reapStaleSessions();
|
||||
if (reaped > 0) {
|
||||
logger.info('SYSTEM', `Reaped ${reaped} stale sessions`);
|
||||
}
|
||||
} catch (e) {
|
||||
// [ANTI-PATTERN IGNORED]: setInterval callback cannot throw; reaper retries on next tick (every 2 min)
|
||||
if (e instanceof Error) {
|
||||
logger.error('WORKER', 'Stale session reaper error', {}, e);
|
||||
} else {
|
||||
logger.error('WORKER', 'Stale session reaper error with non-Error', {}, new Error(String(e)));
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic WAL checkpoint to prevent unbounded WAL growth (#1956)
|
||||
try {
|
||||
this.dbManager.getSessionStore().db.run('PRAGMA wal_checkpoint(PASSIVE)');
|
||||
} catch (e) {
|
||||
if (e instanceof Error) {
|
||||
logger.error('WORKER', 'WAL checkpoint error', {}, e);
|
||||
} else {
|
||||
logger.error('WORKER', 'WAL checkpoint error with non-Error', {}, new Error(String(e)));
|
||||
}
|
||||
}
|
||||
}, 2 * 60 * 1000);
|
||||
// No orphan reaper, no stale-session reaper, no periodic WAL checkpoint.
|
||||
// - Orphan prevention: SDK subprocesses spawn in their own process group
|
||||
// via createSdkSpawnFactory so `kill(-pgid, signal)` tears down every
|
||||
// descendant in one syscall (Principle 5).
|
||||
// - Stale sessions: session cleanup runs in the `generatorPromise.finally`
|
||||
// block of startSessionProcessor — primary-path teardown on generator
|
||||
// exit, not a periodic sweep (Principle 4).
|
||||
// - WAL growth: handled by SQLite's built-in `PRAGMA wal_autocheckpoint`
|
||||
// (applied at DB open time); no app-level timer is required.
|
||||
|
||||
// Auto-recover orphaned queues (fire-and-forget with error logging)
|
||||
this.processPendingQueues(50).then(result => {
|
||||
@@ -760,10 +729,11 @@ export class WorkerService {
|
||||
throw error;
|
||||
})
|
||||
.finally(async () => {
|
||||
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
|
||||
const trackedProcess = getProcessBySession(session.sessionDbId);
|
||||
// Primary-path subprocess teardown — process-group kill ensures any
|
||||
// SDK descendants are reaped too (Principle 5).
|
||||
const trackedProcess = getSdkProcessForSession(session.sessionDbId);
|
||||
if (trackedProcess && trackedProcess.process.exitCode === null) {
|
||||
await ensureProcessExit(trackedProcess, 5000);
|
||||
await ensureSdkProcessExit(trackedProcess, 5000);
|
||||
}
|
||||
|
||||
session.generatorPromise = null;
|
||||
@@ -1078,18 +1048,6 @@ export class WorkerService {
|
||||
logger.info('TRANSCRIPT', 'Transcript watcher stopped');
|
||||
}
|
||||
|
||||
// Stop orphan reaper before shutdown (Issue #737)
|
||||
if (this.stopOrphanReaper) {
|
||||
this.stopOrphanReaper();
|
||||
this.stopOrphanReaper = null;
|
||||
}
|
||||
|
||||
// Stop stale session reaper (Issue #1168)
|
||||
if (this.staleSessionReaperInterval) {
|
||||
clearInterval(this.staleSessionReaperInterval);
|
||||
this.staleSessionReaperInterval = null;
|
||||
}
|
||||
|
||||
await performGracefulShutdown({
|
||||
server: this.server.getHttpServer(),
|
||||
sessionManager: this.sessionManager,
|
||||
|
||||
@@ -1,527 +0,0 @@
|
||||
/**
 * ProcessRegistry: Track spawned Claude subprocesses
 *
 * Fixes Issue #737: Claude haiku subprocesses don't terminate properly,
 * causing zombie process accumulation (user reported 155 processes / 51GB RAM).
 *
 * Root causes:
 * 1. SDK's SpawnedProcess interface hides subprocess PIDs
 * 2. deleteSession() doesn't verify subprocess exit before cleanup
 * 3. abort() is fire-and-forget with no confirmation
 *
 * Solution:
 * - Use SDK's spawnClaudeCodeProcess option to capture PIDs
 * - Track all spawned processes with session association
 * - Verify exit on session deletion with timeout + SIGKILL escalation
 * - Safety-net orphan reaper runs every 30 seconds by default
 *   (see startOrphanReaper's intervalMs parameter)
 */
|
||||
|
||||
import { spawn, exec, ChildProcess } from 'child_process';
|
||||
import { promisify } from 'util';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
|
||||
import { getSupervisor } from '../../supervisor/index.js';
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
|
||||
interface TrackedProcess {
|
||||
pid: number;
|
||||
sessionDbId: number;
|
||||
spawnedAt: number;
|
||||
process: ChildProcess;
|
||||
}
|
||||
|
||||
function getTrackedProcesses(): TrackedProcess[] {
|
||||
return getSupervisor().getRegistry()
|
||||
.getAll()
|
||||
.filter(record => record.type === 'sdk')
|
||||
.map((record) => {
|
||||
const processRef = getSupervisor().getRegistry().getRuntimeProcess(record.id);
|
||||
if (!processRef) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
pid: record.pid,
|
||||
sessionDbId: Number(record.sessionId),
|
||||
spawnedAt: Date.parse(record.startedAt),
|
||||
process: processRef
|
||||
};
|
||||
})
|
||||
.filter((value): value is TrackedProcess => value !== null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a spawned process in the registry
|
||||
*/
|
||||
export function registerProcess(pid: number, sessionDbId: number, process: ChildProcess): void {
|
||||
getSupervisor().registerProcess(`sdk:${sessionDbId}:${pid}`, {
|
||||
pid,
|
||||
type: 'sdk',
|
||||
sessionId: sessionDbId,
|
||||
startedAt: new Date().toISOString()
|
||||
}, process);
|
||||
logger.info('PROCESS', `Registered PID ${pid} for session ${sessionDbId}`, { pid, sessionDbId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister a process from the registry and notify pool waiters
|
||||
*/
|
||||
export function unregisterProcess(pid: number): void {
|
||||
for (const record of getSupervisor().getRegistry().getByPid(pid)) {
|
||||
if (record.type === 'sdk') {
|
||||
getSupervisor().unregisterProcess(record.id);
|
||||
}
|
||||
}
|
||||
logger.debug('PROCESS', `Unregistered PID ${pid}`, { pid });
|
||||
// Notify waiters that a pool slot may be available
|
||||
notifySlotAvailable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get process info by session ID
|
||||
* Warns if multiple processes found (indicates race condition)
|
||||
*/
|
||||
export function getProcessBySession(sessionDbId: number): TrackedProcess | undefined {
|
||||
const matches = getTrackedProcesses().filter(info => info.sessionDbId === sessionDbId);
|
||||
if (matches.length > 1) {
|
||||
logger.warn('PROCESS', `Multiple processes found for session ${sessionDbId}`, {
|
||||
count: matches.length,
|
||||
pids: matches.map(m => m.pid)
|
||||
});
|
||||
}
|
||||
return matches[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of active processes in the registry
|
||||
*/
|
||||
export function getActiveCount(): number {
|
||||
return getSupervisor().getRegistry().getAll().filter(record => record.type === 'sdk').length;
|
||||
}
|
||||
|
||||
// Waiters for pool slots - resolved when a process exits and frees a slot
|
||||
const slotWaiters: Array<() => void> = [];
|
||||
|
||||
/**
|
||||
* Notify waiters that a slot has freed up
|
||||
*/
|
||||
function notifySlotAvailable(): void {
|
||||
const waiter = slotWaiters.shift();
|
||||
if (waiter) waiter();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a pool slot to become available (promise-based, not polling)
|
||||
* @param maxConcurrent Max number of concurrent agents
|
||||
* @param timeoutMs Max time to wait before giving up
|
||||
* @param evictIdleSession Optional callback to evict an idle session when all slots are full (#1868)
|
||||
*/
|
||||
const TOTAL_PROCESS_HARD_CAP = 10;
|
||||
|
||||
export async function waitForSlot(
|
||||
maxConcurrent: number,
|
||||
timeoutMs: number = 60_000,
|
||||
evictIdleSession?: () => boolean
|
||||
): Promise<void> {
|
||||
// Hard cap: refuse to spawn if too many processes exist regardless of pool accounting
|
||||
const activeCount = getActiveCount();
|
||||
if (activeCount >= TOTAL_PROCESS_HARD_CAP) {
|
||||
throw new Error(`Hard cap exceeded: ${activeCount} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
|
||||
}
|
||||
|
||||
if (activeCount < maxConcurrent) return;
|
||||
|
||||
// Try to evict an idle session before waiting (#1868)
|
||||
// Idle sessions hold pool slots during their 3-min idle timeout, blocking new sessions
|
||||
// that would timeout after 60s. Eviction aborts the idle session asynchronously —
|
||||
// the freed slot is picked up by the waiter mechanism below.
|
||||
if (evictIdleSession) {
|
||||
const evicted = evictIdleSession();
|
||||
if (evicted) {
|
||||
logger.info('PROCESS', 'Evicted idle session to free pool slot for waiting request');
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('PROCESS', `Pool limit reached (${activeCount}/${maxConcurrent}), waiting for slot...`);
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
const idx = slotWaiters.indexOf(onSlot);
|
||||
if (idx >= 0) slotWaiters.splice(idx, 1);
|
||||
reject(new Error(`Timed out waiting for agent pool slot after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
const onSlot = () => {
|
||||
clearTimeout(timeout);
|
||||
if (getActiveCount() < maxConcurrent) {
|
||||
resolve();
|
||||
} else {
|
||||
// Still full, re-queue
|
||||
slotWaiters.push(onSlot);
|
||||
}
|
||||
};
|
||||
|
||||
slotWaiters.push(onSlot);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all active PIDs (for debugging)
|
||||
*/
|
||||
export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number; ageMs: number }> {
|
||||
const now = Date.now();
|
||||
return getTrackedProcesses().map(info => ({
|
||||
pid: info.pid,
|
||||
sessionDbId: info.sessionDbId,
|
||||
ageMs: now - info.spawnedAt
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a process to exit with timeout, escalating to SIGKILL if needed
|
||||
* Uses event-based waiting instead of polling to avoid CPU overhead
|
||||
*/
|
||||
export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise<void> {
|
||||
const { pid, process: proc } = tracked;
|
||||
|
||||
// Already exited? Only trust exitCode, NOT proc.killed
|
||||
// proc.killed only means Node sent a signal — the process can still be alive
|
||||
if (proc.exitCode !== null) {
|
||||
unregisterProcess(pid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for graceful exit with timeout using event-based approach
|
||||
const exitPromise = new Promise<void>((resolve) => {
|
||||
proc.once('exit', () => resolve());
|
||||
});
|
||||
|
||||
const timeoutPromise = new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, timeoutMs);
|
||||
});
|
||||
|
||||
await Promise.race([exitPromise, timeoutPromise]);
|
||||
|
||||
// Check if exited gracefully — only trust exitCode
|
||||
if (proc.exitCode !== null) {
|
||||
unregisterProcess(pid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Timeout: escalate to SIGKILL
|
||||
logger.warn('PROCESS', `PID ${pid} did not exit after ${timeoutMs}ms, sending SIGKILL`, { pid, timeoutMs });
|
||||
try {
|
||||
proc.kill('SIGKILL');
|
||||
} catch {
|
||||
// Already dead
|
||||
}
|
||||
|
||||
// Wait for SIGKILL to take effect — use exit event with 1s timeout instead of blind sleep
|
||||
const sigkillExitPromise = new Promise<void>((resolve) => {
|
||||
proc.once('exit', () => resolve());
|
||||
});
|
||||
const sigkillTimeout = new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, 1000);
|
||||
});
|
||||
await Promise.race([sigkillExitPromise, sigkillTimeout]);
|
||||
unregisterProcess(pid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill idle daemon children (claude processes spawned by worker-service)
|
||||
*
|
||||
* These are SDK-spawned claude processes that completed their work but
|
||||
* didn't terminate properly. They remain as children of the worker-service
|
||||
* daemon, consuming memory without doing useful work.
|
||||
*
|
||||
* Criteria for cleanup:
|
||||
* - Process name is "claude"
|
||||
* - Parent PID is the worker-service daemon (this process)
|
||||
* - Process has 0% CPU (idle)
|
||||
* - Process has been running for more than 2 minutes
|
||||
*/
|
||||
async function killIdleDaemonChildren(): Promise<number> {
|
||||
if (process.platform === 'win32') {
|
||||
// Windows: Different process model, skip for now
|
||||
return 0;
|
||||
}
|
||||
|
||||
const daemonPid = process.pid;
|
||||
let killed = 0;
|
||||
|
||||
try {
|
||||
const { stdout } = await execAsync(
|
||||
'ps -eo pid,ppid,%cpu,etime,comm 2>/dev/null | grep "claude$" || true'
|
||||
);
|
||||
|
||||
for (const line of stdout.trim().split('\n')) {
|
||||
if (!line) continue;
|
||||
|
||||
const parts = line.trim().split(/\s+/);
|
||||
if (parts.length < 5) continue;
|
||||
|
||||
const [pidStr, ppidStr, cpuStr, etime] = parts;
|
||||
const pid = parseInt(pidStr, 10);
|
||||
const ppid = parseInt(ppidStr, 10);
|
||||
const cpu = parseFloat(cpuStr);
|
||||
|
||||
// Skip if not a child of this daemon
|
||||
if (ppid !== daemonPid) continue;
|
||||
|
||||
// Skip if actively using CPU
|
||||
if (cpu > 0) continue;
|
||||
|
||||
// Parse elapsed time to minutes
|
||||
// Formats: MM:SS, HH:MM:SS, D-HH:MM:SS
|
||||
let minutes = 0;
|
||||
const dayMatch = etime.match(/^(\d+)-(\d+):(\d+):(\d+)$/);
|
||||
const hourMatch = etime.match(/^(\d+):(\d+):(\d+)$/);
|
||||
const minMatch = etime.match(/^(\d+):(\d+)$/);
|
||||
|
||||
if (dayMatch) {
|
||||
minutes = parseInt(dayMatch[1], 10) * 24 * 60 +
|
||||
parseInt(dayMatch[2], 10) * 60 +
|
||||
parseInt(dayMatch[3], 10);
|
||||
} else if (hourMatch) {
|
||||
minutes = parseInt(hourMatch[1], 10) * 60 +
|
||||
parseInt(hourMatch[2], 10);
|
||||
} else if (minMatch) {
|
||||
minutes = parseInt(minMatch[1], 10);
|
||||
}
|
||||
|
||||
// Kill if idle for more than 1 minute
|
||||
if (minutes >= 1) {
|
||||
logger.info('PROCESS', `Killing idle daemon child PID ${pid} (idle ${minutes}m)`, { pid, minutes });
|
||||
try {
|
||||
process.kill(pid, 'SIGKILL');
|
||||
killed++;
|
||||
} catch {
|
||||
// Already dead or permission denied
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// No matches or command error
|
||||
}
|
||||
|
||||
return killed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill system-level orphans (ppid=1 on Unix)
|
||||
* These are Claude processes whose parent died unexpectedly
|
||||
*/
|
||||
async function killSystemOrphans(): Promise<number> {
|
||||
if (process.platform === 'win32') {
|
||||
return 0; // Windows doesn't have ppid=1 orphan concept
|
||||
}
|
||||
|
||||
try {
|
||||
const { stdout } = await execAsync(
|
||||
'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format" | grep -v grep'
|
||||
);
|
||||
|
||||
let killed = 0;
|
||||
for (const line of stdout.trim().split('\n')) {
|
||||
if (!line) continue;
|
||||
const match = line.trim().match(/^(\d+)\s+(\d+)/);
|
||||
if (match && parseInt(match[2]) === 1) { // ppid=1 = orphan
|
||||
const orphanPid = parseInt(match[1]);
|
||||
logger.warn('PROCESS', `Killing system orphan PID ${orphanPid}`, { pid: orphanPid });
|
||||
try {
|
||||
process.kill(orphanPid, 'SIGKILL');
|
||||
killed++;
|
||||
} catch {
|
||||
// Already dead or permission denied
|
||||
}
|
||||
}
|
||||
}
|
||||
return killed;
|
||||
} catch {
|
||||
return 0; // No matches or error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reap orphaned processes - both registry-tracked and system-level
|
||||
*/
|
||||
export async function reapOrphanedProcesses(activeSessionIds: Set<number>): Promise<number> {
|
||||
let killed = 0;
|
||||
|
||||
// Registry-based: kill processes for dead sessions
|
||||
for (const record of getSupervisor().getRegistry().getAll().filter(entry => entry.type === 'sdk')) {
|
||||
const pid = record.pid;
|
||||
const sessionDbId = Number(record.sessionId);
|
||||
const processRef = getSupervisor().getRegistry().getRuntimeProcess(record.id);
|
||||
|
||||
if (activeSessionIds.has(sessionDbId)) continue; // Active = safe
|
||||
|
||||
logger.warn('PROCESS', `Killing orphan PID ${pid} (session ${sessionDbId} gone)`, { pid, sessionDbId });
|
||||
try {
|
||||
if (processRef) {
|
||||
processRef.kill('SIGKILL');
|
||||
} else {
|
||||
process.kill(pid, 'SIGKILL');
|
||||
}
|
||||
killed++;
|
||||
} catch {
|
||||
// Already dead
|
||||
}
|
||||
getSupervisor().unregisterProcess(record.id);
|
||||
notifySlotAvailable();
|
||||
}
|
||||
|
||||
// System-level: find ppid=1 orphans
|
||||
killed += await killSystemOrphans();
|
||||
|
||||
// Daemon children: find idle SDK processes that didn't terminate
|
||||
killed += await killIdleDaemonChildren();
|
||||
|
||||
return killed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a custom spawn function for SDK that captures PIDs
|
||||
*
|
||||
* The SDK's spawnClaudeCodeProcess option allows us to intercept subprocess
|
||||
* creation and capture the PID before the SDK hides it.
|
||||
*
|
||||
* NOTE: Session isolation is handled via the `cwd` option in SDKAgent.ts,
|
||||
* NOT via CLAUDE_CONFIG_DIR (which breaks authentication).
|
||||
*/
|
||||
export function createPidCapturingSpawn(sessionDbId: number) {
|
||||
return (spawnOptions: {
|
||||
command: string;
|
||||
args: string[];
|
||||
cwd?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
signal?: AbortSignal;
|
||||
}) => {
|
||||
// Kill any existing process for this session before spawning a new one.
|
||||
// Multiple processes sharing the same --resume UUID waste API credits and
|
||||
// can conflict with each other (Issue #1590).
|
||||
const existing = getProcessBySession(sessionDbId);
|
||||
if (existing && existing.process.exitCode === null) {
|
||||
logger.warn('PROCESS', `Killing duplicate process PID ${existing.pid} before spawning new one for session ${sessionDbId}`, {
|
||||
existingPid: existing.pid,
|
||||
sessionDbId
|
||||
});
|
||||
let exited = false;
|
||||
try {
|
||||
existing.process.kill('SIGTERM');
|
||||
exited = existing.process.exitCode !== null;
|
||||
} catch (error: unknown) {
|
||||
// Already dead — safe to unregister immediately
|
||||
if (error instanceof Error) {
|
||||
logger.warn('WORKER', `Failed to kill duplicate process PID ${existing.pid}, likely already dead`, { existingPid: existing.pid, sessionDbId }, error);
|
||||
}
|
||||
exited = true;
|
||||
}
|
||||
|
||||
if (exited) {
|
||||
unregisterProcess(existing.pid);
|
||||
}
|
||||
// If still alive, the 'exit' handler (line ~440) will unregister it.
|
||||
}
|
||||
|
||||
getSupervisor().assertCanSpawn('claude sdk');
|
||||
|
||||
// On Windows, use cmd.exe wrapper for .cmd files to properly handle paths with spaces
|
||||
const useCmdWrapper = process.platform === 'win32' && spawnOptions.command.endsWith('.cmd');
|
||||
const env = sanitizeEnv(spawnOptions.env ?? process.env);
|
||||
|
||||
// Filter empty string args AND their preceding flag (Issue #2049).
|
||||
// The Agent SDK emits ["--setting-sources", ""] when settingSources defaults to [].
|
||||
// Simply dropping "" leaves an orphan --setting-sources that consumes the next
|
||||
// flag (e.g. --permission-mode) as its value, crashing Claude Code 2.1.109+ with
|
||||
// "Invalid setting source: --permission-mode". Drop the flag too so the SDK
|
||||
// default (no setting sources) is preserved by omission.
|
||||
const args: string[] = [];
|
||||
for (const arg of spawnOptions.args) {
|
||||
if (arg === '') {
|
||||
if (args.length > 0 && args[args.length - 1].startsWith('--')) {
|
||||
args.pop();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
args.push(arg);
|
||||
}
|
||||
|
||||
const child = useCmdWrapper
|
||||
? spawn('cmd.exe', ['/d', '/c', spawnOptions.command, ...args], {
|
||||
cwd: spawnOptions.cwd,
|
||||
env,
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
signal: spawnOptions.signal,
|
||||
windowsHide: true
|
||||
})
|
||||
: spawn(spawnOptions.command, args, {
|
||||
cwd: spawnOptions.cwd,
|
||||
env,
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
signal: spawnOptions.signal, // CRITICAL: Pass signal for AbortController integration
|
||||
windowsHide: true
|
||||
});
|
||||
|
||||
// Capture stderr for debugging spawn failures
|
||||
if (child.stderr) {
|
||||
child.stderr.on('data', (data: Buffer) => {
|
||||
logger.debug('SDK_SPAWN', `[session-${sessionDbId}] stderr: ${data.toString().trim()}`);
|
||||
});
|
||||
}
|
||||
|
||||
// Register PID
|
||||
if (child.pid) {
|
||||
registerProcess(child.pid, sessionDbId, child);
|
||||
|
||||
// Auto-unregister on exit
|
||||
child.on('exit', (code: number | null, signal: string | null) => {
|
||||
if (code !== 0) {
|
||||
logger.warn('SDK_SPAWN', `[session-${sessionDbId}] Claude process exited`, { code, signal, pid: child.pid });
|
||||
}
|
||||
if (child.pid) {
|
||||
unregisterProcess(child.pid);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Return SDK-compatible interface
|
||||
return {
|
||||
stdin: child.stdin,
|
||||
stdout: child.stdout,
|
||||
stderr: child.stderr,
|
||||
get killed() { return child.killed; },
|
||||
get exitCode() { return child.exitCode; },
|
||||
kill: child.kill.bind(child),
|
||||
on: child.on.bind(child),
|
||||
once: child.once.bind(child),
|
||||
off: child.off.bind(child)
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the orphan reaper interval
|
||||
* Returns cleanup function to stop the interval
|
||||
*/
|
||||
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 30 * 1000): () => void {
|
||||
const interval = setInterval(async () => {
|
||||
try {
|
||||
const activeIds = getActiveSessionIds();
|
||||
const killed = await reapOrphanedProcesses(activeIds);
|
||||
if (killed > 0) {
|
||||
logger.info('PROCESS', `Reaper cleaned up ${killed} orphaned processes`, { killed });
|
||||
}
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
logger.error('WORKER', 'Reaper error', {}, error);
|
||||
} else {
|
||||
logger.error('WORKER', 'Reaper error', { rawError: String(error) });
|
||||
}
|
||||
}
|
||||
}, intervalMs);
|
||||
|
||||
// Return cleanup function
|
||||
return () => clearInterval(interval);
|
||||
}
|
||||
@@ -21,7 +21,12 @@ import { buildIsolatedEnv, getAuthMethodDescription } from '../../shared/EnvMana
|
||||
import type { ActiveSession, SDKUserMessage } from '../worker-types.js';
|
||||
import { ModeManager } from '../domain/ModeManager.js';
|
||||
import { processAgentResponse, type WorkerRef } from './agents/index.js';
|
||||
import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit, waitForSlot } from './ProcessRegistry.js';
|
||||
import {
|
||||
createSdkSpawnFactory,
|
||||
getSdkProcessForSession,
|
||||
ensureSdkProcessExit,
|
||||
waitForSlot,
|
||||
} from '../../supervisor/process-registry.js';
|
||||
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
|
||||
|
||||
// Import Agent SDK (assumes it's installed)
|
||||
@@ -90,11 +95,11 @@ export class SDKAgent {
|
||||
}
|
||||
|
||||
// Wait for agent pool slot (configurable via CLAUDE_MEM_MAX_CONCURRENT_AGENTS)
|
||||
// Pass idle session eviction callback to prevent pool deadlock (#1868):
|
||||
// idle sessions hold slots during 3-min idle wait, blocking new sessions
|
||||
// Backpressure only — a full pool waits, never evicts a live session
|
||||
// (Principle 1: do not kick live work to make room).
|
||||
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
|
||||
const maxConcurrent = parseInt(settings.CLAUDE_MEM_MAX_CONCURRENT_AGENTS, 10) || 2;
|
||||
await waitForSlot(maxConcurrent, 60_000, () => this.sessionManager.evictIdlestSession());
|
||||
await waitForSlot(maxConcurrent, 60_000);
|
||||
|
||||
// Build isolated environment from ~/.claude-mem/.env
|
||||
// This prevents Issue #733: random ANTHROPIC_API_KEY from project .env files
|
||||
@@ -143,8 +148,9 @@ export class SDKAgent {
|
||||
disallowedTools,
|
||||
abortController: session.abortController,
|
||||
pathToClaudeCodeExecutable: claudePath,
|
||||
// Custom spawn function captures PIDs to fix zombie process accumulation
|
||||
spawnClaudeCodeProcess: createPidCapturingSpawn(session.sessionDbId),
|
||||
// Custom spawn factory: spawns the SDK child in its own POSIX process
|
||||
// group so the worker can tear down the whole subtree on shutdown.
|
||||
spawnClaudeCodeProcess: createSdkSpawnFactory(session.sessionDbId),
|
||||
env: isolatedEnv // Use isolated credentials from ~/.claude-mem/.env, not process.env
|
||||
}
|
||||
});
|
||||
@@ -283,10 +289,12 @@ export class SDKAgent {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// Ensure subprocess is terminated after query completes (or on error)
|
||||
const tracked = getProcessBySession(session.sessionDbId);
|
||||
// Ensure subprocess is terminated after query completes (or on error).
|
||||
// Process-group teardown via ensureSdkProcessExit kills any descendants
|
||||
// the SDK spawned, so no orphan reaper is needed (Principle 5).
|
||||
const tracked = getSdkProcessForSession(session.sessionDbId);
|
||||
if (tracked && tracked.process.exitCode === null) {
|
||||
await ensureProcessExit(tracked, 5000);
|
||||
await ensureSdkProcessExit(tracked, 5000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,75 +14,11 @@ import { logger } from '../../utils/logger.js';
|
||||
import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationData } from '../worker-types.js';
|
||||
import { PendingMessageStore } from '../sqlite/PendingMessageStore.js';
|
||||
import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js';
|
||||
import { getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
|
||||
import { getSdkProcessForSession, ensureSdkProcessExit } from '../../supervisor/process-registry.js';
|
||||
import { getSupervisor } from '../../supervisor/index.js';
|
||||
import { MAX_CONSECUTIVE_SUMMARY_FAILURES } from '../../sdk/prompts.js';
|
||||
import { RestartGuard } from './RestartGuard.js';
|
||||
|
||||
/** Idle threshold before a stuck generator (zombie subprocess) is force-killed. */
|
||||
export const MAX_GENERATOR_IDLE_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
/** Idle threshold before a no-generator session with no pending work is reaped. */
|
||||
export const MAX_SESSION_IDLE_MS = 15 * 60 * 1000; // 15 minutes
|
||||
|
||||
/**
|
||||
* Minimal process interface used by detectStaleGenerator — compatible with
|
||||
* both the real Bun.Subprocess / ChildProcess shapes and test mocks.
|
||||
*/
|
||||
export interface StaleGeneratorProcess {
|
||||
exitCode: number | null;
|
||||
kill(signal?: string): boolean | void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimal session fields required to evaluate stale-generator status.
|
||||
* This is a subset of ActiveSession, allowing unit tests to pass plain objects.
|
||||
*/
|
||||
export interface StaleGeneratorCandidate {
|
||||
generatorPromise: Promise<void> | null;
|
||||
lastGeneratorActivity: number;
|
||||
abortController: AbortController;
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect whether a session's generator is stuck (zombie subprocess) and, if so,
|
||||
* SIGKILL the subprocess and abort the controller.
|
||||
*
|
||||
* Extracted from reapStaleSessions() so tests can import and exercise the exact
|
||||
* same logic rather than duplicating it locally. (Issue #1652)
|
||||
*
|
||||
* @param session - session to inspect
|
||||
* @param proc - tracked subprocess (may be undefined if not in ProcessRegistry)
|
||||
* @param now - current timestamp (defaults to Date.now(); pass explicit value in tests)
|
||||
* @returns true if the session was marked stale, false otherwise
|
||||
*/
|
||||
export function detectStaleGenerator(
|
||||
session: StaleGeneratorCandidate,
|
||||
proc: StaleGeneratorProcess | undefined,
|
||||
now = Date.now()
|
||||
): boolean {
|
||||
if (!session.generatorPromise) return false;
|
||||
|
||||
const generatorIdleMs = now - session.lastGeneratorActivity;
|
||||
if (generatorIdleMs <= MAX_GENERATOR_IDLE_MS) return false;
|
||||
|
||||
// Kill subprocess to unblock stuck for-await
|
||||
if (proc && proc.exitCode === null) {
|
||||
try {
|
||||
proc.kill('SIGKILL');
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
logger.warn('SESSION', 'Failed to SIGKILL stale generator subprocess', {}, error);
|
||||
} else {
|
||||
logger.warn('SESSION', 'Failed to SIGKILL stale generator subprocess with non-Error', {}, new Error(String(error)));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Signal the SDK agent loop to exit
|
||||
session.abortController.abort();
|
||||
return true;
|
||||
}
|
||||
|
||||
export class SessionManager {
|
||||
private dbManager: DatabaseManager;
|
||||
private sessions: Map<number, ActiveSession> = new Map();
|
||||
@@ -402,14 +338,16 @@ export class SessionManager {
|
||||
});
|
||||
}
|
||||
|
||||
// 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
|
||||
const tracked = getProcessBySession(sessionDbId);
|
||||
// 3. Verify subprocess exit with 5s timeout. Process-group teardown is
|
||||
// used internally so any SDK descendants are killed too (Principle 5).
|
||||
const tracked = getSdkProcessForSession(sessionDbId);
|
||||
if (tracked && tracked.process.exitCode === null) {
|
||||
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
|
||||
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} (pgid ${tracked.pgid}) to exit`, {
|
||||
sessionId: sessionDbId,
|
||||
pid: tracked.pid
|
||||
pid: tracked.pid,
|
||||
pgid: tracked.pgid
|
||||
});
|
||||
await ensureProcessExit(tracked, 5000);
|
||||
await ensureSdkProcessExit(tracked, 5000);
|
||||
}
|
||||
|
||||
// 3b. Reap all supervisor-tracked processes for this session (#1351)
|
||||
@@ -467,106 +405,6 @@ export class SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict the idlest session to free a pool slot (#1868).
|
||||
* An "idle" session has an active generator but no pending work — it's sitting
|
||||
* in the 3-min idle wait before subprocess cleanup. Evicting it triggers abort
|
||||
* which kills the subprocess and frees the pool slot for a waiting new session.
|
||||
* @returns true if a session was evicted, false if no idle sessions found
|
||||
*/
|
||||
evictIdlestSession(): boolean {
|
||||
let idlestSessionId: number | null = null;
|
||||
let oldestActivity = Infinity;
|
||||
|
||||
for (const [sessionDbId, session] of this.sessions) {
|
||||
if (!session.generatorPromise) continue; // No generator = no slot held
|
||||
const pendingCount = this.getPendingStore().getPendingCount(sessionDbId);
|
||||
if (pendingCount > 0) continue; // Has work to do, don't evict
|
||||
|
||||
// Pick the session with the oldest lastGeneratorActivity (idlest)
|
||||
if (session.lastGeneratorActivity < oldestActivity) {
|
||||
oldestActivity = session.lastGeneratorActivity;
|
||||
idlestSessionId = sessionDbId;
|
||||
}
|
||||
}
|
||||
|
||||
if (idlestSessionId === null) return false;
|
||||
|
||||
const session = this.sessions.get(idlestSessionId);
|
||||
if (!session) return false;
|
||||
|
||||
logger.info('SESSION', 'Evicting idle session to free pool slot for new request (#1868)', {
|
||||
sessionDbId: idlestSessionId,
|
||||
idleDurationMs: Date.now() - oldestActivity
|
||||
});
|
||||
|
||||
session.idleTimedOut = true;
|
||||
session.abortController.abort();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reap sessions with no active generator and no pending work that have been idle too long.
|
||||
* Also reaps sessions whose generator has been stuck (no lastGeneratorActivity update) for
|
||||
* longer than MAX_GENERATOR_IDLE_MS — these are zombie subprocesses that will never exit
|
||||
* on their own because the orphan reaper skips sessions in the active sessions map. (Issue #1652)
|
||||
*
|
||||
* This unblocks the orphan reaper which skips processes for "active" sessions. (Issue #1168)
|
||||
*/
|
||||
async reapStaleSessions(): Promise<number> {
|
||||
const now = Date.now();
|
||||
const staleSessionIds: number[] = [];
|
||||
|
||||
for (const [sessionDbId, session] of this.sessions) {
|
||||
// Sessions with active generators — check for stuck/zombie generators (Issue #1652)
|
||||
if (session.generatorPromise) {
|
||||
const generatorIdleMs = now - session.lastGeneratorActivity;
|
||||
if (generatorIdleMs > MAX_GENERATOR_IDLE_MS) {
|
||||
logger.warn('SESSION', `Stale generator detected for session ${sessionDbId} (no activity for ${Math.round(generatorIdleMs / 60000)}m) — force-killing subprocess`, {
|
||||
sessionDbId,
|
||||
generatorIdleMs
|
||||
});
|
||||
// Force-kill the subprocess to unblock the stuck for-await in SDKAgent.
|
||||
// Without this the generator is blocked on `for await (const msg of queryResult)`
|
||||
// and will never exit even after abort() is called.
|
||||
const trackedProcess = getProcessBySession(sessionDbId);
|
||||
if (trackedProcess && trackedProcess.process.exitCode === null) {
|
||||
try {
|
||||
trackedProcess.process.kill('SIGKILL');
|
||||
} catch (err) {
|
||||
if (err instanceof Error) {
|
||||
logger.warn('SESSION', 'Failed to SIGKILL subprocess for stale generator', { sessionDbId }, err);
|
||||
} else {
|
||||
logger.warn('SESSION', 'Failed to SIGKILL subprocess for stale generator with non-Error', { sessionDbId }, new Error(String(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Signal the SDK agent loop to exit after the subprocess dies
|
||||
session.abortController.abort();
|
||||
staleSessionIds.push(sessionDbId);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip sessions with pending work
|
||||
const pendingCount = this.getPendingStore().getPendingCount(sessionDbId);
|
||||
if (pendingCount > 0) continue;
|
||||
|
||||
// No generator + no pending work + old enough = stale
|
||||
const sessionAge = now - session.startTime;
|
||||
if (sessionAge > MAX_SESSION_IDLE_MS) {
|
||||
logger.warn('SESSION', `Reaping idle session ${sessionDbId} (no activity for >${Math.round(MAX_SESSION_IDLE_MS / 60000)}m)`, { sessionDbId });
|
||||
staleSessionIds.push(sessionDbId);
|
||||
}
|
||||
}
|
||||
|
||||
for (const sessionDbId of staleSessionIds) {
|
||||
await this.deleteSession(sessionDbId);
|
||||
}
|
||||
|
||||
return staleSessionIds.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all active sessions
|
||||
*/
|
||||
|
||||
@@ -21,7 +21,7 @@ import { SessionCompletionHandler } from '../../session/SessionCompletionHandler
|
||||
import { PrivacyCheckValidator } from '../../validation/PrivacyCheckValidator.js';
|
||||
import { SettingsDefaultsManager } from '../../../../shared/SettingsDefaultsManager.js';
|
||||
import { USER_SETTINGS_PATH } from '../../../../shared/paths.js';
|
||||
import { getProcessBySession, ensureProcessExit } from '../../ProcessRegistry.js';
|
||||
import { getSdkProcessForSession, ensureSdkProcessExit } from '../../../../supervisor/process-registry.js';
|
||||
import { getProjectContext } from '../../../../utils/project-name.js';
|
||||
import { normalizePlatformSource } from '../../../../shared/platform-source.js';
|
||||
import { RestartGuard } from '../../RestartGuard.js';
|
||||
@@ -268,10 +268,11 @@ export class SessionRoutes extends BaseRouteHandler {
|
||||
}
|
||||
})
|
||||
.finally(async () => {
|
||||
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
|
||||
const tracked = getProcessBySession(session.sessionDbId);
|
||||
// Primary-path subprocess teardown — process-group kill ensures any
|
||||
// SDK descendants are reaped too (Principle 5).
|
||||
const tracked = getSdkProcessForSession(session.sessionDbId);
|
||||
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
|
||||
await ensureProcessExit(tracked, 5000);
|
||||
await ensureSdkProcessExit(tracked, 5000);
|
||||
}
|
||||
|
||||
const sessionDbId = session.sessionDbId;
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
import path from "path";
|
||||
import { readFileSync } from "fs";
|
||||
import { readFileSync, existsSync } from "fs";
|
||||
import { spawn, execSync } from "child_process";
|
||||
import { logger } from "../utils/logger.js";
|
||||
import { HOOK_TIMEOUTS, getTimeout } from "./hook-constants.js";
|
||||
import { SettingsDefaultsManager } from "./SettingsDefaultsManager.js";
|
||||
import { MARKETPLACE_ROOT } from "./paths.js";
|
||||
// `validateWorkerPidFile` consults `captureProcessStartToken` at
|
||||
// `src/supervisor/process-registry.ts` for PID-reuse detection (commit
|
||||
// 99060bac). The lazy-spawn fast path below uses it to confirm a live port
|
||||
// is owned by OUR worker incarnation rather than a stale PID squatting on
|
||||
// the port after container restart.
|
||||
import { validateWorkerPidFile } from "../supervisor/index.js";
|
||||
|
||||
// Named constants for health checks
|
||||
// Allow env var override for users on slow systems (e.g., CLAUDE_MEM_HEALTH_TIMEOUT_MS=10000)
|
||||
@@ -214,26 +221,168 @@ async function checkWorkerVersion(): Promise<void> {
|
||||
|
||||
|
||||
/**
|
||||
* Ensure worker service is running
|
||||
* Quick health check - returns false if worker not healthy (doesn't block)
|
||||
* Port might be in use by another process, or worker might not be started yet
|
||||
* Resolve the absolute path to the worker-service script the hook should
|
||||
* relaunch as a detached daemon. Hooks live in the plugin's `scripts/`
|
||||
* directory next to `worker-service.cjs`; production and dev checkouts both
|
||||
* ship the bundled CJS there. Returns null when no candidate exists on disk
|
||||
* (partial install, build artifact missing).
|
||||
*/
|
||||
export async function ensureWorkerRunning(): Promise<boolean> {
|
||||
// Quick health check (single attempt, no polling)
|
||||
try {
|
||||
if (await isWorkerHealthy()) {
|
||||
await checkWorkerVersion(); // logs warning on mismatch, doesn't restart
|
||||
return true; // Worker healthy
|
||||
}
|
||||
} catch (e) {
|
||||
// Not healthy - log for debugging
|
||||
logger.debug('SYSTEM', 'Worker health check failed', {
|
||||
error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
function resolveWorkerScriptPath(): string | null {
|
||||
const candidates = [
|
||||
path.join(MARKETPLACE_ROOT, 'plugin', 'scripts', 'worker-service.cjs'),
|
||||
path.join(process.cwd(), 'plugin', 'scripts', 'worker-service.cjs'),
|
||||
];
|
||||
for (const candidate of candidates) {
|
||||
if (existsSync(candidate)) return candidate;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Port might be in use by something else, or worker not started
|
||||
// Return false but don't throw - let caller decide how to handle
|
||||
logger.warn('SYSTEM', 'Worker not healthy, hook will proceed gracefully');
|
||||
/**
|
||||
* Resolve the absolute path to the Bun runtime.
|
||||
*
|
||||
* Local to worker-utils.ts so the lazy-spawn path does not transitively
|
||||
* import `services/infrastructure/ProcessManager.ts` — that module pulls
|
||||
* in `bun:sqlite` via `cwd-remap`, and pulling it in would break the NPX
|
||||
* CLI bundle which must run under plain Node (no Bun). The worker daemon
|
||||
* itself requires Bun (it uses bun:sqlite directly); this lookup finds
|
||||
* the Bun binary that the daemon will execute under.
|
||||
*/
|
||||
function resolveBunRuntime(): string | null {
|
||||
if (process.env.BUN && existsSync(process.env.BUN)) return process.env.BUN;
|
||||
|
||||
try {
|
||||
const cmd = process.platform === 'win32' ? 'where bun' : 'which bun';
|
||||
const output = execSync(cmd, {
|
||||
stdio: ['ignore', 'pipe', 'ignore'],
|
||||
encoding: 'utf-8',
|
||||
windowsHide: true,
|
||||
});
|
||||
const firstMatch = output
|
||||
.split(/\r?\n/)
|
||||
.map(line => line.trim())
|
||||
.find(line => line.length > 0);
|
||||
return firstMatch || null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the worker port to open, using exponential backoff.
|
||||
*
|
||||
* Deliberately hand-rolled — `respawn` or similar npm helpers add a
|
||||
* supervisor semantic layer we do not want here (Principle 6). The retry
|
||||
* policy is three attempts with 250ms → 500ms → 1000ms backoff, which is
|
||||
* enough to cover the worker's start-up (~1-2s on a warm cache, slower on
|
||||
* Windows) without blocking a hook for long when the spawn outright failed.
|
||||
*/
|
||||
async function waitForWorkerPort(options: { attempts: number; backoffMs: number }): Promise<boolean> {
|
||||
let delayMs = options.backoffMs;
|
||||
for (let attempt = 1; attempt <= options.attempts; attempt++) {
|
||||
if (await isWorkerPortAlive()) return true;
|
||||
if (attempt < options.attempts) {
|
||||
await new Promise<void>(resolve => setTimeout(resolve, delayMs));
|
||||
delayMs *= 2;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the worker port owned by a live worker we recognize?
|
||||
*
|
||||
* Two gates:
|
||||
* 1. HTTP /api/health returns 200, AND
|
||||
* 2. PID-file start-token check (via `validateWorkerPidFile` →
|
||||
* `captureProcessStartToken`) confirms the recorded PID has not been
|
||||
* reused by a different process since the file was written.
|
||||
*
|
||||
* When the PID file is missing we accept a healthy HTTP response on its own
|
||||
* — the file is written by the worker itself after `listen()` succeeds, so
|
||||
* a brief window exists during which a freshly-spawned worker is reachable
|
||||
* via HTTP but has not yet persisted its PID record. Treating this as
|
||||
* "not ours" would cause the hook to double-spawn in a race with the
|
||||
* worker's own PID-file write.
|
||||
*
|
||||
* An 'alive' status that fails identity verification is treated as dead so
|
||||
* the caller falls through to the spawn path (Phase 8 contract).
|
||||
*/
|
||||
async function isWorkerPortAlive(): Promise<boolean> {
|
||||
let healthy: boolean;
|
||||
try {
|
||||
healthy = await isWorkerHealthy();
|
||||
} catch (error: unknown) {
|
||||
logger.debug('SYSTEM', 'Worker health check threw', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
return false;
|
||||
}
|
||||
if (!healthy) return false;
|
||||
|
||||
const pidStatus = validateWorkerPidFile({ logAlive: false });
|
||||
if (pidStatus === 'missing') return true; // race: listening before PID file written
|
||||
if (pidStatus === 'alive') return true; // identity verified via start-token
|
||||
return false; // 'stale' | 'invalid' — PID reused
|
||||
}
|
||||
|
||||
/**
|
||||
* Lazy-spawn the worker if it is not already running, then wait for its port.
|
||||
*
|
||||
* Flow:
|
||||
* 1. If the port is alive AND verified as ours, return true (fast path).
|
||||
* 2. Otherwise, resolve the bun runtime + worker script path.
|
||||
* 3. Spawn detached, `unref()` so the hook's exit does not take the worker
|
||||
* down with it (the worker lives as its own independent daemon).
|
||||
* 4. Wait for the port to come up, up to 3 attempts with exponential
|
||||
* backoff (250ms → 500ms → 1000ms — ~1.75s total).
|
||||
*
|
||||
* PID-reuse safety is inherited from `validateWorkerPidFile` (commit
|
||||
* 99060bac) — see the `isWorkerPortAlive` comment above. There is no
|
||||
* auto-restart loop; failure is reported via the return value so the hook
|
||||
* can surface it through exit code 2 (Principle 2 — fail-fast).
|
||||
*/
|
||||
export async function ensureWorkerRunning(): Promise<boolean> {
|
||||
if (await isWorkerPortAlive()) {
|
||||
await checkWorkerVersion();
|
||||
return true;
|
||||
}
|
||||
|
||||
const runtimePath = resolveBunRuntime();
|
||||
const scriptPath = resolveWorkerScriptPath();
|
||||
|
||||
if (!runtimePath) {
|
||||
logger.warn('SYSTEM', 'Cannot lazy-spawn worker: Bun runtime not found on PATH');
|
||||
return false;
|
||||
}
|
||||
if (!scriptPath) {
|
||||
logger.warn('SYSTEM', 'Cannot lazy-spawn worker: worker-service.cjs not found in plugin/scripts');
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.info('SYSTEM', 'Worker not running — lazy-spawning', { runtimePath, scriptPath });
|
||||
|
||||
try {
|
||||
const proc = spawn(runtimePath, [scriptPath, '--daemon'], {
|
||||
detached: true,
|
||||
stdio: ['ignore', 'ignore', 'ignore'],
|
||||
});
|
||||
proc.unref();
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
logger.error('SYSTEM', 'Lazy-spawn of worker failed', { runtimePath, scriptPath }, error);
|
||||
} else {
|
||||
logger.error('SYSTEM', 'Lazy-spawn of worker failed (non-Error)', {
|
||||
runtimePath, scriptPath, error: String(error),
|
||||
});
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
const alive = await waitForWorkerPort({ attempts: 3, backoffMs: 250 });
|
||||
if (!alive) {
|
||||
logger.warn('SYSTEM', 'Worker port did not open after lazy-spawn within 3 attempts');
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { ChildProcess, spawnSync } from 'child_process';
|
||||
import { ChildProcess, spawn, spawnSync } from 'child_process';
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
|
||||
import { homedir } from 'os';
|
||||
import path from 'path';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import { sanitizeEnv } from './env-sanitizer.js';
|
||||
|
||||
const REAP_SESSION_SIGTERM_TIMEOUT_MS = 5_000;
|
||||
const REAP_SESSION_SIGKILL_TIMEOUT_MS = 1_000;
|
||||
@@ -15,6 +16,14 @@ export interface ManagedProcessInfo {
|
||||
type: string;
|
||||
sessionId?: string | number;
|
||||
startedAt: string;
|
||||
// POSIX process group leader PID for group-scoped teardown.
|
||||
// On Unix, when a child is spawned with `detached: true`, the kernel calls
|
||||
// setpgid() and the child becomes the leader of its own group — its pgid
|
||||
// equals its pid. Stored so `process.kill(-pgid, signal)` can tear down
|
||||
// the child AND every descendant it spawned in one syscall (Principle 5).
|
||||
// Undefined on Windows (no POSIX groups) and for processes that were not
|
||||
// spawned with detached: true (e.g. the worker itself, MCP stdio clients).
|
||||
pgid?: number;
|
||||
}
|
||||
|
||||
export interface ManagedProcessRecord extends ManagedProcessInfo {
|
||||
@@ -303,22 +312,30 @@ export class ProcessRegistry {
|
||||
pids: sessionRecords.map(r => r.pid)
|
||||
});
|
||||
|
||||
// Phase 1: SIGTERM all alive processes
|
||||
// Phase 1: SIGTERM all alive processes — use process-group teardown for
|
||||
// records that carry pgid so any descendants the SDK spawned are killed
|
||||
// too (Principle 5).
|
||||
const aliveRecords = sessionRecords.filter(r => isPidAlive(r.pid));
|
||||
for (const record of aliveRecords) {
|
||||
try {
|
||||
process.kill(record.pid, 'SIGTERM');
|
||||
if (typeof record.pgid === 'number' && process.platform !== 'win32') {
|
||||
process.kill(-record.pgid, 'SIGTERM');
|
||||
} else {
|
||||
process.kill(record.pid, 'SIGTERM');
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code !== 'ESRCH') {
|
||||
logger.debug('SYSTEM', `Failed to SIGTERM session process PID ${record.pid}`, {
|
||||
pid: record.pid
|
||||
pid: record.pid,
|
||||
pgid: record.pgid
|
||||
}, error);
|
||||
}
|
||||
} else {
|
||||
logger.warn('SYSTEM', `Failed to SIGTERM session process PID ${record.pid} (non-Error)`, {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
error: String(error)
|
||||
});
|
||||
}
|
||||
@@ -333,26 +350,34 @@ export class ProcessRegistry {
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
|
||||
// Phase 3: SIGKILL any survivors
|
||||
// Phase 3: SIGKILL any survivors — process-group teardown when pgid is
|
||||
// recorded so descendants are killed too.
|
||||
const survivors = aliveRecords.filter(r => isPidAlive(r.pid));
|
||||
for (const record of survivors) {
|
||||
logger.warn('SYSTEM', `Session process PID ${record.pid} did not exit after SIGTERM, sending SIGKILL`, {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
sessionId: sessionIdNum
|
||||
});
|
||||
try {
|
||||
process.kill(record.pid, 'SIGKILL');
|
||||
if (typeof record.pgid === 'number' && process.platform !== 'win32') {
|
||||
process.kill(-record.pgid, 'SIGKILL');
|
||||
} else {
|
||||
process.kill(record.pid, 'SIGKILL');
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code !== 'ESRCH') {
|
||||
logger.debug('SYSTEM', `Failed to SIGKILL session process PID ${record.pid}`, {
|
||||
pid: record.pid
|
||||
pid: record.pid,
|
||||
pgid: record.pgid
|
||||
}, error);
|
||||
}
|
||||
} else {
|
||||
logger.warn('SYSTEM', `Failed to SIGKILL session process PID ${record.pid} (non-Error)`, {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
error: String(error)
|
||||
});
|
||||
}
|
||||
@@ -406,3 +431,370 @@ export function getProcessRegistry(): ProcessRegistry {
|
||||
export function createProcessRegistry(registryPath: string): ProcessRegistry {
|
||||
return new ProcessRegistry(registryPath);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SDK session lookup + exit verification
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface TrackedSdkProcess {
|
||||
pid: number;
|
||||
pgid: number | undefined;
|
||||
sessionDbId: number;
|
||||
process: ChildProcess;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up the live SDK subprocess for a given session, if any.
|
||||
*
|
||||
* Returns undefined when no SDK record is registered for the session, or
|
||||
* when the ChildProcess reference has been dropped (process exited and was
|
||||
* unregistered). Warns on duplicates — multiple SDK records per session
|
||||
* indicate a race in createSdkSpawnFactory's pre-spawn cleanup.
|
||||
*/
|
||||
export function getSdkProcessForSession(sessionDbId: number): TrackedSdkProcess | undefined {
|
||||
const registry = getProcessRegistry();
|
||||
const matches = registry.getBySession(sessionDbId).filter(r => r.type === 'sdk');
|
||||
|
||||
if (matches.length > 1) {
|
||||
logger.warn('PROCESS', `Multiple SDK processes found for session ${sessionDbId}`, {
|
||||
count: matches.length,
|
||||
pids: matches.map(m => m.pid),
|
||||
});
|
||||
}
|
||||
|
||||
const record = matches[0];
|
||||
if (!record) return undefined;
|
||||
|
||||
const processRef = registry.getRuntimeProcess(record.id);
|
||||
if (!processRef) return undefined;
|
||||
|
||||
return {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
sessionDbId,
|
||||
process: processRef,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for an SDK subprocess to exit, escalating to SIGKILL on the process
|
||||
* group if it overstays `timeoutMs`. Fully event-driven — no polling.
|
||||
*
|
||||
* This is primary-path cleanup invoked from session-level finally() blocks
|
||||
* when a session ends; it is NOT a reaper. It runs at most once per session
|
||||
* deletion. Process-group teardown (`kill(-pgid, SIGKILL)`) ensures any
|
||||
* descendants the SDK spawned are also killed.
|
||||
*/
|
||||
export async function ensureSdkProcessExit(
|
||||
tracked: TrackedSdkProcess,
|
||||
timeoutMs: number = 5000
|
||||
): Promise<void> {
|
||||
const { pid, pgid, process: proc } = tracked;
|
||||
|
||||
// Already exited? Trust exitCode, not proc.killed — proc.killed only means
|
||||
// Node sent a signal; the process may still be running.
|
||||
if (proc.exitCode !== null) return;
|
||||
|
||||
const exitPromise = new Promise<void>((resolve) => {
|
||||
proc.once('exit', () => resolve());
|
||||
});
|
||||
|
||||
const timeoutPromise = new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, timeoutMs);
|
||||
});
|
||||
|
||||
await Promise.race([exitPromise, timeoutPromise]);
|
||||
|
||||
if (proc.exitCode !== null) return;
|
||||
|
||||
// Timeout: escalate to SIGKILL on the whole process group so any
|
||||
// descendants the SDK spawned are killed too (Principle 5).
|
||||
logger.warn('PROCESS', `PID ${pid} did not exit after ${timeoutMs}ms, sending SIGKILL to process group`, {
|
||||
pid, pgid, timeoutMs,
|
||||
});
|
||||
try {
|
||||
if (typeof pgid === 'number' && process.platform !== 'win32') {
|
||||
process.kill(-pgid, 'SIGKILL');
|
||||
} else {
|
||||
proc.kill('SIGKILL');
|
||||
}
|
||||
} catch {
|
||||
// Already dead — fine.
|
||||
}
|
||||
|
||||
// Wait up to 1s for SIGKILL to take effect (event-driven, not blind sleep).
|
||||
const sigkillExit = new Promise<void>((resolve) => {
|
||||
proc.once('exit', () => resolve());
|
||||
});
|
||||
const sigkillTimeout = new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, 1000);
|
||||
});
|
||||
await Promise.race([sigkillExit, sigkillTimeout]);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Pool slot waiters — backpressure without eviction
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// waitForSlot is used by SDKAgent to avoid starting more concurrent SDK
|
||||
// subprocesses than configured. It is event-driven: when a process exits and
|
||||
// is unregistered, notifySlotAvailable() wakes exactly one waiter. There is
|
||||
// no polling. There is no idle-session eviction (Principle 1 — do not kick
|
||||
// live sessions to make room; a full pool must apply backpressure upstream).
|
||||
|
||||
const TOTAL_PROCESS_HARD_CAP = 10;
|
||||
const slotWaiters: Array<() => void> = [];
|
||||
|
||||
function getActiveSdkCount(): number {
|
||||
return getProcessRegistry().getAll().filter(record => record.type === 'sdk').length;
|
||||
}
|
||||
|
||||
function notifySlotAvailable(): void {
|
||||
const waiter = slotWaiters.shift();
|
||||
if (waiter) waiter();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until a pool slot is available to spawn another SDK subprocess.
|
||||
*
|
||||
* Resolves immediately when active SDK process count is below `maxConcurrent`.
|
||||
* Otherwise enqueues a waiter that is woken by a subsequent exit handler.
|
||||
* Rejects with a timeout error if no slot opens within `timeoutMs`.
|
||||
* Rejects immediately if the registry is already at the hard cap.
|
||||
*/
|
||||
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
|
||||
const activeCount = getActiveSdkCount();
|
||||
if (activeCount >= TOTAL_PROCESS_HARD_CAP) {
|
||||
throw new Error(`Hard cap exceeded: ${activeCount} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
|
||||
}
|
||||
|
||||
if (activeCount < maxConcurrent) return;
|
||||
|
||||
logger.info('PROCESS', `Pool limit reached (${activeCount}/${maxConcurrent}), waiting for slot...`);
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
const idx = slotWaiters.indexOf(onSlot);
|
||||
if (idx >= 0) slotWaiters.splice(idx, 1);
|
||||
reject(new Error(`Timed out waiting for agent pool slot after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
const onSlot = () => {
|
||||
clearTimeout(timeout);
|
||||
if (getActiveSdkCount() < maxConcurrent) {
|
||||
resolve();
|
||||
} else {
|
||||
slotWaiters.push(onSlot);
|
||||
}
|
||||
};
|
||||
|
||||
slotWaiters.push(onSlot);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SDK subprocess spawn
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface SpawnedSdkProcess {
|
||||
stdin: ChildProcess['stdin'];
|
||||
stdout: ChildProcess['stdout'];
|
||||
stderr: ChildProcess['stderr'];
|
||||
readonly killed: boolean;
|
||||
readonly exitCode: number | null;
|
||||
kill: ChildProcess['kill'];
|
||||
on: ChildProcess['on'];
|
||||
once: ChildProcess['once'];
|
||||
off: ChildProcess['off'];
|
||||
}
|
||||
|
||||
export interface SpawnSdkOptions {
|
||||
command: string;
|
||||
args: string[];
|
||||
cwd?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn a Claude SDK subprocess in its own POSIX process group.
|
||||
*
|
||||
* The spawn uses `detached: true` so the child becomes the leader of a new
|
||||
* process group (setpgid). The leader's PID equals its pgid on Unix, so we
|
||||
* store `child.pid` as both pid and pgid on the managed process record.
|
||||
* Shutdown then signals the group via `process.kill(-pgid, signal)`, tearing
|
||||
* down the SDK child AND every descendant in one syscall (Principle 5).
|
||||
*
|
||||
* Windows caveat: `detached: true` does not create a POSIX group. The
|
||||
* recorded pgid is still the child PID so Windows teardown at least kills
|
||||
* the direct child; full subtree teardown on Windows requires Job Objects
|
||||
* or `taskkill /T /F` (see shutdown.ts).
|
||||
*
|
||||
* Node's child_process.spawn is used intentionally — Bun.spawn does NOT
|
||||
* support `detached: true` (see PATHFINDER-2026-04-22/_reference.md Part 2
|
||||
* row 3), and this module must work under Bun as well as Node.
|
||||
*/
|
||||
export function spawnSdkProcess(
|
||||
sessionDbId: number,
|
||||
options: SpawnSdkOptions
|
||||
): { process: SpawnedSdkProcess; pid: number; pgid: number } | null {
|
||||
const registry = getProcessRegistry();
|
||||
|
||||
// On Windows, use cmd.exe wrapper for .cmd files to properly handle paths with spaces.
|
||||
const useCmdWrapper = process.platform === 'win32' && options.command.endsWith('.cmd');
|
||||
const env = sanitizeEnv(options.env ?? process.env);
|
||||
|
||||
// Filter empty string args AND their preceding flag (Issue #2049).
|
||||
// The Agent SDK emits ["--setting-sources", ""] when settingSources defaults to [].
|
||||
// Simply dropping "" leaves an orphan --setting-sources that consumes the next
|
||||
// flag as its value, crashing Claude Code 2.1.109+ with
|
||||
// "Invalid setting source: --permission-mode". Drop the flag too so the SDK
|
||||
// default (no setting sources) is preserved by omission.
|
||||
const filteredArgs: string[] = [];
|
||||
for (const arg of options.args) {
|
||||
if (arg === '') {
|
||||
if (filteredArgs.length > 0 && filteredArgs[filteredArgs.length - 1].startsWith('--')) {
|
||||
filteredArgs.pop();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
filteredArgs.push(arg);
|
||||
}
|
||||
|
||||
// Unix: detached:true causes the kernel to setpgid() on the child so the
|
||||
// child becomes leader of a new process group whose pgid equals its pid.
|
||||
// Windows: detached:true decouples the child from the parent console; there
|
||||
// is no POSIX group, but the flag is still safe to pass.
|
||||
const child = useCmdWrapper
|
||||
? spawn('cmd.exe', ['/d', '/c', options.command, ...filteredArgs], {
|
||||
cwd: options.cwd,
|
||||
env,
|
||||
detached: true,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
signal: options.signal,
|
||||
windowsHide: true,
|
||||
})
|
||||
: spawn(options.command, filteredArgs, {
|
||||
cwd: options.cwd,
|
||||
env,
|
||||
detached: true,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
signal: options.signal,
|
||||
windowsHide: true,
|
||||
});
|
||||
|
||||
if (!child.pid) {
|
||||
logger.error('PROCESS', 'Spawn succeeded but produced no PID', { sessionDbId });
|
||||
return null;
|
||||
}
|
||||
|
||||
const pid = child.pid;
|
||||
const pgid = pid; // On Unix with detached:true, pgid === pid. On Windows, this is an alias.
|
||||
|
||||
// Capture stderr for debugging spawn failures.
|
||||
if (child.stderr) {
|
||||
child.stderr.on('data', (data: Buffer) => {
|
||||
logger.debug('SDK_SPAWN', `[session-${sessionDbId}] stderr: ${data.toString().trim()}`);
|
||||
});
|
||||
}
|
||||
|
||||
// Register the process in the supervisor registry with pgid recorded so
|
||||
// the shutdown cascade can signal the whole group.
|
||||
const recordId = `sdk:${sessionDbId}:${pid}`;
|
||||
registry.register(recordId, {
|
||||
pid,
|
||||
type: 'sdk',
|
||||
sessionId: sessionDbId,
|
||||
startedAt: new Date().toISOString(),
|
||||
pgid,
|
||||
}, child);
|
||||
|
||||
// Auto-unregister on exit. child.on('exit') is the authoritative event-driven
|
||||
// signal that a process has left — no polling, no sweeper needed (Principle 4).
|
||||
child.on('exit', (code: number | null, signal: string | null) => {
|
||||
if (code !== 0) {
|
||||
logger.warn('SDK_SPAWN', `[session-${sessionDbId}] Claude process exited`, { code, signal, pid });
|
||||
}
|
||||
registry.unregister(recordId);
|
||||
// Wake one pool-slot waiter since a slot just freed up.
|
||||
notifySlotAvailable();
|
||||
});
|
||||
|
||||
const spawned: SpawnedSdkProcess = {
|
||||
stdin: child.stdin,
|
||||
stdout: child.stdout,
|
||||
stderr: child.stderr,
|
||||
get killed() { return child.killed; },
|
||||
get exitCode() { return child.exitCode; },
|
||||
kill: child.kill.bind(child),
|
||||
on: child.on.bind(child),
|
||||
once: child.once.bind(child),
|
||||
off: child.off.bind(child),
|
||||
};
|
||||
|
||||
return { process: spawned, pid, pgid };
|
||||
}
|
||||
|
||||
/**
 * SDK-compatible spawn factory.
 *
 * The Claude Agent SDK's `spawnClaudeCodeProcess` option calls our factory
 * with its own spawn arguments; we forward them into `spawnSdkProcess` which
 * creates the child in its own process group and records it in the supervisor
 * registry. The returned shape is the minimal subset of ChildProcess that the
 * SDK consumes — stdin/stdout/stderr pipes, killed/exitCode getters, and
 * kill/on/once/off.
 *
 * Pre-spawn cleanup: if a previous process for this session is still alive
 * (e.g. a crash-recovery attempt that collided with a still-running SDK),
 * SIGTERM it. Multiple processes sharing the same --resume UUID waste API
 * credits and can conflict with each other (Issue #1590).
 */
|
||||
export function createSdkSpawnFactory(sessionDbId: number) {
|
||||
return (spawnOptions: SpawnSdkOptions): SpawnedSdkProcess => {
|
||||
const registry = getProcessRegistry();
|
||||
|
||||
// Kill any existing process for this session before spawning a new one.
|
||||
const existing = registry.getBySession(sessionDbId).filter(r => r.type === 'sdk');
|
||||
for (const record of existing) {
|
||||
if (!isPidAlive(record.pid)) continue;
|
||||
try {
|
||||
if (typeof record.pgid === 'number') {
|
||||
// Signal the whole group — kill the SDK child and any descendants.
|
||||
if (process.platform !== 'win32') {
|
||||
process.kill(-record.pgid, 'SIGTERM');
|
||||
} else {
|
||||
process.kill(record.pid, 'SIGTERM');
|
||||
}
|
||||
} else {
|
||||
process.kill(record.pid, 'SIGTERM');
|
||||
}
|
||||
logger.warn('PROCESS', `Killing duplicate SDK process PID ${record.pid} before spawning new one for session ${sessionDbId}`, {
|
||||
existingPid: record.pid,
|
||||
sessionDbId,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
const code = error instanceof Error ? (error as NodeJS.ErrnoException).code : undefined;
|
||||
if (code !== 'ESRCH') {
|
||||
if (error instanceof Error) {
|
||||
logger.warn('PROCESS', `Failed to SIGTERM duplicate SDK process PID ${record.pid}`, { sessionDbId }, error);
|
||||
} else {
|
||||
logger.warn('PROCESS', `Failed to SIGTERM duplicate SDK process PID ${record.pid} (non-Error)`, {
|
||||
sessionDbId, error: String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const result = spawnSdkProcess(sessionDbId, spawnOptions);
|
||||
if (!result) {
|
||||
// Match the legacy failure mode: the SDK needs a process-like object
|
||||
// even on spawn failure; throwing here surfaces via exit code 2 to the
|
||||
// hook layer (Principle 2 — fail-fast).
|
||||
throw new Error(`Failed to spawn SDK subprocess for session ${sessionDbId}`);
|
||||
}
|
||||
|
||||
return result.process;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -34,16 +34,18 @@ export async function runShutdownCascade(options: ShutdownCascadeOptions): Promi
|
||||
}
|
||||
|
||||
try {
|
||||
await signalProcess(record.pid, 'SIGTERM');
|
||||
await signalProcess(record, 'SIGTERM');
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
logger.debug('SYSTEM', 'Failed to send SIGTERM to child process', {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
type: record.type
|
||||
}, error);
|
||||
} else {
|
||||
logger.warn('SYSTEM', 'Failed to send SIGTERM to child process (non-Error)', {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
type: record.type,
|
||||
error: String(error)
|
||||
});
|
||||
@@ -56,16 +58,18 @@ export async function runShutdownCascade(options: ShutdownCascadeOptions): Promi
|
||||
const survivors = childRecords.filter(record => isPidAlive(record.pid));
|
||||
for (const record of survivors) {
|
||||
try {
|
||||
await signalProcess(record.pid, 'SIGKILL');
|
||||
await signalProcess(record, 'SIGKILL');
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
logger.debug('SYSTEM', 'Failed to force kill child process', {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
type: record.type
|
||||
}, error);
|
||||
} else {
|
||||
logger.warn('SYSTEM', 'Failed to force kill child process (non-Error)', {
|
||||
pid: record.pid,
|
||||
pgid: record.pgid,
|
||||
type: record.type,
|
||||
error: String(error)
|
||||
});
|
||||
@@ -110,7 +114,38 @@ async function waitForExit(records: ManagedProcessRecord[], timeoutMs: number):
|
||||
}
|
||||
}
|
||||
|
||||
async function signalProcess(pid: number, signal: 'SIGTERM' | 'SIGKILL'): Promise<void> {
|
||||
async function signalProcess(record: ManagedProcessRecord, signal: 'SIGTERM' | 'SIGKILL'): Promise<void> {
|
||||
const { pid, pgid } = record;
|
||||
|
||||
// Unix path: when the record carries a pgid (set when the child was spawned
|
||||
// with detached:true so it became its own group leader), signal the negative
|
||||
// PID to tear down the whole process group in one syscall — the SDK child
|
||||
// AND every descendant it spawned. This replaces hand-rolled orphan sweeps
|
||||
// (Principle 5: OS-supervised process groups over hand-rolled reapers).
|
||||
//
|
||||
// Falls back to single-PID kill when pgid is absent (the worker itself,
|
||||
// MCP stdio clients, anything not spawned with detached:true).
|
||||
if (process.platform !== 'win32') {
|
||||
try {
|
||||
if (typeof pgid === 'number') {
|
||||
process.kill(-pgid, signal);
|
||||
} else {
|
||||
process.kill(pid, signal);
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
const errno = (error as NodeJS.ErrnoException).code;
|
||||
if (errno === 'ESRCH') {
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Windows: no POSIX process groups. SIGTERM uses single-PID kill; SIGKILL
|
||||
// uses tree-kill or taskkill /T to walk the descendant tree.
|
||||
if (signal === 'SIGTERM') {
|
||||
try {
|
||||
process.kill(pid, signal);
|
||||
@@ -126,50 +161,35 @@ async function signalProcess(pid: number, signal: 'SIGTERM' | 'SIGKILL'): Promis
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
const treeKill = await loadTreeKill();
|
||||
if (treeKill) {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
treeKill(pid, signal, (error) => {
|
||||
if (!error) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
const treeKill = await loadTreeKill();
|
||||
if (treeKill) {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
treeKill(pid, signal, (error) => {
|
||||
if (!error) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
const errno = (error as NodeJS.ErrnoException).code;
|
||||
if (errno === 'ESRCH') {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
reject(error);
|
||||
});
|
||||
const errno = (error as NodeJS.ErrnoException).code;
|
||||
if (errno === 'ESRCH') {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
reject(error);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const args = ['/PID', String(pid), '/T'];
|
||||
if (signal === 'SIGKILL') {
|
||||
args.push('/F');
|
||||
}
|
||||
|
||||
await execFileAsync('taskkill', args, {
|
||||
timeout: HOOK_TIMEOUTS.POWERSHELL_COMMAND,
|
||||
windowsHide: true
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
process.kill(pid, signal);
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
const errno = (error as NodeJS.ErrnoException).code;
|
||||
if (errno === 'ESRCH') {
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw error;
|
||||
const args = ['/PID', String(pid), '/T'];
|
||||
if (signal === 'SIGKILL') {
|
||||
args.push('/F');
|
||||
}
|
||||
|
||||
await execFileAsync('taskkill', args, {
|
||||
timeout: HOOK_TIMEOUTS.POWERSHELL_COMMAND,
|
||||
windowsHide: true
|
||||
});
|
||||
}
|
||||
|
||||
async function loadTreeKill(): Promise<TreeKillFn | null> {
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
/**
|
||||
* Tests for Issue #1652: Stuck generator (zombie subprocess) detection in reapStaleSessions()
|
||||
*
|
||||
* Root cause: reapStaleSessions() unconditionally skipped sessions where
|
||||
* `session.generatorPromise` was non-null, meaning generators stuck inside
|
||||
* `for await (const msg of queryResult)` (blocked on a hung subprocess) were
|
||||
* never cleaned up — even after the session's Stop hook completed.
|
||||
*
|
||||
* Fix: Check `session.lastGeneratorActivity`. If it hasn't updated in
|
||||
* MAX_GENERATOR_IDLE_MS (5 min), SIGKILL the subprocess to unblock the
|
||||
* for-await, then abort the controller so the generator exits.
|
||||
*
|
||||
* Mock Justification (~30% mock code):
|
||||
* - Session fixtures: Required to create valid ActiveSession objects with all
|
||||
* required fields — tests the actual detection logic, not fixture creation.
|
||||
* - Process mock: Verify SIGKILL is sent and abort is called — no real subprocess needed.
|
||||
*/
|
||||
|
||||
import { describe, test, expect, beforeEach, afterEach, mock, setSystemTime } from 'bun:test';
|
||||
import {
|
||||
MAX_GENERATOR_IDLE_MS,
|
||||
MAX_SESSION_IDLE_MS,
|
||||
detectStaleGenerator,
|
||||
type StaleGeneratorCandidate,
|
||||
} from '../../../src/services/worker/SessionManager.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
interface MockProcess {
|
||||
exitCode: number | null;
|
||||
killed: boolean;
|
||||
kill: (signal?: string) => boolean;
|
||||
_lastSignal?: string;
|
||||
}
|
||||
|
||||
function createMockProcess(exitCode: number | null = null): MockProcess {
|
||||
const proc: MockProcess = {
|
||||
exitCode,
|
||||
killed: false,
|
||||
kill(signal?: string) {
|
||||
proc.killed = true;
|
||||
proc._lastSignal = signal;
|
||||
return true;
|
||||
},
|
||||
};
|
||||
return proc;
|
||||
}
|
||||
|
||||
interface TestSession extends StaleGeneratorCandidate {
|
||||
sessionDbId: number;
|
||||
startTime: number;
|
||||
}
|
||||
|
||||
function createSession(overrides: Partial<TestSession> = {}): TestSession {
|
||||
return {
|
||||
sessionDbId: 1,
|
||||
generatorPromise: null,
|
||||
lastGeneratorActivity: Date.now(),
|
||||
abortController: new AbortController(),
|
||||
startTime: Date.now(),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('reapStaleSessions — stale generator detection (Issue #1652)', () => {
|
||||
|
||||
describe('threshold constants', () => {
|
||||
test('MAX_GENERATOR_IDLE_MS should be 5 minutes', () => {
|
||||
expect(MAX_GENERATOR_IDLE_MS).toBe(5 * 60 * 1000);
|
||||
});
|
||||
|
||||
test('MAX_SESSION_IDLE_MS should be 15 minutes', () => {
|
||||
expect(MAX_SESSION_IDLE_MS).toBe(15 * 60 * 1000);
|
||||
});
|
||||
|
||||
test('generator idle threshold should be less than session idle threshold', () => {
|
||||
// Ensures stuck generators are cleaned up before idle no-generator sessions
|
||||
expect(MAX_GENERATOR_IDLE_MS).toBeLessThan(MAX_SESSION_IDLE_MS);
|
||||
});
|
||||
});
|
||||
|
||||
describe('stale generator detection', () => {
|
||||
test('should detect generator as stale when idle > 5 minutes', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS + 1000), // 5m1s ago
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
const isStale = detectStaleGenerator(session, proc);
|
||||
|
||||
expect(isStale).toBe(true);
|
||||
});
|
||||
|
||||
test('should NOT detect generator as stale when idle exactly at threshold', () => {
|
||||
// At exactly the threshold we do NOT yet reap (strictly greater than).
|
||||
// Freeze time so that both the session creation and detectStaleGenerator
|
||||
// call share the same Date.now() value, preventing a race where the two
|
||||
// calls return different timestamps and push the idle time over the boundary.
|
||||
const now = Date.now();
|
||||
setSystemTime(now);
|
||||
try {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: now - MAX_GENERATOR_IDLE_MS,
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
const isStale = detectStaleGenerator(session, proc);
|
||||
|
||||
expect(isStale).toBe(false);
|
||||
} finally {
|
||||
setSystemTime(); // restore real time
|
||||
}
|
||||
});
|
||||
|
||||
test('should NOT detect generator as stale when idle < 5 minutes', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - 60_000, // 1 minute ago
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
const isStale = detectStaleGenerator(session, proc);
|
||||
|
||||
expect(isStale).toBe(false);
|
||||
});
|
||||
|
||||
test('should NOT flag sessions without a generator (no generator = different code path)', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: null,
|
||||
// Even though lastGeneratorActivity is ancient, no generator means no stale-generator detection
|
||||
lastGeneratorActivity: 0,
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
const isStale = detectStaleGenerator(session, proc);
|
||||
|
||||
expect(isStale).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('subprocess kill on stale generator', () => {
|
||||
test('should SIGKILL the subprocess when stale generator detected', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS + 5000),
|
||||
});
|
||||
const proc = createMockProcess(); // exitCode === null (still running)
|
||||
|
||||
detectStaleGenerator(session, proc);
|
||||
|
||||
expect(proc.killed).toBe(true);
|
||||
expect(proc._lastSignal).toBe('SIGKILL');
|
||||
});
|
||||
|
||||
test('should NOT attempt to kill an already-exited subprocess', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS + 5000),
|
||||
});
|
||||
const proc = createMockProcess(0); // exitCode === 0 (already exited)
|
||||
|
||||
detectStaleGenerator(session, proc);
|
||||
|
||||
// Should not try to kill an already-exited process
|
||||
expect(proc.killed).toBe(false);
|
||||
});
|
||||
|
||||
test('should still abort controller even when no tracked subprocess found', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS + 5000),
|
||||
});
|
||||
|
||||
// proc is undefined — subprocess not tracked in ProcessRegistry
|
||||
detectStaleGenerator(session, undefined);
|
||||
|
||||
// AbortController should still be aborted to signal the generator loop
|
||||
expect(session.abortController.signal.aborted).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('abort controller on stale generator', () => {
|
||||
test('should abort the session controller when stale generator detected', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS + 1000),
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
expect(session.abortController.signal.aborted).toBe(false);
|
||||
|
||||
detectStaleGenerator(session, proc);
|
||||
|
||||
expect(session.abortController.signal.aborted).toBe(true);
|
||||
});
|
||||
|
||||
test('should NOT abort controller for fresh generator', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - 30_000, // 30 seconds ago — fresh
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
detectStaleGenerator(session, proc);
|
||||
|
||||
expect(session.abortController.signal.aborted).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('idle session reaping (existing behaviour preserved)', () => {
|
||||
test('idle session without generator should be reaped after 15 minutes', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: null,
|
||||
startTime: Date.now() - (MAX_SESSION_IDLE_MS + 1000), // 15m1s ago
|
||||
});
|
||||
|
||||
// Simulate the existing idle-session path (no generator, no pending work)
|
||||
const sessionAge = Date.now() - session.startTime;
|
||||
const shouldReap = !session.generatorPromise && sessionAge > MAX_SESSION_IDLE_MS;
|
||||
|
||||
expect(shouldReap).toBe(true);
|
||||
});
|
||||
|
||||
test('idle session without generator should NOT be reaped before 15 minutes', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: null,
|
||||
startTime: Date.now() - (10 * 60 * 1000), // 10 minutes ago
|
||||
});
|
||||
|
||||
const sessionAge = Date.now() - session.startTime;
|
||||
const shouldReap = !session.generatorPromise && sessionAge > MAX_SESSION_IDLE_MS;
|
||||
|
||||
expect(shouldReap).toBe(false);
|
||||
});
|
||||
|
||||
test('session with active generator should never be reaped by idle-session path', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
startTime: Date.now() - (60 * 60 * 1000), // 1 hour ago — very old
|
||||
// But generator was active recently (fresh activity)
|
||||
lastGeneratorActivity: Date.now() - 10_000,
|
||||
});
|
||||
const proc = createMockProcess();
|
||||
|
||||
// Stale generator detection says NOT stale (activity is fresh)
|
||||
const isStaleGenerator = detectStaleGenerator(session, proc);
|
||||
expect(isStaleGenerator).toBe(false);
|
||||
|
||||
// Idle-session path is skipped because generatorPromise is non-null
|
||||
expect(session.generatorPromise).not.toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('lastGeneratorActivity update semantics', () => {
|
||||
test('should be initialized to session startTime to avoid false positives on boot', () => {
|
||||
// When a session is first created, lastGeneratorActivity must be set to a
|
||||
// recent time so the generator isn't immediately flagged as stale before it
|
||||
// has had a chance to produce output.
|
||||
const now = Date.now();
|
||||
const session = createSession({
|
||||
startTime: now,
|
||||
lastGeneratorActivity: now, // mirrors SessionManager initialization
|
||||
});
|
||||
|
||||
const generatorIdleMs = now - session.lastGeneratorActivity;
|
||||
expect(generatorIdleMs).toBeLessThan(MAX_GENERATOR_IDLE_MS);
|
||||
});
|
||||
|
||||
test('should be updated when generator yields a message (prevents false positive reap)', () => {
|
||||
const session = createSession({
|
||||
generatorPromise: Promise.resolve(),
|
||||
lastGeneratorActivity: Date.now() - (MAX_GENERATOR_IDLE_MS - 10_000), // 4m50s ago
|
||||
});
|
||||
|
||||
// Simulate the getMessageIterator yielding a message:
|
||||
session.lastGeneratorActivity = Date.now();
|
||||
|
||||
// Generator is now fresh — should not be reaped
|
||||
const generatorIdleMs = Date.now() - session.lastGeneratorActivity;
|
||||
expect(generatorIdleMs).toBeLessThan(MAX_GENERATOR_IDLE_MS);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,204 +0,0 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||
import { EventEmitter } from 'events';
|
||||
import {
|
||||
registerProcess,
|
||||
unregisterProcess,
|
||||
getProcessBySession,
|
||||
getActiveCount,
|
||||
getActiveProcesses,
|
||||
waitForSlot,
|
||||
ensureProcessExit,
|
||||
} from '../../src/services/worker/ProcessRegistry.js';
|
||||
|
||||
/**
|
||||
* Create a mock ChildProcess that behaves like a real one for testing.
|
||||
* Supports exitCode, killed, kill(), and event emission.
|
||||
*/
|
||||
function createMockProcess(overrides: { exitCode?: number | null; killed?: boolean } = {}) {
|
||||
const emitter = new EventEmitter();
|
||||
const mock = Object.assign(emitter, {
|
||||
pid: Math.floor(Math.random() * 100000) + 1000,
|
||||
exitCode: overrides.exitCode ?? null,
|
||||
killed: overrides.killed ?? false,
|
||||
kill(signal?: string) {
|
||||
mock.killed = true;
|
||||
// Simulate async exit after kill
|
||||
setTimeout(() => {
|
||||
mock.exitCode = signal === 'SIGKILL' ? null : 0;
|
||||
mock.emit('exit', mock.exitCode, signal || 'SIGTERM');
|
||||
}, 10);
|
||||
return true;
|
||||
},
|
||||
stdin: null,
|
||||
stdout: null,
|
||||
stderr: null,
|
||||
});
|
||||
return mock;
|
||||
}
|
||||
|
||||
// Helper to clear registry between tests by unregistering all
|
||||
function clearRegistry() {
|
||||
for (const p of getActiveProcesses()) {
|
||||
unregisterProcess(p.pid);
|
||||
}
|
||||
}
|
||||
|
||||
describe('ProcessRegistry', () => {
|
||||
beforeEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
describe('registerProcess / unregisterProcess', () => {
|
||||
it('should register and track a process', () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
expect(getProcessBySession(1)).toBeDefined();
|
||||
});
|
||||
|
||||
it('should unregister a process and free the slot', () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
unregisterProcess(proc.pid);
|
||||
expect(getActiveCount()).toBe(0);
|
||||
expect(getProcessBySession(1)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('getProcessBySession', () => {
|
||||
it('should return undefined for unknown session', () => {
|
||||
expect(getProcessBySession(999)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should find process by session ID', () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 42, proc as any);
|
||||
const found = getProcessBySession(42);
|
||||
expect(found).toBeDefined();
|
||||
expect(found!.pid).toBe(proc.pid);
|
||||
});
|
||||
});
|
||||
|
||||
describe('waitForSlot', () => {
|
||||
it('should resolve immediately when under limit', async () => {
|
||||
await waitForSlot(2); // 0 processes, limit 2
|
||||
});
|
||||
|
||||
it('should wait until a slot opens', async () => {
|
||||
const proc1 = createMockProcess();
|
||||
const proc2 = createMockProcess();
|
||||
registerProcess(proc1.pid, 1, proc1 as any);
|
||||
registerProcess(proc2.pid, 2, proc2 as any);
|
||||
|
||||
// Start waiting for slot (limit=2, both slots full)
|
||||
const waitPromise = waitForSlot(2, 5000);
|
||||
|
||||
// Free a slot after 50ms
|
||||
setTimeout(() => unregisterProcess(proc1.pid), 50);
|
||||
|
||||
await waitPromise; // Should resolve once slot freed
|
||||
expect(getActiveCount()).toBe(1);
|
||||
});
|
||||
|
||||
it('should throw on timeout when no slot opens', async () => {
|
||||
const proc1 = createMockProcess();
|
||||
const proc2 = createMockProcess();
|
||||
registerProcess(proc1.pid, 1, proc1 as any);
|
||||
registerProcess(proc2.pid, 2, proc2 as any);
|
||||
|
||||
await expect(waitForSlot(2, 100)).rejects.toThrow('Timed out waiting for agent pool slot');
|
||||
});
|
||||
|
||||
it('should throw when hard cap (10) is exceeded', async () => {
|
||||
// Register 10 processes to hit the hard cap
|
||||
const procs = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, i + 100, proc as any);
|
||||
procs.push(proc);
|
||||
}
|
||||
|
||||
await expect(waitForSlot(20)).rejects.toThrow('Hard cap exceeded');
|
||||
});
|
||||
});
|
||||
|
||||
describe('ensureProcessExit', () => {
|
||||
it('should unregister immediately if exitCode is set', async () => {
|
||||
const proc = createMockProcess({ exitCode: 0 });
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
|
||||
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any });
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
|
||||
it('should NOT treat proc.killed as exited — must wait for actual exit', async () => {
|
||||
// This is the core bug fix: proc.killed=true but exitCode=null means NOT dead
|
||||
const proc = createMockProcess({ killed: true, exitCode: null });
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
|
||||
// Override kill to simulate SIGKILL + delayed exit
|
||||
proc.kill = (signal?: string) => {
|
||||
proc.killed = true;
|
||||
setTimeout(() => {
|
||||
proc.exitCode = 0;
|
||||
proc.emit('exit', 0, signal);
|
||||
}, 20);
|
||||
return true;
|
||||
};
|
||||
|
||||
// ensureProcessExit should NOT short-circuit on proc.killed
|
||||
// It should wait for exit event or timeout, then escalate to SIGKILL
|
||||
const start = Date.now();
|
||||
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
|
||||
it('should escalate to SIGKILL after timeout', async () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
|
||||
// Override kill: only respond to SIGKILL
|
||||
let sigkillSent = false;
|
||||
proc.kill = (signal?: string) => {
|
||||
proc.killed = true;
|
||||
if (signal === 'SIGKILL') {
|
||||
sigkillSent = true;
|
||||
setTimeout(() => {
|
||||
proc.exitCode = -1;
|
||||
proc.emit('exit', -1, 'SIGKILL');
|
||||
}, 10);
|
||||
}
|
||||
// Don't emit exit for non-SIGKILL signals (simulates stuck process)
|
||||
return true;
|
||||
};
|
||||
|
||||
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||
expect(sigkillSent).toBe(true);
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
|
||||
it('should unregister even if process ignores SIGKILL (after 1s timeout)', async () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 1, proc as any);
|
||||
|
||||
// Override kill to never emit exit (completely stuck process)
|
||||
proc.kill = () => {
|
||||
proc.killed = true;
|
||||
return true;
|
||||
};
|
||||
|
||||
const start = Date.now();
|
||||
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
// Should have waited ~100ms for graceful + ~1000ms for SIGKILL timeout
|
||||
expect(elapsed).toBeGreaterThan(90);
|
||||
// Process is unregistered regardless (safety net)
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,251 +0,0 @@
|
||||
/**
|
||||
* Tests for Issue #1590: Session lifecycle guards to prevent runaway API spend
|
||||
*
|
||||
* Validates three lifecycle safety mechanisms:
|
||||
* 1. SIGTERM detection: externally-killed processes must NOT trigger crash recovery
|
||||
* 2. Wall-clock age limit: sessions older than MAX_SESSION_WALL_CLOCK_MS must be terminated
|
||||
* 3. Duplicate process prevention: a new spawn for a session kills any existing process first
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||
import { EventEmitter } from 'events';
|
||||
import {
|
||||
registerProcess,
|
||||
unregisterProcess,
|
||||
getProcessBySession,
|
||||
getActiveCount,
|
||||
getActiveProcesses,
|
||||
createPidCapturingSpawn,
|
||||
} from '../../src/services/worker/ProcessRegistry.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function createMockProcess(overrides: { exitCode?: number | null; killed?: boolean } = {}) {
|
||||
const emitter = new EventEmitter();
|
||||
const mock = Object.assign(emitter, {
|
||||
pid: Math.floor(Math.random() * 100_000) + 10_000,
|
||||
exitCode: overrides.exitCode ?? null,
|
||||
killed: overrides.killed ?? false,
|
||||
stdin: null as null,
|
||||
stdout: null as null,
|
||||
stderr: null as null,
|
||||
kill(signal?: string) {
|
||||
mock.killed = true;
|
||||
setTimeout(() => {
|
||||
mock.exitCode = 0;
|
||||
mock.emit('exit', mock.exitCode, signal || 'SIGTERM');
|
||||
}, 10);
|
||||
return true;
|
||||
},
|
||||
on: emitter.on.bind(emitter),
|
||||
once: emitter.once.bind(emitter),
|
||||
off: emitter.off.bind(emitter),
|
||||
});
|
||||
return mock;
|
||||
}
|
||||
|
||||
function clearRegistry() {
|
||||
for (const p of getActiveProcesses()) {
|
||||
unregisterProcess(p.pid);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 1. SIGTERM detection — does NOT trigger crash recovery
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('SIGTERM detection (Issue #1590)', () => {
|
||||
it('should classify "code 143" as a SIGTERM error', () => {
|
||||
const errorMsg = 'Claude Code process exited with code 143';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(true);
|
||||
});
|
||||
|
||||
it('should classify "signal SIGTERM" as a SIGTERM error', () => {
|
||||
const errorMsg = 'Process terminated with signal SIGTERM';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(true);
|
||||
});
|
||||
|
||||
it('should NOT classify ordinary errors as SIGTERM', () => {
|
||||
const errorMsg = 'Invalid API key';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(false);
|
||||
});
|
||||
|
||||
it('should NOT classify code 1 (normal error) as SIGTERM', () => {
|
||||
const errorMsg = 'Claude Code process exited with code 1';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(false);
|
||||
});
|
||||
|
||||
it('aborting the controller should mark wasAborted=true, preventing crash recovery', () => {
|
||||
// Simulate what the catch handler does: abort when SIGTERM detected
|
||||
const abortController = new AbortController();
|
||||
expect(abortController.signal.aborted).toBe(false);
|
||||
|
||||
// SIGTERM arrives — we abort the controller
|
||||
abortController.abort();
|
||||
|
||||
// By the time .finally() runs, wasAborted should be true
|
||||
const wasAborted = abortController.signal.aborted;
|
||||
expect(wasAborted).toBe(true);
|
||||
});
|
||||
|
||||
it('should NOT abort the controller for non-SIGTERM crash errors', () => {
|
||||
const abortController = new AbortController();
|
||||
const errorMsg = 'FOREIGN KEY constraint failed';
|
||||
|
||||
// Non-SIGTERM: do NOT abort
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
if (isSigterm) {
|
||||
abortController.abort();
|
||||
}
|
||||
|
||||
expect(abortController.signal.aborted).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 2. Wall-clock age limit
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('Wall-clock age limit (Issue #1590)', () => {
|
||||
const MAX_SESSION_WALL_CLOCK_MS = 4 * 60 * 60 * 1000; // 4 hours (matches SessionRoutes)
|
||||
|
||||
it('should NOT terminate a session started < 4 hours ago', () => {
|
||||
const startTime = Date.now() - 30 * 60 * 1000; // 30 minutes ago
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeLessThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should NOT terminate a session started exactly 4 hours ago (strict >)', () => {
|
||||
// Production uses strict `>` (not `>=`), so exactly 4h is still alive.
|
||||
const startTime = Date.now() - MAX_SESSION_WALL_CLOCK_MS;
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
// At exactly the boundary, sessionAgeMs === MAX, and `>` is false → no termination.
|
||||
expect(sessionAgeMs).toBeLessThanOrEqual(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should terminate a session started more than 4 hours ago', () => {
|
||||
const startTime = Date.now() - MAX_SESSION_WALL_CLOCK_MS - 1;
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeGreaterThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should terminate a session started 13+ hours ago (the issue scenario)', () => {
|
||||
const startTime = Date.now() - 13 * 60 * 60 * 1000; // 13 hours ago
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeGreaterThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('aborting + draining pending queue should prevent respawn', () => {
|
||||
// Simulate the wall-clock termination sequence:
|
||||
// 1. Abort controller (stops active generator)
|
||||
// 2. Mark pending messages abandoned (no work to restart for)
|
||||
// 3. Remove session from map
|
||||
|
||||
const abortController = new AbortController();
|
||||
let pendingAbandoned = 0;
|
||||
let sessionRemoved = false;
|
||||
|
||||
// Simulate abort
|
||||
abortController.abort();
|
||||
expect(abortController.signal.aborted).toBe(true);
|
||||
|
||||
// Simulate markAllSessionMessagesAbandoned
|
||||
pendingAbandoned = 3; // Pretend 3 messages were abandoned
|
||||
|
||||
// Simulate removeSessionImmediate
|
||||
sessionRemoved = true;
|
||||
|
||||
expect(pendingAbandoned).toBeGreaterThanOrEqual(0);
|
||||
expect(sessionRemoved).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 3. Duplicate process prevention in createPidCapturingSpawn
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('Duplicate process prevention (Issue #1590)', () => {
|
||||
beforeEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
it('should detect a duplicate when a live process already exists for the session', () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 42, proc as any);
|
||||
|
||||
const existing = getProcessBySession(42);
|
||||
expect(existing).toBeDefined();
|
||||
expect(existing!.process.exitCode).toBeNull(); // Still alive
|
||||
});
|
||||
|
||||
it('should NOT detect a duplicate when the existing process has already exited', () => {
|
||||
const proc = createMockProcess({ exitCode: 0 });
|
||||
registerProcess(proc.pid, 42, proc as any);
|
||||
|
||||
const existing = getProcessBySession(42);
|
||||
expect(existing).toBeDefined();
|
||||
// exitCode is set — process is already done, NOT a live duplicate
|
||||
expect(existing!.process.exitCode).not.toBeNull();
|
||||
});
|
||||
|
||||
it('should kill existing process and unregister before spawning', () => {
|
||||
const existingProc = createMockProcess();
|
||||
registerProcess(existingProc.pid, 99, existingProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
// Simulate the duplicate-kill logic:
|
||||
const duplicate = getProcessBySession(99);
|
||||
if (duplicate && duplicate.process.exitCode === null) {
|
||||
try { duplicate.process.kill('SIGTERM'); } catch { /* already dead */ }
|
||||
unregisterProcess(duplicate.pid);
|
||||
}
|
||||
|
||||
expect(getActiveCount()).toBe(0);
|
||||
expect(getProcessBySession(99)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should leave registry empty after killing duplicate so new process can register', () => {
|
||||
const oldProc = createMockProcess();
|
||||
registerProcess(oldProc.pid, 77, oldProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
// Kill duplicate
|
||||
const dup = getProcessBySession(77);
|
||||
if (dup && dup.process.exitCode === null) {
|
||||
try { dup.process.kill('SIGTERM'); } catch { /* ignore */ }
|
||||
unregisterProcess(dup.pid);
|
||||
}
|
||||
expect(getActiveCount()).toBe(0);
|
||||
|
||||
// New process can now register cleanly
|
||||
const newProc = createMockProcess();
|
||||
registerProcess(newProc.pid, 77, newProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
const found = getProcessBySession(77);
|
||||
expect(found!.pid).toBe(newProc.pid);
|
||||
});
|
||||
|
||||
it('should not interfere when no existing process is registered', () => {
|
||||
expect(getProcessBySession(55)).toBeUndefined();
|
||||
|
||||
// Duplicate-kill logic: should be a no-op
|
||||
const dup = getProcessBySession(55);
|
||||
if (dup && dup.process.exitCode === null) {
|
||||
unregisterProcess(dup.pid);
|
||||
}
|
||||
|
||||
// Registry should still be empty — no side effects
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user