mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
feat(digest-dedup): replayable per-story input log (opt-in, no behaviour change) (#3330)
* feat(digest-dedup): replayable per-story input log (opt-in, no behaviour change)
Ship the measurement layer before picking any recall-lift strategy.
Why: the current dedup path embeds titles only, so brief-wire headlines
that share a real event but drop the geographic anchor (e.g. "Alleged
Coup: defendant arrives in court" vs "Trial opens after Nigeria charges
six over 2025 coup plot") can slip past the 0.60 cosine threshold. To
tune recall without regressing precision we need a replayable per-tick
dataset — one record per story with the exact fields any downstream
candidate (title+slug, LLM-canonicalise, text-embedding-3-large, cross-
encoder re-rank, etc.) would need to score.
This PR ships ONLY the log. Zero behaviour change:
- Opt-in via DIGEST_DEDUP_REPLAY_LOG=1 (default OFF).
- Writer is best-effort: all errors swallowed + warned, never affects
digest delivery. No throw path.
- Records include hash, originalIndex, isRep, clusterId, raw +
normalised title, link, severity/score/mentions/phase/sources,
embeddingCacheKey, hasEmbedding sidecar flag, and the tick's config
snapshot (mode, clustering, cosineThreshold, topicThreshold, veto).
- clusterId derives from rep.mergedHashes (already set by
materializeCluster) so the orchestrator is untouched.
- Storage: Upstash list keyed by {variant}:{lang}:{sensitivity}:{date}
with 30-day EXPIRE. Date suffix caps per-key growth; retention
covers the labelling cadence + cross-candidate comparison window.
- Env flag is '1'-only (fail-closed on typos, same pattern as
DIGEST_DEDUP_MODE).
Activation path (post-merge): flip DIGEST_DEDUP_REPLAY_LOG=1 on the
seed-digest-notifications Railway service. Watch one cron tick for the
RPUSH + EXPIRE pair (or a single warn line if creds/upstream flake),
then leave running for at least one week to accumulate calibration data.
Tests: 21 unit tests covering flag parsing, key shape + sanitisation,
record field correctness (isRep, clusterId, embeddingCacheKey,
hasEmbedding, tickConfig), pipeline null/throw handling, malformed
input. Existing 77 dedup tests unchanged and still green.
* fix(digest-dedup): capture topicGroupingEnabled in replay tickConfig
Review catch (PR #3330): the tickConfig snapshot omitted
topicGroupingEnabled even though readOrchestratorConfig returns it and
the digest's post-dedup topic ordering gates on it. A tick run with
DIGEST_DEDUP_TOPIC_GROUPING=0 serialised identically to a default
tick, making those runs non-replayable for the calibration work this
log is meant to enable.
Add topicGroupingEnabled to the recorded tickConfig. One-line schema
fix + regression test asserting topic-grouping-off ticks serialise
distinctly from default.
22/22 tests pass.
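The failure mode is easy to reproduce in isolation. A minimal sketch — the snapshot helpers here are hypothetical stand-ins for illustration, not the real readOrchestratorConfig path; only the field names follow this PR:

```javascript
// Two configs that differ only in a field the snapshot drops serialise
// identically — which is exactly what made those ticks non-replayable.
const snapshotBefore = (cfg) => JSON.stringify({
  mode: cfg.mode ?? null,
  cosineThreshold: cfg.cosineThreshold ?? null,
});
const snapshotAfter = (cfg) => JSON.stringify({
  mode: cfg.mode ?? null,
  cosineThreshold: cfg.cosineThreshold ?? null,
  topicGroupingEnabled: cfg.topicGroupingEnabled ?? null,
});

const defaults = { mode: 'embed', cosineThreshold: 0.6, topicGroupingEnabled: true };
const groupingOff = { ...defaults, topicGroupingEnabled: false };

console.log(snapshotBefore(defaults) === snapshotBefore(groupingOff)); // true — indistinguishable
console.log(snapshotAfter(defaults) === snapshotAfter(groupingOff));   // false — replayable
```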
* fix(digest-dedup): await replay-log write to survive explicit process.exit
Review catch (PR #3330): the fire-and-forget `void writeReplayLog(...)`
call could be dropped on the explicit-exit paths — the brief-compose
failure gate at line 1539 and main().catch at line 1545 both call
process.exit(1). Unlike natural exit, process.exit does not drain
in-flight promises, so the last N ticks' replay records could be
silently lost on runs where measurement fidelity matters most.
Fix: await the writeReplayLog call. Safe because:
- writeReplayLog returns synchronously when the flag is off
(replayLogEnabled check is the first thing it does)
- It has a top-level try/catch that always returns a result object
- The Upstash pipeline call has a 10s timeout ceiling
- buildDigest already awaits many Upstash calls (dedup, compose,
render) so one more is not a hot-path concern
Comment block added above the call explains why the await is
deliberate — so a future refactor doesn't revert it to void thinking
it's a leftover.
No test change: existing writeReplayLog unit tests already cover the
disabled / empty / success / error paths. The fix is a single-keyword
change in a caller that was already guaranteed-safe by the callee's
contract.
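The promise-drop behaviour is observable in a few lines. A hedged sketch (plain Node, not project code) that runs the two variants as child processes so the abrupt exit is visible from outside:

```javascript
import { spawnSync } from 'node:child_process';

// Variant 1: fire-and-forget, then process.exit — the pending timer
// callback never runs, so nothing is printed (the write is "dropped").
const dropped = spawnSync(process.execPath, ['-e',
  "void new Promise(r => setTimeout(() => { console.log('flushed'); r(); }, 50)); process.exit(0);",
]);

// Variant 2: exit only after the promise settles — the write survives.
const kept = spawnSync(process.execPath, ['-e',
  "new Promise(r => setTimeout(() => { console.log('flushed'); r(); }, 50)).then(() => process.exit(0));",
]);

console.log(JSON.stringify(dropped.stdout.toString())); // ""
console.log(JSON.stringify(kept.stdout.toString()));    // "flushed\n"
```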
* refactor(digest-dedup): address Greptile P2 review comments on replay log
Three non-blocking polish items from the automated review, bundled
because they all touch the same new module and none change behaviour.
1. tsMs captured BEFORE deduplicateStories (seed-digest-notifications.mjs).
Previously sampled after dedup returned, so briefTickId reflected
dedup-completion time rather than tick-start. For downstream readers
the natural reading of "briefTickId" is when the tick began
processing; moved the Date.now() call to match that expectation.
Drift is maybe 100ms-2s on cold-cache embed calls — small, but
moving it is free.
2. buildReplayLogKey emptiness check now strips ':' and '-' in addition
to '_'. A pathological ruleId of ':::' previously passed through
verbatim, producing keys like `digest:replay-log:v1::::2026-04-23`
that confuse redis-cli's namespace tooling (SCAN / KEYS / tab
completion). The new guard falls back to "unknown" on any input
that's all separators. Added a regression test covering the
':::' / '---' / '___' / mixed cases.
3. tickConfig is now a per-record shallow copy instead of a shared
reference. Storage is unaffected (writeReplayLog serialises each
record via JSON.stringify independently) but an in-memory consumer
that mutated one record's tickConfig for experimentation would have
silently affected all other records in the same batch. Added a
regression test asserting mutation doesn't leak across records.
Tests: 24/24 pass (22 prior + 2 new regression). Typecheck + lint clean.
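Item 3's footgun in miniature — illustrative objects only, not the real record shape:

```javascript
const base = () => ({ mode: 'embed', cosineThreshold: 0.6 });

// Shared reference: one consumer's experiment leaks into every record.
const cfgShared = base();
const shared = [{ tickConfig: cfgShared }, { tickConfig: cfgShared }];
shared[0].tickConfig.mode = 'MUTATED';
console.log(shared[1].tickConfig.mode); // 'MUTATED' — leaked

// Per-record shallow copy: the mutation stays local.
const cfgCopy = base();
const copied = [{ tickConfig: { ...cfgCopy } }, { tickConfig: { ...cfgCopy } }];
copied[0].tickConfig.mode = 'MUTATED';
console.log(copied[1].tickConfig.mode); // 'embed' — isolated
```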
scripts/lib/brief-dedup-replay-log.mjs (new file, 245 lines)
@@ -0,0 +1,245 @@
/**
 * Replayable per-story input log for brief-dedup calibration.
 *
 * Problem this solves: we can't validate recall-lift options that shift
 * the embedding score distribution (title+slug, LLM-canonicalise, 3-large
 * model upgrade, etc.) from a baseline-band pair log alone. We need the
 * per-story inputs for every tick so offline replays can re-embed with
 * alternative configs and re-score the full pair matrix.
 *
 * See docs/brainstorms/2026-04-23-001-brief-dedup-recall-gap.md §5 Phase 1.
 *
 * Contract:
 * - Opt-in via DIGEST_DEDUP_REPLAY_LOG=1 (default OFF — zero behaviour
 *   change on merge).
 * - Best-effort: ALL failures are swallowed + warned. Replay-log write
 *   errors MUST NEVER affect digest delivery.
 * - Append-only list in Upstash: one JSON record per story, keyed by
 *   rule + date so operators can range-query a day's traffic.
 * - 30-day TTL (see §5 Phase 1 retention rationale: covers labelling
 *   cadence + cross-candidate comparison window; cache TTL is not the
 *   right anchor — replays that change embed config pay a fresh embed
 *   regardless of cache).
 */

import { cacheKeyFor, normalizeForEmbedding } from './brief-embedding.mjs';
import { defaultRedisPipeline } from './_upstash-pipeline.mjs';

const KEY_PREFIX = 'digest:replay-log:v1';
const TTL_SECONDS = 30 * 24 * 60 * 60; // 30 days

/**
 * Env-read at call time so Railway can flip the flag without a redeploy.
 * Anything other than literal '1' (including unset, '0', 'yes', 'true',
 * mis-cased 'True') is treated as OFF — fail-closed so a typo can't
 * silently turn the log on in prod. '1' is the single intentional value.
 *
 * @param {Record<string,string|undefined>} [env]
 */
export function replayLogEnabled(env = process.env) {
  return env.DIGEST_DEDUP_REPLAY_LOG === '1';
}

/**
 * Build the Upstash list key for a given tick.
 *
 * Format: digest:replay-log:v1:{ruleId}:{YYYY-MM-DD}
 *
 * Scoped per-rule so operators can range-query a single rule's day
 * without scanning traffic from other digest variants. Date suffix
 * (UTC) caps list length to one day's cron ticks — prevents unbounded
 * growth of a single key over the 30-day retention window.
 *
 * Safe-characters gate on ruleId: strip anything not alnum/underscore/
 * hyphen/colon so an exotic rule id can't escape the key namespace.
 */
export function buildReplayLogKey(ruleId, tsMs) {
  // Allow ':' so `variant:lang:sensitivity` composite ruleIds stay
  // readable as Redis key segments. Strip anything else to '_'; then
  // if the whole string collapsed to nothing meaningful — all '_',
  // ':', '-', or empty — use 'unknown' so the key namespace stays
  // consistent. Stripping ':' / '-' in the emptiness check prevents
  // pathological inputs like ':::' producing keys like
  // `digest:replay-log:v1::::2026-04-23` that confuse Redis namespace
  // tooling (SCAN / KEYS / redis-cli tab completion).
  const raw = String(ruleId ?? '').replace(/[^A-Za-z0-9:_-]/g, '_');
  const safeRuleId = raw.replace(/[_:-]/g, '') === '' ? 'unknown' : raw;
  const iso = new Date(tsMs).toISOString();
  const dateKey = iso.slice(0, 10); // YYYY-MM-DD
  return `${KEY_PREFIX}:${safeRuleId}:${dateKey}`;
}

/**
 * Build one JSON record per story in the dedup input.
 *
 * `clusterId` is derived from `reps[].mergedHashes` — the authoritative
 * cluster-membership contract that materializeCluster already provides
 * (brief-dedup-jaccard.mjs:75-85). No change to the orchestrator needed.
 *
 * `embeddingCacheKey` is computed from normalizeForEmbedding(title). It
 * only helps replays that keep the SAME embedding config (model, dims,
 * input transform) — replays that change any of those pay fresh embed
 * calls regardless. Still worth recording: it's ~60 bytes and makes
 * same-config replays cheap.
 *
 * @param {Array<object>} stories — the input passed to deduplicateStories
 * @param {Array<object>} reps — the reps returned by deduplicateStories
 * @param {Map<string, number[]>} embeddingByHash — sidecar from the embed path
 * @param {object} cfg — the full config object from readOrchestratorConfig
 * @param {object} tickContext
 * @param {string} tickContext.briefTickId
 * @param {string} tickContext.ruleId
 * @param {number} tickContext.tsMs
 * @returns {Array<object>}
 */
export function buildReplayRecords(stories, reps, embeddingByHash, cfg, tickContext) {
  // Derive hash → clusterId from rep membership. A rep's mergedHashes
  // lists every hash in its cluster including the rep's own; iterate
  // reps in output order and use the index as clusterId.
  const clusterByHash = new Map();
  if (Array.isArray(reps)) {
    reps.forEach((rep, clusterId) => {
      const hashes = Array.isArray(rep?.mergedHashes) ? rep.mergedHashes : [rep?.hash];
      for (const h of hashes) {
        if (typeof h === 'string' && !clusterByHash.has(h)) {
          clusterByHash.set(h, clusterId);
        }
      }
    });
  }

  // `repHashes` is a Set of the winning story's hash per cluster. A
  // story is the rep iff its hash === the rep.hash at its clusterId.
  const repHashes = new Set();
  if (Array.isArray(reps)) {
    for (const rep of reps) {
      if (typeof rep?.hash === 'string') repHashes.add(rep.hash);
    }
  }

  const tickConfig = {
    mode: cfg?.mode ?? null,
    clustering: cfg?.clustering ?? null,
    cosineThreshold: cfg?.cosineThreshold ?? null,
    // topicGroupingEnabled gates the post-dedup topic ordering pass in
    // seed-digest-notifications. Omitting it makes topic-grouping-off
    // ticks indistinguishable from default ticks at replay time, so
    // downstream replays can't reconstruct output behaviour for runs
    // with DIGEST_DEDUP_TOPIC_GROUPING=0. Serialise explicitly.
    topicGroupingEnabled: cfg?.topicGroupingEnabled ?? null,
    topicThreshold: cfg?.topicThreshold ?? null,
    entityVetoEnabled: cfg?.entityVetoEnabled ?? null,
  };

  const records = [];
  stories.forEach((story, originalIndex) => {
    const rawTitle = typeof story?.title === 'string' ? story.title : '';
    const normalizedTitle = normalizeForEmbedding(rawTitle);
    const cacheKey = rawTitle ? cacheKeyFor(normalizedTitle) : null;
    // hasEmbedding is a diagnostic: if the embed path produced a vector
    // for this rep, the sidecar has it. Useful in replay to tell apart
    // "embed path completed" from "embed path fell back to Jaccard".
    const hasEmbedding =
      embeddingByHash instanceof Map && embeddingByHash.has(story?.hash);
    records.push({
      v: 1,
      briefTickId: tickContext.briefTickId,
      ruleId: tickContext.ruleId,
      tsMs: tickContext.tsMs,
      storyHash: story?.hash ?? null,
      originalIndex,
      isRep: repHashes.has(story?.hash),
      clusterId: clusterByHash.has(story?.hash)
        ? clusterByHash.get(story?.hash)
        : null,
      title: rawTitle,
      normalizedTitle,
      link: typeof story?.link === 'string' ? story.link : null,
      severity: story?.severity ?? null,
      currentScore: Number(story?.currentScore ?? 0),
      mentionCount: Number(story?.mentionCount ?? 1),
      phase: story?.phase ?? null,
      sources: Array.isArray(story?.sources) ? story.sources : [],
      embeddingCacheKey: cacheKey,
      hasEmbedding,
      // Per-record shallow copy so an in-memory consumer (future
      // replay harness, test) that mutates one record's tickConfig
      // can't silently affect every other record via shared reference.
      // Serialisation goes through JSON.stringify in writeReplayLog so
      // storage is unaffected either way; this is purely an in-memory
      // footgun fix.
      tickConfig: { ...tickConfig },
    });
  });
  return records;
}

/**
 * Write the replay log for one dedup tick. Best-effort: every error is
 * caught and warned; the function NEVER throws.
 *
 * @param {object} args
 * @param {Array<object>} args.stories — input to deduplicateStories
 * @param {Array<object>} args.reps — output from deduplicateStories
 * @param {Map<string, number[]>} args.embeddingByHash — sidecar from deduplicateStories
 * @param {object} args.cfg — readOrchestratorConfig result
 * @param {object} args.tickContext
 * @param {string} args.tickContext.briefTickId
 * @param {string} args.tickContext.ruleId
 * @param {number} args.tickContext.tsMs
 * @param {object} [args.deps]
 * @param {Record<string,string|undefined>} [args.deps.env]
 * @param {typeof defaultRedisPipeline} [args.deps.redisPipeline]
 * @param {(line: string) => void} [args.deps.warn]
 * @returns {Promise<{ wrote: number, key: string | null, skipped: 'disabled' | 'empty' | null }>}
 */
export async function writeReplayLog(args) {
  const {
    stories,
    reps,
    embeddingByHash,
    cfg,
    tickContext,
    deps = {},
  } = args ?? {};
  const env = deps.env ?? process.env;
  const warn = deps.warn ?? ((line) => console.warn(line));

  if (!replayLogEnabled(env)) {
    return { wrote: 0, key: null, skipped: 'disabled' };
  }
  if (!Array.isArray(stories) || stories.length === 0) {
    return { wrote: 0, key: null, skipped: 'empty' };
  }

  try {
    const pipelineImpl = deps.redisPipeline ?? defaultRedisPipeline;
    const records = buildReplayRecords(
      stories,
      reps ?? [],
      embeddingByHash instanceof Map ? embeddingByHash : new Map(),
      cfg ?? {},
      tickContext ?? { briefTickId: 'unknown', ruleId: 'unknown', tsMs: Date.now() },
    );
    if (records.length === 0) {
      return { wrote: 0, key: null, skipped: 'empty' };
    }
    const key = buildReplayLogKey(tickContext?.ruleId, tickContext?.tsMs ?? Date.now());
    // Single RPUSH with variadic values (one per story) + EXPIRE. Keep
    // to two commands so Upstash's pipeline stays cheap even on large
    // ticks. Stringify each record individually so downstream readers
    // can consume with LRANGE + JSON.parse.
    const rpushCmd = ['RPUSH', key, ...records.map((r) => JSON.stringify(r))];
    const expireCmd = ['EXPIRE', key, String(TTL_SECONDS)];
    const result = await pipelineImpl([rpushCmd, expireCmd]);
    if (result == null) {
      warn(`[digest] replay-log: pipeline returned null (creds missing or upstream down) key=${key}`);
      return { wrote: 0, key, skipped: null };
    }
    return { wrote: records.length, key, skipped: null };
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    warn(`[digest] replay-log: write failed — ${msg}`);
    return { wrote: 0, key: null, skipped: null };
  }
}
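The writer's comment above promises LRANGE + JSON.parse consumption. A hedged reader sketch — the sample records and cluster-grouping here are illustrative only; real records carry the full field set described in the commit message:

```javascript
// `lrangeResult` stands in for what an LRANGE over a day's key would
// return: one JSON string per story record.
const lrangeResult = [
  '{"v":1,"storyHash":"h1","isRep":true,"clusterId":0,"title":"Nigeria coup trial opens"}',
  '{"v":1,"storyHash":"h2","isRep":false,"clusterId":0,"title":"Alleged Coup: defendant arrives"}',
  '{"v":1,"storyHash":"h3","isRep":true,"clusterId":1,"title":"Russia halts Druzhba oil"}',
];

const records = lrangeResult.map((s) => JSON.parse(s));

// Rebuild cluster membership for offline re-scoring.
const clusters = new Map();
for (const r of records) {
  if (!clusters.has(r.clusterId)) clusters.set(r.clusterId, []);
  clusters.get(r.clusterId).push(r.storyHash);
}
console.log([...clusters.entries()]); // [[0, ['h1', 'h2']], [1, ['h3']]]
```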
@@ -48,6 +48,7 @@ import {
  readOrchestratorConfig,
} from './lib/brief-dedup.mjs';
import { stripSourceSuffix } from './lib/brief-dedup-jaccard.mjs';
import { writeReplayLog } from './lib/brief-dedup-replay-log.mjs';

// ── Config ────────────────────────────────────────────────────────────────────

@@ -390,8 +391,42 @@ async function buildDigest(rule, windowStartMs) {

  stories.sort((a, b) => b.currentScore - a.currentScore);
  const cfg = readOrchestratorConfig(process.env);
  // Sample tsMs BEFORE dedup so briefTickId anchors to tick-start, not
  // to dedup-completion. Dedup can take a few seconds on cold-cache
  // embed calls; we want the replay log's tick id to reflect when the
  // tick began processing, which is the natural reading of
  // "briefTickId" for downstream readers.
  const tsMs = Date.now();
  const { reps: dedupedAll, embeddingByHash, logSummary } =
    await deduplicateStories(stories);
  // Replay log (opt-in via DIGEST_DEDUP_REPLAY_LOG=1). Best-effort — any
  // failure is swallowed by writeReplayLog. Runs AFTER dedup so the log
  // captures the real rep + cluster assignments. RuleId omits userId on
  // purpose: dedup input is shared across users of the same (variant,
  // lang, sensitivity), and we don't want user identity in log keys.
  // See docs/brainstorms/2026-04-23-001-brief-dedup-recall-gap.md §5 Phase 1.
  //
  // AWAITED on purpose: this script exits via explicit process.exit(1)
  // on the brief-compose failure gate (~line 1539) and on main().catch
  // (~line 1545). process.exit does NOT drain in-flight promises like
  // natural exit does, so a `void` call here would silently drop the
  // last N ticks' replay records — exactly the runs where measurement
  // fidelity matters most. writeReplayLog has its own internal try/
  // catch + early return when the flag is off, so awaiting is free on
  // the disabled path and bounded by the 10s Upstash pipeline timeout
  // on the enabled path.
  const ruleKey = `${variant}:${lang}:${rule.sensitivity ?? 'high'}`;
  await writeReplayLog({
    stories,
    reps: dedupedAll,
    embeddingByHash,
    cfg,
    tickContext: {
      briefTickId: `${ruleKey}:${tsMs}`,
      ruleId: ruleKey,
      tsMs,
    },
  });
  // Apply the absolute-score floor AFTER dedup so the floor runs on
  // the representative's score (mentionCount-sum doesn't change the
  // score field; the rep is the highest-scoring member of its
tests/brief-dedup-replay-log.test.mjs (new file, 383 lines)
@@ -0,0 +1,383 @@
/**
 * Unit tests for the replayable per-story input log.
 *
 * Covers:
 * 1. Flag OFF → no writes, returns skipped=disabled (default behaviour)
 * 2. Flag ON + empty stories → no writes
 * 3. Flag ON + stories → RPUSH with one record per story + EXPIRE 30d
 * 4. Record fields match §5 Phase 1 spec (hash, originalIndex, isRep,
 *    clusterId, title/normalizedTitle, link, severity/score/mentions,
 *    phase/sources, embeddingCacheKey, hasEmbedding, tickConfig)
 * 5. clusterId derived correctly from rep.mergedHashes (Jaccard + embed
 *    output shapes both populate mergedHashes — one codepath)
 * 6. isRep only set for the materialized winning hash of each cluster
 * 7. Pipeline returns null (creds missing) → warn + skipped
 * 8. Pipeline throws → caught, warn emitted, no exception propagates
 * 9. Key shape: digest:replay-log:v1:{safeRuleId}:{YYYY-MM-DD}
 * 10. Only DIGEST_DEDUP_REPLAY_LOG='1' literal enables (case-sensitive,
 *     no 'yes'/'true'/'True' lenience — fail-closed typo-safety)
 *
 * Run: node --test tests/brief-dedup-replay-log.test.mjs
 */

import { describe, it } from 'node:test';
import assert from 'node:assert/strict';

import {
  buildReplayLogKey,
  buildReplayRecords,
  replayLogEnabled,
  writeReplayLog,
} from '../scripts/lib/brief-dedup-replay-log.mjs';
import { cacheKeyFor, normalizeForEmbedding } from '../scripts/lib/brief-embedding.mjs';

// ── Fixture helpers ───────────────────────────────────────────────────────────

function story(title, { hash, score = 10, mentions = 1, severity = 'high', link = '', phase = 'emerging', sources = [] } = {}) {
  return {
    hash: hash ?? `h-${title.slice(0, 16).replace(/\W+/g, '-')}`,
    title,
    link,
    severity,
    currentScore: score,
    mentionCount: mentions,
    phase,
    sources,
  };
}

function rep(hash, mergedHashes, extras = {}) {
  return { hash, mergedHashes, ...extras };
}

function mockPipeline() {
  const calls = [];
  const impl = async (commands) => {
    calls.push(commands);
    return commands.map(() => ({ result: 'OK' }));
  };
  return { impl, calls };
}

function mockWarn() {
  const lines = [];
  return { impl: (line) => lines.push(line), lines };
}

// ── Tests ─────────────────────────────────────────────────────────────────────

describe('replayLogEnabled — env parsing', () => {
  it('OFF by default (unset)', () => {
    assert.equal(replayLogEnabled({}), false);
  });

  it('OFF on "0", "yes", "true", "True", "1 " (trailing space), empty', () => {
    for (const v of ['0', 'yes', 'true', 'True', '1 ', '']) {
      assert.equal(
        replayLogEnabled({ DIGEST_DEDUP_REPLAY_LOG: v }),
        false,
        `expected OFF for ${JSON.stringify(v)}`,
      );
    }
  });

  it('ON only for literal "1"', () => {
    assert.equal(replayLogEnabled({ DIGEST_DEDUP_REPLAY_LOG: '1' }), true);
  });
});

describe('buildReplayLogKey — key shape', () => {
  it('produces digest:replay-log:v1:{ruleId}:{YYYY-MM-DD} on sample input', () => {
    const ts = Date.UTC(2026, 3, 23, 8, 2, 0); // 2026-04-23T08:02:00Z
    const key = buildReplayLogKey('full:en:high', ts);
    assert.equal(key, 'digest:replay-log:v1:full:en:high:2026-04-23');
  });

  it('sanitises ruleId to [A-Za-z0-9:_-] (keeps colons for composite ids)', () => {
    const ts = Date.UTC(2026, 3, 23);
    const key = buildReplayLogKey('full/en::weird chars!', ts);
    assert.equal(key, 'digest:replay-log:v1:full_en::weird_chars_:2026-04-23');
  });

  it('falls back to ruleId="unknown" on null/empty', () => {
    const ts = Date.UTC(2026, 3, 23);
    assert.equal(
      buildReplayLogKey(null, ts),
      'digest:replay-log:v1:unknown:2026-04-23',
    );
    // A string of only unsafe chars → sanitised to empty → "unknown"
    assert.equal(
      buildReplayLogKey('!!!!', ts),
      'digest:replay-log:v1:unknown:2026-04-23',
    );
  });

  it('falls back to ruleId="unknown" on pathological separator-only inputs', () => {
    // Regression guard (Greptile P2): a ruleId of pure separators
    // (':', '-', '_' or mixtures) has no identifying content — passing
    // it through verbatim would produce keys like
    // `digest:replay-log:v1::::2026-04-23` that confuse redis-cli
    // namespace tooling. The emptiness check strips ':' / '-' / '_'
    // before deciding to fall back.
    const ts = Date.UTC(2026, 3, 23);
    for (const raw of [':::', '---', '___', ':_:', '-_-', '::-:--']) {
      assert.equal(
        buildReplayLogKey(raw, ts),
        'digest:replay-log:v1:unknown:2026-04-23',
        `ruleId=${JSON.stringify(raw)} should fall back to "unknown"`,
      );
    }
  });
});

describe('buildReplayRecords — record shape', () => {
  const s1 = story('Nigeria coup trial opens, six charged', { hash: 'h1' });
  const s2 = story('Alleged Coup: one defendant arrives', { hash: 'h2' });
  const s3 = story('Russia halts Druzhba oil to Germany', { hash: 'h3' });
  const stories = [s1, s2, s3];
  // Pretend dedup merged s1+s2 into cluster-0 (rep=s1), s3 alone in cluster-1.
  const reps = [
    rep('h1', ['h1', 'h2'], { currentScore: 10 }),
    rep('h3', ['h3'], { currentScore: 8 }),
  ];
  const cfg = {
    mode: 'embed',
    clustering: 'single',
    cosineThreshold: 0.6,
    topicGroupingEnabled: true,
    topicThreshold: 0.45,
    entityVetoEnabled: true,
  };
  const tickContext = { briefTickId: 'tick-1', ruleId: 'full:en:high', tsMs: 1000 };

  it('produces one record per input story', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.equal(records.length, 3);
  });

  it('preserves originalIndex in input order', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.deepEqual(records.map((r) => r.originalIndex), [0, 1, 2]);
  });

  it('isRep is true exactly for each cluster rep hash', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    const byHash = new Map(records.map((r) => [r.storyHash, r]));
    assert.equal(byHash.get('h1').isRep, true);
    assert.equal(byHash.get('h2').isRep, false);
    assert.equal(byHash.get('h3').isRep, true);
  });

  it('clusterId derives from rep.mergedHashes (s1+s2 → 0, s3 → 1)', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    const byHash = new Map(records.map((r) => [r.storyHash, r]));
    assert.equal(byHash.get('h1').clusterId, 0);
    assert.equal(byHash.get('h2').clusterId, 0);
    assert.equal(byHash.get('h3').clusterId, 1);
  });

  it('embeddingCacheKey matches cacheKeyFor(normalizeForEmbedding(title))', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.equal(
      records[0].embeddingCacheKey,
      cacheKeyFor(normalizeForEmbedding(s1.title)),
    );
  });

  it('hasEmbedding reflects sidecar membership', () => {
    const vec = new Array(512).fill(0.1);
    const sidecar = new Map([['h1', vec]]);
    const records = buildReplayRecords(stories, reps, sidecar, cfg, tickContext);
    const byHash = new Map(records.map((r) => [r.storyHash, r]));
    assert.equal(byHash.get('h1').hasEmbedding, true);
    assert.equal(byHash.get('h2').hasEmbedding, false);
    assert.equal(byHash.get('h3').hasEmbedding, false);
  });

  it('tickConfig snapshot mirrors cfg (all behaviour-defining fields)', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.deepEqual(records[0].tickConfig, {
      mode: 'embed',
      clustering: 'single',
      cosineThreshold: 0.6,
      topicGroupingEnabled: true,
      topicThreshold: 0.45,
      entityVetoEnabled: true,
    });
  });

  it('tickConfig is a per-record shallow copy (not a shared reference)', () => {
    // Regression guard (Greptile P2): mutating one record's tickConfig
    // must not affect other records in the same batch. A shared-ref
    // implementation had no storage bug (JSON.stringify serialises
    // each record independently) but would bite any in-memory
    // consumer that mutates for experimentation.
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.ok(records.length >= 2);
    assert.notStrictEqual(
      records[0].tickConfig,
      records[1].tickConfig,
      'records must not share tickConfig by reference',
    );
    records[0].tickConfig.mode = 'MUTATED';
    assert.equal(records[1].tickConfig.mode, 'embed', 'mutation must not leak');
  });

  it('serialises topicGroupingEnabled=false distinctly from default', () => {
    // Regression guard: a tick run with DIGEST_DEDUP_TOPIC_GROUPING=0
    // must be replay-distinguishable from a normal tick. Prior to this
    // field being captured, both serialised to the same tickConfig and
    // downstream replays could not reconstruct the output ordering.
    const cfgOff = { ...cfg, topicGroupingEnabled: false };
    const records = buildReplayRecords(stories, reps, new Map(), cfgOff, tickContext);
    assert.equal(records[0].tickConfig.topicGroupingEnabled, false);
    // And the default-on tick serialises true, not identical:
    const recordsOn = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    assert.notDeepEqual(recordsOn[0].tickConfig, records[0].tickConfig);
  });

  it('tickContext fields copied onto every record', () => {
    const records = buildReplayRecords(stories, reps, new Map(), cfg, tickContext);
    for (const r of records) {
      assert.equal(r.briefTickId, 'tick-1');
      assert.equal(r.ruleId, 'full:en:high');
      assert.equal(r.tsMs, 1000);
      assert.equal(r.v, 1);
    }
  });

  it('handles rep without mergedHashes (falls back to rep.hash alone)', () => {
    const repsNoMerged = [{ hash: 'h1' }, { hash: 'h3' }];
    const records = buildReplayRecords(
      [story('a', { hash: 'h1' }), story('b', { hash: 'h3' })],
      repsNoMerged,
      new Map(),
      cfg,
      tickContext,
    );
    assert.equal(records[0].clusterId, 0);
    assert.equal(records[1].clusterId, 1);
    assert.equal(records[0].isRep, true);
    assert.equal(records[1].isRep, true);
  });
});
|
||||
|
||||
describe('writeReplayLog — behaviour', () => {
  const baseArgs = () => {
    const s1 = story('Nigeria coup trial opens', { hash: 'h1', link: 'https://example.com/nigeria-coup' });
    const s2 = story('Alleged Coup: defendant arrives', { hash: 'h2' });
    return {
      stories: [s1, s2],
      reps: [rep('h1', ['h1', 'h2'])],
      embeddingByHash: new Map(),
      cfg: {
        mode: 'embed',
        clustering: 'single',
        cosineThreshold: 0.6,
        topicThreshold: 0.45,
        entityVetoEnabled: true,
      },
      tickContext: {
        briefTickId: 'tick-1',
        ruleId: 'full:en:high',
        tsMs: Date.UTC(2026, 3, 23, 8, 2, 0),
      },
    };
  };

  it('flag OFF → skipped=disabled, no pipeline call, no warn', async () => {
    const pipe = mockPipeline();
    const warn = mockWarn();
    const res = await writeReplayLog({
      ...baseArgs(),
      deps: { env: {}, redisPipeline: pipe.impl, warn: warn.impl },
    });
    assert.deepEqual(res, { wrote: 0, key: null, skipped: 'disabled' });
    assert.equal(pipe.calls.length, 0);
    assert.equal(warn.lines.length, 0);
  });

  it('flag ON + empty stories → skipped=empty, no pipeline call', async () => {
    const pipe = mockPipeline();
    const warn = mockWarn();
    const res = await writeReplayLog({
      ...baseArgs(),
      stories: [],
      deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, redisPipeline: pipe.impl, warn: warn.impl },
    });
    assert.equal(res.skipped, 'empty');
    assert.equal(pipe.calls.length, 0);
  });

  it('flag ON + stories → RPUSH + EXPIRE 30d on correct key', async () => {
    const pipe = mockPipeline();
    const warn = mockWarn();
    const res = await writeReplayLog({
      ...baseArgs(),
      deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, redisPipeline: pipe.impl, warn: warn.impl },
    });
    assert.equal(res.wrote, 2);
    assert.equal(res.key, 'digest:replay-log:v1:full:en:high:2026-04-23');
    assert.equal(res.skipped, null);
    assert.equal(pipe.calls.length, 1);
    const [commands] = pipe.calls;
    assert.equal(commands.length, 2, 'two commands: RPUSH + EXPIRE');
    const [rpushCmd, expireCmd] = commands;
    assert.equal(rpushCmd[0], 'RPUSH');
    assert.equal(rpushCmd[1], 'digest:replay-log:v1:full:en:high:2026-04-23');
    assert.equal(rpushCmd.length, 4, 'RPUSH + key + 2 story records');
    assert.equal(expireCmd[0], 'EXPIRE');
    assert.equal(expireCmd[1], 'digest:replay-log:v1:full:en:high:2026-04-23');
    assert.equal(expireCmd[2], String(30 * 24 * 60 * 60));
    // Each pushed value is a JSON-stringified record.
    const rec0 = JSON.parse(rpushCmd[2]);
    const rec1 = JSON.parse(rpushCmd[3]);
    assert.equal(rec0.storyHash, 'h1');
    assert.equal(rec0.isRep, true);
    assert.equal(rec0.link, 'https://example.com/nigeria-coup');
    assert.equal(rec1.storyHash, 'h2');
    assert.equal(rec1.isRep, false);
    assert.equal(rec1.clusterId, 0, 'h2 is in the same cluster as h1');
    assert.equal(warn.lines.length, 0);
  });

  it('pipeline returns null → warn + skipped=null + wrote=0', async () => {
    const warn = mockWarn();
    const res = await writeReplayLog({
      ...baseArgs(),
      deps: {
        env: { DIGEST_DEDUP_REPLAY_LOG: '1' },
        redisPipeline: async () => null,
        warn: warn.impl,
      },
    });
    assert.equal(res.wrote, 0);
    assert.notEqual(res.key, null, 'key is reported even on null pipeline (diagnostic)');
    assert.equal(warn.lines.length, 1);
    assert.match(warn.lines[0], /replay-log.*pipeline returned null/);
  });

  it('pipeline throws → caught, warn emitted, never re-throws', async () => {
    const warn = mockWarn();
    const res = await writeReplayLog({
      ...baseArgs(),
      deps: {
        env: { DIGEST_DEDUP_REPLAY_LOG: '1' },
        redisPipeline: async () => { throw new Error('upstash exploded'); },
        warn: warn.impl,
      },
    });
    assert.equal(res.wrote, 0);
    assert.equal(warn.lines.length, 1);
    assert.match(warn.lines[0], /replay-log.*write failed.*upstash exploded/);
  });

  it('malformed args → no throw, returns a result object', async () => {
    const warn = mockWarn();
    // No stories at all — should skip cleanly.
    const res = await writeReplayLog({
      deps: { env: { DIGEST_DEDUP_REPLAY_LOG: '1' }, warn: warn.impl },
    });
    assert.equal(res.skipped, 'empty');
  });
});
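
// Illustrative sketch (hypothetical helper, not used by the suite): the key the
// flag-ON test asserts can be derived from tickContext alone. This assumes the
// date suffix is simply the UTC calendar date of tsMs; the real writeReplayLog
// may compute it differently.
const replayLogKeySketch = ({ ruleId, tsMs }) =>
  `digest:replay-log:v1:${ruleId}:${new Date(tsMs).toISOString().slice(0, 10)}`;
// replayLogKeySketch({ ruleId: 'full:en:high', tsMs: Date.UTC(2026, 3, 23, 8, 2, 0) })
// yields 'digest:replay-log:v1:full:en:high:2026-04-23', the RPUSH/EXPIRE key
// asserted above.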