feat(digest): topic-grouped brief ordering (size-first) (#3247)

This commit is contained in:
Elie Habib
2026-04-21 08:58:02 +04:00
committed by GitHub
parent ee93fb475f
commit 4d9ae3b214
4 changed files with 770 additions and 39 deletions

View File

@@ -14,6 +14,10 @@
* location veto; default on
* DIGEST_DEDUP_COSINE_THRESHOLD = float in (0, 1], default 0.60
* DIGEST_DEDUP_WALL_CLOCK_MS = int ms, default 45000
* DIGEST_DEDUP_TOPIC_GROUPING = '0' disables secondary topic
* grouping pass; default on
* DIGEST_DEDUP_TOPIC_THRESHOLD = float in (0, 1], default 0.45
* — looser secondary-pass cosine
*
* Anything non-{embed,jaccard} in MODE = jaccard with a loud warn so
* a typo can't stay hidden.
@@ -53,6 +57,8 @@ import { defaultRedisPipeline } from './_upstash-pipeline.mjs';
* entityVetoEnabled: boolean,
* cosineThreshold: number,
* wallClockMs: number,
* topicGroupingEnabled: boolean,
* topicThreshold: number,
* invalidModeRaw: string | null,
* }}
*/
@@ -65,9 +71,14 @@ export function readOrchestratorConfig(env = process.env) {
} else if (modeRaw === 'jaccard') {
mode = 'jaccard';
} else {
// Unrecognised value — default to embed (the normal prod path)
// but surface so a DIGEST_DEDUP_MODE=embbed typo is obvious.
mode = 'embed';
// Unrecognised value — fall back to the SAFE path (Jaccard), not
// the newer embed path. This matches the file-header contract: a
// typo like `DIGEST_DEDUP_MODE=jacard` while an operator is trying
// to set the kill switch during an embed outage must NOT silently
// keep embed on. The invalidModeRaw warn surfaces the typo so it's
// fixed, but the fail-closed default protects the cron in the
// meantime.
mode = 'jaccard';
invalidModeRaw = modeRaw;
}
@@ -90,12 +101,27 @@ export function readOrchestratorConfig(env = process.env) {
const wallClockMs =
Number.isInteger(wallClockRaw) && wallClockRaw > 0 ? wallClockRaw : 45_000;
// Secondary topic-grouping pass (default on). Kill switch: set to '0'.
// Any non-'0' value (including '', 'yes', '1') is treated as enabled.
const topicGroupingEnabled = env.DIGEST_DEDUP_TOPIC_GROUPING !== '0';
// Looser cosine for the secondary pass (default 0.45). Invalid/out-of-range
// values fall back to the default silently so a Railway typo can't disable
// the feature by accident.
const topicThresholdRaw = Number.parseFloat(env.DIGEST_DEDUP_TOPIC_THRESHOLD ?? '');
const topicThreshold =
Number.isFinite(topicThresholdRaw) && topicThresholdRaw > 0 && topicThresholdRaw <= 1
? topicThresholdRaw
: 0.45;
return {
mode,
clustering,
entityVetoEnabled: env.DIGEST_DEDUP_ENTITY_VETO_ENABLED !== '0',
cosineThreshold,
wallClockMs,
topicGroupingEnabled,
topicThreshold,
invalidModeRaw,
};
}
@@ -116,28 +142,33 @@ function titleHashHex(normalizedTitle) {
* @param {typeof deduplicateStoriesJaccard} [deps.jaccard]
* @param {typeof defaultRedisPipeline} [deps.redisPipeline]
* @param {() => number} [deps.now]
* @param {(line: string) => void} [deps.log]
* @param {(line: string) => void} [deps.warn]
* @returns {Promise<{
* reps: Array<object>,
* embeddingByHash: Map<string, number[]>,
* logSummary: string,
* }>}
*/
export async function deduplicateStories(stories, deps = {}) {
const cfg = readOrchestratorConfig(deps.env ?? process.env);
const jaccard = deps.jaccard ?? deduplicateStoriesJaccard;
const log = deps.log ?? ((line) => console.log(line));
const warn = deps.warn ?? ((line) => console.warn(line));
if (cfg.invalidModeRaw !== null) {
warn(
`[digest] dedup unrecognised DIGEST_DEDUP_MODE=${cfg.invalidModeRaw}` +
'defaulting to embed. Valid values: embed | jaccard.',
'falling back to jaccard (safe rollback path). Valid values: embed | jaccard.',
);
}
if (!Array.isArray(stories) || stories.length === 0) return [];
if (!Array.isArray(stories) || stories.length === 0) {
return { reps: [], embeddingByHash: new Map(), logSummary: '' };
}
// Kill switch: Railway operator sets MODE=jaccard to instantly
// revert to the legacy deduper without a redeploy.
if (cfg.mode === 'jaccard') {
return jaccard(stories);
return { reps: jaccard(stories), embeddingByHash: new Map(), logSummary: '' };
}
const embedImpl = deps.embedBatch ?? embedBatch;
@@ -196,16 +227,34 @@ export async function deduplicateStories(stories, deps = {}) {
});
const embedClusters = clusterResult.clusters;
const embedOutput = embedClusters.map((cluster) =>
materializeCluster(cluster.map((i) => items[i].story)),
);
const embeddingByHash = new Map();
const embedOutput = [];
for (const cluster of embedClusters) {
const rep = materializeCluster(cluster.map((i) => items[i].story));
embedOutput.push(rep);
if (cfg.topicGroupingEnabled) {
// Find the item inside this cluster whose story wins materialize
// (materializeCluster sort key: currentScore DESC, mentionCount DESC
// — ties broken by input order). The winning story's hash matches
// rep.hash; its embedding is the topic-grouping vector for rep.
const winningIdx = cluster.find((i) => items[i].story.hash === rep.hash);
if (winningIdx !== undefined) {
embeddingByHash.set(rep.hash, items[winningIdx].embedding);
} else {
// Defensive: shouldn't fire — materializeCluster always picks a
// hash that's in the cluster. Warn so a future refactor that
// synthesises a new rep doesn't silently skip the sidecar
// (would cause topic grouping to fall through to primary order).
warn(`[digest] dedup sidecar: materialized rep ${rep.hash} not found in its cluster — topic grouping will skip this rep`);
}
}
}
log(
const logSummary =
`[digest] dedup mode=embed clustering=${cfg.clustering} stories=${items.length} clusters=${embedClusters.length} ` +
`veto_fires=${clusterResult.vetoFires} ms=${nowImpl() - started} ` +
`threshold=${cfg.cosineThreshold} fallback=false`,
);
return embedOutput;
`veto_fires=${clusterResult.vetoFires} ms=${nowImpl() - started} ` +
`threshold=${cfg.cosineThreshold} fallback=false`;
return { reps: embedOutput, embeddingByHash, logSummary };
} catch (err) {
const reason =
err instanceof Error && typeof err.name === 'string' && err.name !== 'Error'
@@ -215,6 +264,145 @@ export async function deduplicateStories(stories, deps = {}) {
warn(
`[digest] dedup embed path failed, falling back to Jaccard reason=${reason} msg=${msg}`,
);
return jaccard(stories);
return { reps: jaccard(stories), embeddingByHash: new Map(), logSummary: '' };
}
}
// ── Secondary topic-grouping pass ───────────────────────────────────────
/**
 * Pure reordering pass over the already-deduped, already-sliced reps:
 * related stories become contiguous blocks, with the dominant thread
 * (largest topic) first. Runs AFTER `deduplicateStories` + score-floor +
 * top-N slice.
 *
 * No I/O, no logging, no Redis — the caller owns logging. Failures are
 * RETURNED, never thrown: a throw here would bubble into the caller's
 * try/catch around `deduplicateStories` and trigger the Jaccard fallback
 * for a mere ordering bug, which is the wrong blast radius.
 *
 * Ordering contract (total, deterministic, permutation-stable):
 *   topics  → (size DESC, max member score DESC, min member titleHash ASC)
 *   members → (score DESC, titleHash ASC) within their topic
 *
 * @param {Array<{hash:string, title:string, currentScore:number}>} top
 * @param {{ topicGroupingEnabled: boolean, topicThreshold: number }} cfg
 * @param {Map<string, number[]>} embeddingByHash
 * @param {object} [deps]
 * @param {typeof singleLinkCluster} [deps.clusterFn] — injected for testing
 * @returns {{ reps: Array<object>, topicCount: number, error: Error | null }}
 */
