feat(forecasts): EMA-based threat velocity spike detection (#2463)

* feat(forecasts): add EMA-based threat velocity spike detection

- New pure-math module _ema-threat-engine.mjs: updateWindow, computeWindowStats,
  computeEmaWindows, computeRisk24h with ALPHA=0.3, 24-entry sliding window
- Add ACLED (conflict:acled:v1:all:0:0) and EMA windows (conflict:ema-windows:v1)
  to readInputKeys() Redis pipeline batch
- updateEmaWindows() merges ACLED + UCDP event counts, computes z-scores, persists
  updated windows with 26h TTL before domain detectors run
- detectConflictScenarios and detectUcdpConflictZones inject velocity_spike signals
  (weight 0.35, +0.08 prob boost) when risk24h >= 75

* fix(ema): delta counts + EMA-based z-score in threat engine

- computeEmaWindows: track lastRawCount per country, append delta
  (currentRaw - priorRaw) instead of absolute snapshot totals
- Union prior + current keys so countries absent from current snapshot
  get delta=0 (not silently dropped)
- First run: priorRaw = currentRaw so initial delta is 0, not a spike
- Clamp delta >= 0 to handle snapshot window shrinkage
- computeRisk24h: z-score now uses window.ema instead of raw last entry,
  so ALPHA=0.3 smoothing actually dampens single-run spikes before alerting

* fix(ema): timestamp-based 24h event counting replaces unreliable snapshot delta

- Remove lastRawCount delta logic that suppressed real acceleration in rolling source windows
- computeEmaWindows now counts events with event_date/date_start >= now-24h directly from snapshot objects
- Add private normalizeCountry() helper; add nowMs param for testability
- Remove lastRawCount from WindowState typedef (no longer stored or persisted)

* fix(ema): seed-meta write + dead code cleanup

- P1: write seed-meta:conflict:ema-windows:v1 after EMA persist so health monitoring detects silent failures
- P2: remove dead `last` variable from computeWindowStats
- P2: remove redundant full O(n) EMA sweep in computeWindowStats (incremental EMA in updateWindow is authoritative)
- P2: remove dead `?? 0` guard on `prob` in detectConflictScenarios (already a number at that point)
This commit is contained in:
Elie Habib
2026-03-29 11:09:26 +04:00
committed by GitHub
parent 4435d43436
commit 994aa05e63
2 changed files with 199 additions and 8 deletions

View File

@@ -0,0 +1,124 @@
// @ts-check
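/**
 * EMA-based threat velocity engine for conflict data.
 * No Redis and no I/O. Results are deterministic except for wall-clock reads:
 * updateWindow stamps `updatedAt` with Date.now(), and computeEmaWindows
 * defaults `nowMs` to Date.now() when the caller does not supply it.
 */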
// EMA smoothing factor: weight given to the newest count in updateWindow
// (the previous EMA keeps the remaining 1 - ALPHA).
const ALPHA = 0.3;
// computeRisk24h reports a neutral result (risk 0, no spike) for any window
// holding fewer samples than this.
const MIN_WINDOW = 6; // min points before z-score is meaningful
/**
 * Per-region rolling state produced by updateWindow.
 * - region:    key the state was computed for
 * - window:    most recent counts, oldest first, capped at 24 entries
 * - ema:       exponentially weighted moving average of the counts
 * - mean:      arithmetic mean of `window`
 * - stddev:    population standard deviation of `window`
 * - updatedAt: Date.now() at the time of the update (epoch milliseconds)
 *
 * @typedef {{ region: string, window: number[], ema: number, mean: number, stddev: number, updatedAt: number }} WindowState
 */
/**
 * Fold one new observation into a region's rolling state.
 *
 * The sliding window keeps the newest 24 counts (oldest dropped first). The EMA
 * is advanced incrementally from the prior EMA; when `prior` carries no numeric
 * `ema` the average is seeded with `count`, so the first EMA equals `count`.
 * Mean and standard deviation are recomputed over the resulting window.
 *
 * @param {string} region
 * @param {number} count
 * @param {WindowState|null} prior - prior WindowState or null
 * @returns {WindowState}
 */
export function updateWindow(region, count, prior) {
  const history = Array.isArray(prior?.window) ? prior.window : [];
  // Keeping the last 23 prior samples plus the new one caps the window at 24.
  const window = [...history.slice(-23), count];

  let previousEma = count;
  if (typeof prior?.ema === 'number') {
    previousEma = prior.ema;
  }
  const ema = ALPHA * count + (1 - ALPHA) * previousEma;

  const stats = computeWindowStats(window);
  return {
    region,
    window,
    ema,
    mean: stats.mean,
    stddev: stats.stddev,
    updatedAt: Date.now(),
  };
}
/**
 * Canonicalise a country name for use as a lookup key: trimmed and lower-cased.
 *
 * Anything that is not a string (missing field, null, number, object) yields
 * '' rather than throwing, so one malformed record cannot abort a whole batch;
 * callers treat '' as "no country" and skip the record.
 *
 * @param {unknown} name
 * @returns {string}
 */
function normalizeCountry(name) {
  return typeof name === 'string' ? name.trim().toLowerCase() : '';
}
/**
 * Arithmetic mean and population standard deviation (divides by N, not N - 1)
 * of a list of samples. An empty list yields zeros for both.
 *
 * @param {number[]} window
 * @returns {{ mean: number, stddev: number }}
 */
export function computeWindowStats(window) {
  const sampleCount = window.length;
  if (sampleCount === 0) {
    return { mean: 0, stddev: 0 };
  }

  let total = 0;
  window.forEach((value) => {
    total += value;
  });
  const mean = total / sampleCount;

  let squaredDeviations = 0;
  window.forEach((value) => {
    squaredDeviations += (value - mean) ** 2;
  });

  return { mean, stddev: Math.sqrt(squaredDeviations / sampleCount) };
}
/**
* @param {Map<string,any>} priorWindows
* @param {any[]} acledEvents — each has event_date: 'YYYY-MM-DD'
* @param {any[]} ucdpEvents — each has date_start: 'YYYY-MM-DD' and country/country_name
* @param {number} [nowMs]
* @returns {Map<string, WindowState>}
*/
export function computeEmaWindows(priorWindows, acledEvents, ucdpEvents, nowMs = Date.now()) {
const cutoff = nowMs - 24 * 60 * 60 * 1000;
/** @type {Map<string, number>} */
const counts24h = new Map();
const safeAcled = Array.isArray(acledEvents) ? acledEvents : [];
const safeUcdp = Array.isArray(ucdpEvents) ? ucdpEvents : [];
for (const ev of safeAcled) {
const country = normalizeCountry(ev?.country);
if (!country) continue;
const ts = Date.parse(ev.event_date);
if (Number.isFinite(ts) && ts >= cutoff) {
counts24h.set(country, (counts24h.get(country) ?? 0) + 1);
}
}
for (const ev of safeUcdp) {
const country = normalizeCountry(ev?.country ?? ev?.country_name);
if (!country) continue;
const ts = Date.parse(ev.date_start);
if (Number.isFinite(ts) && ts >= cutoff) {
counts24h.set(country, (counts24h.get(country) ?? 0) + 1);
}
}
const safePrior = priorWindows instanceof Map ? priorWindows : new Map();
const allCountries = new Set([...safePrior.keys(), ...counts24h.keys()]);
/** @type {Map<string, WindowState>} */
const updated = new Map();
for (const country of allCountries) {
const count = counts24h.get(country) ?? 0;
const prior = safePrior.get(country) ?? null;
const ws = updateWindow(country, count, prior);
updated.set(country, ws);
}
return updated;
}
/**
 * Turn each country's rolling state into a 0-100 risk score.
 *
 * The z-score measures how far the EMA sits from the window mean in units of
 * the window's standard deviation (0 when the deviation is 0). It is mapped to
 * `50 + 20 * z`, rounded and clamped to [0, 100]; a score of 75 or more is
 * flagged as a velocity spike. Windows with fewer than MIN_WINDOW samples get a
 * neutral entry (risk 0, z-score 0, no spike).
 *
 * @param {Map<string, WindowState>} windows
 * @returns {Map<string, { risk24h: number, zscore: number, velocitySpike: boolean, region: string }>}
 */
export function computeRisk24h(windows) {
  /** @type {Map<string, { risk24h: number, zscore: number, velocitySpike: boolean, region: string }>} */
  const result = new Map();

  for (const [country, { window, ema, mean, stddev }] of windows) {
    if (window.length < MIN_WINDOW) {
      // Not enough history yet for the z-score to mean anything.
      result.set(country, { risk24h: 0, zscore: 0, velocitySpike: false, region: country });
      continue;
    }

    let zscore = 0;
    if (stddev > 0) {
      zscore = (ema - mean) / stddev;
    }
    const unclamped = Math.round(50 + zscore * 20);
    const risk24h = Math.min(100, Math.max(0, unclamped));

    result.set(country, {
      risk24h,
      zscore,
      velocitySpike: risk24h >= 75,
      region: country,
    });
  }

  return result;
}

View File

@@ -9,6 +9,7 @@ import { tagRegions } from './_prediction-scoring.mjs';
import { resolveR2StorageConfig, putR2JsonObject, getR2JsonObject } from './_r2-storage.mjs';
import { extractFirstJsonObject, extractFirstJsonArray, cleanJsonText } from './_llm-json.mjs';
import { loadTickerSet } from './_ticker-validation.mjs';
import { computeEmaWindows, computeRisk24h } from './_ema-threat-engine.mjs';
const _isDirectRun = process.argv[1] && import.meta.url.endsWith(process.argv[1].replace(/\\/g, '/'));
if (_isDirectRun) loadEnvFile(import.meta.url);
@@ -678,6 +679,8 @@ async function readInputKeys() {
MARKET_INPUT_KEYS.bisPolicy,
MARKET_INPUT_KEYS.shippingRates,
MARKET_INPUT_KEYS.correlationCards,
'conflict:acled:v1:all:0:0',
'conflict:ema-windows:v1',
...fredKeys,
];
const pipeline = keys.map(k => ['GET', k]);
@@ -728,6 +731,12 @@ async function readInputKeys() {
bisPolicyRates: parsedByKey[MARKET_INPUT_KEYS.bisPolicy],
shippingRates: parsedByKey[MARKET_INPUT_KEYS.shippingRates],
correlationCards: parsedByKey[MARKET_INPUT_KEYS.correlationCards],
acledEvents: (() => {
const raw = parsedByKey['conflict:acled:v1:all:0:0'];
if (!raw) return [];
return Array.isArray(raw) ? raw : (raw?.events ?? []);
})(),
emaWindowsRaw: results[keys.indexOf('conflict:ema-windows:v1')]?.result ?? null,
fredSeries,
};
}
@@ -893,7 +902,7 @@ function extractCiiScores(inputs) {
return arr.map(normalizeCiiEntry);
}
function detectConflictScenarios(inputs) {
function detectConflictScenarios(inputs, emaRiskScores) {
const predictions = [];
const scores = extractCiiScores(inputs);
const theaters = inputs.theaterPosture?.theaters || [];
@@ -932,7 +941,19 @@ function detectConflictScenarios(inputs) {
const ciiNorm = normalize(c.score, 50, 100);
const eventBoost = (matchingIran.length + matchingUcdp.length) > 0 ? 0.1 : 0;
const prob = Math.min(0.9, ciiNorm * 0.6 + eventBoost + (c.trend === 'rising' ? 0.1 : 0));
let prob = Math.min(0.9, ciiNorm * 0.6 + eventBoost + (c.trend === 'rising' ? 0.1 : 0));
const emaRisk = emaRiskScores?.get(countryName ?? '');
if (emaRisk?.velocitySpike) {
signals.push({
type: 'velocity_spike',
value: `EMA z-score: ${emaRisk.zscore.toFixed(1)} (${emaRisk.risk24h}/100 risk)`,
weight: 0.35,
});
prob = Math.min(0.99, prob + 0.08);
sourceCount++;
}
const confidence = Math.max(0.3, normalize(sourceCount, 0, 4));
predictions.push(makePrediction(
@@ -1757,7 +1778,7 @@ function detectInfraScenarios(inputs) {
}
// ── Phase 4: Standalone detectors ───────────────────────────
function detectUcdpConflictZones(inputs) {
function detectUcdpConflictZones(inputs, emaRiskScores) {
const predictions = [];
const ucdp = Array.isArray(inputs.ucdpEvents) ? inputs.ucdpEvents : inputs.ucdpEvents?.events || [];
if (ucdp.length === 0) return predictions;
@@ -1771,12 +1792,24 @@ function detectUcdpConflictZones(inputs) {
for (const [country, count] of Object.entries(byCountry)) {
if (count < 10) continue;
const signals = [{ type: 'ucdp', value: `${count} UCDP conflict events`, weight: 0.5 }];
let prob = Math.min(0.85, normalize(count, 5, 100) * 0.7);
const emaRisk = emaRiskScores?.get(country?.toLowerCase?.() ?? '');
if (emaRisk?.velocitySpike) {
signals.push({
type: 'velocity_spike',
value: `EMA z-score: ${emaRisk.zscore.toFixed(1)} (${emaRisk.risk24h}/100 risk)`,
weight: 0.35,
});
prob = Math.min(0.99, prob + 0.08);
}
predictions.push(makePrediction(
'conflict', country,
`Active armed conflict: ${country}`,
Math.min(0.85, normalize(count, 5, 100) * 0.7),
0.3, '30d',
[{ type: 'ucdp', value: `${count} UCDP conflict events`, weight: 0.5 }],
prob, 0.3, '30d', signals,
));
}
return predictions;
@@ -14753,6 +14786,38 @@ async function enrichScenariosWithLLM(predictions) {
return enrichmentMeta;
}
/**
 * Advance the per-country EMA windows by one run, persist them, and return the
 * resulting risk scores for the detectors.
 *
 * Reads `inputs.emaWindowsRaw` (JSON text of the previously persisted windows),
 * `inputs.ucdpEvents` (array, or object with an `events` array) and
 * `inputs.acledEvents`. Persistence is best-effort: a Redis failure is logged
 * and never prevents the risk scores from being returned.
 *
 * The seed-meta key is only refreshed after the windows themselves were
 * written, so a failed windows write leaves seed-meta stale and visible to
 * health monitoring instead of being masked by a fresh timestamp.
 *
 * @param {any} inputs - pipeline inputs as produced by readInputKeys()
 * @param {string} url - Redis REST endpoint, forwarded to redisCommand
 * @param {string} token - Redis credential, forwarded to redisCommand
 * @returns {Promise<Map<string, { risk24h: number, zscore: number, velocitySpike: boolean, region: string }>>}
 */
async function updateEmaWindows(inputs, url, token) {
  let priorWindows = new Map();
  const raw = inputs.emaWindowsRaw;
  if (raw) {
    try {
      const parsed = JSON.parse(raw);
      // Only a plain object maps country -> state; arrays/primitives are corrupt.
      if (parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)) {
        priorWindows = new Map(Object.entries(parsed));
      } else {
        console.warn(' [EMA] Prior windows have an unexpected shape; starting cold');
      }
    } catch (err) {
      // Treat unreadable state as a cold start, but leave a trace in the logs.
      console.warn(` [EMA] Prior windows unreadable; starting cold: ${err.message}`);
    }
  }

  const ucdpEvents = Array.isArray(inputs.ucdpEvents) ? inputs.ucdpEvents : (inputs.ucdpEvents?.events ?? []);
  const acledEvents = inputs.acledEvents ?? [];

  const updatedWindows = computeEmaWindows(priorWindows, acledEvents, ucdpEvents);
  const riskScores = computeRisk24h(updatedWindows);

  const windowsObj = Object.fromEntries(updatedWindows);
  const ttl = 26 * 3600; // 26h expiry, in seconds

  let windowsPersisted = false;
  try {
    await redisCommand(url, token, ['SET', 'conflict:ema-windows:v1', JSON.stringify(windowsObj), 'EX', ttl]);
    windowsPersisted = true;
  } catch (err) {
    console.warn(` [EMA] Failed to persist windows: ${err.message}`);
  }

  if (windowsPersisted) {
    const meta = { fetchedAt: new Date().toISOString(), recordCount: updatedWindows.size };
    try {
      await redisCommand(url, token, ['SET', 'seed-meta:conflict:ema-windows:v1', JSON.stringify(meta), 'EX', ttl]);
    } catch (err) {
      console.warn(` [EMA] Failed to persist seed-meta: ${err.message}`);
    }
  }

  const spikes = [...riskScores.entries()].filter(([, score]) => score.velocitySpike);
  if (spikes.length > 0) {
    console.log(` [EMA] ${spikes.length} velocity spike(s) detected:`,
      spikes.map(([country, score]) => `${country}(${score.risk24h})`).join(', '));
  }

  return riskScores;
}
// ── Main pipeline ──────────────────────────────────────────
async function fetchForecasts() {
await warmPingChokepoints();
@@ -14775,14 +14840,16 @@ async function fetchForecasts() {
const prior = await readPriorPredictions();
console.log(' Running domain detectors...');
const { url: emaUrl, token: emaToken } = getRedisCredentials();
const emaRiskScores = await updateEmaWindows(inputs, emaUrl, emaToken);
const predictions = [
...detectConflictScenarios(inputs),
...detectConflictScenarios(inputs, emaRiskScores),
...detectMarketScenarios(inputs),
...detectSupplyChainScenarios(inputs),
...detectPoliticalScenarios(inputs),
...detectMilitaryScenarios(inputs),
...detectInfraScenarios(inputs),
...detectUcdpConflictZones(inputs),
...detectUcdpConflictZones(inputs, emaRiskScores),
...detectCyberScenarios(inputs),
...detectGpsJammingScenarios(inputs),
...detectFromPredictionMarkets(inputs),