feat(app): keep React session state in shared sync cache

Move React session transcript ownership out of the mounted chat view by sharing one query client across islands and applying workspace event updates into per-session query cache, mirroring the global sync pattern used by OpenCode.
This commit is contained in:
Benjamin Shafii
2026-04-05 20:25:07 -07:00
parent 1e29e2feee
commit 49df59d6ef
4 changed files with 319 additions and 57 deletions

View File

@@ -1,7 +1,8 @@
import { createEffect, onCleanup, onMount } from "solid-js";
import { createElement, Fragment, type ComponentType } from "react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { QueryClientProvider } from "@tanstack/react-query";
import { createRoot, type Root } from "react-dom/client";
import { getReactQueryClient } from "./kernel/query-client";
type ReactIslandProps<T extends object> = {
component: ComponentType<T>;
@@ -13,7 +14,7 @@ type ReactIslandProps<T extends object> = {
export function ReactIsland<T extends object>(props: ReactIslandProps<T>) {
let container: HTMLDivElement | undefined;
let root: Root | null = null;
const queryClient = new QueryClient();
const queryClient = getReactQueryClient();
const render = () => {
if (!root) return;

View File

@@ -0,0 +1,9 @@
import { QueryClient } from "@tanstack/react-query";
let client: QueryClient | null = null;
export function getReactQueryClient() {
if (client) return client;
client = new QueryClient();
return client;
}

View File

@@ -1,14 +1,22 @@
/** @jsxImportSource react */
import { useEffect, useMemo, useRef, useState } from "react";
import { useChat } from "@ai-sdk/react";
import type { UIMessage } from "ai";
import { useQuery, useQueryClient } from "@tanstack/react-query";
import { createClient } from "../../app/lib/opencode";
import { abortSessionSafe } from "../../app/lib/opencode-session";
import type { OpenworkServerClient, OpenworkSessionSnapshot } from "../../app/lib/openwork-server";
import { SessionDebugPanel } from "./debug-panel.react";
import { deriveSessionRenderModel } from "./transition-controller";
import { SessionTranscript } from "./message-list.react";
import { createOpenworkChatTransport, snapshotToUIMessages } from "./usechat-adapter";
import {
ensureWorkspaceSessionSync,
seedSessionState,
statusKey as reactStatusKey,
todoKey as reactTodoKey,
transcriptKey as reactTranscriptKey,
} from "./session-sync";
import { snapshotToUIMessages } from "./usechat-adapter";
type SessionSurfaceProps = {
client: OpenworkServerClient;
@@ -29,38 +37,32 @@ function statusLabel(snapshot: OpenworkSessionSnapshot | undefined, busy: boolea
export function SessionSurface(props: SessionSurfaceProps) {
const [draft, setDraft] = useState("");
const [error, setError] = useState<string | null>(null);
const [sending, setSending] = useState(false);
const [rendered, setRendered] = useState<{
sessionId: string;
snapshot: OpenworkSessionSnapshot;
} | null>(null);
const hydratedKeyRef = useRef<string | null>(null);
const queryClient = useQueryClient();
const transport = useMemo(
() =>
createOpenworkChatTransport({
baseUrl: props.opencodeBaseUrl,
openworkToken: props.openworkToken,
sessionId: props.sessionId,
}),
[props.opencodeBaseUrl, props.openworkToken, props.sessionId],
const opencodeClient = useMemo(
() => createClient(props.opencodeBaseUrl, undefined, { token: props.openworkToken, mode: "openwork" }),
[props.opencodeBaseUrl, props.openworkToken],
);
const chat = useChat({
id: props.sessionId,
transport,
onError(nextError) {
setError(nextError.message);
},
onFinish() {
void query.refetch();
},
});
const queryKey = useMemo(
() => ["react-session-snapshot", props.workspaceId, props.sessionId],
[props.workspaceId, props.sessionId],
);
const transcriptKey = useMemo(
() => ["react-session-transcript", props.workspaceId, props.sessionId],
const transcriptQueryKey = useMemo(
() => reactTranscriptKey(props.workspaceId, props.sessionId),
[props.workspaceId, props.sessionId],
);
const statusQueryKey = useMemo(
() => reactStatusKey(props.workspaceId, props.sessionId),
[props.workspaceId, props.sessionId],
);
const todoQueryKey = useMemo(
() => reactTodoKey(props.workspaceId, props.sessionId),
[props.workspaceId, props.sessionId],
);
@@ -71,14 +73,28 @@ export function SessionSurface(props: SessionSurfaceProps) {
});
const currentSnapshot = query.data?.session.id === props.sessionId ? query.data : null;
const statusQuery = useQuery({
queryKey: statusQueryKey,
queryFn: async () => currentSnapshot?.status ?? { type: "idle" as const },
initialData: () => queryClient.getQueryData(statusQueryKey) ?? currentSnapshot?.status ?? { type: "idle" as const },
staleTime: Infinity,
gcTime: 1000 * 60 * 60,
});
useQuery({
queryKey: todoQueryKey,
queryFn: async () => currentSnapshot?.todos ?? [],
initialData: () => queryClient.getQueryData(todoQueryKey) ?? currentSnapshot?.todos ?? [],
staleTime: Infinity,
gcTime: 1000 * 60 * 60,
});
const transcriptQuery = useQuery<UIMessage[]>({
queryKey: transcriptKey,
queryKey: transcriptQueryKey,
queryFn: async () => {
if (currentSnapshot) return snapshotToUIMessages(currentSnapshot);
return [];
},
initialData: () => {
const cached = queryClient.getQueryData<UIMessage[]>(transcriptKey);
const cached = queryClient.getQueryData<UIMessage[]>(transcriptQueryKey);
if (cached) return cached;
return currentSnapshot ? snapshotToUIMessages(currentSnapshot) : [];
},
@@ -86,6 +102,14 @@ export function SessionSurface(props: SessionSurfaceProps) {
gcTime: 1000 * 60 * 60,
});
useEffect(() => {
return ensureWorkspaceSessionSync({
workspaceId: props.workspaceId,
baseUrl: props.opencodeBaseUrl,
openworkToken: props.openworkToken,
});
}, [props.workspaceId, props.opencodeBaseUrl, props.openworkToken]);
useEffect(() => {
if (!currentSnapshot) return;
setRendered({ sessionId: props.sessionId, snapshot: currentSnapshot });
@@ -94,63 +118,52 @@ export function SessionSurface(props: SessionSurfaceProps) {
useEffect(() => {
hydratedKeyRef.current = null;
setError(null);
setSending(false);
}, [props.sessionId]);
useEffect(() => {
if (!currentSnapshot) return;
const existing = queryClient.getQueryData<UIMessage[]>(transcriptKey);
if (!existing || existing.length === 0) {
queryClient.setQueryData(transcriptKey, snapshotToUIMessages(currentSnapshot));
}
}, [currentSnapshot, queryClient, transcriptKey]);
useEffect(() => {
const cached = transcriptQuery.data;
if (!cached || cached.length === 0) return;
if (chat.messages.length > 0) return;
chat.setMessages(cached);
}, [chat, chat.messages.length, transcriptQuery.data]);
useEffect(() => {
if (chat.messages.length === 0) return;
queryClient.setQueryData(transcriptKey, chat.messages);
}, [chat.messages, queryClient, transcriptKey]);
seedSessionState(props.workspaceId, currentSnapshot);
}, [currentSnapshot, props.workspaceId]);
useEffect(() => {
if (!currentSnapshot) return;
const key = `${props.sessionId}:${currentSnapshot.session.time?.updated ?? currentSnapshot.session.time?.created ?? 0}:${currentSnapshot.messages.length}`;
if (hydratedKeyRef.current === key) return;
hydratedKeyRef.current = key;
const nextMessages = snapshotToUIMessages(currentSnapshot);
if (chat.messages.length === 0) {
chat.setMessages(nextMessages);
}
queryClient.setQueryData(transcriptKey, (current: UIMessage[] | undefined) =>
current && current.length > 0 ? current : nextMessages,
);
}, [chat, chat.messages.length, props.sessionId, currentSnapshot, queryClient, transcriptKey]);
seedSessionState(props.workspaceId, currentSnapshot);
}, [props.sessionId, currentSnapshot, props.workspaceId]);
const snapshot = currentSnapshot ?? rendered?.snapshot ?? null;
const chatStreaming = chat.status === "submitted" || chat.status === "streaming";
const renderedMessages = transcriptQuery.data && transcriptQuery.data.length > 0 ? transcriptQuery.data : chat.messages;
const liveStatus = statusQuery.data ?? snapshot?.status ?? { type: "idle" as const };
const chatStreaming = sending || liveStatus.type === "busy" || liveStatus.type === "retry";
const renderedMessages = transcriptQuery.data ?? [];
const model = deriveSessionRenderModel({
intendedSessionId: props.sessionId,
renderedSessionId:
renderedMessages.length > 0 || query.data ? props.sessionId : rendered?.sessionId ?? null,
hasSnapshot: Boolean(snapshot) || renderedMessages.length > 0,
isFetching: query.isFetching || chatStreaming,
isError: query.isError || chat.status === "error",
isError: query.isError || Boolean(error),
});
const handleSend = async () => {
const text = draft.trim();
if (!text || chatStreaming) return;
setError(null);
setSending(true);
try {
await chat.sendMessage({ text });
const result = await opencodeClient.session.promptAsync({
sessionID: props.sessionId,
parts: [{ type: "text", text }],
});
if (result.error) {
throw result.error instanceof Error ? result.error : new Error(String(result.error));
}
setDraft("");
} catch (nextError) {
setError(nextError instanceof Error ? nextError.message : "Failed to send prompt.");
setSending(false);
}
};
@@ -158,13 +171,19 @@ export function SessionSurface(props: SessionSurfaceProps) {
if (!chatStreaming) return;
setError(null);
try {
chat.stop();
await abortSessionSafe(opencodeClient, props.sessionId);
await query.refetch();
} catch (nextError) {
setError(nextError instanceof Error ? nextError.message : "Failed to stop run.");
}
};
useEffect(() => {
if (liveStatus.type === "idle") {
setSending(false);
}
}, [liveStatus.type]);
const onComposerKeyDown = async (event: React.KeyboardEvent<HTMLTextAreaElement>) => {
if (!event.metaKey && !event.ctrlKey) return;
if (event.key !== "Enter") return;
@@ -188,7 +207,7 @@ export function SessionSurface(props: SessionSurfaceProps) {
<div className="text-sm text-dls-secondary">Loading React session view...</div>
</div>
</div>
) : (query.isError || chat.status === "error") && !snapshot && renderedMessages.length === 0 ? (
) : (query.isError || error) && !snapshot && renderedMessages.length === 0 ? (
<div className="px-6 py-16">
<div className="mx-auto max-w-xl rounded-3xl border border-red-6/40 bg-red-3/20 px-6 py-5 text-sm text-red-11">
{error || (query.error instanceof Error ? query.error.message : "Failed to load React session view.")}

View File

@@ -0,0 +1,233 @@
import type { UIMessage } from "ai";
import type { Part, SessionStatus, Todo } from "@opencode-ai/sdk/v2/client";
import { getReactQueryClient } from "../kernel/query-client";
import { createClient } from "../../app/lib/opencode";
import { normalizeEvent } from "../../app/utils";
import type { OpencodeEvent } from "../../app/types";
import { snapshotToUIMessages } from "./usechat-adapter";
import type { OpenworkSessionSnapshot } from "../../app/lib/openwork-server";
type SyncOptions = {
workspaceId: string;
baseUrl: string;
openworkToken: string;
};
type SyncEntry = {
refs: number;
stopTimer: ReturnType<typeof setTimeout> | null;
dispose: () => void;
};
const idleStatus: SessionStatus = { type: "idle" };
const syncs = new Map<string, SyncEntry>();
export const transcriptKey = (workspaceId: string, sessionId: string) => ["react-session-transcript", workspaceId, sessionId] as const;
export const statusKey = (workspaceId: string, sessionId: string) => ["react-session-status", workspaceId, sessionId] as const;
export const todoKey = (workspaceId: string, sessionId: string) => ["react-session-todos", workspaceId, sessionId] as const;
function syncKey(input: SyncOptions) {
return `${input.workspaceId}:${input.baseUrl}:${input.openworkToken}`;
}
function toTextPart(part: Part): UIMessage["parts"][number] | null {
if (part.type === "text") {
return { type: "text", text: typeof (part as { text?: unknown }).text === "string" ? (part as { text: string }).text : "", state: "done" };
}
if (part.type === "reasoning") {
return { type: "reasoning", text: typeof (part as { text?: unknown }).text === "string" ? (part as { text: string }).text : "", state: "done" };
}
if (part.type === "file") {
const file = part as Part & { url?: string; filename?: string; mime?: string };
if (!file.url) return null;
return { type: "file", url: file.url, filename: file.filename, mediaType: file.mime ?? "application/octet-stream" };
}
if (part.type === "tool") {
const record = part as Part & { tool?: string; state?: Record<string, unknown> };
const state = record.state ?? {};
const toolName = typeof record.tool === "string" ? record.tool : "tool";
if (typeof state.error === "string" && state.error.trim()) {
return {
type: "dynamic-tool",
toolName,
toolCallId: part.id,
state: "output-error",
input: state.input,
errorText: state.error,
};
}
if (state.output !== undefined) {
return {
type: "dynamic-tool",
toolName,
toolCallId: part.id,
state: "output-available",
input: state.input,
output: state.output,
};
}
return {
type: "dynamic-tool",
toolName,
toolCallId: part.id,
state: "input-available",
input: state.input,
};
}
if (part.type === "step-start") return { type: "step-start" };
return null;
}
function upsertMessage(messages: UIMessage[], next: UIMessage) {
const index = messages.findIndex((message) => message.id === next.id);
if (index === -1) return [...messages, next];
return messages.map((message, messageIndex) => (messageIndex === index ? { ...message, ...next } : message));
}
function upsertPart(messages: UIMessage[], messageId: string, partId: string, next: UIMessage["parts"][number]) {
return messages.map((message) => {
if (message.id !== messageId) return message;
const index = message.parts.findIndex((part) => "toolCallId" in part ? part.toolCallId === partId : "id" in part ? (part as { id?: string }).id === partId : false);
if (index === -1) {
return { ...message, parts: [...message.parts, next] };
}
const parts = message.parts.slice();
parts[index] = next;
return { ...message, parts };
});
}
function appendDelta(messages: UIMessage[], messageId: string, partId: string, delta: string, reasoning: boolean) {
return messages.map((message) => {
if (message.id !== messageId) return message;
const parts = message.parts.map((part) => {
if (reasoning && part.type === "reasoning") {
const id = `${messageId}:reasoning:${message.parts.indexOf(part)}`;
if (id === partId || partId === id) return { ...part, text: `${part.text}${delta}`, state: "streaming" as const };
}
if (!reasoning && part.type === "text") {
const id = `${messageId}:text:${message.parts.indexOf(part)}`;
if (id === partId || partId === id) return { ...part, text: `${part.text}${delta}`, state: "streaming" as const };
}
if (part.type === "dynamic-tool" && part.toolCallId === partId) return part;
return part;
});
return { ...message, parts };
});
}
function applyEvent(workspaceId: string, event: OpencodeEvent) {
const queryClient = getReactQueryClient();
if (event.type === "session.status") {
const props = (event.properties ?? {}) as { sessionID?: string; status?: SessionStatus };
if (!props.sessionID || !props.status) return;
queryClient.setQueryData(statusKey(workspaceId, props.sessionID), props.status);
return;
}
if (event.type === "todo.updated") {
const props = (event.properties ?? {}) as { sessionID?: string; todos?: Todo[] };
if (!props.sessionID || !props.todos) return;
queryClient.setQueryData(todoKey(workspaceId, props.sessionID), props.todos);
return;
}
if (event.type === "message.updated") {
const props = (event.properties ?? {}) as { info?: { id?: string; role?: UIMessage["role"] | string; sessionID?: string } };
const info = props.info;
if (!info?.id || !info.sessionID || (info.role !== "user" && info.role !== "assistant" && info.role !== "system")) return;
const next = { id: info.id, role: info.role, parts: [] } satisfies UIMessage;
queryClient.setQueryData<UIMessage[]>(transcriptKey(workspaceId, info.sessionID), (current = []) =>
upsertMessage(current, next),
);
return;
}
if (event.type === "message.part.updated") {
const props = (event.properties ?? {}) as { part?: Part };
const part = props.part;
if (!part?.sessionID || !part.messageID) return;
const mapped = toTextPart(part);
if (!mapped) return;
queryClient.setQueryData<UIMessage[]>(transcriptKey(workspaceId, part.sessionID), (current = []) => {
const withMessage = upsertMessage(current, { id: part.messageID, role: "assistant", parts: [] });
return upsertPart(withMessage, part.messageID, part.id, mapped);
});
return;
}
if (event.type === "message.part.delta") {
const props = (event.properties ?? {}) as { sessionID?: string; messageID?: string; partID?: string; field?: string; delta?: string };
if (!props.sessionID || !props.messageID || !props.partID || !props.delta) return;
queryClient.setQueryData<UIMessage[]>(transcriptKey(workspaceId, props.sessionID), (current = []) =>
appendDelta(current, props.messageID!, props.partID!, props.delta!, props.field === "reasoning"),
);
return;
}
if (event.type === "session.idle") {
const props = (event.properties ?? {}) as { sessionID?: string };
if (!props.sessionID) return;
queryClient.setQueryData(statusKey(workspaceId, props.sessionID), idleStatus);
}
}
function startSync(input: SyncOptions) {
const client = createClient(input.baseUrl, undefined, { token: input.openworkToken, mode: "openwork" });
const controller = new AbortController();
void client.event.subscribe(undefined, { signal: controller.signal }).then((sub) => {
void (async () => {
for await (const raw of sub.stream) {
if (controller.signal.aborted) return;
const event = normalizeEvent(raw);
if (!event) continue;
applyEvent(input.workspaceId, event);
}
})();
});
return () => controller.abort();
}
export function ensureWorkspaceSessionSync(input: SyncOptions) {
const key = syncKey(input);
const existing = syncs.get(key);
if (existing) {
existing.refs += 1;
if (existing.stopTimer) {
clearTimeout(existing.stopTimer);
existing.stopTimer = null;
}
return () => releaseWorkspaceSessionSync(input);
}
syncs.set(key, {
refs: 1,
stopTimer: null,
dispose: startSync(input),
});
return () => releaseWorkspaceSessionSync(input);
}
function releaseWorkspaceSessionSync(input: SyncOptions) {
const key = syncKey(input);
const existing = syncs.get(key);
if (!existing) return;
existing.refs -= 1;
if (existing.refs > 0) return;
existing.stopTimer = setTimeout(() => {
existing.dispose();
syncs.delete(key);
}, 10_000);
}
export function seedSessionState(workspaceId: string, snapshot: OpenworkSessionSnapshot) {
const queryClient = getReactQueryClient();
queryClient.setQueryData(transcriptKey(workspaceId, snapshot.session.id), snapshotToUIMessages(snapshot));
queryClient.setQueryData(statusKey(workspaceId, snapshot.session.id), snapshot.status);
queryClient.setQueryData(todoKey(workspaceId, snapshot.session.id), snapshot.todos);
}