diff --git a/scripts/lib/brief-dedup.mjs b/scripts/lib/brief-dedup.mjs index 946814d40..bbc765554 100644 --- a/scripts/lib/brief-dedup.mjs +++ b/scripts/lib/brief-dedup.mjs @@ -14,6 +14,10 @@ * location veto; default on * DIGEST_DEDUP_COSINE_THRESHOLD = float in (0, 1], default 0.60 * DIGEST_DEDUP_WALL_CLOCK_MS = int ms, default 45000 + * DIGEST_DEDUP_TOPIC_GROUPING = '0' disables secondary topic + * grouping pass; default on + * DIGEST_DEDUP_TOPIC_THRESHOLD = float in (0, 1], default 0.45 + * — looser secondary-pass cosine * * Anything non-{embed,jaccard} in MODE = jaccard with a loud warn so * a typo can't stay hidden. @@ -53,6 +57,8 @@ import { defaultRedisPipeline } from './_upstash-pipeline.mjs'; * entityVetoEnabled: boolean, * cosineThreshold: number, * wallClockMs: number, + * topicGroupingEnabled: boolean, + * topicThreshold: number, * invalidModeRaw: string | null, * }} */ @@ -65,9 +71,14 @@ export function readOrchestratorConfig(env = process.env) { } else if (modeRaw === 'jaccard') { mode = 'jaccard'; } else { - // Unrecognised value — default to embed (the normal prod path) - // but surface so a DIGEST_DEDUP_MODE=embbed typo is obvious. - mode = 'embed'; + // Unrecognised value — fall back to the SAFE path (Jaccard), not + // the newer embed path. This matches the file-header contract: a + // typo like `DIGEST_DEDUP_MODE=jacard` while an operator is trying + // to set the kill switch during an embed outage must NOT silently + // keep embed on. The invalidModeRaw warn surfaces the typo so it's + // fixed, but the fail-closed default protects the cron in the + // meantime. + mode = 'jaccard'; invalidModeRaw = modeRaw; } @@ -90,12 +101,27 @@ export function readOrchestratorConfig(env = process.env) { const wallClockMs = Number.isInteger(wallClockRaw) && wallClockRaw > 0 ? wallClockRaw : 45_000; + // Secondary topic-grouping pass (default on). Kill switch: set to '0'. + // Any non-'0' value (including '', 'yes', '1') is treated as enabled. 
+ const topicGroupingEnabled = env.DIGEST_DEDUP_TOPIC_GROUPING !== '0'; + + // Looser cosine for the secondary pass (default 0.45). Invalid/out-of-range + // values fall back to the default silently so a Railway typo can't disable + // the feature by accident. + const topicThresholdRaw = Number.parseFloat(env.DIGEST_DEDUP_TOPIC_THRESHOLD ?? ''); + const topicThreshold = + Number.isFinite(topicThresholdRaw) && topicThresholdRaw > 0 && topicThresholdRaw <= 1 + ? topicThresholdRaw + : 0.45; + return { mode, clustering, entityVetoEnabled: env.DIGEST_DEDUP_ENTITY_VETO_ENABLED !== '0', cosineThreshold, wallClockMs, + topicGroupingEnabled, + topicThreshold, invalidModeRaw, }; } @@ -116,28 +142,33 @@ function titleHashHex(normalizedTitle) { * @param {typeof deduplicateStoriesJaccard} [deps.jaccard] * @param {typeof defaultRedisPipeline} [deps.redisPipeline] * @param {() => number} [deps.now] - * @param {(line: string) => void} [deps.log] * @param {(line: string) => void} [deps.warn] + * @returns {Promise<{ + * reps: Array, + * embeddingByHash: Map, + * logSummary: string, + * }>} */ export async function deduplicateStories(stories, deps = {}) { const cfg = readOrchestratorConfig(deps.env ?? process.env); const jaccard = deps.jaccard ?? deduplicateStoriesJaccard; - const log = deps.log ?? ((line) => console.log(line)); const warn = deps.warn ?? ((line) => console.warn(line)); if (cfg.invalidModeRaw !== null) { warn( `[digest] dedup unrecognised DIGEST_DEDUP_MODE=${cfg.invalidModeRaw} — ` + - 'defaulting to embed. Valid values: embed | jaccard.', + 'falling back to jaccard (safe rollback path). Valid values: embed | jaccard.', ); } - if (!Array.isArray(stories) || stories.length === 0) return []; + if (!Array.isArray(stories) || stories.length === 0) { + return { reps: [], embeddingByHash: new Map(), logSummary: '' }; + } // Kill switch: Railway operator sets MODE=jaccard to instantly // revert to the legacy deduper without a redeploy. 
if (cfg.mode === 'jaccard') { - return jaccard(stories); + return { reps: jaccard(stories), embeddingByHash: new Map(), logSummary: '' }; } const embedImpl = deps.embedBatch ?? embedBatch; @@ -196,16 +227,34 @@ export async function deduplicateStories(stories, deps = {}) { }); const embedClusters = clusterResult.clusters; - const embedOutput = embedClusters.map((cluster) => - materializeCluster(cluster.map((i) => items[i].story)), - ); + const embeddingByHash = new Map(); + const embedOutput = []; + for (const cluster of embedClusters) { + const rep = materializeCluster(cluster.map((i) => items[i].story)); + embedOutput.push(rep); + if (cfg.topicGroupingEnabled) { + // Find the item inside this cluster whose story wins materialize + // (materializeCluster sort key: currentScore DESC, mentionCount DESC + // — ties broken by input order). The winning story's hash matches + // rep.hash; its embedding is the topic-grouping vector for rep. + const winningIdx = cluster.find((i) => items[i].story.hash === rep.hash); + if (winningIdx !== undefined) { + embeddingByHash.set(rep.hash, items[winningIdx].embedding); + } else { + // Defensive: shouldn't fire — materializeCluster always picks a + // hash that's in the cluster. Warn so a future refactor that + // synthesises a new rep doesn't silently skip the sidecar + // (would cause topic grouping to fall through to primary order). 
+ warn(`[digest] dedup sidecar: materialized rep ${rep.hash} not found in its cluster — topic grouping will skip this rep`); + } + } + } - log( + const logSummary = `[digest] dedup mode=embed clustering=${cfg.clustering} stories=${items.length} clusters=${embedClusters.length} ` + - `veto_fires=${clusterResult.vetoFires} ms=${nowImpl() - started} ` + - `threshold=${cfg.cosineThreshold} fallback=false`, - ); - return embedOutput; + `veto_fires=${clusterResult.vetoFires} ms=${nowImpl() - started} ` + + `threshold=${cfg.cosineThreshold} fallback=false`; + return { reps: embedOutput, embeddingByHash, logSummary }; } catch (err) { const reason = err instanceof Error && typeof err.name === 'string' && err.name !== 'Error' @@ -215,6 +264,145 @@ export async function deduplicateStories(stories, deps = {}) { warn( `[digest] dedup embed path failed, falling back to Jaccard reason=${reason} msg=${msg}`, ); - return jaccard(stories); + return { reps: jaccard(stories), embeddingByHash: new Map(), logSummary: '' }; + } +} + +// ── Secondary topic-grouping pass ─────────────────────────────────────── + +/** + * Pure function. Re-orders already-sliced, already-deduped reps so related + * stories form contiguous blocks, with the dominant thread (by topic size) + * leading. Runs AFTER `deduplicateStories` + score-floor + top-N slice. + * + * No I/O, no logging, no Redis. Caller owns logging. Errors are RETURNED + * not thrown — a throw would otherwise propagate into the caller's outer + * try/catch around `deduplicateStories` and trigger the Jaccard fallback + * for a topic-grouping bug, which is the wrong blast radius. + * + * Sort key: (topicSize DESC, topicMax DESC, repScore DESC, titleHashHex ASC) + * — total, deterministic, stable across input permutations. 
+ * + * @param {Array<{hash:string, title:string, currentScore:number}>} top + * @param {{ topicGroupingEnabled: boolean, topicThreshold: number }} cfg + * @param {Map} embeddingByHash + * @param {object} [deps] + * @param {typeof singleLinkCluster} [deps.clusterFn] — injected for testing + * @returns {{ reps: Array, topicCount: number, error: Error | null }} + */ +export function groupTopicsPostDedup(top, cfg, embeddingByHash, deps = {}) { + if (!cfg.topicGroupingEnabled || !Array.isArray(top) || top.length <= 1) { + return { reps: Array.isArray(top) ? top : [], topicCount: Array.isArray(top) ? top.length : 0, error: null }; + } + + const clusterFn = deps.clusterFn ?? singleLinkCluster; + + try { + const items = top.map((rep) => ({ + title: rep.title, + embedding: embeddingByHash?.get(rep.hash), + })); + + if (items.some((it) => !Array.isArray(it.embedding))) { + return { + reps: top, + topicCount: top.length, + error: new Error('topic grouping: missing embedding for at least one rep'), + }; + } + + const { clusters } = clusterFn(items, { + cosineThreshold: cfg.topicThreshold, + // Topic level: do NOT re-apply the event-level entity veto. At this + // cosine (~0.45) stories sharing the same broader narrative should + // group even when their actor sets diverge (Biden+Xi vs Biden+Putin). + vetoFn: null, + }); + + // Dense-fill with -1 sentinel so an incomplete clusterFn (a future + // injection that doesn't cover every input index) surfaces as an + // explicit error instead of silently poisoning the phase-1 aggregates + // (topicSize[undefined] / topicMax[undefined] would degrade the sort). + const topicOf = new Array(top.length).fill(-1); + clusters.forEach((members, tIdx) => { + for (const i of members) topicOf[i] = tIdx; + }); + for (let i = 0; i < topicOf.length; i++) { + if (topicOf[i] === -1) { + throw new Error(`topic grouping: clusterFn missed index ${i}`); + } + } + + const hashOf = top.map((rep) => + titleHashHex(normalizeForEmbedding(rep.title ?? 
'')), + ); + + // Two-phase sort, NOT a single global key. A global key that ties + // on (topicSize, topicMax) falls through to per-rep repScore, which + // interleaves members of same-size-same-max topics (A90,B90,A80,B70 + // would sort as [A90,B90,A80,B70] — broken contiguity). Phase 1 + // orders the TOPICS; phase 2 orders members inside each topic. + + // Phase 1 prep: per-topic aggregates + a TOPIC-level tiebreak hash + // (min member title hash) so cross-topic ties break by topic + // identity, not by an individual rep's hash. + const topicSize = new Array(clusters.length).fill(0); + const topicMax = new Array(clusters.length).fill(-Infinity); + const topicTieHash = new Array(clusters.length).fill(null); + top.forEach((rep, i) => { + const t = topicOf[i]; + topicSize[t] += 1; + const s = Number(rep.currentScore ?? 0); + if (s > topicMax[t]) topicMax[t] = s; + if (topicTieHash[t] === null || hashOf[i] < topicTieHash[t]) { + topicTieHash[t] = hashOf[i]; + } + }); + + // Members grouped by topic for phase-2 ordering. + const membersOf = Array.from({ length: clusters.length }, () => []); + for (let i = 0; i < top.length; i++) { + membersOf[topicOf[i]].push(i); + } + + // Phase 2: sort members within each topic by (repScore DESC, + // titleHashHex ASC). Deterministic within a topic. + for (const members of membersOf) { + members.sort((a, b) => { + const sA = Number(top[a].currentScore ?? 0); + const sB = Number(top[b].currentScore ?? 0); + if (sA !== sB) return sB - sA; + return hashOf[a] < hashOf[b] ? -1 : hashOf[a] > hashOf[b] ? 1 : 0; + }); + } + + // Phase 1 sort: order TOPICS by (topicSize DESC, topicMax DESC, + // topicTieHash ASC). The topic-tie hash is a property of the topic + // itself, so two topics with the same (size, max) order stably and + // — critically — do not interleave their members. 
+ const topicOrder = [...Array(clusters.length).keys()].sort((a, b) => { + if (topicSize[a] !== topicSize[b]) return topicSize[b] - topicSize[a]; + if (topicMax[a] !== topicMax[b]) return topicMax[b] - topicMax[a]; + return topicTieHash[a] < topicTieHash[b] ? -1 : topicTieHash[a] > topicTieHash[b] ? 1 : 0; + }); + + // Concatenate: for each topic in topicOrder, emit its members in + // their intra-topic order. + const order = []; + for (const t of topicOrder) { + for (const i of membersOf[t]) order.push(i); + } + + return { + reps: order.map((i) => top[i]), + topicCount: clusters.length, + error: null, + }; + } catch (err) { + return { + reps: top, + topicCount: top.length, + error: err instanceof Error ? err : new Error(String(err)), + }; } } diff --git a/scripts/seed-digest-notifications.mjs b/scripts/seed-digest-notifications.mjs index 21909a52f..884ac4f8e 100644 --- a/scripts/seed-digest-notifications.mjs +++ b/scripts/seed-digest-notifications.mjs @@ -40,7 +40,11 @@ import { issueSlotInTz } from '../shared/brief-filter.js'; import { enrichBriefEnvelopeWithLLM } from './lib/brief-llm.mjs'; import { assertBriefEnvelope } from '../server/_shared/brief-render.js'; import { signBriefUrl, BriefUrlError } from './lib/brief-url-sign.mjs'; -import { deduplicateStories } from './lib/brief-dedup.mjs'; +import { + deduplicateStories, + groupTopicsPostDedup, + readOrchestratorConfig, +} from './lib/brief-dedup.mjs'; import { stripSourceSuffix } from './lib/brief-dedup-jaccard.mjs'; // ── Config ──────────────────────────────────────────────────────────────────── @@ -289,7 +293,9 @@ async function buildDigest(rule, windowStartMs) { if (stories.length === 0) return null; stories.sort((a, b) => b.currentScore - a.currentScore); - const dedupedAll = await deduplicateStories(stories); + const cfg = readOrchestratorConfig(process.env); + const { reps: dedupedAll, embeddingByHash, logSummary } = + await deduplicateStories(stories); // Apply the absolute-score floor AFTER dedup so 
the floor runs on // the representative's score (mentionCount-sum doesn't change the // score field; the rep is the highest-scoring member of its @@ -319,7 +325,40 @@ async function buildDigest(rule, windowStartMs) { } return null; } - const top = deduped.slice(0, DIGEST_MAX_ITEMS); + const sliced = deduped.slice(0, DIGEST_MAX_ITEMS); + + // Secondary topic-grouping pass: re-orders `sliced` so related stories + // form contiguous blocks. Disabled via DIGEST_DEDUP_TOPIC_GROUPING=0. + // Gate on the sidecar Map being non-empty — this is the precise + // signal for "primary embed path produced vectors". Gating on + // cfg.mode is WRONG: the embed path can run AND fall back to + // Jaccard at runtime (try/catch inside deduplicateStories), leaving + // cfg.mode==='embed' but embeddingByHash empty. The Map size is the + // only ground truth. Kill-switch (mode=jaccard) and runtime fallback + // both produce size=0 → shouldGroupTopics=false → no misleading + // "topic grouping failed: missing embedding" warn. + // Errors from the helper are returned (not thrown) and MUST NOT + // cascade into the outer Jaccard fallback — they just preserve + // primary order. + const shouldGroupTopics = cfg.topicGroupingEnabled && embeddingByHash.size > 0; + const { reps: top, topicCount, error: topicErr } = shouldGroupTopics + ? groupTopicsPostDedup(sliced, cfg, embeddingByHash) + : { reps: sliced, topicCount: sliced.length, error: null }; + if (topicErr) { + console.warn( + `[digest] topic grouping failed, preserving primary order: ${topicErr.message}`, + ); + } + if (logSummary) { + const finalLog = + shouldGroupTopics && !topicErr + ? 
logSummary.replace( + /clusters=(\d+) /, + `clusters=$1 topics=${topicCount} `, + ) + : logSummary; + console.log(finalLog); + } const allSourceCmds = []; const cmdIndex = []; diff --git a/tests/brief-dedup-embedding.test.mjs b/tests/brief-dedup-embedding.test.mjs index 462759f4b..fc65bb6f3 100644 --- a/tests/brief-dedup-embedding.test.mjs +++ b/tests/brief-dedup-embedding.test.mjs @@ -23,7 +23,11 @@ import { describe, it } from 'node:test'; import assert from 'node:assert/strict'; -import { deduplicateStories } from '../scripts/lib/brief-dedup.mjs'; +import { + deduplicateStories, + groupTopicsPostDedup, + readOrchestratorConfig, +} from '../scripts/lib/brief-dedup.mjs'; import { deduplicateStoriesJaccard } from '../scripts/lib/brief-dedup-jaccard.mjs'; import { EmbeddingProviderError, @@ -114,7 +118,7 @@ describe('Scenario 1 — happy path: embed clusters near-duplicates', () => { story('Iran shuts Strait of Hormuz', 85, 1, 'h1'), story('Myanmar coup leader elected president', 80, 1, 'h2'), ]; - const out = await deduplicateStories(stories, { + const { reps: out, logSummary } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: embedder.embedBatch, redisPipeline: noopPipeline, @@ -133,9 +137,37 @@ describe('Scenario 1 — happy path: embed clusters near-duplicates', () => { assert.ok(singleton); assert.equal(singleton.mergedHashes[0], 'h2'); - // Structured log line emitted. - assert.ok(collector.lines.some((l) => l.line.includes('mode=embed'))); - assert.ok(collector.lines.some((l) => l.line.includes('fallback=false'))); + // Structured log line composed in logSummary (caller emits). + assert.match(logSummary, /mode=embed/); + assert.match(logSummary, /fallback=false/); + }); + + it('runtime Jaccard fallback returns empty embeddingByHash + empty logSummary', async () => { + // Regression guard for the nested-fallback leak: when the embed + // path throws at runtime, deduplicateStories falls back to Jaccard + // but cfg.mode is still 'embed'. 
The caller's shouldGroupTopics + // gate must rely on embeddingByHash.size > 0 (ground truth) rather + // than cfg.mode === 'embed' (stale signal), else a false + // "topic grouping failed: missing embedding" warn fires on top + // of the legitimate "falling back to Jaccard" warn. + const throwingEmbedder = async () => { + throw new EmbeddingProviderError('forced', { status: 500 }); + }; + const stories = [ + story('Iran closes Strait of Hormuz', 90, 1, 'h0'), + story('Iran shuts Strait of Hormuz', 85, 1, 'h1'), + ]; + const { reps, embeddingByHash, logSummary } = await deduplicateStories(stories, { + env: EMBED_MODE, // configured mode === 'embed' + embedBatch: throwingEmbedder, + redisPipeline: noopPipeline, + }); + assert.ok(reps.length >= 1, 'Jaccard produced reps'); + assert.equal(embeddingByHash.size, 0, 'fallback path MUST return empty Map'); + assert.equal(logSummary, '', 'fallback path MUST return empty logSummary'); + // Caller-side invariant: shouldGroupTopics using Map size (ground + // truth) is false; using cfg.mode would be true (stale) and leak. 
+ assert.equal(embeddingByHash.size > 0, false, 'correct gate: size-based'); }); }); @@ -152,7 +184,7 @@ describe('Scenario 2 — cold-cache timeout collapses to Jaccard', () => { ]; const collector = lineCollector(); - const out = await deduplicateStories(stories, { + const { reps: out } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: throwingEmbedder, redisPipeline: noopPipeline, @@ -188,7 +220,7 @@ describe('Scenario 3 — provider outage collapses to Jaccard', () => { const stories = [story('a', 10, 1, 'a1'), story('b', 10, 1, 'b1')]; const collector = lineCollector(); - const out = await deduplicateStories(stories, { + const { reps: out } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: throwingEmbedder, redisPipeline: noopPipeline, @@ -264,7 +296,7 @@ describe('Scenario 5 — entity veto blocks same-location, different-actor merge ]); const embedder = stubEmbedder(vecByTitle); - const out = await deduplicateStories(stories, { + const { reps: out } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: embedder.embedBatch, redisPipeline: noopPipeline, @@ -284,7 +316,7 @@ describe('Scenario 5 — entity veto blocks same-location, different-actor merge ]); const embedder = stubEmbedder(vecByTitle); - const out = await deduplicateStories(stories, { + const { reps: out } = await deduplicateStories(stories, { env: { ...EMBED_MODE, DIGEST_DEDUP_ENTITY_VETO_ENABLED: '0' }, embedBatch: embedder.embedBatch, redisPipeline: noopPipeline, @@ -371,7 +403,7 @@ describe('Scenario 7 — cluster-level fixture', () => { ); const embedder = stubEmbedder(vecByTitle); - const out = await deduplicateStories(stories, { + const { reps: out } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: embedder.embedBatch, redisPipeline: noopPipeline, @@ -427,7 +459,7 @@ describe('Scenario 9 — permutation-invariance', () => { } // Baseline run on the canonical input order. 
- const baseline = await deduplicateStories(stories, { + const { reps: baseline } = await deduplicateStories(stories, { env: EMBED_MODE, embedBatch: stubEmbedder(vecByTitle).embedBatch, redisPipeline: noopPipeline, @@ -446,7 +478,7 @@ describe('Scenario 9 — permutation-invariance', () => { const j = Math.floor(rand() * (i + 1)); [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]]; } - const out = await deduplicateStories(shuffled, { + const { reps: out } = await deduplicateStories(shuffled, { env: EMBED_MODE, embedBatch: stubEmbedder(vecByTitle).embedBatch, redisPipeline: noopPipeline, @@ -658,7 +690,7 @@ describe('readOrchestratorConfig — DIGEST_DEDUP_CLUSTERING', () => { const cfg = readOrchestratorConfig({ DIGEST_DEDUP_CLUSTERING: 'average' }); assert.equal(cfg.clustering, 'single'); }); - it('structured log line includes clustering=', async () => { + it('structured logSummary includes clustering=', async () => { const { deduplicateStories } = await import('../scripts/lib/brief-dedup.mjs'); const stories = [story('x', 10, 1, 'x1'), story('y', 10, 1, 'y1')]; const vec = new Map([ @@ -666,14 +698,486 @@ describe('readOrchestratorConfig — DIGEST_DEDUP_CLUSTERING', () => { [normalizeForEmbedding('y'), [0.99, Math.sqrt(1 - 0.99 * 0.99), 0]], ]); const { embedBatch } = stubEmbedder(vec); - const lines = []; - await deduplicateStories(stories, { + const { logSummary } = await deduplicateStories(stories, { env: { DIGEST_DEDUP_MODE: 'embed', DIGEST_DEDUP_COSINE_THRESHOLD: '0.5' }, embedBatch, redisPipeline: async () => [], - log: (l) => lines.push(l), }); - assert.ok(lines.some((l) => /clustering=(single|complete)/.test(l)), 'log line must mention clustering algorithm'); + assert.match(logSummary, /clustering=(single|complete)/, 'logSummary must mention clustering algorithm'); + }); +}); + +// ── Topic-grouping post-dedup (secondary pass) ──────────────────────────────── + +/** + * Build a basis-aligned unit vector for topic `c`. 
`jitter ∈ [0, 0.1)` + * lets within-topic members share cosine ~0.99+ while staying unit + * length. The jitter is parked in dimension `dim-1`, which no topic or + * singleton basis occupies — this guarantees cross-topic cosine = 0 + * regardless of jitter, so the 0.45 secondary threshold has a clean + * separation in either direction. + */ +function basisVec(dim, c, jitter = 0) { + const v = new Array(dim).fill(0); + v[c] = 1 - jitter; + if (jitter > 0) v[dim - 1] = Math.sqrt(1 - (1 - jitter) * (1 - jitter)); + return v; +} + +function topicRep(title, score, hash) { + return { + title, + currentScore: score, + mentionCount: 1, + sources: [], + severity: 'critical', + hash, + mergedHashes: [hash], + }; +} + +const DEFAULT_TOPIC_CFG = { topicGroupingEnabled: true, topicThreshold: 0.45 }; + +describe('groupTopicsPostDedup — size-first total ordering', () => { + it('4-member topic leads 3-member topic leads singletons (size DESC)', () => { + // 12 reps: topic A (basis 0, 4 members, scores 98/92/85/80), + // topic B (basis 1, 3 members, scores 91/90/85), + // 5 singletons (bases 2..6, scores 95/88/70/65/60). + const reps = []; + const emb = new Map(); + const dim = 10; + [98, 92, 85, 80].forEach((s, i) => { + const r = topicRep(`A-${i}`, s, `a${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01)); + }); + [91, 90, 85].forEach((s, i) => { + const r = topicRep(`B-${i}`, s, `b${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01)); + }); + [95, 88, 70, 65, 60].forEach((s, i) => { + const r = topicRep(`S-${i}`, s, `s${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 2 + i, 0)); + }); + + // Feed in score-DESC (the digest's pre-grouping order) and verify + // topic ordering overrides raw score order. 
+ const primaryOrder = reps.slice().sort((a, b) => b.currentScore - a.currentScore); + const { reps: ordered, topicCount, error } = groupTopicsPostDedup( + primaryOrder, + DEFAULT_TOPIC_CFG, + emb, + ); + assert.equal(error, null); + // 1 topic (size 4) + 1 topic (size 3) + 5 singletons = 7 + assert.equal(topicCount, 7); + // Topic A leads; members in score DESC: 98, 92, 85, 80 + assert.deepEqual( + ordered.slice(0, 4).map((r) => r.hash), + ['a0', 'a1', 'a2', 'a3'], + ); + // Topic B next; members in score DESC: 91, 90, 85 + assert.deepEqual( + ordered.slice(4, 7).map((r) => r.hash), + ['b0', 'b1', 'b2'], + ); + // Singletons by score DESC: 95, 88, 70, 65, 60 + assert.deepEqual( + ordered.slice(7).map((r) => r.hash), + ['s0', 's1', 's2', 's3', 's4'], + ); + // Critically: Louisiana-score-95 singleton comes AFTER Iran-war-max-91 + // (topic of 3) — the user's explicit editorial intent. + const louisianaIdx = ordered.findIndex((r) => r.hash === 's0'); + const lastTopicBIdx = ordered.findIndex((r) => r.hash === 'b2'); + assert.ok(louisianaIdx > lastTopicBIdx, 'single-rep score 95 appears after 3-member topic max 91'); + }); + + it('topicMax breaks ties between same-size topics', () => { + // Two topics, both size 2. Topic X max=80, topic Y max=90 → Y leads. + const reps = []; + const emb = new Map(); + const dim = 6; + [80, 70].forEach((s, i) => { + const r = topicRep(`X-${i}`, s, `x${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01)); + }); + [90, 60].forEach((s, i) => { + const r = topicRep(`Y-${i}`, s, `y${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01)); + }); + + const { reps: ordered, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb); + assert.equal(error, null); + // Y-topic (max 90) leads X-topic (max 80) despite X having a higher low. 
+ assert.deepEqual( + ordered.map((r) => r.hash), + ['y0', 'y1', 'x0', 'x1'], + ); + }); + + it('within a topic, reps are ordered by currentScore DESC', () => { + const reps = [ + topicRep('T-low', 70, 't2'), + topicRep('T-hi', 90, 't0'), + topicRep('T-mid', 80, 't1'), + ]; + const emb = new Map([ + ['t0', basisVec(4, 0, 0.01)], + ['t1', basisVec(4, 0, 0.02)], + ['t2', basisVec(4, 0, 0.03)], + ]); + const { reps: ordered, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb); + assert.equal(error, null); + assert.deepEqual(ordered.map((r) => r.hash), ['t0', 't1', 't2']); + }); + + // `titleHashHex is the final deterministic tiebreak` test was removed — + // the permutation-invariance test below exercises the same invariant + // against a larger fixture and would catch any tiebreak drift. + + it('same-size same-topicMax topics KEEP MEMBERS CONTIGUOUS (regression)', () => { + // Regression guard for the round-2 bug: a global sort key that + // tied on (topicSize, topicMax) fell through to per-rep repScore, + // interleaving A/B members (output was [a0,b0,a1,b1] instead of + // a contiguous block). Two-phase sort fixes this. + // + // Topic A: score 90, 80 (size 2, max 90) + // Topic B: score 90, 70 (size 2, max 90) — same size and max + const reps = []; + const emb = new Map(); + const dim = 6; + [90, 80].forEach((s, i) => { + const r = topicRep(`A-${i}`, s, `a${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01)); + }); + [90, 70].forEach((s, i) => { + const r = topicRep(`B-${i}`, s, `b${i}`); + reps.push(r); + emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01)); + }); + + const { reps: ordered, error } = groupTopicsPostDedup( + reps, + DEFAULT_TOPIC_CFG, + emb, + ); + assert.equal(error, null); + + // The two A reps must appear as a contiguous pair, and the two B + // reps must appear as a contiguous pair. Which topic leads is + // determined by the deterministic topic-level tiebreak hash, but + // their members MUST NOT interleave. 
+ const hashes = ordered.map((r) => r.hash); + const firstAIdx = hashes.indexOf('a0'); + const firstBIdx = hashes.indexOf('b0'); + const lastAIdx = Math.max(hashes.indexOf('a0'), hashes.indexOf('a1')); + const lastBIdx = Math.max(hashes.indexOf('b0'), hashes.indexOf('b1')); + const aIdxs = [hashes.indexOf('a0'), hashes.indexOf('a1')].sort((x, y) => x - y); + const bIdxs = [hashes.indexOf('b0'), hashes.indexOf('b1')].sort((x, y) => x - y); + assert.equal(aIdxs[1] - aIdxs[0], 1, `A members must be adjacent; got ${JSON.stringify(hashes)}`); + assert.equal(bIdxs[1] - bIdxs[0], 1, `B members must be adjacent; got ${JSON.stringify(hashes)}`); + // And within each topic, higher score first. + assert.ok(hashes.indexOf('a0') < hashes.indexOf('a1'), 'A-90 precedes A-80'); + assert.ok(hashes.indexOf('b0') < hashes.indexOf('b1'), 'B-90 precedes B-70'); + void firstAIdx; + void firstBIdx; + void lastAIdx; + void lastBIdx; + }); +}); + +describe('groupTopicsPostDedup — kill switch & edge cases', () => { + it('topicGroupingEnabled=false preserves primary order byte-identical', () => { + const reps = [ + topicRep('a', 98, 'a'), + topicRep('b', 95, 'b'), + topicRep('c', 92, 'c'), + ]; + // Embeddings would normally merge all three into one topic, but kill + // switch must short-circuit before calling the clusterer. 
+    const emb = new Map([
+      ['a', basisVec(4, 0, 0.01)],
+      ['b', basisVec(4, 0, 0.02)],
+      ['c', basisVec(4, 0, 0.03)],
+    ]);
+    const { reps: ordered, topicCount, error } = groupTopicsPostDedup(
+      reps,
+      { topicGroupingEnabled: false, topicThreshold: 0.45 },
+      emb,
+    );
+    assert.equal(error, null);
+    assert.equal(topicCount, reps.length);
+    assert.deepEqual(ordered, reps, 'output === input reference when disabled');
+  });
+
+  it('empty input returns {reps: [], topicCount: 0, error: null}', () => {
+    const { reps, topicCount, error } = groupTopicsPostDedup([], DEFAULT_TOPIC_CFG, new Map());
+    assert.deepEqual(reps, []);
+    assert.equal(topicCount, 0);
+    assert.equal(error, null);
+  });
+
+  it('single-rep input passes through with topicCount=1', () => {
+    const only = [topicRep('solo', 99, 'solo')];
+    const { reps: out, topicCount, error } = groupTopicsPostDedup(
+      only,
+      DEFAULT_TOPIC_CFG,
+      new Map([['solo', basisVec(4, 0)]]),
+    );
+    assert.equal(error, null);
+    assert.equal(topicCount, 1);
+    assert.deepEqual(out, only);
+  });
+});
+
+describe('groupTopicsPostDedup — permutation invariance', () => {
+  it('15 reps in 5 topics of 3 produce identical ordering across 5 shuffles', () => {
+    const N_TOPICS = 5;
+    const PER = 3;
+    const dim = N_TOPICS + 1; // +1 free dimension for jitter
+    const reps = [];
+    const emb = new Map();
+    for (let c = 0; c < N_TOPICS; c++) {
+      for (let k = 0; k < PER; k++) {
+        const score = 100 - (c * PER + k);
+        const r = topicRep(`c${c}-k${k}`, score, `c${c}k${k}`);
+        reps.push(r);
+        emb.set(r.hash, basisVec(dim, c, 0.001 * (k + 1)));
+      }
+    }
+
+    const sigFor = (arr) => arr.map((r) => r.hash).join('|');
+    const baseline = groupTopicsPostDedup(reps.slice(), DEFAULT_TOPIC_CFG, emb);
+    assert.equal(baseline.error, null);
+    const baselineSig = sigFor(baseline.reps);
+
+    let seed = 7;
+    const rand = () => {
+      seed = (seed * 1103515245 + 12345) & 0x7fffffff;
+      return seed / 0x7fffffff;
+    };
+    for (let r = 0; r < 5; r++) {
+      const shuffled = reps.slice();
+      for (let i = shuffled.length - 1; i > 0; i--) {
+        const j = Math.floor(rand() * (i + 1));
+        [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
+      }
+      const run = groupTopicsPostDedup(shuffled, DEFAULT_TOPIC_CFG, emb);
+      assert.equal(run.error, null);
+      assert.equal(
+        sigFor(run.reps),
+        baselineSig,
+        `shuffle ${r} produced a different ordering`,
+      );
+    }
+  });
+});
+
+describe('groupTopicsPostDedup — error boundary (nested fallback)', () => {
+  it('injected clusterFn that throws returns error, primary order preserved, no re-throw', () => {
+    const reps = [
+      topicRep('a', 90, 'a'),
+      topicRep('b', 80, 'b'),
+      topicRep('c', 70, 'c'),
+    ];
+    const emb = new Map([
+      ['a', basisVec(4, 0)],
+      ['b', basisVec(4, 1)],
+      ['c', basisVec(4, 2)],
+    ]);
+    const boom = () => {
+      throw new Error('boom');
+    };
+
+    let threw = false;
+    let result;
+    try {
+      result = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb, { clusterFn: boom });
+    } catch (_err) {
+      threw = true;
+    }
+    assert.equal(threw, false, 'helper must NOT re-throw — it returns the error');
+    assert.ok(result.error instanceof Error);
+    assert.equal(result.error.message, 'boom');
+    assert.equal(result.topicCount, reps.length);
+    assert.deepEqual(result.reps, reps, 'primary order preserved on failure');
+  });
+
+  it('missing embedding for any rep returns primary order + descriptive error', () => {
+    const reps = [
+      topicRep('a', 90, 'a'),
+      topicRep('b', 80, 'b'),
+    ];
+    const emb = new Map([['a', basisVec(4, 0)]]);
+    const { reps: out, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb);
+    assert.ok(error instanceof Error);
+    assert.match(error.message, /missing embedding/);
+    assert.deepEqual(out, reps);
+  });
+});
+
+describe('deduplicateStories — embeddingByHash keys match materialized rep', () => {
+  it('winning rep is items[1] (higher mentionCount) — sidecar key is that hash', async () => {
+    // Primary cluster of two items at the SAME score; items[1] has a
+    // higher mentionCount so materializeCluster picks it as rep.
+    // Sidecar embeddingByHash must be keyed by the rep's hash.
+    const loser = story('Iran shuts Hormuz', 80, 1, 'loser');
+    const winner = story('Iran closes Strait of Hormuz', 80, 5, 'winner');
+    const vec = new Map([
+      [normalizeForEmbedding(loser.title), [1, 0, 0]],
+      [normalizeForEmbedding(winner.title), [0.95, Math.sqrt(1 - 0.95 * 0.95), 0]],
+    ]);
+    const embedder = stubEmbedder(vec);
+
+    const { reps, embeddingByHash } = await deduplicateStories([loser, winner], {
+      env: {
+        ...EMBED_MODE,
+        DIGEST_DEDUP_TOPIC_GROUPING: '1',
+        DIGEST_DEDUP_ENTITY_VETO_ENABLED: '0', // let cosine merge w/o veto
+      },
+      embedBatch: embedder.embedBatch,
+      redisPipeline: noopPipeline,
+    });
+    assert.equal(reps.length, 1, 'one merged cluster');
+    const rep = reps[0];
+    // Sort key for materializeCluster is (currentScore DESC, mentionCount DESC)
+    // → `winner` (mentionCount 5) wins over `loser` (mentionCount 1).
+    assert.equal(rep.hash, 'winner');
+    assert.ok(embeddingByHash.has('winner'), 'sidecar keyed by rep.hash, not loser hash');
+    assert.ok(!embeddingByHash.has('loser'), 'non-rep items never appear in sidecar');
+  });
+});
+
+describe('brief envelope cleanliness — no internal fields leak', () => {
+  it('composeBriefFromDigestStories output never serializes embedding / __ fields', async () => {
+    const { composeBriefFromDigestStories } = await import('../scripts/lib/brief-compose.mjs');
+
+    // Run the full flow: dedup → topic-group → compose.
+    const stories = [
+      story('Iran closes Strait of Hormuz', 92, 1, 'h0'),
+      story('Iran shuts Strait of Hormuz', 88, 1, 'h1'),
+      story('Myanmar coup leader elected', 80, 1, 'h2'),
+    ];
+    const vec = new Map([
+      [normalizeForEmbedding(stories[0].title), [1, 0, 0]],
+      [normalizeForEmbedding(stories[1].title), [0.95, Math.sqrt(1 - 0.95 * 0.95), 0]],
+      [normalizeForEmbedding(stories[2].title), [0, 0, 1]],
+    ]);
+    const embedder = stubEmbedder(vec);
+    const { reps, embeddingByHash } = await deduplicateStories(stories, {
+      env: { ...EMBED_MODE, DIGEST_DEDUP_TOPIC_GROUPING: '1' },
+      embedBatch: embedder.embedBatch,
+      redisPipeline: noopPipeline,
+    });
+    const cfg = readOrchestratorConfig({ ...EMBED_MODE, DIGEST_DEDUP_TOPIC_GROUPING: '1' });
+    const { reps: top } = groupTopicsPostDedup(reps, cfg, embeddingByHash);
+
+    const rule = {
+      userId: 'user_test',
+      sensitivity: 'all',
+      digestTimezone: 'UTC',
+    };
+    const envelope = composeBriefFromDigestStories(rule, top, {}, { nowMs: 1_700_000_000_000 });
+    const blob = JSON.stringify(envelope ?? {});
+    assert.ok(!blob.includes('"_embedding"'), 'no _embedding key');
+    assert.ok(!blob.includes('"__'), 'no __-prefixed key');
+    assert.ok(!blob.includes('embeddingByHash'), 'no embeddingByHash leakage');
+  });
+});
+
+describe('groupTopicsPostDedup — runs on sliced input, not pre-slice', () => {
+  it('reflects slice(0, 30) input size in topicCount', () => {
+    // 50 distinct singletons; slice to 30; each at an orthogonal basis so
+    // topic grouping produces one topic per rep = 30 topics.
+    const reps = [];
+    const emb = new Map();
+    const dim = 35;
+    for (let i = 0; i < 50; i++) {
+      const r = topicRep(`s-${i}`, 100 - i, `h${i}`);
+      reps.push(r);
+      emb.set(r.hash, basisVec(dim, i % (dim - 1)));
+    }
+    const sliced = reps.slice(0, 30);
+    const { reps: out, topicCount, error } = groupTopicsPostDedup(sliced, DEFAULT_TOPIC_CFG, emb);
+    assert.equal(error, null);
+    assert.equal(out.length, 30);
+    assert.ok(topicCount <= 30);
+  });
+});
+
+describe('readOrchestratorConfig — DIGEST_DEDUP_MODE typo falls back to Jaccard', () => {
+  it('an unrecognised mode value (typo) resolves to jaccard, not embed', async () => {
+    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
+    // Classic operator scenario: panicking during an embed outage, types
+    // the kill switch as `jacard`. The SAFE default is jaccard, not embed.
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_MODE: 'jacard' });
+    assert.equal(cfg.mode, 'jaccard');
+    assert.equal(cfg.invalidModeRaw, 'jacard');
+  });
+
+  it('any garbage value also falls back to jaccard', async () => {
+    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
+    for (const raw of ['xyz', 'EMBED_ENABLED', '1', 'true']) {
+      const cfg = readOrchestratorConfig({ DIGEST_DEDUP_MODE: raw });
+      assert.equal(cfg.mode, 'jaccard', `raw=${JSON.stringify(raw)}`);
+      assert.equal(cfg.invalidModeRaw, raw.toLowerCase());
+    }
+  });
+
+  it('unset / empty value still resolves to the embed default (normal prod path)', async () => {
+    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
+    for (const raw of [undefined, '']) {
+      const cfg = readOrchestratorConfig({ DIGEST_DEDUP_MODE: raw });
+      assert.equal(cfg.mode, 'embed');
+      assert.equal(cfg.invalidModeRaw, null);
+    }
+  });
+});
+
+describe('readOrchestratorConfig — topic-grouping env parsing', () => {
+  it('defaults: topicGroupingEnabled=true, topicThreshold=0.45', () => {
+    const cfg = readOrchestratorConfig({});
+    assert.equal(cfg.topicGroupingEnabled, true);
+    assert.equal(cfg.topicThreshold, 0.45);
+  });
+
+  it('DIGEST_DEDUP_TOPIC_GROUPING=0 disables', () => {
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_GROUPING: '0' });
+    assert.equal(cfg.topicGroupingEnabled, false);
+  });
+
+  it('any non-"0" DIGEST_DEDUP_TOPIC_GROUPING value is treated as enabled', () => {
+    // Default-on kill-switch pattern: "yes", "1", "true", "" all enable.
+    for (const v of ['yes', '1', 'true', '', 'on']) {
+      const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_GROUPING: v });
+      assert.equal(cfg.topicGroupingEnabled, true, `value=${JSON.stringify(v)} should enable`);
+    }
+  });
+
+  it('DIGEST_DEDUP_TOPIC_THRESHOLD=foo (invalid) falls back to 0.45', () => {
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: 'foo' });
+    assert.equal(cfg.topicThreshold, 0.45);
+  });
+
+  it('DIGEST_DEDUP_TOPIC_THRESHOLD=1.5 (out of range) falls back to 0.45', () => {
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '1.5' });
+    assert.equal(cfg.topicThreshold, 0.45);
+  });
+
+  it('DIGEST_DEDUP_TOPIC_THRESHOLD=0 (boundary, invalid) falls back to 0.45', () => {
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '0' });
+    assert.equal(cfg.topicThreshold, 0.45);
+  });
+
+  it('DIGEST_DEDUP_TOPIC_THRESHOLD=0.55 (valid) is honoured', () => {
+    const cfg = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '0.55' });
+    assert.equal(cfg.topicThreshold, 0.55);
   });
 });
diff --git a/tests/brief-dedup-jaccard.test.mjs b/tests/brief-dedup-jaccard.test.mjs
index 1f5644d36..2e3d568fa 100644
--- a/tests/brief-dedup-jaccard.test.mjs
+++ b/tests/brief-dedup-jaccard.test.mjs
@@ -212,7 +212,7 @@ describe('brief-dedup orchestrator — jaccard kill switch', () => {
       story('Iran shuts Strait of Hormuz - Reuters', 85, 1, 'h2'),
       story('Myanmar coup leader elected president', 80, 1, 'h3'),
     ];
-    const out = await deduplicateStories(stories, {
+    const { reps: out } = await deduplicateStories(stories, {
       env: { DIGEST_DEDUP_MODE: 'jaccard' },
       embedBatch: stubEmbed,
     });
@@ -231,9 +231,9 @@ describe('brief-dedup orchestrator — jaccard kill switch', () => {
     let jaccardCalls = 0;
     const stubJaccard = (s) => {
      jaccardCalls++;
       return deduplicateStoriesJaccard(s);
     };
-    const out = await deduplicateStories([], {
+    const { reps: out } = await deduplicateStories([], {
       env: { DIGEST_DEDUP_MODE: 'jaccard' },
       jaccard: stubJaccard,
     });