refactor(phase-2c): unify process reaper, drop isProcessAlive (-97 LoC)

Phase 2 tasks 8, 9 of PLAN-RIP-THE-BAND-AIDS-OFF — collapse four
overlapping reaping functions into one loop and remove a duplicate
liveness check.

- Task 8 (unified reaper): new startUnifiedReaper() in
  supervisor/process-registry.ts runs one 60s interval that reaps
  registry entries for dead sessions, ppid=1 orphans, and idle
  daemon-children. Delete startOrphanReaper, reapOrphanedProcesses,
  killIdleDaemonChildren, killSystemOrphans from
  worker/ProcessRegistry.ts. Export notifySlotAvailable so the reaper
  wakes waiters when it frees a concurrency slot — preserves the
  old reapOrphanedProcesses behavior.
- Task 9 (unify isPidAlive / isProcessAlive): delete
  ProcessManager.isProcessAlive and its 5 tests. Callers already use
  supervisor/process-registry.isPidAlive.

Intentional deviations from the plan (trust-but-verify):
- staleSessionReaperInterval retained — its 2-min callback does three
  jobs (reapStaleSessions, clearFailedOlderThan, PRAGMA
  wal_checkpoint); only one is process-reaping. Decomposition is a
  follow-up refactor, not Phase 2c scope.
- aggressiveStartupCleanup retained — it runs once at init, not on
  an interval; plan's "delete the ongoing part" is a no-op.

