// paperclip/server/src/services/workspace-runtime.ts
// 2026-04-06 21:23:21 -05:00
// 2255 lines, 73 KiB, TypeScript

import { spawn, type ChildProcess } from "node:child_process";
import { existsSync, readdirSync, readFileSync, realpathSync } from "node:fs";
import fs from "node:fs/promises";
import net from "node:net";
import { createHash, randomUUID } from "node:crypto";
import path from "node:path";
import { setTimeout as delay } from "node:timers/promises";
import type { AdapterRuntimeServiceReport } from "@paperclipai/adapter-utils";
import type { Db } from "@paperclipai/db";
import { executionWorkspaces, projectWorkspaces, workspaceRuntimeServices } from "@paperclipai/db";
import { and, desc, eq, inArray } from "drizzle-orm";
import { asNumber, asString, parseObject, renderTemplate } from "../adapters/utils.js";
import { resolveHomeAwarePath } from "../home-paths.js";
import {
createLocalServiceKey,
findLocalServiceRegistryRecordByRuntimeServiceId,
findAdoptableLocalService,
readLocalServicePortOwner,
removeLocalServiceRegistryRecord,
terminateLocalService,
touchLocalServiceRegistryRecord,
writeLocalServiceRegistryRecord,
} from "./local-service-supervisor.js";
import type { WorkspaceOperationRecorder } from "./workspace-operations.js";
import { readExecutionWorkspaceConfig } from "./execution-workspaces.js";
import { readProjectWorkspaceRuntimeConfig } from "./project-workspace-runtime-config.js";
/**
 * Picks the shell binary used to run workspace commands.
 *
 * A non-blank `SHELL` environment variable wins; otherwise the fallback is
 * `sh` on Windows and `/bin/sh` everywhere else.
 */
export function resolveShell(): string {
  const configured = process.env.SHELL?.trim();
  if (configured) {
    return configured;
  }
  if (process.platform === "win32") {
    return "sh";
  }
  return "/bin/sh";
}
/**
 * Base workspace description that `realizeExecutionWorkspace` starts from.
 * Every field is copied onto the resulting `RealizedExecutionWorkspace`.
 */
export interface ExecutionWorkspaceInput {
// Starting directory: used as-is for the `project_primary` strategy and as the
// cwd for `git rev-parse --show-toplevel` when a git worktree is requested.
baseCwd: string;
// Where this workspace came from; exported to workspace commands as PAPERCLIP_WORKSPACE_SOURCE.
source: "project_primary" | "task_session" | "agent_home";
// Exported to workspace commands as PAPERCLIP_PROJECT_ID (empty string when null).
projectId: string | null;
// Exported to workspace commands as PAPERCLIP_PROJECT_WORKSPACE_ID (empty string when null).
workspaceId: string | null;
// Exported to workspace commands as PAPERCLIP_WORKSPACE_REPO_URL (empty string when null).
repoUrl: string | null;
// Used as the worktree base ref when the strategy config does not set `baseRef`.
repoRef: string | null;
}
/**
 * Minimal issue reference exposed to workspace templates and command environments.
 */
export interface ExecutionWorkspaceIssueRef {
// Last-resort source for the template slug when both `title` and `identifier` are missing.
id: string;
// Preferred over `id` when deriving the fallback slug.
identifier: string | null;
// Primary source for the template `slug` value.
title: string | null;
}
/**
 * Minimal agent reference exposed to workspace templates and command environments.
 */
export interface ExecutionWorkspaceAgentRef {
// Rendered as an empty string in templates and env vars when null.
id: string | null;
name: string;
// Exported to workspace commands as PAPERCLIP_COMPANY_ID.
companyId: string;
}
/**
 * Result of `realizeExecutionWorkspace`: the base input plus where work should
 * actually happen.
 */
export interface RealizedExecutionWorkspace extends ExecutionWorkspaceInput {
// Strategy that was applied; anything other than a configured "git_worktree" yields "project_primary".
strategy: "project_primary" | "git_worktree";
// Directory to run in: `baseCwd` for project_primary, the worktree path for git_worktree.
cwd: string;
// Sanitized branch name for git_worktree; null for project_primary.
branchName: string | null;
// Worktree directory for git_worktree; null for project_primary.
worktreePath: string | null;
// `realizeExecutionWorkspace` currently always returns an empty list here.
warnings: string[];
// True when the `git worktree add` path ran; false when an existing worktree directory was reused.
created: boolean;
}
/**
 * Plain-data description of a runtime service. `toRuntimeServiceRef` produces
 * one by copying exactly these fields off a `RuntimeServiceRecord`.
 */
export interface RuntimeServiceRef {
id: string;
companyId: string;
projectId: string | null;
projectWorkspaceId: string | null;
executionWorkspaceId: string | null;
issueId: string | null;
serviceName: string;
status: "starting" | "running" | "stopped" | "failed";
// "shared" services get a reuse key; "ephemeral" ones do not (see resolveRuntimeServiceReuseIdentity).
lifecycle: "shared" | "ephemeral";
// Scope the service is bound to; `scopeId` is the matching identifier (see resolveServiceScopeId).
scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
scopeId: string | null;
// SHA-256 identity hash for shared services; null for ephemeral ones.
reuseKey: string | null;
command: string | null;
cwd: string | null;
port: number | null;
url: string | null;
provider: "local_process" | "adapter_managed";
providerRef: string | null;
ownerAgentId: string | null;
startedByRunId: string | null;
// NOTE(review): the three timestamps below are presumably ISO-8601 strings — confirm against the writer.
lastUsedAt: string;
startedAt: string;
stoppedAt: string | null;
stopPolicy: Record<string, unknown> | null;
healthStatus: "unknown" | "healthy" | "unhealthy";
reused: boolean;
}
/**
 * In-memory bookkeeping entry for a runtime service: the public ref fields
 * plus handles and state that are never copied into a `RuntimeServiceRef`.
 * Instances are stored in `runtimeServicesById`.
 */
interface RuntimeServiceRecord extends RuntimeServiceRef {
db?: Db;
// Spawned process handle, or null when there is none.
child: ChildProcess | null;
// NOTE(review): presumably the ids of runs currently holding a lease on this service — confirm.
leaseRunIds: Set<string>;
// Pending timer handle; cleared via `clearIdleTimer` when the registries are reset.
idleTimer: ReturnType<typeof globalThis.setTimeout> | null;
// NOTE(review): presumably the rendered-env hash computed by resolveRuntimeServiceReuseIdentity — confirm.
envFingerprint: string;
serviceKey: string;
profileKind: string;
processGroupId: number | null;
}
// Module-level, in-memory registries. All three are emptied by resetRuntimeServicesForTests().
// Service records by string key (presumably the record `id` — confirm against the code that populates it).
const runtimeServicesById = new Map<string, RuntimeServiceRecord>();
// NOTE(review): presumably maps a service reuse key to a service id — confirm against the writer.
const runtimeServicesByReuseKey = new Map<string, string>();
// NOTE(review): presumably maps a run id to the ids of services it leases — confirm against the writer.
const runtimeServiceLeasesByRun = new Map<string, string[]>();
/**
 * Test helper: clears the idle timer of every tracked service record, then
 * empties all three in-memory registries. It does not touch `record.child`.
 */
export async function resetRuntimeServicesForTests() {
  runtimeServicesById.forEach((trackedRecord) => {
    clearIdleTimer(trackedRecord);
  });
  runtimeServicesById.clear();
  runtimeServicesByReuseKey.clear();
  runtimeServiceLeasesByRun.clear();
}
/**
 * JSON-like serialization with object keys emitted in sorted order, so that
 * structurally equal values always produce the same string (used as hash input).
 *
 * Arrays keep their element order; primitives and `null` are delegated to
 * `JSON.stringify`.
 */
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    const items = value.map((entry) => stableStringify(entry));
    return "[" + items.join(",") + "]";
  }
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value);
  }
  const rec = value as Record<string, unknown>;
  const members = Object.keys(rec)
    .sort()
    .map((key) => JSON.stringify(key) + ":" + stableStringify(rec[key]));
  return "{" + members.join(",") + "}";
}
/**
 * One server dependency whose `server/node_modules` entry does not resolve to
 * the workspace package directory it should point at.
 */
type WorkspaceLinkMismatch = {
packageName: string;
// Resolved directory of the workspace package the link should point to.
expectedPath: string;
// Resolved path the link currently points to, or null when the link does not exist.
actualPath: string | null;
};
/**
 * Synchronously reads `filePath` as UTF-8 and parses it as JSON.
 *
 * The result is cast (not validated) to a string-keyed record. Throws if the
 * file cannot be read or is not valid JSON.
 */
function readJsonFile(filePath: string): Record<string, unknown> {
  const rawText = readFileSync(filePath, "utf8");
  const parsed = JSON.parse(rawText) as Record<string, unknown>;
  return parsed;
}
/**
 * Walks upward from `startCwd` looking for the nearest directory that contains
 * a `pnpm-workspace.yaml`.
 *
 * @returns the absolute path of that directory, or `null` once the filesystem
 *   root has been checked without a match.
 */
function findWorkspaceRoot(startCwd: string) {
  let candidate = path.resolve(startCwd);
  for (;;) {
    const markerPath = path.join(candidate, "pnpm-workspace.yaml");
    if (existsSync(markerPath)) {
      return candidate;
    }
    const oneLevelUp = path.dirname(candidate);
    // dirname() of the filesystem root is the root itself: nothing left to check.
    if (oneLevelUp === candidate) {
      return null;
    }
    candidate = oneLevelUp;
  }
}
/**
 * Scans the `packages`, `server`, `ui` and `cli` trees under `rootDir` and maps
 * every named `package.json` to the directory that contains it.
 *
 * Directories called `.git`, `.paperclip`, `dist` or `node_modules` are not
 * descended into. If two packages share a name, the one visited last wins.
 */
function discoverWorkspacePackagePaths(rootDir: string): Map<string, string> {
  const skippedDirNames = new Set([".git", ".paperclip", "dist", "node_modules"]);
  const found = new Map<string, string>();

  const walk = (dirPath: string): void => {
    if (!existsSync(dirPath)) {
      return;
    }
    const manifestPath = path.join(dirPath, "package.json");
    if (existsSync(manifestPath)) {
      const manifest = readJsonFile(manifestPath);
      const declaredName = manifest.name;
      if (typeof declaredName === "string" && declaredName.length > 0) {
        found.set(declaredName, dirPath);
      }
    }
    const children = readdirSync(dirPath, { withFileTypes: true });
    for (const child of children) {
      if (child.isDirectory() && !skippedDirNames.has(child.name)) {
        walk(path.join(dirPath, child.name));
      }
    }
  };

  for (const topLevel of ["packages", "server", "ui", "cli"]) {
    walk(path.join(rootDir, topLevel));
  }
  return found;
}
/**
 * Compares each `workspace:` dependency declared in `server/package.json`
 * (dependencies and devDependencies) against what `server/node_modules`
 * actually resolves to.
 *
 * @returns one entry per dependency whose link is missing or resolves somewhere
 *   other than the discovered workspace package directory. Returns an empty
 *   list when `server/package.json` does not exist.
 */
function findServerWorkspaceLinkMismatches(rootDir: string): WorkspaceLinkMismatch[] {
  const manifestPath = path.join(rootDir, "server", "package.json");
  if (!existsSync(manifestPath)) {
    return [];
  }
  const manifest = readJsonFile(manifestPath);
  const declared = {
    ...(manifest.dependencies as Record<string, unknown> | undefined),
    ...(manifest.devDependencies as Record<string, unknown> | undefined),
  };
  const knownPackageDirs = discoverWorkspacePackagePaths(rootDir);

  const result: WorkspaceLinkMismatch[] = [];
  for (const [packageName, version] of Object.entries(declared)) {
    const isWorkspaceDep = typeof version === "string" && version.startsWith("workspace:");
    const packageDir = isWorkspaceDep ? knownPackageDirs.get(packageName) : undefined;
    if (!packageDir) {
      continue;
    }
    // Compare real paths so an equivalent symlinked location is not reported as stale.
    let expectedPath = path.resolve(packageDir);
    if (existsSync(packageDir)) {
      expectedPath = path.resolve(realpathSync(packageDir));
    }
    const linkPath = path.join(rootDir, "server", "node_modules", ...packageName.split("/"));
    let actualPath: string | null = null;
    if (existsSync(linkPath)) {
      actualPath = path.resolve(realpathSync(linkPath));
    }
    if (actualPath !== expectedPath) {
      result.push({ packageName, expectedPath, actualPath });
    }
  }
  return result;
}
/**
 * Repairs stale or missing workspace package links under `server/node_modules`.
 *
 * Does nothing when no pnpm workspace root is found above `startCwd` or when
 * all links are current. Otherwise each mismatched entry is removed and
 * replaced with a symlink to the expected package directory.
 *
 * @param startCwd directory from which the workspace root is searched upward.
 * @param opts.onLog optional sink that receives one progress line per stale link.
 * @throws Error if mismatches remain after relinking.
 */
