mirror of
https://github.com/paperclipai/paperclip
synced 2026-04-25 17:25:15 +02:00
## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies. > - This branch extends the issue control-plane so board operators can pause, cancel, and later restore whole issue subtrees while keeping descendant execution and wake behavior coherent. > - That required new hold state in the database, shared contracts, server routes/services, and issue detail UI controls so subtree actions are durable and auditable instead of ad hoc. > - While this branch was in flight, `master` advanced with new environment lifecycle work, including a new `0065_environments` migration. > - Before opening the PR, this branch had to be rebased onto `paperclipai/paperclip:master` without losing the existing subtree-control work or leaving conflicting migration numbering behind. > - This pull request rebases the subtree pause/cancel/restore feature cleanly onto current `master`, renumbers the hold migration to `0066_issue_tree_holds`, and preserves the full branch diff in a single PR. > - The benefit is that reviewers get one clean, mergeable PR for the subtree-control feature instead of stale branch history with migration conflicts. ## What Changed - Added durable issue subtree hold data structures, shared API/types/validators, server routes/services, and UI flows for subtree pause, cancel, and restore operations. - Added server and UI coverage for subtree previewing, hold creation/release, dependency-aware scheduling under holds, and issue detail subtree controls. - Rebased the branch onto current `paperclipai/paperclip:master` and renumbered the branch migration from `0065_issue_tree_holds` to `0066_issue_tree_holds` so it no longer conflicts with upstream `0065_environments`. - Added a small follow-up commit that makes restore requests return `200 OK` explicitly while keeping pause/cancel hold creation at `201 Created`, and updated the route test to match that contract. ## Verification - `pnpm --filter @paperclipai/db typecheck` - `pnpm --filter @paperclipai/shared typecheck` - `pnpm --filter @paperclipai/server typecheck` - `pnpm --filter @paperclipai/ui typecheck` - `cd server && pnpm exec vitest run src/__tests__/issue-tree-control-routes.test.ts src/__tests__/issue-tree-control-service.test.ts src/__tests__/issue-tree-control-service-unit.test.ts src/__tests__/heartbeat-dependency-scheduling.test.ts` - `cd ui && pnpm exec vitest run src/components/IssueChatThread.test.tsx src/pages/IssueDetail.test.tsx` ## Risks - This is a broad cross-layer change touching DB/schema, shared contracts, server orchestration, and UI; regressions are most likely around subtree status restoration or wake suppression/resume edge cases. - The migration was renumbered during PR prep to avoid the new upstream `0065_environments` conflict. Reviewers should confirm the final `0066_issue_tree_holds` ordering is the only hold-related migration that lands. - The issue-tree restore endpoint now responds with `200` instead of relying on implicit behavior, which is semantically better for a restore operation but still changes an API detail that clients or tests could have assumed. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex coding agent in the Paperclip Codex runtime (GPT-5-class tool-using coding model; exact deployment ID/context window is not exposed inside this session). ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [ ] If this change affects the UI, I have included before/after screenshots - [ ] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
948 lines
32 KiB
TypeScript
948 lines
32 KiB
TypeScript
import { and, asc, eq, inArray, isNull, notInArray, or, sql } from "drizzle-orm";
|
|
import type { Db } from "@paperclipai/db";
|
|
import {
|
|
agentWakeupRequests,
|
|
heartbeatRuns,
|
|
issueTreeHoldMembers,
|
|
issueTreeHolds,
|
|
issues,
|
|
} from "@paperclipai/db";
|
|
import {
|
|
ISSUE_STATUSES,
|
|
type IssueStatus,
|
|
type IssueTreeControlMode,
|
|
type IssueTreeControlPreview,
|
|
type IssueTreeHold,
|
|
type IssueTreeHoldMember,
|
|
type IssueTreeHoldReleasePolicy,
|
|
type IssueTreePreviewAgent,
|
|
type IssueTreePreviewIssue,
|
|
type IssueTreePreviewRun,
|
|
type IssueTreePreviewWarning,
|
|
} from "@paperclipai/shared";
|
|
import { conflict, notFound, unprocessable } from "../errors.js";
|
|
|
|
type IssueRow = typeof issues.$inferSelect;
|
|
type HoldRow = typeof issueTreeHolds.$inferSelect;
|
|
type HoldMemberRow = typeof issueTreeHoldMembers.$inferSelect;
|
|
export type ActiveIssueTreePauseHoldGate = {
|
|
holdId: string;
|
|
rootIssueId: string;
|
|
issueId: string;
|
|
isRoot: boolean;
|
|
mode: "pause";
|
|
reason: string | null;
|
|
releasePolicy: IssueTreeHoldReleasePolicy | null;
|
|
};
|
|
type ActorInput = {
|
|
actorType: "user" | "agent" | "system";
|
|
actorId: string;
|
|
agentId?: string | null;
|
|
userId?: string | null;
|
|
runId?: string | null;
|
|
};
|
|
type TreeIssue = IssueRow & { depth: number };
|
|
type ActiveRunRow = {
|
|
id: string;
|
|
issueId: string;
|
|
agentId: string;
|
|
status: "queued" | "running";
|
|
startedAt: Date | null;
|
|
createdAt: Date;
|
|
};
|
|
type ActiveCancelSnapshot = {
|
|
holdIds: string[];
|
|
member: IssueTreeHoldMember | null;
|
|
};
|
|
type TreeStatusUpdateResult = {
|
|
updatedIssueIds: string[];
|
|
updatedIssues: Array<{
|
|
id: string;
|
|
status: IssueStatus;
|
|
assigneeAgentId: string | null;
|
|
}>;
|
|
};
|
|
type RestoreTreeStatusResult = TreeStatusUpdateResult & {
|
|
releasedCancelHoldIds: string[];
|
|
restoreHold: IssueTreeHold | null;
|
|
};
|
|
|
|
const TERMINAL_ISSUE_STATUSES = new Set<IssueStatus>(["done", "cancelled"]);
|
|
const ACTIVE_RUN_STATUSES = ["queued", "running"] as const;
|
|
const DEFAULT_RELEASE_POLICY: IssueTreeHoldReleasePolicy = { strategy: "manual" };
|
|
const MAX_PAUSE_HOLD_GATE_DEPTH = 15;
|
|
export const ISSUE_TREE_CONTROL_INTERACTION_WAKE_REASONS: ReadonlySet<string> = new Set([
|
|
"issue_commented",
|
|
"issue_reopened_via_comment",
|
|
"issue_comment_mentioned",
|
|
] as const);
|
|
|
|
function normalizeReleasePolicy(
|
|
releasePolicy: IssueTreeHoldReleasePolicy | null | undefined,
|
|
): IssueTreeHoldReleasePolicy {
|
|
return releasePolicy ?? DEFAULT_RELEASE_POLICY;
|
|
}
|
|
|
|
function coerceIssueStatus(status: string): IssueStatus {
|
|
return ISSUE_STATUSES.includes(status as IssueStatus) ? (status as IssueStatus) : "backlog";
|
|
}
|
|
|
|
function isTerminalIssue(status: string): status is IssueStatus {
|
|
return TERMINAL_ISSUE_STATUSES.has(coerceIssueStatus(status));
|
|
}
|
|
|
|
function toPreviewRun(row: ActiveRunRow): IssueTreePreviewRun {
|
|
return {
|
|
id: row.id,
|
|
issueId: row.issueId,
|
|
agentId: row.agentId,
|
|
status: row.status,
|
|
startedAt: row.startedAt,
|
|
createdAt: row.createdAt,
|
|
};
|
|
}
|
|
|
|
function toHold(row: HoldRow, members?: HoldMemberRow[]): IssueTreeHold {
|
|
return {
|
|
id: row.id,
|
|
companyId: row.companyId,
|
|
rootIssueId: row.rootIssueId,
|
|
mode: row.mode as IssueTreeControlMode,
|
|
status: row.status as IssueTreeHold["status"],
|
|
reason: row.reason,
|
|
releasePolicy: (row.releasePolicy as IssueTreeHoldReleasePolicy | null) ?? null,
|
|
createdByActorType: row.createdByActorType as IssueTreeHold["createdByActorType"],
|
|
createdByAgentId: row.createdByAgentId,
|
|
createdByUserId: row.createdByUserId,
|
|
createdByRunId: row.createdByRunId,
|
|
releasedAt: row.releasedAt,
|
|
releasedByActorType: row.releasedByActorType as IssueTreeHold["releasedByActorType"],
|
|
releasedByAgentId: row.releasedByAgentId,
|
|
releasedByUserId: row.releasedByUserId,
|
|
releasedByRunId: row.releasedByRunId,
|
|
releaseReason: row.releaseReason,
|
|
releaseMetadata: row.releaseMetadata ?? null,
|
|
createdAt: row.createdAt,
|
|
updatedAt: row.updatedAt,
|
|
...(members ? { members: members.map(toHoldMember) } : {}),
|
|
};
|
|
}
|
|
|
|
function toHoldMember(row: HoldMemberRow): IssueTreeHoldMember {
|
|
return {
|
|
id: row.id,
|
|
companyId: row.companyId,
|
|
holdId: row.holdId,
|
|
issueId: row.issueId,
|
|
parentIssueId: row.parentIssueId,
|
|
depth: row.depth,
|
|
issueIdentifier: row.issueIdentifier,
|
|
issueTitle: row.issueTitle,
|
|
issueStatus: coerceIssueStatus(row.issueStatus),
|
|
assigneeAgentId: row.assigneeAgentId,
|
|
assigneeUserId: row.assigneeUserId,
|
|
activeRunId: row.activeRunId,
|
|
activeRunStatus: row.activeRunStatus,
|
|
skipped: row.skipped,
|
|
skipReason: row.skipReason,
|
|
createdAt: row.createdAt,
|
|
};
|
|
}
|
|
|
|
function issueSkipReason(input: {
|
|
mode: IssueTreeControlMode;
|
|
issue: TreeIssue;
|
|
activePauseHoldIds: string[];
|
|
activeCancelSnapshot?: ActiveCancelSnapshot | null;
|
|
}): string | null {
|
|
const status = coerceIssueStatus(input.issue.status);
|
|
if (input.mode === "restore") {
|
|
if (input.activeCancelSnapshot?.member && status !== "cancelled") {
|
|
return "changed_after_cancel";
|
|
}
|
|
if (status !== "cancelled") return "not_cancelled";
|
|
if (!input.activeCancelSnapshot?.member) return "not_cancelled_by_tree_control";
|
|
const snapshotStatus = coerceIssueStatus(input.activeCancelSnapshot.member.issueStatus);
|
|
return isTerminalIssue(snapshotStatus) ? "terminal_status" : null;
|
|
}
|
|
if (isTerminalIssue(status)) {
|
|
return "terminal_status";
|
|
}
|
|
if (input.mode === "pause" && input.activePauseHoldIds.length > 0) {
|
|
return "already_held";
|
|
}
|
|
if (input.mode === "resume" && input.activePauseHoldIds.length === 0) {
|
|
return "not_held";
|
|
}
|
|
return null;
|
|
}
|
|
|
|
function buildAffectedAgents(issuesToPreview: IssueTreePreviewIssue[]): IssueTreePreviewAgent[] {
|
|
const byAgentId = new Map<string, IssueTreePreviewAgent>();
|
|
for (const issue of issuesToPreview) {
|
|
if (issue.skipped) continue;
|
|
const agentIds = new Set<string>();
|
|
if (issue.assigneeAgentId) agentIds.add(issue.assigneeAgentId);
|
|
if (issue.activeRun) agentIds.add(issue.activeRun.agentId);
|
|
for (const agentId of agentIds) {
|
|
const current = byAgentId.get(agentId) ?? { agentId, issueCount: 0, activeRunCount: 0 };
|
|
current.issueCount += 1;
|
|
if (issue.activeRun?.agentId === agentId) current.activeRunCount += 1;
|
|
byAgentId.set(agentId, current);
|
|
}
|
|
}
|
|
return [...byAgentId.values()].sort((a, b) => a.agentId.localeCompare(b.agentId));
|
|
}
|
|
|
|
function buildWarnings(input: {
|
|
mode: IssueTreeControlMode;
|
|
issuesToPreview: IssueTreePreviewIssue[];
|
|
activeRuns: IssueTreePreviewRun[];
|
|
}): IssueTreePreviewWarning[] {
|
|
const affectedIssues = input.issuesToPreview.filter((issue) => !issue.skipped);
|
|
const affectedIssueIds = new Set(affectedIssues.map((issue) => issue.id));
|
|
const affectedRuns = input.activeRuns.filter((run) => affectedIssueIds.has(run.issueId));
|
|
const warnings: IssueTreePreviewWarning[] = [];
|
|
|
|
if (affectedIssues.length === 0) {
|
|
warnings.push({
|
|
code: "no_affected_issues",
|
|
message: "No issues in this subtree match the requested control action.",
|
|
});
|
|
}
|
|
|
|
const runningRunIssueIds = affectedRuns
|
|
.filter((run) => run.status === "running")
|
|
.map((run) => run.issueId);
|
|
if ((input.mode === "pause" || input.mode === "cancel") && runningRunIssueIds.length > 0) {
|
|
warnings.push({
|
|
code: "running_runs_present",
|
|
message: "Some affected issues have running heartbeat runs.",
|
|
issueIds: [...new Set(runningRunIssueIds)].sort(),
|
|
});
|
|
}
|
|
|
|
const queuedRunIssueIds = affectedRuns
|
|
.filter((run) => run.status === "queued")
|
|
.map((run) => run.issueId);
|
|
if ((input.mode === "pause" || input.mode === "cancel") && queuedRunIssueIds.length > 0) {
|
|
warnings.push({
|
|
code: "queued_runs_present",
|
|
message: "Some affected issues have queued heartbeat runs.",
|
|
issueIds: [...new Set(queuedRunIssueIds)].sort(),
|
|
});
|
|
}
|
|
|
|
if (input.mode === "resume" && affectedIssues.length === 0) {
|
|
warnings.push({
|
|
code: "no_active_pause_holds",
|
|
message: "No active pause holds were found in this subtree.",
|
|
});
|
|
}
|
|
|
|
if (input.mode === "restore") {
|
|
const changedIssueIds = input.issuesToPreview
|
|
.filter((issue) => issue.skipReason === "changed_after_cancel")
|
|
.map((issue) => issue.id);
|
|
if (changedIssueIds.length > 0) {
|
|
warnings.push({
|
|
code: "restore_conflicts_present",
|
|
message: "Some issues changed after subtree cancellation and will be skipped.",
|
|
issueIds: changedIssueIds,
|
|
});
|
|
}
|
|
}
|
|
|
|
return warnings;
|
|
}
|
|
|
|
function restoreStatusFromCancelSnapshot(status: IssueStatus): IssueStatus | null {
|
|
if (status === "in_progress") return "todo";
|
|
if (isTerminalIssue(status)) return null;
|
|
return status;
|
|
}
|
|
|
|
export function issueTreeControlService(db: Db) {
|
|
async function listTreeIssues(companyId: string, rootIssueId: string): Promise<TreeIssue[]> {
|
|
const root = await db
|
|
.select()
|
|
.from(issues)
|
|
.where(and(eq(issues.id, rootIssueId), eq(issues.companyId, companyId)))
|
|
.then((rows) => rows[0] ?? null);
|
|
if (!root) {
|
|
throw notFound("Root issue not found");
|
|
}
|
|
|
|
const result: TreeIssue[] = [{ ...root, depth: 0 }];
|
|
const visited = new Set<string>([root.id]);
|
|
let frontier = [{ id: root.id, depth: 0 }];
|
|
|
|
while (frontier.length > 0) {
|
|
const parentIds = frontier.map((item) => item.id);
|
|
const depthByParentId = new Map(frontier.map((item) => [item.id, item.depth]));
|
|
const children = await db
|
|
.select()
|
|
.from(issues)
|
|
.where(and(eq(issues.companyId, companyId), inArray(issues.parentId, parentIds)))
|
|
.orderBy(asc(issues.createdAt), asc(issues.id));
|
|
|
|
const nextFrontier: typeof frontier = [];
|
|
for (const child of children) {
|
|
if (visited.has(child.id)) continue;
|
|
const depth = (depthByParentId.get(child.parentId ?? "") ?? 0) + 1;
|
|
visited.add(child.id);
|
|
result.push({ ...child, depth });
|
|
nextFrontier.push({ id: child.id, depth });
|
|
}
|
|
frontier = nextFrontier;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
async function activeRunsForTree(companyId: string, treeIssues: TreeIssue[]) {
|
|
const issueIds = treeIssues.map((issue) => issue.id);
|
|
if (issueIds.length === 0) return [];
|
|
const runIds = treeIssues
|
|
.map((issue) => issue.executionRunId)
|
|
.filter((id): id is string => typeof id === "string" && id.length > 0);
|
|
const uniqueRunIds = [...new Set(runIds)];
|
|
const issueIdFromContext = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`;
|
|
const issueIdSet = new Set(issueIds);
|
|
|
|
const rows = await db
|
|
.select({
|
|
id: heartbeatRuns.id,
|
|
agentId: heartbeatRuns.agentId,
|
|
status: heartbeatRuns.status,
|
|
issueIdFromContext,
|
|
startedAt: heartbeatRuns.startedAt,
|
|
createdAt: heartbeatRuns.createdAt,
|
|
})
|
|
.from(heartbeatRuns)
|
|
.where(
|
|
and(
|
|
eq(heartbeatRuns.companyId, companyId),
|
|
inArray(heartbeatRuns.status, [...ACTIVE_RUN_STATUSES]),
|
|
uniqueRunIds.length > 0
|
|
? or(inArray(heartbeatRuns.id, uniqueRunIds), inArray(issueIdFromContext, issueIds))
|
|
: inArray(issueIdFromContext, issueIds),
|
|
),
|
|
);
|
|
|
|
const issueIdByExecutionRunId = new Map(
|
|
treeIssues
|
|
.filter((issue) => issue.executionRunId)
|
|
.map((issue) => [issue.executionRunId as string, issue.id]),
|
|
);
|
|
return rows
|
|
.map((run) => {
|
|
if (run.status !== "queued" && run.status !== "running") return null;
|
|
const issueId = run.issueIdFromContext && issueIdSet.has(run.issueIdFromContext)
|
|
? run.issueIdFromContext
|
|
: issueIdByExecutionRunId.get(run.id) ?? null;
|
|
if (!issueId) return null;
|
|
return {
|
|
id: run.id,
|
|
issueId,
|
|
agentId: run.agentId,
|
|
status: run.status,
|
|
startedAt: run.startedAt,
|
|
createdAt: run.createdAt,
|
|
} satisfies ActiveRunRow;
|
|
})
|
|
.filter((run): run is ActiveRunRow => run !== null)
|
|
.sort((a, b) => a.issueId.localeCompare(b.issueId) || a.createdAt.getTime() - b.createdAt.getTime());
|
|
}
|
|
|
|
async function activeHoldsByIssueId(companyId: string, issueIds: string[]) {
|
|
const byIssueId = new Map<string, { all: string[]; pause: string[] }>();
|
|
if (issueIds.length === 0) return byIssueId;
|
|
const rows = await db
|
|
.select({
|
|
issueId: issueTreeHoldMembers.issueId,
|
|
holdId: issueTreeHolds.id,
|
|
mode: issueTreeHolds.mode,
|
|
})
|
|
.from(issueTreeHoldMembers)
|
|
.innerJoin(issueTreeHolds, eq(issueTreeHoldMembers.holdId, issueTreeHolds.id))
|
|
.where(
|
|
and(
|
|
eq(issueTreeHoldMembers.companyId, companyId),
|
|
eq(issueTreeHolds.status, "active"),
|
|
inArray(issueTreeHoldMembers.issueId, issueIds),
|
|
),
|
|
)
|
|
.orderBy(asc(issueTreeHolds.createdAt), asc(issueTreeHolds.id));
|
|
|
|
for (const row of rows) {
|
|
const current = byIssueId.get(row.issueId) ?? { all: [], pause: [] };
|
|
current.all.push(row.holdId);
|
|
if (row.mode === "pause") current.pause.push(row.holdId);
|
|
byIssueId.set(row.issueId, current);
|
|
}
|
|
return byIssueId;
|
|
}
|
|
|
|
async function activeCancelSnapshotsByIssueId(companyId: string, rootIssueId: string) {
|
|
const activeCancelHolds = await listHolds(companyId, rootIssueId, {
|
|
status: "active",
|
|
mode: "cancel",
|
|
includeMembers: true,
|
|
});
|
|
const byIssueId = new Map<string, ActiveCancelSnapshot>();
|
|
for (const hold of [...activeCancelHolds].reverse()) {
|
|
for (const member of hold.members ?? []) {
|
|
const current = byIssueId.get(member.issueId) ?? { holdIds: [], member: null };
|
|
if (!current.holdIds.includes(hold.id)) current.holdIds.push(hold.id);
|
|
if (!current.member && !member.skipped) current.member = member;
|
|
byIssueId.set(member.issueId, current);
|
|
}
|
|
}
|
|
return byIssueId;
|
|
}
|
|
|
|
async function getActivePauseHoldGate(
|
|
companyId: string,
|
|
issueId: string,
|
|
): Promise<ActiveIssueTreePauseHoldGate | null> {
|
|
const activePauseHolds = await db
|
|
.select({
|
|
id: issueTreeHolds.id,
|
|
rootIssueId: issueTreeHolds.rootIssueId,
|
|
reason: issueTreeHolds.reason,
|
|
releasePolicy: issueTreeHolds.releasePolicy,
|
|
})
|
|
.from(issueTreeHolds)
|
|
.where(
|
|
and(
|
|
eq(issueTreeHolds.companyId, companyId),
|
|
eq(issueTreeHolds.status, "active"),
|
|
eq(issueTreeHolds.mode, "pause"),
|
|
),
|
|
)
|
|
.orderBy(asc(issueTreeHolds.createdAt), asc(issueTreeHolds.id));
|
|
if (activePauseHolds.length === 0) return null;
|
|
|
|
const holdByRootIssueId = new Map(activePauseHolds.map((hold) => [hold.rootIssueId, hold]));
|
|
let currentIssueId: string | null = issueId;
|
|
const visited = new Set<string>();
|
|
let depth = 0;
|
|
|
|
while (currentIssueId && !visited.has(currentIssueId) && depth < MAX_PAUSE_HOLD_GATE_DEPTH) {
|
|
visited.add(currentIssueId);
|
|
const hold = holdByRootIssueId.get(currentIssueId);
|
|
if (hold) {
|
|
return {
|
|
holdId: hold.id,
|
|
rootIssueId: hold.rootIssueId,
|
|
issueId,
|
|
isRoot: hold.rootIssueId === issueId,
|
|
mode: "pause",
|
|
reason: hold.reason,
|
|
releasePolicy: (hold.releasePolicy as IssueTreeHoldReleasePolicy | null) ?? null,
|
|
};
|
|
}
|
|
|
|
const parent: { parentId: string | null } | null = await db
|
|
.select({ parentId: issues.parentId })
|
|
.from(issues)
|
|
.where(and(eq(issues.id, currentIssueId), eq(issues.companyId, companyId)))
|
|
.then((rows) => rows[0] ?? null);
|
|
currentIssueId = parent?.parentId ?? null;
|
|
depth += 1;
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
async function preview(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
input: {
|
|
mode: IssueTreeControlMode;
|
|
releasePolicy?: IssueTreeHoldReleasePolicy | null;
|
|
},
|
|
): Promise<IssueTreeControlPreview> {
|
|
const treeIssues = await listTreeIssues(companyId, rootIssueId);
|
|
const issueIds = treeIssues.map((issue) => issue.id);
|
|
const [activeRunRows, holdsByIssueId, activeCancelSnapshots] = await Promise.all([
|
|
activeRunsForTree(companyId, treeIssues),
|
|
activeHoldsByIssueId(companyId, issueIds),
|
|
input.mode === "restore"
|
|
? activeCancelSnapshotsByIssueId(companyId, rootIssueId)
|
|
: Promise.resolve(new Map<string, ActiveCancelSnapshot>()),
|
|
]);
|
|
const runsByIssueId = new Map<string, ActiveRunRow>();
|
|
for (const run of activeRunRows) {
|
|
if (!runsByIssueId.has(run.issueId)) runsByIssueId.set(run.issueId, run);
|
|
}
|
|
const countsByStatus: Partial<Record<IssueStatus, number>> = {};
|
|
|
|
const issuesToPreview = treeIssues.map((issue) => {
|
|
const status = coerceIssueStatus(issue.status);
|
|
countsByStatus[status] = (countsByStatus[status] ?? 0) + 1;
|
|
const holdState = holdsByIssueId.get(issue.id) ?? { all: [], pause: [] };
|
|
const skipReason = issueSkipReason({
|
|
mode: input.mode,
|
|
issue,
|
|
activePauseHoldIds: holdState.pause,
|
|
activeCancelSnapshot: activeCancelSnapshots.get(issue.id) ?? null,
|
|
});
|
|
const run = runsByIssueId.get(issue.id);
|
|
return {
|
|
id: issue.id,
|
|
identifier: issue.identifier,
|
|
title: issue.title,
|
|
status,
|
|
parentId: issue.parentId,
|
|
depth: issue.depth,
|
|
assigneeAgentId: issue.assigneeAgentId,
|
|
assigneeUserId: issue.assigneeUserId,
|
|
activeRun: run ? toPreviewRun(run) : null,
|
|
activeHoldIds: holdState.all,
|
|
action: input.mode,
|
|
skipped: skipReason !== null,
|
|
skipReason,
|
|
} satisfies IssueTreePreviewIssue;
|
|
});
|
|
const skippedIssues = issuesToPreview.filter((issue) => issue.skipped);
|
|
const activeRuns = activeRunRows
|
|
.map(toPreviewRun)
|
|
.sort((a, b) => a.issueId.localeCompare(b.issueId) || a.id.localeCompare(b.id));
|
|
const affectedAgents = buildAffectedAgents(issuesToPreview);
|
|
|
|
return {
|
|
companyId,
|
|
rootIssueId,
|
|
mode: input.mode,
|
|
generatedAt: new Date(),
|
|
releasePolicy: normalizeReleasePolicy(input.releasePolicy),
|
|
totals: {
|
|
totalIssues: issuesToPreview.length,
|
|
affectedIssues: issuesToPreview.length - skippedIssues.length,
|
|
skippedIssues: skippedIssues.length,
|
|
activeRuns: activeRuns.filter((run) => run.status === "running").length,
|
|
queuedRuns: activeRuns.filter((run) => run.status === "queued").length,
|
|
affectedAgents: affectedAgents.length,
|
|
},
|
|
countsByStatus,
|
|
issues: issuesToPreview,
|
|
skippedIssues,
|
|
activeRuns,
|
|
affectedAgents,
|
|
warnings: buildWarnings({ mode: input.mode, issuesToPreview, activeRuns }),
|
|
};
|
|
}
|
|
|
|
async function createHold(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
input: {
|
|
mode: IssueTreeControlMode;
|
|
reason?: string | null;
|
|
releasePolicy?: IssueTreeHoldReleasePolicy | null;
|
|
actor: ActorInput;
|
|
},
|
|
) {
|
|
const holdReleasePolicy = normalizeReleasePolicy(input.releasePolicy);
|
|
const holdPreview = await preview(companyId, rootIssueId, {
|
|
mode: input.mode,
|
|
releasePolicy: holdReleasePolicy,
|
|
});
|
|
|
|
const { hold, members } = await db.transaction(async (tx) => {
|
|
const [createdHold] = await tx
|
|
.insert(issueTreeHolds)
|
|
.values({
|
|
companyId,
|
|
rootIssueId,
|
|
mode: input.mode,
|
|
status: "active",
|
|
reason: input.reason ?? null,
|
|
releasePolicy: holdReleasePolicy as unknown as Record<string, unknown>,
|
|
createdByActorType: input.actor.actorType,
|
|
createdByAgentId: input.actor.agentId ?? null,
|
|
createdByUserId: input.actor.userId ?? (input.actor.actorType === "user" ? input.actor.actorId : null),
|
|
createdByRunId: input.actor.runId ?? null,
|
|
})
|
|
.returning();
|
|
|
|
const memberRows = holdPreview.issues.map((issue) => ({
|
|
companyId,
|
|
holdId: createdHold.id,
|
|
issueId: issue.id,
|
|
parentIssueId: issue.parentId,
|
|
depth: issue.depth,
|
|
issueIdentifier: issue.identifier,
|
|
issueTitle: issue.title,
|
|
issueStatus: issue.status,
|
|
assigneeAgentId: issue.assigneeAgentId,
|
|
assigneeUserId: issue.assigneeUserId,
|
|
activeRunId: issue.activeRun?.id ?? null,
|
|
activeRunStatus: issue.activeRun?.status ?? null,
|
|
skipped: issue.skipped,
|
|
skipReason: issue.skipReason,
|
|
}));
|
|
|
|
const createdMembers = memberRows.length > 0
|
|
? await tx.insert(issueTreeHoldMembers).values(memberRows).returning()
|
|
: [];
|
|
|
|
return { hold: createdHold, members: createdMembers };
|
|
});
|
|
|
|
return {
|
|
hold: toHold(hold, members),
|
|
preview: holdPreview,
|
|
};
|
|
}
|
|
|
|
async function cancelIssueStatusesForHold(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
holdId: string,
|
|
): Promise<TreeStatusUpdateResult> {
|
|
const hold = await getHold(companyId, holdId);
|
|
if (!hold) throw notFound("Issue tree hold not found");
|
|
if (hold.rootIssueId !== rootIssueId) {
|
|
throw unprocessable("Issue tree hold does not belong to the requested root issue");
|
|
}
|
|
if (hold.mode !== "cancel") {
|
|
throw unprocessable("Issue tree hold is not a cancel operation");
|
|
}
|
|
|
|
const issueIds = [...new Set((hold.members ?? [])
|
|
.filter((member) => !member.skipped)
|
|
.map((member) => member.issueId))];
|
|
if (issueIds.length === 0) return { updatedIssueIds: [], updatedIssues: [] };
|
|
|
|
const now = new Date();
|
|
const updated = await db
|
|
.update(issues)
|
|
.set({
|
|
status: "cancelled",
|
|
cancelledAt: now,
|
|
completedAt: null,
|
|
checkoutRunId: null,
|
|
executionRunId: null,
|
|
executionAgentNameKey: null,
|
|
executionLockedAt: null,
|
|
updatedAt: now,
|
|
})
|
|
.where(
|
|
and(
|
|
eq(issues.companyId, companyId),
|
|
inArray(issues.id, issueIds),
|
|
notInArray(issues.status, ["done", "cancelled"]),
|
|
),
|
|
)
|
|
.returning({
|
|
id: issues.id,
|
|
status: issues.status,
|
|
assigneeAgentId: issues.assigneeAgentId,
|
|
});
|
|
|
|
return {
|
|
updatedIssueIds: updated.map((issue) => issue.id),
|
|
updatedIssues: updated.map((issue) => ({
|
|
id: issue.id,
|
|
status: coerceIssueStatus(issue.status),
|
|
assigneeAgentId: issue.assigneeAgentId,
|
|
})),
|
|
};
|
|
}
|
|
|
|
async function restoreIssueStatusesForHold(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
restoreHoldId: string,
|
|
input: {
|
|
reason?: string | null;
|
|
actor: ActorInput;
|
|
},
|
|
): Promise<RestoreTreeStatusResult> {
|
|
const restoreHold = await getHold(companyId, restoreHoldId);
|
|
if (!restoreHold) throw notFound("Issue tree hold not found");
|
|
if (restoreHold.rootIssueId !== rootIssueId) {
|
|
throw unprocessable("Issue tree hold does not belong to the requested root issue");
|
|
}
|
|
if (restoreHold.mode !== "restore") {
|
|
throw unprocessable("Issue tree hold is not a restore operation");
|
|
}
|
|
|
|
const activeCancelHolds = await listHolds(companyId, rootIssueId, {
|
|
status: "active",
|
|
mode: "cancel",
|
|
includeMembers: true,
|
|
});
|
|
const cancelSnapshotByIssueId = new Map<string, IssueTreeHoldMember>();
|
|
for (const hold of [...activeCancelHolds].reverse()) {
|
|
for (const member of hold.members ?? []) {
|
|
if (!member.skipped && !cancelSnapshotByIssueId.has(member.issueId)) {
|
|
cancelSnapshotByIssueId.set(member.issueId, member);
|
|
}
|
|
}
|
|
}
|
|
|
|
const restoreIssueIds = [...new Set((restoreHold.members ?? [])
|
|
.filter((member) => !member.skipped)
|
|
.map((member) => member.issueId))];
|
|
const restoreStatusByIssueId = new Map<string, IssueStatus>();
|
|
for (const issueId of restoreIssueIds) {
|
|
const snapshot = cancelSnapshotByIssueId.get(issueId);
|
|
if (!snapshot) continue;
|
|
const restoredStatus = restoreStatusFromCancelSnapshot(coerceIssueStatus(snapshot.issueStatus));
|
|
if (restoredStatus) restoreStatusByIssueId.set(issueId, restoredStatus);
|
|
}
|
|
|
|
const issueIdsByStatus = new Map<IssueStatus, string[]>();
|
|
for (const [issueId, status] of restoreStatusByIssueId) {
|
|
const current = issueIdsByStatus.get(status) ?? [];
|
|
current.push(issueId);
|
|
issueIdsByStatus.set(status, current);
|
|
}
|
|
|
|
const now = new Date();
|
|
const releasedCancelHoldIds = activeCancelHolds.map((hold) => hold.id);
|
|
const updatedIssues = await db.transaction(async (tx) => {
|
|
const restored: TreeStatusUpdateResult["updatedIssues"] = [];
|
|
for (const [status, issueIdsForStatus] of issueIdsByStatus) {
|
|
if (issueIdsForStatus.length === 0) continue;
|
|
const rows = await tx
|
|
.update(issues)
|
|
.set({
|
|
status,
|
|
cancelledAt: null,
|
|
completedAt: null,
|
|
checkoutRunId: null,
|
|
executionRunId: null,
|
|
executionAgentNameKey: null,
|
|
executionLockedAt: null,
|
|
updatedAt: now,
|
|
})
|
|
.where(
|
|
and(
|
|
eq(issues.companyId, companyId),
|
|
inArray(issues.id, issueIdsForStatus),
|
|
eq(issues.status, "cancelled"),
|
|
),
|
|
)
|
|
.returning({
|
|
id: issues.id,
|
|
status: issues.status,
|
|
assigneeAgentId: issues.assigneeAgentId,
|
|
});
|
|
restored.push(...rows.map((issue) => ({
|
|
id: issue.id,
|
|
status: coerceIssueStatus(issue.status),
|
|
assigneeAgentId: issue.assigneeAgentId,
|
|
})));
|
|
}
|
|
|
|
if (releasedCancelHoldIds.length > 0) {
|
|
await tx
|
|
.update(issueTreeHolds)
|
|
.set({
|
|
status: "released",
|
|
releasedAt: now,
|
|
releasedByActorType: input.actor.actorType,
|
|
releasedByAgentId: input.actor.agentId ?? null,
|
|
releasedByUserId: input.actor.userId ?? (input.actor.actorType === "user" ? input.actor.actorId : null),
|
|
releasedByRunId: input.actor.runId ?? null,
|
|
releaseReason: input.reason ?? "Restored by subtree restore operation",
|
|
releaseMetadata: {
|
|
restoreHoldId,
|
|
restoredIssueIds: restored.map((issue) => issue.id),
|
|
},
|
|
updatedAt: now,
|
|
})
|
|
.where(and(eq(issueTreeHolds.companyId, companyId), inArray(issueTreeHolds.id, releasedCancelHoldIds)));
|
|
}
|
|
|
|
await tx
|
|
.update(issueTreeHolds)
|
|
.set({
|
|
status: "released",
|
|
releasedAt: now,
|
|
releasedByActorType: input.actor.actorType,
|
|
releasedByAgentId: input.actor.agentId ?? null,
|
|
releasedByUserId: input.actor.userId ?? (input.actor.actorType === "user" ? input.actor.actorId : null),
|
|
releasedByRunId: input.actor.runId ?? null,
|
|
releaseReason: input.reason ?? "Restore operation applied",
|
|
releaseMetadata: {
|
|
restoredIssueIds: restored.map((issue) => issue.id),
|
|
releasedCancelHoldIds,
|
|
},
|
|
updatedAt: now,
|
|
})
|
|
.where(and(eq(issueTreeHolds.companyId, companyId), eq(issueTreeHolds.id, restoreHoldId)));
|
|
|
|
return restored;
|
|
});
|
|
|
|
return {
|
|
updatedIssueIds: updatedIssues.map((issue) => issue.id),
|
|
updatedIssues,
|
|
releasedCancelHoldIds,
|
|
restoreHold: await getHold(companyId, restoreHoldId),
|
|
};
|
|
}
|
|
|
|
async function getHold(companyId: string, holdId: string) {
|
|
const hold = await db
|
|
.select()
|
|
.from(issueTreeHolds)
|
|
.where(and(eq(issueTreeHolds.id, holdId), eq(issueTreeHolds.companyId, companyId)))
|
|
.then((rows) => rows[0] ?? null);
|
|
if (!hold) return null;
|
|
const members = await db
|
|
.select()
|
|
.from(issueTreeHoldMembers)
|
|
.where(and(eq(issueTreeHoldMembers.companyId, companyId), eq(issueTreeHoldMembers.holdId, holdId)))
|
|
.orderBy(asc(issueTreeHoldMembers.depth), asc(issueTreeHoldMembers.createdAt), asc(issueTreeHoldMembers.issueId));
|
|
return toHold(hold, members);
|
|
}
|
|
|
|
async function listHolds(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
input?: {
|
|
status?: IssueTreeHold["status"];
|
|
mode?: IssueTreeControlMode;
|
|
includeMembers?: boolean;
|
|
},
|
|
) {
|
|
const whereClauses = [
|
|
eq(issueTreeHolds.companyId, companyId),
|
|
eq(issueTreeHolds.rootIssueId, rootIssueId),
|
|
];
|
|
if (input?.status) whereClauses.push(eq(issueTreeHolds.status, input.status));
|
|
if (input?.mode) whereClauses.push(eq(issueTreeHolds.mode, input.mode));
|
|
|
|
const holds = await db
|
|
.select()
|
|
.from(issueTreeHolds)
|
|
.where(and(...whereClauses))
|
|
.orderBy(asc(issueTreeHolds.createdAt), asc(issueTreeHolds.id));
|
|
if (!input?.includeMembers || holds.length === 0) {
|
|
return holds.map((hold) => toHold(hold));
|
|
}
|
|
|
|
const holdIds = holds.map((hold) => hold.id);
|
|
const members = await db
|
|
.select()
|
|
.from(issueTreeHoldMembers)
|
|
.where(
|
|
and(
|
|
eq(issueTreeHoldMembers.companyId, companyId),
|
|
inArray(issueTreeHoldMembers.holdId, holdIds),
|
|
),
|
|
)
|
|
.orderBy(asc(issueTreeHoldMembers.depth), asc(issueTreeHoldMembers.createdAt), asc(issueTreeHoldMembers.issueId));
|
|
|
|
const membersByHoldId = new Map<string, HoldMemberRow[]>();
|
|
for (const member of members) {
|
|
const existing = membersByHoldId.get(member.holdId) ?? [];
|
|
existing.push(member);
|
|
membersByHoldId.set(member.holdId, existing);
|
|
}
|
|
|
|
return holds.map((hold) => toHold(hold, membersByHoldId.get(hold.id) ?? []));
|
|
}
|
|
|
|
async function releaseHold(
|
|
companyId: string,
|
|
rootIssueId: string,
|
|
holdId: string,
|
|
input: {
|
|
reason?: string | null;
|
|
releasePolicy?: IssueTreeHoldReleasePolicy | null;
|
|
metadata?: Record<string, unknown> | null;
|
|
actor: ActorInput;
|
|
},
|
|
) {
|
|
const existing = await db
|
|
.select()
|
|
.from(issueTreeHolds)
|
|
.where(and(eq(issueTreeHolds.id, holdId), eq(issueTreeHolds.companyId, companyId)))
|
|
.then((rows) => rows[0] ?? null);
|
|
if (!existing) throw notFound("Issue tree hold not found");
|
|
if (existing.rootIssueId !== rootIssueId) {
|
|
throw unprocessable("Issue tree hold does not belong to the requested root issue");
|
|
}
|
|
if (existing.status === "released") {
|
|
throw conflict("Issue tree hold is already released");
|
|
}
|
|
|
|
const [updated] = await db
|
|
.update(issueTreeHolds)
|
|
.set({
|
|
status: "released",
|
|
releasedAt: new Date(),
|
|
releasedByActorType: input.actor.actorType,
|
|
releasedByAgentId: input.actor.agentId ?? null,
|
|
releasedByUserId: input.actor.userId ?? (input.actor.actorType === "user" ? input.actor.actorId : null),
|
|
releasedByRunId: input.actor.runId ?? null,
|
|
releaseReason: input.reason ?? null,
|
|
releasePolicy: input.releasePolicy
|
|
? (normalizeReleasePolicy(input.releasePolicy) as unknown as Record<string, unknown>)
|
|
: existing.releasePolicy,
|
|
releaseMetadata: input.metadata ?? null,
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(and(eq(issueTreeHolds.id, holdId), eq(issueTreeHolds.companyId, companyId)))
|
|
.returning();
|
|
|
|
const members = await db
|
|
.select()
|
|
.from(issueTreeHoldMembers)
|
|
.where(and(eq(issueTreeHoldMembers.companyId, companyId), eq(issueTreeHoldMembers.holdId, holdId)))
|
|
.orderBy(asc(issueTreeHoldMembers.depth), asc(issueTreeHoldMembers.createdAt), asc(issueTreeHoldMembers.issueId));
|
|
|
|
return toHold(updated, members);
|
|
}
|
|
|
|
async function cancelUnclaimedWakeupsForTree(companyId: string, rootIssueId: string, reason: string) {
|
|
const treeIssues = await listTreeIssues(companyId, rootIssueId);
|
|
const issueIds = treeIssues.map((issue) => issue.id);
|
|
if (issueIds.length === 0) return [];
|
|
const now = new Date();
|
|
return db
|
|
.update(agentWakeupRequests)
|
|
.set({
|
|
status: "cancelled",
|
|
finishedAt: now,
|
|
error: reason,
|
|
updatedAt: now,
|
|
})
|
|
.where(
|
|
and(
|
|
eq(agentWakeupRequests.companyId, companyId),
|
|
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
|
isNull(agentWakeupRequests.runId),
|
|
inArray(sql<string | null>`${agentWakeupRequests.payload} ->> 'issueId'`, issueIds),
|
|
),
|
|
)
|
|
.returning({
|
|
id: agentWakeupRequests.id,
|
|
agentId: agentWakeupRequests.agentId,
|
|
reason: agentWakeupRequests.reason,
|
|
payload: agentWakeupRequests.payload,
|
|
});
|
|
}
|
|
|
|
return {
|
|
listTreeIssues,
|
|
preview,
|
|
createHold,
|
|
cancelIssueStatusesForHold,
|
|
restoreIssueStatusesForHold,
|
|
getHold,
|
|
listHolds,
|
|
getActivePauseHoldGate,
|
|
releaseHold,
|
|
cancelUnclaimedWakeupsForTree,
|
|
};
|
|
}
|