export function groupTopicsPostDedup(top, cfg, embeddingByHash, deps = {}) {
const input = Array.isArray(top) ? top : [];
if (!cfg.topicGroupingEnabled || input.length <= 1) {
return { reps: input, topicCount: input.length, error: null };
}
const clusterFn = deps.clusterFn ?? singleLinkCluster;
try {
const clusterInput = input.map((rep) => ({
title: rep.title,
embedding: embeddingByHash?.get(rep.hash),
}));
if (clusterInput.some((entry) => !Array.isArray(entry.embedding))) {
return {
reps: input,
topicCount: input.length,
error: new Error('topic grouping: missing embedding for at least one rep'),
};
}
// Topic level deliberately omits the event-level entity veto: at this
// looser cosine, stories in the same broader narrative should group
// even when their actor sets diverge.
const { clusters } = clusterFn(clusterInput, {
cosineThreshold: cfg.topicThreshold,
vetoFn: null,
});
// -1 sentinel: an injected clusterFn that fails to cover every input
// index must surface as an explicit error instead of silently
// corrupting the per-topic aggregates below.
const topicOf = new Array(input.length).fill(-1);
clusters.forEach((members, t) => {
for (const memberIdx of members) topicOf[memberIdx] = t;
});
topicOf.forEach((t, i) => {
if (t === -1) throw new Error(`topic grouping: clusterFn missed index ${i}`);
});
const hashOf = input.map((rep) =>
titleHashHex(normalizeForEmbedding(rep.title ?? '')),
);
// Per-topic aggregates + member lists, built in one pass over the
// input order. The tiebreak hash is a TOPIC property (min member
// title hash), so cross-topic ties never hinge on a single rep.
const topicTotal = clusters.length;
const sizeOf = new Array(topicTotal).fill(0);
const maxScoreOf = new Array(topicTotal).fill(-Infinity);
const tieHashOf = new Array(topicTotal).fill(null);
const membersOf = Array.from({ length: topicTotal }, () => []);
input.forEach((rep, i) => {
const t = topicOf[i];
sizeOf[t] += 1;
const score = Number(rep.currentScore ?? 0);
if (score > maxScoreOf[t]) maxScoreOf[t] = score;
if (tieHashOf[t] === null || hashOf[i] < tieHashOf[t]) {
tieHashOf[t] = hashOf[i];
}
membersOf[t].push(i);
});
const ascending = (a, b) => (a < b ? -1 : a > b ? 1 : 0);
// Two-phase ordering (NOT one global key): a single key tying on
// (size, max) would fall through to per-rep score and interleave
// members of equally-ranked topics. Phase A orders members inside
// each topic; phase B orders the topics themselves.
for (const members of membersOf) {
members.sort((a, b) => {
const scoreA = Number(input[a].currentScore ?? 0);
const scoreB = Number(input[b].currentScore ?? 0);
if (scoreA !== scoreB) return scoreB - scoreA;
return ascending(hashOf[a], hashOf[b]);
});
}
const topicOrder = Array.from({ length: topicTotal }, (_, t) => t).sort(
(a, b) => {
if (sizeOf[a] !== sizeOf[b]) return sizeOf[b] - sizeOf[a];
if (maxScoreOf[a] !== maxScoreOf[b]) return maxScoreOf[b] - maxScoreOf[a];
return ascending(tieHashOf[a], tieHashOf[b]);
},
);
// Emit each topic's members, in intra-topic order, topic by topic.
const reordered = topicOrder.flatMap((t) => membersOf[t].map((i) => input[i]));
return { reps: reordered, topicCount: clusters.length, error: null };
} catch (err) {
return {
reps: input,
topicCount: input.length,
error: err instanceof Error ? err : new Error(String(err)),
};
}
}