export async function ensureServerWorkspaceLinksCurrent(
  startCwd: string,
  opts?: {
    onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
  },
) {
  const workspaceRoot = findWorkspaceRoot(startCwd);
  if (!workspaceRoot) {
    return;
  }
  const staleLinks = findServerWorkspaceLinkMismatches(workspaceRoot);
  if (staleLinks.length === 0) {
    return;
  }

  if (opts?.onLog) {
    await opts.onLog("stdout", "[runtime] detected stale workspace package links for server; relinking dependencies...\n");
    for (const { packageName, actualPath, expectedPath } of staleLinks) {
      const currentTarget = actualPath ?? "missing";
      await opts.onLog("stdout", `[runtime] ${packageName}: ${currentTarget} -> ${expectedPath}\n`);
    }
  }

  for (const { packageName, expectedPath } of staleLinks) {
    const nameSegments = packageName.split("/");
    const linkPath = path.join(workspaceRoot, "server", "node_modules", ...nameSegments);
    // Scoped packages need their "@scope" parent directory to exist first.
    await fs.mkdir(path.dirname(linkPath), { recursive: true });
    await fs.rm(linkPath, { recursive: true, force: true });
    await fs.symlink(expectedPath, linkPath);
  }

  // Re-scan to confirm that the relink took effect.
  const stillStale = findServerWorkspaceLinkMismatches(workspaceRoot);
  if (stillStale.length > 0) {
    const names = stillStale.map((item) => item.packageName).join(", ");
    throw new Error(`Workspace relink did not repair all server package links: ${names}`);
  }
}
/**
 * Returns a copy of `baseEnv` without server-internal variables.
 *
 * Removed: every key starting with `PAPERCLIP_` (case-sensitive), plus
 * `DATABASE_URL`, `npm_config_tailscale_auth` and
 * `npm_config_authenticated_private`. The input object is not modified.
 */
export function sanitizeRuntimeServiceBaseEnv(baseEnv: NodeJS.ProcessEnv): NodeJS.ProcessEnv {
  const env: NodeJS.ProcessEnv = { ...baseEnv };
  const paperclipKeys = Object.keys(env).filter((key) => key.startsWith("PAPERCLIP_"));
  const alwaysRemoved = ["DATABASE_URL", "npm_config_tailscale_auth", "npm_config_authenticated_private"];
  for (const key of [...paperclipKeys, ...alwaysRemoved]) {
    delete env[key];
  }
  return env;
}
/**
 * Produces a deterministic id for an adapter-reported runtime service.
 *
 * A non-empty `reportId` is returned unchanged. Otherwise the id is
 * `<adapterType>-<first 32 hex chars of sha256>`, where the hash covers every
 * other input field.
 */
function stableRuntimeServiceId(input: {
  adapterType: string;
  runId: string;
  scopeType: RuntimeServiceRef["scopeType"];
  scopeId: string | null;
  serviceName: string;
  reportId: string | null;
  providerRef: string | null;
  reuseKey: string | null;
}) {
  if (input.reportId) {
    return input.reportId;
  }
  const { adapterType, runId, scopeType, scopeId, serviceName, providerRef, reuseKey } = input;
  const hashInput = stableStringify({
    adapterType,
    runId,
    scopeType,
    scopeId,
    serviceName,
    providerRef,
    reuseKey,
  });
  const hasher = createHash("sha256");
  hasher.update(hashInput);
  const shortDigest = hasher.digest("hex").slice(0, 32);
  return `${adapterType}-${shortDigest}`;
}
/**
 * Projects an internal service record onto the public `RuntimeServiceRef`
 * shape, leaving out process handles and other bookkeeping fields.
 *
 * @param overrides optional field values applied on top of the copied ones.
 */
function toRuntimeServiceRef(record: RuntimeServiceRecord, overrides?: Partial<RuntimeServiceRef>): RuntimeServiceRef {
  const {
    id,
    companyId,
    projectId,
    projectWorkspaceId,
    executionWorkspaceId,
    issueId,
    serviceName,
    status,
    lifecycle,
    scopeType,
    scopeId,
    reuseKey,
    command,
    cwd,
    port,
    url,
    provider,
    providerRef,
    ownerAgentId,
    startedByRunId,
    lastUsedAt,
    startedAt,
    stoppedAt,
    stopPolicy,
    healthStatus,
    reused,
  } = record;
  return {
    id,
    companyId,
    projectId,
    projectWorkspaceId,
    executionWorkspaceId,
    issueId,
    serviceName,
    status,
    lifecycle,
    scopeType,
    scopeId,
    reuseKey,
    command,
    cwd,
    port,
    url,
    provider,
    providerRef,
    ownerAgentId,
    startedByRunId,
    lastUsedAt,
    startedAt,
    stoppedAt,
    stopPolicy,
    healthStatus,
    reused,
    ...overrides,
  };
}
/**
 * Turns free text into a lowercase slug made of `a-z`, `0-9`, `_` and `-`.
 *
 * Runs of other characters become a single `-`, and leading/trailing `-`/`_`
 * are stripped. Returns `fallback` when nothing is left.
 */
function sanitizeSlugPart(value: string | null | undefined, fallback: string): string {
  let slug = (value ?? "").trim().toLowerCase();
  slug = slug.replace(/[^a-z0-9_-]+/g, "-");
  slug = slug.replace(/-+/g, "-");
  slug = slug.replace(/^[-_]+|[-_]+$/g, "");
  if (slug.length === 0) {
    return fallback;
  }
  return slug;
}
/**
 * Renders a workspace template (for example a branch-name template) with
 * issue, agent, project and workspace values plus a derived `slug`.
 *
 * The slug comes from the issue title; if that sanitizes to nothing it falls
 * back to the issue identifier, then the issue id, then the literal "issue".
 * Missing values render as empty strings.
 */
function renderWorkspaceTemplate(template: string, input: {
  issue: ExecutionWorkspaceIssueRef | null;
  agent: ExecutionWorkspaceAgentRef;
  projectId: string | null;
  repoRef: string | null;
}) {
  const { issue, agent } = input;
  const slugSeed = issue?.identifier ?? issue?.id ?? "issue";
  const slugFallback = sanitizeSlugPart(slugSeed, "issue");
  const templateContext = {
    issue: {
      id: issue?.id ?? "",
      identifier: issue?.identifier ?? "",
      title: issue?.title ?? "",
    },
    agent: {
      id: agent.id ?? "",
      name: agent.name,
    },
    project: {
      id: input.projectId ?? "",
    },
    workspace: {
      repoRef: input.repoRef ?? "",
    },
    slug: sanitizeSlugPart(issue?.title, slugFallback),
  };
  return renderTemplate(template, templateContext);
}
/**
 * Normalizes a rendered template into a string usable as a git branch name.
 *
 * - Runs of characters outside `A-Z a-z 0-9 . _ / -` become a single `-`.
 * - Runs of two or more dots collapse to one, because git rejects `..` in ref names.
 * - Leading and trailing `-`, `/` and `.` are stripped.
 * - The result is capped at 120 characters. Any `/` or `.` exposed at the end
 *   by that cut is stripped too, since git rejects ref names ending in either.
 *
 * @returns the sanitized name, or `"paperclip-work"` when nothing usable remains.
 */
function sanitizeBranchName(value: string): string {
  const sanitized = value
    .trim()
    .replace(/[^A-Za-z0-9._/-]+/g, "-")
    .replace(/-+/g, "-")
    .replace(/\.{2,}/g, ".")
    .replace(/^[-/.]+|[-/.]+$/g, "")
    .slice(0, 120)
    // Truncation can land right after a "/" or "."; a trailing "-" is a valid
    // ref ending and is kept so names that were already valid do not change.
    .replace(/[/.]+$/, "");
  return sanitized || "paperclip-work";
}
/**
 * Reports whether `value` should be resolved on its own rather than against a
 * base directory: either it starts with `~` or `path.isAbsolute` accepts it.
 */
function isAbsolutePath(value: string) {
  if (value.startsWith("~")) {
    return true;
  }
  return path.isAbsolute(value);
}
/**
 * Resolves a configured path. Values that are absolute or start with `~` go
 * through `resolveHomeAwarePath`; anything else is resolved against `baseDir`.
 */
function resolveConfiguredPath(value: string, baseDir: string): string {
  return isAbsolutePath(value)
    ? resolveHomeAwarePath(value)
    : path.resolve(baseDir, value);
}
/**
 * Builds a human-readable command line for logs and operation records.
 *
 * Parts made only of letters, digits and `_ . / : -` are shown verbatim. Every
 * other part, including the empty string, is JSON-quoted. The result is for
 * display only and is not shell-escaped.
 */
function formatCommandForDisplay(command: string, args: string[]) {
  const plainToken = /^[A-Za-z0-9_./:-]+$/;
  const shown: string[] = [];
  for (const part of [command, ...args]) {
    shown.push(plainToken.test(part) ? part : JSON.stringify(part));
  }
  return shown.join(" ");
}
/**
 * Spawns `command` with `args`, buffers its output, and resolves once the
 * process has closed.
 *
 * stdin is ignored. stdout and stderr are decoded as UTF-8 through the stream
 * decoder, so a multi-byte character split across two chunks is reassembled
 * rather than turned into replacement characters.
 *
 * @param input.env environment for the child; defaults to `process.env`.
 * @returns the collected `stdout` and `stderr` plus the exit `code`, which is
 *   `null` when the process was terminated by a signal. A non-zero exit code
 *   does NOT reject; callers must inspect `code`.
 * @throws rejects only when the process cannot be spawned (the child's `error` event).
 */
async function executeProcess(input: {
  command: string;
  args: string[];
  cwd: string;
  env?: NodeJS.ProcessEnv;
}): Promise<{ stdout: string; stderr: string; code: number | null }> {
  return new Promise<{ stdout: string; stderr: string; code: number | null }>((resolve, reject) => {
    const child = spawn(input.command, input.args, {
      cwd: input.cwd,
      stdio: ["ignore", "pipe", "pipe"],
      env: input.env ?? process.env,
    });
    let stdout = "";
    let stderr = "";
    // Decode at the stream level: String(buffer) per chunk corrupts characters
    // whose bytes straddle a chunk boundary.
    child.stdout?.setEncoding("utf8");
    child.stderr?.setEncoding("utf8");
    child.stdout?.on("data", (chunk) => {
      stdout += String(chunk);
    });
    child.stderr?.on("data", (chunk) => {
      stderr += String(chunk);
    });
    child.on("error", reject);
    // "close" fires after the stdio streams have ended, so both buffers are complete.
    child.on("close", (code) => resolve({ stdout, stderr, code }));
  });
}
/**
 * Runs `git` with `args` in `cwd` and returns its trimmed stdout.
 *
 * @throws Error on a non-zero exit (or termination by signal). The message is
 *   trimmed stderr, else trimmed stdout, else a generic `git <args> failed` text.
 */
async function runGit(args: string[], cwd: string): Promise<string> {
  const { code, stdout, stderr } = await executeProcess({ command: "git", args, cwd });
  if (code === 0) {
    return stdout.trim();
  }
  const message = stderr.trim() || stdout.trim() || `git ${args.join(" ")} failed`;
  throw new Error(message);
}
/**
 * Case-insensitive substring check against an error's message. Values that are
 * not `Error` instances are compared by their string form.
 */
function gitErrorIncludes(error: unknown, needle: string) {
  const haystack = (error instanceof Error ? error.message : String(error)).toLowerCase();
  return haystack.indexOf(needle.toLowerCase()) !== -1;
}
/**
 * Works out the default branch of the `origin` remote for `repoRoot`.
 *
 * It first asks for the symbolic ref `refs/remotes/origin/HEAD` (set by
 * `git clone` or `git remote set-head`). If that is unset it probes for
 * `origin/main`, then `origin/master`.
 *
 * @returns the branch name without the `origin/` prefix, or `null` when
 *   nothing matched. Git failures are treated as "not found", never thrown.
 */
async function detectDefaultBranch(repoRoot: string): Promise<string | null> {
  const remoteHead = await runGit(
    ["symbolic-ref", "--quiet", "--short", "refs/remotes/origin/HEAD"],
    repoRoot,
  ).catch(() => null);
  if (remoteHead) {
    const prefix = "origin/";
    const branch = remoteHead.startsWith(prefix) ? remoteHead.slice(prefix.length) : remoteHead;
    if (branch) {
      return branch;
    }
  }

  // Heuristic fallback: the remote HEAD is not recorded locally.
  for (const candidate of ["main", "master"]) {
    const present = await runGit(
      ["rev-parse", "--verify", `refs/remotes/origin/${candidate}`],
      repoRoot,
    ).then(() => true, () => false);
    if (present) {
      return candidate;
    }
  }
  return null;
}
/**
 * Resolves to `true` only when `value` can be stat'ed and is a directory.
 * Every failure (missing path, permission error, ...) resolves to `false`.
 */
async function directoryExists(value: string) {
  try {
    const stats = await fs.stat(value);
    return stats.isDirectory();
  } catch {
    return false;
  }
}
/**
 * Sends SIGTERM to a spawned child.
 *
 * On non-Windows platforms the signal is first sent to the negative pid, which
 * addresses the process group with that id. If that throws, or on Windows, the
 * child is signalled directly unless it is already marked `killed`. Does
 * nothing when the child has no pid.
 */
