Mirror of https://github.com/koala73/worldmonitor.git (synced 2026-05-13 10:36:21 +02:00)
* feat(forecast): add AI Forecasts prediction module (Pro-tier)
MiroFish-inspired prediction engine that generates structured forecasts
across 6 domains (conflict, market, supply chain, political, military,
infrastructure) using existing WorldMonitor data streams.
- Proto definitions for ForecastService with GetForecasts RPC
- Dedicated seed script (seed-forecasts.mjs) with 6 domain detectors,
cross-domain cascade resolver, prediction market calibration, and
trend detection via prior snapshot comparison
- Premium-gated RPC handler (PREMIUM_RPC_PATHS enforcement)
- Lazy-loaded ForecastPanel with domain filters, probability bars,
trend arrows, signal evidence, and cascade links
- Health monitoring integration (seed-meta freshness tracking)
- Refresh scheduler with API key guard
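
For orientation, a single prediction record has roughly this shape (a sketch assembled from field names used across these commits; the authoritative definition is the Forecast proto):

// Sketch only: field names beyond those mentioned in the commit
// messages (domain, probability, confidence, trend, signals,
// cascades, timeHorizon) are assumptions.
const examplePrediction = {
  id: 'conflict:iran:d7',          // stable forecastId; format illustrative
  domain: 'conflict',              // one of the 6 domains
  probability: 0.62,               // drives the panel's probability bar
  confidence: 0.45,                // see the confidence-model commit below
  trend: 'rising',                 // from prior-snapshot comparison
  timeHorizon: 'd7',
  signals: [{ type: 'cii', region: 'IR', value: 74 }], // evidence shown in the panel
  cascades: ['infrastructure:iran:d7'],                // cross-domain links
};
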
* test(forecast): add 47 unit tests for forecast detectors and utilities
Covers forecastId, normalize, resolveCascades, calibrateWithMarkets,
computeTrends, and smoke tests for all 6 domain detectors. Exports
testable functions from seed script with direct-run guard.
* fix(forecast): domain mismatch 'infra' vs 'infrastructure', add panel category
- Seed script used 'infra' but ForecastPanel filtered on 'infrastructure',
causing the Infra tab to show zero results
- Added 'forecast' to intelligence category in PANEL_CATEGORY_MAP
* fix(forecast): move CSS to one-time injection, improve type safety
- P2: Move style block from setContent to one-time document.head injection
to prevent CSS accumulation on repeated renders
- P3: Replace +toFixed(3) with Math.round for readability in seed script
- P3: Use Forecast type instead of any[] in RPC handler filter
* fix(forecast): handle sebuf proto data shapes from Redis
Detectors now normalize CII scores from the server-side proto format
(combinedScore, TREND_DIRECTION_RISING, region) to a uniform shape.
Outage severity handling accepts the proto enum format (SEVERITY_LEVEL_HIGH).
Added a confidence floor of 0.3 for single-source predictions.
Verified against live Redis: 2 predictions generated (Iran infra
shutdown, IL political instability).
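
The normalization can be sketched as follows (normalizeCiiEntry is named in a later commit; field names beyond those quoted here are assumptions):

// Sketch: map server-side sebuf proto shapes onto the uniform shape
// the detectors expect. combinedScore, TREND_DIRECTION_RISING, region,
// and SEVERITY_LEVEL_HIGH come from the commit; the rest is assumed.
function normalizeCiiEntry(entry) {
  return {
    region: entry.region ?? '',
    score: entry.combinedScore ?? entry.score ?? 0,
    rising: entry.trendDirection === 'TREND_DIRECTION_RISING', // field name assumed
  };
}

function normalizeOutageSeverity(sev) {
  return String(sev).replace(/^SEVERITY_LEVEL_/, '').toLowerCase(); // 'SEVERITY_LEVEL_HIGH' -> 'high'
}

const SINGLE_SOURCE_CONFIDENCE_FLOOR = 0.3; // from the commit
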
* feat(forecast): unlock AI Forecasts on web, lock desktop only (trial)
- Remove forecast RPC from PREMIUM_RPC_PATHS (web access is free)
- Panel locked on desktop only (same as oref-sirens/telegram-intel)
- Remove API key guards from data-loader and refresh scheduler
- Web users get full access during trial period
* chore: regenerate proto types with make generate
Re-ran make generate after rebasing on main. Plugin v0.7.0 dropped
@ts-nocheck from its output, so it was added back to all 50 generated files.
Fixed 4 type errors from proto codegen changes:
- MarketSource enum -> string union type
- TemporalAnomalyProto -> TemporalAnomaly rename
- webcam lastUpdated number -> string
* chore: add proto freshness check to pre-push hook
Runs make generate before push and compares checksums of generated files.
If proto types are stale, blocks push with instructions to regenerate.
Skips gracefully if buf CLI is not installed.
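
The check could look like this Node sketch (the real hook may be shell; the generated-files path is an assumption):

// Sketch of the pre-push freshness check. Assumes generated files sit
// flat under src/generated/; adjust for the real layout.
import { execSync } from 'node:child_process';
import { createHash } from 'node:crypto';
import { readFileSync, readdirSync } from 'node:fs';

function checksum(dir) {
  const h = createHash('sha256');
  for (const f of readdirSync(dir).sort()) h.update(readFileSync(`${dir}/${f}`));
  return h.digest('hex');
}

try {
  execSync('buf --version', { stdio: 'ignore' });
} catch {
  process.exit(0); // buf CLI not installed; skip gracefully
}

const before = checksum('src/generated');
execSync('make generate', { stdio: 'inherit' });
if (checksum('src/generated') !== before) {
  console.error('Generated proto types are stale. Run make generate and commit the result.');
  process.exit(1);
}
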
* fix(forecast): use chokepoints v4 key, include ciiContribution in unrest
- P1: Switch chokepoints input from stale v2 to active v4 Redis key,
matching bootstrap.js and cache-keys.ts
- P2: Add ciiContribution to unrest component fallback chain in
normalizeCiiEntry so political detector reads the correct sebuf field
* feat(forecast): Phase 2 LLM scenario enrichment + confidence model
MiroFish-inspired enhancements:
- LLM scenario narratives via Groq/OpenRouter (narrative-only, no numeric
adjustment). Evidence-grounded prompts with mandatory signal citation
and few-shot examples from MiroFish's SECTION_SYSTEM_PROMPT_TEMPLATE.
- Top-4 predictions batched into single LLM call for cost efficiency.
- News context from newsInsights attached to all predictions for LLM
prompt grounding (NOT in signals, cannot affect confidence).
- Deterministic confidence model: source diversity via SIGNAL_TO_SOURCE
mapping (deduplicates cii+cii_delta, theater+indicators) + calibration
agreement from prediction market drift. Floor 0.2, ceiling 1.0.
- Output validation: rejects scenarios without signal references.
- Truncated JSON repair for small model output.
- Structured JSON logging for LLM calls.
- Redis cache for LLM scenarios (1h TTL).
- 23 new tests (70 total), all passing.
- Live-tested: OpenRouter gemini-2.5-flash produces evidence-grounded
scenario narratives from real WorldMonitor data.
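
That confidence model, in sketch form (the mapping entries and weights are illustrative; the dedupe pairs and the 0.2 floor / 1.0 ceiling come from the commit):

// Sketch of the deterministic confidence model.
const SIGNAL_TO_SOURCE = {
  cii: 'cii', cii_delta: 'cii',              // deduplicated per the commit
  theater: 'theater', indicators: 'theater', // deduplicated per the commit
};

function computeConfidence(prediction, marketDrift = 0) {
  const sources = new Set(prediction.signals.map((s) => SIGNAL_TO_SOURCE[s.type] ?? s.type));
  const diversity = Math.min(sources.size / 3, 1);          // scaling assumed
  const agreement = 1 - Math.min(Math.abs(marketDrift), 1); // calibration term; shape assumed
  return Math.min(1.0, Math.max(0.2, 0.5 * diversity + 0.5 * agreement));
}
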
* feat(forecast): Phase 3 multi-perspective scenarios, projections, data-driven cascades
MiroFish-inspired enhancements:
- Multi-perspective LLM analysis: top-2 predictions get strategic,
regional, and contrarian viewpoints via combined LLM call
- Probability projections: domain-specific decay curves (h24/d7/d30)
anchored to timeHorizon so probability equals projections[timeHorizon]
- Data-driven cascade rules: moved from hardcoded array to JSON config
(scripts/data/cascade-rules.json) with schema validation, named
predicate evaluators, unknown key rejection, and fallback to defaults
- 4 new cascade paths: infrastructure->supply_chain, infrastructure->market
(both requiresSeverity:total), conflict->political, political->market
- Proto: added Perspectives and Projections messages to Forecast
- ForecastPanel: renders projections row and conditional perspectives toggle
- 89 tests (19 new), all passing
- Live-tested: OpenRouter produces perspectives from real data
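
The anchoring invariant (probability === projections[timeHorizon]) can be sketched like this; the decay constants and curve shape are illustrative:

const HORIZONS = ['h24', 'd7', 'd30'];
const DECAY = { conflict: 0.9, market: 0.7 }; // per-domain constants, illustrative

function projectProbabilities(p) {
  const k = DECAY[p.domain] ?? 0.8;
  const anchor = Math.max(0, HORIZONS.indexOf(p.timeHorizon));
  const projections = {};
  HORIZONS.forEach((h, i) => {
    // k ** 0 === 1 at the anchor, so projections[p.timeHorizon] === p.probability
    projections[h] = Math.min(1, p.probability * k ** (i - anchor));
  });
  return projections;
}
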
* feat(forecast): Phase 4 data utilization + entity graph
Fixes data gaps that prevented 4 of 6 detectors from firing:
- Input normalizers: chokepoint v4 shape + GPS hexes-to-zones mapping
- Chokepoint warm-ping (production-only, requires WM_API_BASE_URL)
- Lowered CII conflict threshold from 70 to 60, gated on level=high|critical
4 new standalone detectors:
- UCDP conflict zones (10+ events per country)
- Cyber threat concentration (5+ threats per country)
- GPS jamming in maritime shipping zones (5 regions)
- Prediction markets as signals (60-90% probability markets)
Entity-relationship graph (file-based, 38 nodes):
- Countries, theaters, commodities, chokepoints, alliances
- Alias table resolves both ISO codes and display names
- Graph cascade discovery links predictions across entities
Result: 51 predictions (up from 1-2), spanning conflict, infrastructure,
and supply chain domains. 112 tests, all passing.
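
A sketch of alias resolution against the graph (node ids and links are illustrative, not the real 38 nodes):

const GRAPH = {
  nodes: {
    iran: { type: 'country', links: ['strait_of_hormuz'] },
    strait_of_hormuz: { type: 'chokepoint', links: ['crude_oil'] },
  },
  aliases: { IR: 'iran', Iran: 'iran', 'Strait of Hormuz': 'strait_of_hormuz' },
};

// Resolves both ISO codes and display names, per the commit.
function resolveEntity(name) {
  const id = GRAPH.aliases[name] ?? String(name).toLowerCase();
  return GRAPH.nodes[id] ? id : null;
}

// Cascade discovery then links predictions whose entities are graph-adjacent.
const neighbors = (id) => GRAPH.nodes[id]?.links ?? [];
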
* fix(forecast): redis cache format, signal source mapping, type safety
Fresh-eyes audit fixes:
- BUG: redisSet used wrong Upstash API format (POST body with {value,ex}
instead of command array ['SET',key,value,'EX',ttl]). LLM cache writes
were silently failing, causing fresh LLM calls every run.
- BUG: prediction_market signal type missing from SIGNAL_TO_SOURCE,
inflating confidence for market-derived predictions.
- CLEANUP: Remove unnecessary (f as any) casts in ForecastPanel since
generated Forecast type already has projections/perspectives fields.
- CLEANUP: Bump health maxStaleMin from 60 to 90 to avoid false STALE
alerts when LLM calls add latency to seed runs.
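
The redisSet fix is clearest side by side (the Upstash REST API executes a JSON-encoded Redis command array posted to the base URL):

// Before (broken): Upstash does not accept this payload shape, so the
// LLM cache writes failed silently.
//   body: JSON.stringify({ value: payload, ex: ttlSeconds })
// After (fixed): post the command array, as the shared helper below does.
async function redisSetFixed(url, token, key, value, ttlSeconds) {
  return fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(['SET', key, JSON.stringify(value), 'EX', ttlSeconds]),
  });
}
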
* feat(forecast): headline-entity matching with news corroboration signals
Uses entity graph aliases to match headlines to predictions by
country/theater (excludes commodity/infrastructure nodes to prevent
false positives). Predictions with matching headlines get a
news_corroboration signal visible in the panel.
Also fixes buildUserPrompt to merge unique headlines from ALL
predictions in the LLM batch (was only reading preds[0].newsContext).
Live-tested: 13 of 51 predictions now have corroborating headlines
(Iran, Israel, Syria, Ukraine, etc). 116 tests, all passing.
* feat(forecast): add country-codes.json for headline-entity matching
56 countries with ISO codes, full names, and scoring keywords (extracted
from src/config/countries.ts + UCDP-relevant additions). Used by
attachNewsContext for richer headline matching via getSearchTermsForRegion
which combines country-codes + entity graph + keyword aliases.
14/57 predictions now have news corroboration (limited by headline
coverage, not matching quality: only 8 headlines currently available).
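
An illustrative country-codes.json entry (the exact schema is an assumption beyond the fields the commit names: ISO code, full name, scoring keywords):

{ "iso": "IR", "name": "Iran", "keywords": ["iran", "iranian", "tehran"] }
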
* feat(forecast): read 300 headlines from news digest instead of 8
Reads news:digest:v1:full:en (300 headlines across 16 categories) instead
of just news:insights:v1 topStories (8 headlines). Falls back to topStories
if the digest is unavailable.
Result: news corroboration jumped from 25% to 64% (38/59 predictions).
* fix(forecast): handle parenthetical country names in headline matching
Strip suffixes like '(Zaire)', '(Burma)', '(Soviet Union)' from UCDP
region names before matching against country-codes.json. Also use
includes() for reverse name lookup to catch partial matches.
Corroboration: 64% -> 69% (41/59). Remaining 18 unmatched are countries
with no current English-language news coverage.
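
The stripping step amounts to a small regex (a sketch; the real implementation may differ):

// 'Congo (Zaire)' -> 'Congo', 'Myanmar (Burma)' -> 'Myanmar'
const stripParenthetical = (name) => name.replace(/\s*\([^)]*\)\s*$/, '').trim();
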
* fix(forecast): cache validated LLM output, add digest test, log cache errors
Fresh-eyes audit fixes:
- Combined LLM cache now stores only validated items (was caching raw
unvalidated output, serving potentially invalid scenarios on cache hit)
- redisSet logs warnings on failure (was silently swallowing all errors)
- Added digest-based test for attachNewsContext (primary path was untested)
- Fixed test arity: attachNewsContext(preds, news, digest) with 3 params
* fix(forecast): remove dead confidenceFromSources, reduce warm-ping timeout
- P2: Remove confidenceFromSources (dead code, computeConfidence overwrites
all initial confidence values). Inline the formula in original detectors.
- P3: Reduce warm-ping timeout from 30s to 15s (non-critical step)
- P3: Add trial status comment on forecast panel config
* fix(forecast): resolve ISO codes to country names, fix market detector, safe pre-push
P1 fixes from code review:
- CII ISO codes (IL, IR) now resolved to full country names (Israel, Iran)
via country-codes.json. Prevents substring false positives (IL matching
Chile) in event correlation. Uses word-boundary regex for matching.
- Market detector CII-to-theater mapping now uses entity graph traversal
instead of broken theater-name substring matching. Iran correctly maps
to Middle East theater via graph links.
- Pre-push hook no longer runs destructive git checkout on proto freshness
failure. Reports mismatch and exits without modifying worktree.
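
The word-boundary fix in sketch form (the country-codes lookup shape is an assumption):

// Resolve the ISO code to its full name, then match on word boundaries:
// a plain case-insensitive includes('il') would also hit 'Chile'.
function matchesCountry(text, iso, countryCodes) {
  const name = countryCodes.find((c) => c.iso === iso)?.name ?? iso;
  return new RegExp(`\\b${name}\\b`, 'i').test(text); // assumes names without regex metachars
}
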
* feat(forecast): add structured scenario pipeline and trace export
* fix(forecast): hydrate bootstrap and trim generated drift
* fix(forecast): keep required supply-chain contract updates
* fix(ci): add forecasts to cache-keys registry and regenerate proto
Add forecasts entry to BOOTSTRAP_CACHE_KEYS and BOOTSTRAP_TIERS in
cache-keys.ts to match api/bootstrap.js. Regenerate SupplyChain proto
to fix duplicate TransitDayCount and add riskSummary/riskReportAction.
368 lines · 13 KiB · JavaScript
#!/usr/bin/env node

import { readFileSync, existsSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';

const CHROME_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36';
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024; // 5MB per key

const __seed_dirname = dirname(fileURLToPath(import.meta.url));

export { CHROME_UA };

export function loadSharedConfig(filename) {
  for (const base of [join(__seed_dirname, '..', 'shared'), join(__seed_dirname, 'shared')]) {
    const p = join(base, filename);
    if (existsSync(p)) return JSON.parse(readFileSync(p, 'utf8'));
  }
  throw new Error(`Cannot find shared/${filename} — checked ../shared/ and ./shared/`);
}

export function loadEnvFile(metaUrl) {
  const __dirname = metaUrl ? dirname(fileURLToPath(metaUrl)) : process.cwd();
  const candidates = [
    join(__dirname, '..', '.env.local'),
    join(__dirname, '..', '..', '.env.local'),
  ];
  if (process.env.HOME) {
    candidates.push(join(process.env.HOME, 'Documents/GitHub/worldmonitor', '.env.local'));
  }
  for (const envPath of candidates) {
    if (!existsSync(envPath)) continue;
    const lines = readFileSync(envPath, 'utf8').split('\n');
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed || trimmed.startsWith('#')) continue;
      const eqIdx = trimmed.indexOf('=');
      if (eqIdx === -1) continue;
      const key = trimmed.slice(0, eqIdx).trim();
      let val = trimmed.slice(eqIdx + 1).trim();
      if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) {
        val = val.slice(1, -1);
      }
      if (!process.env[key]) process.env[key] = val;
    }
    return;
  }
}

export function maskToken(token) {
  if (!token || token.length < 8) return '***';
  return token.slice(0, 4) + '***' + token.slice(-4);
}

export function getRedisCredentials() {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) {
    console.error('Missing UPSTASH_REDIS_REST_URL or UPSTASH_REDIS_REST_TOKEN');
    process.exit(1);
  }
  return { url, token };
}

async function redisCommand(url, token, command) {
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(15_000),
  });
  if (!resp.ok) {
    const text = await resp.text().catch(() => '');
    throw new Error(`Redis command failed: HTTP ${resp.status} — ${text.slice(0, 200)}`);
  }
  return resp.json();
}

async function redisGet(url, token, key) {
  const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, {
    headers: { Authorization: `Bearer ${token}` },
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) return null;
  const data = await resp.json();
  return data.result ? JSON.parse(data.result) : null;
}

async function redisSet(url, token, key, value, ttlSeconds) {
  const payload = JSON.stringify(value);
  const cmd = ttlSeconds
    ? ['SET', key, payload, 'EX', ttlSeconds]
    : ['SET', key, payload];
  return redisCommand(url, token, cmd);
}

async function redisDel(url, token, key) {
  return redisCommand(url, token, ['DEL', key]);
}

export async function acquireLock(domain, runId, ttlMs) {
  const { url, token } = getRedisCredentials();
  const lockKey = `seed-lock:${domain}`;
  const result = await redisCommand(url, token, ['SET', lockKey, runId, 'NX', 'PX', ttlMs]);
  return result?.result === 'OK';
}

export async function releaseLock(domain, runId) {
  const { url, token } = getRedisCredentials();
  const lockKey = `seed-lock:${domain}`;
  const script = `if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end`;
  try {
    await redisCommand(url, token, ['EVAL', script, 1, lockKey, runId]);
  } catch {
    // Best-effort release; lock will expire via TTL
  }
}

export async function atomicPublish(canonicalKey, data, validateFn, ttlSeconds) {
  const { url, token } = getRedisCredentials();
  const runId = String(Date.now());
  const stagingKey = `${canonicalKey}:staging:${runId}`;

  const payload = JSON.stringify(data);
  const payloadBytes = Buffer.byteLength(payload, 'utf8');
  if (payloadBytes > MAX_PAYLOAD_BYTES) {
    throw new Error(`Payload too large: ${(payloadBytes / 1024 / 1024).toFixed(1)}MB > 5MB limit`);
  }

  if (validateFn) {
    const valid = validateFn(data);
    if (!valid) {
      return { payloadBytes: 0, skipped: true };
    }
  }

  // Write to staging key
  await redisSet(url, token, stagingKey, data, 300); // 5 min staging TTL

  // Overwrite canonical key
  if (ttlSeconds) {
    await redisCommand(url, token, ['SET', canonicalKey, payload, 'EX', ttlSeconds]);
  } else {
    await redisCommand(url, token, ['SET', canonicalKey, payload]);
  }

  // Cleanup staging
  await redisDel(url, token, stagingKey).catch(() => {});

  return { payloadBytes, recordCount: Array.isArray(data) ? data.length : null };
}

export async function writeFreshnessMetadata(domain, resource, count, source) {
  const { url, token } = getRedisCredentials();
  const metaKey = `seed-meta:${domain}:${resource}`;
  const meta = {
    fetchedAt: Date.now(),
    recordCount: count,
    sourceVersion: source || '',
  };
  await redisSet(url, token, metaKey, meta, 86400 * 7); // 7 day TTL on metadata
  return meta;
}

export async function withRetry(fn, maxRetries = 3, delayMs = 1000) {
  let lastErr;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (attempt < maxRetries) {
        const wait = delayMs * 2 ** attempt;
        const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
        console.warn(` Retry ${attempt + 1}/${maxRetries} in ${wait}ms: ${err.message || err}${cause}`);
        await new Promise(r => setTimeout(r, wait));
      }
    }
  }
  throw lastErr;
}

export function logSeedResult(domain, count, durationMs, extra = {}) {
  console.log(JSON.stringify({
    event: 'seed_complete',
    domain,
    recordCount: count,
    durationMs: Math.round(durationMs),
    timestamp: new Date().toISOString(),
    ...extra,
  }));
}

export async function verifySeedKey(key) {
  const { url, token } = getRedisCredentials();
  const data = await redisGet(url, token, key);
  return data;
}

export async function writeExtraKey(key, data, ttl) {
  const { url, token } = getRedisCredentials();
  const payload = JSON.stringify(data);
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(['SET', key, payload, 'EX', ttl]),
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) throw new Error(`Extra key ${key}: write failed (HTTP ${resp.status})`);
  console.log(` Extra key ${key}: written`);
}

export async function writeExtraKeyWithMeta(key, data, ttl, recordCount, metaKeyOverride) {
  await writeExtraKey(key, data, ttl);
  const { url, token } = getRedisCredentials();
  const metaKey = metaKeyOverride || `seed-meta:${key.replace(/:v\d+$/, '')}`;
  const meta = { fetchedAt: Date.now(), recordCount: recordCount ?? 0 };
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(['SET', metaKey, JSON.stringify(meta), 'EX', 86400 * 7]),
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) console.warn(` seed-meta ${metaKey}: write failed`);
}

export async function extendExistingTtl(keys, ttlSeconds = 600) {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) {
    console.error(' Cannot extend TTL: missing Redis credentials');
    return;
  }
  try {
    const pipeline = keys.map(k => ['EXPIRE', k, ttlSeconds]);
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(10_000),
    });
    if (resp.ok) {
      console.log(` Extended TTL on ${keys.length} existing key(s) (${ttlSeconds}s)`);
    }
  } catch (e) {
    console.error(` TTL extension failed: ${e.message}`);
  }
}

export function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

export function parseYahooChart(data, symbol) {
  const result = data?.chart?.result?.[0];
  const meta = result?.meta;
  if (!meta) return null;

  const price = meta.regularMarketPrice;
  const prevClose = meta.chartPreviousClose || meta.previousClose || price;
  const change = prevClose ? ((price - prevClose) / prevClose) * 100 : 0;
  const closes = result.indicators?.quote?.[0]?.close;
  const sparkline = Array.isArray(closes) ? closes.filter((v) => v != null) : [];

  return { symbol, name: symbol, display: symbol, price, change: +change.toFixed(2), sparkline };
}

export async function runSeed(domain, resource, canonicalKey, fetchFn, opts = {}) {
  const { validateFn, ttlSeconds, lockTtlMs = 120_000, extraKeys, afterPublish } = opts;
  const runId = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  const startMs = Date.now();

  console.log(`=== ${domain}:${resource} Seed ===`);
  console.log(` Run ID: ${runId}`);
  console.log(` Key: ${canonicalKey}`);

  // Acquire lock
  const locked = await acquireLock(`${domain}:${resource}`, runId, lockTtlMs);
  if (!locked) {
    console.log(' SKIPPED: another seed run in progress');
    process.exit(0);
  }

  // Phase 1: Fetch data (graceful on failure — extend TTL on stale data)
  let data;
  try {
    data = await withRetry(fetchFn);
  } catch (err) {
    await releaseLock(`${domain}:${resource}`, runId);
    const durationMs = Date.now() - startMs;
    const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
    console.error(` FETCH FAILED: ${err.message || err}${cause}`);

    const ttl = ttlSeconds || 600;
    const keys = [canonicalKey];
    if (extraKeys) keys.push(...extraKeys.map(ek => ek.key));
    await extendExistingTtl(keys, ttl);

    console.log(`\n=== Failed gracefully (${Math.round(durationMs)}ms) ===`);
    process.exit(0);
  }

  // Phase 2: Publish to Redis (rethrow on failure — data was fetched but not stored)
  try {
    const publishResult = await atomicPublish(canonicalKey, data, validateFn, ttlSeconds);
    if (publishResult.skipped) {
      const durationMs = Date.now() - startMs;
      console.log(` SKIPPED: validation failed (empty data) — preserving existing cache`);
      console.log(`\n=== Done (${Math.round(durationMs)}ms, no write) ===`);
      await releaseLock(`${domain}:${resource}`, runId);
      process.exit(0);
    }
    const { payloadBytes } = publishResult;
    const topicArticleCount = Array.isArray(data?.topics)
      ? data.topics.reduce((n, t) => n + (t?.articles?.length || t?.events?.length || 0), 0)
      : undefined;
    const recordCount = opts.recordCount != null
      ? (typeof opts.recordCount === 'function' ? opts.recordCount(data) : opts.recordCount)
      : Array.isArray(data) ? data.length
      : (topicArticleCount
          ?? data?.predictions?.length
          ?? data?.events?.length ?? data?.earthquakes?.length ?? data?.outages?.length
          ?? data?.fireDetections?.length ?? data?.anomalies?.length ?? data?.threats?.length
          ?? data?.quotes?.length ?? data?.stablecoins?.length
          ?? data?.cables?.length ?? 0);

    // Write extra keys (e.g., bootstrap hydration keys)
    if (extraKeys) {
      for (const ek of extraKeys) {
        await writeExtraKey(ek.key, ek.transform ? ek.transform(data) : data, ek.ttl || ttlSeconds);
      }
    }

    if (afterPublish) {
      await afterPublish(data, { canonicalKey, ttlSeconds, recordCount, runId });
    }

    const meta = await writeFreshnessMetadata(domain, resource, recordCount, opts.sourceVersion);

    const durationMs = Date.now() - startMs;
    logSeedResult(domain, recordCount, durationMs, { payloadBytes });

    // Verify (best-effort: write already succeeded, don't fail the job on transient read issues)
    let verified = false;
    for (let attempt = 0; attempt < 2; attempt++) {
      try {
        verified = !!(await verifySeedKey(canonicalKey));
        if (verified) break;
        if (attempt === 0) await new Promise(r => setTimeout(r, 500));
      } catch {
        if (attempt === 0) await new Promise(r => setTimeout(r, 500));
      }
    }
    if (verified) {
      console.log(` Verified: data present in Redis`);
    } else {
      console.warn(` WARNING: verification read returned null for ${canonicalKey} (write succeeded, may be transient)`);
    }

    console.log(`\n=== Done (${Math.round(durationMs)}ms) ===`);
    await releaseLock(`${domain}:${resource}`, runId);
    process.exit(0);
  } catch (err) {
    await releaseLock(`${domain}:${resource}`, runId);
    throw err;
  }
}
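
A minimal sketch of how a seed script consumes these helpers (the module filename, endpoint, and keys are hypothetical):

import { runSeed, loadEnvFile } from './seed-shared.mjs'; // filename assumed

loadEnvFile(import.meta.url);

await runSeed('example', 'quotes', 'example:quotes:v1', async () => {
  const resp = await fetch('https://example.com/quotes.json', { signal: AbortSignal.timeout(10_000) });
  if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
  return resp.json();
}, {
  validateFn: (data) => Array.isArray(data) && data.length > 0,
  ttlSeconds: 3600,
});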