Tests: 38 fail / 1332 pass expected (5 deleted tests). Observed runs
fluctuate at/below baseline due to pre-existing flakes; zero NEW
failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-04-21 00:01:12 -07:00
parent 16c454a4b3
commit db7048e75c
7 changed files with 304 additions and 401 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -887,40 +887,6 @@ export function spawnDaemon(
return child.pid;
}
/**
* Check if a process with the given PID is alive.
*
* Uses the process.kill(pid, 0) idiom: signal 0 doesn't send a signal,
* it just checks if the process exists and is reachable.
*
* EPERM is treated as "alive" because it means the process exists but
* belongs to a different user/session (common in multi-user setups).
* PID 0 (Windows sentinel for unknown PID) is treated as alive.
*/
export function isProcessAlive(pid: number): boolean {
// PID 0 is the Windows sentinel value — process was spawned but PID unknown
if (pid === 0) return true;
// Invalid PIDs are not alive
if (!Number.isInteger(pid) || pid < 0) return false;
try {
process.kill(pid, 0);
return true;
} catch (error: unknown) {
if (error instanceof Error) {
const code = (error as NodeJS.ErrnoException).code;
// EPERM = process exists but different user/session — treat as alive
if (code === 'EPERM') return true;
logger.debug('SYSTEM', 'Process not alive', { pid, code });
} else {
logger.debug('SYSTEM', 'Process not alive (non-Error thrown)', { pid }, new Error(String(error)));
}
// ESRCH = no such process — it's dead
return false;
}
}
/**
* Check if the PID file was written recently (within thresholdMs).
*

View File

@@ -105,7 +105,8 @@ import { CorpusBuilder } from './worker/knowledge/CorpusBuilder.js';
import { KnowledgeAgent } from './worker/knowledge/KnowledgeAgent.js';
// Process management for zombie cleanup (Issue #737)
import { startOrphanReaper, reapOrphanedProcesses, getProcessBySession, ensureProcessExit } from './worker/ProcessRegistry.js';
import { getProcessBySession, ensureProcessExit, notifySlotAvailable } from './worker/ProcessRegistry.js';
import { startUnifiedReaper } from '../supervisor/process-registry.js';
/**
* Build JSON status output for hook framework communication.
@@ -524,15 +525,20 @@ export class WorkerService {
}
logger.success('WORKER', 'MCP loopback self-check connected');
// Start orphan reaper to clean up zombie processes (Issue #737)
this.stopOrphanReaper = startOrphanReaper(() => {
const activeIds = new Set<number>();
for (const [id] of this.sessionManager['sessions']) {
activeIds.add(id);
}
return activeIds;
});
logger.info('SYSTEM', 'Started orphan reaper (runs every 30 seconds)');
// Start unified orphan/stale reaper (Issue #737) — runs every 60s,
// reaps registry entries for dead sessions + ppid=1 orphans + idle daemon children.
this.stopOrphanReaper = startUnifiedReaper(
getSupervisor().getRegistry(),
() => {
const activeIds = new Set<number>();
for (const [id] of this.sessionManager['sessions']) {
activeIds.add(id);
}
return activeIds;
},
notifySlotAvailable
);
logger.info('SYSTEM', 'Started unified reaper (runs every 60 seconds)');
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
this.staleSessionReaperInterval = setInterval(async () => {

View File

@@ -13,17 +13,14 @@
* - Use SDK's spawnClaudeCodeProcess option to capture PIDs
* - Track all spawned processes with session association
* - Verify exit on session deletion with timeout + SIGKILL escalation
* - Safety net orphan reaper runs every 5 minutes
* - Unified reaper in src/supervisor/process-registry.ts runs every 60s
*/
import { spawn, exec, ChildProcess } from 'child_process';
import { promisify } from 'util';
import { spawn, ChildProcess } from 'child_process';
import { logger } from '../../utils/logger.js';
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
import { getSupervisor } from '../../supervisor/index.js';
const execAsync = promisify(exec);
interface TrackedProcess {
pid: number;
sessionDbId: number;
@@ -106,7 +103,7 @@ const slotWaiters: Array<() => void> = [];
/**
* Notify waiters that a slot has freed up
*/
function notifySlotAvailable(): void {
export function notifySlotAvailable(): void {
const waiter = slotWaiters.shift();
if (waiter) waiter();
}
@@ -228,159 +225,6 @@ export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: numb
unregisterProcess(pid);
}
/**
* Kill idle daemon children (claude processes spawned by worker-service)
*
* These are SDK-spawned claude processes that completed their work but
* didn't terminate properly. They remain as children of the worker-service
* daemon, consuming memory without doing useful work.
*
* Criteria for cleanup:
* - Process name is "claude"
* - Parent PID is the worker-service daemon (this process)
* - Process has 0% CPU (idle)
* - Process has been running for more than 2 minutes
*/
async function killIdleDaemonChildren(): Promise<number> {
if (process.platform === 'win32') {
// Windows: Different process model, skip for now
return 0;
}
const daemonPid = process.pid;
let killed = 0;
try {
const { stdout } = await execAsync(
'ps -eo pid,ppid,%cpu,etime,comm 2>/dev/null | grep "claude$" || true'
);
for (const line of stdout.trim().split('\n')) {
if (!line) continue;
const parts = line.trim().split(/\s+/);
if (parts.length < 5) continue;
const [pidStr, ppidStr, cpuStr, etime] = parts;
const pid = parseInt(pidStr, 10);
const ppid = parseInt(ppidStr, 10);
const cpu = parseFloat(cpuStr);
// Skip if not a child of this daemon
if (ppid !== daemonPid) continue;
// Skip if actively using CPU
if (cpu > 0) continue;
// Parse elapsed time to minutes
// Formats: MM:SS, HH:MM:SS, D-HH:MM:SS
let minutes = 0;
const dayMatch = etime.match(/^(\d+)-(\d+):(\d+):(\d+)$/);
const hourMatch = etime.match(/^(\d+):(\d+):(\d+)$/);
const minMatch = etime.match(/^(\d+):(\d+)$/);
if (dayMatch) {
minutes = parseInt(dayMatch[1], 10) * 24 * 60 +
parseInt(dayMatch[2], 10) * 60 +
parseInt(dayMatch[3], 10);
} else if (hourMatch) {
minutes = parseInt(hourMatch[1], 10) * 60 +
parseInt(hourMatch[2], 10);
} else if (minMatch) {
minutes = parseInt(minMatch[1], 10);
}
// Kill if idle for more than 1 minute
if (minutes >= 1) {
logger.info('PROCESS', `Killing idle daemon child PID ${pid} (idle ${minutes}m)`, { pid, minutes });
try {
process.kill(pid, 'SIGKILL');
killed++;
} catch {
// Already dead or permission denied
}
}
}
} catch {
// No matches or command error
}
return killed;
}
/**
* Kill system-level orphans (ppid=1 on Unix)
* These are Claude processes whose parent died unexpectedly
*/
async function killSystemOrphans(): Promise<number> {
if (process.platform === 'win32') {
return 0; // Windows doesn't have ppid=1 orphan concept
}
try {
const { stdout } = await execAsync(
'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format" | grep -v grep'
);
let killed = 0;
for (const line of stdout.trim().split('\n')) {
if (!line) continue;
const match = line.trim().match(/^(\d+)\s+(\d+)/);
if (match && parseInt(match[2]) === 1) { // ppid=1 = orphan
const orphanPid = parseInt(match[1]);
logger.warn('PROCESS', `Killing system orphan PID ${orphanPid}`, { pid: orphanPid });
try {
process.kill(orphanPid, 'SIGKILL');
killed++;
} catch {
// Already dead or permission denied
}
}
}
return killed;
} catch {
return 0; // No matches or error
}
}
/**
* Reap orphaned processes - both registry-tracked and system-level
*/
export async function reapOrphanedProcesses(activeSessionIds: Set<number>): Promise<number> {
let killed = 0;
// Registry-based: kill processes for dead sessions
for (const record of getSupervisor().getRegistry().getAll().filter(entry => entry.type === 'sdk')) {
const pid = record.pid;
const sessionDbId = Number(record.sessionId);
const processRef = getSupervisor().getRegistry().getRuntimeProcess(record.id);
if (activeSessionIds.has(sessionDbId)) continue; // Active = safe
logger.warn('PROCESS', `Killing orphan PID ${pid} (session ${sessionDbId} gone)`, { pid, sessionDbId });
try {
if (processRef) {
processRef.kill('SIGKILL');
} else {
process.kill(pid, 'SIGKILL');
}
killed++;
} catch {
// Already dead
}
getSupervisor().unregisterProcess(record.id);
notifySlotAvailable();
}
// System-level: find ppid=1 orphans
killed += await killSystemOrphans();
// Daemon children: find idle SDK processes that didn't terminate
killed += await killIdleDaemonChildren();
return killed;
}
/**
* Create a custom spawn function for SDK that captures PIDs
*
@@ -501,27 +345,3 @@ export function createPidCapturingSpawn(sessionDbId: number) {
};
}
/**
* Start the orphan reaper interval
* Returns cleanup function to stop the interval
*/
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 30 * 1000): () => void {
const interval = setInterval(async () => {
try {
const activeIds = getActiveSessionIds();
const killed = await reapOrphanedProcesses(activeIds);
if (killed > 0) {
logger.info('PROCESS', `Reaper cleaned up ${killed} orphaned processes`, { killed });
}
} catch (error) {
if (error instanceof Error) {
logger.error('WORKER', 'Reaper error', {}, error);
} else {
logger.error('WORKER', 'Reaper error', { rawError: String(error) });
}
}
}, intervalMs);
// Return cleanup function
return () => clearInterval(interval);
}