function terminateChildProcess(child: ChildProcess) {
  const pid = child.pid;
  if (!pid) {
    return;
  }
  const canSignalGroup = process.platform !== "win32";
  if (canSignalGroup) {
    try {
      process.kill(-pid, "SIGTERM");
      return;
    } catch {
      // Group signal failed; fall back to signalling the child itself.
    }
  }
  if (child.killed) {
    return;
  }
  child.kill("SIGTERM");
}
/**
 * Builds the environment for workspace provision commands: a copy of the
 * current `process.env` plus `PAPERCLIP_*` variables describing the worktree,
 * project, agent and issue. Null values are exported as empty strings.
 *
 * NOTE(review): unlike `buildExecutionWorkspaceCleanupEnv`, the base
 * environment is not passed through `sanitizeRuntimeServiceBaseEnv`, so
 * `DATABASE_URL` and any server `PAPERCLIP_*` variables that are not
 * overwritten below reach the command. Confirm this is intended.
 */
function buildWorkspaceCommandEnv(input: {
  base: ExecutionWorkspaceInput;
  repoRoot: string;
  worktreePath: string;
  branchName: string;
  issue: ExecutionWorkspaceIssueRef | null;
  agent: ExecutionWorkspaceAgentRef;
  created: boolean;
}) {
  const { base, agent, issue, worktreePath } = input;
  const env: NodeJS.ProcessEnv = {
    ...process.env,
    PAPERCLIP_WORKSPACE_CWD: worktreePath,
    PAPERCLIP_WORKSPACE_PATH: worktreePath,
    PAPERCLIP_WORKSPACE_WORKTREE_PATH: worktreePath,
    PAPERCLIP_WORKSPACE_BRANCH: input.branchName,
    PAPERCLIP_WORKSPACE_BASE_CWD: base.baseCwd,
    PAPERCLIP_WORKSPACE_REPO_ROOT: input.repoRoot,
    PAPERCLIP_WORKSPACE_SOURCE: base.source,
    PAPERCLIP_WORKSPACE_REPO_REF: base.repoRef ?? "",
    PAPERCLIP_WORKSPACE_REPO_URL: base.repoUrl ?? "",
    PAPERCLIP_WORKSPACE_CREATED: input.created ? "true" : "false",
    PAPERCLIP_PROJECT_ID: base.projectId ?? "",
    PAPERCLIP_PROJECT_WORKSPACE_ID: base.workspaceId ?? "",
    PAPERCLIP_AGENT_ID: agent.id ?? "",
    PAPERCLIP_AGENT_NAME: agent.name,
    PAPERCLIP_COMPANY_ID: agent.companyId,
    PAPERCLIP_ISSUE_ID: issue?.id ?? "",
    PAPERCLIP_ISSUE_IDENTIFIER: issue?.identifier ?? "",
    PAPERCLIP_ISSUE_TITLE: issue?.title ?? "",
  };
  return env;
}
/**
 * Runs `input.command` through the resolved shell (`<shell> -c <command>`) in
 * `input.cwd` with the given environment.
 *
 * @throws Error when the exit code is not 0. The message starts with
 *   `input.label` and carries stderr and stdout when present, otherwise the
 *   exit code (`-1` if there was none).
 */
async function runWorkspaceCommand(input: {
  command: string;
  cwd: string;
  env: NodeJS.ProcessEnv;
  label: string;
}) {
  const { stdout, stderr, code } = await executeProcess({
    command: resolveShell(),
    args: ["-c", input.command],
    cwd: input.cwd,
    env: input.env,
  });
  if (code === 0) {
    return;
  }
  const details = [stderr.trim(), stdout.trim()].filter(Boolean).join("\n");
  if (details.length > 0) {
    throw new Error(`${input.label} failed: ${details}`);
  }
  throw new Error(`${input.label} failed with exit code ${code ?? -1}`);
}
/**
 * Runs a git command, routing it through `recorder` when one is supplied.
 *
 * Without a recorder this is a plain `runGit` call. With one, the command is
 * executed inside `recorder.recordOperation`, whose `run` callback reports
 * status, exit code and output (plus `successMessage` as the system note on
 * success).
 *
 * @returns trimmed stdout of the git command.
 * @throws Error when the exit code is not 0. The message is prefixed with
 *   `failureLabel`, or `git <args>` when no label is given. The same error is
 *   raised if the recorder never invokes `run`, since the code stays `null`.
 */
async function recordGitOperation(
  recorder: WorkspaceOperationRecorder | null | undefined,
  input: {
    phase: "worktree_prepare" | "worktree_cleanup";
    args: string[];
    cwd: string;
    metadata?: Record<string, unknown> | null;
    successMessage?: string | null;
    failureLabel?: string | null;
  },
): Promise<string> {
  if (!recorder) {
    return runGit(input.args, input.cwd);
  }

  // Filled in by the `run` callback so the outcome is visible after recording.
  const captured = { stdout: "", stderr: "", code: null as number | null };
  await recorder.recordOperation({
    phase: input.phase,
    command: formatCommandForDisplay("git", input.args),
    cwd: input.cwd,
    metadata: input.metadata ?? null,
    run: async () => {
      const result = await executeProcess({
        command: "git",
        args: input.args,
        cwd: input.cwd,
      });
      captured.stdout = result.stdout;
      captured.stderr = result.stderr;
      captured.code = result.code;
      return {
        status: result.code === 0 ? "succeeded" : "failed",
        exitCode: result.code,
        stdout: result.stdout,
        stderr: result.stderr,
        system: result.code === 0 ? input.successMessage ?? null : null,
      };
    },
  });

  if (captured.code === 0) {
    return captured.stdout.trim();
  }
  const label = input.failureLabel ?? `git ${input.args.join(" ")}`;
  const details = [captured.stderr.trim(), captured.stdout.trim()].filter(Boolean).join("\n");
  throw new Error(
    details.length > 0
      ? `${label} failed: ${details}`
      : `${label} failed with exit code ${captured.code ?? -1}`,
  );
}
/**
 * Runs a shell workspace command, routing it through `recorder` when one is
 * supplied.
 *
 * Without a recorder this delegates to `runWorkspaceCommand`. With one, the
 * command is executed as `<shell> -c <command>` inside
 * `recorder.recordOperation`, whose `run` callback reports status, exit code
 * and output.
 *
 * @throws Error when the exit code is not 0, prefixed with `input.label`. The
 *   same error is raised if the recorder never invokes `run`.
 */
async function recordWorkspaceCommandOperation(
  recorder: WorkspaceOperationRecorder | null | undefined,
  input: {
    phase: "workspace_provision" | "workspace_teardown";
    command: string;
    cwd: string;
    env: NodeJS.ProcessEnv;
    label: string;
    metadata?: Record<string, unknown> | null;
    successMessage?: string | null;
  },
) {
  if (!recorder) {
    await runWorkspaceCommand(input);
    return;
  }

  // Filled in by the `run` callback so the outcome is visible after recording.
  const captured = { stdout: "", stderr: "", code: null as number | null };
  await recorder.recordOperation({
    phase: input.phase,
    command: input.command,
    cwd: input.cwd,
    metadata: input.metadata ?? null,
    run: async () => {
      const result = await executeProcess({
        command: resolveShell(),
        args: ["-c", input.command],
        cwd: input.cwd,
        env: input.env,
      });
      captured.stdout = result.stdout;
      captured.stderr = result.stderr;
      captured.code = result.code;
      return {
        status: result.code === 0 ? "succeeded" : "failed",
        exitCode: result.code,
        stdout: result.stdout,
        stderr: result.stderr,
        system: result.code === 0 ? input.successMessage ?? null : null,
      };
    },
  });

  if (captured.code === 0) {
    return;
  }
  const details = [captured.stderr.trim(), captured.stdout.trim()].filter(Boolean).join("\n");
  if (details.length > 0) {
    throw new Error(`${input.label} failed: ${details}`);
  }
  throw new Error(`${input.label} failed with exit code ${captured.code ?? -1}`);
}
/**
 * Runs the strategy's optional `provisionCommand` inside the worktree.
 *
 * Does nothing when the command is missing or blank. The command receives the
 * environment built by `buildWorkspaceCommandEnv` and is recorded under the
 * `workspace_provision` phase when a recorder is supplied.
 *
 * @throws Error when the provision command exits with a non-zero code.
 */
async function provisionExecutionWorktree(input: {
  strategy: Record<string, unknown>;
  base: ExecutionWorkspaceInput;
  repoRoot: string;
  worktreePath: string;
  branchName: string;
  issue: ExecutionWorkspaceIssueRef | null;
  agent: ExecutionWorkspaceAgentRef;
  created: boolean;
  recorder?: WorkspaceOperationRecorder | null;
}) {
  const provisionCommand = asString(input.strategy.provisionCommand, "").trim();
  if (provisionCommand.length === 0) {
    return;
  }
  const { base, repoRoot, worktreePath, branchName, issue, agent, created } = input;
  const env = buildWorkspaceCommandEnv({ base, repoRoot, worktreePath, branchName, issue, agent, created });
  await recordWorkspaceCommandOperation(input.recorder, {
    phase: "workspace_provision",
    command: provisionCommand,
    cwd: worktreePath,
    env,
    label: `Execution workspace provision command "${provisionCommand}"`,
    metadata: { repoRoot, worktreePath, branchName, created },
    successMessage: `Provisioned workspace at ${worktreePath}\n`,
  });
}
/**
 * Builds the environment for workspace cleanup/teardown commands: a sanitized
 * copy of `process.env` plus `PAPERCLIP_*` variables describing the workspace
 * being removed. Missing values are exported as empty strings.
 *
 * Both `PAPERCLIP_WORKSPACE_BASE_CWD` and `PAPERCLIP_WORKSPACE_REPO_ROOT` are
 * set to the project workspace cwd.
 */
function buildExecutionWorkspaceCleanupEnv(input: {
  workspace: {
    cwd: string | null;
    providerRef: string | null;
    branchName: string | null;
    repoUrl: string | null;
    baseRef: string | null;
    projectId: string | null;
    projectWorkspaceId: string | null;
    sourceIssueId: string | null;
  };
  projectWorkspaceCwd?: string | null;
}) {
  const { workspace } = input;
  const workspaceCwd = workspace.cwd ?? "";
  const projectCwd = input.projectWorkspaceCwd ?? "";
  const env: NodeJS.ProcessEnv = sanitizeRuntimeServiceBaseEnv(process.env);
  Object.assign(env, {
    PAPERCLIP_WORKSPACE_CWD: workspaceCwd,
    PAPERCLIP_WORKSPACE_PATH: workspaceCwd,
    PAPERCLIP_WORKSPACE_WORKTREE_PATH: workspace.providerRef ?? workspace.cwd ?? "",
    PAPERCLIP_WORKSPACE_BRANCH: workspace.branchName ?? "",
    PAPERCLIP_WORKSPACE_BASE_CWD: projectCwd,
    PAPERCLIP_WORKSPACE_REPO_ROOT: projectCwd,
    PAPERCLIP_WORKSPACE_REPO_URL: workspace.repoUrl ?? "",
    PAPERCLIP_WORKSPACE_REPO_REF: workspace.baseRef ?? "",
    PAPERCLIP_PROJECT_ID: workspace.projectId ?? "",
    PAPERCLIP_PROJECT_WORKSPACE_ID: workspace.projectWorkspaceId ?? "",
    PAPERCLIP_ISSUE_ID: workspace.sourceIssueId ?? "",
  });
  return env;
}
/**
 * Finds the main repository directory that owns a worktree, for cleanup.
 *
 * `git rev-parse --git-common-dir` is tried from the resolved project
 * workspace cwd first (when given), then from the worktree path itself. The
 * first location that answers wins; the result is the parent directory of the
 * reported git dir.
 *
 * @returns the repo root, or `null` when neither location is a git checkout.
 */
async function resolveGitRepoRootForWorkspaceCleanup(
  worktreePath: string,
  projectWorkspaceCwd: string | null,
): Promise<string | null> {
  const probeDirs: string[] = [];
  if (projectWorkspaceCwd) {
    probeDirs.push(path.resolve(projectWorkspaceCwd));
  }
  probeDirs.push(worktreePath);

  for (const probeDir of probeDirs) {
    const gitDir = await runGit(["rev-parse", "--git-common-dir"], probeDir).catch(() => null);
    if (!gitDir) {
      continue;
    }
    // git may print a path relative to the directory it ran in.
    const absoluteGitDir = path.resolve(probeDir, gitDir);
    return path.dirname(absoluteGitDir);
  }
  return null;
}
/**
 * Turns a base workspace plus strategy config into the directory an execution
 * should run in.
 *
 * - Any strategy type other than "git_worktree" returns the base cwd unchanged
 *   ("project_primary").
 * - For "git_worktree", a branch name is rendered from `branchTemplate` and
 *   sanitized, and the worktree path is `<worktreeParentDir>/<branchName>`.
 *   An existing git worktree at that path is reused; otherwise one is created
 *   with `git worktree add`. The optional provision command runs in both cases.
 *
 * @param input.config adapter/project config; only `workspaceStrategy` is read here.
 * @param input.recorder optional recorder that receives git and provision operations.
 * @throws Error when the target path exists but is not a git worktree, or when
 *   a git or provision command fails.
 */
