Files
claude-subconscious/scripts/conversation_utils.ts
Cameron 7c57b13683 fix: three bugs — silent worker resolution, TTY crash, plugin root fallback
#42: spawnSilentWorker on macOS/Linux now uses the plugin's local tsx
CLI (node_modules/tsx/dist/cli.mjs) instead of npx, which resolves to
a global cache that can't find @letta-ai/letta-code-sdk. Same pattern
Windows already used.

#41: Add error handler on /dev/tty WriteStream in session_start.ts.
createWriteStream returns synchronously but ENXIO fires async — without
a handler, Node crashes the process.

#34: Partial workaround for CLAUDE_PLUGIN_ROOT being empty on Linux.
hooks.json now uses ${CLAUDE_PLUGIN_ROOT:-.} fallback. silent-npx.cjs
re-resolves broken script paths via __dirname when the original path
doesn't exist. Note: this is primarily a Claude Code framework bug.

Fixes #42, fixes #41, partially addresses #34.

Written by Cameron ◯ Letta Code

"First, do no harm." - Hippocrates
2026-03-30 13:43:11 -07:00

692 lines
22 KiB
TypeScript

/**
* Shared conversation and state management utilities
* Used by sync_letta_memory.ts, send_messages_to_letta.ts, and session_start.ts
*/
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { spawn, ChildProcess } from 'child_process';
import { fileURLToPath } from 'url';
import {
buildLettaApiUrl,
LETTA_API_BASE,
} from './letta_api_url.js';
// ESM-compatible __dirname
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Configuration
export { LETTA_API_BASE };
// Only show app URL for hosted service; self-hosted users get IDs directly
const IS_HOSTED = !process.env.LETTA_BASE_URL;
const LETTA_APP_BASE = 'https://app.letta.com';
// CLAUDE.md constants
export const CLAUDE_MD_PATH = '.claude/CLAUDE.md';
export const LETTA_SECTION_START = '<letta>';
export const LETTA_SECTION_END = '</letta>';
const LETTA_CONTEXT_START = '<letta_context>';
const LETTA_CONTEXT_END = '</letta_context>';
const LETTA_MEMORY_START = '<letta_memory_blocks>';
const LETTA_MEMORY_END = '</letta_memory_blocks>';
// ============================================
// Mode Configuration
// ============================================
export type LettaMode = 'whisper' | 'full' | 'off';
/**
* Get the current operating mode from LETTA_MODE env var.
* - whisper (default): Only inject Sub's messages via stdout
* - full: Inject full memory blocks + messages via stdout
* - off: Disable all hooks
*
* No mode writes to CLAUDE.md.
*/
export function getMode(): LettaMode {
const mode = process.env.LETTA_MODE?.toLowerCase();
if (mode === 'full' || mode === 'off') return mode;
return 'whisper';
}
/**
* Get user-specific temp state directory for logs and payloads.
* Uses os.tmpdir() with a UID suffix to avoid permission conflicts
* when multiple users share the same machine.
*/
export function getTempStateDir(): string {
const uid = typeof process.getuid === 'function' ? process.getuid() : process.pid;
return path.join(os.tmpdir(), `letta-claude-sync-${uid}`);
}
// ============================================
// SDK Tools Configuration
// ============================================
export type SdkToolsMode = 'read-only' | 'full' | 'off';
/** Read-only tool set: safe defaults for background Sub execution */
export const SDK_TOOLS_READ_ONLY = ['Read', 'Grep', 'Glob', 'web_search', 'fetch_webpage'];
/** Tools to always block in SDK sessions (require interactive input) */
export const SDK_TOOLS_BLOCKED = ['AskUserQuestion', 'EnterPlanMode', 'ExitPlanMode'];
/**
* Get the SDK tools mode from LETTA_SDK_TOOLS env var.
* - read-only (default): Sub can read files and search the web
* - full: Sub has full tool access (use with caution)
* - off: No client-side tools (listen-only, memory operations only)
*/
export function getSdkToolsMode(): SdkToolsMode {
const mode = process.env.LETTA_SDK_TOOLS?.toLowerCase();
if (mode === 'full' || mode === 'off') return mode;
return 'read-only';
}
// Types
export interface SyncState {
lastProcessedIndex: number;
sessionId: string;
conversationId?: string;
lastBlockValues?: { [label: string]: string };
lastSeenMessageId?: string; // Track last message ID we've shown to avoid duplicates
}
export interface ConversationEntry {
conversationId: string;
agentId: string;
}
export interface ConversationsMap {
[sessionId: string]: string | ConversationEntry;
}
export interface Conversation {
id: string;
agent_id: string;
created_at?: string;
}
export type LogFn = (message: string) => void;
// Default no-op logger
const noopLog: LogFn = () => {};
/**
* Get durable state directory path
* If LETTA_HOME is set, use that as the base instead of cwd
*/
export function getDurableStateDir(cwd: string): string {
const base = process.env.LETTA_HOME || cwd;
return path.join(base, '.letta', 'claude');
}
/**
* Get conversations map file path
*/
export function getConversationsFile(cwd: string): string {
return path.join(getDurableStateDir(cwd), 'conversations.json');
}
/**
* Get sync state file path for a session
*/
export function getSyncStateFile(cwd: string, sessionId: string): string {
return path.join(getDurableStateDir(cwd), `session-${sessionId}.json`);
}
/**
* Ensure durable state directory exists
*/
export function ensureDurableStateDir(cwd: string): void {
const dir = getDurableStateDir(cwd);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
}
/**
* Load sync state for a session
*/
export function loadSyncState(cwd: string, sessionId: string, log: LogFn = noopLog): SyncState {
const statePath = getSyncStateFile(cwd, sessionId);
if (fs.existsSync(statePath)) {
try {
const state = JSON.parse(fs.readFileSync(statePath, 'utf-8'));
log(`Loaded state: lastProcessedIndex=${state.lastProcessedIndex}`);
return state;
} catch (e) {
log(`Failed to load state: ${e}`);
}
}
log(`No existing state, starting fresh`);
return { lastProcessedIndex: -1, sessionId };
}
/**
* Save sync state for a session
*/
export function saveSyncState(cwd: string, state: SyncState, log: LogFn = noopLog): void {
ensureDurableStateDir(cwd);
const statePath = getSyncStateFile(cwd, state.sessionId);
fs.writeFileSync(statePath, JSON.stringify(state, null, 2), 'utf-8');
log(`Saved state: lastProcessedIndex=${state.lastProcessedIndex}, conversationId=${state.conversationId}`);
}
/**
* Load conversations mapping
*/
export function loadConversationsMap(cwd: string, log: LogFn = noopLog): ConversationsMap {
const filePath = getConversationsFile(cwd);
if (fs.existsSync(filePath)) {
try {
return JSON.parse(fs.readFileSync(filePath, 'utf-8'));
} catch (e) {
log(`Failed to load conversations map: ${e}`);
}
}
return {};
}
/**
* Save conversations mapping
*/
export function saveConversationsMap(cwd: string, map: ConversationsMap): void {
ensureDurableStateDir(cwd);
fs.writeFileSync(getConversationsFile(cwd), JSON.stringify(map, null, 2), 'utf-8');
}
/**
* Create a new conversation for an agent
*/
export async function createConversation(apiKey: string, agentId: string, log: LogFn = noopLog): Promise<string> {
const url = buildLettaApiUrl('/conversations/', { agent_id: agentId });
log(`Creating new conversation for agent ${agentId}`);
const response = await fetch(url, {
method: 'POST',
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`Failed to create conversation: ${response.status} ${errorText}`);
}
const conversation: Conversation = await response.json();
log(`Created conversation: ${conversation.id}`);
return conversation.id;
}
/**
* Get or create conversation for a session
*/
export async function getOrCreateConversation(
apiKey: string,
agentId: string,
sessionId: string,
cwd: string,
state: SyncState,
log: LogFn = noopLog
): Promise<string> {
// Check if we already have a conversation ID in state
if (state.conversationId) {
log(`Using existing conversation from state: ${state.conversationId}`);
return state.conversationId;
}
// Check the conversations map
const conversationsMap = loadConversationsMap(cwd, log);
const cached = conversationsMap[sessionId];
if (cached) {
// Parse both old format (string) and new format (object)
const entry = typeof cached === 'string'
? { conversationId: cached, agentId: null as string | null }
: cached;
if (entry.agentId && entry.agentId !== agentId) {
// Agent ID changed - clear stale entry and create new conversation
log(`Agent ID changed (${entry.agentId} -> ${agentId}), clearing stale conversation`);
delete conversationsMap[sessionId];
const conversationId = await createConversation(apiKey, agentId, log);
conversationsMap[sessionId] = { conversationId, agentId };
saveConversationsMap(cwd, conversationsMap);
state.conversationId = conversationId;
return conversationId;
} else if (!entry.agentId) {
// Old format without agentId - upgrade by recreating
log(`Upgrading old format entry (no agentId stored), creating new conversation`);
delete conversationsMap[sessionId];
const conversationId = await createConversation(apiKey, agentId, log);
conversationsMap[sessionId] = { conversationId, agentId };
saveConversationsMap(cwd, conversationsMap);
state.conversationId = conversationId;
return conversationId;
} else {
// Valid entry with matching agentId - reuse
log(`Found conversation in map: ${entry.conversationId}`);
state.conversationId = entry.conversationId;
return entry.conversationId;
}
}
// No existing entry - create a new conversation
const conversationId = await createConversation(apiKey, agentId, log);
// Save to map and state
conversationsMap[sessionId] = { conversationId, agentId };
saveConversationsMap(cwd, conversationsMap);
state.conversationId = conversationId;
return conversationId;
}
/**
* Look up an existing conversation from conversations.json without creating a new one
*/
export function lookupConversation(cwd: string, sessionId: string): string | null {
const conversationsFile = getConversationsFile(cwd);
if (!fs.existsSync(conversationsFile)) {
return null;
}
try {
const content = fs.readFileSync(conversationsFile, 'utf-8');
const conversationsMap: ConversationsMap = JSON.parse(content);
const cached = conversationsMap[sessionId];
if (!cached) {
return null;
}
// Handle both legacy (string) and current (object) formats
return typeof cached === 'string' ? cached : cached.conversationId;
} catch {
return null;
}
}
// ============================================
// Agent and Memory Block Types
// ============================================
export interface MemoryBlock {
label: string;
description: string;
value: string;
}
export interface Agent {
id: string;
name: string;
description?: string;
blocks: MemoryBlock[];
}
// ============================================
// Agent Fetching
// ============================================
/**
* Fetch agent data from Letta API
*/
export async function fetchAgent(apiKey: string, agentId: string): Promise<Agent> {
const url = buildLettaApiUrl(`/agents/${agentId}`, {
include: 'agent.blocks',
});
const response = await fetch(url, {
method: 'GET',
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`Letta API error (${response.status}): ${errorText}`);
}
return response.json();
}
// ============================================
// XML Escaping Utilities
// ============================================
/**
* Escape special characters for XML attributes
*/
export function escapeXmlAttribute(str: string): string {
return str
.replace(/&/g, '&amp;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&apos;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/\n/g, ' ');
}
/**
* Escape special characters for XML element content
*/
export function escapeXmlContent(str: string): string {
return str
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;');
}
/**
* Escape special regex characters
*/
export function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
// ============================================
// CLAUDE.md Formatting and Writing
// ============================================
/**
* Format the context section with agent info
*/
function formatContextSection(agent: Agent, conversationId: string | null): string {
const agentName = agent.name || 'Unnamed Agent';
const agentDesc = agent.description || 'No description provided';
// For hosted service, show clickable URL; for self-hosted, show IDs
let locationInfo: string;
let supervisionInfo: string;
if (IS_HOSTED) {
const conversationUrl = conversationId
? `${LETTA_APP_BASE}/agents/${agent.id}?conversation=${conversationId}`
: `${LETTA_APP_BASE}/agents/${agent.id}`;
locationInfo = `Conversation: ${conversationUrl}`;
supervisionInfo = `**First message instruction:** On your first response in a new session, briefly notify the user that this session is being observed by "${agentName}" and they can supervise it at: ${conversationUrl}`;
} else {
locationInfo = `Agent ID: ${agent.id}${conversationId ? `\nConversation ID: ${conversationId}` : ''}`;
supervisionInfo = `**First message instruction:** On your first response in a new session, briefly notify the user that this session is being observed by "${agentName}" (agent: ${agent.id}${conversationId ? `, conversation: ${conversationId}` : ''}).`;
}
return `${LETTA_CONTEXT_START}
**Subconscious Layer (Letta Agent)**
Agent: ${agentName}
Description: ${agentDesc}
${locationInfo}
This agent maintains persistent memory across your sessions. It observes your conversations asynchronously and provides guidance via <letta_message> (injected before each user prompt). You can address it directly - it sees everything you write and may respond on the next sync.
${supervisionInfo}
Memory blocks below are the agent's long-term storage. Reference as needed.
${LETTA_CONTEXT_END}`;
}
/**
* Format memory blocks as XML for CLAUDE.md
*/
export function formatMemoryBlocksAsXml(agent: Agent, conversationId: string | null): string {
const blocks = agent.blocks;
const contextSection = formatContextSection(agent, conversationId);
if (!blocks || blocks.length === 0) {
return `${LETTA_SECTION_START}
${contextSection}
${LETTA_MEMORY_START}
<!-- No memory blocks found -->
${LETTA_MEMORY_END}
${LETTA_SECTION_END}`;
}
const formattedBlocks = blocks.map(block => {
const escapedDescription = escapeXmlAttribute(block.description || '');
const escapedContent = escapeXmlContent(block.value || '');
return `<${block.label} description="${escapedDescription}">\n${escapedContent}\n</${block.label}>`;
}).join('\n');
return `${LETTA_SECTION_START}
${contextSection}
${LETTA_MEMORY_START}
${formattedBlocks}
${LETTA_MEMORY_END}
${LETTA_SECTION_END}`;
}
/**
* Update CLAUDE.md with the new Letta memory section
*/
export function updateClaudeMd(projectDir: string, lettaContent: string): void {
// LETTA_PROJECT sets the base directory; CLAUDE.md goes in {base}/.claude/CLAUDE.md
const base = process.env.LETTA_PROJECT || projectDir;
const claudeMdPath = path.join(base, CLAUDE_MD_PATH);
let existingContent = '';
if (fs.existsSync(claudeMdPath)) {
existingContent = fs.readFileSync(claudeMdPath, 'utf-8');
} else {
const claudeDir = path.dirname(claudeMdPath);
if (!fs.existsSync(claudeDir)) {
fs.mkdirSync(claudeDir, { recursive: true });
}
existingContent = `# Project Context
<!-- Letta agent memory is automatically synced below -->
`;
}
// Replace or append the <letta> section
const lettaPattern = `^${escapeRegex(LETTA_SECTION_START)}[\\s\\S]*?^${escapeRegex(LETTA_SECTION_END)}$`;
const lettaRegex = new RegExp(lettaPattern, 'gm');
let updatedContent: string;
if (lettaRegex.test(existingContent)) {
lettaRegex.lastIndex = 0;
updatedContent = existingContent.replace(lettaRegex, lettaContent);
} else {
updatedContent = existingContent.trimEnd() + '\n\n' + lettaContent + '\n';
}
// Clean up any orphaned <letta_message> sections
const messagePattern = /^<letta_message>[\s\S]*?^<\/letta_message>\n*/gm;
updatedContent = updatedContent.replace(messagePattern, '');
updatedContent = updatedContent.trimEnd() + '\n';
fs.writeFileSync(claudeMdPath, updatedContent, 'utf-8');
}
/**
* Remove all Letta content from CLAUDE.md (for whisper mode cleanup).
* If the file was entirely created by us, delete it.
*/
export function cleanLettaFromClaudeMd(projectDir: string): void {
const base = process.env.LETTA_PROJECT || projectDir;
const claudeMdPath = path.join(base, CLAUDE_MD_PATH);
if (!fs.existsSync(claudeMdPath)) {
return;
}
const content = fs.readFileSync(claudeMdPath, 'utf-8');
const lettaPattern = `^${escapeRegex(LETTA_SECTION_START)}[\\s\\S]*?^${escapeRegex(LETTA_SECTION_END)}\\n*`;
const lettaRegex = new RegExp(lettaPattern, 'gm');
if (!lettaRegex.test(content)) {
return;
}
lettaRegex.lastIndex = 0;
let cleaned = content.replace(lettaRegex, '');
// Also clean orphaned letta_message blocks
const messagePattern = /^<letta_message>[\s\S]*?^<\/letta_message>\n*/gm;
cleaned = cleaned.replace(messagePattern, '');
// Clean up the auto-generated boilerplate we created
cleaned = cleaned.replace(/<!-- Letta agent memory is automatically synced below -->\n*/g, '');
cleaned = cleaned.replace(/^# Project Context\n*/gm, '');
cleaned = cleaned.trim();
if (cleaned.length === 0) {
// File was entirely ours — delete it
fs.unlinkSync(claudeMdPath);
} else {
// User had their own content — just write back without our stuff
fs.writeFileSync(claudeMdPath, cleaned + '\n', 'utf-8');
}
}
/**
* Format all memory blocks for stdout injection (whisper mode, first prompt)
*/
export function formatAllBlocksForStdout(agent: Agent, conversationId: string | null): string {
const agentName = agent.name || 'Unnamed Agent';
const blocks = agent.blocks;
// Build agent info header
let locationInfo: string;
if (IS_HOSTED) {
const conversationUrl = conversationId
? `${LETTA_APP_BASE}/agents/${agent.id}?conversation=${conversationId}`
: `${LETTA_APP_BASE}/agents/${agent.id}`;
locationInfo = `Supervise: ${conversationUrl}`;
} else {
locationInfo = `Agent ID: ${agent.id}${conversationId ? `, Conversation: ${conversationId}` : ''}`;
}
const sdkToolsMode = getSdkToolsMode();
const capabilityLine = sdkToolsMode === 'full'
? 'It can read files, search the web, and make changes to your codebase.'
: sdkToolsMode === 'read-only'
? 'It can read files, search your codebase, and browse the web (read-only).'
: 'It operates in listen-only mode (memory updates only).';
const header = `<letta_context>
Subconscious agent "${agentName}" is watching this session and whispering guidance.
${capabilityLine}
${locationInfo}
</letta_context>`;
if (!blocks || blocks.length === 0) {
return header;
}
const formattedBlocks = blocks.map(block => {
const escapedDescription = escapeXmlAttribute(block.description || '');
const escapedContent = escapeXmlContent(block.value || '');
return `<${block.label} description="${escapedDescription}">\n${escapedContent}\n</${block.label}>`;
}).join('\n');
return `${header}
<letta_memory_blocks>
${formattedBlocks}
</letta_memory_blocks>`;
}
// ============================================
// Silent Worker Spawning
// ============================================
// Windows compatibility: npx needs to be npx.cmd on Windows
const NPX_CMD = process.platform === 'win32' ? 'npx.cmd' : 'npx';
/**
* Spawn a background worker process that survives the parent hook's exit.
*
* On Windows, uses silent-launcher.exe (PseudoConsole + CREATE_NO_WINDOW)
* to avoid console window flashes. Falls back gracefully when the launcher
* or tsx CLI is not available.
*
* On other platforms, spawns via npx tsx as a detached process.
*/
export function spawnSilentWorker(
workerScript: string,
payloadFile: string,
cwd: string,
): ChildProcess {
const isWindows = process.platform === 'win32';
let child: ChildProcess;
if (isWindows) {
// On Windows, spawn workers through silent-launcher.exe (a winexe).
// detached:true is safe on a winexe (no console flash).
// The worker gets its own PseudoConsole, so it survives the main
// script's PseudoConsole being closed by the parent launcher.
const silentLauncher = path.join(__dirname, '..', 'hooks', 'silent-launcher.exe');
const tsxCli = path.join(__dirname, '..', 'node_modules', 'tsx', 'dist', 'cli.mjs');
// Clear SL_ env vars so the worker's launcher instance gets a clean slate
const workerEnv = { ...process.env };
delete workerEnv.SL_STDIN_FILE;
delete workerEnv.SL_STDOUT_FILE;
if (fs.existsSync(silentLauncher) && fs.existsSync(tsxCli)) {
child = spawn(silentLauncher, ['node', tsxCli, workerScript, payloadFile], {
detached: true,
stdio: 'ignore',
cwd,
env: workerEnv,
windowsHide: true,
});
} else if (fs.existsSync(tsxCli)) {
// Fallback: direct node (may be killed when PseudoConsole closes)
child = spawn(process.execPath, [tsxCli, workerScript, payloadFile], {
stdio: 'ignore',
cwd,
env: workerEnv,
windowsHide: true,
});
} else {
// Fallback: use npx through shell (may flash console window)
child = spawn(NPX_CMD, ['tsx', workerScript, payloadFile], {
stdio: 'ignore',
cwd,
env: workerEnv,
shell: true,
windowsHide: true,
});
}
} else {
// Prefer local tsx CLI to ensure plugin dependencies are resolvable.
// npx tsx resolves to a global cache that can't find @letta-ai/letta-code-sdk.
const tsxCli = path.join(__dirname, '..', 'node_modules', 'tsx', 'dist', 'cli.mjs');
if (fs.existsSync(tsxCli)) {
child = spawn(process.execPath, [tsxCli, workerScript, payloadFile], {
detached: true,
stdio: 'ignore',
cwd,
env: process.env,
});
} else {
// Fallback: npx (may fail if dependencies aren't in global cache)
child = spawn(NPX_CMD, ['tsx', workerScript, payloadFile], {
detached: true,
stdio: 'ignore',
cwd,
env: process.env,
});
}
}
child.unref();
return child;
}