mirror of
https://github.com/thedotmack/claude-mem
synced 2026-04-25 17:15:04 +02:00
feat: security observation types + Telegram notifier (#2084)
* feat: security observation types + Telegram notifier Adds two severity-axis security observation types (security_alert, security_note) to the code mode and a fire-and-forget Telegram notifier that posts when a saved observation matches configured type or concept triggers. Default trigger fires on security_alert only; notifier is disabled until BOT_TOKEN and CHAT_ID are set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(telegram): honor CLAUDE_MEM_TELEGRAM_ENABLED master toggle Adds an explicit on/off flag (default 'true') so users can disable the notifier without clearing credentials. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * perf(stop-hook): make summarize handler fire-and-forget Stop hook previously blocked the Claude Code session for up to 110 seconds while polling the worker for summary completion. The handler now returns as soon as the enqueue POST is acked. - summarize.ts: drop the 500ms polling loop and /api/sessions/complete call; tighten SUMMARIZE_TIMEOUT_MS from 300s to 5s since the worker acks the enqueue synchronously. - SessionCompletionHandler: extract idempotent finalizeSession() for DB mark + orphaned-pending-queue drain + broadcast. completeByDbId now delegates so the /api/sessions/complete HTTP route is backward compatible. - SessionRoutes: wire finalizeSession into the SDK-agent generator's finally block, gated on lastSummaryStored + empty pending queue so only Stop events produce finalize (not every idle tick). - WorkerService: own the single SessionCompletionHandler instance and inject it into SessionRoutes to avoid duplicate construction. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(pr2084): address reviewer findings CodeRabbit: - SessionStore.getSessionById now returns status; without it, the finalizeSession idempotency guard always evaluated false and re-fired drain/broadcast on every call. - worker-service.ts: three call sites that remove the in-memory session after finalizeSession now do so only on success. On failure the session is left in place so the 60s orphan reaper can retry; removing it would orphan an 'active' DB row indefinitely under the fire-and- forget Stop hook. - runFallbackForTerminatedSession no longer emits a second session_completed event; finalizeSession already broadcasts one. The explicit broadcast now runs only on the finalize-failure fallback. Greptile: - TelegramNotifier reads via loadFromFile(USER_SETTINGS_PATH) so values in ~/.claude-mem/settings.json actually take effect; SettingsDefaultsManager.get() alone skipped the file and silently ignored user-configured credentials. - Emoji is derived from obs.type (security_alert → 🚨, security_note → 🔐, fallback 🔔) instead of hardcoded 🚨 for every observation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(hooks): worker-port mismatch on Windows and settings.json overrides (#2086) Hooks computed the health-check port as \$((37700 + id -u % 100)), ignoring ~/.claude-mem/settings.json. Two failure modes resulted: 1. Users upgrading from pre-per-uid builds kept CLAUDE_MEM_WORKER_PORT set to '37777' in settings.json. The worker bound 37777 (settings wins), but hooks queried 37701 (uid 501 on macOS), so every SessionStart/UserPromptSubmit health check failed. 2. Windows Git Bash/PowerShell returns a real Windows UID for 'id -u' (e.g. 209), producing port 37709 while the Node worker fell back to 37777 (process.getuid?.() ?? 77). Every prompt hit the 60s hook timeout. hooks.json now resolves the port in this order, matching how the worker itself resolves it: 1. sed CLAUDE_MEM_WORKER_PORT from ~/.claude-mem/settings.json 2. If absent, and uname is MINGW/CYGWIN/MSYS → 37777 3. Otherwise 37700 + (id -u || 77) % 100 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(pr2084): sync DatabaseManager.getSessionById return type CodeRabbit round 2: the DatabaseManager.getSessionById return type was missing platform_source, custom_title, and status fields that SessionStore.getSessionById actually returns. Structural typing hid the mismatch at compile time, but it prevents callers going through DatabaseManager from seeing the status field that the idempotency guard in SessionCompletionHandler relies on. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(pr2084): hooks honor env vars and host; looser port regex (#2086 followup) CodeRabbit round 3: match the worker's env > file > defaults precedence and resolve host the same way as port. - Env: CLAUDE_MEM_WORKER_PORT and CLAUDE_MEM_WORKER_HOST win first. - File: sed now accepts both quoted ('"37777"') and unquoted (37777) JSON values for the port; a separate sed reads CLAUDE_MEM_WORKER_HOST. - Defaults: port per-uid formula (Windows: 37777), host 127.0.0.1. - Health-check URL uses the resolved $HOST instead of hardcoded localhost. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,12 +24,12 @@
|
||||
},
|
||||
{
|
||||
"type": "command",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" start; for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1 && break; sleep 1; done; curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1 || true; echo '{\"continue\":true,\"suppressOutput\":true}'",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; PORT=\"${CLAUDE_MEM_WORKER_PORT:-}\"; HOST=\"${CLAUDE_MEM_WORKER_HOST:-}\"; [ -z \"$PORT\" ] && PORT=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_PORT\"[^0-9]*\\([0-9][0-9]*\\).*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); [ -z \"$HOST\" ] && HOST=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_HOST\"[^\"]*\"\\([^\"]*\\)\".*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); if [ -z \"$PORT\" ]; then case \"$(uname -s 2>/dev/null)\" in MINGW*|CYGWIN*|MSYS*) PORT=37777;; *) PORT=$((37700 + $(id -u 2>/dev/null || echo 77) % 100));; esac; fi; [ -z \"$HOST\" ] && HOST=127.0.0.1; node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" start; for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do curl -sf http://$HOST:$PORT/health >/dev/null 2>&1 && break; sleep 1; done; curl -sf http://$HOST:$PORT/health >/dev/null 2>&1 || true; echo '{\"continue\":true,\"suppressOutput\":true}'",
|
||||
"timeout": 60
|
||||
},
|
||||
{
|
||||
"type": "command",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1 && break; sleep 1; done; if curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1; then node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" hook claude-code context || true; fi",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; PORT=\"${CLAUDE_MEM_WORKER_PORT:-}\"; HOST=\"${CLAUDE_MEM_WORKER_HOST:-}\"; [ -z \"$PORT\" ] && PORT=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_PORT\"[^0-9]*\\([0-9][0-9]*\\).*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); [ -z \"$HOST\" ] && HOST=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_HOST\"[^\"]*\"\\([^\"]*\\)\".*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); if [ -z \"$PORT\" ]; then case \"$(uname -s 2>/dev/null)\" in MINGW*|CYGWIN*|MSYS*) PORT=37777;; *) PORT=$((37700 + $(id -u 2>/dev/null || echo 77) % 100));; esac; fi; [ -z \"$HOST\" ] && HOST=127.0.0.1; for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do curl -sf http://$HOST:$PORT/health >/dev/null 2>&1 && break; sleep 1; done; if curl -sf http://$HOST:$PORT/health >/dev/null 2>&1; then node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" hook claude-code context || true; fi",
|
||||
"timeout": 60
|
||||
}
|
||||
]
|
||||
@@ -40,7 +40,7 @@
|
||||
"hooks": [
|
||||
{
|
||||
"type": "command",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; _HEALTH=0; curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1 && _HEALTH=1 || for i in 1 2 3 4 5 6 7 8 9 10; do sleep 1; curl -sf http://localhost:$((37700 + $(id -u 2>/dev/null || echo 77) % 100))/health >/dev/null 2>&1 && _HEALTH=1 && break; done; [ \"$_HEALTH\" = \"1\" ] && node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" hook claude-code session-init",
|
||||
"command": "export PATH=\"$($SHELL -lc 'echo $PATH' 2>/dev/null):$PATH\"; _R=\"${CLAUDE_PLUGIN_ROOT}\"; [ -z \"$_R\" ] && _R=$(ls -dt $HOME/.claude/plugins/cache/thedotmack/claude-mem/[0-9]*/ 2>/dev/null | head -1); _R=\"${_R%/}\"; [ -z \"$_R\" ] && _R=\"$HOME/.claude/plugins/marketplaces/thedotmack/plugin\"; PORT=\"${CLAUDE_MEM_WORKER_PORT:-}\"; HOST=\"${CLAUDE_MEM_WORKER_HOST:-}\"; [ -z \"$PORT\" ] && PORT=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_PORT\"[^0-9]*\\([0-9][0-9]*\\).*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); [ -z \"$HOST\" ] && HOST=$(sed -n 's/.*\"CLAUDE_MEM_WORKER_HOST\"[^\"]*\"\\([^\"]*\\)\".*/\\1/p' \"$HOME/.claude-mem/settings.json\" 2>/dev/null | head -1); if [ -z \"$PORT\" ]; then case \"$(uname -s 2>/dev/null)\" in MINGW*|CYGWIN*|MSYS*) PORT=37777;; *) PORT=$((37700 + $(id -u 2>/dev/null || echo 77) % 100));; esac; fi; [ -z \"$HOST\" ] && HOST=127.0.0.1; _HEALTH=0; curl -sf http://$HOST:$PORT/health >/dev/null 2>&1 && _HEALTH=1 || for i in 1 2 3 4 5 6 7 8 9 10; do sleep 1; curl -sf http://$HOST:$PORT/health >/dev/null 2>&1 && _HEALTH=1 && break; done; [ \"$_HEALTH\" = \"1\" ] && node \"$_R/scripts/bun-runner.js\" \"$_R/scripts/worker-service.cjs\" hook claude-code session-init",
|
||||
"timeout": 60
|
||||
}
|
||||
]
|
||||
|
||||
@@ -44,6 +44,20 @@
|
||||
"description": "Architectural/design choice with rationale",
|
||||
"emoji": "⚖️",
|
||||
"work_emoji": "⚖️"
|
||||
},
|
||||
{
|
||||
"id": "security_alert",
|
||||
"label": "Security Alert",
|
||||
"description": "A security issue that needs attention before continuing.",
|
||||
"emoji": "🚨",
|
||||
"work_emoji": "🚨"
|
||||
},
|
||||
{
|
||||
"id": "security_note",
|
||||
"label": "Security Note",
|
||||
"description": "A security-relevant observation worth recording, but not urgent.",
|
||||
"emoji": "🔐",
|
||||
"work_emoji": "🔐"
|
||||
}
|
||||
],
|
||||
"observation_concepts": [
|
||||
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -1,28 +1,19 @@
|
||||
/**
|
||||
* Summarize Handler - Stop
|
||||
*
|
||||
* Runs in the Stop hook (120s timeout, not capped like SessionEnd).
|
||||
* This is the ONLY place where we can reliably wait for async work.
|
||||
*
|
||||
* Flow:
|
||||
* 1. Queue summarize request to worker
|
||||
* 2. Poll worker until summary processing completes
|
||||
* 3. Call /api/sessions/complete to clean up session
|
||||
*
|
||||
* SessionEnd (1.5s cap from Claude Code) is just a lightweight fallback —
|
||||
* all real work must happen here in Stop.
|
||||
* Fire-and-forget: enqueue the summarize request with the worker and return
|
||||
* immediately so the Stop hook does not block the user's terminal. The worker
|
||||
* owns completion and session cleanup.
|
||||
*/
|
||||
|
||||
import type { EventHandler, NormalizedHookInput, HookResult } from '../types.js';
|
||||
import { ensureWorkerRunning, workerHttpRequest } from '../../shared/worker-utils.js';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
import { extractLastMessage } from '../../shared/transcript-parser.js';
|
||||
import { HOOK_EXIT_CODES, HOOK_TIMEOUTS, getTimeout } from '../../shared/hook-constants.js';
|
||||
import { HOOK_EXIT_CODES } from '../../shared/hook-constants.js';
|
||||
import { normalizePlatformSource } from '../../shared/platform-source.js';
|
||||
|
||||
const SUMMARIZE_TIMEOUT_MS = getTimeout(HOOK_TIMEOUTS.DEFAULT);
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
const MAX_WAIT_FOR_SUMMARY_MS = 110_000; // 110s — fits within Stop hook's 120s timeout
|
||||
const SUMMARIZE_TIMEOUT_MS = 5000;
|
||||
|
||||
export const summarizeHandler: EventHandler = {
|
||||
async execute(input: NormalizedHookInput): Promise<HookResult> {
|
||||
@@ -107,62 +98,7 @@ export const summarizeHandler: EventHandler = {
|
||||
return { continue: true, suppressOutput: true };
|
||||
}
|
||||
|
||||
logger.debug('HOOK', 'Summary request queued, waiting for completion');
|
||||
|
||||
// 2. Poll worker until pending work for this session is done.
|
||||
// This keeps the Stop hook alive (120s timeout) so the SDK agent
|
||||
// can finish processing the summary before SessionEnd kills the session.
|
||||
const waitStart = Date.now();
|
||||
let summaryStored: boolean | null = null;
|
||||
while ((Date.now() - waitStart) < MAX_WAIT_FOR_SUMMARY_MS) {
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
|
||||
let statusResponse: Response;
|
||||
let status: { queueLength?: number; summaryStored?: boolean | null };
|
||||
try {
|
||||
statusResponse = await workerHttpRequest(`/api/sessions/status?contentSessionId=${encodeURIComponent(sessionId)}`, { timeoutMs: 5000 });
|
||||
status = await statusResponse.json() as { queueLength?: number; summaryStored?: boolean | null };
|
||||
} catch (pollError) {
|
||||
// Worker may be busy — keep polling
|
||||
logger.debug('HOOK', 'Summary status poll failed, retrying', { error: pollError instanceof Error ? pollError.message : String(pollError) });
|
||||
continue;
|
||||
}
|
||||
|
||||
const queueLength = status.queueLength ?? 0;
|
||||
// Only treat an empty queue as completion when the session exists (non-404).
|
||||
// A 404 means the session was not found — not that processing finished.
|
||||
if (queueLength === 0 && statusResponse.status !== 404) {
|
||||
summaryStored = status.summaryStored ?? null;
|
||||
logger.info('HOOK', 'Summary processing complete', {
|
||||
waitedMs: Date.now() - waitStart,
|
||||
summaryStored
|
||||
});
|
||||
// Warn when the agent processed a summarize request but produced no storable summary.
|
||||
// This is the silent-failure path described in #1633: queue empties but no summary record exists.
|
||||
if (summaryStored === false) {
|
||||
logger.warn('HOOK', 'Summary was not stored: LLM response likely lacked valid <summary> tags (#1633)', {
|
||||
sessionId,
|
||||
waitedMs: Date.now() - waitStart
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Complete the session — clean up active sessions map.
|
||||
// This runs here in Stop (120s timeout) instead of SessionEnd (1.5s cap)
|
||||
// so it reliably fires after summary work is done.
|
||||
try {
|
||||
await workerHttpRequest('/api/sessions/complete', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ contentSessionId: sessionId }),
|
||||
timeoutMs: 10_000
|
||||
});
|
||||
logger.info('HOOK', 'Session completed in Stop hook', { contentSessionId: sessionId });
|
||||
} catch (err) {
|
||||
logger.warn('HOOK', `Stop hook: session-complete failed: ${err instanceof Error ? err.message : err}`);
|
||||
}
|
||||
logger.debug('HOOK', 'Summary request queued');
|
||||
|
||||
return { continue: true, suppressOutput: true };
|
||||
}
|
||||
|
||||
119
src/services/integrations/TelegramNotifier.ts
Normal file
119
src/services/integrations/TelegramNotifier.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
/**
|
||||
* TelegramNotifier
|
||||
*
|
||||
* Fire-and-forget Telegram notification module. Fires one message per observation
|
||||
* whose type or concepts match user-configured triggers. Never throws; all errors
|
||||
* are caught per-observation and logged as warnings. Bot token is never logged.
|
||||
*/
|
||||
|
||||
import { ParsedObservation } from '../../sdk/parser.js';
|
||||
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
|
||||
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
|
||||
export interface TelegramNotifyInput {
|
||||
observations: ParsedObservation[];
|
||||
observationIds: number[];
|
||||
project: string;
|
||||
memorySessionId: string;
|
||||
}
|
||||
|
||||
const MARKDOWN_V2_RESERVED = /[_*\[\]()~`>#+\-=|{}.!\\]/g;
|
||||
|
||||
// Emoji per observation type. Unknown types fall back to the generic 🔔 so
|
||||
// the message is still readable rather than misleadingly loud.
|
||||
const TYPE_EMOJI: Record<string, string> = {
|
||||
security_alert: '🚨',
|
||||
security_note: '🔐',
|
||||
};
|
||||
const DEFAULT_EMOJI = '🔔';
|
||||
|
||||
function escapeMarkdownV2(value: string): string {
|
||||
return value.replace(MARKDOWN_V2_RESERVED, '\\$&');
|
||||
}
|
||||
|
||||
function splitCsv(value: string): string[] {
|
||||
return value
|
||||
.split(',')
|
||||
.map(entry => entry.trim())
|
||||
.filter(entry => entry.length > 0);
|
||||
}
|
||||
|
||||
function formatMessage(
|
||||
obs: ParsedObservation,
|
||||
project: string,
|
||||
memorySessionId: string,
|
||||
observationId: number,
|
||||
): string {
|
||||
const emoji = TYPE_EMOJI[obs.type] ?? DEFAULT_EMOJI;
|
||||
const type = escapeMarkdownV2(obs.type);
|
||||
const title = escapeMarkdownV2(obs.title ?? '');
|
||||
const subtitle = escapeMarkdownV2(obs.subtitle ?? '');
|
||||
const projectEscaped = escapeMarkdownV2(project);
|
||||
const idEscaped = escapeMarkdownV2(String(observationId));
|
||||
return `${emoji} *${type}* — ${title}\n${subtitle}\nProject: \`${projectEscaped}\` · obs \\#${idEscaped}`;
|
||||
}
|
||||
|
||||
async function postOne(botToken: string, chatId: string, text: string): Promise<void> {
|
||||
const url = `https://api.telegram.org/bot${botToken}/sendMessage`;
|
||||
const response = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
chat_id: chatId,
|
||||
text,
|
||||
parse_mode: 'MarkdownV2',
|
||||
}),
|
||||
});
|
||||
if (!response.ok) {
|
||||
const status = response.status;
|
||||
const statusText = response.statusText;
|
||||
throw new Error(`Telegram API responded ${status} ${statusText}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function notifyTelegram(input: TelegramNotifyInput): Promise<void> {
|
||||
// loadFromFile merges env > settings.json > defaults so values stored in
|
||||
// ~/.claude-mem/settings.json actually take effect. SettingsDefaultsManager.get()
|
||||
// alone skips the file and would silently ignore user-configured credentials.
|
||||
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
|
||||
|
||||
if (settings.CLAUDE_MEM_TELEGRAM_ENABLED !== 'true') {
|
||||
return;
|
||||
}
|
||||
|
||||
const botToken = settings.CLAUDE_MEM_TELEGRAM_BOT_TOKEN;
|
||||
const chatId = settings.CLAUDE_MEM_TELEGRAM_CHAT_ID;
|
||||
if (!botToken || !chatId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const triggerTypes = splitCsv(settings.CLAUDE_MEM_TELEGRAM_TRIGGER_TYPES);
|
||||
const triggerConcepts = splitCsv(settings.CLAUDE_MEM_TELEGRAM_TRIGGER_CONCEPTS);
|
||||
if (triggerTypes.length === 0 && triggerConcepts.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { observations, observationIds, project, memorySessionId } = input;
|
||||
for (let i = 0; i < observations.length; i++) {
|
||||
const obs = observations[i];
|
||||
const matchesType = triggerTypes.includes(obs.type);
|
||||
const matchesConcept = obs.concepts.some(c => triggerConcepts.includes(c));
|
||||
if (!matchesType && !matchesConcept) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const observationId = observationIds[i];
|
||||
try {
|
||||
const text = formatMessage(obs, project, memorySessionId, observationId);
|
||||
await postOne(botToken, chatId, text);
|
||||
} catch (error) {
|
||||
logger.warn('TELEGRAM', 'Failed to send Telegram notification', {
|
||||
observationId,
|
||||
project,
|
||||
memorySessionId,
|
||||
type: obs.type,
|
||||
}, error as Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1588,11 +1588,12 @@ export class SessionStore {
|
||||
platform_source: string;
|
||||
user_prompt: string;
|
||||
custom_title: string | null;
|
||||
status: string;
|
||||
} | null {
|
||||
const stmt = this.db.prepare(`
|
||||
SELECT id, content_session_id, memory_session_id, project,
|
||||
COALESCE(platform_source, '${DEFAULT_PLATFORM_SOURCE}') as platform_source,
|
||||
user_prompt, custom_title
|
||||
user_prompt, custom_title, status
|
||||
FROM sdk_sessions
|
||||
WHERE id = ?
|
||||
LIMIT 1
|
||||
|
||||
@@ -87,6 +87,7 @@ import { SearchManager } from './worker/SearchManager.js';
|
||||
import { FormattingService } from './worker/FormattingService.js';
|
||||
import { TimelineService } from './worker/TimelineService.js';
|
||||
import { SessionEventBroadcaster } from './worker/events/SessionEventBroadcaster.js';
|
||||
import { SessionCompletionHandler } from './worker/session/SessionCompletionHandler.js';
|
||||
import { DEFAULT_CONFIG_PATH, DEFAULT_STATE_PATH, expandHomePath, loadTranscriptWatchConfig, writeSampleConfig } from './transcripts/config.js';
|
||||
import { TranscriptWatcher } from './transcripts/watcher.js';
|
||||
|
||||
@@ -152,6 +153,7 @@ export class WorkerService {
|
||||
private paginationHelper: PaginationHelper;
|
||||
private settingsManager: SettingsManager;
|
||||
private sessionEventBroadcaster: SessionEventBroadcaster;
|
||||
private sessionCompletionHandler: SessionCompletionHandler;
|
||||
private corpusStore: CorpusStore;
|
||||
|
||||
// Route handlers
|
||||
@@ -198,6 +200,11 @@ export class WorkerService {
|
||||
this.paginationHelper = new PaginationHelper(this.dbManager);
|
||||
this.settingsManager = new SettingsManager(this.dbManager);
|
||||
this.sessionEventBroadcaster = new SessionEventBroadcaster(this.sseBroadcaster, this);
|
||||
this.sessionCompletionHandler = new SessionCompletionHandler(
|
||||
this.sessionManager,
|
||||
this.sessionEventBroadcaster,
|
||||
this.dbManager
|
||||
);
|
||||
this.corpusStore = new CorpusStore();
|
||||
|
||||
// Set callback for when sessions are deleted
|
||||
@@ -305,7 +312,7 @@ export class WorkerService {
|
||||
|
||||
// Standard routes (registered AFTER guard middleware)
|
||||
this.server.registerRoutes(new ViewerRoutes(this.sseBroadcaster, this.dbManager, this.sessionManager));
|
||||
this.server.registerRoutes(new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this));
|
||||
this.server.registerRoutes(new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this, this.sessionCompletionHandler));
|
||||
this.server.registerRoutes(new DataRoutes(this.paginationHelper, this.dbManager, this.sessionManager, this.sseBroadcaster, this, this.startTime));
|
||||
this.server.registerRoutes(new SettingsRoutes(this.settingsManager));
|
||||
this.server.registerRoutes(new LogsRoutes());
|
||||
@@ -849,11 +856,26 @@ export class WorkerService {
|
||||
this.startSessionProcessor(session, 'pending-work-restart');
|
||||
this.broadcastProcessingStatus();
|
||||
} else {
|
||||
// Successful completion with no pending work — clean up session
|
||||
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
|
||||
// Successful completion with no pending work — clean up session.
|
||||
// Only remove from the in-memory map if finalize succeeds; otherwise
|
||||
// leave the session in place so the 60s orphan reaper (or a future
|
||||
// retry) can repair the inconsistency. Removing a still-"active" DB
|
||||
// row from memory would orphan it indefinitely under the new
|
||||
// fire-and-forget Stop hook (no /api/sessions/complete to retry).
|
||||
session.restartGuard?.recordSuccess();
|
||||
session.consecutiveRestarts = 0;
|
||||
this.sessionManager.removeSessionImmediate(session.sessionDbId);
|
||||
let finalized = false;
|
||||
try {
|
||||
this.sessionCompletionHandler.finalizeSession(session.sessionDbId);
|
||||
finalized = true;
|
||||
} catch (err) {
|
||||
logger.warn('SESSION', 'finalizeSession failed in WorkerService generator .finally()', {
|
||||
sessionId: session.sessionDbId
|
||||
}, err as Error);
|
||||
}
|
||||
if (finalized) {
|
||||
this.sessionManager.removeSessionImmediate(session.sessionDbId);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -947,8 +969,25 @@ export class WorkerService {
|
||||
abandoned
|
||||
});
|
||||
}
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
|
||||
// Finalize so DB status + broadcast + pending-drain are consistent on fallback failure.
|
||||
// finalizeSession already broadcasts session_completed, so we don't also call
|
||||
// broadcastSessionCompleted below. On finalize failure, fall back to the
|
||||
// explicit broadcast so the UI still gets the event and leave the session
|
||||
// in memory for the orphan reaper to retry.
|
||||
let finalized = false;
|
||||
try {
|
||||
this.sessionCompletionHandler.finalizeSession(sessionDbId);
|
||||
finalized = true;
|
||||
} catch (err) {
|
||||
logger.warn('SESSION', 'finalizeSession failed in runFallbackForTerminatedSession', {
|
||||
sessionId: sessionDbId
|
||||
}, err as Error);
|
||||
}
|
||||
if (finalized) {
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
} else {
|
||||
this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -971,8 +1010,25 @@ export class WorkerService {
|
||||
abandonedMessages: abandoned
|
||||
});
|
||||
|
||||
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
// Finalize session (mark completed in DB + drain pending + broadcast). Idempotent.
|
||||
// This runs AFTER startSession() has returned, which means any summary/observation
|
||||
// writes inside processAgentResponse() are already committed to SQLite synchronously.
|
||||
// Only remove from the in-memory map if finalize succeeds; otherwise leave the
|
||||
// session in place so the 60s orphan reaper can repair the DB inconsistency.
|
||||
let finalized = false;
|
||||
try {
|
||||
this.sessionCompletionHandler.finalizeSession(sessionDbId);
|
||||
finalized = true;
|
||||
} catch (err) {
|
||||
logger.warn('SESSION', 'finalizeSession failed during terminateSession', {
|
||||
sessionId: sessionDbId, reason
|
||||
}, err as Error);
|
||||
}
|
||||
|
||||
if (finalized) {
|
||||
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -101,7 +101,10 @@ export class DatabaseManager {
|
||||
content_session_id: string;
|
||||
memory_session_id: string | null;
|
||||
project: string;
|
||||
platform_source: string;
|
||||
user_prompt: string;
|
||||
custom_title: string | null;
|
||||
status: string;
|
||||
} {
|
||||
const session = this.getSessionStore().getSessionById(sessionDbId);
|
||||
if (!session) {
|
||||
|
||||
@@ -15,6 +15,7 @@ import { logger } from '../../../utils/logger.js';
|
||||
import { parseObservations, parseSummary, type ParsedObservation, type ParsedSummary } from '../../../sdk/parser.js';
|
||||
import { SUMMARY_MODE_MARKER, MAX_CONSECUTIVE_SUMMARY_FAILURES } from '../../../sdk/prompts.js';
|
||||
import { updateCursorContextForProject } from '../../integrations/CursorHooksInstaller.js';
|
||||
import { notifyTelegram } from '../../integrations/TelegramNotifier.js';
|
||||
import { updateFolderClaudeMdFiles } from '../../../utils/claude-md-utils.js';
|
||||
import { getWorkerPort } from '../../../shared/worker-utils.js';
|
||||
import { SettingsDefaultsManager } from '../../../shared/SettingsDefaultsManager.js';
|
||||
@@ -213,6 +214,13 @@ export async function processAgentResponse(
|
||||
// Clear the tracking array after confirmation
|
||||
session.processingMessageIds = [];
|
||||
|
||||
void notifyTelegram({
|
||||
observations: labeledObservations,
|
||||
observationIds: result.observationIds,
|
||||
project: session.project,
|
||||
memorySessionId: session.memorySessionId,
|
||||
});
|
||||
|
||||
// AFTER transaction commits - async operations (can fail safely without data loss)
|
||||
await syncAndBroadcastObservations(
|
||||
observations,
|
||||
|
||||
@@ -13,7 +13,7 @@ import { CorpusBuilder } from '../../knowledge/CorpusBuilder.js';
|
||||
import { KnowledgeAgent } from '../../knowledge/KnowledgeAgent.js';
|
||||
import type { CorpusFilter } from '../../knowledge/types.js';
|
||||
|
||||
const ALLOWED_CORPUS_TYPES = new Set(['decision', 'bugfix', 'feature', 'refactor', 'discovery', 'change']);
|
||||
const ALLOWED_CORPUS_TYPES = new Set(['decision', 'bugfix', 'feature', 'refactor', 'discovery', 'change', 'security_alert', 'security_note']);
|
||||
|
||||
export class CorpusRoutes extends BaseRouteHandler {
|
||||
constructor(
|
||||
|
||||
@@ -38,14 +38,14 @@ export class SessionRoutes extends BaseRouteHandler {
|
||||
private geminiAgent: GeminiAgent,
|
||||
private openRouterAgent: OpenRouterAgent,
|
||||
private eventBroadcaster: SessionEventBroadcaster,
|
||||
private workerService: WorkerService
|
||||
private workerService: WorkerService,
|
||||
completionHandler: SessionCompletionHandler
|
||||
) {
|
||||
super();
|
||||
this.completionHandler = new SessionCompletionHandler(
|
||||
sessionManager,
|
||||
eventBroadcaster,
|
||||
dbManager
|
||||
);
|
||||
// Use the shared completion handler from WorkerService so the SDK-agent
|
||||
// completion path and the HTTP fallback route operate on the same instance
|
||||
// (avoids duplicate construction; keeps finalize semantics consistent).
|
||||
this.completionHandler = completionHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -289,6 +289,43 @@ export class SessionRoutes extends BaseRouteHandler {
|
||||
session.currentProvider = null;
|
||||
this.workerService.broadcastProcessingStatus();
|
||||
|
||||
// Stop-hook fire-and-forget (Phase 2): if the generator just processed
|
||||
// a summary and no work remains, the Stop hook is done and we should
|
||||
// self-clean the session. The summary write is already committed to
|
||||
// SQLite synchronously inside processAgentResponse() BEFORE startSession()
|
||||
// returns (see ResponseProcessor.ts: storeObservations() is sync, and
|
||||
// confirmProcessed() runs right after), so by the time this .finally()
|
||||
// runs the summary is durably persisted.
|
||||
//
|
||||
// We gate on lastSummaryStored so we don't finalize after every idle
|
||||
// timeout between tool calls — only when a real Stop event produced
|
||||
// a summary record.
|
||||
try {
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
const pendingNow = pendingStore.getPendingCount(sessionDbId);
|
||||
if (session.lastSummaryStored === true && pendingNow === 0) {
|
||||
logger.info('SESSION', 'Stop-hook self-clean: summary persisted + queue drained → finalizing', {
|
||||
sessionId: sessionDbId
|
||||
});
|
||||
// finalizeSession is idempotent and does NOT touch the in-memory map —
|
||||
// it only marks DB completed, drains any orphaned pending messages,
|
||||
// and broadcasts the completion event. sessionManager cleanup is
|
||||
// handled below by the existing abort/removeSessionImmediate flow.
|
||||
this.completionHandler.finalizeSession(sessionDbId);
|
||||
// Clear the flag so a subsequent re-activation of the same session
|
||||
// does not fire finalize again without a fresh summary.
|
||||
session.lastSummaryStored = false;
|
||||
// Ensure the session is removed from the active-sessions map so the
|
||||
// Stop-hook path doesn't depend on a later idle-timeout tick.
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn('SESSION', 'finalizeSession failed in SessionRoutes generator .finally()', {
|
||||
sessionId: sessionDbId
|
||||
}, err as Error);
|
||||
}
|
||||
|
||||
// Crash recovery: If not aborted and still has work, restart (with limit)
|
||||
if (!wasAborted) {
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
*/
|
||||
import { logger } from '../../../../utils/logger.js';
|
||||
|
||||
type ObservationType = 'decision' | 'bugfix' | 'feature' | 'refactor' | 'discovery' | 'change';
|
||||
type ObservationType = 'decision' | 'bugfix' | 'feature' | 'refactor' | 'discovery' | 'change' | 'security_alert' | 'security_note';
|
||||
|
||||
/**
|
||||
* Valid observation types
|
||||
@@ -16,7 +16,9 @@ export const OBSERVATION_TYPES: ObservationType[] = [
|
||||
'feature',
|
||||
'refactor',
|
||||
'discovery',
|
||||
'change'
|
||||
'change',
|
||||
'security_alert',
|
||||
'security_note'
|
||||
];
|
||||
|
||||
/**
|
||||
|
||||
@@ -22,38 +22,75 @@ export class SessionCompletionHandler {
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Complete session by database ID
|
||||
* Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete
|
||||
* Finalize a session's persistent + broadcast state.
|
||||
*
|
||||
* Idempotent — safe to call twice. The worker calls this from the SDK-agent
|
||||
* generator's finally-block (primary path), and the HTTP route
|
||||
* POST /api/sessions/complete also calls it as a backward-compat shim.
|
||||
* If the session is already marked completed in the DB, this is a no-op.
|
||||
*
|
||||
* This method intentionally does NOT touch the in-memory SessionManager map.
|
||||
* The generator's finally-block handles in-memory removal via
|
||||
* `removeSessionImmediate` (which cannot `await` the generator it's running
|
||||
* inside); the HTTP route layers `deleteSession` on top for the case where
|
||||
* the generator is still running and needs to be aborted.
|
||||
*/
|
||||
async completeByDbId(sessionDbId: number): Promise<void> {
|
||||
// Persist completion to database before in-memory cleanup (fix for #1532)
|
||||
this.dbManager.getSessionStore().markSessionCompleted(sessionDbId);
|
||||
finalizeSession(sessionDbId: number): void {
|
||||
const sessionStore = this.dbManager.getSessionStore();
|
||||
|
||||
// Delete from session manager (aborts SDK agent via SIGTERM)
|
||||
await this.sessionManager.deleteSession(sessionDbId);
|
||||
// Idempotency check: if already completed, do nothing.
|
||||
const row = sessionStore.getSessionById(sessionDbId);
|
||||
if (!row) {
|
||||
logger.debug('SESSION', 'finalizeSession: session not found, skipping', { sessionId: sessionDbId });
|
||||
return;
|
||||
}
|
||||
if (row.status === 'completed') {
|
||||
logger.debug('SESSION', 'finalizeSession: already completed, skipping', { sessionId: sessionDbId });
|
||||
return;
|
||||
}
|
||||
|
||||
// Drain orphaned pending messages left by SIGTERM.
|
||||
// When deleteSession() aborts the generator, pending messages in the queue
|
||||
// are never processed. Without drain, they stay in 'pending' status forever
|
||||
// since no future generator will pick them up for a completed session.
|
||||
// Note: this is best-effort — if a generator outlives the 30s SIGTERM timeout
|
||||
// (SessionManager.deleteSession), it may enqueue messages after this drain.
|
||||
// In practice this race is rare (zero orphans over 23 days, 3400+ observations).
|
||||
// Mark completed in DB (primary source of truth for idempotency).
|
||||
sessionStore.markSessionCompleted(sessionDbId);
|
||||
|
||||
// Drain orphaned pending messages. This is best-effort — same rationale
|
||||
// as the historical completeByDbId path: messages left 'pending' by a
|
||||
// completed session would never be picked up again.
|
||||
try {
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
const drainedCount = pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
|
||||
if (drainedCount > 0) {
|
||||
logger.warn('SESSION', `Drained ${drainedCount} orphaned pending messages on session completion`, {
|
||||
logger.warn('SESSION', `Drained ${drainedCount} orphaned pending messages on session finalize`, {
|
||||
sessionId: sessionDbId, drainedCount
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
logger.debug('SESSION', 'Failed to drain pending queue on session completion', {
|
||||
logger.debug('SESSION', 'Failed to drain pending queue on session finalize', {
|
||||
sessionId: sessionDbId, error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
}
|
||||
|
||||
// Broadcast session completed event
|
||||
// Broadcast session completed event (UI spinner, etc.)
|
||||
this.eventBroadcaster.broadcastSessionCompleted(sessionDbId);
|
||||
|
||||
logger.info('SESSION', 'Session finalized', { sessionId: sessionDbId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete session by database ID
|
||||
* Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete
|
||||
*
|
||||
* Calls `finalizeSession` (DB mark + drain + broadcast, idempotent) and then
|
||||
* aborts any running SDK agent via `sessionManager.deleteSession`. The
|
||||
* HTTP route wraps this so older callers that still POST to
|
||||
* /api/sessions/complete keep working even after the worker self-cleans.
|
||||
*/
|
||||
async completeByDbId(sessionDbId: number): Promise<void> {
|
||||
// Finalize first so the DB and broadcast state are consistent even if
|
||||
// deleteSession hangs on a slow subprocess exit.
|
||||
this.finalizeSession(sessionDbId);
|
||||
|
||||
// Abort SDK agent and clean in-memory state. Idempotent: deleteSession
|
||||
// early-returns if the session isn't in the active map.
|
||||
await this.sessionManager.deleteSession(sessionDbId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,6 +76,12 @@ export interface SettingsDefaults {
|
||||
CLAUDE_MEM_CHROMA_API_KEY: string;
|
||||
CLAUDE_MEM_CHROMA_TENANT: string;
|
||||
CLAUDE_MEM_CHROMA_DATABASE: string;
|
||||
// Telegram Notifier
|
||||
CLAUDE_MEM_TELEGRAM_ENABLED: string;
|
||||
CLAUDE_MEM_TELEGRAM_BOT_TOKEN: string;
|
||||
CLAUDE_MEM_TELEGRAM_CHAT_ID: string;
|
||||
CLAUDE_MEM_TELEGRAM_TRIGGER_TYPES: string;
|
||||
CLAUDE_MEM_TELEGRAM_TRIGGER_CONCEPTS: string;
|
||||
}
|
||||
|
||||
export class SettingsDefaultsManager {
|
||||
@@ -147,6 +153,12 @@ export class SettingsDefaultsManager {
|
||||
CLAUDE_MEM_CHROMA_API_KEY: '',
|
||||
CLAUDE_MEM_CHROMA_TENANT: 'default_tenant',
|
||||
CLAUDE_MEM_CHROMA_DATABASE: 'default_database',
|
||||
// Telegram Notifier
|
||||
CLAUDE_MEM_TELEGRAM_ENABLED: 'true',
|
||||
CLAUDE_MEM_TELEGRAM_BOT_TOKEN: '',
|
||||
CLAUDE_MEM_TELEGRAM_CHAT_ID: '',
|
||||
CLAUDE_MEM_TELEGRAM_TRIGGER_TYPES: 'security_alert',
|
||||
CLAUDE_MEM_TELEGRAM_TRIGGER_CONCEPTS: '',
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -15,7 +15,7 @@ export enum LogLevel {
|
||||
SILENT = 4
|
||||
}
|
||||
|
||||
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
|
||||
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE' | 'TELEGRAM';
|
||||
|
||||
interface LogContext {
|
||||
sessionId?: number;
|
||||
|
||||
Reference in New Issue
Block a user