export async function realizeExecutionWorkspace(input: {
base: ExecutionWorkspaceInput;
config: Record<string, unknown>;
issue: ExecutionWorkspaceIssueRef | null;
agent: ExecutionWorkspaceAgentRef;
recorder?: WorkspaceOperationRecorder | null;
}): Promise<RealizedExecutionWorkspace> {
const rawStrategy = parseObject(input.config.workspaceStrategy);
const strategyType = asString(rawStrategy.type, "project_primary");
// Anything that is not explicitly "git_worktree" runs directly in the base cwd.
if (strategyType !== "git_worktree") {
return {
...input.base,
strategy: "project_primary",
cwd: input.base.baseCwd,
branchName: null,
worktreePath: null,
warnings: [],
created: false,
};
}
// Throws (via runGit) when baseCwd is not inside a git checkout.
const repoRoot = await runGit(["rev-parse", "--show-toplevel"], input.base.baseCwd);
const branchTemplate = asString(rawStrategy.branchTemplate, "{{issue.identifier}}-{{slug}}");
const renderedBranch = renderWorkspaceTemplate(branchTemplate, {
issue: input.issue,
agent: input.agent,
projectId: input.base.projectId,
repoRef: input.base.repoRef,
});
const branchName = sanitizeBranchName(renderedBranch);
// A relative worktreeParentDir is resolved against the repo root; the default
// is <repoRoot>/.paperclip/worktrees.
const configuredParentDir = asString(rawStrategy.worktreeParentDir, "");
const worktreeParentDir = configuredParentDir
? resolveConfiguredPath(configuredParentDir, repoRoot)
: path.join(repoRoot, ".paperclip", "worktrees");
const worktreePath = path.join(worktreeParentDir, branchName);
// Base ref precedence: strategy.baseRef, then the workspace repoRef, then the
// detected default branch of origin, then "HEAD".
const configuredBaseRef = typeof rawStrategy.baseRef === "string" && rawStrategy.baseRef.length > 0
? rawStrategy.baseRef
: input.base.repoRef ?? null;
const baseRef = configuredBaseRef
?? await detectDefaultBranch(repoRoot)
?? "HEAD";
await fs.mkdir(worktreeParentDir, { recursive: true });
const existingWorktree = await directoryExists(worktreePath);
if (existingWorktree) {
// The directory counts as a reusable worktree if git can resolve a git dir from it.
const existingGitDir = await runGit(["rev-parse", "--git-dir"], worktreePath).catch(() => null);
if (existingGitDir) {
if (input.recorder) {
// No git command runs on reuse; record a synthetic successful operation.
await input.recorder.recordOperation({
phase: "worktree_prepare",
cwd: repoRoot,
metadata: {
repoRoot,
worktreePath,
branchName,
baseRef,
created: false,
reused: true,
},
run: async () => ({
status: "succeeded",
exitCode: 0,
system: `Reused existing git worktree at ${worktreePath}\n`,
}),
});
}
// The provision command also runs for reused worktrees (with created: false).
await provisionExecutionWorktree({
strategy: rawStrategy,
base: input.base,
repoRoot,
worktreePath,
branchName,
issue: input.issue,
agent: input.agent,
created: false,
recorder: input.recorder ?? null,
});
return {
...input.base,
strategy: "git_worktree",
cwd: worktreePath,
branchName,
worktreePath,
warnings: [],
created: false,
};
}
throw new Error(`Configured worktree path "${worktreePath}" already exists and is not a git worktree.`);
}
try {
// First attempt: create a new branch from baseRef together with the worktree.
await recordGitOperation(input.recorder, {
phase: "worktree_prepare",
args: ["worktree", "add", "-b", branchName, worktreePath, baseRef],
cwd: repoRoot,
metadata: {
repoRoot,
worktreePath,
branchName,
baseRef,
created: true,
},
successMessage: `Created git worktree at ${worktreePath}\n`,
failureLabel: `git worktree add ${worktreePath}`,
});
} catch (error) {
// Only an "already exists" failure triggers the fallback; anything else propagates.
if (!gitErrorIncludes(error, "already exists")) {
throw error;
}
// Fallback: check out the existing branch into a new worktree.
await recordGitOperation(input.recorder, {
phase: "worktree_prepare",
args: ["worktree", "add", worktreePath, branchName],
cwd: repoRoot,
metadata: {
repoRoot,
worktreePath,
branchName,
baseRef,
created: false,
reusedExistingBranch: true,
},
successMessage: `Attached existing branch ${branchName} at ${worktreePath}\n`,
failureLabel: `git worktree add ${worktreePath}`,
});
}
// Both the new-branch and the existing-branch path report created: true from
// here on, although the fallback's recorded metadata says created: false.
await provisionExecutionWorktree({
strategy: rawStrategy,
base: input.base,
repoRoot,
worktreePath,
branchName,
issue: input.issue,
agent: input.agent,
created: true,
recorder: input.recorder ?? null,
});
return {
...input.base,
strategy: "git_worktree",
cwd: worktreePath,
branchName,
worktreePath,
warnings: [],
created: true,
};
}
/**
 * Best-effort teardown of an execution workspace.
 *
 * Steps, in order:
 * 1. Run the configured cleanup commands (workspace-level, project-workspace-level,
 *    then teardown). A failure becomes a warning and does not stop the rest.
 * 2. For `git_worktree` workspaces: remove the worktree with
 *    `git worktree remove --force`, and delete the branch with `git branch -d`
 *    only when the workspace metadata says the runtime created it.
 * 3. For `local_fs` workspaces created by the runtime: recursively remove the
 *    directory, unless it is or contains the project workspace.
 *
 * @returns `cleanedPath` (the path that was targeted), `cleaned` (true when
 *   there was no path or the directory no longer exists) and the collected
 *   `warnings`.
 */
export async function cleanupExecutionWorkspaceArtifacts(input: {
workspace: {
id: string;
cwd: string | null;
providerType: string;
providerRef: string | null;
branchName: string | null;
repoUrl: string | null;
baseRef: string | null;
projectId: string | null;
projectWorkspaceId: string | null;
sourceIssueId: string | null;
metadata?: Record<string, unknown> | null;
};
projectWorkspace?: {
cwd: string | null;
cleanupCommand: string | null;
} | null;
cleanupCommand?: string | null;
teardownCommand?: string | null;
recorder?: WorkspaceOperationRecorder | null;
}) {
const warnings: string[] = [];
// providerRef takes precedence over cwd as the path to clean.
const workspacePath = input.workspace.providerRef ?? input.workspace.cwd;
const cleanupEnv = buildExecutionWorkspaceCleanupEnv({
workspace: input.workspace,
projectWorkspaceCwd: input.projectWorkspace?.cwd ?? null,
});
// Only a literal `true` counts; this gates branch deletion and local_fs removal.
const createdByRuntime = input.workspace.metadata?.createdByRuntime === true;
// Blank and missing commands are dropped; the remaining ones run in this order.
const cleanupCommands = [
input.cleanupCommand ?? null,
input.projectWorkspace?.cleanupCommand ?? null,
input.teardownCommand ?? null,
]
.map((value) => asString(value, "").trim())
.filter(Boolean);
for (const command of cleanupCommands) {
try {
await recordWorkspaceCommandOperation(input.recorder, {
phase: "workspace_teardown",
command,
// Run in the workspace if known, else the project workspace, else the server's cwd.
cwd: workspacePath ?? input.projectWorkspace?.cwd ?? process.cwd(),
env: cleanupEnv,
label: `Execution workspace cleanup command "${command}"`,
metadata: {
workspaceId: input.workspace.id,
workspacePath,
branchName: input.workspace.branchName,
providerType: input.workspace.providerType,
},
successMessage: `Completed cleanup command "${command}"\n`,
});
} catch (err) {
// A failing cleanup command is reported but does not abort the teardown.
warnings.push(err instanceof Error ? err.message : String(err));
}
}
if (input.workspace.providerType === "git_worktree" && workspacePath) {
// Resolve the repo root before removing the worktree: one of the two lookup
// locations is the worktree directory itself.
const repoRoot = await resolveGitRepoRootForWorkspaceCleanup(
workspacePath,
input.projectWorkspace?.cwd ?? null,
);
const worktreeExists = await directoryExists(workspacePath);
if (worktreeExists) {
if (!repoRoot) {
warnings.push(`Could not resolve git repo root for "${workspacePath}".`);
} else {
try {
await recordGitOperation(input.recorder, {
phase: "worktree_cleanup",
args: ["worktree", "remove", "--force", workspacePath],
cwd: repoRoot,
metadata: {
workspaceId: input.workspace.id,
workspacePath,
branchName: input.workspace.branchName,
cleanupAction: "worktree_remove",
},
successMessage: `Removed git worktree ${workspacePath}\n`,
failureLabel: `git worktree remove ${workspacePath}`,
});
} catch (err) {
warnings.push(err instanceof Error ? err.message : String(err));
}
}
}
// Branch deletion is attempted even when the worktree directory was already gone.
if (createdByRuntime && input.workspace.branchName) {
if (!repoRoot) {
warnings.push(`Could not resolve git repo root to delete branch "${input.workspace.branchName}".`);
} else {
try {
// Uses `-d` (not `-D`); a git refusal is downgraded to the warning below.
await recordGitOperation(input.recorder, {
phase: "worktree_cleanup",
args: ["branch", "-d", input.workspace.branchName],
cwd: repoRoot,
metadata: {
workspaceId: input.workspace.id,
workspacePath,
branchName: input.workspace.branchName,
cleanupAction: "branch_delete",
},
successMessage: `Deleted branch ${input.workspace.branchName}\n`,
failureLabel: `git branch -d ${input.workspace.branchName}`,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
warnings.push(`Skipped deleting branch "${input.workspace.branchName}": ${message}`);
}
}
}
} else if (input.workspace.providerType === "local_fs" && createdByRuntime && workspacePath) {
const projectWorkspaceCwd = input.projectWorkspace?.cwd ? path.resolve(input.projectWorkspace.cwd) : null;
const resolvedWorkspacePath = path.resolve(workspacePath);
// Safety check: never delete the project workspace itself or one of its ancestors.
const containsProjectWorkspace = projectWorkspaceCwd
? (
resolvedWorkspacePath === projectWorkspaceCwd ||
projectWorkspaceCwd.startsWith(`${resolvedWorkspacePath}${path.sep}`)
)
: false;
if (containsProjectWorkspace) {
warnings.push(`Refusing to remove path "${workspacePath}" because it contains the project workspace.`);
} else {
// Not wrapped in try/catch: an fs.rm failure rejects the whole cleanup call.
await fs.rm(resolvedWorkspacePath, { recursive: true, force: true });
if (input.recorder) {
// The removal already happened above; this records it as a synthetic success.
await input.recorder.recordOperation({
phase: "workspace_teardown",
cwd: projectWorkspaceCwd ?? process.cwd(),
metadata: {
workspaceId: input.workspace.id,
workspacePath: resolvedWorkspacePath,
cleanupAction: "remove_local_fs",
},
run: async () => ({
status: "succeeded",
exitCode: 0,
system: `Removed local workspace directory ${resolvedWorkspacePath}\n`,
}),
});
}
}
}
// "Cleaned" means there is nothing left on disk at the workspace path (or no path at all).
const cleaned =
!workspacePath ||
!(await directoryExists(workspacePath));
return {
cleanedPath: workspacePath,
cleaned,
warnings,
};
}
/**
 * Asks the OS for a free TCP port on 127.0.0.1 by binding a throwaway server
 * to port 0, reading the assigned port, and closing the server again.
 *
 * The port is released before this resolves, so it is only likely (not
 * guaranteed) to still be free when the caller binds it.
 *
 * @throws rejects when listening or closing fails, or when the bound address
 *   is not an address object.
 */
async function allocatePort(): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const probe = net.createServer();
    const onListening = () => {
      const bound = probe.address();
      probe.close((closeError) => {
        if (closeError) {
          reject(closeError);
        } else if (!bound || typeof bound === "string") {
          reject(new Error("Failed to allocate port"));
        } else {
          resolve(bound.port);
        }
      });
    };
    probe.listen(0, "127.0.0.1", onListening);
    probe.on("error", reject);
  });
}
/**
 * Assembles the value bag handed to `renderTemplate` for runtime service
 * settings (cwd, env values).
 *
 * Null workspace, issue and agent values become empty strings. `port` is the
 * number itself, or an empty string when null.
 */
function buildTemplateData(input: {
  workspace: RealizedExecutionWorkspace;
  agent: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  adapterEnv: Record<string, string>;
  port: number | null;
}) {
  const { workspace: ws, agent, issue } = input;
  const workspace = {
    cwd: ws.cwd,
    branchName: ws.branchName ?? "",
    worktreePath: ws.worktreePath ?? "",
    repoUrl: ws.repoUrl ?? "",
    repoRef: ws.repoRef ?? "",
    env: input.adapterEnv,
  };
  const issueData = {
    id: issue?.id ?? "",
    identifier: issue?.identifier ?? "",
    title: issue?.title ?? "",
  };
  const agentData = {
    id: agent.id ?? "",
    name: agent.name,
  };
  return {
    workspace,
    issue: issueData,
    agent: agentData,
    port: input.port ?? "",
  };
}
/**
 * Renders the string values of a service's `env` config through
 * `renderTemplate`. Entries whose value is not a string are dropped.
 */
function renderRuntimeServiceEnv(input: {
  envConfig: Record<string, unknown>;
  templateData: ReturnType<typeof buildTemplateData>;
}) {
  const rendered: Record<string, string> = {};
  Object.entries(input.envConfig).forEach(([key, value]) => {
    if (typeof value === "string") {
      rendered[key] = renderTemplate(value, input.templateData);
    }
  });
  return rendered;
}
/**
 * Derives the identity of a configured runtime service: its name, lifecycle,
 * command, resolved cwd, port settings, an env fingerprint and, for shared
 * services, a reuse key.
 *
 * - `lifecycle` is "ephemeral" only when configured exactly so; everything
 *   else is "shared".
 * - `explicitPort` is read from `service.port.value`, falling back to a
 *   numeric `service.port`, then 0. `identityPort` is that value when
 *   positive, else null.
 * - `envFingerprint` is the SHA-256 of the rendered env.
 * - `reuseKey` is the SHA-256 over scope, name, command, cwd, port and
 *   rendered env; it is null for ephemeral services.
 */
