diff --git a/packages/adapter-utils/src/server-utils.test.ts b/packages/adapter-utils/src/server-utils.test.ts index f49dfd4592..f3f254576e 100644 --- a/packages/adapter-utils/src/server-utils.test.ts +++ b/packages/adapter-utils/src/server-utils.test.ts @@ -4,6 +4,7 @@ import { appendWithByteCap, DEFAULT_PAPERCLIP_AGENT_PROMPT_TEMPLATE, renderPaperclipWakePrompt, + runningProcesses, runChildProcess, stringifyPaperclipWakePayload, } from "./server-utils.js"; @@ -26,6 +27,17 @@ async function waitForPidExit(pid: number, timeoutMs = 2_000) { return !isPidAlive(pid); } +async function waitForTextMatch(read: () => string, pattern: RegExp, timeoutMs = 1_000) { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const value = read(); + const match = value.match(pattern); + if (match) return match; + await new Promise((resolve) => setTimeout(resolve, 25)); + } + return read().match(pattern); +} + describe("runChildProcess", () => { it("does not arm a timeout when timeoutSec is 0", async () => { const result = await runChildProcess( @@ -110,6 +122,103 @@ describe("runChildProcess", () => { expect(await waitForPidExit(descendantPid!, 2_000)).toBe(true); }); + + it.skipIf(process.platform === "win32")("cleans up a lingering process group after terminal output and child exit", async () => { + const result = await runChildProcess( + randomUUID(), + process.execPath, + [ + "-e", + [ + "const { spawn } = require('node:child_process');", + "const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: ['ignore', 'inherit', 'ignore'] });", + "process.stdout.write(`descendant:${child.pid}\\n`);", + "process.stdout.write(`${JSON.stringify({ type: 'result', result: 'done' })}\\n`);", + "setTimeout(() => process.exit(0), 25);", + ].join(" "), + ], + { + cwd: process.cwd(), + env: {}, + timeoutSec: 0, + graceSec: 1, + onLog: async () => {}, + terminalResultCleanup: { + graceMs: 100, + hasTerminalResult: ({ stdout }) => stdout.includes('"type":"result"'), + }, + }, + ); + + const descendantPid = Number.parseInt(result.stdout.match(/descendant:(\d+)/)?.[1] ?? "", 10); + expect(result.timedOut).toBe(false); + expect(result.exitCode).toBe(0); + expect(Number.isInteger(descendantPid) && descendantPid > 0).toBe(true); + expect(await waitForPidExit(descendantPid, 2_000)).toBe(true); + }); + + it.skipIf(process.platform === "win32")("does not clean up noisy runs that have no terminal output", async () => { + const runId = randomUUID(); + let observed = ""; + const resultPromise = runChildProcess( + runId, + process.execPath, + [ + "-e", + [ + "const { spawn } = require('node:child_process');", + "const child = spawn(process.execPath, ['-e', \"setInterval(() => process.stdout.write('noise\\\\n'), 50)\"], { stdio: ['ignore', 'inherit', 'ignore'] });", + "process.stdout.write(`descendant:${child.pid}\\n`);", + "setTimeout(() => process.exit(0), 25);", + ].join(" "), + ], + { + cwd: process.cwd(), + env: {}, + timeoutSec: 0, + graceSec: 1, + onLog: async (_stream, chunk) => { + observed += chunk; + }, + terminalResultCleanup: { + graceMs: 50, + hasTerminalResult: ({ stdout }) => stdout.includes('"type":"result"'), + }, + }, + ); + + const pidMatch = await waitForTextMatch(() => observed, /descendant:(\d+)/); + const descendantPid = Number.parseInt(pidMatch?.[1] ?? "", 10); + expect(Number.isInteger(descendantPid) && descendantPid > 0).toBe(true); + + const race = await Promise.race([ + resultPromise.then(() => "settled" as const), + new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 300)), + ]); + expect(race).toBe("pending"); + expect(isPidAlive(descendantPid)).toBe(true); + + const running = runningProcesses.get(runId) as + | { child: { kill(signal: NodeJS.Signals): boolean }; processGroupId: number | null } + | undefined; + try { + if (running?.processGroupId) { + process.kill(-running.processGroupId, "SIGKILL"); + } else { + running?.child.kill("SIGKILL"); + } + await resultPromise; + } finally { + runningProcesses.delete(runId); + if (isPidAlive(descendantPid)) { + try { + process.kill(descendantPid, "SIGKILL"); + } catch { + // Ignore cleanup races. + } + } + } + }); }); describe("appendWithByteCap", () => { diff --git a/packages/adapter-utils/src/server-utils.ts b/packages/adapter-utils/src/server-utils.ts index 2eec1892ef..34eb08f348 100644 --- a/packages/adapter-utils/src/server-utils.ts +++ b/packages/adapter-utils/src/server-utils.ts @@ -16,6 +16,11 @@ export interface RunProcessResult { startedAt: string | null; } +export interface TerminalResultCleanupOptions { + hasTerminalResult: (output: { stdout: string; stderr: string }) => boolean; + graceMs?: number; +} + interface RunningProcess { child: ChildProcess; graceSec: number; @@ -29,6 +34,10 @@ interface SpawnTarget { type ChildProcessWithEvents = ChildProcess & { on(event: "error", listener: (err: Error) => void): ChildProcess; + on( + event: "exit", + listener: (code: number | null, signal: NodeJS.Signals | null) => void, + ): ChildProcess; on( event: "close", listener: (code: number | null, signal: NodeJS.Signals | null) => void, @@ -60,6 +69,7 @@ function signalRunningProcess( export const runningProcesses = new Map(); export const MAX_CAPTURE_BYTES = 4 * 1024 * 1024; export const MAX_EXCERPT_BYTES = 32 * 1024; +const TERMINAL_RESULT_SCAN_OVERLAP_CHARS = 64 * 1024; const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i; const PAPERCLIP_SKILL_ROOT_RELATIVE_CANDIDATES = [ "../../skills", @@ -1237,6 +1247,7 @@ export async function runChildProcess( onLog: (stream: "stdout" | "stderr", chunk: string) => Promise; onLogError?: (err: unknown, runId: string, message: string) => void; onSpawn?: (meta: { pid: number; processGroupId: number | null; startedAt: string }) => Promise; + terminalResultCleanup?: TerminalResultCleanupOptions; stdin?: string; }, ): Promise { @@ -1286,11 +1297,61 @@ export async function runChildProcess( let stdout = ""; let stderr = ""; let logChain: Promise = Promise.resolve(); + let childExited = false; + let terminalResultSeen = false; + let terminalCleanupStarted = false; + let terminalCleanupTimer: NodeJS.Timeout | null = null; + let terminalCleanupKillTimer: NodeJS.Timeout | null = null; + let terminalResultStdoutScanOffset = 0; + let terminalResultStderrScanOffset = 0; + + const clearTerminalCleanupTimers = () => { + if (terminalCleanupTimer) clearTimeout(terminalCleanupTimer); + if (terminalCleanupKillTimer) clearTimeout(terminalCleanupKillTimer); + terminalCleanupTimer = null; + terminalCleanupKillTimer = null; + }; + + const maybeArmTerminalResultCleanup = () => { + const terminalCleanup = opts.terminalResultCleanup; + if (!terminalCleanup || terminalCleanupStarted || timedOut) return; + if (!terminalResultSeen) { + const stdoutStart = Math.max(0, terminalResultStdoutScanOffset - TERMINAL_RESULT_SCAN_OVERLAP_CHARS); + const stderrStart = Math.max(0, terminalResultStderrScanOffset - TERMINAL_RESULT_SCAN_OVERLAP_CHARS); + const scanOutput = { + stdout: stdout.slice(stdoutStart), + stderr: stderr.slice(stderrStart), + }; + terminalResultStdoutScanOffset = stdout.length; + terminalResultStderrScanOffset = stderr.length; + if (scanOutput.stdout.length === 0 && scanOutput.stderr.length === 0) return; + try { + terminalResultSeen = terminalCleanup.hasTerminalResult(scanOutput); + } catch (err) { + onLogError(err, runId, "failed to inspect terminal adapter output"); + } + } + if (!terminalResultSeen || !childExited) return; + + if (terminalCleanupTimer) return; + const graceMs = Math.max(0, terminalCleanup.graceMs ?? 5_000); + terminalCleanupTimer = setTimeout(() => { + terminalCleanupTimer = null; + if (terminalCleanupStarted || timedOut) return; + terminalCleanupStarted = true; + signalRunningProcess({ child, processGroupId }, "SIGTERM"); + terminalCleanupKillTimer = setTimeout(() => { + terminalCleanupKillTimer = null; + signalRunningProcess({ child, processGroupId }, "SIGKILL"); + }, Math.max(1, opts.graceSec) * 1000); + }, graceMs); + }; const timeout = opts.timeoutSec > 0 ? setTimeout(() => { timedOut = true; + clearTerminalCleanupTimers(); signalRunningProcess({ child, processGroupId }, "SIGTERM"); setTimeout(() => { signalRunningProcess({ child, processGroupId }, "SIGKILL"); @@ -1304,10 +1365,14 @@ export async function runChildProcess( readable.pause(); const text = String(chunk); stdout = appendWithCap(stdout, text); + maybeArmTerminalResultCleanup(); logChain = logChain .then(() => opts.onLog("stdout", text)) .catch((err) => onLogError(err, runId, "failed to append stdout log chunk")) - .finally(() => resumeReadable(readable)); + .finally(() => { + maybeArmTerminalResultCleanup(); + resumeReadable(readable); + }); }); child.stderr?.on("data", (chunk: unknown) => { @@ -1316,10 +1381,14 @@ export async function runChildProcess( readable.pause(); const text = String(chunk); stderr = appendWithCap(stderr, text); + maybeArmTerminalResultCleanup(); logChain = logChain .then(() => opts.onLog("stderr", text)) .catch((err) => onLogError(err, runId, "failed to append stderr log chunk")) - .finally(() => resumeReadable(readable)); + .finally(() => { + maybeArmTerminalResultCleanup(); + resumeReadable(readable); + }); }); const stdin = child.stdin; @@ -1333,6 +1402,7 @@ export async function runChildProcess( child.on("error", (err: Error) => { if (timeout) clearTimeout(timeout); + clearTerminalCleanupTimers(); runningProcesses.delete(runId); const errno = (err as NodeJS.ErrnoException).code; const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? ""; @@ -1343,8 +1413,14 @@ export async function runChildProcess( reject(new Error(msg)); }); + child.on("exit", () => { + childExited = true; + maybeArmTerminalResultCleanup(); + }); + child.on("close", (code: number | null, signal: NodeJS.Signals | null) => { if (timeout) clearTimeout(timeout); + clearTerminalCleanupTimers(); runningProcesses.delete(runId); void logChain.finally(() => { resolve({ diff --git a/packages/adapters/claude-local/src/server/execute.ts b/packages/adapters/claude-local/src/server/execute.ts index 0633a3075f..79327fdb44 100644 --- a/packages/adapters/claude-local/src/server/execute.ts +++ b/packages/adapters/claude-local/src/server/execute.ts @@ -330,6 +330,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise typeof entry[1] === "string", @@ -503,6 +507,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise parseClaudeStreamJson(stdout).resultJson !== null, + }, }); const parsedStream = parseClaudeStreamJson(proc.stdout);