Address runtime lifecycle PR followups

This commit is contained in:
Dotta
2026-04-24 15:20:57 -05:00
parent f6d236ce95
commit 5e3933091f
6 changed files with 61 additions and 61 deletions

View File

@@ -400,8 +400,6 @@ describeEmbeddedPostgres("paperclipai company import/export e2e", () => {
);
const importedMatchingIssues = importedIssues.filter((issue) => issue.title === sourceIssue.title);
const importedMatchingIssues = importedIssues.filter((issue) => issue.title === sourceIssue.title);
expect(importedAgents.map((agent) => agent.name)).toContain(sourceAgent.name);
expect(importedProjects.map((project) => project.name)).toContain(sourceProject.name);
expect(importedMatchingIssues).toHaveLength(1);
@@ -476,8 +474,6 @@ describeEmbeddedPostgres("paperclipai company import/export e2e", () => {
);
const twiceImportedMatchingIssues = twiceImportedIssues.filter((issue) => issue.title === sourceIssue.title);
const twiceImportedMatchingIssues = twiceImportedIssues.filter((issue) => issue.title === sourceIssue.title);
expect(twiceImportedAgents).toHaveLength(2);
expect(new Set(twiceImportedAgents.map((agent) => agent.name)).size).toBe(2);
expect(twiceImportedProjects).toHaveLength(2);

View File

@@ -1,6 +1,6 @@
import { randomUUID } from "node:crypto";
import { afterEach, describe, expect, it, vi } from "vitest";
import { withAgentStartLockForTest } from "../services/heartbeat.ts";
import { withAgentStartLock } from "../services/agent-start-lock.ts";
describe("heartbeat agent start lock", () => {
afterEach(() => {
@@ -14,11 +14,11 @@ describe("heartbeat agent start lock", () => {
const firstStart = vi.fn(() => new Promise<void>(() => undefined));
const secondStart = vi.fn(async () => "started");
void withAgentStartLockForTest(agentId, firstStart);
void withAgentStartLock(agentId, firstStart);
await Promise.resolve();
expect(firstStart).toHaveBeenCalledTimes(1);
const secondStartResult = withAgentStartLockForTest(agentId, secondStart);
const secondStartResult = withAgentStartLock(agentId, secondStart);
await Promise.resolve();
expect(secondStart).not.toHaveBeenCalled();

View File

@@ -0,0 +1,48 @@
import { logger } from "../middleware/logger.js";
const AGENT_START_LOCK_STALE_MS = 30_000;
const startLocksByAgent = new Map<string, { promise: Promise<void>; startedAtMs: number }>();
async function waitForAgentStartLock(agentId: string, lock: { promise: Promise<void>; startedAtMs: number }) {
const elapsedMs = Date.now() - lock.startedAtMs;
const remainingMs = AGENT_START_LOCK_STALE_MS - elapsedMs;
if (remainingMs <= 0) {
logger.warn({ agentId, staleMs: elapsedMs }, "agent start lock stale; continuing queued-run start");
return;
}
let timedOut = false;
let timeout: ReturnType<typeof setTimeout> | null = null;
await Promise.race([
lock.promise,
new Promise<void>((resolve) => {
timeout = setTimeout(() => {
timedOut = true;
resolve();
}, remainingMs);
}),
]);
if (timeout) clearTimeout(timeout);
if (timedOut) {
logger.warn({ agentId, staleMs: AGENT_START_LOCK_STALE_MS }, "agent start lock timed out; continuing queued-run start");
}
}
export async function withAgentStartLock<T>(agentId: string, fn: () => Promise<T>) {
const previous = startLocksByAgent.get(agentId);
const waitForPrevious = previous ? waitForAgentStartLock(agentId, previous) : Promise.resolve();
const run = waitForPrevious.then(fn);
const marker = run.then(
() => undefined,
() => undefined,
);
startLocksByAgent.set(agentId, { promise: marker, startedAtMs: Date.now() });
try {
return await run;
} finally {
if (startLocksByAgent.get(agentId)?.promise === marker) {
startLocksByAgent.delete(agentId);
}
}
}

View File

@@ -108,6 +108,7 @@ import {
} from "./recovery/index.js";
import { isAutomaticRecoverySuppressedByPauseHold } from "./recovery/pause-hold-guard.js";
import { recoveryService } from "./recovery/service.js";
import { withAgentStartLock } from "./agent-start-lock.js";
import { redactCurrentUserText, redactCurrentUserValue } from "../log-redaction.js";
import {
hasSessionCompactionThresholds,
@@ -132,7 +133,6 @@ const MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS = 100;
const MAX_RUN_EVENT_PAYLOAD_DEPTH = 6;
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
const AGENT_START_LOCK_STALE_MS = 30_000;
const LIVENESS_BOOKKEEPING_ACTIVITY_ACTIONS = [
"environment.lease_acquired",
"environment.lease_released",
@@ -142,7 +142,6 @@ const WAKE_COMMENT_IDS_KEY = "wakeCommentIds";
const PAPERCLIP_WAKE_PAYLOAD_KEY = "paperclipWake";
const PAPERCLIP_HARNESS_CHECKOUT_KEY = "paperclipHarnessCheckedOut";
const DETACHED_PROCESS_ERROR_CODE = "process_detached";
const startLocksByAgent = new Map<string, { promise: Promise<void>; startedAtMs: number }>();
const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__";
const MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS = 10 * 60 * 1000;
const MAX_INLINE_WAKE_COMMENTS = 8;
@@ -851,54 +850,6 @@ function normalizeMaxConcurrentRuns(value: unknown) {
return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed));
}
async function waitForAgentStartLock(agentId: string, lock: { promise: Promise<void>; startedAtMs: number }) {
const elapsedMs = Date.now() - lock.startedAtMs;
const remainingMs = AGENT_START_LOCK_STALE_MS - elapsedMs;
if (remainingMs <= 0) {
logger.warn({ agentId, staleMs: elapsedMs }, "agent start lock stale; continuing queued-run start");
return;
}
let timedOut = false;
let timeout: ReturnType<typeof setTimeout> | null = null;
await Promise.race([
lock.promise,
new Promise<void>((resolve) => {
timeout = setTimeout(() => {
timedOut = true;
resolve();
}, remainingMs);
}),
]);
if (timeout) clearTimeout(timeout);
if (timedOut) {
logger.warn({ agentId, staleMs: AGENT_START_LOCK_STALE_MS }, "agent start lock timed out; continuing queued-run start");
}
}
export async function withAgentStartLockForTest<T>(agentId: string, fn: () => Promise<T>) {
return withAgentStartLock(agentId, fn);
}
async function withAgentStartLock<T>(agentId: string, fn: () => Promise<T>) {
const previous = startLocksByAgent.get(agentId);
const waitForPrevious = previous ? waitForAgentStartLock(agentId, previous) : Promise.resolve();
const run = waitForPrevious.then(fn);
const marker = run.then(
() => undefined,
() => undefined,
);
startLocksByAgent.set(agentId, { promise: marker, startedAtMs: Date.now() });
try {
return await run;
} finally {
if (startLocksByAgent.get(agentId)?.promise === marker) {
startLocksByAgent.delete(agentId);
}
}
}
interface WakeupOptions {
source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system";
@@ -5989,7 +5940,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
return { kind: "released" as const };
}
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) {
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) {
return { kind: "released" as const };
}

View File

@@ -1,11 +1,14 @@
import type { Db } from "@paperclipai/db";
import { issueTreeControlService } from "../issue-tree-control.js";
type IssueTreeControlService = ReturnType<typeof issueTreeControlService>;
export async function isAutomaticRecoverySuppressedByPauseHold(
db: Db,
companyId: string,
issueId: string,
treeControlSvc: IssueTreeControlService = issueTreeControlService(db),
) {
const activePauseHold = await issueTreeControlService(db).getActivePauseHoldGate(companyId, issueId);
const activePauseHold = await treeControlSvc.getActivePauseHoldGate(companyId, issueId);
return Boolean(activePauseHold);
}

View File

@@ -19,6 +19,7 @@ import { redactSensitiveText } from "../../redaction.js";
import { logActivity } from "../activity-log.js";
import { budgetService } from "../budgets.js";
import { instanceSettingsService } from "../instance-settings.js";
import { issueTreeControlService } from "../issue-tree-control.js";
import { issueService } from "../issues.js";
import { getRunLogStore } from "../run-log-store.js";
import {
@@ -265,6 +266,7 @@ function buildLivenessOriginalIssueComment(finding: IssueLivenessFinding, escala
export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) {
const issuesSvc = issueService(db);
const treeControlSvc = issueTreeControlService(db);
const budgets = budgetService(db);
const instanceSettings = instanceSettingsService(db);
const runLogStore = getRunLogStore();
@@ -1438,7 +1440,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
continue;
}
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) {
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) {
result.skipped += 1;
continue;
}
@@ -1913,7 +1915,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
.where(eq(issues.id, input.finding.issueId))
.then((rows) => rows[0] ?? null);
if (!issue || issue.companyId !== input.finding.companyId) return { kind: "skipped" as const };
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) {
if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) {
return { kind: "skipped" as const };
}