function resolveRuntimeServiceReuseIdentity(input: {
  service: Record<string, unknown>;
  workspace: RealizedExecutionWorkspace;
  agent: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  adapterEnv: Record<string, string>;
  scopeType: RuntimeServiceRef["scopeType"];
  scopeId: string | null;
}): {
  serviceName: string;
  lifecycle: RuntimeServiceRef["lifecycle"];
  command: string;
  serviceCwd: string;
  envConfig: Record<string, unknown>;
  envFingerprint: string;
  explicitPort: number;
  identityPort: number | null;
  reuseKey: string | null;
} {
  const { service, workspace } = input;
  const sha256Hex = (payload: unknown) =>
    createHash("sha256").update(stableStringify(payload)).digest("hex");

  const serviceName = asString(service.name, "service");
  const lifecycle: RuntimeServiceRef["lifecycle"] =
    asString(service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
  const command = asString(service.command, "");
  const envConfig = parseObject(service.env);

  const numericPortFallback = asNumber(service.port, 0);
  const explicitPort = asNumber(parseObject(service.port).value, numericPortFallback);
  const identityPort = explicitPort > 0 ? explicitPort : null;

  const templateData = buildTemplateData({
    workspace,
    agent: input.agent,
    issue: input.issue,
    adapterEnv: input.adapterEnv,
    port: identityPort,
  });
  const renderedCwd = renderTemplate(asString(service.cwd, "."), templateData);
  const serviceCwd = resolveConfiguredPath(renderedCwd, workspace.cwd);
  const renderedEnv = renderRuntimeServiceEnv({ envConfig, templateData });
  const envFingerprint = sha256Hex(renderedEnv);

  let reuseKey: string | null = null;
  if (lifecycle === "shared") {
    reuseKey = sha256Hex({
      scopeType: input.scopeType,
      scopeId: input.scopeId,
      serviceName,
      command,
      cwd: serviceCwd,
      port: identityPort,
      env: renderedEnv,
    });
  }

  return {
    serviceName,
    lifecycle,
    command,
    serviceCwd,
    envConfig,
    envFingerprint,
    explicitPort,
    identityPort,
    reuseKey,
  };
}
/**
 * Resolves the reuse scope of a configured runtime service and the id that scope is keyed by.
 *
 * An explicit `reuseScope` wins. Otherwise the default depends on the lifecycle: shared
 * services default to `project_workspace`, ephemeral services to `run`. The lifecycle is
 * normalized exactly like `resolveRuntimeServiceReuseIdentity` does (anything that is not
 * explicitly "ephemeral" is shared), so a service that omits `lifecycle` is not given a
 * shared reuse key bound to a single run id.
 *
 * Unrecognized `reuseScope` values fall back to `run`.
 *
 * @returns The scope type plus its id: workspace id (or project id) for `project_workspace`,
 *   execution workspace id (or workspace cwd) for `execution_workspace`, agent id for
 *   `agent`, and the run id for `run`.
 */
function resolveServiceScopeId(input: {
  service: Record<string, unknown>;
  workspace: RealizedExecutionWorkspace;
  executionWorkspaceId?: string | null;
  issue: ExecutionWorkspaceIssueRef | null;
  runId: string;
  agent: ExecutionWorkspaceAgentRef;
}): {
  scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
  scopeId: string | null;
} {
  const lifecycle = asString(input.service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
  const scopeTypeRaw = asString(input.service.reuseScope, lifecycle === "shared" ? "project_workspace" : "run");
  switch (scopeTypeRaw) {
    case "project_workspace":
      return {
        scopeType: "project_workspace",
        scopeId: input.workspace.workspaceId ?? input.workspace.projectId,
      };
    case "execution_workspace":
      return {
        scopeType: "execution_workspace",
        scopeId: input.executionWorkspaceId ?? input.workspace.cwd,
      };
    case "agent":
      return { scopeType: "agent", scopeId: input.agent.id };
    default:
      return { scopeType: "run", scopeId: input.runId };
  }
}
/**
 * Polls a service URL until it answers with a 2xx response or the readiness timeout elapses.
 *
 * Only `readiness.type === "http"` with a non-empty URL is checked; any other configuration
 * returns immediately. `readiness.timeoutSec` (default 30, minimum 1) bounds the whole wait
 * and `readiness.intervalMs` (default 500, minimum 100) is the pause between attempts.
 *
 * Each request is aborted when the overall deadline is reached, so a connection that is
 * accepted but never answered cannot stall the caller beyond `timeoutSec`.
 *
 * @throws Error when the deadline passes without a successful response; the message carries
 *   the last observed failure.
 */
async function waitForReadiness(input: {
  service: Record<string, unknown>;
  url: string | null;
}) {
  const readiness = parseObject(input.service.readiness);
  const readinessType = asString(readiness.type, "");
  if (readinessType !== "http" || !input.url) return;
  const url = input.url;
  const timeoutSec = Math.max(1, asNumber(readiness.timeoutSec, 30));
  const intervalMs = Math.max(100, asNumber(readiness.intervalMs, 500));
  const deadline = Date.now() + timeoutSec * 1000;
  let lastError = "service did not become ready";
  while (Date.now() < deadline) {
    // AbortSignal.timeout requires a positive integer within the uint32 range.
    const attemptBudgetMs = Math.min(0x7fffffff, Math.max(1, Math.ceil(deadline - Date.now())));
    try {
      const response = await fetch(url, { signal: AbortSignal.timeout(attemptBudgetMs) });
      if (response.ok) return;
      lastError = `received HTTP ${response.status}`;
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
    // Never sleep past the deadline; the loop condition then ends the wait on time.
    await delay(Math.min(intervalMs, Math.max(0, deadline - Date.now())));
  }
  throw new Error(`Readiness check failed for ${url}: ${lastError}`);
}
/**
 * Probes a runtime service URL with a single GET bounded by a 2 second timeout.
 *
 * @returns `true` when there is no URL to probe or the response status is 2xx; `false` for
 *   any other status or when the request fails or times out.
 */
async function isRuntimeServiceUrlHealthy(url: string | null) {
  if (!url) return true;
  const target: string = url;
  const probe = async () => {
    const response = await fetch(target, { signal: AbortSignal.timeout(2_000) });
    return response.ok;
  };
  return probe().catch(() => false);
}
/**
 * Maps an in-memory runtime service record onto the insert shape of the
 * `workspaceRuntimeServices` table.
 *
 * ISO timestamp strings become `Date` objects, a missing `stoppedAt` becomes `null`, and
 * `updatedAt` is stamped with the current time. In-memory-only fields are not copied.
 */
function toPersistedWorkspaceRuntimeService(record: RuntimeServiceRecord): typeof workspaceRuntimeServices.$inferInsert {
  const {
    id,
    companyId,
    projectId,
    projectWorkspaceId,
    executionWorkspaceId,
    issueId,
    scopeType,
    scopeId,
    serviceName,
    status,
    lifecycle,
    reuseKey,
    command,
    cwd,
    port,
    url,
    provider,
    providerRef,
    ownerAgentId,
    startedByRunId,
    stopPolicy,
    healthStatus,
  } = record;
  const stoppedAt = record.stoppedAt ? new Date(record.stoppedAt) : null;
  return {
    id,
    companyId,
    projectId,
    projectWorkspaceId,
    executionWorkspaceId,
    issueId,
    scopeType,
    scopeId,
    serviceName,
    status,
    lifecycle,
    reuseKey,
    command,
    cwd,
    port,
    url,
    provider,
    providerRef,
    ownerAgentId,
    startedByRunId,
    lastUsedAt: new Date(record.lastUsedAt),
    startedAt: new Date(record.startedAt),
    stoppedAt,
    stopPolicy,
    healthStatus,
    updatedAt: new Date(),
  };
}
/**
 * Upserts a runtime service record into `workspaceRuntimeServices`, keyed by its id.
 *
 * Does nothing when no database handle is supplied. On conflict every mapped column except
 * the identity columns (`id`, `companyId`) is overwritten with the record's current values.
 */
async function persistRuntimeServiceRecord(db: Db | undefined, record: RuntimeServiceRecord) {
  if (!db) return;
  const row = toPersistedWorkspaceRuntimeService(record);
  // Strip the identity columns; the remainder is exactly the set of columns to refresh.
  const { id: _id, companyId: _companyId, ...refreshedColumns } = row;
  await db
    .insert(workspaceRuntimeServices)
    .values(row)
    .onConflictDoUpdate({
      target: workspaceRuntimeServices.id,
      set: refreshedColumns,
    });
}
/** Cancels the record's pending idle-stop timer, if any, and resets the handle to `null`. */
function clearIdleTimer(record: RuntimeServiceRecord) {
  const pending = record.idleTimer;
  if (pending) {
    clearTimeout(pending);
    record.idleTimer = null;
  }
}
/**
 * Converts adapter-reported runtime services into `RuntimeServiceRef`s with provider
 * `"adapter_managed"`.
 *
 * Missing report fields are defaulted: scope `run`, status `running`, lifecycle `ephemeral`,
 * service name `"service"`, and a health status inferred from the status. When the report
 * carries no scope id, one is chosen from the workspace / execution workspace / agent / run
 * according to the scope type. `lastUsedAt` and `startedAt` are both set to `input.now`
 * (or the current time), and `stoppedAt` is set for any status other than running/starting.
 */
export function normalizeAdapterManagedRuntimeServices(input: {
  adapterType: string;
  runId: string;
  agent: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  workspace: RealizedExecutionWorkspace;
  executionWorkspaceId?: string | null;
  reports: AdapterRuntimeServiceReport[];
  now?: Date;
}): RuntimeServiceRef[] {
  const { workspace, agent, runId } = input;
  const nowIso = (input.now ?? new Date()).toISOString();

  const inferHealth = (status: RuntimeServiceRef["status"]): RuntimeServiceRef["healthStatus"] => {
    if (status === "running") return "healthy";
    if (status === "failed") return "unhealthy";
    return "unknown";
  };

  return input.reports.map((report) => {
    const scopeType = report.scopeType ?? "run";

    // Scope id used when the adapter did not report one explicitly.
    let fallbackScopeId: string | null | undefined;
    switch (scopeType) {
      case "project_workspace":
        fallbackScopeId = workspace.workspaceId;
        break;
      case "execution_workspace":
        fallbackScopeId = input.executionWorkspaceId ?? workspace.cwd;
        break;
      case "agent":
        fallbackScopeId = agent.id;
        break;
      default:
        fallbackScopeId = runId;
        break;
    }
    const scopeId = report.scopeId ?? fallbackScopeId ?? null;

    const serviceName = asString(report.serviceName, "").trim() || "service";
    const status = report.status ?? "running";
    const lifecycle = report.lifecycle ?? "ephemeral";
    const healthStatus = report.healthStatus ?? inferHealth(status);
    const reuseKey = report.reuseKey ?? null;
    const providerRef = report.providerRef ?? null;
    const isLive = status === "running" || status === "starting";

    const id = stableRuntimeServiceId({
      adapterType: input.adapterType,
      runId,
      scopeType,
      scopeId,
      serviceName,
      reportId: report.id ?? null,
      providerRef,
      reuseKey,
    });

    return {
      id,
      companyId: agent.companyId,
      projectId: report.projectId ?? workspace.projectId,
      projectWorkspaceId: report.projectWorkspaceId ?? workspace.workspaceId,
      executionWorkspaceId: input.executionWorkspaceId ?? null,
      issueId: report.issueId ?? input.issue?.id ?? null,
      serviceName,
      status,
      lifecycle,
      scopeType,
      scopeId,
      reuseKey,
      command: report.command ?? null,
      cwd: report.cwd ?? null,
      port: report.port ?? null,
      url: report.url ?? null,
      provider: "adapter_managed",
      providerRef,
      ownerAgentId: report.ownerAgentId ?? agent.id ?? null,
      startedByRunId: runId,
      lastUsedAt: nowIso,
      startedAt: nowIso,
      stoppedAt: isLive ? null : nowIso,
      stopPolicy: report.stopPolicy ?? null,
      healthStatus,
      reused: false,
    };
  });
}
/**
 * Starts a runtime service as a local child process, or adopts a matching process that is
 * already recorded in the local service registry.
 *
 * Steps:
 *  1. Resolve the service identity (name, command, cwd, env fingerprint, explicit port).
 *  2. Pick the runtime port: a freshly allocated one for `port.type === "auto"`, otherwise
 *     the explicit port, otherwise none.
 *  3. If the registry holds an adoptable process for the same service key, return a record
 *     for it without spawning (`reused: true`, `child: null`).
 *  4. Otherwise refuse to start when the explicit port already has an owner, spawn the
 *     command through the login shell, wait for the process to spawn and for the optional
 *     HTTP readiness check, and write a registry record for the new process.
 *
 * `leaseRunId` / `startedByRunId` default to `runId` when omitted (`undefined`); passing
 * `null` explicitly starts the service without a lease / without a starting run.
 *
 * The returned record is not registered in the in-memory maps; callers do that.
 *
 * @throws Error when the command is missing, the explicit port is taken, the process
 *   cannot be spawned, or the readiness check fails (the child is terminated in that case).
 */
async function startLocalRuntimeService(input: {
  db?: Db;
  runId: string;
  leaseRunId?: string | null;
  startedByRunId?: string | null;
  agent: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  workspace: RealizedExecutionWorkspace;
  executionWorkspaceId?: string | null;
  adapterEnv: Record<string, string>;
  service: Record<string, unknown>;
  onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
  reuseKey: string | null;
  scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
  scopeId: string | null;
}): Promise<RuntimeServiceRecord> {
  const leaseRunId = input.leaseRunId === undefined ? input.runId : input.leaseRunId;
  const startedByRunId = input.startedByRunId === undefined ? input.runId : input.startedByRunId;
  const identity = resolveRuntimeServiceReuseIdentity({
    service: input.service,
    workspace: input.workspace,
    agent: input.agent,
    issue: input.issue,
    adapterEnv: input.adapterEnv,
    scopeType: input.scopeType,
    scopeId: input.scopeId,
  });
  const serviceName = identity.serviceName;
  const lifecycle = identity.lifecycle;
  const command = identity.command;
  if (!command) throw new Error(`Runtime service "${serviceName}" is missing command`);
  const portConfig = parseObject(input.service.port);
  const envConfig = identity.envConfig;
  const envFingerprint = identity.envFingerprint;
  // Shared services are identified by their reuse key, ephemeral ones by their env fingerprint.
  const serviceIdentityFingerprint = input.reuseKey ?? envFingerprint;
  const explicitPort = identity.explicitPort;
  const identityPort = identity.identityPort;
  const port =
    asString(portConfig.type, "") === "auto"
      ? await allocatePort()
      : explicitPort > 0
        ? explicitPort
        : null;
  const templateData = buildTemplateData({
    workspace: input.workspace,
    agent: input.agent,
    issue: input.issue,
    adapterEnv: input.adapterEnv,
    port,
  });
  // The identity cwd was rendered with the identity port; re-render only when the runtime
  // port differs (auto-allocated ports), since the cwd template may reference the port.
  const serviceCwd =
    port === identityPort
      ? identity.serviceCwd
      : resolveConfiguredPath(renderTemplate(asString(input.service.cwd, "."), templateData), input.workspace.cwd);
  const env: Record<string, string> = {
    ...sanitizeRuntimeServiceBaseEnv(process.env),
    ...input.adapterEnv,
  } as Record<string, string>;
  for (const [key, value] of Object.entries(renderRuntimeServiceEnv({ envConfig, templateData }))) {
    env[key] = value;
  }
  if (port) {
    const portEnvKey = asString(portConfig.envKey, "PORT");
    env[portEnvKey] = String(port);
  }
  const expose = parseObject(input.service.expose);
  const readiness = parseObject(input.service.readiness);
  const urlTemplate =
    asString(expose.urlTemplate, "") ||
    asString(readiness.urlTemplate, "");
  const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null;
  const stopPolicy = parseObject(input.service.stopPolicy);
  const serviceKey = createLocalServiceKey({
    profileKind: "workspace-runtime",
    serviceName,
    cwd: serviceCwd,
    command,
    envFingerprint: serviceIdentityFingerprint,
    port: identityPort,
    scope: {
      scopeType: input.scopeType,
      scopeId: input.scopeId,
      executionWorkspaceId: input.executionWorkspaceId ?? null,
      reuseKey: input.reuseKey,
    },
  });
  const adoptedRecord = await findAdoptableLocalService({
    serviceKey,
    command,
    cwd: serviceCwd,
    envFingerprint: serviceIdentityFingerprint,
    port: identityPort,
  });
  if (adoptedRecord) {
    // Adopt the already-running process instead of spawning a second copy.
    return {
      id: adoptedRecord.runtimeServiceId ?? randomUUID(),
      companyId: input.agent.companyId,
      projectId: input.workspace.projectId,
      projectWorkspaceId: input.workspace.workspaceId,
      executionWorkspaceId: input.executionWorkspaceId ?? null,
      issueId: input.issue?.id ?? null,
      serviceName,
      status: "running",
      lifecycle,
      scopeType: input.scopeType,
      scopeId: input.scopeId,
      reuseKey: input.reuseKey,
      command,
      cwd: serviceCwd,
      port: adoptedRecord.port ?? port,
      url: adoptedRecord.url ?? url,
      provider: "local_process",
      providerRef: String(adoptedRecord.pid),
      ownerAgentId: input.agent.id ?? null,
      startedByRunId,
      lastUsedAt: new Date().toISOString(),
      startedAt: adoptedRecord.startedAt,
      stoppedAt: null,
      stopPolicy,
      healthStatus: "healthy",
      reused: true,
      db: input.db,
      child: null,
      leaseRunIds: leaseRunId ? new Set([leaseRunId]) : new Set(),
      idleTimer: null,
      envFingerprint,
      serviceKey,
      profileKind: "workspace-runtime",
      processGroupId: adoptedRecord.processGroupId ?? null,
    };
  }
  if (identityPort) {
    const ownerPid = await readLocalServicePortOwner(identityPort);
    if (ownerPid) {
      throw new Error(
        `Runtime service "${serviceName}" could not start because port ${identityPort} is already in use by pid ${ownerPid}`,
      );
    }
  }
  await ensureServerWorkspaceLinksCurrent(serviceCwd, {
    onLog: input.onLog,
  });
  const shell = resolveShell();
  const child = spawn(shell, ["-lc", command], {
    cwd: serviceCwd,
    env,
    detached: process.platform !== "win32",
    stdio: ["ignore", "pipe", "pipe"],
  });
  let stderrExcerpt = "";
  // Log forwarding is best effort: a failing sink must never surface as an unhandled
  // promise rejection from a stream event handler.
  const forwardLog = (stream: "stdout" | "stderr", text: string) => {
    if (!input.onLog) return;
    try {
      input.onLog(stream, `[service:${serviceName}] ${text}`).catch(() => undefined);
    } catch {
      // Ignore synchronous sink failures as well.
    }
  };
  child.stdout?.on("data", (chunk) => {
    forwardLog("stdout", String(chunk));
  });
  child.stderr?.on("data", (chunk) => {
    const text = String(chunk);
    // Keep only the tail of stderr for the readiness failure message.
    stderrExcerpt = (stderrExcerpt + text).slice(-4096);
    forwardLog("stderr", text);
  });
  // A ChildProcess without an "error" listener throws spawn/kill failures as uncaught
  // exceptions, so keep a listener attached for the lifetime of the child.
  child.on("error", (err) => {
    forwardLog("stderr", `process error: ${err.message}\n`);
  });
  // Listeners are attached synchronously after spawn(), before either event can fire.
  const spawned = new Promise<void>((resolve, reject) => {
    child.once("spawn", () => resolve());
    child.once("error", (err) => reject(err));
  });
  try {
    await spawned;
  } catch (err) {
    // Nothing to terminate: the process never started.
    throw new Error(
      `Failed to start runtime service "${serviceName}": ${err instanceof Error ? err.message : String(err)}`,
    );
  }
  try {
    await waitForReadiness({ service: input.service, url });
  } catch (err) {
    terminateChildProcess(child);
    throw new Error(
      `Failed to start runtime service "${serviceName}": ${err instanceof Error ? err.message : String(err)}${stderrExcerpt ? ` | stderr: ${stderrExcerpt.trim()}` : ""}`,
    );
  }
  const record: RuntimeServiceRecord = {
    id: randomUUID(),
    companyId: input.agent.companyId,
    projectId: input.workspace.projectId,
    projectWorkspaceId: input.workspace.workspaceId,
    executionWorkspaceId: input.executionWorkspaceId ?? null,
    issueId: input.issue?.id ?? null,
    serviceName,
    status: "running",
    lifecycle,
    scopeType: input.scopeType,
    scopeId: input.scopeId,
    reuseKey: input.reuseKey,
    command,
    cwd: serviceCwd,
    port,
    url,
    provider: "local_process",
    providerRef: child.pid ? String(child.pid) : null,
    ownerAgentId: input.agent.id ?? null,
    startedByRunId,
    lastUsedAt: new Date().toISOString(),
    startedAt: new Date().toISOString(),
    stoppedAt: null,
    stopPolicy,
    healthStatus: "healthy",
    reused: false,
    db: input.db,
    child,
    leaseRunIds: leaseRunId ? new Set([leaseRunId]) : new Set(),
    idleTimer: null,
    envFingerprint,
    serviceKey,
    profileKind: "workspace-runtime",
    processGroupId: child.pid ?? null,
  };
  if (child.pid) {
    // Record the process in the local registry so a later call can adopt it.
    await writeLocalServiceRegistryRecord({
      version: 1,
      serviceKey,
      profileKind: "workspace-runtime",
      serviceName,
      command,
      cwd: serviceCwd,
      envFingerprint: serviceIdentityFingerprint,
      port,
      url,
      pid: child.pid,
      processGroupId: child.pid,
      provider: "local_process",
      runtimeServiceId: record.id,
      reuseKey: input.reuseKey,
      startedAt: record.startedAt,
      lastSeenAt: record.lastUsedAt,
      metadata: {
        projectId: record.projectId,
        projectWorkspaceId: record.projectWorkspaceId,
        executionWorkspaceId: record.executionWorkspaceId,
        issueId: record.issueId,
        scopeType: record.scopeType,
        scopeId: record.scopeId,
      },
    });
  }
  return record;
}
/**
 * (Re)arms the idle-stop timer for a service.
 *
 * Any pending timer is cancelled first. A new one is armed only when the stop policy type
 * is `"idle_timeout"`; it stops the service after `idleSeconds` (default 1800, minimum 1).
 * Failures of the delayed stop are ignored.
 */
function scheduleIdleStop(record: RuntimeServiceRecord) {
  clearIdleTimer(record);
  const policy = record.stopPolicy;
  if (asString(policy?.type, "manual") === "idle_timeout") {
    const idleMs = Math.max(1, asNumber(policy?.idleSeconds, 1800)) * 1000;
    const stopQuietly = () => {
      stopRuntimeService(record.id).catch(() => undefined);
    };
    record.idleTimer = setTimeout(stopQuietly, idleMs);
  }
}
/**
 * Stops a registered runtime service by id; unknown ids are a no-op.
 *
 * The record is marked stopped and removed from the in-memory maps first. Then the process
 * is terminated: via the owned child's pid when there is one, otherwise via the pid parsed
 * from `providerRef`. Finally the registry record is removed and the stopped state is
 * persisted.
 */
async function stopRuntimeService(serviceId: string) {
  const record = runtimeServicesById.get(serviceId);
  if (!record) return;

  clearIdleTimer(record);
  record.status = "stopped";
  record.healthStatus = "unknown";
  record.lastUsedAt = new Date().toISOString();
  record.stoppedAt = new Date().toISOString();

  runtimeServicesById.delete(serviceId);
  // Only drop the reuse-key mapping if it still points at this service.
  const reuseKey = record.reuseKey;
  if (reuseKey && runtimeServicesByReuseKey.get(reuseKey) === record.id) {
    runtimeServicesByReuseKey.delete(reuseKey);
  }

  const childPid = record.child?.pid;
  if (childPid) {
    await terminateLocalService({
      pid: childPid,
      processGroupId: record.processGroupId ?? childPid,
    });
  } else if (record.providerRef) {
    // No owned child handle: fall back to the pid stored in providerRef.
    const referencedPid = Number.parseInt(record.providerRef, 10);
    const isUsablePid = Number.isInteger(referencedPid) && referencedPid > 0;
    if (isUsablePid) {
      await terminateLocalService({
        pid: referencedPid,
        processGroupId: record.processGroupId,
      });
    }
  }

  await removeLocalServiceRegistryRecord(record.serviceKey);
  await persistRuntimeServiceRecord(record.db, record);
}
/**
 * Marks every persisted runtime service of an execution workspace that is still
 * `starting` or `running` as stopped, with health `unknown` and all timestamps set to now.
 */
async function markPersistedRuntimeServicesStoppedForExecutionWorkspace(input: {
  db: Db;
  executionWorkspaceId: string;
}) {
  const { db, executionWorkspaceId } = input;
  const stoppedAt = new Date();
  const stoppedState = {
    status: "stopped",
    healthStatus: "unknown",
    stoppedAt,
    lastUsedAt: stoppedAt,
    updatedAt: stoppedAt,
  };
  const stillActiveInWorkspace = and(
    eq(workspaceRuntimeServices.executionWorkspaceId, executionWorkspaceId),
    inArray(workspaceRuntimeServices.status, ["starting", "running"]),
  );
  await db.update(workspaceRuntimeServices).set(stoppedState).where(stillActiveInWorkspace);
}
/**
 * Registers a runtime service record in the in-memory maps (by id and, when present, by
 * reuse key) and attaches the database handle used for later persistence.
 *
 * When the record owns a child process, an `exit` listener marks the record as `stopped`
 * (exit code 0 or SIGTERM) or `failed` (anything else), unregisters it, and then removes
 * its registry entry and persists the final state. That cleanup is fire-and-forget: its
 * failures are swallowed so a registry or database error inside the event handler cannot
 * turn into an unhandled promise rejection.
 */
function registerRuntimeService(db: Db | undefined, record: RuntimeServiceRecord) {
  record.db = db;
  runtimeServicesById.set(record.id, record);
  if (record.reuseKey) {
    runtimeServicesByReuseKey.set(record.reuseKey, record.id);
  }
  record.child?.on("exit", (code, signal) => {
    const current = runtimeServicesById.get(record.id);
    // Already unregistered (e.g. by stopRuntimeService): nothing left to do.
    if (!current) return;
    clearIdleTimer(current);
    current.status = code === 0 || signal === "SIGTERM" ? "stopped" : "failed";
    current.healthStatus = current.status === "failed" ? "unhealthy" : "unknown";
    current.lastUsedAt = new Date().toISOString();
    current.stoppedAt = new Date().toISOString();
    runtimeServicesById.delete(current.id);
    if (current.reuseKey && runtimeServicesByReuseKey.get(current.reuseKey) === current.id) {
      runtimeServicesByReuseKey.delete(current.reuseKey);
    }
    void Promise.resolve(removeLocalServiceRegistryRecord(current.serviceKey)).catch(() => undefined);
    void Promise.resolve(persistRuntimeServiceRecord(db, current)).catch(() => undefined);
  });
}
/**
 * Extracts the configured service entries from `config.workspaceRuntime.services`.
 *
 * @returns The non-null object entries of the list, or an empty array when `services` is
 *   not an array.
 */
function readRuntimeServiceEntries(config: Record<string, unknown>) {
  const { services } = parseObject(config.workspaceRuntime);
  if (!Array.isArray(services)) return [];
  const isObjectEntry = (entry: unknown): entry is Record<string, unknown> =>
    typeof entry === "object" && entry !== null;
  return services.filter(isObjectEntry);
}
/**
 * Ensures every runtime service configured under `config.workspaceRuntime.services` is
 * running for the given run, and leases each of them to that run.
 *
 * A shared service whose reuse key matches a registered, running service is reused: the
 * run is added to its leases and its idle timer is cancelled. Otherwise the service is
 * started (or adopted) locally, registered, and persisted.
 *
 * A lease is recorded in the run's lease list as soon as it is taken, before the database
 * write. If anything later fails, the rollback therefore releases every lease this call
 * took, including the one whose persist failed, instead of leaving it held forever.
 *
 * @returns One ref per configured service, in configuration order.
 * @throws Rethrows the first failure after releasing the leases acquired so far.
 */
export async function ensureRuntimeServicesForRun(input: {
  db?: Db;
  runId: string;
  agent: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  workspace: RealizedExecutionWorkspace;
  executionWorkspaceId?: string | null;
  config: Record<string, unknown>;
  adapterEnv: Record<string, string>;
  onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
}): Promise<RuntimeServiceRef[]> {
  const rawServices = readRuntimeServiceEntries(input.config);
  const acquiredServiceIds: string[] = [];
  const refs: RuntimeServiceRef[] = [];
  // Shared by reference: releaseRuntimeServicesForRun reads this list during rollback.
  runtimeServiceLeasesByRun.set(input.runId, acquiredServiceIds);
  try {
    for (const service of rawServices) {
      const { scopeType, scopeId } = resolveServiceScopeId({
        service,
        workspace: input.workspace,
        executionWorkspaceId: input.executionWorkspaceId,
        issue: input.issue,
        runId: input.runId,
        agent: input.agent,
      });
      const reuseKey = resolveRuntimeServiceReuseIdentity({
        service,
        workspace: input.workspace,
        agent: input.agent,
        issue: input.issue,
        adapterEnv: input.adapterEnv,
        scopeType,
        scopeId,
      }).reuseKey;
      if (reuseKey) {
        const existingId = runtimeServicesByReuseKey.get(reuseKey);
        const existing = existingId ? runtimeServicesById.get(existingId) : null;
        if (existing && existing.status === "running") {
          existing.leaseRunIds.add(input.runId);
          // Track the lease before any fallible await so rollback can release it.
          acquiredServiceIds.push(existing.id);
          existing.lastUsedAt = new Date().toISOString();
          existing.stoppedAt = null;
          clearIdleTimer(existing);
          void touchLocalServiceRegistryRecord(existing.serviceKey, {
            runtimeServiceId: existing.id,
            lastSeenAt: existing.lastUsedAt,
          });
          await persistRuntimeServiceRecord(input.db, existing);
          refs.push(toRuntimeServiceRef(existing, { reused: true }));
          continue;
        }
      }
      const record = await startLocalRuntimeService({
        db: input.db,
        runId: input.runId,
        agent: input.agent,
        issue: input.issue,
        workspace: input.workspace,
        executionWorkspaceId: input.executionWorkspaceId,
        adapterEnv: input.adapterEnv,
        service,
        onLog: input.onLog,
        reuseKey,
        scopeType,
        scopeId,
      });
      registerRuntimeService(input.db, record);
      // The record already carries this run's lease; track it before persisting so a
      // failed database write does not leak the lease or the freshly started process.
      acquiredServiceIds.push(record.id);
      await persistRuntimeServiceRecord(input.db, record);
      refs.push(toRuntimeServiceRef(record));
    }
  } catch (err) {
    await releaseRuntimeServicesForRun(input.runId);
    throw err;
  }
  return refs;
}
/**
 * Starts the configured runtime services on behalf of a manual workspace-control action
 * rather than a heartbeat run.
 *
 * Running shared services with a matching reuse key are reused (their last-used time is
 * refreshed and their idle timer cancelled). Everything else is started locally,
 * registered and persisted. `invocationId` (random when omitted) stands in for the run id
 * when resolving run-scoped services.
 *
 * @returns One ref per configured service, in configuration order.
 */
export async function startRuntimeServicesForWorkspaceControl(input: {
  db?: Db;
  invocationId?: string;
  actor: ExecutionWorkspaceAgentRef;
  issue: ExecutionWorkspaceIssueRef | null;
  workspace: RealizedExecutionWorkspace;
  executionWorkspaceId?: string | null;
  config: Record<string, unknown>;
  adapterEnv: Record<string, string>;
  onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
}): Promise<RuntimeServiceRef[]> {
  const serviceEntries = readRuntimeServiceEntries(input.config);
  const started: RuntimeServiceRef[] = [];
  const invocationId = input.invocationId ?? randomUUID();

  for (const service of serviceEntries) {
    const scope = resolveServiceScopeId({
      service,
      workspace: input.workspace,
      executionWorkspaceId: input.executionWorkspaceId,
      issue: input.issue,
      runId: invocationId,
      agent: input.actor,
    });
    const { reuseKey } = resolveRuntimeServiceReuseIdentity({
      service,
      workspace: input.workspace,
      agent: input.actor,
      issue: input.issue,
      adapterEnv: input.adapterEnv,
      scopeType: scope.scopeType,
      scopeId: scope.scopeId,
    });

    const cachedId = reuseKey ? runtimeServicesByReuseKey.get(reuseKey) : undefined;
    const cached = cachedId ? runtimeServicesById.get(cachedId) : null;
    if (cached && cached.status === "running") {
      cached.lastUsedAt = new Date().toISOString();
      cached.stoppedAt = null;
      clearIdleTimer(cached);
      void touchLocalServiceRegistryRecord(cached.serviceKey, {
        runtimeServiceId: cached.id,
        lastSeenAt: cached.lastUsedAt,
      });
      await persistRuntimeServiceRecord(input.db, cached);
      started.push(toRuntimeServiceRef(cached, { reused: true }));
      continue;
    }

    // Manually controlled services live outside the heartbeat run lifecycle: they take no
    // run lease and never persist a startedByRunId foreign key.
    const record = await startLocalRuntimeService({
      db: input.db,
      runId: invocationId,
      leaseRunId: null,
      startedByRunId: null,
      agent: input.actor,
      issue: input.issue,
      workspace: input.workspace,
      executionWorkspaceId: input.executionWorkspaceId,
      adapterEnv: input.adapterEnv,
      service,
      onLog: input.onLog,
      reuseKey,
      scopeType: scope.scopeType,
      scopeId: scope.scopeId,
    });
    registerRuntimeService(input.db, record);
    await persistRuntimeServiceRecord(input.db, record);
    started.push(toRuntimeServiceRef(record));
  }

  return started;
}
/**
 * Releases every runtime service lease held by a run.
 *
 * For each still-registered service the run's lease is dropped and the record persisted.
 * Once a service has no leases left it is stopped immediately when it is ephemeral or its
 * stop policy is `on_run_finish`; otherwise its idle-stop timer is (re)armed.
 */
export async function releaseRuntimeServicesForRun(runId: string) {
  const leasedServiceIds = runtimeServiceLeasesByRun.get(runId) ?? [];
  runtimeServiceLeasesByRun.delete(runId);

  for (const serviceId of leasedServiceIds) {
    const record = runtimeServicesById.get(serviceId);
    if (!record) continue;

    record.leaseRunIds.delete(runId);
    record.lastUsedAt = new Date().toISOString();
    const isEphemeral = record.lifecycle === "ephemeral";
    // Ephemeral services default to stopping with the run; shared ones to manual stop.
    const stopType = asString(record.stopPolicy?.type, isEphemeral ? "on_run_finish" : "manual");
    await persistRuntimeServiceRecord(record.db, record);

    const stillLeased = record.leaseRunIds.size !== 0;
    if (stillLeased) continue;

    if (isEphemeral || stopType === "on_run_finish") {
      await stopRuntimeService(serviceId);
    } else {
      scheduleIdleStop(record);
    }
  }
}
/**
 * Stops all registered runtime services that belong to an execution workspace.
 *
 * A service belongs to the workspace when its `executionWorkspaceId` matches, or, if
 * `workspaceCwd` is given, when its cwd is that directory or lies beneath it. When a
 * database handle is supplied, persisted rows of the workspace that are still
 * starting/running are marked stopped as well.
 */
export async function stopRuntimeServicesForExecutionWorkspace(input: {
  db?: Db;
  executionWorkspaceId: string;
  workspaceCwd?: string | null;
}) {
  const workspaceRoot = input.workspaceCwd ? path.resolve(input.workspaceCwd) : null;

  const belongsToWorkspace = (record: RuntimeServiceRecord) => {
    if (record.executionWorkspaceId === input.executionWorkspaceId) return true;
    if (!workspaceRoot || !record.cwd) return false;
    const serviceDir = path.resolve(record.cwd);
    // The trailing separator keeps "/a/b-other" from matching workspace "/a/b".
    return serviceDir === workspaceRoot || serviceDir.startsWith(`${workspaceRoot}${path.sep}`);
  };

  // Snapshot the ids first: stopping a service mutates the registry being iterated.
  const targetIds: string[] = [];
  for (const record of runtimeServicesById.values()) {
    if (belongsToWorkspace(record)) targetIds.push(record.id);
  }
  for (const targetId of targetIds) {
    await stopRuntimeService(targetId);
  }

  if (!input.db) return;
  await markPersistedRuntimeServicesStoppedForExecutionWorkspace({
    db: input.db,
    executionWorkspaceId: input.executionWorkspaceId,
  });
}
/**
 * Stops all registered runtime services that are scoped to a project workspace
 * (`scopeType === "project_workspace"`) and, when a database handle is supplied, marks the
 * matching persisted rows that are still starting/running as stopped.
 */
export async function stopRuntimeServicesForProjectWorkspace(input: {
  db?: Db;
  projectWorkspaceId: string;
}) {
  const { db, projectWorkspaceId } = input;

  // Snapshot the ids first: stopping a service mutates the registry being iterated.
  const targetIds: string[] = [];
  for (const record of runtimeServicesById.values()) {
    const isTarget = record.projectWorkspaceId === projectWorkspaceId && record.scopeType === "project_workspace";
    if (isTarget) targetIds.push(record.id);
  }
  for (const targetId of targetIds) {
    await stopRuntimeService(targetId);
  }

  if (!db) return;
  const stoppedAt = new Date();
  const stoppedState = {
    status: "stopped",
    healthStatus: "unknown",
    stoppedAt,
    lastUsedAt: stoppedAt,
    updatedAt: stoppedAt,
  };
  const stillActiveInWorkspace = and(
    eq(workspaceRuntimeServices.projectWorkspaceId, projectWorkspaceId),
    eq(workspaceRuntimeServices.scopeType, "project_workspace"),
    inArray(workspaceRuntimeServices.status, ["starting", "running"]),
  );
  await db.update(workspaceRuntimeServices).set(stoppedState).where(stillActiveInWorkspace);
}
/**
 * Loads the project-workspace-scoped runtime service rows of a company for the given
 * project workspaces and groups them by project workspace id.
 *
 * Rows are ordered by `updatedAt` then `createdAt`, both descending, and each group keeps
 * that order. Workspaces without rows have no entry in the result.
 */
export async function listWorkspaceRuntimeServicesForProjectWorkspaces(
  db: Db,
  companyId: string,
  projectWorkspaceIds: string[],
) {
  const byWorkspace = new Map<string, typeof workspaceRuntimeServices.$inferSelect[]>();
  if (projectWorkspaceIds.length === 0) return byWorkspace;

  const scopedToRequestedWorkspaces = and(
    eq(workspaceRuntimeServices.companyId, companyId),
    inArray(workspaceRuntimeServices.projectWorkspaceId, projectWorkspaceIds),
    eq(workspaceRuntimeServices.scopeType, "project_workspace"),
  );
  const rows = await db
    .select()
    .from(workspaceRuntimeServices)
    .where(scopedToRequestedWorkspaces)
    .orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));

  for (const row of rows) {
    const workspaceId = row.projectWorkspaceId;
    if (!workspaceId) continue;
    let bucket = byWorkspace.get(workspaceId);
    if (!bucket) {
      bucket = [];
      byWorkspace.set(workspaceId, bucket);
    }
    bucket.push(row);
  }
  return byWorkspace;
}
/**
 * Reconciles persisted `local_process` runtime services with the local service registry.
 *
 * Every row still marked `starting` or `running` is looked up in the registry by its
 * runtime service id. When a registry record exists and its URL passes the health probe
 * (a missing URL counts as healthy), the service is re-registered in memory, its registry
 * record is touched, and the row is persisted as running. Otherwise the row is marked
 * stopped and any registry record for it is removed.
 *
 * @returns Counts of rows examined (`reconciled`), re-adopted (`adopted`) and marked
 *   stopped (`stopped`).
 */
export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
const rows = await db
.select()
.from(workspaceRuntimeServices)
.where(
and(
eq(workspaceRuntimeServices.provider, "local_process"),
inArray(workspaceRuntimeServices.status, ["starting", "running"]),
),
);
if (rows.length === 0) return { reconciled: 0, adopted: 0, stopped: 0 };
let adopted = 0;
let stopped = 0;
for (const row of rows) {
const adoptedRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({
runtimeServiceId: row.id,
profileKind: "workspace-runtime",
});
if (adoptedRecord) {
const adoptedUrl = adoptedRecord.url ?? row.url ?? null;
if (!(await isRuntimeServiceUrlHealthy(adoptedUrl))) {
// Unhealthy: drop the registry record and fall through to mark the row stopped.
// NOTE(review): only the registry record is removed here; the process itself is not
// terminated in this function — confirm that is intended.
await removeLocalServiceRegistryRecord(adoptedRecord.serviceKey);
} else {
// Rebuild the in-memory record from the persisted row, preferring the registry's
// port/url/pid over the stored values.
const record: RuntimeServiceRecord = {
id: row.id,
companyId: row.companyId,
projectId: row.projectId ?? null,
projectWorkspaceId: row.projectWorkspaceId ?? null,
executionWorkspaceId: row.executionWorkspaceId ?? null,
issueId: row.issueId ?? null,
serviceName: row.serviceName,
status: "running",
lifecycle: row.lifecycle as RuntimeServiceRecord["lifecycle"],
scopeType: row.scopeType as RuntimeServiceRecord["scopeType"],
scopeId: row.scopeId ?? null,
reuseKey: row.reuseKey ?? null,
command: row.command ?? null,
cwd: row.cwd ?? null,
port: adoptedRecord.port ?? row.port ?? null,
url: adoptedRecord.url ?? row.url ?? null,
provider: "local_process",
providerRef: String(adoptedRecord.pid),
ownerAgentId: row.ownerAgentId ?? null,
startedByRunId: row.startedByRunId ?? null,
lastUsedAt: new Date().toISOString(),
startedAt: row.startedAt.toISOString(),
stoppedAt: null,
stopPolicy: (row.stopPolicy as Record<string, unknown> | null) ?? null,
healthStatus: "healthy",
reused: true,
db,
// No ChildProcess handle is attached to an adopted service, and it starts with no leases.
child: null,
leaseRunIds: new Set(),
idleTimer: null,
// The row mapping in this file stores no env fingerprint, so the reuse key stands in.
envFingerprint: row.reuseKey ?? "",
serviceKey: adoptedRecord.serviceKey,
profileKind: "workspace-runtime",
processGroupId: adoptedRecord.processGroupId ?? null,
};
registerRuntimeService(db, record);
await touchLocalServiceRegistryRecord(adoptedRecord.serviceKey, {
runtimeServiceId: row.id,
lastSeenAt: record.lastUsedAt,
});
await persistRuntimeServiceRecord(db, record);
adopted += 1;
continue;
}
}
// Not adoptable (no registry record, or unhealthy): mark the persisted row stopped.
const now = new Date();
await db
.update(workspaceRuntimeServices)
.set({
status: "stopped",
healthStatus: "unknown",
stoppedAt: now,
lastUsedAt: now,
updatedAt: now,
})
.where(eq(workspaceRuntimeServices.id, row.id));
// Look the registry up again and remove whatever record is still associated with this row.
const registryRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({
runtimeServiceId: row.id,
profileKind: "workspace-runtime",
});
if (registryRecord) {
await removeLocalServiceRegistryRecord(registryRecord.serviceKey);
}
stopped += 1;
}
return { reconciled: rows.length, adopted, stopped };
}
/**
 * Restarts runtime services for every workspace whose stored config asks for them to be
 * running.
 *
 * Two passes are made: first over all project workspaces, then over execution workspaces
 * in status `active`, `idle`, `in_review` or `cleanup_failed`. A workspace is processed
 * only when its config has `desiredState === "running"`, a `workspaceRuntime` section, and
 * a `cwd`. Services are started through `startRuntimeServicesForWorkspaceControl` with a
 * synthetic "Paperclip" actor and an empty adapter env.
 *
 * Failures are counted per workspace and do not abort the remaining workspaces.
 *
 * @returns `restarted` is the number of services actually started (reused ones are not
 *   counted); `failed` is the number of workspaces whose start attempt threw.
 */
