Compare commits

...

5 Commits

Author SHA1 Message Date
Dotta
1cdbf9e500 Simplify capped live run polling
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 15:55:19 -05:00
Dotta
b192440642 Document liveness recovery coverage scope
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 15:37:35 -05:00
Dotta
c66bcd38f8 Split advanced liveness behavior from reliability PR
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 15:20:19 -05:00
Dotta
5684e17b79 Address reliability review findings
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 13:47:42 -05:00
Dotta
1686cd47ee Harden issue recovery reliability
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 13:38:46 -05:00
7 changed files with 210 additions and 32 deletions

View File

@@ -10,7 +10,6 @@ const mockHeartbeatService = vi.hoisted(() => ({
buildRunOutputSilence: vi.fn(),
getRunIssueSummary: vi.fn(),
getActiveRunIssueSummaryForAgent: vi.fn(),
buildRunOutputSilence: vi.fn(),
getRunLogAccess: vi.fn(),
readLog: vi.fn(),
}));
@@ -71,7 +70,7 @@ function registerModuleMocks() {
}));
}
async function createApp() {
async function createApp(db: Record<string, unknown> = {}) {
const [{ agentRoutes }, { errorHandler }] = await Promise.all([
vi.importActual<typeof import("../routes/agents.js")>("../routes/agents.js"),
vi.importActual<typeof import("../middleware/index.js")>("../middleware/index.js"),
@@ -88,11 +87,32 @@ async function createApp() {
};
next();
});
app.use("/api", agentRoutes({} as any));
app.use("/api", agentRoutes(db as any));
app.use(errorHandler);
return app;
}
function createLiveRunsDbStub(rows: Array<Record<string, unknown>>) {
const limit = vi.fn(async (value: number) => rows.slice(0, value));
const orderedQuery = {
limit,
then: (resolve: (value: Array<Record<string, unknown>>) => unknown) => Promise.resolve(rows).then(resolve),
};
const query = {
from: vi.fn().mockReturnThis(),
innerJoin: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnThis(),
orderBy: vi.fn().mockReturnValue(orderedQuery),
};
return {
db: {
select: vi.fn().mockReturnValue(query),
},
limit,
};
}
async function requestApp(
app: express.Express,
buildRequest: (baseUrl: string) => request.Test,
@@ -284,4 +304,81 @@ describe("agent live run routes", () => {
nextOffset: 5,
});
});
it("caps company live run polling by default", async () => {
const rows = Array.from({ length: 75 }, (_, index) => ({
id: `run-${index}`,
companyId: "company-1",
status: "running",
invocationSource: "on_demand",
triggerDetail: "manual",
startedAt: new Date("2026-04-10T09:30:00.000Z"),
finishedAt: null,
createdAt: new Date(`2026-04-10T09:${String(index % 60).padStart(2, "0")}:00.000Z`),
agentId: "agent-1",
agentName: "Builder",
adapterType: "codex_local",
logBytes: 0,
livenessState: "healthy",
livenessReason: null,
continuationAttempt: 0,
lastUsefulActionAt: null,
nextAction: null,
lastOutputAt: null,
lastOutputSeq: null,
lastOutputStream: null,
lastOutputBytes: 0,
processStartedAt: null,
issueId: "issue-1",
}));
const { db, limit } = createLiveRunsDbStub(rows);
const res = await requestApp(
await createApp(db),
(baseUrl) => request(baseUrl).get("/api/companies/company-1/live-runs"),
);
expect(res.status, JSON.stringify(res.body)).toBe(200);
expect(limit).toHaveBeenCalledWith(50);
expect(res.body).toHaveLength(50);
expect(mockHeartbeatService.buildRunOutputSilence).toHaveBeenCalledTimes(50);
});
it("treats explicit zero live run limits as the capped default", async () => {
const rows = Array.from({ length: 75 }, (_, index) => ({
id: `run-${index}`,
companyId: "company-1",
status: "running",
invocationSource: "on_demand",
triggerDetail: "manual",
startedAt: new Date("2026-04-10T09:30:00.000Z"),
finishedAt: null,
createdAt: new Date(`2026-04-10T09:${String(index % 60).padStart(2, "0")}:00.000Z`),
agentId: "agent-1",
agentName: "Builder",
adapterType: "codex_local",
logBytes: 0,
livenessState: "healthy",
livenessReason: null,
continuationAttempt: 0,
lastUsefulActionAt: null,
nextAction: null,
lastOutputAt: null,
lastOutputSeq: null,
lastOutputStream: null,
lastOutputBytes: 0,
processStartedAt: null,
issueId: "issue-1",
}));
const { db, limit } = createLiveRunsDbStub(rows);
const res = await requestApp(
await createApp(db),
(baseUrl) => request(baseUrl).get("/api/companies/company-1/live-runs?limit=0&minCount=0"),
);
expect(res.status, JSON.stringify(res.body)).toBe(200);
expect(limit).toHaveBeenCalledWith(50);
expect(res.body).toHaveLength(50);
});
});

