diff --git a/apps/app/src/react/island.tsx b/apps/app/src/react/island.tsx index bbb4d2ae..101680b1 100644 --- a/apps/app/src/react/island.tsx +++ b/apps/app/src/react/island.tsx @@ -1,7 +1,8 @@ import { createEffect, onCleanup, onMount } from "solid-js"; import { createElement, Fragment, type ComponentType } from "react"; -import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import { QueryClientProvider } from "@tanstack/react-query"; import { createRoot, type Root } from "react-dom/client"; +import { getReactQueryClient } from "./kernel/query-client"; type ReactIslandProps = { component: ComponentType; @@ -13,7 +14,7 @@ type ReactIslandProps = { export function ReactIsland(props: ReactIslandProps) { let container: HTMLDivElement | undefined; let root: Root | null = null; - const queryClient = new QueryClient(); + const queryClient = getReactQueryClient(); const render = () => { if (!root) return; diff --git a/apps/app/src/react/kernel/query-client.ts b/apps/app/src/react/kernel/query-client.ts new file mode 100644 index 00000000..67b7c241 --- /dev/null +++ b/apps/app/src/react/kernel/query-client.ts @@ -0,0 +1,9 @@ +import { QueryClient } from "@tanstack/react-query"; + +let client: QueryClient | null = null; + +export function getReactQueryClient() { + if (client) return client; + client = new QueryClient(); + return client; +} diff --git a/apps/app/src/react/session/session-surface.react.tsx b/apps/app/src/react/session/session-surface.react.tsx index 3b7b85ad..d79530da 100644 --- a/apps/app/src/react/session/session-surface.react.tsx +++ b/apps/app/src/react/session/session-surface.react.tsx @@ -1,14 +1,22 @@ /** @jsxImportSource react */ import { useEffect, useMemo, useRef, useState } from "react"; -import { useChat } from "@ai-sdk/react"; import type { UIMessage } from "ai"; import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { createClient } from "../../app/lib/opencode"; +import { abortSessionSafe } from "../../app/lib/opencode-session"; import type { OpenworkServerClient, OpenworkSessionSnapshot } from "../../app/lib/openwork-server"; import { SessionDebugPanel } from "./debug-panel.react"; import { deriveSessionRenderModel } from "./transition-controller"; import { SessionTranscript } from "./message-list.react"; -import { createOpenworkChatTransport, snapshotToUIMessages } from "./usechat-adapter"; +import { + ensureWorkspaceSessionSync, + seedSessionState, + statusKey as reactStatusKey, + todoKey as reactTodoKey, + transcriptKey as reactTranscriptKey, +} from "./session-sync"; +import { snapshotToUIMessages } from "./usechat-adapter"; type SessionSurfaceProps = { client: OpenworkServerClient; @@ -29,38 +37,32 @@ function statusLabel(snapshot: OpenworkSessionSnapshot | undefined, busy: boolea export function SessionSurface(props: SessionSurfaceProps) { const [draft, setDraft] = useState(""); const [error, setError] = useState(null); + const [sending, setSending] = useState(false); const [rendered, setRendered] = useState<{ sessionId: string; snapshot: OpenworkSessionSnapshot; } | null>(null); const hydratedKeyRef = useRef(null); const queryClient = useQueryClient(); - const transport = useMemo( - () => - createOpenworkChatTransport({ - baseUrl: props.opencodeBaseUrl, - openworkToken: props.openworkToken, - sessionId: props.sessionId, - }), - [props.opencodeBaseUrl, props.openworkToken, props.sessionId], + const opencodeClient = useMemo( + () => createClient(props.opencodeBaseUrl, undefined, { token: props.openworkToken, mode: "openwork" }), + [props.opencodeBaseUrl, props.openworkToken], ); - const chat = useChat({ - id: props.sessionId, - transport, - onError(nextError) { - setError(nextError.message); - }, - onFinish() { - void query.refetch(); - }, - }); const queryKey = useMemo( () => ["react-session-snapshot", props.workspaceId, props.sessionId], [props.workspaceId, props.sessionId], ); - const transcriptKey = useMemo( - () => ["react-session-transcript", props.workspaceId, props.sessionId], + const transcriptQueryKey = useMemo( + () => reactTranscriptKey(props.workspaceId, props.sessionId), + [props.workspaceId, props.sessionId], + ); + const statusQueryKey = useMemo( + () => reactStatusKey(props.workspaceId, props.sessionId), + [props.workspaceId, props.sessionId], + ); + const todoQueryKey = useMemo( + () => reactTodoKey(props.workspaceId, props.sessionId), [props.workspaceId, props.sessionId], ); @@ -71,14 +73,28 @@ export function SessionSurface(props: SessionSurfaceProps) { }); const currentSnapshot = query.data?.session.id === props.sessionId ? query.data : null; + const statusQuery = useQuery({ + queryKey: statusQueryKey, + queryFn: async () => currentSnapshot?.status ?? { type: "idle" as const }, + initialData: () => queryClient.getQueryData(statusQueryKey) ?? currentSnapshot?.status ?? { type: "idle" as const }, + staleTime: Infinity, + gcTime: 1000 * 60 * 60, + }); + useQuery({ + queryKey: todoQueryKey, + queryFn: async () => currentSnapshot?.todos ?? [], + initialData: () => queryClient.getQueryData(todoQueryKey) ?? currentSnapshot?.todos ?? [], + staleTime: Infinity, + gcTime: 1000 * 60 * 60, + }); const transcriptQuery = useQuery({ - queryKey: transcriptKey, + queryKey: transcriptQueryKey, queryFn: async () => { if (currentSnapshot) return snapshotToUIMessages(currentSnapshot); return []; }, initialData: () => { - const cached = queryClient.getQueryData(transcriptKey); + const cached = queryClient.getQueryData(transcriptQueryKey); if (cached) return cached; return currentSnapshot ? snapshotToUIMessages(currentSnapshot) : []; }, @@ -86,6 +102,14 @@ export function SessionSurface(props: SessionSurfaceProps) { gcTime: 1000 * 60 * 60, }); + useEffect(() => { + return ensureWorkspaceSessionSync({ + workspaceId: props.workspaceId, + baseUrl: props.opencodeBaseUrl, + openworkToken: props.openworkToken, + }); + }, [props.workspaceId, props.opencodeBaseUrl, props.openworkToken]); + useEffect(() => { if (!currentSnapshot) return; setRendered({ sessionId: props.sessionId, snapshot: currentSnapshot }); @@ -94,63 +118,52 @@ export function SessionSurface(props: SessionSurfaceProps) { useEffect(() => { hydratedKeyRef.current = null; setError(null); + setSending(false); }, [props.sessionId]); useEffect(() => { if (!currentSnapshot) return; - const existing = queryClient.getQueryData(transcriptKey); - if (!existing || existing.length === 0) { - queryClient.setQueryData(transcriptKey, snapshotToUIMessages(currentSnapshot)); - } - }, [currentSnapshot, queryClient, transcriptKey]); - - useEffect(() => { - const cached = transcriptQuery.data; - if (!cached || cached.length === 0) return; - if (chat.messages.length > 0) return; - chat.setMessages(cached); - }, [chat, chat.messages.length, transcriptQuery.data]); - - useEffect(() => { - if (chat.messages.length === 0) return; - queryClient.setQueryData(transcriptKey, chat.messages); - }, [chat.messages, queryClient, transcriptKey]); + seedSessionState(props.workspaceId, currentSnapshot); + }, [currentSnapshot, props.workspaceId]); useEffect(() => { if (!currentSnapshot) return; const key = `${props.sessionId}:${currentSnapshot.session.time?.updated ?? currentSnapshot.session.time?.created ?? 0}:${currentSnapshot.messages.length}`; if (hydratedKeyRef.current === key) return; hydratedKeyRef.current = key; - const nextMessages = snapshotToUIMessages(currentSnapshot); - if (chat.messages.length === 0) { - chat.setMessages(nextMessages); - } - queryClient.setQueryData(transcriptKey, (current: UIMessage[] | undefined) => - current && current.length > 0 ? current : nextMessages, - ); - }, [chat, chat.messages.length, props.sessionId, currentSnapshot, queryClient, transcriptKey]); + seedSessionState(props.workspaceId, currentSnapshot); + }, [props.sessionId, currentSnapshot, props.workspaceId]); const snapshot = currentSnapshot ?? rendered?.snapshot ?? null; - const chatStreaming = chat.status === "submitted" || chat.status === "streaming"; - const renderedMessages = transcriptQuery.data && transcriptQuery.data.length > 0 ? transcriptQuery.data : chat.messages; + const liveStatus = statusQuery.data ?? snapshot?.status ?? { type: "idle" as const }; + const chatStreaming = sending || liveStatus.type === "busy" || liveStatus.type === "retry"; + const renderedMessages = transcriptQuery.data ?? []; const model = deriveSessionRenderModel({ intendedSessionId: props.sessionId, renderedSessionId: renderedMessages.length > 0 || query.data ? props.sessionId : rendered?.sessionId ?? null, hasSnapshot: Boolean(snapshot) || renderedMessages.length > 0, isFetching: query.isFetching || chatStreaming, - isError: query.isError || chat.status === "error", + isError: query.isError || Boolean(error), }); const handleSend = async () => { const text = draft.trim(); if (!text || chatStreaming) return; setError(null); + setSending(true); try { - await chat.sendMessage({ text }); + const result = await opencodeClient.session.promptAsync({ + sessionID: props.sessionId, + parts: [{ type: "text", text }], + }); + if (result.error) { + throw result.error instanceof Error ? result.error : new Error(String(result.error)); + } setDraft(""); } catch (nextError) { setError(nextError instanceof Error ? nextError.message : "Failed to send prompt."); + setSending(false); } }; @@ -158,13 +171,19 @@ export function SessionSurface(props: SessionSurfaceProps) { if (!chatStreaming) return; setError(null); try { - chat.stop(); + await abortSessionSafe(opencodeClient, props.sessionId); await query.refetch(); } catch (nextError) { setError(nextError instanceof Error ? nextError.message : "Failed to stop run."); } }; + useEffect(() => { + if (liveStatus.type === "idle") { + setSending(false); + } + }, [liveStatus.type]); + const onComposerKeyDown = async (event: React.KeyboardEvent) => { if (!event.metaKey && !event.ctrlKey) return; if (event.key !== "Enter") return; @@ -188,7 +207,7 @@ export function SessionSurface(props: SessionSurfaceProps) {
Loading React session view...
- ) : (query.isError || chat.status === "error") && !snapshot && renderedMessages.length === 0 ? ( + ) : (query.isError || error) && !snapshot && renderedMessages.length === 0 ? (
{error || (query.error instanceof Error ? query.error.message : "Failed to load React session view.")} diff --git a/apps/app/src/react/session/session-sync.ts b/apps/app/src/react/session/session-sync.ts new file mode 100644 index 00000000..a5f4d2a0 --- /dev/null +++ b/apps/app/src/react/session/session-sync.ts @@ -0,0 +1,233 @@ +import type { UIMessage } from "ai"; +import type { Part, SessionStatus, Todo } from "@opencode-ai/sdk/v2/client"; + +import { getReactQueryClient } from "../kernel/query-client"; +import { createClient } from "../../app/lib/opencode"; +import { normalizeEvent } from "../../app/utils"; +import type { OpencodeEvent } from "../../app/types"; +import { snapshotToUIMessages } from "./usechat-adapter"; +import type { OpenworkSessionSnapshot } from "../../app/lib/openwork-server"; + +type SyncOptions = { + workspaceId: string; + baseUrl: string; + openworkToken: string; +}; + +type SyncEntry = { + refs: number; + stopTimer: ReturnType | null; + dispose: () => void; +}; + +const idleStatus: SessionStatus = { type: "idle" }; +const syncs = new Map(); + +export const transcriptKey = (workspaceId: string, sessionId: string) => ["react-session-transcript", workspaceId, sessionId] as const; +export const statusKey = (workspaceId: string, sessionId: string) => ["react-session-status", workspaceId, sessionId] as const; +export const todoKey = (workspaceId: string, sessionId: string) => ["react-session-todos", workspaceId, sessionId] as const; + +function syncKey(input: SyncOptions) { + return `${input.workspaceId}:${input.baseUrl}:${input.openworkToken}`; +} + +function toTextPart(part: Part): UIMessage["parts"][number] | null { + if (part.type === "text") { + return { type: "text", text: typeof (part as { text?: unknown }).text === "string" ? (part as { text: string }).text : "", state: "done" }; + } + if (part.type === "reasoning") { + return { type: "reasoning", text: typeof (part as { text?: unknown }).text === "string" ? (part as { text: string }).text : "", state: "done" }; + } + if (part.type === "file") { + const file = part as Part & { url?: string; filename?: string; mime?: string }; + if (!file.url) return null; + return { type: "file", url: file.url, filename: file.filename, mediaType: file.mime ?? "application/octet-stream" }; + } + if (part.type === "tool") { + const record = part as Part & { tool?: string; state?: Record }; + const state = record.state ?? {}; + const toolName = typeof record.tool === "string" ? record.tool : "tool"; + if (typeof state.error === "string" && state.error.trim()) { + return { + type: "dynamic-tool", + toolName, + toolCallId: part.id, + state: "output-error", + input: state.input, + errorText: state.error, + }; + } + if (state.output !== undefined) { + return { + type: "dynamic-tool", + toolName, + toolCallId: part.id, + state: "output-available", + input: state.input, + output: state.output, + }; + } + return { + type: "dynamic-tool", + toolName, + toolCallId: part.id, + state: "input-available", + input: state.input, + }; + } + if (part.type === "step-start") return { type: "step-start" }; + return null; +} + +function upsertMessage(messages: UIMessage[], next: UIMessage) { + const index = messages.findIndex((message) => message.id === next.id); + if (index === -1) return [...messages, next]; + return messages.map((message, messageIndex) => (messageIndex === index ? { ...message, ...next } : message)); +} + +function upsertPart(messages: UIMessage[], messageId: string, partId: string, next: UIMessage["parts"][number]) { + return messages.map((message) => { + if (message.id !== messageId) return message; + const index = message.parts.findIndex((part) => "toolCallId" in part ? part.toolCallId === partId : "id" in part ? (part as { id?: string }).id === partId : false); + if (index === -1) { + return { ...message, parts: [...message.parts, next] }; + } + const parts = message.parts.slice(); + parts[index] = next; + return { ...message, parts }; + }); +} + +function appendDelta(messages: UIMessage[], messageId: string, partId: string, delta: string, reasoning: boolean) { + return messages.map((message) => { + if (message.id !== messageId) return message; + const parts = message.parts.map((part) => { + if (reasoning && part.type === "reasoning") { + const id = `${messageId}:reasoning:${message.parts.indexOf(part)}`; + if (id === partId || partId === id) return { ...part, text: `${part.text}${delta}`, state: "streaming" as const }; + } + if (!reasoning && part.type === "text") { + const id = `${messageId}:text:${message.parts.indexOf(part)}`; + if (id === partId || partId === id) return { ...part, text: `${part.text}${delta}`, state: "streaming" as const }; + } + if (part.type === "dynamic-tool" && part.toolCallId === partId) return part; + return part; + }); + return { ...message, parts }; + }); +} + +function applyEvent(workspaceId: string, event: OpencodeEvent) { + const queryClient = getReactQueryClient(); + + if (event.type === "session.status") { + const props = (event.properties ?? {}) as { sessionID?: string; status?: SessionStatus }; + if (!props.sessionID || !props.status) return; + queryClient.setQueryData(statusKey(workspaceId, props.sessionID), props.status); + return; + } + + if (event.type === "todo.updated") { + const props = (event.properties ?? {}) as { sessionID?: string; todos?: Todo[] }; + if (!props.sessionID || !props.todos) return; + queryClient.setQueryData(todoKey(workspaceId, props.sessionID), props.todos); + return; + } + + if (event.type === "message.updated") { + const props = (event.properties ?? {}) as { info?: { id?: string; role?: UIMessage["role"] | string; sessionID?: string } }; + const info = props.info; + if (!info?.id || !info.sessionID || (info.role !== "user" && info.role !== "assistant" && info.role !== "system")) return; + const next = { id: info.id, role: info.role, parts: [] } satisfies UIMessage; + queryClient.setQueryData(transcriptKey(workspaceId, info.sessionID), (current = []) => + upsertMessage(current, next), + ); + return; + } + + if (event.type === "message.part.updated") { + const props = (event.properties ?? {}) as { part?: Part }; + const part = props.part; + if (!part?.sessionID || !part.messageID) return; + const mapped = toTextPart(part); + if (!mapped) return; + queryClient.setQueryData(transcriptKey(workspaceId, part.sessionID), (current = []) => { + const withMessage = upsertMessage(current, { id: part.messageID, role: "assistant", parts: [] }); + return upsertPart(withMessage, part.messageID, part.id, mapped); + }); + return; + } + + if (event.type === "message.part.delta") { + const props = (event.properties ?? {}) as { sessionID?: string; messageID?: string; partID?: string; field?: string; delta?: string }; + if (!props.sessionID || !props.messageID || !props.partID || !props.delta) return; + queryClient.setQueryData(transcriptKey(workspaceId, props.sessionID), (current = []) => + appendDelta(current, props.messageID!, props.partID!, props.delta!, props.field === "reasoning"), + ); + return; + } + + if (event.type === "session.idle") { + const props = (event.properties ?? {}) as { sessionID?: string }; + if (!props.sessionID) return; + queryClient.setQueryData(statusKey(workspaceId, props.sessionID), idleStatus); + } +} + +function startSync(input: SyncOptions) { + const client = createClient(input.baseUrl, undefined, { token: input.openworkToken, mode: "openwork" }); + const controller = new AbortController(); + + void client.event.subscribe(undefined, { signal: controller.signal }).then((sub) => { + void (async () => { + for await (const raw of sub.stream) { + if (controller.signal.aborted) return; + const event = normalizeEvent(raw); + if (!event) continue; + applyEvent(input.workspaceId, event); + } + })(); + }); + + return () => controller.abort(); +} + +export function ensureWorkspaceSessionSync(input: SyncOptions) { + const key = syncKey(input); + const existing = syncs.get(key); + if (existing) { + existing.refs += 1; + if (existing.stopTimer) { + clearTimeout(existing.stopTimer); + existing.stopTimer = null; + } + return () => releaseWorkspaceSessionSync(input); + } + + syncs.set(key, { + refs: 1, + stopTimer: null, + dispose: startSync(input), + }); + + return () => releaseWorkspaceSessionSync(input); +} + +function releaseWorkspaceSessionSync(input: SyncOptions) { + const key = syncKey(input); + const existing = syncs.get(key); + if (!existing) return; + existing.refs -= 1; + if (existing.refs > 0) return; + existing.stopTimer = setTimeout(() => { + existing.dispose(); + syncs.delete(key); + }, 10_000); +} + +export function seedSessionState(workspaceId: string, snapshot: OpenworkSessionSnapshot) { + const queryClient = getReactQueryClient(); + queryClient.setQueryData(transcriptKey(workspaceId, snapshot.session.id), snapshotToUIMessages(snapshot)); + queryClient.setQueryData(statusKey(workspaceId, snapshot.session.id), snapshot.status); + queryClient.setQueryData(todoKey(workspaceId, snapshot.session.id), snapshot.todos); +}