mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(intelligence): state-change alerts from diff engine (Phase 2 PR1)
Phase 2 PR1 of the Regional Intelligence Model. Wires the existing
diffRegionalSnapshot output into the existing wm:events:queue so the
notification-relay on Railway can route state changes to Telegram / email /
Slack / Convex for Pro subscribers. No UI changes.
Single public entry point:
  emitRegionalAlerts(region, snapshot, diff, { publishEvent? })
    -> { enqueued: number, events: object[] }
Emits on 4 event types:
  regional_regime_shift        - diff.regime_changed set
  regional_trigger_activation  - one per entry in diff.trigger_activations
  regional_corridor_break      - one per entry in diff.corridor_breaks
  regional_buffer_failure      - one per entry in diff.buffer_failures
scenario_jumps and leverage_shifts are intentionally NOT emitted —
probability fluctuations are noisy and not actionable as push alerts.
Severity mapping:
- critical when regime shifts to escalation_ladder or fragmentation_risk
- critical for every corridor_break
- high for other regime shifts, trigger activations, buffer failures
- nothing below high is emitted
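
The event types and severity rules above can be sketched as a small pure function. This is only an illustration: `sketchAlertTypes` and `CRITICAL_REGIMES` are hypothetical names, and the real buildAlertEvents also builds titles and payloads that are not reproduced here.

```javascript
// Hypothetical sketch; not the code in alert-emitter.mjs.
const CRITICAL_REGIMES = new Set(['escalation_ladder', 'fragmentation_risk']);

function sketchAlertTypes(diff) {
  if (!diff) return [];
  const out = [];
  // Stable order: regime -> triggers -> corridors -> buffers.
  if (diff.regime_changed) {
    const critical = CRITICAL_REGIMES.has(diff.regime_changed.to);
    out.push({ eventType: 'regional_regime_shift', severity: critical ? 'critical' : 'high' });
  }
  for (const _trigger of diff.trigger_activations ?? []) {
    out.push({ eventType: 'regional_trigger_activation', severity: 'high' });
  }
  for (const _corridor of diff.corridor_breaks ?? []) {
    out.push({ eventType: 'regional_corridor_break', severity: 'critical' });
  }
  for (const _buffer of diff.buffer_failures ?? []) {
    out.push({ eventType: 'regional_buffer_failure', severity: 'high' });
  }
  // scenario_jumps and leverage_shifts are deliberately ignored.
  return out;
}
```

Keeping the mapping free of I/O is what lets the unit tests assert on severities without a publisher.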
Dedup: 6h TTL on wm:notif:scan-dedup:{eventType}:{hash}, matching the cron
cadence and mirroring ais-relay.cjs publishNotificationEvent's path-based
Upstash REST calls.
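
As a rough illustration of the key shape, the sketch below derives a dedup key from an event. The hash is a stand-in (a djb2-style hash rendered in base36), not the project's simpleHash, and hashing `eventType` together with the title is an assumption; `sketchHash` and `sketchDedupKey` are hypothetical names.

```javascript
// 6h TTL, matching the cron cadence described above.
const DEDUP_TTL_SECONDS = 6 * 60 * 60;

// Hypothetical stand-in for simpleHash: djb2-style, rendered in base36.
function sketchHash(input) {
  let h = 5381;
  for (let i = 0; i < input.length; i += 1) {
    h = ((h * 33) ^ input.charCodeAt(i)) >>> 0; // keep an unsigned 32-bit value
  }
  return h.toString(36);
}

function sketchDedupKey(event) {
  const title = event?.payload?.title ?? ''; // a missing title hashes as ''
  // Assumption: both eventType and title feed the hash.
  const hash = sketchHash(`${event.eventType}:${title}`);
  return `wm:notif:scan-dedup:${event.eventType}:${hash}`;
}
```

Because the key depends only on the event type and title, the same state change seen again within the TTL maps to the same key and is suppressed.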
emitRegionalAlerts NEVER throws. Three layers of guarding:
1. Null/undefined arguments short-circuit to { enqueued: 0, events: [] }
2. Each publisher call is individually try/catch'd so one failure can't
   block other events in the same region
3. The seed writer wraps the call in its own try/catch so alert emission
   can never block snapshot persist
This matches how ais-relay.cjs handles rss_alert events today.
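
The three guard layers might look roughly like the following. This is a minimal sketch under assumptions: `sketchEmit` and `sketchAfterPersist` are hypothetical names, and the events are passed in already built rather than derived from a diff.

```javascript
// Hypothetical sketch of the guard layers; not the real emitter.
async function sketchEmit(events, publishEvent) {
  // Layer 1: missing inputs short-circuit to an empty result.
  if (!Array.isArray(events) || typeof publishEvent !== 'function') {
    return { enqueued: 0, events: [] };
  }
  let enqueued = 0;
  for (const event of events) {
    // Layer 2: one failing publish must not block the remaining events.
    try {
      if (await publishEvent(event)) enqueued += 1;
    } catch {
      // swallowed on purpose; the loop continues
    }
  }
  return { enqueued, events };
}

// Layer 3: the caller wraps the whole call, so emission cannot affect persist.
async function sketchAfterPersist(regionId, events, publishEvent) {
  try {
    const result = await sketchEmit(events, publishEvent);
    return `[${regionId}] alerts: ${result.enqueued}/${result.events.length} enqueued`;
  } catch {
    return `[${regionId}] alerts: emission failed`;
  }
}
```

With a publisher that throws on the first event and succeeds on the second, `sketchEmit` reports one enqueued event out of two.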
After persistSnapshot succeeds in main(), the diff already computed in
step 15 is passed to emitRegionalAlerts. Logs per-region alert counts:
[mena] alerts: 3/3 enqueued
Zero impact on the persist path when the diff is empty (steady state
between state changes).
opts.publishEvent is the single injection point. Default is the Upstash
REST path-based publisher that walks SET NX -> LPUSH. Unit tests inject
a mock that captures events in an array, so the full diff -> events ->
dedup pipeline runs offline without touching Redis.
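
A minimal sketch of that injection point, assuming a destructured option with a default. `sketchEmitWithOpts`, `defaultPublish` and `mockPublisher` are illustrative names here; the default below simply throws, standing in for the network publisher.

```javascript
// Hypothetical default; the real one performs Upstash REST calls.
async function defaultPublish(_event) {
  throw new Error('network publisher is not available in this sketch');
}

// The publisher is taken from opts, falling back to the default.
async function sketchEmitWithOpts(events, { publishEvent = defaultPublish } = {}) {
  let enqueued = 0;
  for (const event of events) {
    try {
      if (await publishEvent(event)) enqueued += 1;
    } catch {
      // swallowed so that one failure cannot block the rest
    }
  }
  return { enqueued, events };
}

// Test double that records every event instead of touching Redis.
function mockPublisher() {
  const calls = [];
  const publish = async (event) => {
    calls.push(event);
    return true;
  };
  return { publish, calls };
}
```

A test would call `sketchEmitWithOpts(events, { publishEvent: mockPublisher().publish })` and then assert on the recorded `calls`.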
tests/regional-snapshot-alerts.test.mjs covers:
buildAlertEvents (13):
- empty diff -> []
- regime shift high vs critical (escalation_ladder / fragmentation_risk)
- 'none' fallback when previous regime is empty
- one event per trigger activation / corridor break / buffer failure
- scenario_jumps + leverage_shifts excluded
- stable output order: regime -> triggers -> corridors -> buffers
- null/undefined defensive returns
- region.label carried into titles
buildDedupKey + simpleHash (6):
- key shape matches ais-relay convention
- deterministic for same input
- different titles -> different hashes
- same title, different eventType -> different hashes
- missing payload.title handled safely
emitRegionalAlerts (6):
- no-op on empty diff (publisher never called)
- per-event enqueue through injected publisher
- publisher returning false: only successes are counted
- publisher throwing is swallowed, loop continues
- null/undefined arguments return empty result
- realistic escalation scenario enqueues all 6 events correctly
- npm run test:data: 4377/4377 pass
- npm run typecheck: clean
- npm run typecheck:api: clean
- biome lint on touched files: clean
The relay already handles generic events with a `[SEVERITY] title` fallback
formatter. No relay-side changes are required — the new event types flow
through the same default path as rss_alert. If we later want custom
templates per regional event type, that's a follow-up to
scripts/notification-relay.cjs.
* fix(intelligence): roll back dedup key on LPUSH failure (PR #2966 review)
P1 review finding on PR #2966. The default publisher used to SET NX the
dedup key and then return false on LPUSH failure without removing it.
Any transient Upstash or network hiccup therefore suppressed the alert
for the full 6h dedup window even though nothing was enqueued — real
state-change alerts could silently vanish for a quarter of a day.
Fix: mirror the rollback path in scripts/ais-relay.cjs
publishNotificationEvent() — when LPUSH fails after SET NX succeeded,
DEL the dedup key so the next cron cycle can retry cleanly.
## Refactor for testability
Extracted publishEventWithOps(event, ops) as the pure orchestrator with
an injected Redis-ops interface ({ setNx, lpush, del }). Returns an
outcome object { enqueued, dedupHit, rolledBack } so tests can assert
exactly which branch fired without log scraping. Default publisher is
now a thin wrapper that binds real Upstash REST calls and delegates.
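
A rough sketch of that orchestration with the rollback path follows. It is not the project's publishEventWithOps: `sketchPublishWithOps` takes the dedup key as an argument, the queue name is taken from the commit message above, and treating a thrown LPUSH like a failed one is this sketch's assumption.

```javascript
// Hypothetical sketch; names and argument order are assumptions.
const QUEUE_KEY = 'wm:events:queue';
const TTL_SECONDS = 6 * 60 * 60;

async function sketchPublishWithOps(event, dedupKey, ops) {
  const outcome = { enqueued: false, dedupHit: false, rolledBack: false };

  let fresh;
  try {
    fresh = await ops.setNx(dedupKey, TTL_SECONDS);
  } catch {
    return outcome; // nothing was written, so nothing to roll back
  }
  if (!fresh) return { ...outcome, dedupHit: true };

  let pushed = false;
  try {
    pushed = await ops.lpush(QUEUE_KEY, JSON.stringify(event));
  } catch {
    pushed = false; // assumption: a throw is handled like a failed push
  }
  if (pushed) return { ...outcome, enqueued: true };

  // LPUSH failed after SET NX succeeded: release the key so the next
  // cron cycle can retry instead of being suppressed for 6h.
  try {
    await ops.del(dedupKey);
  } catch {
    // best-effort; still reported as rolled back
  }
  return { ...outcome, rolledBack: true };
}
```

Returning a plain outcome object means each branch can be asserted directly, for example `{ enqueued: false, dedupHit: false, rolledBack: true }` after a failed LPUSH.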
## Tests — 6 new regression tests
tests/regional-snapshot-alerts.test.mjs:
- happy path: SET NX + LPUSH both succeed, nothing rolled back
- dedup hit: dedupHit=true without touching the queue
- LPUSH failure: rolledBack=true, DEL called, dedup key gone
- retry-after-rollback: next call enqueues normally (the core bug)
- LPUSH failure + DEL failure: still reports rolledBack=true (best-effort)
- exceptions from setNx/lpush are swallowed, return non-enqueued outcome
## Verification
- node --test tests/regional-snapshot-alerts.test.mjs: 31/31 pass
- npm run test:data: 4383/4383 pass (was 4377; +6 regression tests)
- npm run typecheck: clean
- biome lint on touched files: clean
531 lines
20 KiB
JavaScript
// Tests for the Regional Intelligence state-change alert emitter (Phase 2 PR1).
// Pure-function + injectable-publisher unit tests; no network. Run via:
//   npm run test:data

import { describe, it } from 'node:test';
import assert from 'node:assert/strict';

import {
  buildAlertEvents,
  buildDedupKey,
  simpleHash,
  emitRegionalAlerts,
  publishEventWithOps,
} from '../scripts/regional-snapshot/alert-emitter.mjs';

// ────────────────────────────────────────────────────────────────────────────
// Fixtures
// ────────────────────────────────────────────────────────────────────────────

const menaRegion = { id: 'mena', label: 'Middle East & North Africa' };
const eastAsiaRegion = { id: 'east-asia', label: 'East Asia & Pacific' };

function snapshotFixture(overrides = {}) {
  return {
    region_id: 'mena',
    generated_at: 1_700_000_000_000,
    meta: {
      snapshot_id: 'snap-mena-1',
      model_version: '0.1.0',
      scoring_version: '1.0.0',
      geography_version: '1.0.0',
      snapshot_confidence: 0.9,
      missing_inputs: [],
      stale_inputs: [],
      valid_until: 0,
      trigger_reason: 'scheduled_6h',
      narrative_provider: '',
      narrative_model: '',
    },
    regime: { label: 'coercive_stalemate', previous_label: 'calm', transitioned_at: 0, transition_driver: '' },
    balance: {
      coercive_pressure: 0.5,
      domestic_fragility: 0.5,
      capital_stress: 0.5,
      energy_vulnerability: 0.5,
      alliance_cohesion: 0.5,
      maritime_access: 0.5,
      energy_leverage: 0.5,
      net_balance: 0,
      pressures: [],
      buffers: [],
    },
    actors: [],
    leverage_edges: [],
    scenario_sets: [],
    transmission_paths: [],
    triggers: { active: [], watching: [], dormant: [] },
    mobility: { airspace: [], flight_corridors: [], airports: [], reroute_intensity: 0, notam_closures: [] },
    evidence: [],
    narrative: {
      situation: { text: '', evidence_ids: [] },
      balance_assessment: { text: '', evidence_ids: [] },
      outlook_24h: { text: '', evidence_ids: [] },
      outlook_7d: { text: '', evidence_ids: [] },
      outlook_30d: { text: '', evidence_ids: [] },
      watch_items: [],
    },
    ...overrides,
  };
}

function emptyDiff(overrides = {}) {
  return {
    regime_changed: null,
    scenario_jumps: [],
    trigger_activations: [],
    trigger_deactivations: [],
    corridor_breaks: [],
    leverage_shifts: [],
    buffer_failures: [],
    reroute_waves: null,
    mobility_disruptions: [],
    ...overrides,
  };
}

// ────────────────────────────────────────────────────────────────────────────
// buildAlertEvents — pure event builder
// ────────────────────────────────────────────────────────────────────────────

describe('buildAlertEvents', () => {
  it('returns [] when the diff has no meaningful changes', () => {
    const events = buildAlertEvents(menaRegion, snapshotFixture(), emptyDiff());
    assert.deepEqual(events, []);
  });

  it('emits a regime_shift event with high severity for non-critical labels', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({ regime_changed: { from: 'calm', to: 'coercive_stalemate' } }),
    );
    assert.equal(events.length, 1);
    assert.equal(events[0].eventType, 'regional_regime_shift');
    assert.equal(events[0].severity, 'high');
    assert.match(events[0].payload.title, /calm → coercive stalemate/);
    assert.equal(events[0].payload.region_id, 'mena');
    assert.equal(events[0].payload.snapshot_id, 'snap-mena-1');
  });

  it('marks regime_shift critical when target is escalation_ladder', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({ regime_changed: { from: 'stressed_equilibrium', to: 'escalation_ladder' } }),
    );
    assert.equal(events[0].severity, 'critical');
  });

  it('marks regime_shift critical when target is fragmentation_risk', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({ regime_changed: { from: 'calm', to: 'fragmentation_risk' } }),
    );
    assert.equal(events[0].severity, 'critical');
  });

  it('labels "none" when previous regime is empty (first snapshot)', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({ regime_changed: { from: '', to: 'coercive_stalemate' } }),
    );
    assert.match(events[0].payload.title, /none → coercive stalemate/);
  });

  it('emits one trigger_activation event per activated trigger', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        trigger_activations: [
          { id: 'mena_coercive_high', description: 'Coercive pressure crossed 0.7' },
          { id: 'hormuz_transit_drop', description: 'Transit volume fell 30%' },
        ],
      }),
    );
    assert.equal(events.length, 2);
    assert.equal(events[0].eventType, 'regional_trigger_activation');
    assert.equal(events[0].severity, 'high');
    assert.match(events[0].payload.title, /mena_coercive_high/);
    assert.equal(events[1].payload.details.trigger_id, 'hormuz_transit_drop');
  });

  it('trigger_activation with empty description still renders a clean title', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({ trigger_activations: [{ id: 'raw_trigger', description: '' }] }),
    );
    assert.match(events[0].payload.title, /trigger raw_trigger/);
    assert.doesNotMatch(events[0].payload.title, /—$/); // no trailing em dash
  });

  it('emits one corridor_break event per break with critical severity', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        corridor_breaks: [
          { corridor_id: 'aggregate', from: '0.80', to: '0.45' },
        ],
      }),
    );
    assert.equal(events.length, 1);
    assert.equal(events[0].eventType, 'regional_corridor_break');
    assert.equal(events[0].severity, 'critical');
    assert.match(events[0].payload.title, /aggregate.*0\.80.*0\.45/);
  });

  it('emits one buffer_failure event per axis with high severity', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        buffer_failures: [
          { axis: 'alliance_cohesion', from: 0.70, to: 0.40 },
          { axis: 'maritime_access', from: 0.90, to: 0.50 },
        ],
      }),
    );
    assert.equal(events.length, 2);
    assert.equal(events[0].eventType, 'regional_buffer_failure');
    assert.equal(events[0].severity, 'high');
    assert.match(events[0].payload.title, /alliance cohesion.*0\.70.*0\.40/);
  });

  it('skips scenario_jumps and leverage_shifts (intentionally not emitted)', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        scenario_jumps: [{ horizon: '24h', lane: 'escalation', from: 0.2, to: 0.5 }],
        leverage_shifts: [{ actor_id: 'IR', from: 0.5, to: 0.8, delta: 0.3 }],
      }),
    );
    assert.deepEqual(events, []);
  });

  it('returns events in stable order: regime → triggers → corridors → buffers', () => {
    const events = buildAlertEvents(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        regime_changed: { from: 'calm', to: 'coercive_stalemate' },
        trigger_activations: [{ id: 'trig1', description: '' }],
        corridor_breaks: [{ corridor_id: 'hormuz', from: '0.9', to: '0.4' }],
        buffer_failures: [{ axis: 'alliance_cohesion', from: 0.7, to: 0.4 }],
      }),
    );
    assert.equal(events[0].eventType, 'regional_regime_shift');
    assert.equal(events[1].eventType, 'regional_trigger_activation');
    assert.equal(events[2].eventType, 'regional_corridor_break');
    assert.equal(events[3].eventType, 'regional_buffer_failure');
  });

  it('returns [] for null/undefined arguments (defensive)', () => {
    assert.deepEqual(buildAlertEvents(null, snapshotFixture(), emptyDiff()), []);
    assert.deepEqual(buildAlertEvents(menaRegion, null, emptyDiff()), []);
    assert.deepEqual(buildAlertEvents(menaRegion, snapshotFixture(), null), []);
  });

  it('carries region.label into the title verbatim', () => {
    const events = buildAlertEvents(
      eastAsiaRegion,
      snapshotFixture({ region_id: 'east-asia' }),
      emptyDiff({ regime_changed: { from: 'calm', to: 'coercive_stalemate' } }),
    );
    assert.ok(events[0].payload.title.startsWith('East Asia & Pacific:'));
  });
});

// ────────────────────────────────────────────────────────────────────────────
// Dedup key derivation
// ────────────────────────────────────────────────────────────────────────────

describe('buildDedupKey + simpleHash', () => {
  it('produces the expected key shape', () => {
    const key = buildDedupKey({
      eventType: 'regional_regime_shift',
      payload: { title: 'MENA: regime calm → coercive stalemate' },
    });
    assert.match(key, /^wm:notif:scan-dedup:regional_regime_shift:[a-z0-9]+$/);
  });

  it('produces the same hash for the same title + eventType', () => {
    const a = buildDedupKey({ eventType: 'x', payload: { title: 'hello' } });
    const b = buildDedupKey({ eventType: 'x', payload: { title: 'hello' } });
    assert.equal(a, b);
  });

  it('produces different hashes for different titles', () => {
    const a = buildDedupKey({ eventType: 'x', payload: { title: 'one' } });
    const b = buildDedupKey({ eventType: 'x', payload: { title: 'two' } });
    assert.notEqual(a, b);
  });

  it('produces different hashes for the same title under different eventTypes', () => {
    const a = buildDedupKey({ eventType: 'x', payload: { title: 'hi' } });
    const b = buildDedupKey({ eventType: 'y', payload: { title: 'hi' } });
    assert.notEqual(a, b);
  });

  it('handles missing payload.title safely', () => {
    const key = buildDedupKey({ eventType: 'x', payload: {} });
    assert.match(key, /^wm:notif:scan-dedup:x:/);
  });

  it('simpleHash returns a non-empty base36 string for any input', () => {
    assert.ok(simpleHash('').length > 0);
    assert.ok(simpleHash('a').length > 0);
    assert.ok(simpleHash('a much longer input string').length > 0);
  });
});

// ────────────────────────────────────────────────────────────────────────────
// emitRegionalAlerts — integration with injected publisher
// ────────────────────────────────────────────────────────────────────────────

describe('emitRegionalAlerts', () => {
  function mockPublisher() {
    const calls = [];
    const publish = async (event) => {
      calls.push(event);
      return true;
    };
    return { publish, calls };
  }

  it('no-ops on an empty diff without calling the publisher', async () => {
    const pub = mockPublisher();
    const result = await emitRegionalAlerts(menaRegion, snapshotFixture(), emptyDiff(), {
      publishEvent: pub.publish,
    });
    assert.equal(result.enqueued, 0);
    assert.equal(result.events.length, 0);
    assert.equal(pub.calls.length, 0);
  });

  it('enqueues each event through the injected publisher', async () => {
    const pub = mockPublisher();
    const result = await emitRegionalAlerts(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        regime_changed: { from: 'calm', to: 'coercive_stalemate' },
        trigger_activations: [{ id: 'mena_coercive_high', description: 'x' }],
      }),
      { publishEvent: pub.publish },
    );
    assert.equal(result.enqueued, 2);
    assert.equal(pub.calls.length, 2);
    assert.equal(pub.calls[0].eventType, 'regional_regime_shift');
    assert.equal(pub.calls[1].eventType, 'regional_trigger_activation');
  });

  it('counts only successful enqueues when the publisher returns false', async () => {
    let n = 0;
    const publish = async () => {
      n += 1;
      return n === 1; // only the first succeeds
    };
    const result = await emitRegionalAlerts(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        regime_changed: { from: 'calm', to: 'coercive_stalemate' },
        buffer_failures: [{ axis: 'alliance_cohesion', from: 0.7, to: 0.4 }],
      }),
      { publishEvent: publish },
    );
    assert.equal(result.enqueued, 1);
    assert.equal(result.events.length, 2);
  });

  it('swallows publisher exceptions and continues the loop', async () => {
    let n = 0;
    const publish = async () => {
      n += 1;
      if (n === 1) throw new Error('upstream down');
      return true;
    };
    const result = await emitRegionalAlerts(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        regime_changed: { from: 'calm', to: 'coercive_stalemate' },
        trigger_activations: [{ id: 't1', description: '' }],
      }),
      { publishEvent: publish },
    );
    // First event threw, second succeeded → enqueued === 1
    assert.equal(result.enqueued, 1);
    assert.equal(result.events.length, 2);
  });

  it('returns empty result when region/snapshot/diff are missing', async () => {
    const pub = mockPublisher();
    const a = await emitRegionalAlerts(null, snapshotFixture(), emptyDiff(), { publishEvent: pub.publish });
    const b = await emitRegionalAlerts(menaRegion, null, emptyDiff(), { publishEvent: pub.publish });
    const c = await emitRegionalAlerts(menaRegion, snapshotFixture(), null, { publishEvent: pub.publish });
    for (const r of [a, b, c]) {
      assert.equal(r.enqueued, 0);
      assert.deepEqual(r.events, []);
    }
    assert.equal(pub.calls.length, 0);
  });

  it('works end-to-end on a realistic escalation scenario', async () => {
    const pub = mockPublisher();
    const result = await emitRegionalAlerts(
      menaRegion,
      snapshotFixture(),
      emptyDiff({
        regime_changed: { from: 'stressed_equilibrium', to: 'escalation_ladder' },
        trigger_activations: [
          { id: 'mena_coercive_high', description: 'Coercive pressure > 0.7' },
          { id: 'hormuz_transit_drop', description: 'Transit volume < 70% baseline' },
        ],
        corridor_breaks: [{ corridor_id: 'hormuz', from: '0.90', to: '0.45' }],
        buffer_failures: [
          { axis: 'maritime_access', from: 0.90, to: 0.45 },
          { axis: 'alliance_cohesion', from: 0.70, to: 0.45 },
        ],
      }),
      { publishEvent: pub.publish },
    );
    assert.equal(result.enqueued, 6);
    // regime_shift should be critical (target = escalation_ladder)
    assert.equal(pub.calls[0].severity, 'critical');
    // corridor_break should always be critical
    const corridorEvent = pub.calls.find((e) => e.eventType === 'regional_corridor_break');
    assert.equal(corridorEvent.severity, 'critical');
    // Every event carries the same region_id + snapshot_id
    for (const ev of pub.calls) {
      assert.equal(ev.payload.region_id, 'mena');
      assert.equal(ev.payload.snapshot_id, 'snap-mena-1');
    }
  });
});

// ────────────────────────────────────────────────────────────────────────────
// publishEventWithOps — dedup rollback on LPUSH failure (P1 fix for PR #2966)
// ────────────────────────────────────────────────────────────────────────────

describe('publishEventWithOps — dedup rollback', () => {
  /** Minimal in-memory Redis that tracks keys and the queue list. */
  function memoryOps({ lpushFails = false, setNxFails = false } = {}) {
    /** @type {Record<string, boolean>} */
    const dedupKeys = {};
    /** @type {string[]} */
    const queue = [];
    /** @type {string[]} */
    const deletedKeys = [];
    const ops = {
      setNx: async (key, _ttl) => {
        if (setNxFails) return false;
        if (dedupKeys[key]) return false;
        dedupKeys[key] = true;
        return true;
      },
      lpush: async (_key, value) => {
        if (lpushFails) return false;
        queue.push(value);
        return true;
      },
      del: async (key) => {
        delete dedupKeys[key];
        deletedKeys.push(key);
        return true;
      },
    };
    return { ops, dedupKeys, queue, deletedKeys };
  }

  const sampleEvent = {
    eventType: 'regional_regime_shift',
    severity: 'high',
    payload: { title: 'MENA: regime shift test' },
  };

  it('happy path: SET NX + LPUSH both succeed, no rollback', async () => {
    const mem = memoryOps();
    const outcome = await publishEventWithOps(sampleEvent, mem.ops);
    assert.deepEqual(outcome, { enqueued: true, dedupHit: false, rolledBack: false });
    assert.equal(mem.queue.length, 1);
    assert.equal(Object.keys(mem.dedupKeys).length, 1);
    assert.equal(mem.deletedKeys.length, 0);
  });

  it('dedup hit: returns dedupHit=true without touching the queue', async () => {
    const mem = memoryOps();
    // Pre-populate the dedup key so the call below hits it.
    mem.dedupKeys[buildDedupKey(sampleEvent)] = true;
    const outcome = await publishEventWithOps(sampleEvent, mem.ops);
    assert.deepEqual(outcome, { enqueued: false, dedupHit: true, rolledBack: false });
    assert.equal(mem.queue.length, 0);
    assert.equal(mem.deletedKeys.length, 0);
  });

  it('LPUSH failure: dedup key is rolled back via DEL', async () => {
    const mem = memoryOps({ lpushFails: true });
    const outcome = await publishEventWithOps(sampleEvent, mem.ops);
    assert.deepEqual(outcome, { enqueued: false, dedupHit: false, rolledBack: true });
    // Queue untouched...
    assert.equal(mem.queue.length, 0);
    // ...and dedup key was removed so next cycle can retry.
    assert.equal(Object.keys(mem.dedupKeys).length, 0);
    assert.equal(mem.deletedKeys.length, 1);
    assert.equal(mem.deletedKeys[0], buildDedupKey(sampleEvent));
  });

  it('retry-after-rollback: next call enqueues normally', async () => {
    // First call fails LPUSH and rolls back.
    const mem = memoryOps({ lpushFails: true });
    await publishEventWithOps(sampleEvent, mem.ops);
    assert.equal(Object.keys(mem.dedupKeys).length, 0);

    // Switch to a working LPUSH (the new ops object reuses the same setNx/del,
    // so the dedup state is preserved).
    const retryOps = {
      setNx: mem.ops.setNx,
      lpush: async (_key, value) => {
        mem.queue.push(value);
        return true;
      },
      del: mem.ops.del,
    };
    const outcome = await publishEventWithOps(sampleEvent, retryOps);
    assert.equal(outcome.enqueued, true);
    assert.equal(mem.queue.length, 1);
  });

  it('LPUSH failure + DEL failure still returns rolledBack=true (best-effort)', async () => {
    const mem = memoryOps({ lpushFails: true });
    const opsWithBrokenDel = {
      setNx: mem.ops.setNx,
      lpush: mem.ops.lpush,
      del: async () => {
        throw new Error('del broke');
      },
    };
    const outcome = await publishEventWithOps(sampleEvent, opsWithBrokenDel);
    // We attempted rollback; del threw; still report rolledBack=true.
    assert.equal(outcome.rolledBack, true);
    assert.equal(outcome.enqueued, false);
  });

  it('swallows exceptions from setNx/lpush and returns a non-enqueued outcome', async () => {
    const brokenOps = {
      setNx: async () => {
        throw new Error('network blown up');
      },
      lpush: async () => true,
      del: async () => true,
    };
    const outcome = await publishEventWithOps(sampleEvent, brokenOps);
    assert.deepEqual(outcome, { enqueued: false, dedupHit: false, rolledBack: false });
  });
});