View File

@@ -289,10 +289,23 @@ describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => {
const heartbeat = heartbeatService(db);
const first = await heartbeat.reconcileIssueGraphLiveness();
const second = await heartbeat.reconcileIssueGraphLiveness();
expect(first.escalationsCreated).toBe(1);
const [sourceAfterFirst] = await db
.select({ updatedAt: issues.updatedAt })
.from(issues)
.where(eq(issues.id, blockedIssueId));
const eventsAfterFirst = await db.select().from(activityLog).where(eq(activityLog.companyId, companyId));
expect(eventsAfterFirst.filter((event) => event.action === "issue.blockers.updated")).toHaveLength(1);
const second = await heartbeat.reconcileIssueGraphLiveness();
expect(second.escalationsCreated).toBe(0);
const [sourceAfterSecond] = await db
.select({ updatedAt: issues.updatedAt })
.from(issues)
.where(eq(issues.id, blockedIssueId));
expect(sourceAfterSecond?.updatedAt.getTime()).toBe(sourceAfterFirst?.updatedAt.getTime());
const escalations = await db
.select()
@@ -345,7 +358,7 @@ describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => {
projectWorkspaceSourceIssueId: blockerIssueId,
},
});
expect(events.some((event) => event.action === "issue.blockers.updated")).toBe(true);
expect(events.filter((event) => event.action === "issue.blockers.updated")).toHaveLength(1);
});
it("skips budget-blocked direct owners and assigns recovery to the manager fallback", async () => {

View File

@@ -76,6 +76,9 @@ describeEmbeddedPostgres("issue blocker attention", () => {
status: string;
parentId?: string | null;
assigneeAgentId?: string | null;
originKind?: string | null;
originId?: string | null;
originFingerprint?: string | null;
}) {
const id = input.id ?? randomUUID();
await db.insert(issues).values({
@@ -87,6 +90,9 @@ describeEmbeddedPostgres("issue blocker attention", () => {
priority: "medium",
parentId: input.parentId ?? null,
assigneeAgentId: input.assigneeAgentId ?? null,
originKind: input.originKind ?? "manual",
originId: input.originId ?? null,
originFingerprint: input.originFingerprint ?? "default",
});
return id;
}
@@ -356,6 +362,52 @@ describeEmbeddedPostgres("issue blocker attention", () => {
});
});
it("treats open liveness escalation blockers as covered waiting paths", async () => {
const { companyId, agentId } = await createCompany("PBL");
const parentId = await insertIssue({ companyId, identifier: "PBL-1", title: "Parent", status: "blocked" });
const cancelledLeafId = await insertIssue({
companyId,
identifier: "PBL-2",
title: "Cancelled blocker",
status: "cancelled",
assigneeAgentId: agentId,
});
const incidentKey = [
"harness_liveness",
companyId,
parentId,
"blocked_by_cancelled_issue",
cancelledLeafId,
].join(":");
const escalationId = await insertIssue({
companyId,
identifier: "PBL-3",
title: "Liveness escalation",
status: "todo",
assigneeAgentId: agentId,
originKind: "harness_liveness_escalation",
originId: incidentKey,
originFingerprint: [
"harness_liveness_leaf",
companyId,
"blocked_by_cancelled_issue",
cancelledLeafId,
].join(":"),
});
await block({ companyId, blockerIssueId: cancelledLeafId, blockedIssueId: parentId });
await block({ companyId, blockerIssueId: escalationId, blockedIssueId: parentId });
const parent = (await svc.list(companyId, { status: "blocked,todo" })).find((issue) => issue.id === parentId);
expect(parent?.blockerAttention).toMatchObject({
state: "covered",
reason: "active_dependency",
unresolvedBlockerCount: 2,
coveredBlockerCount: 2,
attentionBlockerCount: 0,
});
});
it("does not treat a scheduled retry as actively covered work", async () => {
const { companyId, agentId } = await createCompany("PBY");
const parentId = await insertIssue({ companyId, identifier: "PBY-1", title: "Parent", status: "blocked" });

View File

@@ -98,7 +98,8 @@ function readRunLogLimitBytes(value: unknown) {
function readLiveRunsQueryInt(value: unknown, max: number, fallback = 0) {
const parsed = Number(value);
if (!Number.isFinite(parsed)) return fallback;
return Math.max(0, Math.min(max, Math.trunc(parsed)));
if (parsed <= 0) return fallback;
return Math.min(max, Math.trunc(parsed));
}
export function agentRoutes(
@@ -2703,8 +2704,8 @@ export function agentRoutes(
const companyId = req.params.companyId as string;
assertCompanyAccess(req, companyId);
const minCount = readLiveRunsQueryInt(req.query.minCount, 50);
const limit = readLiveRunsQueryInt(req.query.limit, 50);
const minCount = readLiveRunsQueryInt(req.query.minCount, 50, 50);
const limit = readLiveRunsQueryInt(req.query.limit, 50, 50);
const columns = {
id: heartbeatRuns.id,
@@ -2744,8 +2745,8 @@ export function agentRoutes(
)
.orderBy(desc(heartbeatRuns.createdAt));
const liveRuns = limit > 0 ? await liveRunsQuery.limit(limit) : await liveRunsQuery;
const targetRunCount = limit > 0 ? Math.min(minCount, limit) : minCount;
const liveRuns = await liveRunsQuery.limit(limit);
const targetRunCount = Math.min(minCount, limit);
if (targetRunCount > 0 && liveRuns.length < targetRunCount) {
const activeIds = liveRuns.map((r) => r.id);

View File

@@ -52,6 +52,7 @@ import {
issueTreeControlService,
type ActiveIssueTreePauseHoldGate,
} from "./issue-tree-control.js";
import { parseIssueGraphLivenessIncidentKey } from "./recovery/origins.js";
const ALL_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked", "done", "cancelled"];
const MAX_ISSUE_COMMENT_PAGE_LIMIT = 500;
@@ -1174,12 +1175,12 @@ async function listIssueBlockerAttentionMap(
}
}
const reviewNodeIds = [...nodesById.values()]
.filter((node) => node.status === "in_review")
const explicitWaitCandidateIds = [...nodesById.values()]
.filter((node) => node.status !== "done")
.map((node) => node.id);
const explicitWaitingIssueIds = new Set<string>();
if (reviewNodeIds.length > 0) {
for (const chunk of chunkList(reviewNodeIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
if (explicitWaitCandidateIds.length > 0) {
for (const chunk of chunkList(explicitWaitCandidateIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
const interactionRows: Array<{ issueId: string }> = await dbOrTx
.select({ issueId: issueThreadInteractions.issueId })
.from(issueThreadInteractions)
@@ -1204,22 +1205,28 @@ async function listIssueBlockerAttentionMap(
),
);
for (const row of approvalRows) explicitWaitingIssueIds.add(row.issueId);
}
const recoveryRows: Array<{ originId: string | null }> = await dbOrTx
.select({ originId: issues.originId })
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, BLOCKER_ATTENTION_OPEN_RECOVERY_ORIGIN_KIND),
isNull(issues.hiddenAt),
inArray(issues.originId, chunk),
notInArray(issues.status, BLOCKER_ATTENTION_OPEN_RECOVERY_TERMINAL_STATUSES),
),
);
for (const row of recoveryRows) {
if (row.originId) explicitWaitingIssueIds.add(row.originId);
}
// Recovery rows are intentionally company-wide: a liveness escalation for
// the same leaf blocker represents an active waiting path even when that
// blocker is reached through another blocked graph.
const recoveryRows: Array<{ id: string; originId: string | null }> = await dbOrTx
.select({ id: issues.id, originId: issues.originId })
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, BLOCKER_ATTENTION_OPEN_RECOVERY_ORIGIN_KIND),
isNull(issues.hiddenAt),
notInArray(issues.status, BLOCKER_ATTENTION_OPEN_RECOVERY_TERMINAL_STATUSES),
),
);
for (const row of recoveryRows) {
const parsed = parseIssueGraphLivenessIncidentKey(row.originId);
if (!parsed || parsed.companyId !== companyId) continue;
explicitWaitingIssueIds.add(row.id);
explicitWaitingIssueIds.add(parsed.issueId);
explicitWaitingIssueIds.add(parsed.leafIssueId);
}
}
@@ -1257,8 +1264,11 @@ async function listIssueBlockerAttentionMap(
if (node.status === "done") {
return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null };
}
if (explicitWaitingIssueIds.has(node.id)) {
return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null };
}
if (node.status === "in_review") {
const hasWaitingPath = activeIssueIds.has(node.id) || Boolean(node.assigneeUserId) || explicitWaitingIssueIds.has(node.id);
const hasWaitingPath = activeIssueIds.has(node.id) || Boolean(node.assigneeUserId);
if (hasWaitingPath) {
return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null };
}

View File

@@ -127,7 +127,6 @@ export function decideRunLivenessContinuation(input: {
if (budgetBlocked) {
return { kind: "skip", reason: "budget hard stop blocks continuation" };
}
const currentAttempt = readContinuationAttempt(run.continuationAttempt);
if (currentAttempt >= maxAttempts) {
return {

View File

@@ -2250,10 +2250,16 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
}) {
const blockerIds = await existingBlockerIssueIds(input.issue.companyId, input.issue.id);
const nextBlockerIds = [...new Set([...blockerIds, input.escalationIssueId])];
const isAlreadyBlockedByEscalation = blockerIds.includes(input.escalationIssueId);
const isAlreadyBlocked = input.issue.status === "blocked";
if (isAlreadyBlockedByEscalation && isAlreadyBlocked) {
return input.issue;
}
const update: Partial<typeof issues.$inferInsert> & { blockedByIssueIds: string[] } = {
blockedByIssueIds: nextBlockerIds,
};
if (input.issue.status !== "blocked") {
if (!isAlreadyBlocked) {
update.status = "blocked";
}