// Integration tests for heartbeatService(db).reconcileIssueGraphLiveness().
// They run against an embedded Postgres instance and are skipped on hosts
// where the embedded database is reported as unsupported.
import { randomUUID } from "node:crypto";
import { and, eq, sql } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
  activityLog,
  agents,
  agentWakeupRequests,
  companies,
  createDb,
  heartbeatRuns,
  issueComments,
  issueRelations,
  issues,
} from "@paperclipai/db";
import {
  getEmbeddedPostgresTestSupport,
  startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";

// Adapter stub: every run "succeeds" immediately so queued wakeups drain.
const mockAdapterExecute = vi.hoisted(() =>
  vi.fn(async () => ({
    exitCode: 0,
    signal: null,
    timedOut: false,
    errorMessage: null,
    summary: "Acknowledged liveness escalation.",
    provider: "test",
    model: "test-model",
  })),
);

vi.mock("../telemetry.ts", () => ({
  getTelemetryClient: () => ({ track: vi.fn() }),
}));

vi.mock("@paperclipai/shared/telemetry", async () => {
  // NOTE(review): type argument restored after extraction stripped it — confirm against the repo.
  const actual = await vi.importActual<typeof import("@paperclipai/shared/telemetry")>(
    "@paperclipai/shared/telemetry",
  );
  return {
    ...actual,
    trackAgentFirstHeartbeat: vi.fn(),
  };
});

vi.mock("../adapters/index.ts", async () => {
  // NOTE(review): type argument restored after extraction stripped it — confirm against the repo.
  const actual = await vi.importActual<typeof import("../adapters/index.ts")>("../adapters/index.ts");
  return {
    ...actual,
    getServerAdapter: vi.fn(() => ({
      supportsLocalAgentJwt: false,
      execute: mockAdapterExecute,
    })),
  };
});

import { heartbeatService } from "../services/heartbeat.ts";
import { runningProcesses } from "../adapters/index.ts";

const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;

if (!embeddedPostgresSupport.supported) {
  console.warn(
    `Skipping embedded Postgres issue liveness escalation tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
  );
}

describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => {
  let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
  let db: ReturnType<typeof createDb>;

  beforeAll(async () => {
    tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-issue-liveness-");
    db = createDb(tempDb.connectionString);
  }, 30_000);

  afterEach(async () => {
    vi.clearAllMocks();
    runningProcesses.clear();
    // Wait until no run has been queued/running for three consecutive polls
    // (bounded at 100 polls) before truncating, so background heartbeat work
    // does not write into tables that are being wiped.
    let idlePolls = 0;
    for (let attempt = 0; attempt < 100; attempt += 1) {
      const runs = await db
        .select({ status: heartbeatRuns.status })
        .from(heartbeatRuns);
      const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running");
      if (!hasActiveRun) {
        idlePolls += 1;
        if (idlePolls >= 3) break;
      } else {
        idlePolls = 0;
      }
      await new Promise((resolve) => setTimeout(resolve, 50));
    }
    await new Promise((resolve) => setTimeout(resolve, 50));
    await db.execute(sql.raw(`TRUNCATE TABLE "companies" CASCADE`));
  });

  afterAll(async () => {
    await tempDb?.cleanup();
  });

  /**
   * Seeds one company with a CTO and a coder reporting to the CTO, a blocked
   * issue assigned to the coder, and an unassigned "todo" issue that blocks it.
   */
  async function seedBlockedChain() {
    const companyId = randomUUID();
    const managerId = randomUUID();
    const coderId = randomUUID();
    const blockedIssueId = randomUUID();
    const blockerIssueId = randomUUID();
    const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;

    await db.insert(companies).values({
      id: companyId,
      name: "Paperclip",
      issuePrefix,
      requireBoardApprovalForNewAgents: false,
    });

    await db.insert(agents).values([
      {
        id: managerId,
        companyId,
        name: "CTO",
        role: "cto",
        status: "idle",
        adapterType: "codex_local",
        adapterConfig: {},
        runtimeConfig: {},
        permissions: {},
      },
      {
        id: coderId,
        companyId,
        name: "Coder",
        role: "engineer",
        status: "idle",
        reportsTo: managerId,
        adapterType: "codex_local",
        adapterConfig: {},
        runtimeConfig: {},
        permissions: {},
      },
    ]);

    await db.insert(issues).values([
      {
        id: blockedIssueId,
        companyId,
        title: "Blocked parent",
        status: "blocked",
        priority: "medium",
        assigneeAgentId: coderId,
        issueNumber: 1,
        identifier: `${issuePrefix}-1`,
      },
      {
        id: blockerIssueId,
        companyId,
        title: "Missing unblock owner",
        status: "todo",
        priority: "medium",
        issueNumber: 2,
        identifier: `${issuePrefix}-2`,
      },
    ]);

    await db.insert(issueRelations).values({
      companyId,
      issueId: blockerIssueId,
      relatedIssueId: blockedIssueId,
      type: "blocks",
    });

    return { companyId, managerId, blockedIssueId, blockerIssueId };
  }

  it("creates one manager escalation, preserves blockers, and wakes the assignee", async () => {
    const { companyId, managerId, blockedIssueId, blockerIssueId } = await seedBlockedChain();
    const heartbeat = heartbeatService(db);

    // The second pass must recognise the open escalation instead of duplicating it.
    const first = await heartbeat.reconcileIssueGraphLiveness();
    const second = await heartbeat.reconcileIssueGraphLiveness();

    expect(first.escalationsCreated).toBe(1);
    expect(second.escalationsCreated).toBe(0);
    expect(second.existingEscalations).toBe(1);

    const escalations = await db
      .select()
      .from(issues)
      .where(
        and(
          eq(issues.companyId, companyId),
          eq(issues.originKind, "harness_liveness_escalation"),
        ),
      );
    expect(escalations).toHaveLength(1);
    expect(escalations[0]).toMatchObject({
      parentId: blockedIssueId,
      assigneeAgentId: managerId,
      status: expect.stringMatching(/^(todo|in_progress|done)$/),
    });

    const blockers = await db
      .select({ blockerIssueId: issueRelations.issueId })
      .from(issueRelations)
      .where(eq(issueRelations.relatedIssueId, blockedIssueId));
    expect(blockers.map((row) => row.blockerIssueId).sort()).toEqual(
      [blockerIssueId, escalations[0]!.id].sort(),
    );

    const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, blockedIssueId));
    expect(comments).toHaveLength(1);
    expect(comments[0]?.body).toContain("harness-level liveness incident");
    expect(comments[0]?.body).toContain(escalations[0]?.identifier ?? escalations[0]!.id);

    const wakes = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, managerId));
    expect(wakes.some((wake) => wake.reason === "issue_assigned")).toBe(true);

    const events = await db.select().from(activityLog).where(eq(activityLog.companyId, companyId));
    expect(events.some((event) => event.action === "issue.harness_liveness_escalation_created")).toBe(true);
    expect(events.some((event) => event.action === "issue.blockers.updated")).toBe(true);
  });

  it("creates a fresh escalation when the previous matching escalation is terminal", async () => {
    const { companyId, managerId, blockedIssueId, blockerIssueId } = await seedBlockedChain();
    const heartbeat = heartbeatService(db);
    // Same key format the classifier emits for this finding.
    const incidentKey = [
      "harness_liveness",
      companyId,
      blockedIssueId,
      "blocked_by_unassigned_issue",
      blockerIssueId,
    ].join(":");
    const closedEscalationId = randomUUID();

    await db.insert(issues).values({
      id: closedEscalationId,
      companyId,
      title: "Closed escalation",
      status: "done",
      priority: "high",
      parentId: blockedIssueId,
      assigneeAgentId: managerId,
      issueNumber: 3,
      identifier: "CLOSED-3",
      originKind: "harness_liveness_escalation",
      originId: incidentKey,
    });

    const result = await heartbeat.reconcileIssueGraphLiveness();

    expect(result.escalationsCreated).toBe(1);
    expect(result.existingEscalations).toBe(0);

    const openEscalations = await db
      .select()
      .from(issues)
      .where(
        and(
          eq(issues.companyId, companyId),
          eq(issues.originKind, "harness_liveness_escalation"),
          eq(issues.originId, incidentKey),
        ),
      );
    expect(openEscalations).toHaveLength(2);
    const freshEscalation = openEscalations.find((issue) => issue.status !== "done");
    expect(freshEscalation).toMatchObject({
      parentId: blockedIssueId,
      assigneeAgentId: managerId,
      status: expect.stringMatching(/^(todo|in_progress|done)$/),
    });

    const blockers = await db
      .select({ blockerIssueId: issueRelations.issueId })
      .from(issueRelations)
      .where(eq(issueRelations.relatedIssueId, blockedIssueId));
    expect(blockers.some((row) => row.blockerIssueId === closedEscalationId)).toBe(false);
    expect(blockers.some((row) => row.blockerIssueId === freshEscalation?.id)).toBe(true);
  });
});
// Unit tests for the pure issue-graph liveness classifier.
import { describe, expect, it } from "vitest";
import { classifyIssueGraphLiveness } from "../services/issue-liveness.ts";

const companyId = "company-1";
const managerId = "manager-1";
const coderId = "coder-1";
const blockerId = "blocker-1";
const blockedId = "blocked-1";

/** Builds a blocked issue assigned to the coder; `overrides` replaces any field. */
function issue(overrides: Record<string, unknown> = {}) {
  return {
    id: blockedId,
    companyId,
    identifier: "PAP-1703",
    title: "Parent work",
    status: "blocked",
    assigneeAgentId: coderId,
    assigneeUserId: null,
    createdByAgentId: null,
    createdByUserId: null,
    executionState: null,
    ...overrides,
  };
}

/** Builds an idle engineer reporting to the manager; `overrides` replaces any field. */
function agent(overrides: Record<string, unknown> = {}) {
  return {
    id: coderId,
    companyId,
    name: "Coder",
    role: "engineer",
    title: null,
    status: "idle",
    reportsTo: managerId,
    ...overrides,
  };
}

const manager = agent({
  id: managerId,
  name: "CTO",
  role: "cto",
  reportsTo: null,
});

// Single "blocker-1 blocks blocked-1" edge shared by the blocked-chain cases.
const blocks = [{ companyId, blockerIssueId: blockerId, blockedIssueId: blockedId }];

describe("issue graph liveness classifier", () => {
  it("detects a PAP-1703-style blocked chain with an unassigned blocker and stable incident key", () => {
    const findings = classifyIssueGraphLiveness({
      issues: [
        issue(),
        issue({
          id: blockerId,
          identifier: "PAP-1704",
          title: "Missing unblock work",
          status: "todo",
          assigneeAgentId: null,
        }),
      ],
      relations: blocks,
      agents: [agent(), manager],
    });

    expect(findings).toHaveLength(1);
    expect(findings[0]).toMatchObject({
      issueId: blockedId,
      identifier: "PAP-1703",
      state: "blocked_by_unassigned_issue",
      recommendedOwnerAgentId: managerId,
      dependencyPath: [
        expect.objectContaining({ issueId: blockedId }),
        expect.objectContaining({ issueId: blockerId }),
      ],
      incidentKey: `harness_liveness:${companyId}:${blockedId}:blocked_by_unassigned_issue:${blockerId}`,
    });
  });

  it("does not flag a live blocked chain with an active assignee and wake path", () => {
    const findings = classifyIssueGraphLiveness({
      issues: [
        issue(),
        issue({
          id: blockerId,
          identifier: "PAP-1704",
          title: "Live unblock work",
          status: "todo",
          assigneeAgentId: "blocker-agent",
        }),
      ],
      relations: blocks,
      agents: [
        agent(),
        manager,
        agent({ id: "blocker-agent", name: "Blocker Agent", reportsTo: managerId }),
      ],
      queuedWakeRequests: [{ companyId, issueId: blockerId, agentId: "blocker-agent", status: "queued" }],
    });

    expect(findings).toEqual([]);
  });

  it("does not flag an unassigned blocker that already has an active execution path", () => {
    const findings = classifyIssueGraphLiveness({
      issues: [
        issue(),
        issue({
          id: blockerId,
          identifier: "PAP-1704",
          title: "Unassigned but already running",
          status: "todo",
          assigneeAgentId: null,
        }),
      ],
      relations: blocks,
      agents: [agent(), manager],
      activeRuns: [{ companyId, issueId: blockerId, agentId: coderId, status: "running" }],
    });

    expect(findings).toEqual([]);
  });

  it("detects cancelled blockers and uninvokable blocker assignees deterministically", () => {
    const cancelled = classifyIssueGraphLiveness({
      issues: [
        issue(),
        issue({
          id: blockerId,
          identifier: "PAP-1704",
          title: "Cancelled unblock work",
          status: "cancelled",
          assigneeAgentId: "blocker-agent",
        }),
      ],
      relations: blocks,
      agents: [agent(), manager, agent({ id: "blocker-agent", name: "Paused", status: "paused" })],
    });
    expect(cancelled[0]?.state).toBe("blocked_by_cancelled_issue");

    const paused = classifyIssueGraphLiveness({
      issues: [
        issue(),
        issue({
          id: blockerId,
          identifier: "PAP-1704",
          title: "Paused unblock work",
          status: "todo",
          assigneeAgentId: "blocker-agent",
        }),
      ],
      relations: blocks,
      agents: [agent(), manager, agent({ id: "blocker-agent", name: "Paused", status: "paused" })],
    });
    expect(paused[0]?.state).toBe("blocked_by_uninvokable_assignee");
  });

  it("detects invalid in_review execution participant", () => {
    const findings = classifyIssueGraphLiveness({
      issues: [
        issue({
          status: "in_review",
          executionState: {
            status: "pending",
            currentStageId: "stage-1",
            currentStageIndex: 0,
            currentStageType: "review",
            currentParticipant: { type: "agent", agentId: "missing-agent" },
            returnAssignee: { type: "agent", agentId: coderId },
            completedStageIds: [],
            lastDecisionId: null,
            lastDecisionOutcome: null,
          },
        }),
      ],
      relations: [],
      agents: [agent(), manager],
    });

    expect(findings).toHaveLength(1);
    expect(findings[0]).toMatchObject({
      state: "invalid_review_participant",
      incidentKey: `harness_liveness:${companyId}:${blockedId}:invalid_review_participant:missing-agent`,
    });
  });
});
escalations"); + } + }) .catch((err) => { logger.error({ err }, "startup heartbeat recovery failed"); }); @@ -715,6 +721,12 @@ export async function startServer(): Promise { logger.warn({ ...reconciled }, "periodic stranded-issue reconciliation changed assigned issue state"); } }) + .then(async () => { + const reconciled = await heartbeat.reconcileIssueGraphLiveness(); + if (reconciled.escalationsCreated > 0) { + logger.warn({ ...reconciled }, "periodic issue-graph liveness reconciliation created escalations"); + } + }) .catch((err) => { logger.error({ err }, "periodic heartbeat recovery failed"); }); diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 5e4d14ff50..6b95be1903 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -3,7 +3,7 @@ import path from "node:path"; import { execFile as execFileCallback } from "node:child_process"; import { promisify } from "node:util"; import { randomUUID } from "node:crypto"; -import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, or, sql } from "drizzle-orm"; +import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, notInArray, or, sql } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { AGENT_DEFAULT_MAX_CONCURRENT_RUNS, @@ -25,6 +25,7 @@ import { heartbeatRunEvents, heartbeatRuns, issueComments, + issueRelations, issues, issueWorkProducts, projects, @@ -61,6 +62,10 @@ import { classifyRunLiveness, type RunLivenessClassificationInput, } from "./run-liveness.js"; +import { + classifyIssueGraphLiveness, + type IssueLivenessFinding, +} from "./issue-liveness.js"; import { logActivity, publishPluginDomainEvent, type LogActivityInput } from "./activity-log.js"; import { buildWorkspaceReadyComment, @@ -3830,6 +3835,363 @@ export function heartbeatService(db: Db) { return result; } + function issueIdFromRunContext(contextSnapshot: unknown) { + const context = parseObject(contextSnapshot); + return 
/**
 * Reads the issue id a heartbeat run refers to from its context snapshot,
 * preferring `issueId` and falling back to `taskId`.
 */
function issueIdFromRunContext(contextSnapshot: unknown) {
  const snapshot = parseObject(contextSnapshot);
  const explicitIssueId = readNonEmptyString(snapshot.issueId);
  if (explicitIssueId != null) return explicitIssueId;
  return readNonEmptyString(snapshot.taskId);
}

/**
 * Reads the issue id a wakeup request refers to. Looks at the payload's own
 * `issueId` first, then at `issueId` / `taskId` inside the nested object
 * stored under DEFERRED_WAKE_CONTEXT_KEY.
 */
function issueIdFromWakePayload(payload: unknown) {
  const body = parseObject(payload);
  const deferredContext = parseObject(body[DEFERRED_WAKE_CONTEXT_KEY]);

  const topLevelIssueId = readNonEmptyString(body.issueId);
  if (topLevelIssueId != null) return topLevelIssueId;

  const nestedIssueId = readNonEmptyString(deferredContext.issueId);
  if (nestedIssueId != null) return nestedIssueId;

  return readNonEmptyString(deferredContext.taskId);
}

/**
 * Loads the inputs of the liveness classifier (non-hidden issues, "blocks"
 * relations, agents, active heartbeat runs, queued/deferred wakeups) and
 * returns the classifier's findings.
 */
async function collectIssueGraphLivenessFindings() {
  const visibleIssuesQuery = db
    .select({
      id: issues.id,
      companyId: issues.companyId,
      identifier: issues.identifier,
      title: issues.title,
      status: issues.status,
      projectId: issues.projectId,
      goalId: issues.goalId,
      parentId: issues.parentId,
      assigneeAgentId: issues.assigneeAgentId,
      assigneeUserId: issues.assigneeUserId,
      createdByAgentId: issues.createdByAgentId,
      createdByUserId: issues.createdByUserId,
      executionState: issues.executionState,
    })
    .from(issues)
    .where(isNull(issues.hiddenAt));

  // In a "blocks" relation row, `issueId` is the blocker and `relatedIssueId` the blocked issue.
  const blockingRelationsQuery = db
    .select({
      companyId: issueRelations.companyId,
      blockerIssueId: issueRelations.issueId,
      blockedIssueId: issueRelations.relatedIssueId,
    })
    .from(issueRelations)
    .where(eq(issueRelations.type, "blocks"));

  const agentsQuery = db
    .select({
      id: agents.id,
      companyId: agents.companyId,
      name: agents.name,
      role: agents.role,
      title: agents.title,
      status: agents.status,
      reportsTo: agents.reportsTo,
    })
    .from(agents);

  const activeRunsQuery = db
    .select({
      companyId: heartbeatRuns.companyId,
      agentId: heartbeatRuns.agentId,
      status: heartbeatRuns.status,
      contextSnapshot: heartbeatRuns.contextSnapshot,
    })
    .from(heartbeatRuns)
    .where(inArray(heartbeatRuns.status, [...ACTIVE_HEARTBEAT_RUN_STATUSES]));

  const pendingWakesQuery = db
    .select({
      companyId: agentWakeupRequests.companyId,
      agentId: agentWakeupRequests.agentId,
      status: agentWakeupRequests.status,
      payload: agentWakeupRequests.payload,
    })
    .from(agentWakeupRequests)
    .where(inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]));

  const [issueRows, relationRows, agentRows, activeRunRows, wakeRows] = await Promise.all([
    visibleIssuesQuery,
    blockingRelationsQuery,
    agentsQuery,
    activeRunsQuery,
    pendingWakesQuery,
  ]);

  // Reduce runs and wakeups to the execution-path shape the classifier expects.
  const activeRuns = activeRunRows.map(({ companyId, agentId, status, contextSnapshot }) => ({
    companyId,
    agentId,
    status,
    issueId: issueIdFromRunContext(contextSnapshot),
  }));
  const queuedWakeRequests = wakeRows.map(({ companyId, agentId, status, payload }) => ({
    companyId,
    agentId,
    status,
    issueId: issueIdFromWakePayload(payload),
  }));

  return classifyIssueGraphLiveness({
    issues: issueRows,
    relations: relationRows,
    agents: agentRows,
    activeRuns,
    queuedWakeRequests,
  });
}
/**
 * Returns one non-hidden escalation issue for `incidentKey` in `companyId`
 * whose status is neither "done" nor "cancelled", or null when there is none.
 */
async function findOpenLivenessEscalation(companyId: string, incidentKey: string) {
  const openMatches = await db
    .select()
    .from(issues)
    .where(
      and(
        eq(issues.companyId, companyId),
        eq(issues.originKind, "harness_liveness_escalation"),
        eq(issues.originId, incidentKey),
        isNull(issues.hiddenAt),
        notInArray(issues.status, ["done", "cancelled"]),
      ),
    )
    .limit(1);
  return openMatches[0] ?? null;
}

/** Lists the ids of every issue recorded as blocking `issueId` within `companyId`. */
async function existingBlockerIssueIds(companyId: string, issueId: string) {
  const relationRows = await db
    .select({ blockerIssueId: issueRelations.issueId })
    .from(issueRelations)
    .where(
      and(
        eq(issueRelations.companyId, companyId),
        eq(issueRelations.relatedIssueId, issueId),
        eq(issueRelations.type, "blocks"),
      ),
    );
  return relationRows.map((relation) => relation.blockerIssueId);
}

/** Renders a finding's dependency path as "A -> B", using identifiers where present and ids otherwise. */
function formatDependencyPath(finding: IssueLivenessFinding) {
  const labels: string[] = [];
  for (const step of finding.dependencyPath) {
    labels.push(step.identifier ?? step.issueId);
  }
  return labels.join(" -> ");
}
/** Builds the markdown description stored on a newly created escalation issue. */
function buildLivenessEscalationDescription(finding: IssueLivenessFinding) {
  const dependencyPath = formatDependencyPath(finding);
  const lines = [
    "Paperclip detected a harness-level issue graph liveness incident.",
    "",
    `- Incident key: \`${finding.incidentKey}\``,
    `- Finding: \`${finding.state}\``,
    `- Dependency path: ${dependencyPath}`,
    `- Reason: ${finding.reason}`,
    `- Requested action: ${finding.recommendedAction}`,
    "",
    "Resolve the blocked chain, then mark this escalation issue done so the original issue can resume when all blockers are cleared.",
  ];
  return lines.join("\n");
}

/** Builds the markdown comment posted on the original issue, pointing at its escalation. */
function buildLivenessOriginalIssueComment(finding: IssueLivenessFinding, escalation: typeof issues.$inferSelect) {
  const escalationLabel = escalation.identifier ?? escalation.id;
  const dependencyPath = formatDependencyPath(finding);
  const lines = [
    "Paperclip detected a harness-level liveness incident in this issue's dependency graph.",
    "",
    `- Escalation issue: ${escalationLabel}`,
    `- Incident key: \`${finding.incidentKey}\``,
    `- Finding: \`${finding.state}\``,
    `- Dependency path: ${dependencyPath}`,
    `- Reason: ${finding.reason}`,
    `- Manager action requested: ${finding.recommendedAction}`,
    "",
    "This issue now keeps its existing blockers and is also blocked by the escalation issue so dependency wakeups remain explicit.",
  ];
  return lines.join("\n");
}

/**
 * Picks the escalation owner: the first candidate, in recommendation order,
 * for which `budgets.getInvocationBlock` reports no block. Returns null when
 * every candidate is blocked or there are no candidates.
 */
async function resolveEscalationOwnerAgentId(
  finding: IssueLivenessFinding,
  issue: typeof issues.$inferSelect,
) {
  // A Set keeps first-seen order, so the primary recommendation is tried first.
  const orderedCandidates = new Set<string>();
  for (const agentId of [finding.recommendedOwnerAgentId, ...finding.recommendedOwnerCandidateAgentIds]) {
    if (agentId) orderedCandidates.add(agentId);
  }

  for (const agentId of orderedCandidates) {
    const budgetBlock = await budgets.getInvocationBlock(issue.companyId, agentId, {
      issueId: issue.id,
      projectId: issue.projectId,
    });
    if (!budgetBlock) return agentId;
  }

  return null;
}
/**
 * Makes sure `input.issue` is in status "blocked" and lists the escalation
 * issue among its blockers, keeping every blocker it already had.
 *
 * The call is a no-op when the issue is already blocked and already blocked by
 * the escalation. Without that guard, each reconciliation pass that finds an
 * existing escalation rewrites the issue and appends another
 * "issue.blockers.updated" activity entry although nothing changed.
 *
 * @returns the unchanged issue row on the no-op path, the result of
 *   `issuesSvc.update` when a write happened, or null when the update returned
 *   nothing.
 */
async function ensureIssueBlockedByEscalation(input: {
  issue: typeof issues.$inferSelect;
  escalationIssueId: string;
  finding: IssueLivenessFinding;
  runId?: string | null;
}) {
  const blockerIds = await existingBlockerIssueIds(input.issue.companyId, input.issue.id);
  const needsStatusChange = input.issue.status !== "blocked";
  if (!needsStatusChange && blockerIds.includes(input.escalationIssueId)) {
    return input.issue;
  }

  // Append the escalation, de-duplicated, without dropping existing blockers.
  const nextBlockerIds = [...new Set([...blockerIds, input.escalationIssueId])];

  const updated = await issuesSvc.update(input.issue.id, {
    blockedByIssueIds: nextBlockerIds,
    // Only send a status when it actually changes.
    ...(needsStatusChange ? { status: "blocked" as const } : {}),
  });
  if (!updated) return null;

  await logActivity(db, {
    companyId: input.issue.companyId,
    actorType: "system",
    actorId: "system",
    agentId: null,
    runId: input.runId ?? null,
    action: "issue.blockers.updated",
    entityType: "issue",
    entityId: input.issue.id,
    details: {
      source: "heartbeat.reconcile_issue_graph_liveness",
      incidentKey: input.finding.incidentKey,
      findingState: input.finding.state,
      blockerIssueIds: nextBlockerIds,
      escalationIssueId: input.escalationIssueId,
      status: needsStatusChange ? "blocked" : input.issue.status,
      previousStatus: input.issue.status,
    },
  });

  return updated;
}
/**
 * Handles a single liveness finding.
 *
 * - "skipped": the source issue no longer exists, belongs to another company,
 *   or no candidate owner could be resolved.
 * - "existing": an open escalation for the incident key already exists; the
 *   source issue is (re)linked to it.
 * - "created": a new escalation issue was created and assigned, the source
 *   issue was blocked on it and commented, the activity was logged, and the
 *   owner was sent an "issue_assigned" wakeup.
 */
async function createIssueGraphLivenessEscalation(input: {
  finding: IssueLivenessFinding;
  runId?: string | null;
}) {
  const { finding } = input;
  const runId = input.runId ?? null;

  const [sourceIssue] = await db
    .select()
    .from(issues)
    .where(eq(issues.id, finding.issueId));
  if (!sourceIssue || sourceIssue.companyId !== finding.companyId) {
    return { kind: "skipped" as const };
  }

  const openEscalation = await findOpenLivenessEscalation(sourceIssue.companyId, finding.incidentKey);
  if (openEscalation) {
    await ensureIssueBlockedByEscalation({
      issue: sourceIssue,
      escalationIssueId: openEscalation.id,
      finding,
      runId,
    });
    return { kind: "existing" as const, escalationIssueId: openEscalation.id };
  }

  const ownerAgentId = await resolveEscalationOwnerAgentId(finding, sourceIssue);
  if (!ownerAgentId) {
    return { kind: "skipped" as const };
  }

  // The escalation is created as a child of the source issue.
  const escalation = await issuesSvc.create(sourceIssue.companyId, {
    title: `Unblock liveness incident for ${sourceIssue.identifier ?? sourceIssue.title}`,
    description: buildLivenessEscalationDescription(finding),
    status: "todo",
    priority: "high",
    parentId: sourceIssue.id,
    projectId: sourceIssue.projectId,
    goalId: sourceIssue.goalId,
    assigneeAgentId: ownerAgentId,
    originKind: "harness_liveness_escalation",
    originId: finding.incidentKey,
    billingCode: sourceIssue.billingCode,
    inheritExecutionWorkspaceFromIssueId: sourceIssue.id,
  });

  await ensureIssueBlockedByEscalation({
    issue: sourceIssue,
    escalationIssueId: escalation.id,
    finding,
    runId,
  });

  const originalIssueComment = buildLivenessOriginalIssueComment(finding, escalation);
  await issuesSvc.addComment(sourceIssue.id, originalIssueComment, { runId });

  await logActivity(db, {
    companyId: sourceIssue.companyId,
    actorType: "system",
    actorId: "system",
    agentId: ownerAgentId,
    runId,
    action: "issue.harness_liveness_escalation_created",
    entityType: "issue",
    entityId: escalation.id,
    details: {
      source: "heartbeat.reconcile_issue_graph_liveness",
      incidentKey: finding.incidentKey,
      findingState: finding.state,
      sourceIssueId: sourceIssue.id,
      sourceIdentifier: sourceIssue.identifier,
      escalationIssueId: escalation.id,
      escalationIdentifier: escalation.identifier,
      dependencyPath: finding.dependencyPath,
    },
  });

  // Wake the owner as for a regular assignment, with the incident details attached.
  const wake = await enqueueWakeup(ownerAgentId, {
    source: "assignment",
    triggerDetail: "system",
    reason: "issue_assigned",
    payload: {
      issueId: escalation.id,
      sourceIssueId: sourceIssue.id,
      incidentKey: finding.incidentKey,
    },
    requestedByActorType: "system",
    requestedByActorId: null,
    contextSnapshot: {
      issueId: escalation.id,
      taskId: escalation.id,
      wakeReason: "issue_assigned",
      source: "harness_liveness_escalation",
      sourceIssueId: sourceIssue.id,
      incidentKey: finding.incidentKey,
    },
  });

  logger.warn({
    incidentKey: finding.incidentKey,
    findingState: finding.state,
    sourceIssueId: sourceIssue.id,
    escalationIssueId: escalation.id,
    ownerAgentId,
    wakeupRunId: wake?.id ?? null,
  }, "created issue graph liveness escalation");

  return { kind: "created" as const, escalationIssueId: escalation.id };
}
/**
 * Classifies the issue graph and processes every finding one at a time.
 *
 * @returns counters for findings, created escalations, already-open
 *   escalations and skipped findings, plus the affected source-issue ids and
 *   escalation-issue ids (parallel arrays, one entry per created/existing
 *   escalation).
 */
async function reconcileIssueGraphLiveness(opts?: { runId?: string | null }) {
  const findings = await collectIssueGraphLivenessFindings();
  const runId = opts?.runId ?? null;

  const result = {
    findings: findings.length,
    escalationsCreated: 0,
    existingEscalations: 0,
    skipped: 0,
    issueIds: [] as string[],
    escalationIssueIds: [] as string[],
  };

  for (const finding of findings) {
    const outcome = await createIssueGraphLivenessEscalation({ finding, runId });

    if (outcome.kind === "skipped") {
      result.skipped += 1;
      continue;
    }

    // "created" and "existing" differ only in which counter they bump.
    if (outcome.kind === "created") {
      result.escalationsCreated += 1;
    } else {
      result.existingEscalations += 1;
    }
    result.issueIds.push(finding.issueId);
    result.escalationIssueIds.push(outcome.escalationIssueId);
  }

  return result;
}
null, + }); + if (escalation.kind === "created") { + result.escalationsCreated += 1; + result.issueIds.push(finding.issueId); + result.escalationIssueIds.push(escalation.escalationIssueId); + } else if (escalation.kind === "existing") { + result.existingEscalations += 1; + result.issueIds.push(finding.issueId); + result.escalationIssueIds.push(escalation.escalationIssueId); + } else { + result.skipped += 1; + } + } + + return result; + } + async function updateRuntimeState( agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, @@ -5590,7 +5952,10 @@ export function heartbeatService(db: Db) { }); if (outcome.kind === "deferred" || outcome.kind === "skipped") return null; - if (outcome.kind === "coalesced") return outcome.run; + if (outcome.kind === "coalesced") { + await startNextQueuedRunForAgent(agent.id); + return outcome.run; + } const newRun = outcome.run; publishLiveEvent({ @@ -6164,6 +6529,8 @@ export function heartbeatService(db: Db) { reconcileStrandedAssignedIssues, + reconcileIssueGraphLiveness, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0; diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 6536f893ad..5ee4d2bafd 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -29,6 +29,7 @@ export { routineService } from "./routines.js"; export { costService } from "./costs.js"; export { financeService } from "./finance.js"; export { heartbeatService } from "./heartbeat.js"; +export { classifyIssueGraphLiveness, type IssueLivenessFinding } from "./issue-liveness.js"; export { dashboardService } from "./dashboard.js"; export { sidebarBadgeService } from "./sidebar-badges.js"; export { sidebarPreferenceService } from "./sidebar-preferences.js"; diff --git a/server/src/services/issue-liveness.ts b/server/src/services/issue-liveness.ts new file mode 100644 index 0000000000..c7cc58f6cf --- /dev/null +++ 
/** Severity attached to a liveness finding. */
export type IssueLivenessSeverity = "warning" | "critical";

/** The kinds of liveness problem the classifier can report. */
export type IssueLivenessState =
  | "blocked_by_unassigned_issue"
  | "blocked_by_uninvokable_assignee"
  | "blocked_by_cancelled_issue"
  | "invalid_review_participant";

/** Issue fields the classifier reads. */
export interface IssueLivenessIssueInput {
  id: string;
  companyId: string;
  identifier: string | null;
  title: string;
  status: string;
  projectId?: string | null;
  goalId?: string | null;
  parentId?: string | null;
  assigneeAgentId?: string | null;
  assigneeUserId?: string | null;
  createdByAgentId?: string | null;
  createdByUserId?: string | null;
  executionState?: Record<string, unknown> | null;
}

/** One "blocker blocks blocked" edge of the issue graph. */
export interface IssueLivenessRelationInput {
  companyId: string;
  blockerIssueId: string;
  blockedIssueId: string;
}

/** Agent fields the classifier reads. */
export interface IssueLivenessAgentInput {
  id: string;
  companyId: string;
  name: string;
  role: string;
  title?: string | null;
  status: string;
  reportsTo?: string | null;
}

/** An active run or pending wakeup, reduced to the issue it is attached to. */
export interface IssueLivenessExecutionPathInput {
  companyId: string;
  issueId: string | null;
  agentId?: string | null;
  status: string;
}

/** One step of the dependency path reported with a finding. */
export interface IssueLivenessDependencyPathEntry {
  issueId: string;
  identifier: string | null;
  title: string;
  status: string;
}

/** A single liveness problem together with the suggested owner and action. */
export interface IssueLivenessFinding {
  issueId: string;
  companyId: string;
  identifier: string | null;
  state: IssueLivenessState;
  severity: IssueLivenessSeverity;
  reason: string;
  dependencyPath: IssueLivenessDependencyPathEntry[];
  recommendedOwnerAgentId: string | null;
  recommendedOwnerCandidateAgentIds: string[];
  recommendedAction: string;
  incidentKey: string;
}

/** Everything the classifier needs; runs and wake requests default to empty. */
export interface IssueGraphLivenessInput {
  issues: IssueLivenessIssueInput[];
  relations: IssueLivenessRelationInput[];
  agents: IssueLivenessAgentInput[];
  activeRuns?: IssueLivenessExecutionPathInput[];
  queuedWakeRequests?: IssueLivenessExecutionPathInput[];
}

const INVOKABLE_AGENT_STATUSES = new Set(["active", "idle", "running", "error"]);
const BLOCKING_AGENT_STATUSES = new Set(["paused", "terminated", "pending_approval"]);

/** Human-readable label for an issue: its identifier, or its id when it has none. */
function issueLabel(issue: IssueLivenessIssueInput) {
  return issue.identifier ?? issue.id;
}

/** Projects an issue onto the dependency-path entry shape. */
function pathEntry(issue: IssueLivenessIssueInput): IssueLivenessDependencyPathEntry {
  return {
    issueId: issue.id,
    identifier: issue.identifier,
    title: issue.title,
    status: issue.status,
  };
}

/** True when the agent exists and its status is one of INVOKABLE_AGENT_STATUSES. */
function isInvokableAgent(agent: IssueLivenessAgentInput | null | undefined) {
  return Boolean(agent && INVOKABLE_AGENT_STATUSES.has(agent.status));
}

/** True when any active run or pending wakeup in `companyId` targets `issueId`. */
function hasActiveExecutionPath(
  companyId: string,
  issueId: string,
  activeRuns: IssueLivenessExecutionPathInput[],
  queuedWakeRequests: IssueLivenessExecutionPathInput[],
) {
  const targetsIssue = (entry: IssueLivenessExecutionPathInput) =>
    entry.companyId === companyId && entry.issueId === issueId;
  // Check both lists in place instead of concatenating them on every call.
  return activeRuns.some(targetsIssue) || queuedWakeRequests.some(targetsIssue);
}

/** Returns the agent id of an `{ type: "agent", agentId }` principal, else null. */
function readPrincipalAgentId(principal: unknown): string | null {
  if (!principal || typeof principal !== "object") return null;
  const value = principal as Record<string, unknown>;
  return value.type === "agent" && typeof value.agentId === "string" && value.agentId.length > 0
    ? value.agentId
    : null;
}

/** True for a `{ type: "user", userId }` principal with a non-empty user id. */
function principalIsResolvableUser(principal: unknown): boolean {
  if (!principal || typeof principal !== "object") return false;
  const value = principal as Record<string, unknown>;
  return value.type === "user" && typeof value.userId === "string" && value.userId.length > 0;
}

/**
 * Walks the `reportsTo` chain upwards from `startAgentId` and returns the ids
 * of the invokable managers in nearest-first order.
 *
 * The walk stops at the first manager that is unknown or belongs to another
 * company, and at the first agent already visited. The start agent counts as
 * visited, so a self-reporting agent or a reporting cycle never yields the
 * start agent as its own manager.
 */
function agentChainCandidates(
  startAgentId: string | null | undefined,
  agentsById: Map<string, IssueLivenessAgentInput>,
  companyId: string,
) {
  const candidates: string[] = [];
  if (!startAgentId) return candidates;

  const seen = new Set<string>([startAgentId]);
  let current = agentsById.get(startAgentId);

  while (current?.reportsTo) {
    if (seen.has(current.reportsTo)) break;
    seen.add(current.reportsTo);
    const manager = agentsById.get(current.reportsTo);
    if (!manager || manager.companyId !== companyId) break;
    // Non-invokable managers are skipped but the walk continues above them.
    if (isInvokableAgent(manager)) candidates.push(manager.id);
    current = manager;
  }

  return candidates;
}

/**
 * Fallback owners for a company: invokable agents whose role/title/name reads
 * like a CTO or CEO, then invokable agents without a manager, then every
 * invokable agent. The list may contain duplicates.
 */
function fallbackExecutiveCandidates(agents: IssueLivenessAgentInput[], companyId: string) {
  const active = agents.filter((agent) => agent.companyId === companyId && isInvokableAgent(agent));
  const executive = active.filter((agent) => {
    const haystack = `${agent.role} ${agent.title ?? ""} ${agent.name}`.toLowerCase();
    return /\b(cto|chief technology|ceo|chief executive)\b/.test(haystack);
  });
  const roots = active.filter((agent) => !agent.reportsTo);
  return [...executive, ...roots, ...active].map((agent) => agent.id);
}

/**
 * Ordered, de-duplicated owner candidates for an issue: the assignee's manager
 * chain, then the creator's manager chain, then the company-wide fallbacks.
 */
function ownerCandidatesForIssue(
  issue: IssueLivenessIssueInput,
  agents: IssueLivenessAgentInput[],
  agentsById: Map<string, IssueLivenessAgentInput>,
) {
  const candidates = [
    ...agentChainCandidates(issue.assigneeAgentId, agentsById, issue.companyId),
    ...agentChainCandidates(issue.createdByAgentId, agentsById, issue.companyId),
    ...fallbackExecutiveCandidates(agents, issue.companyId),
  ];
  return [...new Set(candidates)];
}

/**
 * Builds the colon-joined key identifying one incident:
 * `harness_liveness:<company>:<issue>:<state>:<blocker | participant | "none">`.
 */
function incidentKey(input: {
  companyId: string;
  issueId: string;
  state: IssueLivenessState;
  blockerIssueId?: string | null;
  participantAgentId?: string | null;
}) {
  return [
    "harness_liveness",
    input.companyId,
    input.issueId,
    input.state,
    input.blockerIssueId ?? input.participantAgentId ?? "none",
  ].join(":");
}

/**
 * Assembles a finding. Severity defaults to "critical" and the recommended
 * owner is the first candidate, or null when there are none.
 */
function finding(input: {
  issue: IssueLivenessIssueInput;
  state: IssueLivenessState;
  severity?: IssueLivenessSeverity;
  reason: string;
  dependencyPath: IssueLivenessIssueInput[];
  recommendedOwnerCandidateAgentIds: string[];
  recommendedAction: string;
  blockerIssueId?: string | null;
  participantAgentId?: string | null;
}): IssueLivenessFinding {
  return {
    issueId: input.issue.id,
    companyId: input.issue.companyId,
    identifier: input.issue.identifier,
    state: input.state,
    severity: input.severity ?? "critical",
    reason: input.reason,
    dependencyPath: input.dependencyPath.map(pathEntry),
    recommendedOwnerAgentId: input.recommendedOwnerCandidateAgentIds[0] ?? null,
    recommendedOwnerCandidateAgentIds: input.recommendedOwnerCandidateAgentIds,
    recommendedAction: input.recommendedAction,
    incidentKey: incidentKey({
      companyId: input.issue.companyId,
      issueId: input.issue.id,
      state: input.state,
      blockerIssueId: input.blockerIssueId,
      participantAgentId: input.participantAgentId,
    }),
  };
}
[]; + for (const relation of relations) { + if (relation.companyId !== issue.companyId) continue; + const blocker = issuesById.get(relation.blockerIssueId); + if (!blocker || blocker.companyId !== issue.companyId || blocker.status === "done") continue; + + if (blocker.status === "cancelled") { + findings.push(finding({ + issue, + state: "blocked_by_cancelled_issue", + reason: `${issueLabel(issue)} is still blocked by cancelled issue ${issueLabel(blocker)}.`, + dependencyPath: [issue, blocker], + recommendedOwnerCandidateAgentIds: ownerCandidates, + recommendedAction: + `Inspect ${issueLabel(blocker)} and either remove it from ${issueLabel(issue)}'s blockers or replace it with an actionable unblock issue.`, + blockerIssueId: blocker.id, + })); + continue; + } + + if (!blocker.assigneeAgentId && !blocker.assigneeUserId) { + if (hasActiveExecutionPath(issue.companyId, blocker.id, activeRuns, queuedWakeRequests)) continue; + findings.push(finding({ + issue, + state: "blocked_by_unassigned_issue", + reason: `${issueLabel(issue)} is blocked by unassigned issue ${issueLabel(blocker)} with no user owner.`, + dependencyPath: [issue, blocker], + recommendedOwnerCandidateAgentIds: ownerCandidates, + recommendedAction: + `Assign ${issueLabel(blocker)} to an owner who can complete it, or remove it from ${issueLabel(issue)}'s blockers if it is no longer required.`, + blockerIssueId: blocker.id, + })); + continue; + } + + if (!blocker.assigneeAgentId) continue; + if (hasActiveExecutionPath(issue.companyId, blocker.id, activeRuns, queuedWakeRequests)) continue; + + const blockerAgent = agentsById.get(blocker.assigneeAgentId); + if (!blockerAgent || blockerAgent.companyId !== issue.companyId || BLOCKING_AGENT_STATUSES.has(blockerAgent.status)) { + findings.push(finding({ + issue, + state: "blocked_by_uninvokable_assignee", + reason: blockerAgent + ? 
`${issueLabel(issue)} is blocked by ${issueLabel(blocker)}, but its assignee is ${blockerAgent.status}.` + : `${issueLabel(issue)} is blocked by ${issueLabel(blocker)}, but its assignee no longer exists.`, + dependencyPath: [issue, blocker], + recommendedOwnerCandidateAgentIds: ownerCandidates, + recommendedAction: + `Review ${issueLabel(blocker)} and assign it to an active owner or replace the blocker with an actionable issue.`, + blockerIssueId: blocker.id, + })); + } + } + } + + if (issue.status !== "in_review" || !issue.executionState) continue; + const participant = issue.executionState.currentParticipant; + const participantAgentId = readPrincipalAgentId(participant); + if (participantAgentId) { + const participantAgent = agentsById.get(participantAgentId); + if (!isInvokableAgent(participantAgent) || participantAgent?.companyId !== issue.companyId) { + findings.push(finding({ + issue, + state: "invalid_review_participant", + reason: participantAgent + ? `${issueLabel(issue)} is in review, but current participant agent is ${participantAgent.status}.` + : `${issueLabel(issue)} is in review, but current participant agent cannot be resolved.`, + dependencyPath: [issue], + recommendedOwnerCandidateAgentIds: ownerCandidates, + recommendedAction: + `Repair ${issueLabel(issue)}'s review participant or return the issue to an active assignee with a clear change request.`, + participantAgentId, + })); + } + continue; + } + + if (!principalIsResolvableUser(participant)) { + findings.push(finding({ + issue, + state: "invalid_review_participant", + reason: `${issueLabel(issue)} is in review, but its current participant cannot be resolved.`, + dependencyPath: [issue], + recommendedOwnerCandidateAgentIds: ownerCandidates, + recommendedAction: + `Repair ${issueLabel(issue)}'s review participant or return the issue to an active assignee with a clear change request.`, + })); + } + } + + return findings; +}