export async function restartDesiredRuntimeServicesOnStartup(db: Db) {
let restarted = 0;
let failed = 0;
// Pass 1: project workspaces.
const projectWorkspaceRows = await db
.select()
.from(projectWorkspaces);
for (const row of projectWorkspaceRows) {
const runtimeConfig = readProjectWorkspaceRuntimeConfig((row.metadata as Record<string, unknown> | null) ?? null);
if (runtimeConfig?.desiredState !== "running" || !runtimeConfig.workspaceRuntime || !row.cwd) continue;
try {
const refs = await startRuntimeServicesForWorkspaceControl({
db,
actor: { id: null, name: "Paperclip", companyId: row.companyId },
issue: null,
// Synthesize a realized workspace that points at the project workspace's own cwd.
workspace: {
baseCwd: row.cwd,
source: "project_primary",
projectId: row.projectId,
workspaceId: row.id,
repoUrl: row.repoUrl ?? null,
repoRef: row.repoRef ?? null,
strategy: "project_primary",
cwd: row.cwd,
branchName: row.defaultRef ?? row.repoRef ?? null,
worktreePath: null,
warnings: [],
created: false,
},
config: { workspaceRuntime: runtimeConfig.workspaceRuntime },
adapterEnv: {},
});
// Only services that were actually started count as restarted.
if (refs.length > 0) restarted += refs.filter((ref) => !ref.reused).length;
} catch {
// Best effort: record the failure and keep going with the next workspace.
failed += 1;
}
}
// Pass 2: execution workspaces in one of the statuses listed below.
const executionWorkspaceRows = await db
.select()
.from(executionWorkspaces)
.where(inArray(executionWorkspaces.status, ["active", "idle", "in_review", "cleanup_failed"]));
for (const row of executionWorkspaceRows) {
const config = readExecutionWorkspaceConfig((row.metadata as Record<string, unknown> | null) ?? null);
if (config?.desiredState !== "running" || !config.workspaceRuntime || !row.cwd) continue;
try {
const refs = await startRuntimeServicesForWorkspaceControl({
db,
actor: { id: null, name: "Paperclip", companyId: row.companyId },
// Only the source issue id is known here; the workspace name is used as its title.
issue: row.sourceIssueId
? {
id: row.sourceIssueId,
identifier: null,
title: row.name,
}
: null,
workspace: {
baseCwd: row.cwd,
source: row.mode === "shared_workspace" ? "project_primary" : "task_session",
projectId: row.projectId,
workspaceId: row.projectWorkspaceId ?? null,
repoUrl: row.repoUrl ?? null,
repoRef: row.baseRef ?? null,
strategy: row.strategyType === "git_worktree" ? "git_worktree" : "project_primary",
cwd: row.cwd,
branchName: row.branchName ?? null,
// For git worktrees the workspace cwd is the worktree path.
worktreePath: row.strategyType === "git_worktree" ? row.cwd : null,
warnings: [],
created: false,
},
executionWorkspaceId: row.id,
config: { workspaceRuntime: config.workspaceRuntime },
adapterEnv: {},
});
if (refs.length > 0) restarted += refs.filter((ref) => !ref.reused).length;
} catch {
// Best effort: record the failure and keep going with the next workspace.
failed += 1;
}
}
return { restarted, failed };
}
/**
 * Normalizes adapter-reported runtime services and upserts them into
 * `workspaceRuntimeServices`.
 *
 * Reports are first converted to refs by `normalizeAdapterManagedRuntimeServices`. Rows
 * that already exist (matched by id) keep their original `startedAt`; all other mapped
 * columns are overwritten with the ref's values. `createdAt` is supplied only in the
 * insert values and is not part of the conflict update.
 *
 * @returns The normalized refs (an empty array when there were no reports).
 */
