diff --git a/cli/src/__tests__/company-import-export-e2e.test.ts b/cli/src/__tests__/company-import-export-e2e.test.ts index d3f76c473c..1322064444 100644 --- a/cli/src/__tests__/company-import-export-e2e.test.ts +++ b/cli/src/__tests__/company-import-export-e2e.test.ts @@ -400,8 +400,6 @@ describeEmbeddedPostgres("paperclipai company import/export e2e", () => { ); const importedMatchingIssues = importedIssues.filter((issue) => issue.title === sourceIssue.title); - const importedMatchingIssues = importedIssues.filter((issue) => issue.title === sourceIssue.title); - expect(importedAgents.map((agent) => agent.name)).toContain(sourceAgent.name); expect(importedProjects.map((project) => project.name)).toContain(sourceProject.name); expect(importedMatchingIssues).toHaveLength(1); @@ -476,8 +474,6 @@ describeEmbeddedPostgres("paperclipai company import/export e2e", () => { ); const twiceImportedMatchingIssues = twiceImportedIssues.filter((issue) => issue.title === sourceIssue.title); - const twiceImportedMatchingIssues = twiceImportedIssues.filter((issue) => issue.title === sourceIssue.title); - expect(twiceImportedAgents).toHaveLength(2); expect(new Set(twiceImportedAgents.map((agent) => agent.name)).size).toBe(2); expect(twiceImportedProjects).toHaveLength(2); diff --git a/server/src/__tests__/heartbeat-start-lock.test.ts b/server/src/__tests__/heartbeat-start-lock.test.ts index dc52673f2d..29aefdfb6d 100644 --- a/server/src/__tests__/heartbeat-start-lock.test.ts +++ b/server/src/__tests__/heartbeat-start-lock.test.ts @@ -1,6 +1,6 @@ import { randomUUID } from "node:crypto"; import { afterEach, describe, expect, it, vi } from "vitest"; -import { withAgentStartLockForTest } from "../services/heartbeat.ts"; +import { withAgentStartLock } from "../services/agent-start-lock.ts"; describe("heartbeat agent start lock", () => { afterEach(() => { @@ -14,11 +14,11 @@ describe("heartbeat agent start lock", () => { const firstStart = vi.fn(() => new Promise(() => undefined)); const secondStart = vi.fn(async () => "started"); - void withAgentStartLockForTest(agentId, firstStart); + void withAgentStartLock(agentId, firstStart); await Promise.resolve(); expect(firstStart).toHaveBeenCalledTimes(1); - const secondStartResult = withAgentStartLockForTest(agentId, secondStart); + const secondStartResult = withAgentStartLock(agentId, secondStart); await Promise.resolve(); expect(secondStart).not.toHaveBeenCalled(); diff --git a/server/src/services/agent-start-lock.ts b/server/src/services/agent-start-lock.ts new file mode 100644 index 0000000000..38cbaf74c6 --- /dev/null +++ b/server/src/services/agent-start-lock.ts @@ -0,0 +1,48 @@ +import { logger } from "../middleware/logger.js"; + +const AGENT_START_LOCK_STALE_MS = 30_000; +const startLocksByAgent = new Map; startedAtMs: number }>(); + +async function waitForAgentStartLock(agentId: string, lock: { promise: Promise; startedAtMs: number }) { + const elapsedMs = Date.now() - lock.startedAtMs; + const remainingMs = AGENT_START_LOCK_STALE_MS - elapsedMs; + if (remainingMs <= 0) { + logger.warn({ agentId, staleMs: elapsedMs }, "agent start lock stale; continuing queued-run start"); + return; + } + + let timedOut = false; + let timeout: ReturnType | null = null; + await Promise.race([ + lock.promise, + new Promise((resolve) => { + timeout = setTimeout(() => { + timedOut = true; + resolve(); + }, remainingMs); + }), + ]); + if (timeout) clearTimeout(timeout); + + if (timedOut) { + logger.warn({ agentId, staleMs: AGENT_START_LOCK_STALE_MS }, "agent start lock timed out; continuing queued-run start"); + } +} + +export async function withAgentStartLock(agentId: string, fn: () => Promise) { + const previous = startLocksByAgent.get(agentId); + const waitForPrevious = previous ? waitForAgentStartLock(agentId, previous) : Promise.resolve(); + const run = waitForPrevious.then(fn); + const marker = run.then( + () => undefined, + () => undefined, + ); + startLocksByAgent.set(agentId, { promise: marker, startedAtMs: Date.now() }); + try { + return await run; + } finally { + if (startLocksByAgent.get(agentId)?.promise === marker) { + startLocksByAgent.delete(agentId); + } + } +} diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 1f510878f9..57a4f62f77 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -108,6 +108,7 @@ import { } from "./recovery/index.js"; import { isAutomaticRecoverySuppressedByPauseHold } from "./recovery/pause-hold-guard.js"; import { recoveryService } from "./recovery/service.js"; +import { withAgentStartLock } from "./agent-start-lock.js"; import { redactCurrentUserText, redactCurrentUserValue } from "../log-redaction.js"; import { hasSessionCompactionThresholds, @@ -132,7 +133,6 @@ const MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS = 100; const MAX_RUN_EVENT_PAYLOAD_DEPTH = 6; const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = AGENT_DEFAULT_MAX_CONCURRENT_RUNS; const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10; -const AGENT_START_LOCK_STALE_MS = 30_000; const LIVENESS_BOOKKEEPING_ACTIVITY_ACTIONS = [ "environment.lease_acquired", "environment.lease_released", @@ -142,7 +142,6 @@ const WAKE_COMMENT_IDS_KEY = "wakeCommentIds"; const PAPERCLIP_WAKE_PAYLOAD_KEY = "paperclipWake"; const PAPERCLIP_HARNESS_CHECKOUT_KEY = "paperclipHarnessCheckedOut"; const DETACHED_PROCESS_ERROR_CODE = "process_detached"; -const startLocksByAgent = new Map; startedAtMs: number }>(); const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__"; const MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS = 10 * 60 * 1000; const MAX_INLINE_WAKE_COMMENTS = 8; @@ -851,54 +850,6 @@ function normalizeMaxConcurrentRuns(value: unknown) { return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed)); } -async function waitForAgentStartLock(agentId: string, lock: { promise: Promise; startedAtMs: number }) { - const elapsedMs = Date.now() - lock.startedAtMs; - const remainingMs = AGENT_START_LOCK_STALE_MS - elapsedMs; - if (remainingMs <= 0) { - logger.warn({ agentId, staleMs: elapsedMs }, "agent start lock stale; continuing queued-run start"); - return; - } - - let timedOut = false; - let timeout: ReturnType | null = null; - await Promise.race([ - lock.promise, - new Promise((resolve) => { - timeout = setTimeout(() => { - timedOut = true; - resolve(); - }, remainingMs); - }), - ]); - if (timeout) clearTimeout(timeout); - - if (timedOut) { - logger.warn({ agentId, staleMs: AGENT_START_LOCK_STALE_MS }, "agent start lock timed out; continuing queued-run start"); - } -} - -export async function withAgentStartLockForTest(agentId: string, fn: () => Promise) { - return withAgentStartLock(agentId, fn); -} - -async function withAgentStartLock(agentId: string, fn: () => Promise) { - const previous = startLocksByAgent.get(agentId); - const waitForPrevious = previous ? waitForAgentStartLock(agentId, previous) : Promise.resolve(); - const run = waitForPrevious.then(fn); - const marker = run.then( - () => undefined, - () => undefined, - ); - startLocksByAgent.set(agentId, { promise: marker, startedAtMs: Date.now() }); - try { - return await run; - } finally { - if (startLocksByAgent.get(agentId)?.promise === marker) { - startLocksByAgent.delete(agentId); - } - } -} - interface WakeupOptions { source?: "timer" | "assignment" | "on_demand" | "automation"; triggerDetail?: "manual" | "ping" | "callback" | "system"; @@ -5989,7 +5940,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) return { kind: "released" as const }; } - if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) { + if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) { return { kind: "released" as const }; } diff --git a/server/src/services/recovery/pause-hold-guard.ts b/server/src/services/recovery/pause-hold-guard.ts index 6445341b58..9f996d814a 100644 --- a/server/src/services/recovery/pause-hold-guard.ts +++ b/server/src/services/recovery/pause-hold-guard.ts @@ -1,11 +1,14 @@ import type { Db } from "@paperclipai/db"; import { issueTreeControlService } from "../issue-tree-control.js"; +type IssueTreeControlService = ReturnType; + export async function isAutomaticRecoverySuppressedByPauseHold( db: Db, companyId: string, issueId: string, + treeControlSvc: IssueTreeControlService = issueTreeControlService(db), ) { - const activePauseHold = await issueTreeControlService(db).getActivePauseHoldGate(companyId, issueId); + const activePauseHold = await treeControlSvc.getActivePauseHoldGate(companyId, issueId); return Boolean(activePauseHold); } diff --git a/server/src/services/recovery/service.ts b/server/src/services/recovery/service.ts index 2c39044efa..f2c2f58d79 100644 --- a/server/src/services/recovery/service.ts +++ b/server/src/services/recovery/service.ts @@ -19,6 +19,7 @@ import { redactSensitiveText } from "../../redaction.js"; import { logActivity } from "../activity-log.js"; import { budgetService } from "../budgets.js"; import { instanceSettingsService } from "../instance-settings.js"; +import { issueTreeControlService } from "../issue-tree-control.js"; import { issueService } from "../issues.js"; import { getRunLogStore } from "../run-log-store.js"; import { @@ -265,6 +266,7 @@ function buildLivenessOriginalIssueComment(finding: IssueLivenessFinding, escala export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) { const issuesSvc = issueService(db); + const treeControlSvc = issueTreeControlService(db); const budgets = budgetService(db); const instanceSettings = instanceSettingsService(db); const runLogStore = getRunLogStore(); @@ -1438,7 +1440,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) continue; } - if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) { + if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) { result.skipped += 1; continue; } @@ -1913,7 +1915,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) .where(eq(issues.id, input.finding.issueId)) .then((rows) => rows[0] ?? null); if (!issue || issue.companyId !== input.finding.companyId) return { kind: "skipped" as const }; - if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id)) { + if (await isAutomaticRecoverySuppressedByPauseHold(db, issue.companyId, issue.id, treeControlSvc)) { return { kind: "skipped" as const }; }