View File

@@ -1,11 +1,16 @@
import { ChildProcess, spawnSync } from 'child_process';
import { ChildProcess, exec, spawnSync } from 'child_process';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
import { homedir } from 'os';
import path from 'path';
import { promisify } from 'util';
import { logger } from '../utils/logger.js';
const execAsync = promisify(exec);
const REAP_SESSION_SIGTERM_TIMEOUT_MS = 5_000;
const REAP_SESSION_SIGKILL_TIMEOUT_MS = 1_000;
const UNIFIED_REAPER_INTERVAL_MS = 60_000;
const IDLE_DAEMON_CHILD_MIN_MINUTES = 1;
const DATA_DIR = path.join(homedir(), '.claude-mem');
const DEFAULT_REGISTRY_PATH = path.join(DATA_DIR, 'supervisor.json');
@@ -406,3 +411,136 @@ export function getProcessRegistry(): ProcessRegistry {
export function createProcessRegistry(registryPath: string): ProcessRegistry {
return new ProcessRegistry(registryPath);
}
/**
* Kill system-level orphans and idle daemon children.
*
* Two concerns, one `ps` sweep:
* - ppid=1 orphans matching "claude.*haiku|claude.*output-format" — their parent died.
* - claude children of this daemon that are idle (0% CPU) for >=1 minute — zombie SDK procs.
*/
async function sweepSystemOrphans(daemonPid: number): Promise<number> {
if (process.platform === 'win32') return 0;
let killed = 0;
try {
const { stdout } = await execAsync(
'ps -eo pid,ppid,%cpu,etime,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format|claude$" | grep -v grep || true'
);
for (const line of stdout.trim().split('\n')) {
if (!line) continue;
const parts = line.trim().split(/\s+/);
if (parts.length < 5) continue;
const pid = parseInt(parts[0], 10);
const ppid = parseInt(parts[1], 10);
const cpu = parseFloat(parts[2]);
const etime = parts[3];
if (!Number.isInteger(pid) || pid <= 0) continue;
if (pid === daemonPid) continue;
const isPpidOneOrphan = ppid === 1;
const isIdleDaemonChild = ppid === daemonPid && cpu === 0 && parseEtimeMinutes(etime) >= IDLE_DAEMON_CHILD_MIN_MINUTES;
if (!isPpidOneOrphan && !isIdleDaemonChild) continue;
logger.warn('SYSTEM', `Reaper killing ${isPpidOneOrphan ? 'ppid=1 orphan' : 'idle daemon child'} PID ${pid}`, { pid, ppid });
try {
process.kill(pid, 'SIGKILL');
killed++;
} catch (error: unknown) {
if (error instanceof Error) {
const code = (error as NodeJS.ErrnoException).code;
if (code !== 'ESRCH') {
logger.debug('SYSTEM', `Failed to SIGKILL orphan PID ${pid}`, { pid }, error);
}
}
}
}
} catch (error: unknown) {
if (error instanceof Error) {
logger.debug('SYSTEM', 'sweepSystemOrphans: ps scan failed', {}, error);
}
}
return killed;
}
function parseEtimeMinutes(etime: string): number {
// Formats: MM:SS, HH:MM:SS, D-HH:MM:SS
const day = etime.match(/^(\d+)-(\d+):(\d+):(\d+)$/);
if (day) return parseInt(day[1], 10) * 24 * 60 + parseInt(day[2], 10) * 60 + parseInt(day[3], 10);
const hour = etime.match(/^(\d+):(\d+):(\d+)$/);
if (hour) return parseInt(hour[1], 10) * 60 + parseInt(hour[2], 10);
const min = etime.match(/^(\d+):(\d+)$/);
if (min) return parseInt(min[1], 10);
return 0;
}
/**
* Start the unified orphan/stale reaper.
*
* One 60s loop that:
* - iterates registry entries, SIGKILLs PIDs whose session is no longer active
* - sweeps system-level orphans (ppid=1) and idle daemon-children claude processes
*
* Returns a stop function that clears the interval.
*/
export function startUnifiedReaper(
registry: ProcessRegistry,
getActiveSessionIds: () => Set<number>,
onSlotFreed: () => void = () => {}
): () => void {
const interval = setInterval(async () => {
let killed = 0;
try {
const activeIds = getActiveSessionIds();
for (const record of registry.getAll()) {
if (record.type !== 'sdk') continue;
const sessionDbId = Number(record.sessionId);
if (activeIds.has(sessionDbId)) continue;
const processRef = registry.getRuntimeProcess(record.id);
logger.warn('SYSTEM', `Reaper killing orphan PID ${record.pid} (session ${sessionDbId} gone)`, {
pid: record.pid,
sessionDbId
});
try {
if (processRef) {
processRef.kill('SIGKILL');
} else if (isPidAlive(record.pid)) {
process.kill(record.pid, 'SIGKILL');
}
killed++;
} catch (error: unknown) {
if (error instanceof Error) {
const code = (error as NodeJS.ErrnoException).code;
if (code !== 'ESRCH') {
logger.debug('SYSTEM', `Reaper SIGKILL failed for PID ${record.pid}`, { pid: record.pid }, error);
}
}
}
registry.unregister(record.id);
onSlotFreed();
}
killed += await sweepSystemOrphans(process.pid);
if (killed > 0) {
logger.info('SYSTEM', `Unified reaper cleaned up ${killed} processes`, { killed });
}
} catch (error: unknown) {
if (error instanceof Error) {
logger.error('SYSTEM', 'Unified reaper error', {}, error);
} else {
logger.error('SYSTEM', 'Unified reaper error', { rawError: String(error) });
}
}
}, UNIFIED_REAPER_INTERVAL_MS);
return () => clearInterval(interval);
}

View File

@@ -9,7 +9,6 @@ import {
removePidFile,
getPlatformTimeout,
parseElapsedTime,
isProcessAlive,
cleanStalePidFile,
isPidFileRecent,
touchPidFile,
@@ -338,31 +337,6 @@ describe('ProcessManager', () => {
});
});
describe('isProcessAlive', () => {
it('should return true for the current process', () => {
expect(isProcessAlive(process.pid)).toBe(true);
});
it('should return false for a non-existent PID', () => {
// Use a very high PID that's extremely unlikely to exist
expect(isProcessAlive(2147483647)).toBe(false);
});
it('should return true for PID 0 (Windows WMIC sentinel)', () => {
expect(isProcessAlive(0)).toBe(true);
});
it('should return false for negative PIDs', () => {
expect(isProcessAlive(-1)).toBe(false);
expect(isProcessAlive(-999)).toBe(false);
});
it('should return false for non-integer PIDs', () => {
expect(isProcessAlive(1.5)).toBe(false);
expect(isProcessAlive(NaN)).toBe(false);
});
});
describe('captureProcessStartToken', () => {
const supported = process.platform === 'linux' || process.platform === 'darwin';