export async function persistAdapterManagedRuntimeServices(input: {
db: Db;
adapterType: string;
runId: string;
agent: ExecutionWorkspaceAgentRef;
issue: ExecutionWorkspaceIssueRef | null;
workspace: RealizedExecutionWorkspace;
executionWorkspaceId?: string | null;
reports: AdapterRuntimeServiceReport[];
}) {
const refs = normalizeAdapterManagedRuntimeServices(input);
if (refs.length === 0) return refs;
// Load the rows that already exist so their original timestamps can be carried over.
const existingRows = await input.db
.select()
.from(workspaceRuntimeServices)
.where(inArray(workspaceRuntimeServices.id, refs.map((ref) => ref.id)));
const existingById = new Map(existingRows.map((row) => [row.id, row]));
for (const ref of refs) {
const existing = existingById.get(ref.id);
// Prefer the persisted start/creation time over the freshly normalized one.
const startedAt = existing?.startedAt ?? new Date(ref.startedAt);
const createdAt = existing?.createdAt ?? new Date();
await input.db
.insert(workspaceRuntimeServices)
.values({
id: ref.id,
companyId: ref.companyId,
projectId: ref.projectId,
projectWorkspaceId: ref.projectWorkspaceId,
executionWorkspaceId: ref.executionWorkspaceId,
issueId: ref.issueId,
scopeType: ref.scopeType,
scopeId: ref.scopeId,
serviceName: ref.serviceName,
status: ref.status,
lifecycle: ref.lifecycle,
reuseKey: ref.reuseKey,
command: ref.command,
cwd: ref.cwd,
port: ref.port,
url: ref.url,
provider: ref.provider,
providerRef: ref.providerRef,
ownerAgentId: ref.ownerAgentId,
startedByRunId: ref.startedByRunId,
lastUsedAt: new Date(ref.lastUsedAt),
startedAt,
stoppedAt: ref.stoppedAt ? new Date(ref.stoppedAt) : null,
stopPolicy: ref.stopPolicy,
healthStatus: ref.healthStatus,
createdAt,
updatedAt: new Date(),
})
.onConflictDoUpdate({
target: workspaceRuntimeServices.id,
// id, companyId and createdAt are not part of the update set.
set: {
projectId: ref.projectId,
projectWorkspaceId: ref.projectWorkspaceId,
executionWorkspaceId: ref.executionWorkspaceId,
issueId: ref.issueId,
scopeType: ref.scopeType,
scopeId: ref.scopeId,
serviceName: ref.serviceName,
status: ref.status,
lifecycle: ref.lifecycle,
reuseKey: ref.reuseKey,
command: ref.command,
cwd: ref.cwd,
port: ref.port,
url: ref.url,
provider: ref.provider,
providerRef: ref.providerRef,
ownerAgentId: ref.ownerAgentId,
startedByRunId: ref.startedByRunId,
lastUsedAt: new Date(ref.lastUsedAt),
startedAt,
stoppedAt: ref.stoppedAt ? new Date(ref.stoppedAt) : null,
stopPolicy: ref.stopPolicy,
healthStatus: ref.healthStatus,
updatedAt: new Date(),
},
});
}
return refs;
}
/**
 * Builds the markdown "Workspace Ready" comment for a realized workspace.
 *
 * The comment lists the strategy, the branch (when set), the cwd, the worktree path (when
 * set and different from the cwd) and one bullet per runtime service showing its URL, or
 * "running" when it has none, with a "(reused)" suffix for reused services.
 */
export function buildWorkspaceReadyComment(input: {
  workspace: RealizedExecutionWorkspace;
  runtimeServices: RuntimeServiceRef[];
}) {
  const { workspace, runtimeServices } = input;

  const workspaceBullets = [`- Strategy: \`${workspace.strategy}\``];
  if (workspace.branchName) {
    workspaceBullets.push(`- Branch: \`${workspace.branchName}\``);
  }
  workspaceBullets.push(`- CWD: \`${workspace.cwd}\``);
  const hasSeparateWorktree = Boolean(workspace.worktreePath) && workspace.worktreePath !== workspace.cwd;
  if (hasSeparateWorktree) {
    workspaceBullets.push(`- Worktree: \`${workspace.worktreePath}\``);
  }

  const serviceBullets = runtimeServices.map((service) => {
    const state = service.url ? service.url : "running";
    const reusedMarker = service.reused ? " (reused)" : "";
    return `- Service: ${service.serviceName}: ${state}${reusedMarker}`;
  });

  return ["## Workspace Ready", "", ...workspaceBullets, ...serviceBullets].join("\n");
}