View File

@@ -40,7 +40,11 @@ import { issueSlotInTz } from '../shared/brief-filter.js';
import { enrichBriefEnvelopeWithLLM } from './lib/brief-llm.mjs';
import { assertBriefEnvelope } from '../server/_shared/brief-render.js';
import { signBriefUrl, BriefUrlError } from './lib/brief-url-sign.mjs';
import { deduplicateStories } from './lib/brief-dedup.mjs';
import {
deduplicateStories,
groupTopicsPostDedup,
readOrchestratorConfig,
} from './lib/brief-dedup.mjs';
import { stripSourceSuffix } from './lib/brief-dedup-jaccard.mjs';
// ── Config ────────────────────────────────────────────────────────────────────
@@ -289,7 +293,9 @@ async function buildDigest(rule, windowStartMs) {
if (stories.length === 0) return null;
stories.sort((a, b) => b.currentScore - a.currentScore);
const dedupedAll = await deduplicateStories(stories);
const cfg = readOrchestratorConfig(process.env);
const { reps: dedupedAll, embeddingByHash, logSummary } =
await deduplicateStories(stories);
// Apply the absolute-score floor AFTER dedup so the floor runs on
// the representative's score (mentionCount-sum doesn't change the
// score field; the rep is the highest-scoring member of its
@@ -319,7 +325,40 @@ async function buildDigest(rule, windowStartMs) {
}
return null;
}
const top = deduped.slice(0, DIGEST_MAX_ITEMS);
const sliced = deduped.slice(0, DIGEST_MAX_ITEMS);
// Secondary topic-grouping pass: re-orders `sliced` so related stories
// form contiguous blocks. Disabled via DIGEST_DEDUP_TOPIC_GROUPING=0.
// Gate on the sidecar Map being non-empty — this is the precise
// signal for "primary embed path produced vectors". Gating on
// cfg.mode is WRONG: the embed path can run AND fall back to
// Jaccard at runtime (try/catch inside deduplicateStories), leaving
// cfg.mode==='embed' but embeddingByHash empty. The Map size is the
// only ground truth. Kill-switch (mode=jaccard) and runtime fallback
// both produce size=0 → shouldGroupTopics=false → no misleading
// "topic grouping failed: missing embedding" warn.
// Errors from the helper are returned (not thrown) and MUST NOT
// cascade into the outer Jaccard fallback — they just preserve
// primary order.
const shouldGroupTopics = cfg.topicGroupingEnabled && embeddingByHash.size > 0;
const { reps: top, topicCount, error: topicErr } = shouldGroupTopics
? groupTopicsPostDedup(sliced, cfg, embeddingByHash)
: { reps: sliced, topicCount: sliced.length, error: null };
if (topicErr) {
console.warn(
`[digest] topic grouping failed, preserving primary order: ${topicErr.message}`,
);
}
if (logSummary) {
const finalLog =
shouldGroupTopics && !topicErr
? logSummary.replace(
/clusters=(\d+) /,
`clusters=$1 topics=${topicCount} `,
)
: logSummary;
console.log(finalLog);
}
const allSourceCmds = [];
const cmdIndex = [];

View File

