diff --git a/scripts/lib/brief-dedup-replay-log.mjs b/scripts/lib/brief-dedup-replay-log.mjs new file mode 100644 index 000000000..ffe598231 --- /dev/null +++ b/scripts/lib/brief-dedup-replay-log.mjs @@ -0,0 +1,245 @@ +/** + * Replayable per-story input log for brief-dedup calibration. + * + * Problem this solves: we can't validate recall-lift options that shift + * the embedding score distribution (title+slug, LLM-canonicalise, 3-large + * model upgrade, etc.) from a baseline-band pair log alone. We need the + * per-story inputs for every tick so offline replays can re-embed with + * alternative configs and re-score the full pair matrix. + * + * See docs/brainstorms/2026-04-23-001-brief-dedup-recall-gap.md §5 Phase 1. + * + * Contract: + * - Opt-in via DIGEST_DEDUP_REPLAY_LOG=1 (default OFF — zero behaviour + * change on merge). + * - Best-effort: ALL failures are swallowed + warned. Replay-log write + * errors MUST NEVER affect digest delivery. + * - Append-only list in Upstash: one JSON record per story, keyed by + * rule + date so operators can range-query a day's traffic. + * - 30-day TTL (see §5 Phase 1 retention rationale: covers labelling + * cadence + cross-candidate comparison window; cache TTL is not the + * right anchor — replays that change embed config pay a fresh embed + * regardless of cache). + */ + +import { cacheKeyFor, normalizeForEmbedding } from './brief-embedding.mjs'; +import { defaultRedisPipeline } from './_upstash-pipeline.mjs'; + +const KEY_PREFIX = 'digest:replay-log:v1'; +const TTL_SECONDS = 30 * 24 * 60 * 60; // 30 days + +/** + * Env-read at call time so Railway can flip the flag without a redeploy. + * Anything other than literal '1' (including unset, '0', 'yes', 'true', + * mis-cased 'True') is treated as OFF — fail-closed so a typo can't + * silently turn the log on in prod. '1' is the single intentional value. + * + * @param {Record} [env] + */ +export function replayLogEnabled(env = process.env) { + return env.DIGEST_DEDUP_REPLAY_LOG === '1'; +} + +/** + * Build the Upstash list key for a given tick. + * + * Format: digest:replay-log:v1:{ruleId}:{YYYY-MM-DD} + * + * Scoped per-rule so operators can range-query a single rule's day + * without scanning traffic from other digest variants. Date suffix + * (UTC) caps list length to one day's cron ticks — prevents unbounded + * growth of a single key over the 30-day retention window. + * + * Safe-characters gate on ruleId: strip anything not alnum/underscore/ + * hyphen so an exotic rule id can't escape the key namespace. + */ +export function buildReplayLogKey(ruleId, tsMs) { + // Allow ':' so `variant:lang:sensitivity` composite ruleIds stay + // readable as Redis key segments. Strip anything else to '_'; then + // if the whole string collapsed to nothing meaningful — all '_', + // ':', '-', or empty — use 'unknown' so the key namespace stays + // consistent. Stripping ':' / '-' in the emptiness check prevents + // pathological inputs like ':::' producing keys like + // `digest:replay-log:v1::::2026-04-23` that confuse Redis namespace + // tooling (SCAN / KEYS / redis-cli tab completion). + const raw = String(ruleId ?? '').replace(/[^A-Za-z0-9:_-]/g, '_'); + const safeRuleId = raw.replace(/[_:-]/g, '') === '' ? 'unknown' : raw; + const iso = new Date(tsMs).toISOString(); + const dateKey = iso.slice(0, 10); // YYYY-MM-DD + return `${KEY_PREFIX}:${safeRuleId}:${dateKey}`; +} + +/** + * Build one JSON record per story in the dedup input. + * + * `clusterId` is derived from `reps[].mergedHashes` — the authoritative + * cluster-membership contract that materializeCluster already provides + * (brief-dedup-jaccard.mjs:75-85). No change to the orchestrator needed. + * + * `embeddingCacheKey` is computed from normalizeForEmbedding(title). It + * only helps replays that keep the SAME embedding config (model, dims, + * input transform) — replays that change any of those pay fresh embed + * calls regardless. Still worth recording: it's ~60 bytes and makes + * same-config replays cheap. + * + * @param {Array} stories — the input passed to deduplicateStories + * @param {Array} reps — the reps returned by deduplicateStories + * @param {Map} embeddingByHash — sidecar from the embed path + * @param {object} cfg — the full config object from readOrchestratorConfig + * @param {object} tickContext + * @param {string} tickContext.briefTickId + * @param {string} tickContext.ruleId + * @param {number} tickContext.tsMs + * @returns {Array} + */ +export function buildReplayRecords(stories, reps, embeddingByHash, cfg, tickContext) { + // Derive hash → clusterId from rep membership. A rep's mergedHashes + // lists every hash in its cluster including the rep's own; iterate + // reps in output order and use the index as clusterId. + const clusterByHash = new Map(); + if (Array.isArray(reps)) { + reps.forEach((rep, clusterId) => { + const hashes = Array.isArray(rep?.mergedHashes) ? rep.mergedHashes : [rep?.hash]; + for (const h of hashes) { + if (typeof h === 'string' && !clusterByHash.has(h)) { + clusterByHash.set(h, clusterId); + } + } + }); + } + + // `repHashes` is a Set of the winning story's hash per cluster. A + // story is the rep iff its hash === the rep.hash at its clusterId. + const repHashes = new Set(); + if (Array.isArray(reps)) { + for (const rep of reps) { + if (typeof rep?.hash === 'string') repHashes.add(rep.hash); + } + } + + const tickConfig = { + mode: cfg?.mode ?? null, + clustering: cfg?.clustering ?? null, + cosineThreshold: cfg?.cosineThreshold ?? null, + // topicGroupingEnabled gates the post-dedup topic ordering pass in + // seed-digest-notifications. Omitting it makes topic-grouping-off + // ticks indistinguishable from default ticks at replay time, so + // downstream replays can't reconstruct output behaviour for runs + // with DIGEST_DEDUP_TOPIC_GROUPING=0. Serialise explicitly. + topicGroupingEnabled: cfg?.topicGroupingEnabled ?? null, + topicThreshold: cfg?.topicThreshold ?? null, + entityVetoEnabled: cfg?.entityVetoEnabled ?? null, + }; + + const records = []; + stories.forEach((story, originalIndex) => { + const rawTitle = typeof story?.title === 'string' ? story.title : ''; + const normalizedTitle = normalizeForEmbedding(rawTitle); + const cacheKey = rawTitle ? cacheKeyFor(normalizedTitle) : null; + // hasEmbedding is a diagnostic: if the embed path produced a vector + // for this rep, the sidecar has it. Useful in replay to tell apart + // "embed path completed" from "embed path fell back to Jaccard". + const hasEmbedding = + embeddingByHash instanceof Map && embeddingByHash.has(story?.hash); + records.push({ + v: 1, + briefTickId: tickContext.briefTickId, + ruleId: tickContext.ruleId, + tsMs: tickContext.tsMs, + storyHash: story?.hash ?? null, + originalIndex, + isRep: repHashes.has(story?.hash), + clusterId: clusterByHash.has(story?.hash) + ? clusterByHash.get(story?.hash) + : null, + title: rawTitle, + normalizedTitle, + link: typeof story?.link === 'string' ? story.link : null, + severity: story?.severity ?? null, + currentScore: Number(story?.currentScore ?? 0), + mentionCount: Number(story?.mentionCount ?? 1), + phase: story?.phase ?? null, + sources: Array.isArray(story?.sources) ? story.sources : [], + embeddingCacheKey: cacheKey, + hasEmbedding, + // Per-record shallow copy so an in-memory consumer (future + // replay harness, test) that mutates one record's tickConfig + // can't silently affect every other record via shared reference. + // Serialisation goes through JSON.stringify in writeReplayLog so + // storage is unaffected either way; this is purely an in-memory + // footgun fix. + tickConfig: { ...tickConfig }, + }); + }); + return records; +} + +/** + * Write the replay log for one dedup tick. Best-effort: every error is + * caught and warned; the function NEVER throws. + * + * @param {object} args + * @param {Array} args.stories — input to deduplicateStories + * @param {Array} args.reps — output from deduplicateStories + * @param {Map} args.embeddingByHash — sidecar from deduplicateStories + * @param {object} args.cfg — readOrchestratorConfig result + * @param {object} args.tickContext + * @param {string} args.tickContext.briefTickId + * @param {string} args.tickContext.ruleId + * @param {number} args.tickContext.tsMs + * @param {object} [args.deps] + * @param {Record} [args.deps.env] + * @param {typeof defaultRedisPipeline} [args.deps.redisPipeline] + * @param {(line: string) => void} [args.deps.warn] + * @returns {Promise<{ wrote: number, key: string | null, skipped: 'disabled' | 'empty' | null }>} + */ +export async function writeReplayLog(args) { + const { + stories, + reps, + embeddingByHash, + cfg, + tickContext, + deps = {}, + } = args ?? {}; + const env = deps.env ?? process.env; + const warn = deps.warn ?? ((line) => console.warn(line)); + + if (!replayLogEnabled(env)) { + return { wrote: 0, key: null, skipped: 'disabled' }; + } + if (!Array.isArray(stories) || stories.length === 0) { + return { wrote: 0, key: null, skipped: 'empty' }; + } + + try { + const pipelineImpl = deps.redisPipeline ?? defaultRedisPipeline; + const records = buildReplayRecords( + stories, + reps ?? [], + embeddingByHash instanceof Map ? embeddingByHash : new Map(), + cfg ?? {}, + tickContext ?? { briefTickId: 'unknown', ruleId: 'unknown', tsMs: Date.now() }, + ); + if (records.length === 0) { + return { wrote: 0, key: null, skipped: 'empty' }; + } + const key = buildReplayLogKey(tickContext?.ruleId, tickContext?.tsMs ?? Date.now()); + // Single RPUSH with variadic values (one per story) + EXPIRE. Keep + // to two commands so Upstash's pipeline stays cheap even on large + // ticks. Stringify each record individually so downstream readers + // can consume with LRANGE + JSON.parse. + const rpushCmd = ['RPUSH', key, ...records.map((r) => JSON.stringify(r))]; + const expireCmd = ['EXPIRE', key, String(TTL_SECONDS)]; + const result = await pipelineImpl([rpushCmd, expireCmd]); + if (result == null) { + warn(`[digest] replay-log: pipeline returned null (creds missing or upstream down) key=${key}`); + return { wrote: 0, key, skipped: null }; + } + return { wrote: records.length, key, skipped: null }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + warn(`[digest] replay-log: write failed — ${msg}`); + return { wrote: 0, key: null, skipped: null }; + } +} diff --git a/scripts/seed-digest-notifications.mjs b/scripts/seed-digest-notifications.mjs index 8e84d85f0..1950e3606 100644 --- a/scripts/seed-digest-notifications.mjs +++ b/scripts/seed-digest-notifications.mjs @@ -48,6 +48,7 @@ import { readOrchestratorConfig, } from './lib/brief-dedup.mjs'; import { stripSourceSuffix } from './lib/brief-dedup-jaccard.mjs'; +import { writeReplayLog } from './lib/brief-dedup-replay-log.mjs'; // ── Config ──────────────────────────────────────────────────────────────────── @@ -390,8 +391,42 @@ async function buildDigest(rule, windowStartMs) { stories.sort((a, b) => b.currentScore - a.currentScore); const cfg = readOrchestratorConfig(process.env); + // Sample tsMs BEFORE dedup so briefTickId anchors to tick-start, not + // to dedup-completion. Dedup can take a few seconds on cold-cache + // embed calls; we want the replay log's tick id to reflect when the + // tick began processing, which is the natural reading of + // "briefTickId" for downstream readers. + const tsMs = Date.now(); const { reps: dedupedAll, embeddingByHash, logSummary } = await deduplicateStories(stories); + // Replay log (opt-in via DIGEST_DEDUP_REPLAY_LOG=1). Best-effort — any + // failure is swallowed by writeReplayLog. Runs AFTER dedup so the log + // captures the real rep + cluster assignments. RuleId omits userId on + // purpose: dedup input is shared across users of the same (variant, + // lang, sensitivity), and we don't want user identity in log keys. + // See docs/brainstorms/2026-04-23-001-brief-dedup-recall-gap.md §5 Phase 1. + // + // AWAITED on purpose: this script exits via explicit process.exit(1) + // on the brief-compose failure gate (~line 1539) and on main().catch + // (~line 1545). process.exit does NOT drain in-flight promises like + // natural exit does, so a `void` call here would silently drop the + // last N ticks' replay records — exactly the runs where measurement + // fidelity matters most. writeReplayLog has its own internal try/ + // catch + early return when the flag is off, so awaiting is free on + // the disabled path and bounded by the 10s Upstash pipeline timeout + // on the enabled path. + const ruleKey = `${variant}:${lang}:${rule.sensitivity ?? 'high'}`; + await writeReplayLog({ + stories, + reps: dedupedAll, + embeddingByHash, + cfg, + tickContext: { + briefTickId: `${ruleKey}:${tsMs}`, + ruleId: ruleKey, + tsMs, + }, + }); // Apply the absolute-score floor AFTER dedup so the floor runs on // the representative's score (mentionCount-sum doesn't change the // score field; the rep is the highest-scoring member of its diff --git a/tests/brief-dedup-replay-log.test.mjs b/tests/brief-dedup-replay-log.test.mjs new file mode 100644 index 000000000..116ec968a --- /dev/null +++ b/tests/brief-dedup-replay-log.test.mjs @@ -0,0 +1,383 @@ +/** + * Unit tests for the replayable per-story input log. + * + * Covers: + * 1. Flag OFF → no writes, returns skipped=disabled (default behaviour) + * 2. Flag ON + empty stories → no writes + * 3. Flag ON + stories → RPUSH with one record per story + EXPIRE 30d + * 4. Record fields match §5 Phase 1 spec (hash, originalIndex, isRep, + * clusterId, title/normalizedTitle, link, severity/score/mentions, + * phase/sources, embeddingCacheKey, hasEmbedding, tickConfig) + * 5. clusterId derived correctly from rep.mergedHashes (Jaccard + embed + * output shapes both populate mergedHashes — one codepath) + * 6. isRep only set for the materialized winning hash of each cluster + * 7. Pipeline returns null (creds missing) → warn + skipped + * 8. Pipeline throws → caught, warn emitted, no exception propagates + * 9. Key shape: digest:replay-log:v1:{safeRuleId}:{YYYY-MM-DD} + * 10. Only DIGEST_DEDUP_REPLAY_LOG='1' literal enables (case-sensitive, + * no 'yes'/'true'/'True' lenience — fail-closed typo-safety) + * + * Run: node --test tests/brief-dedup-replay-log.test.mjs + */ + +import { describe, it } from 'node:test'; +import assert from 'node:assert/strict'; + +import { + buildReplayLogKey, + buildReplayRecords, + replayLogEnabled, + writeReplayLog, +} from '../scripts/lib/brief-dedup-replay-log.mjs'; +import { cacheKeyFor, normalizeForEmbedding } from '../scripts/lib/brief-embedding.mjs'; + +// ── Fixture helpers ─────────────────────────────────────────────────────────── + +function story(title, { hash, score = 10, mentions = 1, severity = 'high', link = '', phase = 'emerging', sources = [] } = {}) { + return { + hash: hash ?? `h-${title.slice(0, 16).replace(/\W+/g, '-')}`, + title, + link, + severity, + currentScore: score, + mentionCount: mentions, + phase, + sources, + }; +} + +function rep(hash, mergedHashes, extras = {}) { + return { hash, mergedHashes, ...extras }; +} + +function mockPipeline() { + const calls = []; + const impl = async (commands) => { + calls.push(commands); + return commands.map(() => ({ result: 'OK' })); + }; + return { impl, calls }; +} + +function mockWarn() { + const lines = []; + return { impl: (line) => lines.push(line), lines }; +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +describe('replayLogEnabled — env parsing', () => { + it('OFF by default (unset)', () => { + assert.equal(replayLogEnabled({}), false); + }); + + it('OFF on "0", "yes", "true", "True", "1 " (trailing space), empty', () => { + for (const v of ['0', 'yes', 'true', 'True', '1 ', '']) { + assert.equal( + replayLogEnabled({ DIGEST_DEDUP_REPLAY_LOG: v }), + false, + `expected OFF for ${JSON.stringify(v)}`, + ); + } + }); + + it('ON only for literal "1"', () => { + assert.equal(replayLogEnabled({ DIGEST_DEDUP_REPLAY_LOG: '1' }), true); + }); +}); + +describe('buildReplayLogKey — key shape', () => { + it('produces digest:replay-log:v1:{ruleId}:{YYYY-MM-DD} on sample input', () => { + const ts = Date.UTC(2026, 3, 23, 8, 2, 0); // 2026-04-23T08:02:00Z + const key = buildReplayLogKey('full:en:high', ts); + assert.equal(key, 'digest:replay-log:v1:full:en:high:2026-04-23'); + }); + + it('sanitises ruleId to [A-Za-z0-9:_-] (keeps colons for composite ids)', () => { + const ts = Date.UTC(2026, 3, 23); + const key = buildReplayLogKey('full/en::weird chars!', ts); + assert.equal(key, 'digest:replay-log:v1:full_en::weird_chars_:2026-04-23'); + }); + + it('falls back to ruleId="unknown" on null/empty', () => { + const ts = Date.UTC(2026, 3, 23); + assert.equal( + buildReplayLogKey(null, ts), + 'digest:replay-log:v1:unknown:2026-04-23', + ); + // A string of only unsafe chars → sanitised to empty → "unknown" + assert.equal( + buildReplayLogKey('!!!!', ts), + 'digest:replay-log:v1:unknown:2026-04-23', + ); + }); + + it('falls back to ruleId="unknown" on pathological separator-only inputs', () => { + // Regression guard (Greptile P2): a ruleId of pure separators + // (':', '-', '_' or mixtures) has no identifying content — passing + // it through verbatim would produce keys like + // `digest:replay-log:v1::::2026-04-23` that confuse redis-cli + // namespace tooling. The emptiness check strips ':' / '-' / '_' + // before deciding to fall back. + const ts = Date.UTC(2026, 3, 23); + for (const raw of [':::', '---', '___', ':_:', '-_-', '::-:--']) { + assert.equal( + buildReplayLogKey(raw, ts), + 'digest:replay-log:v1:unknown:2026-04-23', + `ruleId=${JSON.stringify(raw)} should fall back to "unknown"`, + ); + } + }); +}); + +describe('buildReplayRecords — record shape', () => { + const s1 = story('Nigeria coup trial opens, six charged', { hash: 'h1' }); + const s2 = story('Alleged Coup: one defendant arrives', { hash: 'h2' }); + const s3 = story('Russia halts Druzhba oil to Germany', { hash: 'h3' }); + const stories = [s1, s2, s3]; + // Pretend dedup merged s1+s2 into cluster-0 (rep=s1), s3 alone in cluster-1. + const reps = [ + rep('h1', ['h1', 'h2'], { currentScore: 10 }), + rep('h3', ['h3'], { currentScore: 8 }), + ]; + const cfg = { + mode: 'embed', + clustering: 'single', + cosineThreshold: 0.6, + topicGroupingEnabled: true, + topicThreshold: 0.45, + entityVetoEnabled: true, + }; + const tickContext = { briefTickId: 'tick-1', ruleId: 'full:en:high', tsMs: 1000 }; + + it('produces one record per input story', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.equal(records.length, 3); + }); + + it('preserves originalIndex in input order', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.deepEqual(records.map((r) => r.originalIndex), [0, 1, 2]); + }); + + it('isRep is true exactly for each cluster rep hash', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + const byHash = new Map(records.map((r) => [r.storyHash, r])); + assert.equal(byHash.get('h1').isRep, true); + assert.equal(byHash.get('h2').isRep, false); + assert.equal(byHash.get('h3').isRep, true); + }); + + it('clusterId derives from rep.mergedHashes (s1+s2 → 0, s3 → 1)', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + const byHash = new Map(records.map((r) => [r.storyHash, r])); + assert.equal(byHash.get('h1').clusterId, 0); + assert.equal(byHash.get('h2').clusterId, 0); + assert.equal(byHash.get('h3').clusterId, 1); + }); + + it('embeddingCacheKey matches cacheKeyFor(normalizeForEmbedding(title))', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.equal( + records[0].embeddingCacheKey, + cacheKeyFor(normalizeForEmbedding(s1.title)), + ); + }); + + it('hasEmbedding reflects sidecar membership', () => { + const vec = new Array(512).fill(0.1); + const sidecar = new Map([['h1', vec]]); + const records = buildReplayRecords(stories, reps, sidecar, cfg, tickContext); + const byHash = new Map(records.map((r) => [r.storyHash, r])); + assert.equal(byHash.get('h1').hasEmbedding, true); + assert.equal(byHash.get('h2').hasEmbedding, false); + assert.equal(byHash.get('h3').hasEmbedding, false); + }); + + it('tickConfig snapshot mirrors cfg (all behaviour-defining fields)', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.deepEqual(records[0].tickConfig, { + mode: 'embed', + clustering: 'single', + cosineThreshold: 0.6, + topicGroupingEnabled: true, + topicThreshold: 0.45, + entityVetoEnabled: true, + }); + }); + + it('tickConfig is a per-record shallow copy (not a shared reference)', () => { + // Regression guard (Greptile P2): mutating one record's tickConfig + // must not affect other records in the same batch. A shared-ref + // implementation had no storage bug (JSON.stringify serialises + // each record independently) but would bite any in-memory + // consumer that mutates for experimentation. + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.ok(records.length >= 2); + assert.notStrictEqual( + records[0].tickConfig, + records[1].tickConfig, + 'records must not share tickConfig by reference', + ); + records[0].tickConfig.mode = 'MUTATED'; + assert.equal(records[1].tickConfig.mode, 'embed', 'mutation must not leak'); + }); + + it('serialises topicGroupingEnabled=false distinctly from default', () => { + // Regression guard: a tick run with DIGEST_DEDUP_TOPIC_GROUPING=0 + // must be replay-distinguishable from a normal tick. Prior to this + // field being captured, both serialised to the same tickConfig and + // downstream replays could not reconstruct the output ordering. + const cfgOff = { ...cfg, topicGroupingEnabled: false }; + const records = buildReplayRecords(stories, reps, new Map(), cfgOff, tickContext); + assert.equal(records[0].tickConfig.topicGroupingEnabled, false); + // And the default-on tick serialises true, not identical: + const recordsOn = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + assert.notDeepEqual(recordsOn[0].tickConfig, records[0].tickConfig); + }); + + it('tickContext fields copied onto every record', () => { + const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext); + for (const r of records) { + assert.equal(r.briefTickId, 'tick-1'); + assert.equal(r.ruleId, 'full:en:high'); + assert.equal(r.tsMs, 1000); + assert.equal(r.v, 1); + } + }); + + it('handles rep without mergedHashes (falls back to rep.hash alone)', () => { + const repsNoMerged = [{ hash: 'h1' }, { hash: 'h3' }]; + const records = buildReplayRecords( + [story('a', { hash: 'h1' }), story('b', { hash: 'h3' })], + repsNoMerged, + new Map(), + cfg, + tickContext, + ); + assert.equal(records[0].clusterId, 0); + assert.equal(records[1].clusterId, 1); + assert.equal(records[0].isRep, true); + assert.equal(records[1].isRep, true); + }); +}); + +describe('writeReplayLog — behaviour', () => { + const baseArgs = () => { + const s1 = story('Nigeria coup trial opens', { hash: 'h1', link: 'https://example.com/nigeria-coup' }); + const s2 = story('Alleged Coup: defendant arrives', { hash: 'h2' }); + return { + stories: [s1, s2], + reps: [rep('h1', ['h1', 'h2'])], + embeddingByHash: new Map(), + cfg: { + mode: 'embed', + clustering: 'single', + cosineThreshold: 0.6, + topicThreshold: 0.45, + entityVetoEnabled: true, + }, + tickContext: { + briefTickId: 'tick-1', + ruleId: 'full:en:high', + tsMs: Date.UTC(2026, 3, 23, 8, 2, 0), + }, + }; + }; + + it('flag OFF → skipped=disabled, no pipeline call, no warn', async () => { + const pipe = mockPipeline(); + const warn = mockWarn(); + const res = await writeReplayLog({ + ...baseArgs(), + deps: { env: {}, redisPipeline: pipe.impl, warn: warn.impl }, + }); + assert.deepEqual(res, { wrote: 0, key: null, skipped: 'disabled' }); + assert.equal(pipe.calls.length, 0); + assert.equal(warn.lines.length, 0); + }); + + it('flag ON + empty stories → skipped=empty, no pipeline call', async () => { + const pipe = mockPipeline(); + const warn = mockWarn(); + const res = await writeReplayLog({ + ...baseArgs(), + stories: [], + deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, redisPipeline: pipe.impl, warn: warn.impl }, + }); + assert.equal(res.skipped, 'empty'); + assert.equal(pipe.calls.length, 0); + }); + + it('flag ON + stories → RPUSH + EXPIRE 30d on correct key', async () => { + const pipe = mockPipeline(); + const warn = mockWarn(); + const res = await writeReplayLog({ + ...baseArgs(), + deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, redisPipeline: pipe.impl, warn: warn.impl }, + }); + assert.equal(res.wrote, 2); + assert.equal(res.key, 'digest:replay-log:v1:full:en:high:2026-04-23'); + assert.equal(res.skipped, null); + assert.equal(pipe.calls.length, 1); + const [commands] = pipe.calls; + assert.equal(commands.length, 2, 'two commands: RPUSH + EXPIRE'); + const [rpushCmd, expireCmd] = commands; + assert.equal(rpushCmd[0], 'RPUSH'); + assert.equal(rpushCmd[1], 'digest:replay-log:v1:full:en:high:2026-04-23'); + assert.equal(rpushCmd.length, 4, 'RPUSH + key + 2 story records'); + assert.equal(expireCmd[0], 'EXPIRE'); + assert.equal(expireCmd[1], 'digest:replay-log:v1:full:en:high:2026-04-23'); + assert.equal(expireCmd[2], String(30 * 24 * 60 * 60)); + // Each pushed value is a JSON-stringified record. + const rec0 = JSON.parse(rpushCmd[2]); + const rec1 = JSON.parse(rpushCmd[3]); + assert.equal(rec0.storyHash, 'h1'); + assert.equal(rec0.isRep, true); + assert.equal(rec0.link, 'https://example.com/nigeria-coup'); + assert.equal(rec1.storyHash, 'h2'); + assert.equal(rec1.isRep, false); + assert.equal(rec1.clusterId, 0, 'h2 is in the same cluster as h1'); + assert.equal(warn.lines.length, 0); + }); + + it('pipeline returns null → warn + skipped=null + wrote=0', async () => { + const warn = mockWarn(); + const res = await writeReplayLog({ + ...baseArgs(), + deps: { + env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, + redisPipeline: async () => null, + warn: warn.impl, + }, + }); + assert.equal(res.wrote, 0); + assert.notEqual(res.key, null, 'key is reported even on null pipeline (diagnostic)'); + assert.equal(warn.lines.length, 1); + assert.match(warn.lines[0], /replay-log.*pipeline returned null/); + }); + + it('pipeline throws → caught, warn emitted, never re-throws', async () => { + const warn = mockWarn(); + const res = await writeReplayLog({ + ...baseArgs(), + deps: { + env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, + redisPipeline: async () => { throw new Error('upstash exploded'); }, + warn: warn.impl, + }, + }); + assert.equal(res.wrote, 0); + assert.equal(warn.lines.length, 1); + assert.match(warn.lines[0], /replay-log.*write failed.*upstash exploded/); + }); + + it('malformed args → no throw, returns a result object', async () => { + const warn = mockWarn(); + // No stories at all — should skip cleanly. + const res = await writeReplayLog({ + deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, warn: warn.impl }, + }); + assert.equal(res.skipped, 'empty'); + }); +});