mirror of
https://github.com/paperclipai/paperclip
synced 2026-04-25 17:25:15 +02:00
[codex] Clean up terminal-result adapter process groups (#4129)
## Thinking Path > - Paperclip runs local adapter processes for agents and streams their output into heartbeat runs > - Some adapters can emit a terminal result before all descendant processes have exited > - If those descendants keep running, a heartbeat can appear complete while the process group remains alive > - Claude local runs need a bounded cleanup path after terminal JSON output is observed and the child exits > - This pull request adds terminal-result cleanup support to adapter process utilities and wires it into the Claude local adapter > - The benefit is fewer stranded adapter process groups after successful terminal results ## What Changed - Added terminal-result cleanup options to `runChildProcess`. - Tracked child exit plus terminal output before signaling lingering process groups. - Added Claude local adapter configuration for terminal result cleanup grace time. - Added process cleanup tests covering terminal-output cleanup and noisy non-terminal runs. ## Verification - `pnpm install --frozen-lockfile --ignore-scripts` - `pnpm exec vitest run packages/adapter-utils/src/server-utils.test.ts` - Result: 9 tests passed. ## Risks - Medium risk: this changes adapter child-process cleanup behavior. - The cleanup only arms after terminal result detection and child exit, and it is covered by process-group tests. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex coding agent based on GPT-5, tool-enabled local shell and GitHub workflow, exact runtime context window not exposed in this session. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots, or documented why it is not applicable - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -4,6 +4,7 @@ import {
|
|||||||
appendWithByteCap,
|
appendWithByteCap,
|
||||||
DEFAULT_PAPERCLIP_AGENT_PROMPT_TEMPLATE,
|
DEFAULT_PAPERCLIP_AGENT_PROMPT_TEMPLATE,
|
||||||
renderPaperclipWakePrompt,
|
renderPaperclipWakePrompt,
|
||||||
|
runningProcesses,
|
||||||
runChildProcess,
|
runChildProcess,
|
||||||
stringifyPaperclipWakePayload,
|
stringifyPaperclipWakePayload,
|
||||||
} from "./server-utils.js";
|
} from "./server-utils.js";
|
||||||
@@ -26,6 +27,17 @@ async function waitForPidExit(pid: number, timeoutMs = 2_000) {
|
|||||||
return !isPidAlive(pid);
|
return !isPidAlive(pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function waitForTextMatch(read: () => string, pattern: RegExp, timeoutMs = 1_000) {
|
||||||
|
const deadline = Date.now() + timeoutMs;
|
||||||
|
while (Date.now() < deadline) {
|
||||||
|
const value = read();
|
||||||
|
const match = value.match(pattern);
|
||||||
|
if (match) return match;
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 25));
|
||||||
|
}
|
||||||
|
return read().match(pattern);
|
||||||
|
}
|
||||||
|
|
||||||
describe("runChildProcess", () => {
|
describe("runChildProcess", () => {
|
||||||
it("does not arm a timeout when timeoutSec is 0", async () => {
|
it("does not arm a timeout when timeoutSec is 0", async () => {
|
||||||
const result = await runChildProcess(
|
const result = await runChildProcess(
|
||||||
@@ -110,6 +122,103 @@ describe("runChildProcess", () => {
|
|||||||
|
|
||||||
expect(await waitForPidExit(descendantPid!, 2_000)).toBe(true);
|
expect(await waitForPidExit(descendantPid!, 2_000)).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it.skipIf(process.platform === "win32")("cleans up a lingering process group after terminal output and child exit", async () => {
|
||||||
|
const result = await runChildProcess(
|
||||||
|
randomUUID(),
|
||||||
|
process.execPath,
|
||||||
|
[
|
||||||
|
"-e",
|
||||||
|
[
|
||||||
|
"const { spawn } = require('node:child_process');",
|
||||||
|
"const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: ['ignore', 'inherit', 'ignore'] });",
|
||||||
|
"process.stdout.write(`descendant:${child.pid}\\n`);",
|
||||||
|
"process.stdout.write(`${JSON.stringify({ type: 'result', result: 'done' })}\\n`);",
|
||||||
|
"setTimeout(() => process.exit(0), 25);",
|
||||||
|
].join(" "),
|
||||||
|
],
|
||||||
|
{
|
||||||
|
cwd: process.cwd(),
|
||||||
|
env: {},
|
||||||
|
timeoutSec: 0,
|
||||||
|
graceSec: 1,
|
||||||
|
onLog: async () => {},
|
||||||
|
terminalResultCleanup: {
|
||||||
|
graceMs: 100,
|
||||||
|
hasTerminalResult: ({ stdout }) => stdout.includes('"type":"result"'),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const descendantPid = Number.parseInt(result.stdout.match(/descendant:(\d+)/)?.[1] ?? "", 10);
|
||||||
|
expect(result.timedOut).toBe(false);
|
||||||
|
expect(result.exitCode).toBe(0);
|
||||||
|
expect(Number.isInteger(descendantPid) && descendantPid > 0).toBe(true);
|
||||||
|
expect(await waitForPidExit(descendantPid, 2_000)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it.skipIf(process.platform === "win32")("does not clean up noisy runs that have no terminal output", async () => {
|
||||||
|
const runId = randomUUID();
|
||||||
|
let observed = "";
|
||||||
|
const resultPromise = runChildProcess(
|
||||||
|
runId,
|
||||||
|
process.execPath,
|
||||||
|
[
|
||||||
|
"-e",
|
||||||
|
[
|
||||||
|
"const { spawn } = require('node:child_process');",
|
||||||
|
"const child = spawn(process.execPath, ['-e', \"setInterval(() => process.stdout.write('noise\\\\n'), 50)\"], { stdio: ['ignore', 'inherit', 'ignore'] });",
|
||||||
|
"process.stdout.write(`descendant:${child.pid}\\n`);",
|
||||||
|
"setTimeout(() => process.exit(0), 25);",
|
||||||
|
].join(" "),
|
||||||
|
],
|
||||||
|
{
|
||||||
|
cwd: process.cwd(),
|
||||||
|
env: {},
|
||||||
|
timeoutSec: 0,
|
||||||
|
graceSec: 1,
|
||||||
|
onLog: async (_stream, chunk) => {
|
||||||
|
observed += chunk;
|
||||||
|
},
|
||||||
|
terminalResultCleanup: {
|
||||||
|
graceMs: 50,
|
||||||
|
hasTerminalResult: ({ stdout }) => stdout.includes('"type":"result"'),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const pidMatch = await waitForTextMatch(() => observed, /descendant:(\d+)/);
|
||||||
|
const descendantPid = Number.parseInt(pidMatch?.[1] ?? "", 10);
|
||||||
|
expect(Number.isInteger(descendantPid) && descendantPid > 0).toBe(true);
|
||||||
|
|
||||||
|
const race = await Promise.race([
|
||||||
|
resultPromise.then(() => "settled" as const),
|
||||||
|
new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 300)),
|
||||||
|
]);
|
||||||
|
expect(race).toBe("pending");
|
||||||
|
expect(isPidAlive(descendantPid)).toBe(true);
|
||||||
|
|
||||||
|
const running = runningProcesses.get(runId) as
|
||||||
|
| { child: { kill(signal: NodeJS.Signals): boolean }; processGroupId: number | null }
|
||||||
|
| undefined;
|
||||||
|
try {
|
||||||
|
if (running?.processGroupId) {
|
||||||
|
process.kill(-running.processGroupId, "SIGKILL");
|
||||||
|
} else {
|
||||||
|
running?.child.kill("SIGKILL");
|
||||||
|
}
|
||||||
|
await resultPromise;
|
||||||
|
} finally {
|
||||||
|
runningProcesses.delete(runId);
|
||||||
|
if (isPidAlive(descendantPid)) {
|
||||||
|
try {
|
||||||
|
process.kill(descendantPid, "SIGKILL");
|
||||||
|
} catch {
|
||||||
|
// Ignore cleanup races.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("appendWithByteCap", () => {
|
describe("appendWithByteCap", () => {
|
||||||
|
|||||||
@@ -16,6 +16,11 @@ export interface RunProcessResult {
|
|||||||
startedAt: string | null;
|
startedAt: string | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface TerminalResultCleanupOptions {
|
||||||
|
hasTerminalResult: (output: { stdout: string; stderr: string }) => boolean;
|
||||||
|
graceMs?: number;
|
||||||
|
}
|
||||||
|
|
||||||
interface RunningProcess {
|
interface RunningProcess {
|
||||||
child: ChildProcess;
|
child: ChildProcess;
|
||||||
graceSec: number;
|
graceSec: number;
|
||||||
@@ -29,6 +34,10 @@ interface SpawnTarget {
|
|||||||
|
|
||||||
type ChildProcessWithEvents = ChildProcess & {
|
type ChildProcessWithEvents = ChildProcess & {
|
||||||
on(event: "error", listener: (err: Error) => void): ChildProcess;
|
on(event: "error", listener: (err: Error) => void): ChildProcess;
|
||||||
|
on(
|
||||||
|
event: "exit",
|
||||||
|
listener: (code: number | null, signal: NodeJS.Signals | null) => void,
|
||||||
|
): ChildProcess;
|
||||||
on(
|
on(
|
||||||
event: "close",
|
event: "close",
|
||||||
listener: (code: number | null, signal: NodeJS.Signals | null) => void,
|
listener: (code: number | null, signal: NodeJS.Signals | null) => void,
|
||||||
@@ -60,6 +69,7 @@ function signalRunningProcess(
|
|||||||
export const runningProcesses = new Map<string, RunningProcess>();
|
export const runningProcesses = new Map<string, RunningProcess>();
|
||||||
export const MAX_CAPTURE_BYTES = 4 * 1024 * 1024;
|
export const MAX_CAPTURE_BYTES = 4 * 1024 * 1024;
|
||||||
export const MAX_EXCERPT_BYTES = 32 * 1024;
|
export const MAX_EXCERPT_BYTES = 32 * 1024;
|
||||||
|
const TERMINAL_RESULT_SCAN_OVERLAP_CHARS = 64 * 1024;
|
||||||
const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i;
|
const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i;
|
||||||
const PAPERCLIP_SKILL_ROOT_RELATIVE_CANDIDATES = [
|
const PAPERCLIP_SKILL_ROOT_RELATIVE_CANDIDATES = [
|
||||||
"../../skills",
|
"../../skills",
|
||||||
@@ -1237,6 +1247,7 @@ export async function runChildProcess(
|
|||||||
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
|
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
|
||||||
onLogError?: (err: unknown, runId: string, message: string) => void;
|
onLogError?: (err: unknown, runId: string, message: string) => void;
|
||||||
onSpawn?: (meta: { pid: number; processGroupId: number | null; startedAt: string }) => Promise<void>;
|
onSpawn?: (meta: { pid: number; processGroupId: number | null; startedAt: string }) => Promise<void>;
|
||||||
|
terminalResultCleanup?: TerminalResultCleanupOptions;
|
||||||
stdin?: string;
|
stdin?: string;
|
||||||
},
|
},
|
||||||
): Promise<RunProcessResult> {
|
): Promise<RunProcessResult> {
|
||||||
@@ -1286,11 +1297,61 @@ export async function runChildProcess(
|
|||||||
let stdout = "";
|
let stdout = "";
|
||||||
let stderr = "";
|
let stderr = "";
|
||||||
let logChain: Promise<void> = Promise.resolve();
|
let logChain: Promise<void> = Promise.resolve();
|
||||||
|
let childExited = false;
|
||||||
|
let terminalResultSeen = false;
|
||||||
|
let terminalCleanupStarted = false;
|
||||||
|
let terminalCleanupTimer: NodeJS.Timeout | null = null;
|
||||||
|
let terminalCleanupKillTimer: NodeJS.Timeout | null = null;
|
||||||
|
let terminalResultStdoutScanOffset = 0;
|
||||||
|
let terminalResultStderrScanOffset = 0;
|
||||||
|
|
||||||
|
const clearTerminalCleanupTimers = () => {
|
||||||
|
if (terminalCleanupTimer) clearTimeout(terminalCleanupTimer);
|
||||||
|
if (terminalCleanupKillTimer) clearTimeout(terminalCleanupKillTimer);
|
||||||
|
terminalCleanupTimer = null;
|
||||||
|
terminalCleanupKillTimer = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
const maybeArmTerminalResultCleanup = () => {
|
||||||
|
const terminalCleanup = opts.terminalResultCleanup;
|
||||||
|
if (!terminalCleanup || terminalCleanupStarted || timedOut) return;
|
||||||
|
if (!terminalResultSeen) {
|
||||||
|
const stdoutStart = Math.max(0, terminalResultStdoutScanOffset - TERMINAL_RESULT_SCAN_OVERLAP_CHARS);
|
||||||
|
const stderrStart = Math.max(0, terminalResultStderrScanOffset - TERMINAL_RESULT_SCAN_OVERLAP_CHARS);
|
||||||
|
const scanOutput = {
|
||||||
|
stdout: stdout.slice(stdoutStart),
|
||||||
|
stderr: stderr.slice(stderrStart),
|
||||||
|
};
|
||||||
|
terminalResultStdoutScanOffset = stdout.length;
|
||||||
|
terminalResultStderrScanOffset = stderr.length;
|
||||||
|
if (scanOutput.stdout.length === 0 && scanOutput.stderr.length === 0) return;
|
||||||
|
try {
|
||||||
|
terminalResultSeen = terminalCleanup.hasTerminalResult(scanOutput);
|
||||||
|
} catch (err) {
|
||||||
|
onLogError(err, runId, "failed to inspect terminal adapter output");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!terminalResultSeen || !childExited) return;
|
||||||
|
|
||||||
|
if (terminalCleanupTimer) return;
|
||||||
|
const graceMs = Math.max(0, terminalCleanup.graceMs ?? 5_000);
|
||||||
|
terminalCleanupTimer = setTimeout(() => {
|
||||||
|
terminalCleanupTimer = null;
|
||||||
|
if (terminalCleanupStarted || timedOut) return;
|
||||||
|
terminalCleanupStarted = true;
|
||||||
|
signalRunningProcess({ child, processGroupId }, "SIGTERM");
|
||||||
|
terminalCleanupKillTimer = setTimeout(() => {
|
||||||
|
terminalCleanupKillTimer = null;
|
||||||
|
signalRunningProcess({ child, processGroupId }, "SIGKILL");
|
||||||
|
}, Math.max(1, opts.graceSec) * 1000);
|
||||||
|
}, graceMs);
|
||||||
|
};
|
||||||
|
|
||||||
const timeout =
|
const timeout =
|
||||||
opts.timeoutSec > 0
|
opts.timeoutSec > 0
|
||||||
? setTimeout(() => {
|
? setTimeout(() => {
|
||||||
timedOut = true;
|
timedOut = true;
|
||||||
|
clearTerminalCleanupTimers();
|
||||||
signalRunningProcess({ child, processGroupId }, "SIGTERM");
|
signalRunningProcess({ child, processGroupId }, "SIGTERM");
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
signalRunningProcess({ child, processGroupId }, "SIGKILL");
|
signalRunningProcess({ child, processGroupId }, "SIGKILL");
|
||||||
@@ -1304,10 +1365,14 @@ export async function runChildProcess(
|
|||||||
readable.pause();
|
readable.pause();
|
||||||
const text = String(chunk);
|
const text = String(chunk);
|
||||||
stdout = appendWithCap(stdout, text);
|
stdout = appendWithCap(stdout, text);
|
||||||
|
maybeArmTerminalResultCleanup();
|
||||||
logChain = logChain
|
logChain = logChain
|
||||||
.then(() => opts.onLog("stdout", text))
|
.then(() => opts.onLog("stdout", text))
|
||||||
.catch((err) => onLogError(err, runId, "failed to append stdout log chunk"))
|
.catch((err) => onLogError(err, runId, "failed to append stdout log chunk"))
|
||||||
.finally(() => resumeReadable(readable));
|
.finally(() => {
|
||||||
|
maybeArmTerminalResultCleanup();
|
||||||
|
resumeReadable(readable);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
child.stderr?.on("data", (chunk: unknown) => {
|
child.stderr?.on("data", (chunk: unknown) => {
|
||||||
@@ -1316,10 +1381,14 @@ export async function runChildProcess(
|
|||||||
readable.pause();
|
readable.pause();
|
||||||
const text = String(chunk);
|
const text = String(chunk);
|
||||||
stderr = appendWithCap(stderr, text);
|
stderr = appendWithCap(stderr, text);
|
||||||
|
maybeArmTerminalResultCleanup();
|
||||||
logChain = logChain
|
logChain = logChain
|
||||||
.then(() => opts.onLog("stderr", text))
|
.then(() => opts.onLog("stderr", text))
|
||||||
.catch((err) => onLogError(err, runId, "failed to append stderr log chunk"))
|
.catch((err) => onLogError(err, runId, "failed to append stderr log chunk"))
|
||||||
.finally(() => resumeReadable(readable));
|
.finally(() => {
|
||||||
|
maybeArmTerminalResultCleanup();
|
||||||
|
resumeReadable(readable);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
const stdin = child.stdin;
|
const stdin = child.stdin;
|
||||||
@@ -1333,6 +1402,7 @@ export async function runChildProcess(
|
|||||||
|
|
||||||
child.on("error", (err: Error) => {
|
child.on("error", (err: Error) => {
|
||||||
if (timeout) clearTimeout(timeout);
|
if (timeout) clearTimeout(timeout);
|
||||||
|
clearTerminalCleanupTimers();
|
||||||
runningProcesses.delete(runId);
|
runningProcesses.delete(runId);
|
||||||
const errno = (err as NodeJS.ErrnoException).code;
|
const errno = (err as NodeJS.ErrnoException).code;
|
||||||
const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? "";
|
const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? "";
|
||||||
@@ -1343,8 +1413,14 @@ export async function runChildProcess(
|
|||||||
reject(new Error(msg));
|
reject(new Error(msg));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
child.on("exit", () => {
|
||||||
|
childExited = true;
|
||||||
|
maybeArmTerminalResultCleanup();
|
||||||
|
});
|
||||||
|
|
||||||
child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
|
child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
|
||||||
if (timeout) clearTimeout(timeout);
|
if (timeout) clearTimeout(timeout);
|
||||||
|
clearTerminalCleanupTimers();
|
||||||
runningProcesses.delete(runId);
|
runningProcesses.delete(runId);
|
||||||
void logChain.finally(() => {
|
void logChain.finally(() => {
|
||||||
resolve({
|
resolve({
|
||||||
|
|||||||
@@ -330,6 +330,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
graceSec,
|
graceSec,
|
||||||
extraArgs,
|
extraArgs,
|
||||||
} = runtimeConfig;
|
} = runtimeConfig;
|
||||||
|
const terminalResultCleanupGraceMs = Math.max(
|
||||||
|
0,
|
||||||
|
asNumber(config.terminalResultCleanupGraceMs, 5_000),
|
||||||
|
);
|
||||||
const effectiveEnv = Object.fromEntries(
|
const effectiveEnv = Object.fromEntries(
|
||||||
Object.entries({ ...process.env, ...env }).filter(
|
Object.entries({ ...process.env, ...env }).filter(
|
||||||
(entry): entry is [string, string] => typeof entry[1] === "string",
|
(entry): entry is [string, string] => typeof entry[1] === "string",
|
||||||
@@ -503,6 +507,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
graceSec,
|
graceSec,
|
||||||
onSpawn,
|
onSpawn,
|
||||||
onLog,
|
onLog,
|
||||||
|
terminalResultCleanup: {
|
||||||
|
graceMs: terminalResultCleanupGraceMs,
|
||||||
|
hasTerminalResult: ({ stdout }) => parseClaudeStreamJson(stdout).resultJson !== null,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
const parsedStream = parseClaudeStreamJson(proc.stdout);
|
const parsedStream = parseClaudeStreamJson(proc.stdout);
|
||||||
|
|||||||
Reference in New Issue
Block a user