@@ -23,7 +23,11 @@
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { deduplicateStories } from '../scripts/lib/brief-dedup.mjs';
import {
deduplicateStories,
groupTopicsPostDedup,
readOrchestratorConfig,
} from '../scripts/lib/brief-dedup.mjs';
import { deduplicateStoriesJaccard } from '../scripts/lib/brief-dedup-jaccard.mjs';
import {
EmbeddingProviderError,
@@ -114,7 +118,7 @@ describe('Scenario 1 — happy path: embed clusters near-duplicates', () => {
story('Iran shuts Strait of Hormuz', 85, 1, 'h1'),
story('Myanmar coup leader elected president', 80, 1, 'h2'),
];
const out = await deduplicateStories(stories, {
const { reps: out, logSummary } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: embedder.embedBatch,
redisPipeline: noopPipeline,
@@ -133,9 +137,37 @@ describe('Scenario 1 — happy path: embed clusters near-duplicates', () => {
assert.ok(singleton);
assert.equal(singleton.mergedHashes[0], 'h2');
// Structured log line emitted.
assert.ok(collector.lines.some((l) => l.line.includes('mode=embed')));
assert.ok(collector.lines.some((l) => l.line.includes('fallback=false')));
// Structured log line composed in logSummary (caller emits).
assert.match(logSummary, /mode=embed/);
assert.match(logSummary, /fallback=false/);
});
it('runtime Jaccard fallback returns empty embeddingByHash + empty logSummary', async () => {
// Regression guard for the nested-fallback leak: when the embed
// path throws at runtime, deduplicateStories falls back to Jaccard
// but cfg.mode is still 'embed'. The caller's shouldGroupTopics
// gate must rely on embeddingByHash.size > 0 (ground truth) rather
// than cfg.mode === 'embed' (stale signal), else a false
// "topic grouping failed: missing embedding" warn fires on top
// of the legitimate "falling back to Jaccard" warn.
const throwingEmbedder = async () => {
throw new EmbeddingProviderError('forced', { status: 500 });
};
const stories = [
story('Iran closes Strait of Hormuz', 90, 1, 'h0'),
story('Iran shuts Strait of Hormuz', 85, 1, 'h1'),
];
const { reps, embeddingByHash, logSummary } = await deduplicateStories(stories, {
env: EMBED_MODE, // configured mode === 'embed'
embedBatch: throwingEmbedder,
redisPipeline: noopPipeline,
});
assert.ok(reps.length >= 1, 'Jaccard produced reps');
assert.equal(embeddingByHash.size, 0, 'fallback path MUST return empty Map');
assert.equal(logSummary, '', 'fallback path MUST return empty logSummary');
// Caller-side invariant: shouldGroupTopics using Map size (ground
// truth) is false; using cfg.mode would be true (stale) and leak.
assert.equal(embeddingByHash.size > 0, false, 'correct gate: size-based');
});
});
@@ -152,7 +184,7 @@ describe('Scenario 2 — cold-cache timeout collapses to Jaccard', () => {
];
const collector = lineCollector();
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: throwingEmbedder,
redisPipeline: noopPipeline,
@@ -188,7 +220,7 @@ describe('Scenario 3 — provider outage collapses to Jaccard', () => {
const stories = [story('a', 10, 1, 'a1'), story('b', 10, 1, 'b1')];
const collector = lineCollector();
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: throwingEmbedder,
redisPipeline: noopPipeline,
@@ -264,7 +296,7 @@ describe('Scenario 5 — entity veto blocks same-location, different-actor merge
]);
const embedder = stubEmbedder(vecByTitle);
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: embedder.embedBatch,
redisPipeline: noopPipeline,
@@ -284,7 +316,7 @@ describe('Scenario 5 — entity veto blocks same-location, different-actor merge
]);
const embedder = stubEmbedder(vecByTitle);
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: { ...EMBED_MODE, DIGEST_DEDUP_ENTITY_VETO_ENABLED: '0' },
embedBatch: embedder.embedBatch,
redisPipeline: noopPipeline,
@@ -371,7 +403,7 @@ describe('Scenario 7 — cluster-level fixture', () => {
);
const embedder = stubEmbedder(vecByTitle);
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: embedder.embedBatch,
redisPipeline: noopPipeline,
@@ -427,7 +459,7 @@ describe('Scenario 9 — permutation-invariance', () => {
}
// Baseline run on the canonical input order.
const baseline = await deduplicateStories(stories, {
const { reps: baseline } = await deduplicateStories(stories, {
env: EMBED_MODE,
embedBatch: stubEmbedder(vecByTitle).embedBatch,
redisPipeline: noopPipeline,
@@ -446,7 +478,7 @@ describe('Scenario 9 — permutation-invariance', () => {
const j = Math.floor(rand() * (i + 1));
[shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
}
const out = await deduplicateStories(shuffled, {
const { reps: out } = await deduplicateStories(shuffled, {
env: EMBED_MODE,
embedBatch: stubEmbedder(vecByTitle).embedBatch,
redisPipeline: noopPipeline,
@@ -658,7 +690,7 @@ describe('readOrchestratorConfig — DIGEST_DEDUP_CLUSTERING', () => {
const cfg = readOrchestratorConfig({ DIGEST_DEDUP_CLUSTERING: 'average' });
assert.equal(cfg.clustering, 'single');
});
it('structured log line includes clustering=<algo>', async () => {
it('structured logSummary includes clustering=<algo>', async () => {
const { deduplicateStories } = await import('../scripts/lib/brief-dedup.mjs');
const stories = [story('x', 10, 1, 'x1'), story('y', 10, 1, 'y1')];
const vec = new Map([
@@ -666,14 +698,486 @@ describe('readOrchestratorConfig — DIGEST_DEDUP_CLUSTERING', () => {
[normalizeForEmbedding('y'), [0.99, Math.sqrt(1 - 0.99 * 0.99), 0]],
]);
const { embedBatch } = stubEmbedder(vec);
const lines = [];
await deduplicateStories(stories, {
const { logSummary } = await deduplicateStories(stories, {
env: { DIGEST_DEDUP_MODE: 'embed', DIGEST_DEDUP_COSINE_THRESHOLD: '0.5' },
embedBatch,
redisPipeline: async () => [],
log: (l) => lines.push(l),
});
assert.ok(lines.some((l) => /clustering=(single|complete)/.test(l)), 'log line must mention clustering algorithm');
assert.match(logSummary, /clustering=(single|complete)/, 'logSummary must mention clustering algorithm');
});
});
// ── Topic-grouping post-dedup (secondary pass) ────────────────────────────────
/**
* Build a basis-aligned unit vector for topic `c`. `jitter ∈ [0, 0.1)`
* lets within-topic members share cosine ~0.99+ while staying unit
* length. The jitter is parked in dimension `dim-1`, which no topic or
* singleton basis occupies — this guarantees cross-topic cosine = 0
* regardless of jitter, so the 0.45 secondary threshold has a clean
* separation in either direction.
*/
/**
 * Unit vector aligned with basis dimension `c`. A non-zero `jitter`
 * shrinks the main component to `1 - jitter` and parks the remainder in
 * dimension `dim - 1` (kept free of all topic bases), so within-topic
 * members stay near-parallel while cross-topic cosine remains exactly 0.
 */
function basisVec(dim, c, jitter = 0) {
  const main = 1 - jitter;
  const vec = Array.from({ length: dim }, () => 0);
  vec[c] = main;
  if (jitter > 0) {
    // Renormalize: main² + tail² = 1 keeps the vector unit length.
    vec[dim - 1] = Math.sqrt(1 - main * main);
  }
  return vec;
}
/**
 * Minimal digest-rep fixture for the topic-grouping tests: a single
 * mention at 'critical' severity whose mergedHashes contains only its
 * own hash. Key order matches the production rep shape.
 */
function topicRep(title, score, hash) {
  const identity = { title, currentScore: score, mentionCount: 1 };
  return {
    ...identity,
    sources: [],
    severity: 'critical',
    hash,
    mergedHashes: [hash],
  };
}
// Mirrors the production defaults in readOrchestratorConfig: grouping on
// (anything but '0' enables it) and a 0.45 secondary-pass cosine threshold.
const DEFAULT_TOPIC_CFG = { topicGroupingEnabled: true, topicThreshold: 0.45 };
describe('groupTopicsPostDedup — size-first total ordering', () => {
it('4-member topic leads 3-member topic leads singletons (size DESC)', () => {
// 12 reps: topic A (basis 0, 4 members, scores 98/92/85/80),
// topic B (basis 1, 3 members, scores 91/90/85),
// 5 singletons (bases 2..6, scores 95/88/70/65/60).
const reps = [];
const emb = new Map();
const dim = 10;
[98, 92, 85, 80].forEach((s, i) => {
const r = topicRep(`A-${i}`, s, `a${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01));
});
[91, 90, 85].forEach((s, i) => {
const r = topicRep(`B-${i}`, s, `b${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01));
});
[95, 88, 70, 65, 60].forEach((s, i) => {
const r = topicRep(`S-${i}`, s, `s${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 2 + i, 0));
});
// Feed in score-DESC (the digest's pre-grouping order) and verify
// topic ordering overrides raw score order.
const primaryOrder = reps.slice().sort((a, b) => b.currentScore - a.currentScore);
const { reps: ordered, topicCount, error } = groupTopicsPostDedup(
primaryOrder,
DEFAULT_TOPIC_CFG,
emb,
);
assert.equal(error, null);
// 1 topic (size 4) + 1 topic (size 3) + 5 singletons = 7
assert.equal(topicCount, 7);
// Topic A leads; members in score DESC: 98, 92, 85, 80
assert.deepEqual(
ordered.slice(0, 4).map((r) => r.hash),
['a0', 'a1', 'a2', 'a3'],
);
// Topic B next; members in score DESC: 91, 90, 85
assert.deepEqual(
ordered.slice(4, 7).map((r) => r.hash),
['b0', 'b1', 'b2'],
);
// Singletons by score DESC: 95, 88, 70, 65, 60
assert.deepEqual(
ordered.slice(7).map((r) => r.hash),
['s0', 's1', 's2', 's3', 's4'],
);
// Critically: Louisiana-score-95 singleton comes AFTER Iran-war-max-91
// (topic of 3) — the user's explicit editorial intent.
const louisianaIdx = ordered.findIndex((r) => r.hash === 's0');
const lastTopicBIdx = ordered.findIndex((r) => r.hash === 'b2');
assert.ok(louisianaIdx > lastTopicBIdx, 'single-rep score 95 appears after 3-member topic max 91');
});
it('topicMax breaks ties between same-size topics', () => {
// Two topics, both size 2. Topic X max=80, topic Y max=90 → Y leads.
const reps = [];
const emb = new Map();
const dim = 6;
[80, 70].forEach((s, i) => {
const r = topicRep(`X-${i}`, s, `x${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01));
});
[90, 60].forEach((s, i) => {
const r = topicRep(`Y-${i}`, s, `y${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01));
});
const { reps: ordered, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb);
assert.equal(error, null);
// Y-topic (max 90) leads X-topic (max 80) despite X having a higher low.
assert.deepEqual(
ordered.map((r) => r.hash),
['y0', 'y1', 'x0', 'x1'],
);
});
it('within a topic, reps are ordered by currentScore DESC', () => {
const reps = [
topicRep('T-low', 70, 't2'),
topicRep('T-hi', 90, 't0'),
topicRep('T-mid', 80, 't1'),
];
const emb = new Map([
['t0', basisVec(4, 0, 0.01)],
['t1', basisVec(4, 0, 0.02)],
['t2', basisVec(4, 0, 0.03)],
]);
const { reps: ordered, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb);
assert.equal(error, null);
assert.deepEqual(ordered.map((r) => r.hash), ['t0', 't1', 't2']);
});
// `titleHashHex is the final deterministic tiebreak` test was removed —
// the permutation-invariance test below exercises the same invariant
// against a larger fixture and would catch any tiebreak drift.
it('same-size same-topicMax topics KEEP MEMBERS CONTIGUOUS (regression)', () => {
// Regression guard for the round-2 bug: a global sort key that
// tied on (topicSize, topicMax) fell through to per-rep repScore,
// interleaving A/B members (output was [a0,b0,a1,b1] instead of
// a contiguous block). Two-phase sort fixes this.
//
// Topic A: score 90, 80 (size 2, max 90)
// Topic B: score 90, 70 (size 2, max 90) — same size and max
const reps = [];
const emb = new Map();
const dim = 6;
[90, 80].forEach((s, i) => {
const r = topicRep(`A-${i}`, s, `a${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 0, (i + 1) * 0.01));
});
[90, 70].forEach((s, i) => {
const r = topicRep(`B-${i}`, s, `b${i}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, 1, (i + 1) * 0.01));
});
const { reps: ordered, error } = groupTopicsPostDedup(
reps,
DEFAULT_TOPIC_CFG,
emb,
);
assert.equal(error, null);
// The two A reps must appear as a contiguous pair, and the two B
// reps must appear as a contiguous pair. Which topic leads is
// determined by the deterministic topic-level tiebreak hash, but
// their members MUST NOT interleave. (Dead first/last index
// variables from an earlier draft were removed — the adjacency
// checks below are the complete assertion set.)
const hashes = ordered.map((r) => r.hash);
const aIdxs = [hashes.indexOf('a0'), hashes.indexOf('a1')].sort((x, y) => x - y);
const bIdxs = [hashes.indexOf('b0'), hashes.indexOf('b1')].sort((x, y) => x - y);
assert.equal(aIdxs[1] - aIdxs[0], 1, `A members must be adjacent; got ${JSON.stringify(hashes)}`);
assert.equal(bIdxs[1] - bIdxs[0], 1, `B members must be adjacent; got ${JSON.stringify(hashes)}`);
// And within each topic, higher score first.
assert.ok(hashes.indexOf('a0') < hashes.indexOf('a1'), 'A-90 precedes A-80');
assert.ok(hashes.indexOf('b0') < hashes.indexOf('b1'), 'B-90 precedes B-70');
});
});
// Kill-switch and degenerate-input behavior: all three cases must return
// without ever invoking the clusterer.
describe('groupTopicsPostDedup — kill switch & edge cases', () => {
it('topicGroupingEnabled=false preserves primary order byte-identical', () => {
const reps = [
topicRep('a', 98, 'a'),
topicRep('b', 95, 'b'),
topicRep('c', 92, 'c'),
];
// Embeddings would normally merge all three into one topic, but kill
// switch must short-circuit before calling the clusterer.
const emb = new Map([
['a', basisVec(4, 0, 0.01)],
['b', basisVec(4, 0, 0.02)],
['c', basisVec(4, 0, 0.03)],
]);
const { reps: ordered, topicCount, error } = groupTopicsPostDedup(
reps,
{ topicGroupingEnabled: false, topicThreshold: 0.45 },
emb,
);
assert.equal(error, null);
// Disabled path reports topicCount === rep count (each rep its own topic).
assert.equal(topicCount, reps.length);
assert.deepEqual(ordered, reps, 'output === input reference when disabled');
});
// Empty input hits the length guard — no embeddings needed at all.
it('empty input returns {reps: [], topicCount: 0, error: null}', () => {
const { reps, topicCount, error } = groupTopicsPostDedup([], DEFAULT_TOPIC_CFG, new Map());
assert.deepEqual(reps, []);
assert.equal(topicCount, 0);
assert.equal(error, null);
});
// length <= 1 guard: a lone rep is trivially its own topic.
it('single-rep input passes through with topicCount=1', () => {
const only = [topicRep('solo', 99, 'solo')];
const { reps: out, topicCount, error } = groupTopicsPostDedup(
only,
DEFAULT_TOPIC_CFG,
new Map([['solo', basisVec(4, 0)]]),
);
assert.equal(error, null);
assert.equal(topicCount, 1);
assert.deepEqual(out, only);
});
});
// Determinism guard: the final ordering must be a pure function of the
// rep set, independent of the input permutation.
describe('groupTopicsPostDedup — permutation invariance', () => {
it('15 reps in 5 topics of 3 produce identical ordering across 5 shuffles', () => {
const N_TOPICS = 5;
const PER = 3;
const dim = N_TOPICS + 1; // +1 free dimension for jitter
const reps = [];
const emb = new Map();
for (let c = 0; c < N_TOPICS; c++) {
for (let k = 0; k < PER; k++) {
// Strictly decreasing scores 100..86 → no score ties across topics.
const score = 100 - (c * PER + k);
const r = topicRep(`c${c}-k${k}`, score, `c${c}k${k}`);
reps.push(r);
emb.set(r.hash, basisVec(dim, c, 0.001 * (k + 1)));
}
}
// Ordering signature: hash sequence joined for cheap equality checks.
const sigFor = (arr) => arr.map((r) => r.hash).join('|');
const baseline = groupTopicsPostDedup(reps.slice(), DEFAULT_TOPIC_CFG, emb);
assert.equal(baseline.error, null);
const baselineSig = sigFor(baseline.reps);
// Seeded LCG-style generator for a reproducible Fisher–Yates shuffle.
// NOTE(review): seed * 1103515245 can exceed 2^53, so low bits lose
// float precision — still fully deterministic, which is all this needs.
let seed = 7;
const rand = () => {
seed = (seed * 1103515245 + 12345) & 0x7fffffff;
return seed / 0x7fffffff;
};
for (let r = 0; r < 5; r++) {
const shuffled = reps.slice();
for (let i = shuffled.length - 1; i > 0; i--) {
const j = Math.floor(rand() * (i + 1));
[shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
}
const run = groupTopicsPostDedup(shuffled, DEFAULT_TOPIC_CFG, emb);
assert.equal(run.error, null);
assert.equal(
sigFor(run.reps),
baselineSig,
`shuffle ${r} produced a different ordering`,
);
}
});
});
// Error containment: the helper's contract is errors-as-values, never a
// throw that could trip the caller's outer Jaccard fallback.
describe('groupTopicsPostDedup — error boundary (nested fallback)', () => {
it('injected clusterFn that throws returns error, primary order preserved, no re-throw', () => {
const reps = [
topicRep('a', 90, 'a'),
topicRep('b', 80, 'b'),
topicRep('c', 70, 'c'),
];
const emb = new Map([
['a', basisVec(4, 0)],
['b', basisVec(4, 1)],
['c', basisVec(4, 2)],
]);
// Injected clusterer that always throws — exercises the try/catch.
const boom = () => {
throw new Error('boom');
};
let threw = false;
let result;
try {
result = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb, { clusterFn: boom });
} catch (_err) {
threw = true;
}
assert.equal(threw, false, 'helper must NOT re-throw — it returns the error');
assert.ok(result.error instanceof Error);
assert.equal(result.error.message, 'boom');
assert.equal(result.topicCount, reps.length);
assert.deepEqual(result.reps, reps, 'primary order preserved on failure');
});
// Missing-embedding detection happens BEFORE clustering, so the error is
// returned (not thrown) even without an injected clusterFn.
it('missing embedding for any rep returns primary order + descriptive error', () => {
const reps = [
topicRep('a', 90, 'a'),
topicRep('b', 80, 'b'),
];
const emb = new Map([['a', basisVec(4, 0)]]);
const { reps: out, error } = groupTopicsPostDedup(reps, DEFAULT_TOPIC_CFG, emb);
assert.ok(error instanceof Error);
assert.match(error.message, /missing embedding/);
assert.deepEqual(out, reps);
});
});
// Sidecar-key contract: embeddingByHash must be keyed by the hash of the
// cluster's MATERIALIZED representative, not by cluster input order.
describe('deduplicateStories — embeddingByHash keys match materialized rep', () => {
it('winning rep is items[1] (higher mentionCount) — sidecar key is that hash', async () => {
// Primary cluster of two items at the SAME score; items[1] has a
// higher mentionCount so materializeCluster picks it as rep.
// Sidecar embeddingByHash must be keyed by the rep's hash.
const loser = story('Iran shuts Hormuz', 80, 1, 'loser');
// Stub vectors at cosine 0.95 — presumably above the configured merge
// threshold so the two stories collapse into one cluster (verify
// against EMBED_MODE's cosine setting).
const winner = story('Iran closes Strait of Hormuz', 80, 5, 'winner');
const vec = new Map([
[normalizeForEmbedding(loser.title), [1, 0, 0]],
[normalizeForEmbedding(winner.title), [0.95, Math.sqrt(1 - 0.95 * 0.95), 0]],
]);
const embedder = stubEmbedder(vec);
const { reps, embeddingByHash } = await deduplicateStories([loser, winner], {
env: {
...EMBED_MODE,
DIGEST_DEDUP_TOPIC_GROUPING: '1',
DIGEST_DEDUP_ENTITY_VETO_ENABLED: '0', // let cosine merge w/o veto
},
embedBatch: embedder.embedBatch,
redisPipeline: noopPipeline,
});
assert.equal(reps.length, 1, 'one merged cluster');
const rep = reps[0];
// Sort key for materializeCluster is (currentScore DESC, mentionCount DESC)
// → `winner` (mentionCount 5) wins over `loser` (mentionCount 1).
assert.equal(rep.hash, 'winner');
assert.ok(embeddingByHash.has('winner'), 'sidecar keyed by rep.hash, not loser hash');
assert.ok(!embeddingByHash.has('loser'), 'non-rep items never appear in sidecar');
});
});
describe('brief envelope cleanliness — no internal fields leak', () => {
  it('composeBriefFromDigestStories output never serializes embedding / __ fields', async () => {
    const { composeBriefFromDigestStories } = await import('../scripts/lib/brief-compose.mjs');
    // End-to-end: dedup → topic-group → compose, then scan the serialized
    // envelope for any internal bookkeeping keys.
    const stories = [
      story('Iran closes Strait of Hormuz', 92, 1, 'h0'),
      story('Iran shuts Strait of Hormuz', 88, 1, 'h1'),
      story('Myanmar coup leader elected', 80, 1, 'h2'),
    ];
    const vectors = [
      [1, 0, 0],
      [0.95, Math.sqrt(1 - 0.95 * 0.95), 0], // near-dup of stories[0]
      [0, 0, 1],                              // distinct topic
    ];
    const vec = new Map(stories.map((s, i) => [normalizeForEmbedding(s.title), vectors[i]]));
    const embedder = stubEmbedder(vec);
    const env = { ...EMBED_MODE, DIGEST_DEDUP_TOPIC_GROUPING: '1' };
    const { reps, embeddingByHash } = await deduplicateStories(stories, {
      env,
      embedBatch: embedder.embedBatch,
      redisPipeline: noopPipeline,
    });
    const cfg = readOrchestratorConfig(env);
    const { reps: top } = groupTopicsPostDedup(reps, cfg, embeddingByHash);
    const rule = {
      userId: 'user_test',
      sensitivity: 'all',
      digestTimezone: 'UTC',
    };
    const envelope = composeBriefFromDigestStories(rule, top, {}, { nowMs: 1_700_000_000_000 });
    const blob = JSON.stringify(envelope ?? {});
    assert.ok(!blob.includes('"_embedding"'), 'no _embedding key');
    assert.ok(!blob.includes('"__'), 'no __-prefixed key');
    assert.ok(!blob.includes('embeddingByHash'), 'no embeddingByHash leakage');
  });
});
describe('groupTopicsPostDedup — runs on sliced input, not pre-slice', () => {
  it('reflects slice(0, 30) input size in topicCount', () => {
    // 50 distinct singletons; slice to 30; each surviving rep sits on its
    // own orthogonal basis axis, so no pair can clear the cosine threshold
    // and topic grouping must yield exactly one topic per rep = 30 topics.
    const reps = [];
    const emb = new Map();
    const dim = 35;
    for (let i = 0; i < 50; i++) {
      const r = topicRep(`s-${i}`, 100 - i, `h${i}`);
      reps.push(r);
      // i % (dim - 1) keeps the basis index in range for all 50 reps; the
      // 30 that survive the slice (i = 0..29) all land on DISTINCT axes.
      emb.set(r.hash, basisVec(dim, i % (dim - 1)));
    }
    const sliced = reps.slice(0, 30);
    const { reps: out, topicCount, error } = groupTopicsPostDedup(sliced, DEFAULT_TOPIC_CFG, emb);
    assert.equal(error, null);
    assert.equal(out.length, 30);
    // Was `topicCount <= 30`, which also passes if every rep collapses into
    // ONE topic — vacuous. Orthogonal vectors (cosine 0) can never merge
    // under the 0.45 default threshold, so pin the exact count.
    assert.equal(topicCount, 30, 'one topic per orthogonal singleton');
  });
});
describe('readOrchestratorConfig — DIGEST_DEDUP_MODE typo falls back to Jaccard', () => {
  it('an unrecognised mode value (typo) resolves to jaccard, not embed', async () => {
    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
    // Classic operator scenario: panicking during an embed outage, types
    // the kill switch as `jacard`. The SAFE default is jaccard, not embed.
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_MODE: 'jacard' });
    assert.equal(parsed.mode, 'jaccard');
    assert.equal(parsed.invalidModeRaw, 'jacard');
  });
  it('any garbage value also falls back to jaccard', async () => {
    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
    const garbage = ['xyz', 'EMBED_ENABLED', '1', 'true'];
    for (const raw of garbage) {
      const parsed = readOrchestratorConfig({ DIGEST_DEDUP_MODE: raw });
      assert.equal(parsed.mode, 'jaccard', `raw=${JSON.stringify(raw)}`);
      // The raw value is surfaced lowercased so the typo is visible in logs.
      assert.equal(parsed.invalidModeRaw, raw.toLowerCase());
    }
  });
  it('unset / empty value still resolves to the embed default (normal prod path)', async () => {
    const { readOrchestratorConfig } = await import('../scripts/lib/brief-dedup.mjs');
    for (const raw of [undefined, '']) {
      const parsed = readOrchestratorConfig({ DIGEST_DEDUP_MODE: raw });
      assert.equal(parsed.mode, 'embed');
      assert.equal(parsed.invalidModeRaw, null);
    }
  });
});
describe('readOrchestratorConfig — topic-grouping env parsing', () => {
  // Fallback threshold expected whenever the env var is absent or unusable.
  const FALLBACK_THRESHOLD = 0.45;
  it('defaults: topicGroupingEnabled=true, topicThreshold=0.45', () => {
    const parsed = readOrchestratorConfig({});
    assert.equal(parsed.topicGroupingEnabled, true);
    assert.equal(parsed.topicThreshold, FALLBACK_THRESHOLD);
  });
  it('DIGEST_DEDUP_TOPIC_GROUPING=0 disables', () => {
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_GROUPING: '0' });
    assert.equal(parsed.topicGroupingEnabled, false);
  });
  it('any non-"0" DIGEST_DEDUP_TOPIC_GROUPING value is treated as enabled', () => {
    // Default-on kill-switch pattern: "yes", "1", "true", "" all enable.
    const enablers = ['yes', '1', 'true', '', 'on'];
    for (const v of enablers) {
      const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_GROUPING: v });
      assert.equal(parsed.topicGroupingEnabled, true, `value=${JSON.stringify(v)} should enable`);
    }
  });
  it('DIGEST_DEDUP_TOPIC_THRESHOLD=foo (invalid) falls back to 0.45', () => {
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: 'foo' });
    assert.equal(parsed.topicThreshold, FALLBACK_THRESHOLD);
  });
  it('DIGEST_DEDUP_TOPIC_THRESHOLD=1.5 (out of range) falls back to 0.45', () => {
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '1.5' });
    assert.equal(parsed.topicThreshold, FALLBACK_THRESHOLD);
  });
  it('DIGEST_DEDUP_TOPIC_THRESHOLD=0 (boundary, invalid) falls back to 0.45', () => {
    // Threshold contract is the half-open interval (0, 1] — zero is OUT.
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '0' });
    assert.equal(parsed.topicThreshold, FALLBACK_THRESHOLD);
  });
  it('DIGEST_DEDUP_TOPIC_THRESHOLD=0.55 (valid) is honoured', () => {
    const parsed = readOrchestratorConfig({ DIGEST_DEDUP_TOPIC_THRESHOLD: '0.55' });
    assert.equal(parsed.topicThreshold, 0.55);
  });
});

View File

@@ -212,7 +212,7 @@ describe('brief-dedup orchestrator — jaccard kill switch', () => {
story('Iran shuts Strait of Hormuz - Reuters', 85, 1, 'h2'),
story('Myanmar coup leader elected president', 80, 1, 'h3'),
];
const out = await deduplicateStories(stories, {
const { reps: out } = await deduplicateStories(stories, {
env: { DIGEST_DEDUP_MODE: 'jaccard' },
embedBatch: stubEmbed,
});
@@ -233,7 +233,7 @@ describe('brief-dedup orchestrator — jaccard kill switch', () => {
jaccardCalls++;
return deduplicateStoriesJaccard(s);
};
const out = await deduplicateStories([], {
const { reps: out } = await deduplicateStories([], {
env: { DIGEST_DEDUP_MODE: 'jaccard' },
jaccard: stubJaccard,
});