diff --git a/scripts/lib/brief-compose.mjs b/scripts/lib/brief-compose.mjs index f032de3ff..08d55a3a2 100644 --- a/scripts/lib/brief-compose.mjs +++ b/scripts/lib/brief-compose.mjs @@ -28,7 +28,12 @@ import { const SENSITIVITY_RANK = { all: 0, high: 1, critical: 2 }; -function compareRules(a, b) { +// Exported so the cron orchestration's two-pass winner walk +// (sortedDue / sortedAll) can sort each pass identically to how +// `groupEligibleRulesByUser` already orders candidates here. Kept as +// a same-shape function so callers can reuse it without re-deriving +// the priority key. +export function compareRules(a, b) { const aFull = a.variant === 'full' ? 0 : 1; const bFull = b.variant === 'full' ? 0 : 1; if (aFull !== bFull) return aFull - bFull; @@ -297,6 +302,17 @@ function digestStoryToUpstreamTopStory(s) { // to 'General' / 'Global' via filterTopStories defaults. category: typeof s?.category === 'string' ? s.category : undefined, countryCode: typeof s?.countryCode === 'string' ? s.countryCode : undefined, + // Stable digest story hash. Carried through so: + // (a) the canonical synthesis prompt can emit `rankedStoryHashes` + // referencing each story by hash (not position, not title), + // (b) `filterTopStories` can re-order the pool by ranking BEFORE + // applying the MAX_STORIES_PER_USER cap, so the model's + // editorial judgment of importance survives the cap. + // Falls back to titleHash when the digest path didn't materialise + // a primary `hash` (rare; shape varies across producer versions). + hash: typeof s?.hash === 'string' && s.hash.length > 0 + ? s.hash + : (typeof s?.titleHash === 'string' ? s.titleHash : undefined), }; } @@ -308,15 +324,37 @@ function digestStoryToUpstreamTopStory(s) { * Returns null when no story survives the sensitivity filter — caller * falls back to another variant or skips the user. * + * Pure / synchronous. 
The cron orchestration layer pre-resolves the + * canonical synthesis (`exec` from `generateDigestProse`) and the + * non-personalised `publicLead` (`generateDigestProsePublic`) and + * passes them in via `opts.synthesis` — this module performs no LLM + * I/O. + * * @param {object} rule — enabled alertRule row * @param {unknown[]} digestStories — output of buildDigest(rule, windowStart) * @param {{ clusters: number; multiSource: number }} insightsNumbers - * @param {{ nowMs?: number, onDrop?: import('../../shared/brief-filter.js').DropMetricsFn }} [opts] + * @param {{ + * nowMs?: number, + * onDrop?: import('../../shared/brief-filter.js').DropMetricsFn, + * synthesis?: { + * lead?: string, + * threads?: Array<{ tag: string, teaser: string }>, + * signals?: string[], + * rankedStoryHashes?: string[], + * publicLead?: string, + * publicSignals?: string[], + * publicThreads?: Array<{ tag: string, teaser: string }>, + * }, + * }} [opts] * `onDrop` is forwarded to filterTopStories so the seeder can * aggregate per-user filter-drop counts without this module knowing * how they are reported. + * `synthesis` (when provided) substitutes envelope.digest.lead / + * threads / signals / publicLead with the canonical synthesis from + * the orchestration layer, and re-orders the candidate pool by + * `synthesis.rankedStoryHashes` before applying the cap. 
*/ -export function composeBriefFromDigestStories(rule, digestStories, insightsNumbers, { nowMs = Date.now(), onDrop } = {}) { +export function composeBriefFromDigestStories(rule, digestStories, insightsNumbers, { nowMs = Date.now(), onDrop, synthesis } = {}) { if (!Array.isArray(digestStories) || digestStories.length === 0) return null; // Default to 'high' (NOT 'all') for undefined sensitivity, aligning // with buildDigest at scripts/seed-digest-notifications.mjs:392 and @@ -335,10 +373,11 @@ export function composeBriefFromDigestStories(rule, digestStories, insightsNumbe sensitivity, maxStories: MAX_STORIES_PER_USER, onDrop, + rankedStoryHashes: synthesis?.rankedStoryHashes, }); if (stories.length === 0) return null; const issueDate = issueDateInTz(nowMs, tz); - return assembleStubbedBriefEnvelope({ + const envelope = assembleStubbedBriefEnvelope({ user: { name: userDisplayNameFromId(rule.userId), tz }, stories, issueDate, @@ -348,4 +387,35 @@ export function composeBriefFromDigestStories(rule, digestStories, insightsNumbe issuedAt: nowMs, localHour: localHourInTz(nowMs, tz), }); + // Splice canonical synthesis into the envelope's digest. Done as a + // shallow merge so the assembleStubbedBriefEnvelope path stays the + // single source for greeting/numbers/threads-default. We only + // override the LLM-driven fields when the orchestrator supplied + // them; missing fields fall back to the stub for graceful + // degradation when synthesis fails. 
+ if (synthesis && envelope?.data?.digest) { + if (typeof synthesis.lead === 'string' && synthesis.lead.length > 0) { + envelope.data.digest.lead = synthesis.lead; + } + if (Array.isArray(synthesis.threads) && synthesis.threads.length > 0) { + envelope.data.digest.threads = synthesis.threads; + } + if (Array.isArray(synthesis.signals)) { + envelope.data.digest.signals = synthesis.signals; + } + if (typeof synthesis.publicLead === 'string' && synthesis.publicLead.length > 0) { + envelope.data.digest.publicLead = synthesis.publicLead; + } + // Public signals/threads are non-personalised siblings produced by + // generateDigestProsePublic. Captured separately from the + // personalised signals/threads above so the share-URL renderer + // never has to choose between leaking and omitting a whole page. + if (Array.isArray(synthesis.publicSignals) && synthesis.publicSignals.length > 0) { + envelope.data.digest.publicSignals = synthesis.publicSignals; + } + if (Array.isArray(synthesis.publicThreads) && synthesis.publicThreads.length > 0) { + envelope.data.digest.publicThreads = synthesis.publicThreads; + } + } + return envelope; } diff --git a/scripts/lib/brief-llm.mjs b/scripts/lib/brief-llm.mjs index 0e1a680b0..598e7ea42 100644 --- a/scripts/lib/brief-llm.mjs +++ b/scripts/lib/brief-llm.mjs @@ -15,15 +15,21 @@ // through to the original stub — the brief must always ship. // // Cache semantics: -// - brief:llm:whymatters:v1:{storyHash} — 24h, shared across users. -// whyMatters is editorial global-stakes commentary, not user -// personalisation, so per-story caching collapses N×U LLM calls -// to N. -// - brief:llm:digest:v1:{userId}:{poolHash} — 4h, per user. -// The executive summary IS personalised to a user's sensitivity -// and surfaced story pool, so cache keys include a hash of both. -// 4h balances cost vs freshness — hourly cron pays at most once -// per 4 ticks per user. 
+// - brief:llm:whymatters:v3:{storyHash}:{leadHash} — 24h, shared +// across users for the same (story, lead) pair. v3 includes +// SHA-256 of the resolved digest lead so per-story rationales +// re-generate when the lead changes (rationales must align with +// the headline frame). v2 rows were lead-blind and could drift. +// - brief:llm:digest:v3:{userId|public}:{sensitivity}:{poolHash} +// — 4h. The canonical synthesis is now ALWAYS produced through +// this path (formerly split with `generateAISummary` in the +// digest cron). Material includes profile-SHA, greeting bucket, +// isPublic flag, and per-story hash so cache hits never serve a +// differently-ranked or differently-personalised prompt. +// When isPublic=true, the userId slot in the key is the literal +// string 'public' so all public-share readers of the same +// (date, sensitivity, story-pool) hit the same row — no PII in +// the public cache key. v2 rows ignored on rollout. import { createHash } from 'node:crypto'; @@ -303,43 +309,112 @@ export async function generateStoryDescription(story, deps) { return parsed; } -// ── Digest prose (per user) ──────────────────────────────────────────────── +// ── Digest prose (canonical synthesis) ───────────────────────────────────── +// +// This is the single LLM call that produces the brief's executive summary. +// All channels (email HTML, plain-text, Telegram, Slack, Discord, webhook) +// AND the magazine's `digest.lead` read the same string from this output. +// The cron orchestration layer also produces a separate non-personalised +// `publicLead` via `generateDigestProsePublic` for the share-URL surface. -const DIGEST_PROSE_SYSTEM = +const DIGEST_PROSE_SYSTEM_BASE = 'You are the chief editor of WorldMonitor Brief. 
Given a ranked list of ' + "today's top stories for a reader, produce EXACTLY this JSON and nothing " + 'else (no markdown, no code fences, no preamble):\n' + '{\n' + ' "lead": "<2–3 sentence executive summary, editorial tone, references ' + - 'the most important 1–2 threads, addresses the reader in the third person>",\n' + + 'the most important 1–2 threads, addresses the reader directly>",\n' + ' "threads": [\n' + ' { "tag": "", ' + '"teaser": "" }\n' + ' ],\n' + - ' "signals": [""]\n' + + ' "signals": [""],\n' + + ' "rankedStoryHashes": ["", "..."]\n' + '}\n' + 'Threads: 3–6 items reflecting actual clusters in the stories. ' + - 'Signals: 2–4 items, forward-looking.'; + 'Signals: 2–4 items, forward-looking. ' + + 'rankedStoryHashes: at least the top 3 stories by editorial importance, ' + + 'using the short hash from each story line (the value inside [h:...]). ' + + 'Lead with the single most impactful development. Lead under 250 words.'; /** - * @param {Array<{ headline: string; threatLevel: string; category: string; country: string; source: string }>} stories + * Compute a coarse greeting bucket for cache-key stability. + * Greeting strings can vary in punctuation/capitalisation across + * locales; the bucket collapses them to one of three slots so the + * cache key only changes when the time-of-day window changes. + * + * @param {string|null|undefined} greeting + * @returns {'morning' | 'afternoon' | 'evening' | ''} + */ +export function greetingBucket(greeting) { + if (typeof greeting !== 'string') return ''; + const g = greeting.toLowerCase(); + if (g.includes('morning')) return 'morning'; + if (g.includes('afternoon')) return 'afternoon'; + if (g.includes('evening') || g.includes('night')) return 'evening'; + return ''; +} + +/** + * @typedef {object} DigestPromptCtx + * @property {string|null} [profile] formatted user profile lines, or null for non-personalised + * @property {string|null} [greeting] e.g. 
"Good morning", or null for non-personalised + * @property {boolean} [isPublic] true = strip personalisation, build a generic lead + */ + +/** + * Build the digest-prose prompt. When `ctx.profile` / `ctx.greeting` + * are present (and `ctx.isPublic !== true`), the prompt asks the + * model to address the reader by their watched assets/regions and + * open with the greeting. Otherwise the prompt produces a generic + * editorial brief safe for share-URL surfaces. + * + * Per-story line format includes a stable short-hash prefix: + * `01 [h:abc12345] [CRITICAL] Headline — Category · Country · Source` + * The model emits `rankedStoryHashes` referencing those short hashes + * so the cron can re-order envelope.stories before the cap. + * + * @param {Array<{ hash?: string; headline: string; threatLevel: string; category: string; country: string; source: string }>} stories * @param {string} sensitivity + * @param {DigestPromptCtx} [ctx] * @returns {{ system: string; user: string }} */ -export function buildDigestPrompt(stories, sensitivity) { +export function buildDigestPrompt(stories, sensitivity, ctx = {}) { + const isPublic = ctx?.isPublic === true; + const profile = !isPublic && typeof ctx?.profile === 'string' ? ctx.profile.trim() : ''; + const greeting = !isPublic && typeof ctx?.greeting === 'string' ? ctx.greeting.trim() : ''; + const lines = stories.slice(0, MAX_STORIES_PER_USER).map((s, i) => { const n = String(i + 1).padStart(2, '0'); - return `${n}. [${s.threatLevel}] ${s.headline} — ${s.category} · ${s.country} · ${s.source}`; + const sev = (s.threatLevel ?? '').toUpperCase(); + // Short hash prefix — first 8 chars of digest story hash. Keeps + // the prompt compact while remaining collision-free for ≤30 + // stories. Stories without a hash fall back to position-based + // 'p' so the prompt is always well-formed. + const shortHash = typeof s.hash === 'string' && s.hash.length >= 8 + ? s.hash.slice(0, 8) + : `p${n}`; + return `${n}. 
[h:${shortHash}] [${sev}] ${s.headline} — ${s.category} · ${s.country} · ${s.source}`; }); - const user = [ + + const userParts = [ `Reader sensitivity level: ${sensitivity}`, - '', - "Today's surfaced stories (ranked):", - ...lines, - ].join('\n'); - return { system: DIGEST_PROSE_SYSTEM, user }; + ]; + if (greeting) { + userParts.push('', `Open the lead with: "${greeting}."`); + } + if (profile) { + userParts.push('', 'Reader profile (use to personalise lead and signals):', profile); + } + userParts.push('', "Today's surfaced stories (ranked):", ...lines); + + return { system: DIGEST_PROSE_SYSTEM_BASE, user: userParts.join('\n') }; } +// Back-compat alias for tests that import the old constant name. +export const DIGEST_PROSE_SYSTEM = DIGEST_PROSE_SYSTEM_BASE; + /** * Strict shape check for a parsed digest-prose object. Used by BOTH * parseDigestProse (fresh LLM output) AND generateDigestProse's @@ -349,14 +424,20 @@ export function buildDigestPrompt(stories, sensitivity) { * returns the caller's object by reference so downstream writes * can't observe internal state. * + * v3 (2026-04-25): adds optional `rankedStoryHashes` — short hashes + * (≥4 chars each) that the orchestration layer maps back to digest + * story `hash` values to re-order envelope.stories before the cap. + * Field is optional so v2-shaped cache rows still pass validation + * during the rollout window — they just don't carry ranking signal. + * * @param {unknown} obj - * @returns {{ lead: string; threads: Array<{tag:string;teaser:string}>; signals: string[] } | null} + * @returns {{ lead: string; threads: Array<{tag:string;teaser:string}>; signals: string[]; rankedStoryHashes: string[] } | null} */ export function validateDigestProseShape(obj) { if (!obj || typeof obj !== 'object' || Array.isArray(obj)) return null; const lead = typeof obj.lead === 'string' ? 
obj.lead.trim() : ''; - if (lead.length < 40 || lead.length > 800) return null; + if (lead.length < 40 || lead.length > 1500) return null; const rawThreads = Array.isArray(obj.threads) ? obj.threads : []; const threads = rawThreads @@ -387,7 +468,18 @@ export function validateDigestProseShape(obj) { }) .slice(0, 6); - return { lead, threads, signals }; + // rankedStoryHashes: optional. When present, must be array of + // non-empty short-hash strings (≥4 chars). Each entry trimmed and + // capped to 16 chars (the prompt emits 8). Length capped to + // MAX_STORIES_PER_USER × 2 to bound prompt drift. + const rawRanked = Array.isArray(obj.rankedStoryHashes) ? obj.rankedStoryHashes : []; + const rankedStoryHashes = rawRanked + .filter((x) => typeof x === 'string') + .map((x) => x.trim().slice(0, 16)) + .filter((x) => x.length >= 4) + .slice(0, MAX_STORIES_PER_USER * 2); + + return { lead, threads, signals, rankedStoryHashes }; } /** @@ -419,10 +511,25 @@ export function parseDigestProse(text) { * about cache-hit rate; that optimisation is the wrong tradeoff for * an editorial product whose correctness bar is "matches the email". * - * v2 key space so pre-fix cache rows (under the looser key) are - * ignored on rollout — a one-tick cost to pay for clean semantics. + * v3 key space (2026-04-25): material now includes the digest-story + * `hash` (per-story rankability), `ctx.profile` SHA-256, greeting + * bucket, and isPublic flag. When `ctx.isPublic === true` the userId + * slot is replaced with the literal `'public'` so all public-share + * readers of the same (sensitivity, story-pool) hit ONE cache row + * regardless of caller — no PII in public cache keys, no per-user + * inflation. v2 rows are ignored on rollout (paid for once). 
+ * + * @param {string} userId + * @param {Array} stories + * @param {string} sensitivity + * @param {DigestPromptCtx} [ctx] */ -function hashDigestInput(userId, stories, sensitivity) { +function hashDigestInput(userId, stories, sensitivity, ctx = {}) { + const isPublic = ctx?.isPublic === true; + const profileSha = isPublic ? '' : (typeof ctx?.profile === 'string' && ctx.profile.length > 0 + ? createHash('sha256').update(ctx.profile).digest('hex').slice(0, 16) + : ''); + const greetingSlot = isPublic ? '' : greetingBucket(ctx?.greeting); // Canonicalise as JSON of the fields the prompt actually references, // in the prompt's ranked order. Stable stringification via an array // of tuples keeps field ordering deterministic without relying on @@ -430,7 +537,13 @@ function hashDigestInput(userId, stories, sensitivity) { // slice or the cache key drifts from the prompt content. const material = JSON.stringify([ sensitivity ?? '', + profileSha, + greetingSlot, + isPublic ? 'public' : 'private', ...stories.slice(0, MAX_STORIES_PER_USER).map((s) => [ + // hash drives ranking (model emits rankedStoryHashes); without + // it the cache ignores re-ranking and stale ordering is served. + typeof s.hash === 'string' ? s.hash.slice(0, 8) : '', s.headline ?? '', s.threatLevel ?? '', s.category ?? '', @@ -439,20 +552,29 @@ function hashDigestInput(userId, stories, sensitivity) { ]), ]); const h = createHash('sha256').update(material).digest('hex').slice(0, 16); - return `${userId}:${sensitivity}:${h}`; + // userId-slot substitution for public mode — one cache row per + // (sensitivity, story-pool) shared across ALL public readers. + const userSlot = isPublic ? 'public' : userId; + return `${userSlot}:${sensitivity}:${h}`; } /** * Resolve the digest prose object via cache → LLM. + * + * Backward-compatible signature: existing 4-arg callers behave like + * today (no profile/greeting → non-personalised lead). 
New callers + * pass `ctx` to enable canonical synthesis with greeting + profile. + * * @param {string} userId * @param {Array} stories * @param {string} sensitivity - * @param {object} deps — { callLLM, cacheGet, cacheSet } + * @param {{ callLLM: Function; cacheGet: Function; cacheSet: Function }} deps + * @param {DigestPromptCtx} [ctx] */ -export async function generateDigestProse(userId, stories, sensitivity, deps) { - // v2 key: see hashDigestInput() comment. Full-prompt hash + strict +export async function generateDigestProse(userId, stories, sensitivity, deps, ctx = {}) { + // v3 key: see hashDigestInput() comment. Full-prompt hash + strict // shape validation on every cache hit. - const key = `brief:llm:digest:v2:${hashDigestInput(userId, stories, sensitivity)}`; + const key = `brief:llm:digest:v3:${hashDigestInput(userId, stories, sensitivity, ctx)}`; try { const hit = await deps.cacheGet(key); // CRITICAL: re-run the shape validator on cache hits. Without @@ -467,11 +589,11 @@ export async function generateDigestProse(userId, stories, sensitivity, deps) { if (validated) return validated; } } catch { /* cache miss fine */ } - const { system, user } = buildDigestPrompt(stories, sensitivity); + const { system, user } = buildDigestPrompt(stories, sensitivity, ctx); let text = null; try { text = await deps.callLLM(system, user, { - maxTokens: 700, + maxTokens: 900, temperature: 0.4, timeoutMs: 15_000, skipProviders: BRIEF_LLM_SKIP_PROVIDERS, @@ -487,6 +609,33 @@ export async function generateDigestProse(userId, stories, sensitivity, deps) { return parsed; } +/** + * Non-personalised wrapper for share-URL surfaces. Strips profile + * and greeting; substitutes 'public' for userId in the cache key + * (see hashDigestInput) so all public-share readers of the same + * (sensitivity, story-pool) hit one cache row. + * + * Note the missing `userId` parameter — by design. 
Callers MUST + * NOT thread their authenticated user's id through this function; + * the public lead must never carry per-user salt. + * + * @param {Array} stories + * @param {string} sensitivity + * @param {{ callLLM: Function; cacheGet: Function; cacheSet: Function }} deps + * @returns {ReturnType} + */ +export async function generateDigestProsePublic(stories, sensitivity, deps) { + // userId param to generateDigestProse is unused when isPublic=true + // (see hashDigestInput's userSlot logic). Pass an empty string so + // a typo on a future caller can't accidentally salt the public + // cache. + return generateDigestProse('', stories, sensitivity, deps, { + profile: null, + greeting: null, + isPublic: true, + }); +} + // ── Envelope enrichment ──────────────────────────────────────────────────── /** diff --git a/scripts/lib/digest-orchestration-helpers.mjs b/scripts/lib/digest-orchestration-helpers.mjs new file mode 100644 index 000000000..e8086f216 --- /dev/null +++ b/scripts/lib/digest-orchestration-helpers.mjs @@ -0,0 +1,201 @@ +// Pure helpers for the digest cron's per-user compose loop. +// +// Extracted from scripts/seed-digest-notifications.mjs so they can be +// unit-tested without dragging the cron's env-checking side effects +// (DIGEST_CRON_ENABLED check, Upstash REST helper, Convex relay +// auth) into the test runtime. The cron imports back from here. + +import { compareRules, MAX_STORIES_PER_USER } from './brief-compose.mjs'; +import { generateDigestProse } from './brief-llm.mjs'; + +/** + * Build the email subject string. Extracted so the synthesis-level + * → subject ternary can be unit-tested without standing up the whole + * cron loop. (Plan acceptance criterion A6.i.) 
+ * + * Rules: + * - synthesisLevel 1 or 2 + non-empty briefLead → "Intelligence Brief" + * - synthesisLevel 3 OR empty/null briefLead → "Digest" + * + * Mirrors today's UX where the editorial subject only appeared when + * a real LLM-produced lead was available; the L3 stub falls back to + * the plain "Digest" subject to set reader expectations correctly. + * + * @param {{ briefLead: string | null | undefined; synthesisLevel: number; shortDate: string }} input + * @returns {string} + */ +export function subjectForBrief({ briefLead, synthesisLevel, shortDate }) { + if (briefLead && synthesisLevel >= 1 && synthesisLevel <= 2) { + return `WorldMonitor Intelligence Brief — ${shortDate}`; + } + return `WorldMonitor Digest — ${shortDate}`; +} + +/** + * Single source of truth for the digest's story window. Used by BOTH + * the compose path (digestFor closure in the cron) and the send loop. + * Without this, the brief lead can be synthesized from a 24h pool + * while the channel body ships 7d / 12h of stories — reintroducing + * the cross-surface divergence the canonical-brain refactor is meant + * to eliminate, just in a different shape. + * + * `lastSentAt` is the rule's previous successful send timestamp (ms + * since epoch) or null on first send. `defaultLookbackMs` is the + * first-send fallback (today: 24h). + * + * @param {number | null | undefined} lastSentAt + * @param {number} nowMs + * @param {number} defaultLookbackMs + * @returns {number} + */ +export function digestWindowStartMs(lastSentAt, nowMs, defaultLookbackMs) { + return lastSentAt ?? (nowMs - defaultLookbackMs); +} + +/** + * Walk an annotated rule list and return the winning candidate + + * its non-empty story pool. Two-pass: due rules first (so the + * synthesis comes from a rule that's actually sending), then ALL + * eligible rules (compose-only tick — keeps the dashboard brief + * fresh for weekly/twice_daily users). 
Within each pass, walk by + * compareRules priority and pick the FIRST candidate whose pool is + * non-empty AND survives `tryCompose` (when provided). + * + * Returns null when every candidate is rejected — caller skips the + * user (same as today's behavior on empty-pool exhaustion). + * + * Plan acceptance criteria A6.l (compose-only tick still works for + * weekly user) + A6.m (winner walks past empty-pool top-priority + * candidate). Codex Round-3 High #1 + Round-4 High #1 + Round-4 + * Medium #2. + * + * `tryCompose` (optional): called with `(cand, stories)` after a + * non-empty pool is found. Returning a truthy value claims the + * candidate as winner and the value is forwarded as `composeResult`. + * Returning a falsy value (e.g. composeBriefFromDigestStories + * dropped every story via its URL/headline/shape filters) walks to + * the next candidate. Without this callback, the helper preserves + * the original "first non-empty pool wins" semantics, which let a + * filter-rejected top-priority candidate suppress the brief for the + * user even when a lower-priority candidate would have shipped one. + * + * `digestFor` receives the full annotated candidate (not just the + * rule) so callers can derive a per-candidate story window from + * `cand.lastSentAt` — see `digestWindowStartMs`. + * + * `log` is the per-rejected-candidate log emitter — passed in so + * tests can capture lines without reaching for console.log. 
+ * + * @param {Array<{ rule: object; lastSentAt: number | null; due: boolean }>} annotated + * @param {(cand: { rule: object; lastSentAt: number | null; due: boolean }) => Promise} digestFor + * @param {(line: string) => void} log + * @param {string} userId + * @param {((cand: { rule: object; lastSentAt: number | null; due: boolean }, stories: unknown[]) => Promise | unknown)} [tryCompose] + * @returns {Promise<{ winner: { rule: object; lastSentAt: number | null; due: boolean }; stories: unknown[]; composeResult?: unknown } | null>} + */ +export async function pickWinningCandidateWithPool(annotated, digestFor, log, userId, tryCompose) { + if (!Array.isArray(annotated) || annotated.length === 0) return null; + const sortedDue = annotated.filter((a) => a.due).sort((a, b) => compareRules(a.rule, b.rule)); + const sortedAll = [...annotated].sort((a, b) => compareRules(a.rule, b.rule)); + // Build the walk order, deduping by rule reference so the same + // rule isn't tried twice (a due rule appears in both sortedDue + // and sortedAll). + const seen = new Set(); + const walkOrder = []; + for (const cand of [...sortedDue, ...sortedAll]) { + if (seen.has(cand.rule)) continue; + seen.add(cand.rule); + walkOrder.push(cand); + } + for (const cand of walkOrder) { + const stories = await digestFor(cand); + if (!stories || stories.length === 0) { + log( + `[digest] brief filter drops user=${userId} ` + + `sensitivity=${cand.rule.sensitivity ?? 'high'} ` + + `variant=${cand.rule.variant ?? 'full'} ` + + `due=${cand.due} ` + + `outcome=empty-pool ` + + `in=0 dropped_severity=0 dropped_url=0 dropped_headline=0 dropped_shape=0 dropped_cap=0 out=0`, + ); + continue; + } + if (typeof tryCompose === 'function') { + const composeResult = await tryCompose(cand, stories); + if (!composeResult) { + log( + `[digest] brief filter drops user=${userId} ` + + `sensitivity=${cand.rule.sensitivity ?? 'high'} ` + + `variant=${cand.rule.variant ?? 
'full'} ` + + `due=${cand.due} ` + + `outcome=filter-rejected ` + + `in=${stories.length} out=0`, + ); + continue; + } + return { winner: cand, stories, composeResult }; + } + return { winner: cand, stories }; + } + return null; +} + +/** + * Run the three-level canonical synthesis fallback chain. + * L1: full pre-cap pool + ctx (profile, greeting, !public) — canonical. + * L2: envelope-sized slice + empty ctx — degraded fallback (mirrors + * today's enrichBriefEnvelopeWithLLM behaviour). + * L3: null synthesis — caller composes from stub. + * + * Returns { synthesis, level } with `synthesis` matching + * generateDigestProse's output shape (or null on L3) and `level` + * one of {1, 2, 3}. + * + * Pure helper — no I/O beyond the deps.callLLM the inner functions + * already perform. Errors at L1 propagate to L2; L2 errors propagate + * to L3 (null/stub). `trace` callback fires per level transition so + * callers can quantify failure-mode distribution in production logs. + * + * Plan acceptance criterion A6.h (3-level fallback triggers). + * + * @param {string} userId + * @param {Array} stories — full pre-cap pool + * @param {string} sensitivity + * @param {{ profile: string | null; greeting: string | null }} ctx + * @param {{ callLLM: Function; cacheGet: Function; cacheSet: Function }} deps + * @param {(level: 1 | 2 | 3, kind: 'success' | 'fall' | 'throw', err?: unknown) => void} [trace] + * @returns {Promise<{ synthesis: object | null; level: 1 | 2 | 3 }>} + */ +export async function runSynthesisWithFallback(userId, stories, sensitivity, ctx, deps, trace) { + const noteTrace = typeof trace === 'function' ? trace : () => {}; + // L1 — canonical + try { + const l1 = await generateDigestProse(userId, stories, sensitivity, deps, { + profile: ctx?.profile ?? null, + greeting: ctx?.greeting ?? 
null, + isPublic: false, + }); + if (l1) { + noteTrace(1, 'success'); + return { synthesis: l1, level: 1 }; + } + noteTrace(1, 'fall'); + } catch (err) { + noteTrace(1, 'throw', err); + } + // L2 — degraded fallback + try { + const cappedSlice = (Array.isArray(stories) ? stories : []).slice(0, MAX_STORIES_PER_USER); + const l2 = await generateDigestProse(userId, cappedSlice, sensitivity, deps); + if (l2) { + noteTrace(2, 'success'); + return { synthesis: l2, level: 2 }; + } + noteTrace(2, 'fall'); + } catch (err) { + noteTrace(2, 'throw', err); + } + // L3 — stub + noteTrace(3, 'success'); + return { synthesis: null, level: 3 }; +} diff --git a/scripts/seed-digest-notifications.mjs b/scripts/seed-digest-notifications.mjs index 4e0b6c12d..13c7a8974 100644 --- a/scripts/seed-digest-notifications.mjs +++ b/scripts/seed-digest-notifications.mjs @@ -33,12 +33,25 @@ const { normalizeResendSender } = require('./lib/resend-from.cjs'); import { readRawJsonFromUpstash, redisPipeline } from '../api/_upstash-json.js'; import { composeBriefFromDigestStories, + compareRules, extractInsights, groupEligibleRulesByUser, + MAX_STORIES_PER_USER, shouldExitNonZero as shouldExitOnBriefFailures, } from './lib/brief-compose.mjs'; +import { + digestWindowStartMs, + pickWinningCandidateWithPool, + runSynthesisWithFallback, + subjectForBrief, +} from './lib/digest-orchestration-helpers.mjs'; import { issueSlotInTz } from '../shared/brief-filter.js'; -import { enrichBriefEnvelopeWithLLM } from './lib/brief-llm.mjs'; +import { + enrichBriefEnvelopeWithLLM, + generateDigestProse, + generateDigestProsePublic, + greetingBucket, +} from './lib/brief-llm.mjs'; import { parseDigestOnlyUser } from './lib/digest-only-user.mjs'; import { assertBriefEnvelope } from '../server/_shared/brief-render.js'; import { signBriefUrl, BriefUrlError } from './lib/brief-url-sign.mjs'; @@ -93,7 +106,6 @@ const DIGEST_LOOKBACK_MS = 24 * 60 * 60 * 1000; // 24h default lookback on first const DIGEST_CRITICAL_LIMIT = 
Infinity; const DIGEST_HIGH_LIMIT = 15; const DIGEST_MEDIUM_LIMIT = 10; -const AI_SUMMARY_CACHE_TTL = 3600; // 1h const AI_DIGEST_ENABLED = process.env.AI_DIGEST_ENABLED !== '0'; const ENTITLEMENT_CACHE_TTL = 900; // 15 min @@ -116,12 +128,13 @@ const BRIEF_URL_SIGNING_SECRET = process.env.BRIEF_URL_SIGNING_SECRET ?? ''; const WORLDMONITOR_PUBLIC_BASE_URL = process.env.WORLDMONITOR_PUBLIC_BASE_URL ?? 'https://worldmonitor.app'; const BRIEF_TTL_SECONDS = 7 * 24 * 60 * 60; // 7 days -// The brief is a once-per-day editorial snapshot. 24h is the natural -// window regardless of a user's email cadence (daily / twice_daily / -// weekly) — weekly subscribers still expect a fresh brief each day -// in the dashboard panel. Matches DIGEST_LOOKBACK_MS so first-send -// users see identical story pools in brief and email. -const BRIEF_STORY_WINDOW_MS = 24 * 60 * 60 * 1000; +// Brief story window: derived per-rule from the rule's lastSentAt via +// digestWindowStartMs, identical to the send-loop window. The previous +// fixed-24h constant decoupled the canonical brief lead from the +// stories the email/Slack body actually shipped, reintroducing the +// cross-surface divergence the canonical-brain refactor is designed to +// eliminate (especially severe for weekly users — 7d email body vs 24h +// lead). const INSIGHTS_KEY = 'news:insights:v1'; // Operator kill switch — used to intentionally silence brief compose @@ -309,6 +322,66 @@ function toLocalHour(nowMs, timezone) { } } +/** + * Read digest:last-sent:v1:{userId}:{variant} from Upstash. Returns + * null on miss / parse error / network hiccup so the caller can treat + * "first send" and "transient lookup failure" the same way (both fall + * through to isDue's `lastSentAt === null` branch). Extracted so the + * compose-flow's per-rule annotation pass and the send loop can share + * one source of truth — Codex Round-3 High #1 + Round-4 fixes. 
+ *
+ * @param {{ userId: string; variant?: string }} rule
+ * @returns {Promise<number | null>}
+ */
+async function getLastSentAt(rule) {
+  if (!rule?.userId || !rule.variant) return null;
+  const key = `digest:last-sent:v1:${rule.userId}:${rule.variant}`;
+  try {
+    const raw = await upstashRest('GET', key);
+    if (!raw) return null;
+    const parsed = JSON.parse(raw);
+    return typeof parsed.sentAt === 'number' ? parsed.sentAt : null;
+  } catch {
+    return null;
+  }
+}
+
+/**
+ * Build the synthesis context (profile, greeting) for the canonical
+ * synthesis call. profile is the formatted user-context line block;
+ * greeting is the time-of-day-appropriate opener. Both are stripped
+ * by `generateDigestProsePublic` for the share-URL surface; this
+ * function is for the personalised path only.
+ *
+ * Defensive: prefs lookup failures degrade to a non-personalised
+ * synthesis (profile=null) rather than blocking the brief — same
+ * pattern the legacy generateAISummary used.
+ *
+ * @param {{ userId: string; variant?: string; digestTimezone?: string }} rule
+ * @param {number} nowMs
+ * @returns {Promise<{ profile: string | null; greeting: string | null }>}
+ */
+async function buildSynthesisCtx(rule, nowMs) {
+  if (!rule?.userId) return { profile: null, greeting: null };
+  let profile = null;
+  try {
+    const { data: prefs } = await fetchUserPreferences(rule.userId, rule.variant ?? 'full');
+    if (prefs) {
+      const ctx = extractUserContext(prefs);
+      profile = formatUserProfile(ctx, rule.variant ?? 'full');
+    }
+  } catch {
+    /* prefs unavailable — degrade to non-personalised */
+  }
+  const tz = rule.digestTimezone ?? 'UTC';
+  const localHour = toLocalHour(nowMs, tz);
+  const greeting = localHour >= 5 && localHour < 12 ? 'Good morning'
+    : localHour >= 12 && localHour < 17 ? 'Good afternoon'
+    : localHour >= 17 && localHour < 22 ? 
'Good evening' + : 'Good evening'; + return { profile, greeting }; +} + function isDue(rule, lastSentAt) { const nowMs = Date.now(); const tz = rule.digestTimezone ?? 'UTC'; @@ -701,95 +774,23 @@ function formatDigestHtml(stories, nowMs) { `; } -// ── AI summary generation ──────────────────────────────────────────────────── - -function hashShort(str) { - return createHash('sha256').update(str).digest('hex').slice(0, 16); -} - -async function generateAISummary(stories, rule) { - if (!AI_DIGEST_ENABLED) return null; - if (!stories || stories.length === 0) return null; - - // rule.aiDigestEnabled (from alertRules) is the user's explicit opt-in for - // AI summaries. userPreferences is a SEPARATE table (SPA app settings blob: - // watchlist, airports, panels). A user can have alertRules without having - // ever saved userPreferences — or under a different variant. Missing prefs - // must NOT silently disable the feature the user just enabled; degrade to - // a non-personalized summary instead. - // error: true = transient fetch failure (network, non-OK HTTP, env missing) - // error: false = the (userId, variant) row genuinely does not exist - // Both cases degrade to a non-personalized summary, but log them distinctly - // so transient fetch failures are visible in observability. - const { data: prefs, error: prefsFetchError } = await fetchUserPreferences(rule.userId, rule.variant ?? 'full'); - if (!prefs) { - console.log( - prefsFetchError - ? `[digest] Prefs fetch failed for ${rule.userId} — generating non-personalized AI summary` - : `[digest] No stored preferences for ${rule.userId} — generating non-personalized AI summary`, - ); - } - const ctx = extractUserContext(prefs); - const profile = formatUserProfile(ctx, rule.variant ?? 'full'); - - const variant = rule.variant ?? 'full'; - const tz = rule.digestTimezone ?? 
'UTC'; - const localHour = toLocalHour(Date.now(), tz); - if (localHour === -1) console.warn(`[digest] Bad timezone "${tz}" for ${rule.userId} — defaulting to evening greeting`); - const greeting = localHour >= 5 && localHour < 12 ? 'Good morning' - : localHour >= 12 && localHour < 17 ? 'Good afternoon' - : 'Good evening'; - const storiesHash = hashShort(stories.map(s => - `${s.titleHash ?? s.title}:${s.severity ?? ''}:${s.phase ?? ''}:${(s.sources ?? []).slice(0, 3).join(',')}` - ).sort().join('|')); - const ctxHash = hashShort(JSON.stringify(ctx)); - const cacheKey = `digest:ai-summary:v1:${variant}:${greeting}:${storiesHash}:${ctxHash}`; - - try { - const cached = await upstashRest('GET', cacheKey); - if (cached) { - console.log(`[digest] AI summary cache hit for ${rule.userId}`); - return cached; - } - } catch { /* miss */ } - - const dateStr = new Date().toISOString().split('T')[0]; - const storyList = stories.slice(0, 20).map((s, i) => { - const phase = s.phase ? ` [${s.phase}]` : ''; - const src = s.sources?.length > 0 ? ` (${s.sources.slice(0, 2).join(', ')})` : ''; - return `${i + 1}. [${(s.severity ?? 'high').toUpperCase()}]${phase} ${s.title}${src}`; - }).join('\n'); - - const systemPrompt = `You are WorldMonitor's intelligence analyst. Today is ${dateStr} UTC. -Write a personalized daily brief for a user focused on ${rule.variant ?? 'full'} intelligence. -The user's local time greeting is "${greeting}" — use this exact greeting to open the brief. - -User profile: -${profile} - -Rules: -- Open with "${greeting}." 
followed by the brief -- Lead with the single most impactful development for this user -- Connect events to watched assets/regions where relevant -- 3-5 bullet points, 1-2 sentences each -- Flag anything directly affecting watched assets -- Separate facts from assessment -- End with "Signals to watch:" (1-2 items) -- Under 250 words`; - - const summary = await callLLM(systemPrompt, storyList, { maxTokens: 600, temperature: 0.3, timeoutMs: 15_000, skipProviders: ['groq'] }); - if (!summary) { - console.warn(`[digest] AI summary generation failed for ${rule.userId}`); - return null; - } - - try { - await upstashRest('SET', cacheKey, summary, 'EX', String(AI_SUMMARY_CACHE_TTL)); - } catch { /* best-effort cache write */ } - - console.log(`[digest] AI summary generated for ${rule.userId} (${summary.length} chars)`); - return summary; -} +// ── (Removed) standalone generateAISummary ─────────────────────────────────── +// +// Prior to 2026-04-25 a separate `generateAISummary()` here ran a +// second LLM call per send to produce the email's exec-summary +// block, independent of the brief envelope's `digest.lead`. That +// asymmetry was the root cause of the email/brief contradiction +// (different inputs, different leads, different ranked stories). +// +// The synthesis is now produced ONCE per user by +// `generateDigestProse(userId, fullPool, sensitivity, deps, ctx)` +// in composeAndStoreBriefForUser, written into +// `envelope.data.digest.lead`, and read by every channel +// (email HTML, plain-text, Telegram, Slack, Discord, webhook). See +// docs/plans/2026-04-25-002-fix-brief-email-two-brain-divergence-plan.md. +// +// The `digest:ai-summary:v1:*` cache rows from the legacy code path +// expire on their existing 1h TTL — no cleanup pass needed. 
// ── Channel deactivation ────────────────────────────────────────────────────── @@ -1254,22 +1255,50 @@ async function composeBriefsForRun(rules, nowMs) { // inherits a looser populator's pool (the earlier populator "wins" // and decides which severity tiers enter the pool, so stricter // users get a pool that contains severities they never wanted). - const windowStart = nowMs - BRIEF_STORY_WINDOW_MS; + // + // windowStart is derived per-candidate from `lastSentAt`, matching + // the send loop's formula exactly (digestWindowStartMs). Without + // this, the canonical brief lead would be synthesized from a fixed + // 24h pool while the email/Slack body ships the actual cadence's + // window (7d for weekly, 12h for twice_daily) — a different flavor + // of the cross-surface divergence the canonical-brain refactor is + // designed to eliminate. const digestCache = new Map(); - async function digestFor(candidate) { - const key = `${candidate.variant ?? 'full'}:${candidate.lang ?? 'en'}:${candidate.sensitivity ?? 'high'}:${windowStart}`; + async function digestFor(cand) { + const windowStart = digestWindowStartMs(cand.lastSentAt, nowMs, DIGEST_LOOKBACK_MS); + const key = `${cand.rule.variant ?? 'full'}:${cand.rule.lang ?? 'en'}:${cand.rule.sensitivity ?? 'high'}:${windowStart}`; if (digestCache.has(key)) return digestCache.get(key); - const stories = await buildDigest(candidate, windowStart); + const stories = await buildDigest(cand.rule, windowStart); digestCache.set(key, stories ?? []); return stories ?? []; } - const eligibleByUser = groupEligibleRulesByUser(rules); + // Pre-annotate every eligible rule with its lastSentAt + isDue + // status. The compose flow uses this to prefer a "due-this-tick" + // candidate as the canonical synthesis source, falling back to any + // eligible candidate when nothing is due (preserving today's + // dashboard refresh contract for weekly users on non-due ticks). + // Codex Round-3 High #1 + Round-4 High #1 + Round-4 Medium #2. 
+ // + // One Upstash GET per rule per tick; with caching across rules of + // the same user this is cheap. The send loop in main() reads from + // this same map (via getLastSentAt) so compose + send agree on + // lastSentAt for every rule. + const annotatedByUser = new Map(); + for (const [userId, candidates] of groupEligibleRulesByUser(rules)) { + const annotated = []; + for (const rule of candidates) { + const lastSentAt = await getLastSentAt(rule); + annotated.push({ rule, lastSentAt, due: isDue(rule, lastSentAt) }); + } + annotatedByUser.set(userId, annotated); + } + let composeSuccess = 0; let composeFailed = 0; - for (const [userId, candidates] of eligibleByUser) { + for (const [userId, annotated] of annotatedByUser) { try { - const hit = await composeAndStoreBriefForUser(userId, candidates, insightsNumbers, digestFor, nowMs); + const hit = await composeAndStoreBriefForUser(userId, annotated, insightsNumbers, digestFor, nowMs); if (hit) { briefByUser.set(userId, hit); composeSuccess++; @@ -1284,114 +1313,188 @@ async function composeBriefsForRun(rules, nowMs) { } } console.log( - `[digest] brief: compose_success=${composeSuccess} compose_failed=${composeFailed} total_users=${eligibleByUser.size}`, + `[digest] brief: compose_success=${composeSuccess} compose_failed=${composeFailed} total_users=${annotatedByUser.size}`, ); return { briefByUser, composeSuccess, composeFailed }; } /** - * Per-user: walk candidates, for each pull the per-variant digest - * story pool (same pool buildDigest feeds to the email), and compose - * the brief envelope from the first candidate that yields non-empty - * stories. SETEX the envelope, sign the magazine URL. Returns the - * entry the caller should stash in briefByUser, or null when no - * candidate had stories. 
+ * Per-user: pick a winning candidate (DUE rules first, then any + * eligible rule), pull its digest pool, run canonical synthesis + * over the FULL pre-cap pool, then compose the envelope with the + * synthesis spliced in. SETEX the envelope, sign the magazine URL. + * + * Returns the entry the caller should stash in briefByUser, or null + * when no candidate had stories. The entry's `synthesisLevel` field + * tells the send loop which fallback path produced the lead (1 = + * canonical, 2 = degraded, 3 = stub) — drives the email subject-line + * ternary and the parity log. + * + * @param {string} userId + * @param {Array<{ rule: object; lastSentAt: number | null; due: boolean }>} annotated + * @param {{ clusters: number; multiSource: number }} insightsNumbers + * @param {(rule: object) => Promise} digestFor + * @param {number} nowMs */ -async function composeAndStoreBriefForUser(userId, candidates, insightsNumbers, digestFor, nowMs) { - let envelope = null; - let chosenVariant = null; - let chosenCandidate = null; - for (const candidate of candidates) { - const digestStories = await digestFor(candidate); - if (!digestStories || digestStories.length === 0) continue; - const dropStats = { severity: 0, headline: 0, url: 0, shape: 0, cap: 0, in: digestStories.length }; - const composed = composeBriefFromDigestStories( - candidate, - digestStories, - insightsNumbers, - { - nowMs, - onDrop: (ev) => { dropStats[ev.reason] = (dropStats[ev.reason] ?? 0) + 1; }, +async function composeAndStoreBriefForUser(userId, annotated, insightsNumbers, digestFor, nowMs) { + // Two-pass walk extracted to a pure helper so it can be unit-tested + // (A6.l + A6.m). When no candidate has a non-empty pool — OR when + // every non-empty candidate has its stories filtered out by the + // composer (URL/headline/shape filters) — returns null. 
+ // + // The `tryCompose` callback is the filter-rejection fall-through: + // before the original PR, the legacy loop kept trying lower-priority + // candidates whenever compose returned null. Without this hook the + // helper would claim the first non-empty pool as winner and the + // caller would bail on filter-drop, suppressing briefs that a + // lower-priority candidate would have produced. + // + // We compose WITHOUT synthesis here (cheap — pure JS, no I/O) just + // to check filter survival; the real composition with synthesis + // splice-in happens once below, after the winner is locked in. + const log = (line) => console.log(line); + const winnerResult = await pickWinningCandidateWithPool( + annotated, + digestFor, + log, + userId, + (cand, stories) => { + const test = composeBriefFromDigestStories( + cand.rule, + stories, + insightsNumbers, + { nowMs }, + ); + return test ?? null; + }, + ); + if (!winnerResult) return null; + const { winner, stories: winnerStories } = winnerResult; + + // ── Canonical synthesis (3-level fallback chain) ──────────────────── + // + // L1: full pre-cap pool + personalised ctx (profile, greeting). The + // desired outcome — single LLM call per user, lead anchored on + // the wider story set the model has the most signal from. + // L2: post-cap envelope-only + empty ctx. Mirrors today's + // enrichBriefEnvelopeWithLLM behavior — used when L1 returns + // null (LLM down across all providers, parse failure). + // L3: stub from assembleStubbedBriefEnvelope. The brief still + // ships; only the lead text degrades. Email subject downgrades + // from "Intelligence Brief" to "Digest" (driven by + // synthesisLevel === 3 in the send loop). + const sensitivity = winner.rule.sensitivity ?? 
'high'; + let synthesis = null; + let publicLead = null; + let synthesisLevel = 3; // pessimistic default; bumped on success + if (BRIEF_LLM_ENABLED) { + const ctx = await buildSynthesisCtx(winner.rule, nowMs); + const result = await runSynthesisWithFallback( + userId, + winnerStories, + sensitivity, + ctx, + briefLlmDeps, + (level, kind, err) => { + if (kind === 'throw') { + console.warn( + `[digest] brief: synthesis L${level} threw for ${userId} — falling to L${level + 1}:`, + err?.message, + ); + } else if (kind === 'success' && level === 2) { + console.log(`[digest] synthesis level=2_degraded user=${userId}`); + } else if (kind === 'success' && level === 3) { + console.log(`[digest] synthesis level=3_stub user=${userId}`); + } }, ); - - // Per-attempt filter-drop line. Emits one structured row for every - // candidate whose digest pool was non-empty, tagged with that - // candidate's own sensitivity and variant. See Solution 0 in - // docs/plans/2026-04-24-004-fix-brief-topic-adjacency-defects-plan.md - // for why this log exists (deciding whether Solution 3 is warranted). - // - // Emitting per attempt — not per user — because: - // - A user can have multiple rules with different sensitivities; - // a single-row-per-user log would have to either pick one - // sensitivity arbitrarily or label as 'mixed', hiding drops - // from the non-winning candidates. - // - An earlier candidate wiped out by post-group filtering (the - // exact signal Sol-0 targets) is invisible if only the winner - // is logged. Every attempt emits its own row so the fallback - // chain is visible. - // - // Outcomes per row: - // outcome=shipped — this candidate's envelope shipped; loop breaks. - // outcome=rejected — composed was null (every story filtered out); - // loop continues to the next candidate. - // - // A user whose every row is `outcome=rejected` is a wipeout — - // operators detect it by grouping rows by user and checking for - // absence of `outcome=shipped` within the tick. 
- const out = composed?.data?.stories?.length ?? 0; - console.log( - `[digest] brief filter drops user=${userId} ` + - `sensitivity=${candidate.sensitivity ?? 'high'} ` + - `variant=${candidate.variant ?? 'full'} ` + - `outcome=${composed ? 'shipped' : 'rejected'} ` + - `in=${dropStats.in} ` + - `dropped_severity=${dropStats.severity} ` + - `dropped_url=${dropStats.url} ` + - `dropped_headline=${dropStats.headline} ` + - `dropped_shape=${dropStats.shape} ` + - `dropped_cap=${dropStats.cap} ` + - `out=${out}`, - ); - - if (composed) { - envelope = composed; - chosenVariant = candidate.variant; - chosenCandidate = candidate; - break; + synthesis = result.synthesis; + synthesisLevel = result.level; + // Public synthesis — parallel call. Profile-stripped; cache- + // shared across all users for the same (date, sensitivity, + // story-pool). Captures the FULL prose object (lead + signals + + // threads) since each personalised counterpart in the envelope + // can carry profile bias and the public surface needs sibling + // safe-versions of all three. Failure is non-fatal — the + // renderer's public-mode fail-safes (omit pull-quote / omit + // signals page / category-derived threads stub) handle absence + // rather than leaking the personalised version. + try { + const pub = await generateDigestProsePublic(winnerStories, sensitivity, briefLlmDeps); + if (pub) publicLead = pub; // { lead, threads, signals, rankedStoryHashes } + } catch (err) { + console.warn(`[digest] brief: publicLead generation failed for ${userId}:`, err?.message); } } + // Compose envelope with synthesis pre-baked. The composer applies + // rankedStoryHashes-aware ordering BEFORE the cap, so the model's + // editorial judgment of importance survives MAX_STORIES_PER_USER. 
+ const dropStats = { severity: 0, headline: 0, url: 0, shape: 0, cap: 0, in: winnerStories.length }; + const envelope = composeBriefFromDigestStories( + winner.rule, + winnerStories, + insightsNumbers, + { + nowMs, + onDrop: (ev) => { dropStats[ev.reason] = (dropStats[ev.reason] ?? 0) + 1; }, + synthesis: synthesis || publicLead + ? { + ...(synthesis ?? {}), + publicLead: publicLead?.lead ?? undefined, + publicSignals: publicLead?.signals ?? undefined, + publicThreads: publicLead?.threads ?? undefined, + } + : undefined, + }, + ); + + // Per-attempt filter-drop line for the winning candidate. Same + // shape today's log emits — operators can keep their existing + // queries. The `due` field is new; legacy parsers ignore unknown + // fields. + const out = envelope?.data?.stories?.length ?? 0; + console.log( + `[digest] brief filter drops user=${userId} ` + + `sensitivity=${sensitivity} ` + + `variant=${winner.rule.variant ?? 'full'} ` + + `due=${winner.due} ` + + `outcome=${envelope ? 'shipped' : 'rejected'} ` + + `in=${dropStats.in} ` + + `dropped_severity=${dropStats.severity} ` + + `dropped_url=${dropStats.url} ` + + `dropped_headline=${dropStats.headline} ` + + `dropped_shape=${dropStats.shape} ` + + `dropped_cap=${dropStats.cap} ` + + `out=${out}`, + ); + if (!envelope) return null; - // Phase 3b — LLM enrichment. Substitutes the stubbed whyMatters / - // lead / threads / signals fields with Gemini 2.5 Flash output. - // Pure passthrough on any failure: the baseline envelope has - // already passed validation and is safe to ship as-is. Do NOT - // abort composition if the LLM is down; the stub is better than - // no brief. - if (BRIEF_LLM_ENABLED && chosenCandidate) { - const baseline = envelope; + // Per-story whyMatters enrichment. The synthesis is already in the + // envelope; this pass only fills per-story rationales. Failures + // fall through cleanly — the stub `whyMatters` from the composer + // is acceptable. 
+ let finalEnvelope = envelope; + if (BRIEF_LLM_ENABLED) { try { - const enriched = await enrichBriefEnvelopeWithLLM(envelope, chosenCandidate, briefLlmDeps); + const enriched = await enrichBriefEnvelopeWithLLM(envelope, winner.rule, briefLlmDeps); // Defence in depth: re-validate the enriched envelope against // the renderer's strict contract before we SETEX it. If // enrichment produced a structurally broken shape (bad cache // row, code bug, upstream type drift) we'd otherwise SETEX it // and /api/brief would 404 the user's brief at read time. Fall - // back to the unenriched baseline — which is already known to + // back to the unenriched envelope — which is already known to // pass assertBriefEnvelope() because composeBriefFromDigestStories // asserted on construction. try { assertBriefEnvelope(enriched); - envelope = enriched; + finalEnvelope = enriched; } catch (assertErr) { - console.warn(`[digest] brief: enriched envelope failed assertion for ${userId} — shipping stubbed:`, assertErr?.message); - envelope = baseline; + console.warn(`[digest] brief: enriched envelope failed assertion for ${userId} — shipping unenriched:`, assertErr?.message); } } catch (err) { - console.warn(`[digest] brief: LLM enrichment threw for ${userId} — shipping stubbed envelope:`, err?.message); - envelope = baseline; + console.warn(`[digest] brief: per-story enrichment threw for ${userId} — shipping unenriched envelope:`, err?.message); } } @@ -1400,7 +1503,7 @@ async function composeAndStoreBriefForUser(userId, candidates, insightsNumbers, // produce envelope.data.date guarantees the slot's date portion // matches the displayed date. Two same-day compose runs produce // distinct slots so each digest dispatch freezes its own URL. - const briefTz = chosenCandidate?.digestTimezone ?? 'UTC'; + const briefTz = winner.rule?.digestTimezone ?? 
'UTC'; const issueSlot = issueSlotInTz(nowMs, briefTz); const key = `brief:${userId}:${issueSlot}`; // The latest-pointer lets readers (dashboard panel, share-url @@ -1409,7 +1512,7 @@ async function composeAndStoreBriefForUser(userId, candidates, insightsNumbers, const latestPointerKey = `brief:latest:${userId}`; const latestPointerValue = JSON.stringify({ issueSlot }); const pipelineResult = await redisPipeline([ - ['SETEX', key, String(BRIEF_TTL_SECONDS), JSON.stringify(envelope)], + ['SETEX', key, String(BRIEF_TTL_SECONDS), JSON.stringify(finalEnvelope)], ['SETEX', latestPointerKey, String(BRIEF_TTL_SECONDS), latestPointerValue], ]); if (!pipelineResult || !Array.isArray(pipelineResult) || pipelineResult.length < 2) { @@ -1427,7 +1530,15 @@ async function composeAndStoreBriefForUser(userId, candidates, insightsNumbers, baseUrl: WORLDMONITOR_PUBLIC_BASE_URL, secret: BRIEF_URL_SIGNING_SECRET, }); - return { envelope, magazineUrl, chosenVariant }; + return { + envelope: finalEnvelope, + magazineUrl, + chosenVariant: winner.rule.variant, + // synthesisLevel goes here — NOT in the envelope (renderer's + // assertNoExtraKeys would reject it). Read by the send loop for + // the email subject-line ternary and the parity log. + synthesisLevel, + }; } // ── Main ────────────────────────────────────────────────────────────────────── @@ -1522,14 +1633,10 @@ async function main() { if (!rule.userId || !rule.variant) continue; const lastSentKey = `digest:last-sent:v1:${rule.userId}:${rule.variant}`; - let lastSentAt = null; - try { - const raw = await upstashRest('GET', lastSentKey); - if (raw) { - const parsed = JSON.parse(raw); - lastSentAt = typeof parsed.sentAt === 'number' ? parsed.sentAt : null; - } - } catch { /* first send */ } + // Reuse the same getLastSentAt helper the compose pass used so + // the two flows agree on lastSentAt for every rule. Codex Round-3 + // High #1 — winner-from-due-candidates pre-condition. 
+ const lastSentAt = await getLastSentAt(rule); if (!isDue(rule, lastSentAt)) continue; @@ -1539,7 +1646,7 @@ async function main() { continue; } - const windowStart = lastSentAt ?? (nowMs - DIGEST_LOOKBACK_MS); + const windowStart = digestWindowStartMs(lastSentAt, nowMs, DIGEST_LOOKBACK_MS); const stories = await buildDigest(rule, windowStart); if (!stories) { console.log(`[digest] No stories in window for ${rule.userId} (${rule.variant})`); @@ -1570,27 +1677,57 @@ async function main() { continue; } - let aiSummary = null; + // Per-rule synthesis: each due rule's channel body must be + // internally consistent (lead derived from THIS rule's pool, not + // some other rule's). For multi-rule users, the compose flow + // picked ONE winning rule for the magazine envelope, but the + // send-loop body for a non-winner rule needs ITS OWN lead — else + // the email leads with one pool's narrative while listing stories + // from another pool. Cache absorbs the cost: when this is the + // winning rule, generateDigestProse hits the cache row written + // during the compose pass (same userId/sensitivity/pool/ctx) and + // no extra LLM call fires. + // + // The magazineUrl still points at the winner's envelope — that + // surface is the share-worthy alpha and remains a single brief + // per user per slot. Channel-body lead vs magazine lead may + // therefore differ for non-winner rules; users on those rules + // see their own coherent email + a magazine that shows the + // winner's editorial. Acceptable trade-off given multi-rule + // users are rare and the `(userId, issueSlot)` URL contract + // can't represent multiple per-rule briefs without an + // architectural change to the URL signer + Redis key. 
+ const brief = briefByUser.get(rule.userId); + let briefLead = null; + let synthesisLevel = 3; if (AI_DIGEST_ENABLED && rule.aiDigestEnabled !== false) { - aiSummary = await generateAISummary(stories, rule); + const ruleCtx = await buildSynthesisCtx(rule, nowMs); + const ruleResult = await runSynthesisWithFallback( + rule.userId, + stories, + rule.sensitivity ?? 'high', + ruleCtx, + briefLlmDeps, + ); + briefLead = ruleResult.synthesis?.lead ?? null; + synthesisLevel = ruleResult.level; } const storyListPlain = formatDigest(stories, nowMs); if (!storyListPlain) continue; const htmlRaw = formatDigestHtml(stories, nowMs); - const brief = briefByUser.get(rule.userId); const magazineUrl = brief?.magazineUrl ?? null; const { text, telegramText, slackText, discordText } = buildChannelBodies( storyListPlain, - aiSummary, + briefLead, magazineUrl, ); - const htmlWithSummary = injectEmailSummary(htmlRaw, aiSummary); + const htmlWithSummary = injectEmailSummary(htmlRaw, briefLead); const html = injectBriefCta(htmlWithSummary, magazineUrl); const shortDate = new Intl.DateTimeFormat('en-US', { month: 'short', day: 'numeric' }).format(new Date(nowMs)); - const subject = aiSummary ? `WorldMonitor Intelligence Brief — ${shortDate}` : `WorldMonitor Digest — ${shortDate}`; + const subject = subjectForBrief({ briefLead, synthesisLevel, shortDate }); let anyDelivered = false; @@ -1613,7 +1750,11 @@ async function main() { } else if (ch.channelType === 'email' && ch.email) { ok = await sendEmail(ch.email, subject, text, html); } else if (ch.channelType === 'webhook' && ch.webhookEnvelope) { - ok = await sendWebhook(rule.userId, ch.webhookEnvelope, stories, aiSummary); + // Webhook payload's `summary` field reads the canonical + // briefLead — same string the email exec block + magazine + // pull-quote use. Codex Round-1 Medium #6 (channel-scope + // parity). 
+ ok = await sendWebhook(rule.userId, ch.webhookEnvelope, stories, briefLead); } if (ok) anyDelivered = true; } @@ -1626,6 +1767,52 @@ async function main() { console.log( `[digest] Sent ${stories.length} stories to ${rule.userId} (${rule.variant}, ${rule.digestMode})`, ); + // Parity observability. Two distinct properties to track: + // + // 1. CHANNEL parity (load-bearing): for ONE send, every channel + // body of THIS rule (email HTML + plain text + Telegram + + // Slack + Discord + webhook) reads the same `briefLead` + // string. Verifiable by code review (single variable threaded + // everywhere); logged here as `exec_len` for telemetry. + // + // 2. WINNER parity (informational): when `winner_match=true`, + // THIS rule is the same one the magazine envelope was + // composed from — so channel lead == magazine lead. When + // `winner_match=false`, this is a non-winner rule send; + // channel lead reflects this rule's pool while the magazine + // URL points at the winner's editorial. Expected divergence, + // not a regression. + // + // PARITY REGRESSION fires only when winner_match=true AND the + // channel lead differs from the envelope lead (the canonical- + // synthesis contract has actually broken). + const envLead = brief?.envelope?.data?.digest?.lead ?? ''; + const winnerVariant = brief?.chosenVariant ?? ''; + const winnerMatch = winnerVariant === (rule.variant ?? 'full'); + const channelsEqual = briefLead === envLead; + const publicLead = brief?.envelope?.data?.digest?.publicLead ?? ''; + console.log( + `[digest] brief lead parity user=${rule.userId} ` + + `rule=${rule.variant ?? 'full'}:${rule.sensitivity ?? 'high'}:${rule.lang ?? 'en'} ` + + `winner_match=${winnerMatch} ` + + `synthesis_level=${synthesisLevel} ` + + `exec_len=${(briefLead ?? 
'').length} ` + + `brief_lead_len=${envLead.length} ` + + `channels_equal=${channelsEqual} ` + + `public_lead_len=${publicLead.length}`, + ); + if (winnerMatch && !channelsEqual && briefLead && envLead) { + // Sentry alert candidate — winner_match=true means this rule + // composed the envelope, so its channel lead MUST match the + // envelope lead. Mismatch = canonical-synthesis cache drift + // or code regression. Logged loudly so Sentry's console- + // breadcrumb hook surfaces it without an explicit + // captureMessage call. + console.warn( + `[digest] PARITY REGRESSION user=${rule.userId} — winner-rule channel lead != envelope lead. ` + + `Investigate: cache drift between compose pass and send pass?`, + ); + } } } diff --git a/server/_shared/brief-render.js b/server/_shared/brief-render.js index 294ad8b61..4809abd37 100644 --- a/server/_shared/brief-render.js +++ b/server/_shared/brief-render.js @@ -113,7 +113,16 @@ function isFiniteNumber(v) { const ALLOWED_ENVELOPE_KEYS = new Set(['version', 'issuedAt', 'data']); const ALLOWED_DATA_KEYS = new Set(['user', 'issue', 'date', 'dateLong', 'digest', 'stories']); const ALLOWED_USER_KEYS = new Set(['name', 'tz']); -const ALLOWED_DIGEST_KEYS = new Set(['greeting', 'lead', 'numbers', 'threads', 'signals']); +// publicLead / publicSignals / publicThreads: optional v3+ fields. +// Hold non-personalised content the public-share renderer uses in +// place of the personalised lead/signals/threads. v2 envelopes (no +// publicLead) still pass — the validator's optional-key pattern is +// "in the allow list, but isString/array check is skipped when +// undefined" (see validateBriefDigest below). 
+const ALLOWED_DIGEST_KEYS = new Set([ + 'greeting', 'lead', 'numbers', 'threads', 'signals', + 'publicLead', 'publicSignals', 'publicThreads', +]); const ALLOWED_NUMBERS_KEYS = new Set(['clusters', 'multiSource', 'surfaced']); const ALLOWED_THREAD_KEYS = new Set(['tag', 'teaser']); const ALLOWED_STORY_KEYS = new Set([ @@ -243,6 +252,38 @@ export function assertBriefEnvelope(envelope) { assertNoExtraKeys(digest, ALLOWED_DIGEST_KEYS, 'envelope.data.digest'); if (!isNonEmptyString(digest.greeting)) throw new Error('envelope.data.digest.greeting must be a non-empty string'); if (!isNonEmptyString(digest.lead)) throw new Error('envelope.data.digest.lead must be a non-empty string'); + // publicLead: optional v3+ field. When present, MUST be a non-empty + // string (typed contract enforcement); when absent, the renderer's + // public-mode lead block omits the pull-quote entirely (per the + // "never fall back to personalised lead" rule). + if (digest.publicLead !== undefined && !isNonEmptyString(digest.publicLead)) { + throw new Error('envelope.data.digest.publicLead, when present, must be a non-empty string'); + } + // publicSignals + publicThreads: optional v3+. When present, MUST + // match the signals/threads contracts (array of non-empty strings, + // array of {tag, teaser}). Absent siblings are OK — public render + // path falls back to "omit signals page" / "category-derived + // threads stub" rather than serving the personalised version. 
+ if (digest.publicSignals !== undefined) { + if (!Array.isArray(digest.publicSignals)) { + throw new Error('envelope.data.digest.publicSignals, when present, must be an array'); + } + digest.publicSignals.forEach((s, i) => { + if (!isNonEmptyString(s)) throw new Error(`envelope.data.digest.publicSignals[${i}] must be a non-empty string`); + }); + } + if (digest.publicThreads !== undefined) { + if (!Array.isArray(digest.publicThreads)) { + throw new Error('envelope.data.digest.publicThreads, when present, must be an array'); + } + digest.publicThreads.forEach((t, i) => { + if (!isObject(t)) throw new Error(`envelope.data.digest.publicThreads[${i}] must be an object`); + const th = /** @type {Record} */ (t); + assertNoExtraKeys(th, ALLOWED_THREAD_KEYS, `envelope.data.digest.publicThreads[${i}]`); + if (!isNonEmptyString(th.tag)) throw new Error(`envelope.data.digest.publicThreads[${i}].tag must be a non-empty string`); + if (!isNonEmptyString(th.teaser)) throw new Error(`envelope.data.digest.publicThreads[${i}].teaser must be a non-empty string`); + }); + } if (!isObject(digest.numbers)) throw new Error('envelope.data.digest.numbers is required'); const numbers = /** @type {Record} */ (digest.numbers); @@ -423,13 +464,22 @@ function renderCover({ dateLong, issue, storyCount, pageIndex, totalPages, greet * @param {{ greeting: string; lead: string; dateShort: string; pageIndex: number; totalPages: number }} opts */ function renderDigestGreeting({ greeting, lead, dateShort, pageIndex, totalPages }) { + // Public-share fail-safe: when `lead` is empty, omit the pull-quote + // entirely. Reached via redactForPublic when the envelope lacks a + // non-empty `publicLead` — NEVER serve the personalised lead on the + // public surface. Page still reads as a complete editorial layout + // (greeting + horizontal rule), just without the italic blockquote. + // Codex Round-2 High (security on share-URL surface). + const blockquote = typeof lead === 'string' && lead.length > 0 + ? `
${escapeHtml(lead)}
` + : ''; return ( '
' + digestRunningHead(dateShort, 'Digest / 01') + '
' + '
At The Top Of The Hour
' + `

${escapeHtml(greeting)}

` + - `
${escapeHtml(lead)}
` + + blockquote + '
' + '
' + `
${pad2(pageIndex)} / ${pad2(totalPages)}
` + @@ -1141,17 +1191,57 @@ const NAV_SCRIPT = `