feat: add embedded Process Supervisor for unified process lifecycle (#1370)

* feat: add embedded Process Supervisor for unified process lifecycle management

Consolidates scattered process management (ProcessManager, GracefulShutdown,
HealthMonitor, ProcessRegistry) into a unified src/supervisor/ module.

New: ProcessRegistry with JSON persistence, env sanitizer (strips CLAUDECODE_*
vars), graceful shutdown cascade (SIGTERM → 5s wait → SIGKILL with tree-kill
on Windows), PID file liveness validation, and singleton Supervisor API.

Fixes #1352 (worker inherits CLAUDECODE env causing nested sessions)
Fixes #1356 (zombie TCP socket after Windows reboot)
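
A minimal sketch of the sanitizer, assuming illustrative denylist values (only the `CLAUDECODE_` prefix is stated by this commit; `sanitizeEnv`, `ENV_PREFIXES`, and `ENV_EXACT_MATCHES` match names used elsewhere in the PR):

```typescript
// Sketch only: CLAUDECODE_ comes from the commit message; the exact-match
// entry is an illustrative assumption, not the shipped denylist.
const ENV_PREFIXES: string[] = ['CLAUDECODE_'];
const ENV_EXACT_MATCHES: Set<string> = new Set(['CLAUDECODE']);

function sanitizeEnv(env: Record<string, string | undefined>): Record<string, string | undefined> {
  const clean: Record<string, string | undefined> = {};
  for (const [key, value] of Object.entries(env)) {
    if (ENV_EXACT_MATCHES.has(key)) continue;                          // exact denylist hit
    if (ENV_PREFIXES.some(prefix => key.startsWith(prefix))) continue; // prefix denylist hit
    clean[key] = value;
  }
  return clean;
}
```

`spawnDaemon` passes `process.env` through this before handing it to the child, which is what keeps the worker from inheriting a nested Claude Code session.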

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add session-scoped process reaping to supervisor

Adds reapSession(sessionId) to ProcessRegistry for killing session-tagged
processes on session end. SessionManager.deleteSession() now triggers reaping.
Tightens orphan reaper interval from 60s to 30s.

Fixes #1351 (MCP server processes leak on session end)
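
The reaping path can be sketched with a hypothetical in-memory registry (the real ProcessRegistry persists to JSON; the record shape and injected kill function are assumptions):

```typescript
// Hypothetical mini-registry illustrating session-scoped reaping.
interface ProcessRecord {
  id: string;
  pid: number;
  sessionId?: string;   // set for session-tagged processes (e.g. MCP servers)
}

class MiniRegistry {
  private records = new Map<string, ProcessRecord>();

  register(record: ProcessRecord): void {
    this.records.set(record.id, record);
  }

  getAll(): ProcessRecord[] {
    return [...this.records.values()];
  }

  // Kill and deregister every process tagged with the ending session.
  reapSession(sessionId: string, kill: (pid: number) => void): number {
    let reaped = 0;
    for (const record of this.getAll()) {
      if (record.sessionId !== sessionId) continue;
      try {
        kill(record.pid);
      } catch {
        // Process already gone; still drop the stale record.
      }
      this.records.delete(record.id);
      reaped++;
    }
    return reaped;
  }
}
```

SessionManager.deleteSession() would then call something like `registry.reapSession(sessionId, pid => process.kill(pid, 'SIGTERM'))` on session end.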

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add Unix domain socket support for worker communication

Introduces socket-manager.ts for UDS-based worker communication, eliminating
port 37777 collisions between concurrent sessions. Worker listens on
~/.claude-mem/sockets/worker.sock by default with TCP fallback.

All hook handlers, MCP server, health checks, and admin commands updated to
use socket-aware workerHttpRequest(). Backwards compatible — settings can
force TCP mode via CLAUDE_MEM_WORKER_TRANSPORT=tcp.

Fixes #1346 (port 37777 collision across concurrent sessions)
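
Transport selection might look like the following sketch (the socket path and env vars come from this commit message; the helper name and return shape are assumptions, and note the UDS default is reverted later in this PR):

```typescript
import { join } from 'node:path';
import { homedir } from 'node:os';

type Transport =
  | { kind: 'uds'; socketPath: string }
  | { kind: 'tcp'; host: string; port: number };

// Sketch: UDS by default, TCP when forced via CLAUDE_MEM_WORKER_TRANSPORT=tcp.
function resolveWorkerTransport(env: Record<string, string | undefined>): Transport {
  if (env.CLAUDE_MEM_WORKER_TRANSPORT === 'tcp') {
    return { kind: 'tcp', host: '127.0.0.1', port: Number(env.CLAUDE_MEM_WORKER_PORT ?? 37777) };
  }
  return { kind: 'uds', socketPath: join(homedir(), '.claude-mem', 'sockets', 'worker.sock') };
}
```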

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: remove in-process worker fallback from hook command

Removes the fallback path where hook scripts started WorkerService in-process,
making the worker a grandchild of Claude Code (killed by sandbox). Hooks now
always delegate to ensureWorkerStarted() which spawns a fully detached daemon.

Fixes #1249 (grandchild process killed by sandbox)
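
On POSIX the detached spawn reduces to something like this sketch (the real `spawnDaemon` also handles Windows via PowerShell `Start-Process`; the exact options shown are assumptions):

```typescript
import { spawn } from 'node:child_process';

// Sketch of a fully detached daemon spawn: no inherited stdio, its own
// process group, and unref() so the short-lived hook process exits cleanly.
function spawnDetachedDaemon(
  command: string,
  args: string[],
  env: Record<string, string | undefined>
): number | undefined {
  const child = spawn(command, args, {
    detached: true,    // new process group: not a grandchild the sandbox will kill
    stdio: 'ignore',   // no shared pipes keeping the parent's event loop alive
    env,
  });
  child.unref();       // drop the reference so the hook can exit immediately
  return child.pid;
}
```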

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add health checker and /api/admin/doctor endpoint

Adds 30-second periodic health sweep that prunes dead processes from the
supervisor registry and cleans stale socket files. Adds /api/admin/doctor
endpoint exposing supervisor state, process liveness, and environment health.
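
The sweep itself is simple in outline (hypothetical shapes; the real checker also cleans stale socket files):

```typescript
// Sketch of one sweep pass over the registry; liveness and removal are
// injected so the real registry/PID logic can plug in.
interface SweepRecord { id: string; pid: number; }

function sweepDeadProcesses(
  records: SweepRecord[],
  isAlive: (pid: number) => boolean,
  remove: (id: string) => void
): number[] {
  const deadPids: number[] = [];
  for (const record of records) {
    if (isAlive(record.pid)) continue;
    remove(record.id);          // prune the dead entry from the registry
    deadPids.push(record.pid);
  }
  return deadPids;
}
```

In the worker this would run on the 30-second interval, e.g. `setInterval(() => sweepDeadProcesses(...), 30_000)`.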

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: add comprehensive supervisor test suite

62 tests covering all supervisor modules: process registry (18 tests),
env sanitizer (8), shutdown cascade (10), socket manager (15), health
checker (5), and supervisor API (6). Includes persistence, isolation,
edge cases, and cross-module integration scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: revert Unix domain socket transport, restore TCP on port 37777

The socket-manager introduced UDS as default transport, but this broke
the HTTP server's TCP accessibility (viewer UI, curl, external monitoring).
Since there's only ever one worker process handling all sessions, the
port collision rationale for UDS doesn't apply. Reverts to TCP-only,
removing ~900 lines of unnecessary complexity.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: remove dead code found in pre-landing review

Remove unused `acceptingSpawns` field from Supervisor class (written but
never read — assertCanSpawn uses stopPromise instead) and unused
`buildWorkerUrl` import from context handler.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: update .gitignore

* fix: address PR review feedback - downgrade HTTP logging, clean up gitignore, harden supervisor

- Downgrade request/response HTTP logging from info to debug to reduce noise
- Remove unused getWorkerPort imports, use buildWorkerUrl helper
- Export ENV_PREFIXES/ENV_EXACT_MATCHES from env-sanitizer, reuse in Server.ts
- Fix isPidAlive(0) returning true (should be false)
- Add shutdownInitiated flag to prevent signal handler race condition
- Make validateWorkerPidFile testable with pidFilePath option
- Remove unused dataDir from ShutdownCascadeOptions
- Upgrade reapSession log from debug to warn
- Rename zombiePidFiles to deadProcessPids (returns actual PIDs)
- Clean up gitignore: remove duplicate datasets/, stale ~*/ and http*/ patterns
- Fix tests to use temp directories instead of relying on real PID file
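
The isPidAlive fix above boils down to rejecting non-positive PIDs before probing, since on POSIX kill(0, 0) signals the caller's own process group and therefore "succeeds":

```typescript
// Sketch of the fixed liveness probe.
function isPidAlive(pid: number): boolean {
  if (!Number.isInteger(pid) || pid <= 0) return false; // pid 0 / negatives target process groups
  try {
    process.kill(pid, 0);   // signal 0: existence check, delivers no signal
    return true;
  } catch {
    return false;           // ESRCH: no such process
  }
}
```

A stricter version would treat EPERM as alive, since that error means the process exists but is owned by another user.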

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Alex Newman
Date: 2026-03-16 14:49:23 -07:00
Committed by: GitHub
Parent: 237a4c37f8
Commit: 80a8c90a1a
44 changed files with 2385 additions and 636 deletions

.gitignore

@@ -20,7 +20,6 @@ plugin/data.backup/
package-lock.json
bun.lock
private/
datasets/
Auto Run Docs/
# Generated UI files (built from viewer-template.html)
@@ -30,12 +29,10 @@ src/ui/viewer.html
.mcp.json
.cursor/
# Prevent literal tilde directories (path validation bug artifacts)
~*/
# Prevent other malformed path directories
http*/
https*/
# Ignore WebStorm project files (for dinosaur IDE users)
.idea/
.claude-octopus/
.claude/session-intent.md
.claude/session-plan.md
.octo/


@@ -129,5 +129,8 @@
"tree-sitter-typescript": "^0.23.2",
"tsx": "^4.20.6",
"typescript": "^5.3.0"
},
"optionalDependencies": {
"tree-kill": "^1.2.2"
}
}

File diff suppressed because one or more lines are too long


@@ -6,7 +6,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, getWorkerPort, workerHttpRequest } from '../../shared/worker-utils.js';
import { getProjectContext } from '../../utils/project-name.js';
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
import { logger } from '../../utils/logger.js';
@@ -38,16 +38,16 @@ export const contextHandler: EventHandler = {
// Pass all projects (parent + worktree if applicable) for unified timeline
const projectsParam = context.allProjects.join(',');
const url = `http://127.0.0.1:${port}/api/context/inject?projects=${encodeURIComponent(projectsParam)}`;
const apiPath = `/api/context/inject?projects=${encodeURIComponent(projectsParam)}`;
const colorApiPath = `${apiPath}&colors=true`;
// Note: Removed AbortSignal.timeout due to Windows Bun cleanup issue (libuv assertion)
// Worker service has its own timeouts, so client-side timeout is redundant
try {
// Fetch markdown (for Claude context) and optionally colored (for user display)
const colorUrl = `${url}&colors=true`;
const [response, colorResponse] = await Promise.all([
fetch(url),
showTerminalOutput ? fetch(colorUrl).catch(() => null) : Promise.resolve(null)
workerHttpRequest(apiPath),
showTerminalOutput ? workerHttpRequest(colorApiPath).catch(() => null) : Promise.resolve(null)
]);
if (!response.ok) {


@@ -6,7 +6,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { logger } from '../../utils/logger.js';
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
@@ -25,10 +25,7 @@ export const fileEditHandler: EventHandler = {
throw new Error('fileEditHandler requires filePath');
}
const port = getWorkerPort();
logger.dataIn('HOOK', `FileEdit: ${filePath}`, {
workerPort: port,
editCount: edits?.length ?? 0
});
@@ -40,7 +37,7 @@ export const fileEditHandler: EventHandler = {
// Send to worker as an observation with file edit metadata
// The observation handler on the worker will process this appropriately
try {
const response = await fetch(`http://127.0.0.1:${port}/api/sessions/observations`, {
const response = await workerHttpRequest('/api/sessions/observations', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -50,7 +47,6 @@ export const fileEditHandler: EventHandler = {
tool_response: { success: true },
cwd
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {


@@ -5,7 +5,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { logger } from '../../utils/logger.js';
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
import { isProjectExcluded } from '../../utils/project-filter.js';
@@ -28,13 +28,9 @@ export const observationHandler: EventHandler = {
return { continue: true, suppressOutput: true, exitCode: HOOK_EXIT_CODES.SUCCESS };
}
const port = getWorkerPort();
const toolStr = logger.formatTool(toolName, toolInput);
logger.dataIn('HOOK', `PostToolUse: ${toolStr}`, {
workerPort: port
});
logger.dataIn('HOOK', `PostToolUse: ${toolStr}`, {});
// Validate required fields before sending to worker
if (!cwd) {
@@ -50,7 +46,7 @@ export const observationHandler: EventHandler = {
// Send to worker - worker handles privacy check and database operations
try {
const response = await fetch(`http://127.0.0.1:${port}/api/sessions/observations`, {
const response = await workerHttpRequest('/api/sessions/observations', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -60,7 +56,6 @@ export const observationHandler: EventHandler = {
tool_response: toolResponse,
cwd
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {


@@ -10,7 +10,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { logger } from '../../utils/logger.js';
export const sessionCompleteHandler: EventHandler = {
@@ -23,7 +23,6 @@ export const sessionCompleteHandler: EventHandler = {
}
const { sessionId } = input;
const port = getWorkerPort();
if (!sessionId) {
logger.warn('HOOK', 'session-complete: Missing sessionId, skipping');
@@ -31,13 +30,12 @@ export const sessionCompleteHandler: EventHandler = {
}
logger.info('HOOK', '→ session-complete: Removing session from active map', {
workerPort: port,
contentSessionId: sessionId
});
try {
// Call the session complete endpoint by contentSessionId
const response = await fetch(`http://127.0.0.1:${port}/api/sessions/complete`, {
const response = await workerHttpRequest('/api/sessions/complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({


@@ -5,7 +5,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { getProjectName } from '../../utils/project-name.js';
import { logger } from '../../utils/logger.js';
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
@@ -42,12 +42,11 @@ export const sessionInitHandler: EventHandler = {
const prompt = (!rawPrompt || !rawPrompt.trim()) ? '[media prompt]' : rawPrompt;
const project = getProjectName(cwd);
const port = getWorkerPort();
logger.debug('HOOK', 'session-init: Calling /api/sessions/init', { contentSessionId: sessionId, project });
// Initialize session via HTTP - handles DB operations and privacy checks
const initResponse = await fetch(`http://127.0.0.1:${port}/api/sessions/init`, {
const initResponse = await workerHttpRequest('/api/sessions/init', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -55,7 +54,6 @@ export const sessionInitHandler: EventHandler = {
project,
prompt
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!initResponse.ok) {
@@ -107,11 +105,10 @@ export const sessionInitHandler: EventHandler = {
logger.debug('HOOK', 'session-init: Calling /sessions/{sessionDbId}/init', { sessionDbId, promptNumber });
// Initialize SDK agent session via HTTP (starts the agent!)
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/init`, {
const response = await workerHttpRequest(`/sessions/${sessionDbId}/init`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userPrompt: cleanedPrompt, promptNumber })
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {


@@ -7,7 +7,7 @@
*/
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort, fetchWithTimeout } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { logger } from '../../utils/logger.js';
import { extractLastMessage } from '../../shared/transcript-parser.js';
import { HOOK_EXIT_CODES, HOOK_TIMEOUTS, getTimeout } from '../../shared/hook-constants.js';
@@ -25,8 +25,6 @@ export const summarizeHandler: EventHandler = {
const { sessionId, transcriptPath } = input;
const port = getWorkerPort();
// Validate required fields before processing
if (!transcriptPath) {
// No transcript available - skip summary gracefully (not an error)
@@ -40,23 +38,19 @@ export const summarizeHandler: EventHandler = {
const lastAssistantMessage = extractLastMessage(transcriptPath, 'assistant', true);
logger.dataIn('HOOK', 'Stop: Requesting summary', {
workerPort: port,
hasLastAssistantMessage: !!lastAssistantMessage
});
// Send to worker - worker handles privacy check and database operations
const response = await fetchWithTimeout(
`http://127.0.0.1:${port}/api/sessions/summarize`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
contentSessionId: sessionId,
last_assistant_message: lastAssistantMessage
}),
},
SUMMARIZE_TIMEOUT_MS
);
const response = await workerHttpRequest('/api/sessions/summarize', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
contentSessionId: sessionId,
last_assistant_message: lastAssistantMessage
}),
timeoutMs: SUMMARIZE_TIMEOUT_MS
});
if (!response.ok) {
// Return standard response even on failure (matches original behavior)


@@ -7,7 +7,7 @@
import { basename } from 'path';
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
import { ensureWorkerRunning, getWorkerPort } from '../../shared/worker-utils.js';
import { ensureWorkerRunning, getWorkerPort, workerHttpRequest } from '../../shared/worker-utils.js';
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
export const userMessageHandler: EventHandler = {
@@ -23,11 +23,9 @@ export const userMessageHandler: EventHandler = {
const project = basename(input.cwd ?? process.cwd());
// Fetch formatted context directly from worker API
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
try {
const response = await fetch(
`http://127.0.0.1:${port}/api/context/inject?project=${encodeURIComponent(project)}&colors=true`,
{ method: 'GET' }
const response = await workerHttpRequest(
`/api/context/inject?project=${encodeURIComponent(project)}&colors=true`
);
if (!response.ok) {


@@ -27,19 +27,12 @@ import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { getWorkerPort, getWorkerHost } from '../shared/worker-utils.js';
import { workerHttpRequest } from '../shared/worker-utils.js';
import { searchCodebase, formatSearchResults } from '../services/smart-file-read/search.js';
import { parseFile, formatFoldedView, unfoldSymbol } from '../services/smart-file-read/parser.js';
import { readFile } from 'node:fs/promises';
import { resolve } from 'node:path';
/**
* Worker HTTP API configuration
*/
const WORKER_PORT = getWorkerPort();
const WORKER_HOST = getWorkerHost();
const WORKER_BASE_URL = `http://${WORKER_HOST}:${WORKER_PORT}`;
/**
* Map tool names to Worker HTTP endpoints
*/
@@ -49,7 +42,7 @@ const TOOL_ENDPOINT_MAP: Record<string, string> = {
};
/**
* Call Worker HTTP API endpoint
* Call Worker HTTP API endpoint (uses socket or TCP automatically)
*/
async function callWorkerAPI(
endpoint: string,
@@ -67,8 +60,8 @@ async function callWorkerAPI(
}
}
const url = `${WORKER_BASE_URL}${endpoint}?${searchParams}`;
const response = await fetch(url);
const apiPath = `${endpoint}?${searchParams}`;
const response = await workerHttpRequest(apiPath);
if (!response.ok) {
const errorText = await response.text();
@@ -103,12 +96,9 @@ async function callWorkerAPIPost(
logger.debug('HTTP', 'Worker API request (POST)', undefined, { endpoint });
try {
const url = `${WORKER_BASE_URL}${endpoint}`;
const response = await fetch(url, {
const response = await workerHttpRequest(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body)
});
@@ -145,7 +135,7 @@ async function callWorkerAPIPost(
*/
async function verifyWorkerConnection(): Promise<boolean> {
try {
const response = await fetch(`${WORKER_BASE_URL}/api/health`);
const response = await workerHttpRequest('/api/health');
return response.ok;
} catch (error) {
// Expected during worker startup or if worker is down
@@ -448,11 +438,11 @@ async function main() {
setTimeout(async () => {
const workerAvailable = await verifyWorkerConnection();
if (!workerAvailable) {
logger.error('SYSTEM', 'Worker not available', undefined, { workerUrl: WORKER_BASE_URL });
logger.error('SYSTEM', 'Worker not available', undefined, {});
logger.error('SYSTEM', 'Tools will fail until Worker is started');
logger.error('SYSTEM', 'Start Worker with: npm run worker:restart');
} else {
logger.info('SYSTEM', 'Worker available', undefined, { workerUrl: WORKER_BASE_URL });
logger.info('SYSTEM', 'Worker available', undefined, {});
}
}, 0);
}


@@ -10,12 +10,7 @@
import http from 'http';
import { logger } from '../../utils/logger.js';
import {
getChildProcesses,
forceKillProcess,
waitForProcessesExit,
removePidFile
} from './ProcessManager.js';
import { stopSupervisor } from '../../supervisor/index.js';
export interface ShutdownableService {
shutdownAll(): Promise<void>;
@@ -57,49 +52,35 @@ export interface GracefulShutdownConfig {
export async function performGracefulShutdown(config: GracefulShutdownConfig): Promise<void> {
logger.info('SYSTEM', 'Shutdown initiated');
// Clean up PID file on shutdown
removePidFile();
// STEP 1: Enumerate all child processes BEFORE we start closing things
const childPids = await getChildProcesses(process.pid);
logger.info('SYSTEM', 'Found child processes', { count: childPids.length, pids: childPids });
// STEP 2: Close HTTP server first
// STEP 1: Close HTTP server first
if (config.server) {
await closeHttpServer(config.server);
logger.info('SYSTEM', 'HTTP server closed');
}
// STEP 3: Shutdown active sessions
// STEP 2: Shutdown active sessions
await config.sessionManager.shutdownAll();
// STEP 4: Close MCP client connection (signals child to exit gracefully)
// STEP 3: Close MCP client connection (signals child to exit gracefully)
if (config.mcpClient) {
await config.mcpClient.close();
logger.info('SYSTEM', 'MCP client closed');
}
// STEP 5: Stop Chroma MCP connection
// STEP 4: Stop Chroma MCP connection
if (config.chromaMcpManager) {
logger.info('SHUTDOWN', 'Stopping Chroma MCP connection...');
await config.chromaMcpManager.stop();
logger.info('SHUTDOWN', 'Chroma MCP connection stopped');
}
// STEP 6: Close database connection (includes ChromaSync cleanup)
// STEP 5: Close database connection (includes ChromaSync cleanup)
if (config.dbManager) {
await config.dbManager.close();
}
// STEP 7: Force kill any remaining child processes (Windows zombie port fix)
if (childPids.length > 0) {
logger.info('SYSTEM', 'Force killing remaining children');
for (const pid of childPids) {
await forceKillProcess(pid);
}
// Wait for children to fully exit
await waitForProcessesExit(childPids, 5000);
}
// STEP 6: Supervisor handles tracked child termination, PID cleanup, and stale sockets.
await stopSupervisor();
logger.info('SYSTEM', 'Worker shutdown complete');
}


@@ -14,6 +14,26 @@ import { readFileSync } from 'fs';
import { logger } from '../../utils/logger.js';
import { MARKETPLACE_ROOT } from '../../shared/paths.js';
/**
* Make an HTTP request to the worker via TCP.
* Returns { ok, statusCode, body } or throws on transport error.
*/
async function httpRequestToWorker(
port: number,
endpointPath: string,
method: string = 'GET'
): Promise<{ ok: boolean; statusCode: number; body: string }> {
const response = await fetch(`http://127.0.0.1:${port}${endpointPath}`, { method });
// Gracefully handle cases where response body isn't available (e.g., test mocks)
let body = '';
try {
body = await response.text();
} catch {
// Body unavailable — health/readiness checks only need .ok
}
return { ok: response.ok, statusCode: response.status, body };
}
/**
* Check if a port is in use by querying the health endpoint
*/
@@ -29,7 +49,7 @@ export async function isPortInUse(port: number): Promise<boolean> {
}
/**
* Poll a localhost endpoint until it returns 200 OK or timeout.
* Poll a worker endpoint until it returns 200 OK or timeout.
* Shared implementation for liveness and readiness checks.
*/
async function pollEndpointUntilOk(
@@ -41,12 +61,11 @@ async function pollEndpointUntilOk(
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}${endpointPath}`);
if (response.ok) return true;
const result = await httpRequestToWorker(port, endpointPath);
if (result.ok) return true;
} catch (error) {
// [ANTI-PATTERN IGNORED]: Retry loop - expected failures during startup, will retry
logger.debug('SYSTEM', retryLogMessage, { port }, error as Error);
logger.debug('SYSTEM', retryLogMessage, {}, error as Error);
}
await new Promise(r => setTimeout(r, 500));
}
@@ -87,28 +106,24 @@ export async function waitForPortFree(port: number, timeoutMs: number = 10000):
/**
* Send HTTP shutdown request to a running worker
* @param port Worker port
* @returns true if shutdown request was acknowledged, false otherwise
*/
export async function httpShutdown(port: number): Promise<boolean> {
try {
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/admin/shutdown`, {
method: 'POST'
});
if (!response.ok) {
logger.warn('SYSTEM', 'Shutdown request returned error', { port, status: response.status });
const result = await httpRequestToWorker(port, '/api/admin/shutdown', 'POST');
if (!result.ok) {
logger.warn('SYSTEM', 'Shutdown request returned error', { status: result.statusCode });
return false;
}
return true;
} catch (error) {
// Connection refused is expected if worker already stopped
if (error instanceof Error && error.message?.includes('ECONNREFUSED')) {
logger.debug('SYSTEM', 'Worker already stopped', { port }, error);
logger.debug('SYSTEM', 'Worker already stopped', {}, error);
return false;
}
// Unexpected error - log full details
logger.error('SYSTEM', 'Shutdown request failed unexpectedly', { port }, error as Error);
logger.error('SYSTEM', 'Shutdown request failed unexpectedly', {}, error as Error);
return false;
}
}
@@ -135,17 +150,17 @@ export function getInstalledPluginVersion(): string {
/**
* Get the running worker's version via API
* This is the "actual" version currently running
* This is the "actual" version currently running.
*/
export async function getRunningWorkerVersion(port: number): Promise<string | null> {
try {
const response = await fetch(`http://127.0.0.1:${port}/api/version`);
if (!response.ok) return null;
const data = await response.json() as { version: string };
const result = await httpRequestToWorker(port, '/api/version');
if (!result.ok) return null;
const data = JSON.parse(result.body) as { version: string };
return data.version;
} catch {
// Expected: worker not running or version endpoint unavailable
logger.debug('SYSTEM', 'Could not fetch worker version', { port });
logger.debug('SYSTEM', 'Could not fetch worker version', {});
return null;
}
}


@@ -15,6 +15,8 @@ import { exec, execSync, spawn } from 'child_process';
import { promisify } from 'util';
import { logger } from '../../utils/logger.js';
import { HOOK_TIMEOUTS } from '../../shared/hook-constants.js';
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
import { getSupervisor, validateWorkerPidFile, type ValidateWorkerPidStatus } from '../../supervisor/index.js';
const execAsync = promisify(exec);
@@ -625,11 +627,13 @@ export function spawnDaemon(
extraEnv: Record<string, string> = {}
): number | undefined {
const isWindows = process.platform === 'win32';
const env = {
getSupervisor().assertCanSpawn('worker daemon');
const env = sanitizeEnv({
...process.env,
CLAUDE_MEM_WORKER_PORT: String(port),
...extraEnv
};
});
if (isWindows) {
// Use PowerShell Start-Process to spawn a hidden, independent process
@@ -764,18 +768,8 @@ export function touchPidFile(): void {
* Called at the top of ensureWorkerStarted() to clean up after WSL2
* hibernate, OOM kills, or other ungraceful worker deaths.
*/
export function cleanStalePidFile(): void {
const pidInfo = readPidFile();
if (!pidInfo) return;
if (!isProcessAlive(pidInfo.pid)) {
logger.info('SYSTEM', 'Removing stale PID file (worker process is dead)', {
pid: pidInfo.pid,
port: pidInfo.port,
startedAt: pidInfo.startedAt
});
removePidFile();
}
export function cleanStalePidFile(): ValidateWorkerPidStatus {
return validateWorkerPidFile({ logAlive: false });
}
/**


@@ -15,7 +15,7 @@ import { existsSync, readFileSync, writeFileSync, unlinkSync, mkdirSync } from '
import { exec } from 'child_process';
import { promisify } from 'util';
import { logger } from '../../utils/logger.js';
import { getWorkerPort } from '../../shared/worker-utils.js';
import { getWorkerPort, workerHttpRequest } from '../../shared/worker-utils.js';
import { DATA_DIR, MARKETPLACE_ROOT, CLAUDE_CONFIG_DIR } from '../../shared/paths.js';
import {
readCursorRegistry as readCursorRegistryFromFile,
@@ -95,16 +95,16 @@ export function unregisterCursorProject(projectName: string): void {
* Update Cursor context files for all registered projects matching this project name.
* Called by SDK agents after saving a summary.
*/
export async function updateCursorContextForProject(projectName: string, port: number): Promise<void> {
export async function updateCursorContextForProject(projectName: string, _port: number): Promise<void> {
const registry = readCursorRegistry();
const entry = registry[projectName];
if (!entry) return; // Project doesn't have Cursor hooks installed
try {
// Fetch fresh context from worker
const response = await fetch(
`http://127.0.0.1:${port}/api/context/inject?project=${encodeURIComponent(projectName)}`
// Fetch fresh context from worker (uses socket or TCP automatically)
const response = await workerHttpRequest(
`/api/context/inject?project=${encodeURIComponent(projectName)}`
);
if (!response.ok) return;
@@ -398,19 +398,18 @@ async function setupProjectContext(targetDir: string, workspaceRoot: string): Pr
const rulesDir = path.join(targetDir, 'rules');
mkdirSync(rulesDir, { recursive: true });
const port = getWorkerPort();
const projectName = path.basename(workspaceRoot);
let contextGenerated = false;
console.log(` Generating initial context...`);
try {
// Check if worker is running
const healthResponse = await fetch(`http://127.0.0.1:${port}/api/readiness`);
// Check if worker is running (uses socket or TCP automatically)
const healthResponse = await workerHttpRequest('/api/readiness');
if (healthResponse.ok) {
// Fetch context
const contextResponse = await fetch(
`http://127.0.0.1:${port}/api/context/inject?project=${encodeURIComponent(projectName)}`
const contextResponse = await workerHttpRequest(
`/api/context/inject?project=${encodeURIComponent(projectName)}`
);
if (contextResponse.ok) {
const context = await contextResponse.text();


@@ -17,6 +17,9 @@ import { ALLOWED_OPERATIONS, ALLOWED_TOPICS } from './allowed-constants.js';
import { logger } from '../../utils/logger.js';
import { createMiddleware, summarizeRequestBody, requireLocalhost } from './Middleware.js';
import { errorHandler, notFoundHandler } from './ErrorHandler.js';
import { getSupervisor } from '../../supervisor/index.js';
import { isPidAlive } from '../../supervisor/process-registry.js';
import { ENV_PREFIXES, ENV_EXACT_MATCHES } from '../../supervisor/env-sanitizer.js';
// Build-time injected version constant (set by esbuild define)
declare const __DEFAULT_PACKAGE_VERSION__: string;
@@ -285,6 +288,50 @@ export class Server {
}, 100);
}
});
// Doctor endpoint - diagnostic view of supervisor, processes, and health
this.app.get('/api/admin/doctor', requireLocalhost, (_req: Request, res: Response) => {
const supervisor = getSupervisor();
const registry = supervisor.getRegistry();
const allRecords = registry.getAll();
// Check each process liveness
const processes = allRecords.map(record => ({
id: record.id,
pid: record.pid,
type: record.type,
status: isPidAlive(record.pid) ? 'alive' as const : 'dead' as const,
startedAt: record.startedAt,
}));
// Check for dead processes still in registry
const deadProcessPids = processes.filter(p => p.status === 'dead').map(p => p.pid);
// Check if CLAUDECODE_* env vars are leaking into this process
const envClean = !Object.keys(process.env).some(key =>
ENV_EXACT_MATCHES.has(key) || ENV_PREFIXES.some(prefix => key.startsWith(prefix))
);
// Format uptime
const uptimeMs = Date.now() - this.startTime;
const uptimeSeconds = Math.floor(uptimeMs / 1000);
const hours = Math.floor(uptimeSeconds / 3600);
const minutes = Math.floor((uptimeSeconds % 3600) / 60);
const formattedUptime = hours > 0 ? `${hours}h ${minutes}m` : `${minutes}m`;
res.json({
supervisor: {
running: true,
pid: process.pid,
uptime: formattedUptime,
},
processes,
health: {
deadProcessPids,
envClean,
},
});
});
}
/**


@@ -839,19 +839,21 @@ export class SessionStore {
* Add content_hash column to observations for deduplication (migration 22)
*/
private addObservationContentHashColumn(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(22) as SchemaVersion | undefined;
if (applied) return;
// Check actual schema first — cross-machine DB sync can leave schema_versions
// claiming this migration ran while the column is actually missing.
const tableInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const hasColumn = tableInfo.some(col => col.name === 'content_hash');
if (!hasColumn) {
this.db.run('ALTER TABLE observations ADD COLUMN content_hash TEXT');
this.db.run("UPDATE observations SET content_hash = substr(hex(randomblob(8)), 1, 16) WHERE content_hash IS NULL");
this.db.run('CREATE INDEX IF NOT EXISTS idx_observations_content_hash ON observations(content_hash, created_at_epoch)');
logger.debug('DB', 'Added content_hash column to observations table with backfill and index');
if (hasColumn) {
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(22, new Date().toISOString());
return;
}
this.db.run('ALTER TABLE observations ADD COLUMN content_hash TEXT');
this.db.run("UPDATE observations SET content_hash = substr(hex(randomblob(8)), 1, 16) WHERE content_hash IS NULL");
this.db.run('CREATE INDEX IF NOT EXISTS idx_observations_content_hash ON observations(content_hash, created_at_epoch)');
logger.debug('DB', 'Added content_hash column to observations table with backfill and index');
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(22, new Date().toISOString());
}


@@ -823,21 +823,25 @@ export class MigrationRunner {
* Backfills existing rows with unique random hashes so they don't block new inserts.
*/
private addObservationContentHashColumn(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(22) as SchemaVersion | undefined;
if (applied) return;
// Check actual schema first — cross-machine DB sync can leave schema_versions
// claiming this migration ran while the column is actually missing (e.g. migration 21
// recreated the table without content_hash on the synced machine).
const tableInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const hasColumn = tableInfo.some(col => col.name === 'content_hash');
if (hasColumn) {
// Column exists — just ensure version record is present
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(22, new Date().toISOString());
return;
}
this.db.run('ALTER TABLE observations ADD COLUMN content_hash TEXT');
// Backfill existing rows with unique random hashes
this.db.run("UPDATE observations SET content_hash = substr(hex(randomblob(8)), 1, 16) WHERE content_hash IS NULL");
// Index for fast dedup lookups
this.db.run('CREATE INDEX IF NOT EXISTS idx_observations_content_hash ON observations(content_hash, created_at_epoch)');
logger.debug('DB', 'Added content_hash column to observations table with backfill and index');
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(22, new Date().toISOString());
}
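The guard above can be reduced to a small decision table. This is an illustrative sketch only (the `decideMigration` name is hypothetical, not part of the codebase); the point is that the live schema from `PRAGMA table_info` is the source of truth, while `schema_versions` alone can lie after a cross-machine DB sync.

```typescript
// Illustrative decision table for the migration guard above.
type MigrationAction = 'skip' | 'record-only' | 'apply';

function decideMigration(versionRecorded: boolean, columnExists: boolean): MigrationAction {
  if (versionRecorded) return 'skip';     // version row present: nothing to do
  if (columnExists) return 'record-only'; // synced DB already has the column
  return 'apply';                         // ALTER TABLE + backfill + index, then record
}

console.log(decideMigration(false, true)); // record-only: just insert the version row
```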


@@ -21,12 +21,15 @@ import fs from 'fs';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
import { getSupervisor } from '../../supervisor/index.js';
const CHROMA_MCP_CLIENT_NAME = 'claude-mem-chroma';
const CHROMA_MCP_CLIENT_VERSION = '1.0.0';
const MCP_CONNECTION_TIMEOUT_MS = 30_000;
const RECONNECT_BACKOFF_MS = 10_000; // Don't retry connections faster than this after failure
const DEFAULT_CHROMA_DATA_DIR = path.join(os.homedir(), '.claude-mem', 'chroma');
const CHROMA_SUPERVISOR_ID = 'chroma-mcp';
export class ChromaMcpManager {
private static instance: ChromaMcpManager | null = null;
@@ -101,6 +104,7 @@ export class ChromaMcpManager {
const commandArgs = this.buildCommandArgs();
const spawnEnvironment = this.getSpawnEnv();
getSupervisor().assertCanSpawn('chroma mcp');
// On Windows, .cmd files require shell resolution. Since MCP SDK's
// StdioClientTransport doesn't support `shell: true`, route through
@@ -155,6 +159,7 @@ export class ChromaMcpManager {
clearTimeout(timeoutId!);
this.connected = true;
this.registerManagedProcess();
logger.info('CHROMA_MCP', 'Connected to chroma-mcp successfully');
@@ -169,6 +174,7 @@ export class ChromaMcpManager {
}
logger.warn('CHROMA_MCP', 'chroma-mcp subprocess closed unexpectedly, applying reconnect backoff');
this.connected = false;
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
this.client = null;
this.transport = null;
this.lastConnectionFailureTimestamp = Date.now();
@@ -333,6 +339,7 @@ export class ChromaMcpManager {
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', {}, error as Error);
}
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
this.client = null;
this.transport = null;
this.connected = false;
@@ -428,7 +435,7 @@ export class ChromaMcpManager {
*/
private getSpawnEnv(): Record<string, string> {
const baseEnv: Record<string, string> = {};
for (const [key, value] of Object.entries(sanitizeEnv(process.env))) {
if (value !== undefined) {
baseEnv[key] = value;
}
@@ -451,4 +458,21 @@ export class ChromaMcpManager {
NODE_EXTRA_CA_CERTS: combinedCertPath
};
}
private registerManagedProcess(): void {
const chromaProcess = (this.transport as unknown as { _process?: import('child_process').ChildProcess })._process;
if (!chromaProcess?.pid) {
return;
}
getSupervisor().registerProcess(CHROMA_SUPERVISOR_ID, {
pid: chromaProcess.pid,
type: 'chroma',
startedAt: new Date().toISOString()
}, chromaProcess);
chromaProcess.once('exit', () => {
getSupervisor().unregisterProcess(CHROMA_SUPERVISOR_ID);
});
}
}
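The register-then-cleanup pattern in `registerManagedProcess` can be sketched in isolation: the `exit` listener guarantees the registry entry dies with the child. The `track` helper and `registry` map below are illustrative stand-ins, not the supervisor's real API.

```typescript
import { EventEmitter } from 'events';

// Minimal sketch of register + auto-unregister on 'exit'.
const registry = new Map<string, number>();

function track(id: string, pid: number, child: EventEmitter): void {
  registry.set(id, pid);
  child.once('exit', () => registry.delete(id));
}

const fakeChild = new EventEmitter(); // stands in for a ChildProcess
track('chroma-mcp', 4242, fakeChild);
console.log(registry.has('chroma-mcp')); // true
fakeChild.emit('exit');
console.log(registry.has('chroma-mcp')); // false
```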


@@ -2,7 +2,7 @@ import { sessionInitHandler } from '../../cli/handlers/session-init.js';
import { observationHandler } from '../../cli/handlers/observation.js';
import { fileEditHandler } from '../../cli/handlers/file-edit.js';
import { sessionCompleteHandler } from '../../cli/handlers/session-complete.js';
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
import { logger } from '../../utils/logger.js';
import { getProjectContext, getProjectName } from '../../utils/project-name.js';
import { writeAgentsMd } from '../../utils/agents-md-utils.js';
@@ -317,11 +317,10 @@ export class TranscriptEventProcessor {
const workerReady = await ensureWorkerRunning();
if (!workerReady) return;
const lastAssistantMessage = session.lastAssistantMessage ?? '';
try {
await workerHttpRequest('/api/sessions/summarize', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -348,11 +347,10 @@ export class TranscriptEventProcessor {
const context = getProjectContext(cwd);
const projectsParam = context.allProjects.join(',');
try {
const response = await workerHttpRequest(
`/api/context/inject?projects=${encodeURIComponent(projectsParam)}`
);
if (!response.ok) return;


@@ -20,6 +20,8 @@ import { getAuthMethodDescription } from '../shared/EnvManager.js';
import { logger } from '../utils/logger.js';
import { ChromaMcpManager } from './sync/ChromaMcpManager.js';
import { ChromaSync } from './sync/ChromaSync.js';
import { configureSupervisorSignalHandlers, getSupervisor, startSupervisor } from '../supervisor/index.js';
import { sanitizeEnv } from '../supervisor/env-sanitizer.js';
// Windows: avoid repeated spawn popups when startup fails (issue #921)
const WINDOWS_SPAWN_COOLDOWN_MS = 2 * 60 * 1000;
@@ -78,7 +80,6 @@ import {
cleanStalePidFile,
isProcessAlive,
spawnDaemon,
isPidFileRecent,
touchPidFile
} from './infrastructure/ProcessManager.js';
@@ -263,33 +264,10 @@ export class WorkerService {
* Register signal handlers for graceful shutdown
*/
private registerSignalHandlers(): void {
configureSupervisorSignalHandlers(async () => {
this.isShuttingDown = true;
await this.shutdown();
});
}
/**
@@ -351,7 +329,9 @@ export class WorkerService {
const port = getWorkerPort();
const host = getWorkerHost();
await startSupervisor();
// Start HTTP server FIRST - make it available immediately
await this.server.listen(port, host);
// Worker writes its own PID - reliable on all platforms
@@ -363,6 +343,12 @@ export class WorkerService {
startedAt: new Date().toISOString()
});
getSupervisor().registerProcess('worker', {
pid: process.pid,
type: 'worker',
startedAt: new Date().toISOString()
});
logger.info('SYSTEM', 'Worker started', { host, port, pid: process.pid });
// Do slow initialization in background (non-blocking)
@@ -446,19 +432,50 @@ export class WorkerService {
// Connect to MCP server
const mcpServerPath = path.join(__dirname, 'mcp-server.cjs');
getSupervisor().assertCanSpawn('mcp server');
const transport = new StdioClientTransport({
command: 'node',
args: [mcpServerPath],
env: sanitizeEnv(process.env)
});
const MCP_INIT_TIMEOUT_MS = 300000;
const mcpConnectionPromise = this.mcpClient.connect(transport);
let timeoutId: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error('MCP connection timeout after 5 minutes')),
MCP_INIT_TIMEOUT_MS
);
});
try {
await Promise.race([mcpConnectionPromise, timeoutPromise]);
} catch (connectionError) {
clearTimeout(timeoutId!);
logger.warn('WORKER', 'MCP server connection failed, cleaning up subprocess', {
error: connectionError instanceof Error ? connectionError.message : String(connectionError)
});
try {
await transport.close();
} catch {
// Best effort: the supervisor handles later process cleanup for survivors.
}
throw connectionError;
}
clearTimeout(timeoutId!);
const mcpProcess = (transport as unknown as { _process?: import('child_process').ChildProcess })._process;
if (mcpProcess?.pid) {
getSupervisor().registerProcess('mcp-server', {
pid: mcpProcess.pid,
type: 'mcp',
startedAt: new Date().toISOString()
}, mcpProcess);
mcpProcess.once('exit', () => {
getSupervisor().unregisterProcess('mcp-server');
});
}
this.mcpReady = true;
logger.success('WORKER', 'MCP server connected');
@@ -470,7 +487,7 @@ export class WorkerService {
}
return activeIds;
});
logger.info('SYSTEM', 'Started orphan reaper (runs every 30 seconds)');
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
this.staleSessionReaperInterval = setInterval(async () => {
@@ -916,12 +933,22 @@ export class WorkerService {
* Ensures the worker is started and healthy.
* This function can be called by both 'start' and 'hook' commands.
*
* @param port - The TCP port (used for port-in-use checks and daemon spawn)
* @returns true if worker is healthy (existing or newly started), false on failure
*/
async function ensureWorkerStarted(port: number): Promise<boolean> {
// Clean stale PID file first (cheap: 1 fs read + 1 signal-0 check)
const pidFileStatus = cleanStalePidFile();
if (pidFileStatus === 'alive') {
logger.info('SYSTEM', 'Worker PID file points to a live process, skipping duplicate spawn');
const healthy = await waitForHealth(port, getPlatformTimeout(HOOK_TIMEOUTS.PORT_IN_USE_WAIT));
if (healthy) {
logger.info('SYSTEM', 'Worker became healthy while waiting on live PID');
return true;
}
logger.warn('SYSTEM', 'Live PID detected but worker did not become healthy before timeout');
return false;
}
// Check if worker is already running and healthy
if (await waitForHealth(port, 1000)) {
@@ -1065,11 +1092,9 @@ async function main() {
case 'restart': {
logger.info('SYSTEM', 'Restarting worker');
await httpShutdown(port);
const restartFreed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!restartFreed) {
logger.error('SYSTEM', 'Port did not free up after shutdown, aborting restart', { port });
// Exit gracefully: Windows Terminal won't keep tab open on exit 0
// The wrapper/plugin will handle restart logic if needed
process.exit(0);
}
removePidFile();
@@ -1100,9 +1125,9 @@ async function main() {
}
case 'status': {
const portInUse = await isPortInUse(port);
const pidInfo = readPidFile();
if (portInUse && pidInfo) {
console.log('Worker is running');
console.log(` PID: ${pidInfo.pid}`);
console.log(` Port: ${pidInfo.port}`);
@@ -1122,13 +1147,7 @@ async function main() {
}
case 'hook': {
// Validate CLI args first (before any I/O)
const platform = process.argv[3];
const event = process.argv[4];
if (!platform || !event) {
@@ -1138,32 +1157,20 @@ async function main() {
process.exit(1);
}
// Ensure worker is running as a detached daemon (#1249).
//
// IMPORTANT: The hook process MUST NOT become the worker. Starting the
// worker in-process makes it a grandchild of Claude Code, which the
// sandbox kills. Instead, ensureWorkerStarted() spawns a fully detached
// daemon (detached: true, stdio: 'ignore', child.unref()) that survives
// the hook process's exit and is invisible to Claude Code's sandbox.
const workerReady = await ensureWorkerStarted(port);
if (!workerReady) {
logger.warn('SYSTEM', 'Worker failed to start before hook, handler will proceed gracefully');
}
const { hookCommand } = await import('../cli/hook-command.js');
await hookCommand(platform, event);
break;
}
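The detached-daemon spawn described in the comment above can be sketched as follows; the inline `-e` script is a placeholder for the real worker entry point.

```typescript
import { spawn } from 'child_process';

// Sketch of a fully detached daemon spawn: detached + ignored stdio + unref()
// lets the hook process exit while the child survives outside Claude Code's
// process tree. The inline script stands in for the worker entry point.
const child = spawn(process.execPath, ['-e', 'setTimeout(() => {}, 10)'], {
  detached: true,
  stdio: 'ignore',
});
child.unref(); // the hook no longer waits on the child
console.log(typeof child.pid); // number (on successful spawn)
```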


@@ -19,6 +19,8 @@
import { spawn, exec, ChildProcess } from 'child_process';
import { promisify } from 'util';
import { logger } from '../../utils/logger.js';
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
import { getSupervisor } from '../../supervisor/index.js';
const execAsync = promisify(exec);
@@ -29,14 +31,36 @@ interface TrackedProcess {
process: ChildProcess;
}
function getTrackedProcesses(): TrackedProcess[] {
return getSupervisor().getRegistry()
.getAll()
.filter(record => record.type === 'sdk')
.map((record) => {
const processRef = getSupervisor().getRegistry().getRuntimeProcess(record.id);
if (!processRef) {
return null;
}
return {
pid: record.pid,
sessionDbId: Number(record.sessionId),
spawnedAt: Date.parse(record.startedAt),
process: processRef
};
})
.filter((value): value is TrackedProcess => value !== null);
}
/**
* Register a spawned process in the registry
*/
export function registerProcess(pid: number, sessionDbId: number, process: ChildProcess): void {
getSupervisor().registerProcess(`sdk:${sessionDbId}:${pid}`, {
pid,
type: 'sdk',
sessionId: sessionDbId,
startedAt: new Date().toISOString()
}, process);
logger.info('PROCESS', `Registered PID ${pid} for session ${sessionDbId}`, { pid, sessionDbId });
}
@@ -44,7 +68,11 @@ export function registerProcess(pid: number, sessionDbId: number, process: Child
* Unregister a process from the registry and notify pool waiters
*/
export function unregisterProcess(pid: number): void {
for (const record of getSupervisor().getRegistry().getByPid(pid)) {
if (record.type === 'sdk') {
getSupervisor().unregisterProcess(record.id);
}
}
logger.debug('PROCESS', `Unregistered PID ${pid}`, { pid });
// Notify waiters that a pool slot may be available
notifySlotAvailable();
@@ -55,10 +83,7 @@ export function unregisterProcess(pid: number): void {
* Warns if multiple processes found (indicates race condition)
*/
export function getProcessBySession(sessionDbId: number): TrackedProcess | undefined {
const matches = getTrackedProcesses().filter(info => info.sessionDbId === sessionDbId);
if (matches.length > 1) {
logger.warn('PROCESS', `Multiple processes found for session ${sessionDbId}`, {
count: matches.length,
@@ -72,7 +97,7 @@ export function getProcessBySession(sessionDbId: number): TrackedProcess | undef
* Get count of active processes in the registry
*/
export function getActiveCount(): number {
return getSupervisor().getRegistry().getAll().filter(record => record.type === 'sdk').length;
}
// Waiters for pool slots - resolved when a process exits and frees a slot
@@ -95,13 +120,14 @@ const TOTAL_PROCESS_HARD_CAP = 10;
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
// Hard cap: refuse to spawn if too many processes exist regardless of pool accounting
const activeCount = getActiveCount();
if (activeCount >= TOTAL_PROCESS_HARD_CAP) {
throw new Error(`Hard cap exceeded: ${activeCount} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
}
if (activeCount < maxConcurrent) return;
logger.info('PROCESS', `Pool limit reached (${activeCount}/${maxConcurrent}), waiting for slot...`);
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
@@ -112,7 +138,7 @@ export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_
const onSlot = () => {
clearTimeout(timeout);
if (getActiveCount() < maxConcurrent) {
resolve();
} else {
// Still full, re-queue
@@ -129,7 +155,7 @@ export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_
*/
export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number; ageMs: number }> {
const now = Date.now();
return getTrackedProcesses().map(info => ({
pid: info.pid,
sessionDbId: info.sessionDbId,
ageMs: now - info.spawnedAt
@@ -308,17 +334,26 @@ export async function reapOrphanedProcesses(activeSessionIds: Set<number>): Prom
let killed = 0;
// Registry-based: kill processes for dead sessions
for (const record of getSupervisor().getRegistry().getAll().filter(entry => entry.type === 'sdk')) {
const pid = record.pid;
const sessionDbId = Number(record.sessionId);
const processRef = getSupervisor().getRegistry().getRuntimeProcess(record.id);
if (activeSessionIds.has(sessionDbId)) continue; // Active = safe
logger.warn('PROCESS', `Killing orphan PID ${pid} (session ${sessionDbId} gone)`, { pid, sessionDbId });
try {
if (processRef) {
processRef.kill('SIGKILL');
} else {
process.kill(pid, 'SIGKILL');
}
killed++;
} catch {
// Already dead
}
getSupervisor().unregisterProcess(record.id);
notifySlotAvailable();
}
// System-level: find ppid=1 orphans
@@ -347,20 +382,23 @@ export function createPidCapturingSpawn(sessionDbId: number) {
env?: NodeJS.ProcessEnv;
signal?: AbortSignal;
}) => {
getSupervisor().assertCanSpawn('claude sdk');
// On Windows, use cmd.exe wrapper for .cmd files to properly handle paths with spaces
const useCmdWrapper = process.platform === 'win32' && spawnOptions.command.endsWith('.cmd');
const env = sanitizeEnv(spawnOptions.env ?? process.env);
const child = useCmdWrapper
? spawn('cmd.exe', ['/d', '/c', spawnOptions.command, ...spawnOptions.args], {
cwd: spawnOptions.cwd,
env,
stdio: ['pipe', 'pipe', 'pipe'],
signal: spawnOptions.signal,
windowsHide: true
})
: spawn(spawnOptions.command, spawnOptions.args, {
cwd: spawnOptions.cwd,
env,
stdio: ['pipe', 'pipe', 'pipe'],
signal: spawnOptions.signal, // CRITICAL: Pass signal for AbortController integration
windowsHide: true
@@ -407,7 +445,7 @@ export function createPidCapturingSpawn(sessionDbId: number) {
* Start the orphan reaper interval
* Returns cleanup function to stop the interval
*/
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 30 * 1000): () => void {
const interval = setInterval(async () => {
try {
const activeIds = getActiveSessionIds();


@@ -22,6 +22,7 @@ import type { ActiveSession, SDKUserMessage } from '../worker-types.js';
import { ModeManager } from '../domain/ModeManager.js';
import { processAgentResponse, type WorkerRef } from './agents/index.js';
import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit, waitForSlot } from './ProcessRegistry.js';
import { sanitizeEnv } from '../../supervisor/env-sanitizer.js';
// Import Agent SDK (assumes it's installed)
// @ts-ignore - Agent SDK types may not be available
@@ -96,7 +97,7 @@ export class SDKAgent {
// Build isolated environment from ~/.claude-mem/.env
// This prevents Issue #733: random ANTHROPIC_API_KEY from project .env files
// being used instead of the configured auth method (CLI subscription or explicit API key)
const isolatedEnv = sanitizeEnv(buildIsolatedEnv());
const authMethod = getAuthMethodDescription();
logger.info('SDK', 'Starting SDK query', {


@@ -15,6 +15,7 @@ import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationDa
import { PendingMessageStore } from '../sqlite/PendingMessageStore.js';
import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js';
import { getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
import { getSupervisor } from '../../supervisor/index.js';
export class SessionManager {
private dbManager: DatabaseManager;
@@ -310,6 +311,17 @@ export class SessionManager {
await ensureProcessExit(tracked, 5000);
}
// 3b. Reap all supervisor-tracked processes for this session (#1351)
// This catches MCP servers and other child processes not tracked by the
// in-memory ProcessRegistry (e.g. processes registered only in supervisor.json).
try {
await getSupervisor().getRegistry().reapSession(sessionDbId);
} catch (error) {
logger.warn('SESSION', 'Supervisor reapSession failed (non-blocking)', {
sessionId: sessionDbId
}, error as Error);
}
// 4. Cleanup
this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId);


@@ -57,13 +57,13 @@ export function createMiddleware(
// Log incoming request with body summary
const bodySummary = summarizeRequestBody(req.method, req.path, req.body);
logger.debug('HTTP', `${req.method} ${req.path}`, { requestId }, bodySummary);
// Capture response
const originalSend = res.send.bind(res);
res.send = function(body: any) {
const duration = Date.now() - start;
logger.debug('HTTP', `${res.statusCode} ${req.path}`, { requestId, duration: `${duration}ms` });
return originalSend(body);
};


@@ -78,8 +78,8 @@ export function getWorkerHost(): string {
}
/**
* Clear the cached port and host values.
* Call this when settings are updated to force re-reading from file.
*/
export function clearPortCache(): void {
cachedPort = null;
@@ -87,7 +87,46 @@ export function clearPortCache(): void {
}
/**
* Build a full URL for a given API path.
*/
export function buildWorkerUrl(apiPath: string): string {
return `http://${getWorkerHost()}:${getWorkerPort()}${apiPath}`;
}
/**
* Make an HTTP request to the worker over TCP.
*
* This is the preferred way for hooks to communicate with the worker.
*/
export function workerHttpRequest(
apiPath: string,
options: {
method?: string;
headers?: Record<string, string>;
body?: string;
timeoutMs?: number;
} = {}
): Promise<Response> {
const method = options.method ?? 'GET';
const timeoutMs = options.timeoutMs ?? HEALTH_CHECK_TIMEOUT_MS;
const url = buildWorkerUrl(apiPath);
const init: RequestInit = { method };
if (options.headers) {
init.headers = options.headers;
}
if (options.body) {
init.body = options.body;
}
if (timeoutMs > 0) {
return fetchWithTimeout(url, init, timeoutMs);
}
return fetch(url, init);
}
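`workerHttpRequest` delegates timeouts to a `fetchWithTimeout` helper that is imported but not shown here. A minimal sketch of such a helper, assuming the standard AbortController pattern (the real implementation may differ):

```typescript
// Assumed shape of the fetchWithTimeout helper used above: abort the fetch
// when the deadline passes, and always clear the timer, success or failure.
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number): Promise<Response> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await fetch(url, { ...init, signal: controller.signal });
  } finally {
    clearTimeout(timer);
  }
}
```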
/**
* Check if worker HTTP server is responsive.
* Uses /api/health (liveness) instead of /api/readiness because:
* - Hooks have 15-second timeout, but full initialization can take 5+ minutes (MCP connection)
* - /api/health returns 200 as soon as HTTP server is up (sufficient for hook communication)
@@ -95,10 +134,7 @@ export function clearPortCache(): void {
* See: https://github.com/thedotmack/claude-mem/issues/811
*/
async function isWorkerHealthy(): Promise<boolean> {
const response = await workerHttpRequest('/api/health', { timeoutMs: HEALTH_CHECK_TIMEOUT_MS });
return response.ok;
}
@@ -125,10 +161,7 @@ function getPluginVersion(): string {
* Get the running worker's version from the API
*/
async function getWorkerVersion(): Promise<string> {
const response = await workerHttpRequest('/api/version', { timeoutMs: HEALTH_CHECK_TIMEOUT_MS });
if (!response.ok) {
throw new Error(`Failed to get worker version: ${response.status}`);
}


@@ -0,0 +1,20 @@
export const ENV_PREFIXES = ['CLAUDECODE_', 'CLAUDE_CODE_'];
export const ENV_EXACT_MATCHES = new Set([
'CLAUDECODE',
'CLAUDE_CODE_SESSION',
'CLAUDE_CODE_ENTRYPOINT',
'MCP_SESSION_ID',
]);
export function sanitizeEnv(env: NodeJS.ProcessEnv = process.env): NodeJS.ProcessEnv {
const sanitized: NodeJS.ProcessEnv = {};
for (const [key, value] of Object.entries(env)) {
if (value === undefined) continue;
if (ENV_EXACT_MATCHES.has(key)) continue;
if (ENV_PREFIXES.some(prefix => key.startsWith(prefix))) continue;
sanitized[key] = value;
}
return sanitized;
}
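A quick standalone check of the sanitizer's behavior (the sanitizer logic is reproduced inline so the snippet runs on its own):

```typescript
// Usage sketch: CLAUDECODE/CLAUDE_CODE markers are stripped so a spawned
// worker does not think it is running inside a Claude Code session (#1352).
const PREFIXES = ['CLAUDECODE_', 'CLAUDE_CODE_'];
const EXACT = new Set(['CLAUDECODE', 'CLAUDE_CODE_SESSION', 'CLAUDE_CODE_ENTRYPOINT', 'MCP_SESSION_ID']);

function sanitizeEnv(env: Record<string, string | undefined>): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(env)) {
    if (value === undefined || EXACT.has(key) || PREFIXES.some(p => key.startsWith(p))) continue;
    out[key] = value;
  }
  return out;
}

const childEnv = sanitizeEnv({
  PATH: '/usr/bin',
  HOME: '/home/user',
  CLAUDECODE: '1',
  CLAUDE_CODE_ENTRYPOINT: 'cli',
  CLAUDECODE_DEBUG: 'true',
});
console.log(Object.keys(childEnv).sort()); // [ 'HOME', 'PATH' ]
```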


@@ -0,0 +1,40 @@
/**
* Health Checker - Periodic background cleanup of dead processes
*
* Runs every 30 seconds to prune dead processes from the supervisor registry.
* The interval is unref'd so it does not keep the process alive.
*/
import { logger } from '../utils/logger.js';
import { getProcessRegistry } from './process-registry.js';
const HEALTH_CHECK_INTERVAL_MS = 30_000;
let healthCheckInterval: ReturnType<typeof setInterval> | null = null;
function runHealthCheck(): void {
const registry = getProcessRegistry();
const removedProcessCount = registry.pruneDeadEntries();
if (removedProcessCount > 0) {
logger.info('SYSTEM', `Health check: pruned ${removedProcessCount} dead process(es) from registry`);
}
}
export function startHealthChecker(): void {
if (healthCheckInterval !== null) return;
healthCheckInterval = setInterval(runHealthCheck, HEALTH_CHECK_INTERVAL_MS);
healthCheckInterval.unref();
logger.debug('SYSTEM', 'Health checker started', { intervalMs: HEALTH_CHECK_INTERVAL_MS });
}
export function stopHealthChecker(): void {
if (healthCheckInterval === null) return;
clearInterval(healthCheckInterval);
healthCheckInterval = null;
logger.debug('SYSTEM', 'Health checker stopped');
}
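The unref'd-interval behavior the header comment relies on can be verified standalone:

```typescript
// An unref'd timer does not keep the Node event loop alive, which is why the
// health checker can run every 30 seconds without preventing worker shutdown.
const timer = setInterval(() => console.log('tick'), 60_000);
timer.unref();
console.log('process can exit with the interval still scheduled');
```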

src/supervisor/index.ts (new file)

@@ -0,0 +1,188 @@
import { existsSync, readFileSync, rmSync } from 'fs';
import { homedir } from 'os';
import path from 'path';
import { logger } from '../utils/logger.js';
import { getProcessRegistry, isPidAlive, type ManagedProcessInfo, type ProcessRegistry } from './process-registry.js';
import { runShutdownCascade } from './shutdown.js';
import { startHealthChecker, stopHealthChecker } from './health-checker.js';
const DATA_DIR = path.join(homedir(), '.claude-mem');
const PID_FILE = path.join(DATA_DIR, 'worker.pid');
interface PidInfo {
pid: number;
port: number;
startedAt: string;
}
interface ValidateWorkerPidOptions {
logAlive?: boolean;
pidFilePath?: string;
}
export type ValidateWorkerPidStatus = 'missing' | 'alive' | 'stale' | 'invalid';
class Supervisor {
private readonly registry: ProcessRegistry;
private started = false;
private stopPromise: Promise<void> | null = null;
private signalHandlersRegistered = false;
private shutdownInitiated = false;
private shutdownHandler: (() => Promise<void>) | null = null;
constructor(registry: ProcessRegistry) {
this.registry = registry;
}
async start(): Promise<void> {
if (this.started) return;
this.registry.initialize();
const pidStatus = validateWorkerPidFile({ logAlive: false });
if (pidStatus === 'alive') {
throw new Error('Worker already running');
}
this.started = true;
startHealthChecker();
}
configureSignalHandlers(shutdownHandler: () => Promise<void>): void {
this.shutdownHandler = shutdownHandler;
if (this.signalHandlersRegistered) return;
this.signalHandlersRegistered = true;
const handleSignal = async (signal: string): Promise<void> => {
if (this.shutdownInitiated) {
logger.warn('SYSTEM', `Received ${signal} but shutdown already in progress`);
return;
}
this.shutdownInitiated = true;
logger.info('SYSTEM', `Received ${signal}, shutting down...`);
try {
if (this.shutdownHandler) {
await this.shutdownHandler();
} else {
await this.stop();
}
} catch (error) {
logger.error('SYSTEM', 'Error during shutdown', {}, error as Error);
try {
await this.stop();
} catch (stopError) {
logger.debug('SYSTEM', 'Supervisor shutdown fallback failed', {}, stopError as Error);
}
}
process.exit(0);
};
process.on('SIGTERM', () => void handleSignal('SIGTERM'));
process.on('SIGINT', () => void handleSignal('SIGINT'));
if (process.platform !== 'win32') {
if (process.argv.includes('--daemon')) {
process.on('SIGHUP', () => {
logger.debug('SYSTEM', 'Ignoring SIGHUP in daemon mode');
});
} else {
process.on('SIGHUP', () => void handleSignal('SIGHUP'));
}
}
}
async stop(): Promise<void> {
if (this.stopPromise) {
await this.stopPromise;
return;
}
stopHealthChecker();
this.stopPromise = runShutdownCascade({
registry: this.registry,
currentPid: process.pid
}).finally(() => {
this.started = false;
this.stopPromise = null;
});
await this.stopPromise;
}
assertCanSpawn(type: string): void {
if (this.stopPromise !== null) {
throw new Error(`Supervisor is shutting down, refusing to spawn ${type}`);
}
}
registerProcess(id: string, processInfo: ManagedProcessInfo, processRef?: Parameters<ProcessRegistry['register']>[2]): void {
this.registry.register(id, processInfo, processRef);
}
unregisterProcess(id: string): void {
this.registry.unregister(id);
}
getRegistry(): ProcessRegistry {
return this.registry;
}
}
const supervisorSingleton = new Supervisor(getProcessRegistry());
export async function startSupervisor(): Promise<void> {
await supervisorSingleton.start();
}
export async function stopSupervisor(): Promise<void> {
await supervisorSingleton.stop();
}
export function getSupervisor(): Supervisor {
return supervisorSingleton;
}
export function configureSupervisorSignalHandlers(shutdownHandler: () => Promise<void>): void {
supervisorSingleton.configureSignalHandlers(shutdownHandler);
}
export function validateWorkerPidFile(options: ValidateWorkerPidOptions = {}): ValidateWorkerPidStatus {
const pidFilePath = options.pidFilePath ?? PID_FILE;
if (!existsSync(pidFilePath)) {
return 'missing';
}
let pidInfo: PidInfo | null = null;
try {
pidInfo = JSON.parse(readFileSync(pidFilePath, 'utf-8')) as PidInfo;
} catch (error) {
logger.warn('SYSTEM', 'Failed to parse worker PID file, removing it', { path: pidFilePath }, error as Error);
rmSync(pidFilePath, { force: true });
return 'invalid';
}
if (isPidAlive(pidInfo.pid)) {
if (options.logAlive ?? true) {
logger.info('SYSTEM', 'Worker already running (PID alive)', {
existingPid: pidInfo.pid,
existingPort: pidInfo.port,
startedAt: pidInfo.startedAt
});
}
return 'alive';
}
logger.info('SYSTEM', 'Removing stale PID file (worker process is dead)', {
pid: pidInfo.pid,
port: pidInfo.port,
startedAt: pidInfo.startedAt
});
rmSync(pidFilePath, { force: true });
return 'stale';
}
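The liveness probe behind `validateWorkerPidFile` is the classic signal-0 check that `isPidAlive` implements. A minimal standalone sketch of the technique (the function name `pidAlive` is illustrative, not the module's export):

```typescript
// Signal 0 performs an existence/permission check without delivering a signal.
function pidAlive(pid: number): boolean {
  if (!Number.isInteger(pid) || pid <= 0) return false;
  try {
    process.kill(pid, 0); // throws ESRCH when the PID does not exist
    return true;
  } catch (err) {
    // EPERM: the process exists but we may not signal it -> still alive.
    return (err as { code?: string }).code === 'EPERM';
  }
}

console.log(pidAlive(process.pid)); // the current process is always alive
```

This is why a stale PID file is detectable after a reboot: the recorded PID either no longer exists (ESRCH) or belongs to an unrelated process that usually started later.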


@@ -0,0 +1,253 @@
import { ChildProcess } from 'child_process';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
import { homedir } from 'os';
import path from 'path';
import { logger } from '../utils/logger.js';
const REAP_SESSION_SIGTERM_TIMEOUT_MS = 5_000;
const REAP_SESSION_SIGKILL_TIMEOUT_MS = 1_000;
const DATA_DIR = path.join(homedir(), '.claude-mem');
const DEFAULT_REGISTRY_PATH = path.join(DATA_DIR, 'supervisor.json');
export interface ManagedProcessInfo {
pid: number;
type: string;
sessionId?: string | number;
startedAt: string;
}
export interface ManagedProcessRecord extends ManagedProcessInfo {
id: string;
}
interface PersistedRegistry {
processes: Record<string, ManagedProcessInfo>;
}
export function isPidAlive(pid: number): boolean {
if (!Number.isInteger(pid) || pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
} catch (error: unknown) {
const code = (error as NodeJS.ErrnoException).code;
return code === 'EPERM';
}
}
export class ProcessRegistry {
private readonly registryPath: string;
private readonly entries = new Map<string, ManagedProcessInfo>();
private readonly runtimeProcesses = new Map<string, ChildProcess>();
private initialized = false;
constructor(registryPath: string = DEFAULT_REGISTRY_PATH) {
this.registryPath = registryPath;
}
initialize(): void {
if (this.initialized) return;
this.initialized = true;
mkdirSync(path.dirname(this.registryPath), { recursive: true });
if (!existsSync(this.registryPath)) {
this.persist();
return;
}
try {
const raw = JSON.parse(readFileSync(this.registryPath, 'utf-8')) as PersistedRegistry;
const processes = raw.processes ?? {};
for (const [id, info] of Object.entries(processes)) {
this.entries.set(id, info);
}
} catch (error) {
logger.warn('SYSTEM', 'Failed to parse supervisor registry, rebuilding', {
path: this.registryPath
}, error as Error);
this.entries.clear();
}
const removed = this.pruneDeadEntries();
if (removed > 0) {
logger.info('SYSTEM', 'Removed dead processes from supervisor registry', { removed });
}
this.persist();
}
register(id: string, processInfo: ManagedProcessInfo, processRef?: ChildProcess): void {
this.initialize();
this.entries.set(id, processInfo);
if (processRef) {
this.runtimeProcesses.set(id, processRef);
}
this.persist();
}
unregister(id: string): void {
this.initialize();
this.entries.delete(id);
this.runtimeProcesses.delete(id);
this.persist();
}
clear(): void {
this.entries.clear();
this.runtimeProcesses.clear();
this.persist();
}
getAll(): ManagedProcessRecord[] {
this.initialize();
return Array.from(this.entries.entries())
.map(([id, info]) => ({ id, ...info }))
.sort((a, b) => {
const left = Date.parse(a.startedAt);
const right = Date.parse(b.startedAt);
return (Number.isNaN(left) ? 0 : left) - (Number.isNaN(right) ? 0 : right);
});
}
getBySession(sessionId: string | number): ManagedProcessRecord[] {
const normalized = String(sessionId);
return this.getAll().filter(record => record.sessionId !== undefined && String(record.sessionId) === normalized);
}
getRuntimeProcess(id: string): ChildProcess | undefined {
return this.runtimeProcesses.get(id);
}
getByPid(pid: number): ManagedProcessRecord[] {
return this.getAll().filter(record => record.pid === pid);
}
pruneDeadEntries(): number {
this.initialize();
let removed = 0;
for (const [id, info] of this.entries) {
if (isPidAlive(info.pid)) continue;
this.entries.delete(id);
this.runtimeProcesses.delete(id);
removed += 1;
}
if (removed > 0) {
this.persist();
}
return removed;
}
/**
* Kill and unregister all processes tagged with the given sessionId.
* Sends SIGTERM first, waits up to 5s, then SIGKILL for survivors.
* Called when a session is deleted to prevent leaked child processes (#1351).
*/
async reapSession(sessionId: string | number): Promise<number> {
this.initialize();
const sessionRecords = this.getBySession(sessionId);
if (sessionRecords.length === 0) {
return 0;
}
const sessionIdNum = typeof sessionId === 'number' ? sessionId : (Number.isNaN(Number(sessionId)) ? undefined : Number(sessionId));
logger.info('SYSTEM', `Reaping ${sessionRecords.length} process(es) for session ${sessionId}`, {
sessionId: sessionIdNum,
pids: sessionRecords.map(r => r.pid)
});
// Phase 1: SIGTERM all alive processes
const aliveRecords = sessionRecords.filter(r => isPidAlive(r.pid));
for (const record of aliveRecords) {
try {
process.kill(record.pid, 'SIGTERM');
} catch (error: unknown) {
const code = (error as NodeJS.ErrnoException).code;
if (code !== 'ESRCH') {
logger.debug('SYSTEM', `Failed to SIGTERM session process PID ${record.pid}`, {
pid: record.pid
}, error as Error);
}
}
}
// Phase 2: Wait for processes to exit
const deadline = Date.now() + REAP_SESSION_SIGTERM_TIMEOUT_MS;
while (Date.now() < deadline) {
const survivors = aliveRecords.filter(r => isPidAlive(r.pid));
if (survivors.length === 0) break;
await new Promise(resolve => setTimeout(resolve, 100));
}
// Phase 3: SIGKILL any survivors
const survivors = aliveRecords.filter(r => isPidAlive(r.pid));
for (const record of survivors) {
logger.warn('SYSTEM', `Session process PID ${record.pid} did not exit after SIGTERM, sending SIGKILL`, {
pid: record.pid,
sessionId: sessionIdNum
});
try {
process.kill(record.pid, 'SIGKILL');
} catch (error: unknown) {
const code = (error as NodeJS.ErrnoException).code;
if (code !== 'ESRCH') {
logger.debug('SYSTEM', `Failed to SIGKILL session process PID ${record.pid}`, {
pid: record.pid
}, error as Error);
}
}
}
// Brief wait for SIGKILL to take effect
if (survivors.length > 0) {
const sigkillDeadline = Date.now() + REAP_SESSION_SIGKILL_TIMEOUT_MS;
while (Date.now() < sigkillDeadline) {
const remaining = survivors.filter(r => isPidAlive(r.pid));
if (remaining.length === 0) break;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
// Phase 4: Unregister all session records
for (const record of sessionRecords) {
this.entries.delete(record.id);
this.runtimeProcesses.delete(record.id);
}
this.persist();
logger.info('SYSTEM', `Reaped ${sessionRecords.length} process(es) for session ${sessionId}`, {
sessionId: sessionIdNum,
reaped: sessionRecords.length
});
return sessionRecords.length;
}
private persist(): void {
const payload: PersistedRegistry = {
processes: Object.fromEntries(this.entries.entries())
};
mkdirSync(path.dirname(this.registryPath), { recursive: true });
writeFileSync(this.registryPath, JSON.stringify(payload, null, 2));
}
}
let registrySingleton: ProcessRegistry | null = null;
export function getProcessRegistry(): ProcessRegistry {
if (!registrySingleton) {
registrySingleton = new ProcessRegistry();
}
return registrySingleton;
}
export function createProcessRegistry(registryPath: string): ProcessRegistry {
return new ProcessRegistry(registryPath);
}
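The bounded polling that `reapSession` performs between SIGTERM and SIGKILL can be factored into a reusable wait loop. A hedged sketch with an injectable liveness predicate (names here are illustrative), which makes the timing logic testable without spawning real processes:

```typescript
// Poll until every PID is reported dead or the deadline passes;
// returns the PIDs still alive at timeout (the SIGKILL candidates).
async function waitForDeath(
  pids: number[],
  isAlive: (pid: number) => boolean,
  timeoutMs: number,
  pollMs = 100
): Promise<number[]> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (!pids.some(pid => isAlive(pid))) return []; // everyone exited
    await new Promise(resolve => setTimeout(resolve, pollMs));
  }
  return pids.filter(pid => isAlive(pid));
}
```

Injecting `isAlive` rather than calling `process.kill(pid, 0)` directly is what lets tests simulate a process that dies mid-wait or one that ignores SIGTERM.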

src/supervisor/shutdown.ts Normal file

@@ -0,0 +1,157 @@
import { execFile } from 'child_process';
import { rmSync } from 'fs';
import { homedir } from 'os';
import path from 'path';
import { promisify } from 'util';
import { logger } from '../utils/logger.js';
import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
import { isPidAlive, type ManagedProcessRecord, type ProcessRegistry } from './process-registry.js';
const execFileAsync = promisify(execFile);
const DATA_DIR = path.join(homedir(), '.claude-mem');
const PID_FILE = path.join(DATA_DIR, 'worker.pid');
type TreeKillFn = (pid: number, signal?: string, callback?: (error?: Error | null) => void) => void;
export interface ShutdownCascadeOptions {
registry: ProcessRegistry;
currentPid?: number;
pidFilePath?: string;
}
export async function runShutdownCascade(options: ShutdownCascadeOptions): Promise<void> {
const currentPid = options.currentPid ?? process.pid;
const pidFilePath = options.pidFilePath ?? PID_FILE;
const allRecords = options.registry.getAll();
const childRecords = [...allRecords]
.filter(record => record.pid !== currentPid)
.sort((a, b) => Date.parse(b.startedAt) - Date.parse(a.startedAt));
for (const record of childRecords) {
if (!isPidAlive(record.pid)) {
options.registry.unregister(record.id);
continue;
}
try {
await signalProcess(record.pid, 'SIGTERM');
} catch (error) {
logger.debug('SYSTEM', 'Failed to send SIGTERM to child process', {
pid: record.pid,
type: record.type
}, error as Error);
}
}
await waitForExit(childRecords, 5000);
const survivors = childRecords.filter(record => isPidAlive(record.pid));
for (const record of survivors) {
try {
await signalProcess(record.pid, 'SIGKILL');
} catch (error) {
logger.debug('SYSTEM', 'Failed to force kill child process', {
pid: record.pid,
type: record.type
}, error as Error);
}
}
await waitForExit(survivors, 1000);
for (const record of childRecords) {
options.registry.unregister(record.id);
}
for (const record of allRecords.filter(record => record.pid === currentPid)) {
options.registry.unregister(record.id);
}
try {
rmSync(pidFilePath, { force: true });
} catch (error) {
logger.debug('SYSTEM', 'Failed to remove PID file during shutdown', { pidFilePath }, error as Error);
}
options.registry.pruneDeadEntries();
}
async function waitForExit(records: ManagedProcessRecord[], timeoutMs: number): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const survivors = records.filter(record => isPidAlive(record.pid));
if (survivors.length === 0) {
return;
}
await new Promise(resolve => setTimeout(resolve, 100));
}
}
async function signalProcess(pid: number, signal: 'SIGTERM' | 'SIGKILL'): Promise<void> {
if (signal === 'SIGTERM') {
try {
process.kill(pid, signal);
} catch (error) {
const errno = (error as NodeJS.ErrnoException).code;
if (errno === 'ESRCH') {
return;
}
throw error;
}
return;
}
if (process.platform === 'win32') {
const treeKill = await loadTreeKill();
if (treeKill) {
await new Promise<void>((resolve, reject) => {
treeKill(pid, signal, (error) => {
if (!error) {
resolve();
return;
}
const errno = (error as NodeJS.ErrnoException).code;
if (errno === 'ESRCH') {
resolve();
return;
}
reject(error);
});
});
return;
}
const args = ['/PID', String(pid), '/T'];
if (signal === 'SIGKILL') {
args.push('/F');
}
await execFileAsync('taskkill', args, {
timeout: HOOK_TIMEOUTS.POWERSHELL_COMMAND,
windowsHide: true
});
return;
}
try {
process.kill(pid, signal);
} catch (error) {
const errno = (error as NodeJS.ErrnoException).code;
if (errno === 'ESRCH') {
return;
}
throw error;
}
}
async function loadTreeKill(): Promise<TreeKillFn | null> {
const moduleName = 'tree-kill';
try {
const treeKillModule = await import(moduleName);
return (treeKillModule.default ?? treeKillModule) as TreeKillFn;
} catch {
return null;
}
}

src/types/tree-kill.d.ts vendored Normal file

@@ -0,0 +1,7 @@
declare module 'tree-kill' {
export default function treeKill(
pid: number,
signal?: string,
callback?: (error?: Error | null) => void
): void;
}


@@ -12,7 +12,7 @@ import os from 'os';
import { logger } from './logger.js';
import { formatDate, groupByDate } from '../shared/timeline-formatting.js';
import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
-import { getWorkerHost } from '../shared/worker-utils.js';
+import { workerHttpRequest } from '../shared/worker-utils.js';
const SETTINGS_PATH = path.join(os.homedir(), '.claude-mem', 'settings.json');
@@ -321,12 +321,12 @@ function isExcludedFolder(folderPath: string, excludePaths: string[]): boolean {
*
* @param filePaths - Array of absolute file paths (modified or read)
* @param project - Project identifier for API query
-* @param port - Worker API port
+* @param _port - Worker API port (legacy, now resolved automatically via socket/TCP)
*/
export async function updateFolderClaudeMdFiles(
filePaths: string[],
project: string,
-port: number,
+_port: number,
projectRoot?: string
): Promise<void> {
// Load settings to get configurable observation limit and exclude list
@@ -417,10 +417,9 @@ export async function updateFolderClaudeMdFiles(
// Process each folder
for (const folderPath of folderPaths) {
try {
-// Fetch timeline via existing API
-const host = getWorkerHost();
-const response = await fetch(
-`http://${host}:${port}/api/search/by-file?filePath=${encodeURIComponent(folderPath)}&limit=${limit}&project=${encodeURIComponent(project)}&isFolder=true`
+// Fetch timeline via existing API (uses socket or TCP automatically)
+const response = await workerHttpRequest(
+`/api/search/by-file?filePath=${encodeURIComponent(folderPath)}&limit=${limit}&project=${encodeURIComponent(project)}&isFolder=true`
);
if (!response.ok) {


@@ -27,6 +27,15 @@ mock.module('../../src/shared/SettingsDefaultsManager.js', () => ({
mock.module('../../src/shared/worker-utils.js', () => ({
ensureWorkerRunning: () => Promise.resolve(true),
getWorkerPort: () => 37777,
+workerHttpRequest: (apiPath: string, options?: any) => {
+// Delegate to global fetch so tests can mock fetch behavior
+const url = `http://127.0.0.1:37777${apiPath}`;
+return globalThis.fetch(url, {
+method: options?.method ?? 'GET',
+headers: options?.headers,
+body: options?.body,
+});
+},
}));
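The PR description characterizes `workerHttpRequest` as socket-first with TCP fallback, forceable via `CLAUDE_MEM_WORKER_TRANSPORT=tcp`. A hypothetical, self-contained sketch of that selection logic (all names, defaults, and the option shape here are illustrative assumptions, not the shipped implementation):

```typescript
interface Transport {
  kind: 'uds' | 'tcp';
  target: string;
}

// Prefer the Unix domain socket when it is present and TCP has not been
// forced by settings; otherwise fall back to loopback TCP on the old port.
function selectTransport(opts: {
  socketPath: string;
  socketExists: boolean;
  forceTcp: boolean;
  tcpPort?: number;
}): Transport {
  if (!opts.forceTcp && opts.socketExists) {
    return { kind: 'uds', target: opts.socketPath };
  }
  return { kind: 'tcp', target: `http://127.0.0.1:${opts.tcpPort ?? 37777}` };
}

const t = selectTransport({
  socketPath: '/tmp/worker.sock',
  socketExists: true,
  forceTcp: false
});
console.log(t.kind); // uds
```

Keeping the decision in one pure function is what lets every caller (hooks, MCP server, health checks, admin commands) stay transport-agnostic.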
mock.module('../../src/utils/project-name.js', () => ({


@@ -59,7 +59,11 @@ describe('HealthMonitor', () => {
describe('waitForHealth', () => {
it('should succeed immediately when server responds', async () => {
-global.fetch = mock(() => Promise.resolve({ ok: true } as Response));
+global.fetch = mock(() => Promise.resolve({
+ok: true,
+status: 200,
+text: () => Promise.resolve('')
+} as unknown as Response));
const start = Date.now();
const result = await waitForHealth(37777, 5000);
@@ -91,7 +95,11 @@ describe('HealthMonitor', () => {
if (callCount < 3) {
return Promise.reject(new Error('ECONNREFUSED'));
}
-return Promise.resolve({ ok: true } as Response);
+return Promise.resolve({
+ok: true,
+status: 200,
+text: () => Promise.resolve('')
+} as unknown as Response);
});
const result = await waitForHealth(37777, 5000);
@@ -101,7 +109,11 @@ describe('HealthMonitor', () => {
});
it('should check health endpoint for liveness', async () => {
-const fetchMock = mock(() => Promise.resolve({ ok: true } as Response));
+const fetchMock = mock(() => Promise.resolve({
+ok: true,
+status: 200,
+text: () => Promise.resolve('')
+} as unknown as Response));
global.fetch = fetchMock;
await waitForHealth(37777, 1000);
@@ -115,7 +127,11 @@ describe('HealthMonitor', () => {
});
it('should use default timeout when not specified', async () => {
-global.fetch = mock(() => Promise.resolve({ ok: true } as Response));
+global.fetch = mock(() => Promise.resolve({
+ok: true,
+status: 200,
+text: () => Promise.resolve('')
+} as unknown as Response));
// Just verify it doesn't throw and returns quickly
const result = await waitForHealth(37777);
@@ -154,8 +170,9 @@ describe('HealthMonitor', () => {
it('should detect version mismatch', async () => {
global.fetch = mock(() => Promise.resolve({
ok: true,
-json: () => Promise.resolve({ version: '0.0.0-definitely-wrong' })
-} as Response));
+status: 200,
+text: () => Promise.resolve(JSON.stringify({ version: '0.0.0-definitely-wrong' }))
+} as unknown as Response));
const result = await checkVersionMatch(37777);
@@ -172,8 +189,9 @@ describe('HealthMonitor', () => {
global.fetch = mock(() => Promise.resolve({
ok: true,
-json: () => Promise.resolve({ version: pluginVersion })
-} as Response));
+status: 200,
+text: () => Promise.resolve(JSON.stringify({ version: pluginVersion }))
+} as unknown as Response));
const result = await checkVersionMatch(37777);


@@ -0,0 +1,123 @@
import { describe, expect, it } from 'bun:test';
import { sanitizeEnv } from '../../src/supervisor/env-sanitizer.js';
describe('sanitizeEnv', () => {
it('strips variables with CLAUDECODE_ prefix', () => {
const result = sanitizeEnv({
CLAUDECODE_FOO: 'bar',
CLAUDECODE_SOMETHING: 'value',
PATH: '/usr/bin'
});
expect(result.CLAUDECODE_FOO).toBeUndefined();
expect(result.CLAUDECODE_SOMETHING).toBeUndefined();
expect(result.PATH).toBe('/usr/bin');
});
it('strips variables with CLAUDE_CODE_ prefix', () => {
const result = sanitizeEnv({
CLAUDE_CODE_BAR: 'baz',
CLAUDE_CODE_OAUTH_TOKEN: 'token',
HOME: '/home/user'
});
expect(result.CLAUDE_CODE_BAR).toBeUndefined();
expect(result.CLAUDE_CODE_OAUTH_TOKEN).toBeUndefined();
expect(result.HOME).toBe('/home/user');
});
it('strips exact-match variables (CLAUDECODE, CLAUDE_CODE_SESSION, CLAUDE_CODE_ENTRYPOINT, MCP_SESSION_ID)', () => {
const result = sanitizeEnv({
CLAUDECODE: '1',
CLAUDE_CODE_SESSION: 'session-123',
CLAUDE_CODE_ENTRYPOINT: 'hook',
MCP_SESSION_ID: 'mcp-abc',
NODE_PATH: '/usr/local/lib'
});
expect(result.CLAUDECODE).toBeUndefined();
expect(result.CLAUDE_CODE_SESSION).toBeUndefined();
expect(result.CLAUDE_CODE_ENTRYPOINT).toBeUndefined();
expect(result.MCP_SESSION_ID).toBeUndefined();
expect(result.NODE_PATH).toBe('/usr/local/lib');
});
it('preserves allowed variables like PATH, HOME, NODE_PATH', () => {
const result = sanitizeEnv({
PATH: '/usr/bin:/usr/local/bin',
HOME: '/home/user',
NODE_PATH: '/usr/local/lib/node_modules',
SHELL: '/bin/zsh',
USER: 'developer',
LANG: 'en_US.UTF-8'
});
expect(result.PATH).toBe('/usr/bin:/usr/local/bin');
expect(result.HOME).toBe('/home/user');
expect(result.NODE_PATH).toBe('/usr/local/lib/node_modules');
expect(result.SHELL).toBe('/bin/zsh');
expect(result.USER).toBe('developer');
expect(result.LANG).toBe('en_US.UTF-8');
});
it('returns a new object and does not mutate the original', () => {
const original: NodeJS.ProcessEnv = {
PATH: '/usr/bin',
CLAUDECODE_FOO: 'bar',
KEEP: 'yes'
};
const originalCopy = { ...original };
const result = sanitizeEnv(original);
// Result should be a different object
expect(result).not.toBe(original);
// Original should be unchanged
expect(original).toEqual(originalCopy);
// Result should not contain stripped vars
expect(result.CLAUDECODE_FOO).toBeUndefined();
expect(result.PATH).toBe('/usr/bin');
});
it('handles empty env gracefully', () => {
const result = sanitizeEnv({});
expect(result).toEqual({});
});
it('skips entries with undefined values', () => {
const env: NodeJS.ProcessEnv = {
DEFINED: 'value',
UNDEFINED_KEY: undefined
};
const result = sanitizeEnv(env);
expect(result.DEFINED).toBe('value');
expect('UNDEFINED_KEY' in result).toBe(false);
});
it('combines prefix and exact match removal in a single pass', () => {
const result = sanitizeEnv({
PATH: '/usr/bin',
CLAUDECODE: '1',
CLAUDECODE_FOO: 'bar',
CLAUDE_CODE_BAR: 'baz',
CLAUDE_CODE_OAUTH_TOKEN: 'oauth-token',
CLAUDE_CODE_SESSION: 'session',
CLAUDE_CODE_ENTRYPOINT: 'entry',
MCP_SESSION_ID: 'mcp',
KEEP_ME: 'yes'
});
expect(result.PATH).toBe('/usr/bin');
expect(result.KEEP_ME).toBe('yes');
expect(result.CLAUDECODE).toBeUndefined();
expect(result.CLAUDECODE_FOO).toBeUndefined();
expect(result.CLAUDE_CODE_BAR).toBeUndefined();
expect(result.CLAUDE_CODE_OAUTH_TOKEN).toBeUndefined();
expect(result.CLAUDE_CODE_SESSION).toBeUndefined();
expect(result.CLAUDE_CODE_ENTRYPOINT).toBeUndefined();
expect(result.MCP_SESSION_ID).toBeUndefined();
});
});
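The tests above fully pin down the `sanitizeEnv` contract: two stripped prefixes, four exact-match names, undefined entries skipped, and a fresh object returned. One possible implementation consistent with those tests (a sketch, not necessarily the shipped code):

```typescript
// Exact names always removed; CLAUDE_CODE_SESSION and CLAUDE_CODE_ENTRYPOINT
// are also caught by the CLAUDE_CODE_ prefix, but listing them keeps the
// contract explicit.
const STRIP_EXACT = new Set([
  'CLAUDECODE',
  'CLAUDE_CODE_SESSION',
  'CLAUDE_CODE_ENTRYPOINT',
  'MCP_SESSION_ID'
]);
const STRIP_PREFIXES = ['CLAUDECODE_', 'CLAUDE_CODE_'];

function sanitizeEnv(env: Record<string, string | undefined>): Record<string, string> {
  const result: Record<string, string> = {};
  for (const [key, value] of Object.entries(env)) {
    if (value === undefined) continue;                          // skip undefined entries
    if (STRIP_EXACT.has(key)) continue;                         // exact-match removal
    if (STRIP_PREFIXES.some(p => key.startsWith(p))) continue;  // prefix removal
    result[key] = value;                                        // copy into a fresh object
  }
  return result;
}
```

Building a new object rather than deleting keys in place is what makes the "does not mutate the original" test pass, and is the safer default when the input is `process.env`.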


@@ -0,0 +1,73 @@
import { afterEach, describe, expect, it, mock } from 'bun:test';
import { startHealthChecker, stopHealthChecker } from '../../src/supervisor/health-checker.js';
describe('health-checker', () => {
afterEach(() => {
// Always stop the checker to avoid leaking intervals between tests
stopHealthChecker();
});
it('startHealthChecker sets up an interval without throwing', () => {
expect(() => startHealthChecker()).not.toThrow();
});
it('stopHealthChecker clears the interval without throwing', () => {
startHealthChecker();
expect(() => stopHealthChecker()).not.toThrow();
});
it('stopHealthChecker is safe to call when no checker is running', () => {
expect(() => stopHealthChecker()).not.toThrow();
});
it('multiple startHealthChecker calls do not create multiple intervals', () => {
// Track setInterval calls
const originalSetInterval = globalThis.setInterval;
let setIntervalCallCount = 0;
globalThis.setInterval = ((...args: Parameters<typeof setInterval>) => {
setIntervalCallCount++;
return originalSetInterval(...args);
}) as typeof setInterval;
try {
// Stop any existing checker first to ensure clean state
stopHealthChecker();
setIntervalCallCount = 0;
startHealthChecker();
startHealthChecker();
startHealthChecker();
// Only one interval should have been created due to the guard
expect(setIntervalCallCount).toBe(1);
} finally {
globalThis.setInterval = originalSetInterval;
}
});
it('stopHealthChecker after start allows restarting', () => {
const originalSetInterval = globalThis.setInterval;
let setIntervalCallCount = 0;
globalThis.setInterval = ((...args: Parameters<typeof setInterval>) => {
setIntervalCallCount++;
return originalSetInterval(...args);
}) as typeof setInterval;
try {
stopHealthChecker();
setIntervalCallCount = 0;
startHealthChecker();
expect(setIntervalCallCount).toBe(1);
stopHealthChecker();
startHealthChecker();
expect(setIntervalCallCount).toBe(2);
} finally {
globalThis.setInterval = originalSetInterval;
}
});
});
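The interval guard those tests exercise boils down to a nullable timer handle that is set exactly once and cleared exactly once. A minimal sketch of the pattern (function names are illustrative; boolean return values are added here only to make the guard observable):

```typescript
let timer: ReturnType<typeof setInterval> | null = null;

// Returns true only when a new interval was actually created.
function startChecker(onTick: () => void, periodMs = 30_000): boolean {
  if (timer !== null) return false; // guard: at most one interval at a time
  timer = setInterval(onTick, periodMs);
  return true;
}

// Returns true only when a running interval was cleared.
function stopChecker(): boolean {
  if (timer === null) return false; // safe no-op when nothing is running
  clearInterval(timer);
  timer = null;
  return true;
}
```

Nulling the handle in `stopChecker` is what permits the restart behavior the last test asserts: after a stop, the next `startChecker` call passes the guard again.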


@@ -0,0 +1,111 @@
import { afterEach, describe, expect, it } from 'bun:test';
import { mkdirSync, rmSync, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import path from 'path';
import { validateWorkerPidFile, type ValidateWorkerPidStatus } from '../../src/supervisor/index.js';
function makeTempDir(): string {
const dir = path.join(tmpdir(), `claude-mem-index-${Date.now()}-${Math.random().toString(36).slice(2)}`);
mkdirSync(dir, { recursive: true });
return dir;
}
const tempDirs: string[] = [];
describe('validateWorkerPidFile', () => {
afterEach(() => {
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (dir) {
rmSync(dir, { recursive: true, force: true });
}
}
});
it('returns "missing" when PID file does not exist', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const pidFilePath = path.join(tempDir, 'worker.pid');
const status = validateWorkerPidFile({ logAlive: false, pidFilePath });
expect(status).toBe('missing');
});
it('returns "invalid" when PID file contains bad JSON', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const pidFilePath = path.join(tempDir, 'worker.pid');
writeFileSync(pidFilePath, 'not-json!!!');
const status = validateWorkerPidFile({ logAlive: false, pidFilePath });
expect(status).toBe('invalid');
});
it('returns "stale" when PID file references a dead process', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const pidFilePath = path.join(tempDir, 'worker.pid');
writeFileSync(pidFilePath, JSON.stringify({
pid: 2147483647,
port: 37777,
startedAt: new Date().toISOString()
}));
const status = validateWorkerPidFile({ logAlive: false, pidFilePath });
expect(status).toBe('stale');
});
it('returns "alive" when PID file references the current process', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const pidFilePath = path.join(tempDir, 'worker.pid');
writeFileSync(pidFilePath, JSON.stringify({
pid: process.pid,
port: 37777,
startedAt: new Date().toISOString()
}));
const status = validateWorkerPidFile({ logAlive: false, pidFilePath });
expect(status).toBe('alive');
});
});
describe('Supervisor assertCanSpawn behavior', () => {
it('assertCanSpawn throws when stopPromise is active (shutdown in progress)', () => {
const { getSupervisor } = require('../../src/supervisor/index.js');
const supervisor = getSupervisor();
// When not shutting down, assertCanSpawn should not throw
expect(() => supervisor.assertCanSpawn('test')).not.toThrow();
});
it('registerProcess and unregisterProcess delegate to the registry', () => {
const { getSupervisor } = require('../../src/supervisor/index.js');
const supervisor = getSupervisor();
const registry = supervisor.getRegistry();
const testId = `test-${Date.now()}`;
supervisor.registerProcess(testId, {
pid: process.pid,
type: 'test',
startedAt: new Date().toISOString()
});
const found = registry.getAll().find((r: { id: string }) => r.id === testId);
expect(found).toBeDefined();
expect(found?.type).toBe('test');
supervisor.unregisterProcess(testId);
const afterUnregister = registry.getAll().find((r: { id: string }) => r.id === testId);
expect(afterUnregister).toBeUndefined();
});
});
describe('Supervisor start idempotency', () => {
it('getSupervisor returns the same instance', () => {
const { getSupervisor } = require('../../src/supervisor/index.js');
const s1 = getSupervisor();
const s2 = getSupervisor();
expect(s1).toBe(s2);
});
});


@@ -0,0 +1,423 @@
import { afterEach, describe, expect, it } from 'bun:test';
import { existsSync, mkdirSync, readFileSync, rmSync, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import path from 'path';
import { createProcessRegistry, isPidAlive } from '../../src/supervisor/process-registry.js';
function makeTempDir(): string {
return path.join(tmpdir(), `claude-mem-supervisor-${Date.now()}-${Math.random().toString(36).slice(2)}`);
}
const tempDirs: string[] = [];
describe('supervisor ProcessRegistry', () => {
afterEach(() => {
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (dir) {
rmSync(dir, { recursive: true, force: true });
}
}
});
describe('isPidAlive', () => {
it('treats current process as alive', () => {
expect(isPidAlive(process.pid)).toBe(true);
});
it('treats an impossibly high PID as dead', () => {
expect(isPidAlive(2147483647)).toBe(false);
});
it('treats negative PID as dead', () => {
expect(isPidAlive(-1)).toBe(false);
});
it('treats non-integer PID as dead', () => {
expect(isPidAlive(3.14)).toBe(false);
});
});
describe('persistence', () => {
it('persists entries to disk and reloads them on initialize', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
// Create a registry, register an entry, and let it persist
const registry1 = createProcessRegistry(registryPath);
registry1.register('worker:1', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
// Verify file exists on disk
expect(existsSync(registryPath)).toBe(true);
const diskData = JSON.parse(readFileSync(registryPath, 'utf-8'));
expect(diskData.processes['worker:1']).toBeDefined();
// Create a second registry from the same path — it should load the persisted entry
const registry2 = createProcessRegistry(registryPath);
registry2.initialize();
const records = registry2.getAll();
expect(records).toHaveLength(1);
expect(records[0]?.id).toBe('worker:1');
expect(records[0]?.pid).toBe(process.pid);
});
it('prunes dead processes on initialize', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
writeFileSync(registryPath, JSON.stringify({
processes: {
alive: {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
},
dead: {
pid: 2147483647,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
}
}
}));
const registry = createProcessRegistry(registryPath);
registry.initialize();
const records = registry.getAll();
expect(records).toHaveLength(1);
expect(records[0]?.id).toBe('alive');
expect(existsSync(registryPath)).toBe(true);
});
it('handles corrupted registry file gracefully', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
writeFileSync(registryPath, '{ not valid json!!!');
const registry = createProcessRegistry(registryPath);
registry.initialize();
// Should recover with an empty registry
expect(registry.getAll()).toHaveLength(0);
});
});
describe('register and unregister', () => {
it('register adds an entry retrievable by getAll', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
expect(registry.getAll()).toHaveLength(0);
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
const records = registry.getAll();
expect(records).toHaveLength(1);
expect(records[0]?.id).toBe('sdk:1');
expect(records[0]?.type).toBe('sdk');
});
it('unregister removes an entry', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
expect(registry.getAll()).toHaveLength(1);
registry.unregister('sdk:1');
expect(registry.getAll()).toHaveLength(0);
});
it('unregister is a no-op for unknown IDs', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.unregister('nonexistent');
expect(registry.getAll()).toHaveLength(1);
});
});
describe('getAll', () => {
it('returns records sorted by startedAt ascending', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('newest', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:02.000Z'
});
registry.register('oldest', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('middle', {
pid: process.pid,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
const records = registry.getAll();
expect(records).toHaveLength(3);
expect(records[0]?.id).toBe('oldest');
expect(records[1]?.id).toBe('middle');
expect(records[2]?.id).toBe('newest');
});
it('returns empty array when no entries exist', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
expect(registry.getAll()).toEqual([]);
});
});
describe('getBySession', () => {
it('filters records by session id', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
sessionId: 42,
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('sdk:2', {
pid: process.pid,
type: 'sdk',
sessionId: 'other',
startedAt: '2026-03-15T00:00:01.000Z'
});
const records = registry.getBySession(42);
expect(records).toHaveLength(1);
expect(records[0]?.id).toBe('sdk:1');
});
it('returns empty array when no processes match the session', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
sessionId: 42,
startedAt: '2026-03-15T00:00:00.000Z'
});
expect(registry.getBySession(999)).toHaveLength(0);
});
it('matches string and numeric session IDs by string comparison', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
sessionId: '42',
startedAt: '2026-03-15T00:00:00.000Z'
});
// Querying with number should find string "42"
expect(registry.getBySession(42)).toHaveLength(1);
});
});
describe('pruneDeadEntries', () => {
it('removes entries with dead PIDs and preserves live ones', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registryPath = path.join(tempDir, 'supervisor.json');
const registry = createProcessRegistry(registryPath);
registry.register('alive', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('dead', {
pid: 2147483647,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
const removed = registry.pruneDeadEntries();
expect(removed).toBe(1);
expect(registry.getAll()).toHaveLength(1);
expect(registry.getAll()[0]?.id).toBe('alive');
});
it('returns 0 when all entries are alive', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('alive', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
const removed = registry.pruneDeadEntries();
expect(removed).toBe(0);
expect(registry.getAll()).toHaveLength(1);
});
it('persists changes to disk after pruning', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registryPath = path.join(tempDir, 'supervisor.json');
const registry = createProcessRegistry(registryPath);
registry.register('dead', {
pid: 2147483647,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
registry.pruneDeadEntries();
const diskData = JSON.parse(readFileSync(registryPath, 'utf-8'));
expect(Object.keys(diskData.processes)).toHaveLength(0);
});
});
describe('clear', () => {
it('removes all entries', () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registryPath = path.join(tempDir, 'supervisor.json');
const registry = createProcessRegistry(registryPath);
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('sdk:2', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:01.000Z'
});
expect(registry.getAll()).toHaveLength(2);
registry.clear();
expect(registry.getAll()).toHaveLength(0);
// Verify persisted to disk
const diskData = JSON.parse(readFileSync(registryPath, 'utf-8'));
expect(Object.keys(diskData.processes)).toHaveLength(0);
});
});
describe('createProcessRegistry', () => {
it('creates an isolated instance with a custom path', () => {
const tempDir1 = makeTempDir();
const tempDir2 = makeTempDir();
tempDirs.push(tempDir1, tempDir2);
const registry1 = createProcessRegistry(path.join(tempDir1, 'supervisor.json'));
const registry2 = createProcessRegistry(path.join(tempDir2, 'supervisor.json'));
registry1.register('sdk:1', {
pid: process.pid,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
// registry2 should be independent
expect(registry1.getAll()).toHaveLength(1);
expect(registry2.getAll()).toHaveLength(0);
});
});
describe('reapSession', () => {
it('unregisters dead processes for the given session', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:99:50001', {
pid: 2147483640,
type: 'sdk',
sessionId: 99,
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('mcp:99:50002', {
pid: 2147483641,
type: 'mcp',
sessionId: 99,
startedAt: '2026-03-15T00:00:01.000Z'
});
// Register a process for a different session (should survive)
registry.register('sdk:100:50003', {
pid: process.pid,
type: 'sdk',
sessionId: 100,
startedAt: '2026-03-15T00:00:02.000Z'
});
const reaped = await registry.reapSession(99);
expect(reaped).toBe(2);
expect(registry.getBySession(99)).toHaveLength(0);
expect(registry.getBySession(100)).toHaveLength(1);
});
it('returns 0 when no processes match the session', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('sdk:1', {
pid: process.pid,
type: 'sdk',
sessionId: 42,
startedAt: '2026-03-15T00:00:00.000Z'
});
const reaped = await registry.reapSession(999);
expect(reaped).toBe(0);
expect(registry.getAll()).toHaveLength(1);
});
});
});
@@ -0,0 +1,186 @@
import { afterEach, describe, expect, it } from 'bun:test';
import { mkdirSync, readFileSync, rmSync, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import path from 'path';
import { createProcessRegistry } from '../../src/supervisor/process-registry.js';
import { runShutdownCascade } from '../../src/supervisor/shutdown.js';
function makeTempDir(): string {
return path.join(tmpdir(), `claude-mem-shutdown-${Date.now()}-${Math.random().toString(36).slice(2)}`);
}
const tempDirs: string[] = [];
describe('supervisor shutdown cascade', () => {
afterEach(() => {
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (dir) {
rmSync(dir, { recursive: true, force: true });
}
}
});
it('removes child records and pid file', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
const pidFilePath = path.join(tempDir, 'worker.pid');
writeFileSync(pidFilePath, JSON.stringify({
pid: process.pid,
port: 37777,
startedAt: new Date().toISOString()
}));
const registry = createProcessRegistry(registryPath);
registry.register('worker', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('dead-child', {
pid: 2147483647,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
await runShutdownCascade({
registry,
currentPid: process.pid,
pidFilePath
});
const persisted = JSON.parse(readFileSync(registryPath, 'utf-8'));
expect(Object.keys(persisted.processes)).toHaveLength(0);
expect(() => readFileSync(pidFilePath, 'utf-8')).toThrow();
});
it('terminates tracked children in reverse spawn order', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registry = createProcessRegistry(path.join(tempDir, 'supervisor.json'));
registry.register('oldest', {
pid: 41001,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('middle', {
pid: 41002,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
registry.register('newest', {
pid: 41003,
type: 'chroma',
startedAt: '2026-03-15T00:00:02.000Z'
});
const originalKill = process.kill;
const alive = new Set([41001, 41002, 41003]);
const calls: Array<{ pid: number; signal: NodeJS.Signals | number }> = [];
process.kill = ((pid: number, signal?: NodeJS.Signals | number) => {
const normalizedSignal = signal ?? 'SIGTERM';
if (normalizedSignal === 0) {
if (!alive.has(pid)) {
const error = new Error(`kill ESRCH ${pid}`) as NodeJS.ErrnoException;
error.code = 'ESRCH';
throw error;
}
return true;
}
calls.push({ pid, signal: normalizedSignal });
alive.delete(pid);
return true;
}) as typeof process.kill;
try {
await runShutdownCascade({
registry,
currentPid: process.pid,
pidFilePath: path.join(tempDir, 'worker.pid')
});
} finally {
process.kill = originalKill;
}
expect(calls).toEqual([
{ pid: 41003, signal: 'SIGTERM' },
{ pid: 41002, signal: 'SIGTERM' },
{ pid: 41001, signal: 'SIGTERM' }
]);
});
it('handles already-dead processes gracefully without throwing', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
const registry = createProcessRegistry(registryPath);
// Register processes with PIDs that are definitely dead
registry.register('dead:1', {
pid: 2147483640,
type: 'sdk',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('dead:2', {
pid: 2147483641,
type: 'mcp',
startedAt: '2026-03-15T00:00:01.000Z'
});
// Should not throw
await runShutdownCascade({
registry,
currentPid: process.pid,
pidFilePath: path.join(tempDir, 'worker.pid')
});
// All entries should be unregistered
const persisted = JSON.parse(readFileSync(registryPath, 'utf-8'));
expect(Object.keys(persisted.processes)).toHaveLength(0);
});
it('unregisters all children from registry after cascade', async () => {
const tempDir = makeTempDir();
tempDirs.push(tempDir);
mkdirSync(tempDir, { recursive: true });
const registryPath = path.join(tempDir, 'supervisor.json');
const registry = createProcessRegistry(registryPath);
registry.register('worker', {
pid: process.pid,
type: 'worker',
startedAt: '2026-03-15T00:00:00.000Z'
});
registry.register('child:1', {
pid: 2147483640,
type: 'sdk',
startedAt: '2026-03-15T00:00:01.000Z'
});
registry.register('child:2', {
pid: 2147483641,
type: 'mcp',
startedAt: '2026-03-15T00:00:02.000Z'
});
await runShutdownCascade({
registry,
currentPid: process.pid,
pidFilePath: path.join(tempDir, 'worker.pid')
});
// All records (including the current process one) should be removed
expect(registry.getAll()).toHaveLength(0);
});
});
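The cascade contract these tests exercise — SIGTERM children newest-first, skip the current process, tolerate ESRCH from already-dead PIDs, unregister every record, and delete the PID file — can be sketched as below. This is an assumption-laden sketch, not the real `src/supervisor/shutdown.ts`, which per the PR description also escalates SIGTERM → 5s wait → SIGKILL and uses tree-kill on Windows.

```typescript
import { unlinkSync, existsSync } from 'fs';

interface RegistryLike {
  getAll(): Array<{ id: string; pid: number; startedAt: string }>;
  unregister(id: string): void;
}

// Hypothetical sketch of the shutdown cascade the tests above describe.
async function runShutdownCascade(opts: {
  registry: RegistryLike;
  currentPid: number;
  pidFilePath: string;
}): Promise<void> {
  // getAll() returns oldest-first; reverse so children die newest-first.
  const records = opts.registry.getAll().slice().reverse();
  for (const record of records) {
    if (record.pid !== opts.currentPid) {
      try {
        process.kill(record.pid, 'SIGTERM');
      } catch (err) {
        // ESRCH means the process is already gone: just drop the record.
        if ((err as NodeJS.ErrnoException).code !== 'ESRCH') throw err;
      }
    }
    opts.registry.unregister(record.id);
  }
  if (existsSync(opts.pidFilePath)) {
    unlinkSync(opts.pidFilePath);
  }
}
```

Reverse spawn order matters because later children (e.g. an SDK worker) typically depend on earlier ones (e.g. the vector store), so tearing down dependents first avoids error bursts during shutdown.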
@@ -14,6 +14,24 @@ mock.module('../../src/utils/logger.js', () => ({
},
}));
// Mock worker-utils to delegate workerHttpRequest to global.fetch
mock.module('../../src/shared/worker-utils.js', () => ({
getWorkerPort: () => 37777,
getWorkerHost: () => '127.0.0.1',
workerHttpRequest: (apiPath: string, options?: any) => {
const url = `http://127.0.0.1:37777${apiPath}`;
return globalThis.fetch(url, {
method: options?.method ?? 'GET',
headers: options?.headers,
body: options?.body,
});
},
clearPortCache: () => {},
ensureWorkerRunning: () => Promise.resolve(true),
fetchWithTimeout: (url: string, init: any, timeoutMs: number) => globalThis.fetch(url, init),
buildWorkerUrl: (apiPath: string) => `http://127.0.0.1:37777${apiPath}`,
}));
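// The mock above pins workerHttpRequest to plain TCP fetch so these tests
// stay transport-agnostic. For context, a socket-aware transport selector
// like the one this PR introduces might look like the sketch below — the
// env var and socket path come from the PR description, but the function
// name and shape here are assumptions, not the real worker-utils API.

```typescript
import http from 'http';
import path from 'path';
import os from 'os';

// Hypothetical: choose UDS by default, TCP when explicitly forced.
function workerRequestOptions(apiPath: string): http.RequestOptions {
  if (process.env.CLAUDE_MEM_WORKER_TRANSPORT === 'tcp') {
    // Forced TCP mode keeps backwards compatibility with port-based setups.
    return { host: '127.0.0.1', port: 37777, path: apiPath };
  }
  // Unix domain socket: no port to collide on across concurrent sessions.
  return {
    socketPath: path.join(os.homedir(), '.claude-mem', 'sockets', 'worker.sock'),
    path: apiPath,
  };
}
```

Node's `http.request` accepts `socketPath` in place of `host`/`port`, which is what makes the fallback a one-field swap rather than a separate client.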
// Import after mocks
import {
replaceTaggedContent,