mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(seed-contract): PR 2a — runSeed envelope dual-write + 91 seeders migrated
Opt-in contract path in runSeed: when opts.declareRecords is provided, write
{_seed, data} envelope to the canonical key alongside legacy seed-meta:*
(dual-write). State machine: OK / OK_ZERO / RETRY with zeroIsValid opt.
declareRecords throws or returns non-integer → hard fail (contract violation).
extraKeys[*] entries support per-key declareRecords; each extra key writes its
own envelope. Legacy seeders (no declareRecords) are entirely unchanged.
Migrated all 91 scripts/seed-*.mjs to contract mode. Each exports
declareRecords returning the canonical record count, and passes
schemaVersion: 1 + maxStaleMin (matched to api/health.js SEED_META, or 2.5x
interval where no registry entry exists). Contract conformance reports 84/86
seeders with full descriptor (2 pre-existing warnings).
Legacy seed-meta keys still written so unmigrated readers keep working;
follow-up slices flip health.js + readers to envelope-first.
Tests: 61/61 PR 1 tests still pass.
Next slices for PR 2:
- api/health.js registry collapse + 15 seed-bundle-*.mjs canonicalKey wiring
- reader migration (mcp, resilience, aviation, displacement, regional-snapshot)
- direct writers — ais-relay.cjs, consumer-prices-core publish.ts
- public-boundary stripSeedEnvelope + test migration
Plan: docs/plans/2026-04-14-002-fix-runseed-zero-record-lockout-plan.md
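The envelope shape and OK / OK_ZERO / RETRY state machine described above can be sketched as follows. This is illustrative only — `buildEnvelope` and `resolveState` are hypothetical names, not the repo's actual exports, and any `_seed` field beyond recordCount/schemaVersion/maxStaleMin/state is a guess:

```javascript
// Hypothetical sketch of the PR 2a contract envelope.
function buildEnvelope(data, { recordCount, schemaVersion, maxStaleMin, state }) {
  return {
    _seed: { fetchedAt: Date.now(), recordCount, schemaVersion, maxStaleMin, state },
    data,
  };
}

// OK / OK_ZERO / RETRY: zero records is only acceptable when the seeder
// opts in via zeroIsValid; a non-integer count is a hard contract violation.
function resolveState(recordCount, { zeroIsValid = false } = {}) {
  if (!Number.isInteger(recordCount)) {
    throw new Error('contract violation: declareRecords must return an integer');
  }
  if (recordCount > 0) return 'OK';
  return zeroIsValid ? 'OK_ZERO' : 'RETRY';
}
```

The RETRY branch is what later made the token-panels regression visible: a declareRecords that returns 0 on the wrong shape silently skips the canonical write.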
* fix(seed-contract): unwrap envelopes in internal cross-seed readers
After PR 2a enveloped 91 canonical keys as {_seed, data}, every script-side
reader that returned the raw parsed JSON started silently handing callers the
envelope instead of the bare payload. WoW baselines (bigmac, grocery-basket,
fear-greed) saw undefined .countries / .composite; seed-climate-anomalies saw
undefined .normals from climate:zone-normals:v1; seed-thermal-escalation saw
undefined .fireDetections from wildfire:fires:v1; seed-forecasts' ~40-key
pipeline batch returned envelopes for every input.
Fix: route every script-side reader through unwrapEnvelope(...).data. Legacy
bare-shape values pass through unchanged (unwrapEnvelope returns
{_seed: null, data: raw} for any non-envelope shape).
Changed:
- scripts/_seed-utils.mjs: import unwrapEnvelope; redisGet, readSeedSnapshot,
verifySeedKey all unwrap. Exported new readCanonicalValue() helper for
cross-seed consumers.
- 18 seed-*.mjs scripts with local redisGet-style helpers or inline fetch
patched to unwrap via the envelope source module (subagent sweep).
- scripts/seed-forecasts.mjs pipeline batch: parse() unwraps each result.
- scripts/seed-energy-spine.mjs redisMget: unwraps each result.
Tests:
- tests/seed-utils-envelope-reads.test.mjs: 7 new cases covering envelope
+ legacy + null paths for readSeedSnapshot and verifySeedKey.
- Full seed suite: 67/67 pass (was 61, +6 new).
Addresses both of the user's P1 findings on PR #3097.
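The pass-through semantics relied on above can be sketched in a few lines (a minimal sketch — the real unwrapEnvelope lives in the repo's envelope source module and may differ in detail):

```javascript
// Envelope values are split into { _seed, data }; anything else is treated
// as a legacy bare-shape value and passed through with _seed: null.
function unwrapEnvelope(value) {
  if (value && typeof value === 'object' && '_seed' in value && 'data' in value) {
    return { _seed: value._seed, data: value.data };
  }
  return { _seed: null, data: value };
}
```

Readers always take `.data`, so enveloped and legacy keys look identical to callers — which is why routing every script-side reader through one helper fixes all four symptom classes at once.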
* feat(seed-contract): envelope-aware reads in server + api helpers
Every RPC and public-boundary reader now automatically strips _seed from
contract-mode canonical keys. Legacy bare-shape values pass through unchanged
(unwrapEnvelope no-ops on non-envelope shapes).
Changed helpers (one-place fix — unblocks ~60 call sites):
- server/_shared/redis.ts: getRawJson, getCachedJson, getCachedJsonBatch
unwrap by default. cachedFetchJson inherits via getCachedJson.
- api/_upstash-json.js: readJsonFromUpstash unwraps (covers api/mcp.ts
tool responses + all its canonical-key reads).
- api/bootstrap.js: getCachedJsonBatch unwraps (public-boundary —
clients never see envelope metadata).
Left intentionally unchanged:
- api/health.js / api/seed-health.js: read only seed-meta:* keys which
remain bare-shape during dual-write. unwrapEnvelope already imported at
the meta-read boundary (PR 1) as a defensive no-op.
Tests: 67/67 seed tests pass. typecheck + typecheck:api clean.
This is the blast-radius fix the PR #3097 review called out — external
readers that would otherwise see {_seed, data} after the writer side
migrated.
* fix(test): strip export keyword in vm.runInContext'd seed source
cross-source-signals-regulatory.test.mjs loads scripts/seed-cross-source-signals.mjs
via vm.runInContext, which cannot parse ESM `export` syntax. PR 2a added
`export function declareRecords` to every seeder, which broke this test's
static-analysis approach.
Fix: strip the `export` keyword from the declareRecords line in the
preprocessed source string so the function body still evaluates as a plain
declaration.
Full test:data suite: 5307/5307 pass. typecheck + typecheck:api clean.
* feat(seed-contract): consumer-prices publish.ts writes envelopes
Wrap the 5 canonical keys written by consumer-prices-core/src/jobs/publish.ts
(overview, movers:7d/30d, freshness, categories:7d/30d/90d, retailer-spread,
basket-series) in {_seed, data} envelopes. Legacy seed-meta:<key> writes
preserved for dual-write.
Inlined a buildEnvelope helper (10 lines) rather than taking a cross-package
dependency — consumer-prices-core is a standalone npm package. Documented the
four-file parity contract (mjs source, ts mirror, js edge mirror, this copy).
Contract fields: sourceVersion='consumer-prices-core-publish-v1', schemaVersion=1,
state='OK' (recordCount>0) or 'OK_ZERO' (legitimate zero).
Typecheck: no new errors in publish.ts.
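A plausible shape for the inlined helper, going by the contract fields listed above — the signature and field order are assumptions; only sourceVersion, schemaVersion, and the OK/OK_ZERO rule are stated by this commit:

```javascript
// Assumed sketch of the ~10-line helper inlined into publish.ts.
function buildEnvelope(data, recordCount) {
  return {
    _seed: {
      sourceVersion: 'consumer-prices-core-publish-v1',
      schemaVersion: 1,
      state: recordCount > 0 ? 'OK' : 'OK_ZERO', // legitimate zero → OK_ZERO
      recordCount,
      fetchedAt: Date.now(),
    },
    data,
  };
}
```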
* fix(seed-contract): 3 more server-side readers unwrap envelopes
Found during final audit:
- server/worldmonitor/resilience/v1/_shared.ts: resilience score reader
parsed cached GetResilienceScoreResponse raw. Contract-mode seed-resilience-scores
now envelopes those keys.
- server/worldmonitor/resilience/v1/get-resilience-ranking.ts: p05/p95
interval lookup parsed raw from seed-resilience-scores' extra-key path.
- server/worldmonitor/infrastructure/v1/_shared.ts: mgetJson() used for
count-source keys (wildfire:fires:v1, news:insights:v1) which are both
contract-mode now.
All three now unwrap via server/_shared/seed-envelope. Legacy shapes pass
through unchanged.
Typecheck clean.
* feat(seed-contract): ais-relay.cjs direct writes produce envelopes
32 canonical-key write sites in scripts/ais-relay.cjs now produce {_seed, data}
envelopes. Inlined buildEnvelope() (CJS module can't require ESM source) +
envelopeWrite(key, data, ttlSeconds, meta) wrapper. Enveloped keys span market
bootstrap, aviation, cyber-threats, theater-posture, weather-alerts, economic
spending/fred/worldbank, tech-events, corridor-risk, usni-fleet, shipping-stress,
social:reddit, wsb-tickers, pizzint, product-catalog, chokepoint transits,
ucdp-events, satellites, oref.
Left bare (not seeded data keys): seed-meta:* (dual-write legacy),
classifyCacheKey LLM cache, notam:prev-closed-state internal state,
wm:notif:scan-dedup flags.
Updated tests/ucdp-seed-resilience.test.mjs regex to accept both upstashSet
(pre-contract) and envelopeWrite (post-contract) call patterns.
* feat(seed-contract): 15 bundle files add canonicalKey for envelope gate
54 bundle sections across 12 files now declare canonicalKey alongside the
existing seedMetaKey. _bundle-runner.mjs (from PR 1) prefers canonicalKey
when both are present — gates section runs on envelope._seed.fetchedAt
read directly from the data key, eliminating the meta-outlives-data class
of bugs.
Files touched:
- climate (5), derived-signals (2), ecb-eu (3), energy-sources (6),
health (2), imf-extended (4), macro (10), market-backup (9),
portwatch (4), relay-backup (2), resilience-recovery (5), static-ref (2)
Skipped (14 sections, 3 whole bundles): multi-key writers, dynamic
templated keys (displacement year-scoped), or non-runSeed orchestrators
(regional brief cron, resilience-scores' 222-country publish, validation/
benchmark scripts). These continue to use seedMetaKey or their own gate.
seedMetaKey preserved everywhere — dual-write. _bundle-runner.mjs falls
back to legacy when canonicalKey is absent.
All 15 bundles pass node --check. test:data: 5307/5307. typecheck:all: clean.
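The canonicalKey preference can be sketched like this — `gateFreshness` and its parameters are hypothetical; _bundle-runner.mjs's real gate may differ:

```javascript
// Prefer the envelope's own fetchedAt read directly from the data key;
// fall back to the legacy seed-meta shape when no canonicalKey is declared.
function gateFreshness(section, values, now = Date.now()) {
  const key = section.canonicalKey ?? section.seedMetaKey;
  const raw = values[key];
  const fetchedAt = section.canonicalKey ? raw?._seed?.fetchedAt : raw?.fetchedAt;
  if (typeof fetchedAt !== 'number') return { fresh: false, key };
  return { fresh: now - fetchedAt < section.maxStaleMin * 60_000, key };
}
```

Reading freshness from the data key itself is what eliminates the meta-outlives-data class of bugs: the gate can no longer report fresh when only the meta key survived.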
* fix(seed-contract): 4 PR #3097 review P1s — transform/declareRecords mismatches + envelope leaks
Addresses both P1 findings and the extra-key seed-meta leak surfaced in review:
1. runSeed helper-level invariant: seed-meta:* keys NEVER envelope.
scripts/_seed-utils.mjs exports shouldEnvelopeKey(key) — returns false for
any key starting with 'seed-meta:'. Both atomicPublish (canonical) and
writeExtraKey (extras) gate the envelope wrap through this helper. Fixes
seed-iea-oil-stocks' ANALYSIS_META_EXTRA_KEY silently getting enveloped,
which broke health.js parsing the value as bare {fetchedAt, recordCount}.
Also defends against any future manual writeExtraKey(..., envelopeMeta)
call that happens to target a seed-meta:* key.
2. seed-token-panels canonical + extras fixed.
publishTransform returns data.defi (the defi panel itself, shape {tokens}).
Old declareRecords counted data.defi.tokens + data.ai.tokens + data.other.tokens
on the transformed payload → 0 → RETRY path → canonical market:defi-tokens:v1
was never written, and because runSeed returned before the extraKeys loop,
market:ai-tokens:v1 + market:other-tokens:v1 stayed stale too.
New: declareRecords counts data.tokens on the transformed shape. AI_KEY +
OTHER_KEY extras reuse the same function (transforms return structurally
identical panels). Added isMain guard so test imports don't fire runSeed.
3. api/product-catalog.js cached reader unwraps envelope.
ais-relay.cjs now envelopes product-catalog:v2 via envelopeWrite(). The
edge reader did raw JSON.parse(result) and returned {_seed, data} to
clients, breaking the cached path. Fix: import unwrapEnvelope from
./_seed-envelope.js, apply after JSON.parse. One site — :238-241 is
downstream of getFromCache(), so the single reader fix covers both.
4. Regression lock tests/seed-contract-transform-regressions.test.mjs (11 cases):
- shouldEnvelopeKey invariant: seed-meta:* false, canonical true
- Token-panels declareRecords works on transformed shape (canonical + both extras)
- Explicit repro of pre-fix buggy signature returning 0 — guards against revert
- resolveRecordCount accepts 0, rejects non-integer
- Product-catalog envelope unwrap returns bare shape; legacy passes through
Verification:
- npm run test:data → 5318/5318 pass (was 5307 — 11 new regressions)
- npm run typecheck:all → clean
- node --check on every modified script
iea-oil-stocks canonical declareRecords was NOT broken (user confirmed during
review — buildIndex preserves .members); only its ANALYSIS_META_EXTRA_KEY
was affected, now covered generically by commit 1's helper invariant.
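The helper-level invariant from item 1 is small enough to sketch in full (a sketch — the real export lives in scripts/_seed-utils.mjs, and `wrapForWrite` here is a hypothetical stand-in for the gating inside atomicPublish/writeExtraKey):

```javascript
// seed-meta:* keys must stay bare — health.js parses them as
// {fetchedAt, recordCount}, so enveloping them breaks the reader.
function shouldEnvelopeKey(key) {
  return !String(key).startsWith('seed-meta:');
}

// Write path gated through the invariant (envelope shape abbreviated).
function wrapForWrite(key, payload, meta) {
  return shouldEnvelopeKey(key) ? { _seed: meta, data: payload } : payload;
}
```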
* fix(seed-contract): seed-token-panels validateFn also runs on post-transform shape
Review finding: fixing declareRecords wasn't sufficient — atomicPublish() runs
validateFn(publishData) on the transformed payload too. seed-token-panels'
validate() checked data.defi/.ai/.other on the transformed {tokens} shape,
returned false, and runSeed took the early skipped-write branch (before even
reaching the declareRecords RETRY logic). Net effect: same as before the
declareRecords fix — canonical + both extras stayed stale.
Fix: validate() now checks the canonical defi panel directly
(Array.isArray(data?.tokens) && at least one t.price > 0). AI/OTHER panels are
validated implicitly by their own extraKey declareRecords on write.
Audited the other 9 seeders with publishTransform (bls-series, bis-extended,
bis-data, gdelt-intel, trade-flows, iea-oil-stocks, jodi-gas, sanctions-pressure,
forecasts): all validateFn's correctly target the post-transform shape. Only
token-panels regressed.
Added 4 regression tests (tests/seed-contract-transform-regressions.test.mjs):
- validate accepts transformed panel with priced tokens
- validate rejects all-zero-price tokens
- validate rejects empty/missing tokens
- Explicit pre-fix repro (buggy old signature fails on transformed shape)
Verification:
- npm run test:data → 5322/5322 pass (was 5318; +4 new)
- npm run typecheck:all → clean
- node --check clean
* feat(seed-contract): add /api/seed-contract-probe validation endpoint
Single machine-readable gate for 'is PR #3097 working in production'.
Replaces the curl/jq ritual with one authenticated edge call that returns
HTTP 200 ok:true or 503 + failing check list.
What it validates:
- 8 canonical keys have {_seed, data} envelopes with required data fields
and minRecords floors (fsi-eu, zone-normals, 3 token panels + minRecords
guard against token-panels RETRY regression, product-catalog, wildfire,
earthquakes).
- 2 seed-meta:* keys remain BARE (shouldEnvelopeKey invariant; guards
against iea-oil-stocks ANALYSIS_META_EXTRA_KEY-class regressions).
- /api/product-catalog + /api/bootstrap responses contain no '_seed' leak.
Auth: x-probe-secret header must match RELAY_SHARED_SECRET (reuses existing
Vercel↔Railway internal trust boundary).
Probe logic is exported (checkProbe, checkPublicBoundary, DEFAULT_PROBES) for
hermetic testing. tests/seed-contract-probe.test.mjs covers every branch:
envelope pass/fail on field/records/shape, bare pass/fail on shape/field,
missing/malformed JSON, Redis non-2xx, boundary seed-leak detection,
DEFAULT_PROBES sanity (seed-meta invariant present, token-panels minRecords
guard present).
Usage:
curl -H "x-probe-secret: $RELAY_SHARED_SECRET" \
https://api.worldmonitor.app/api/seed-contract-probe
PR 3 will extend the probe with a stricter mode that asserts seed-meta:*
keys are GONE (not just bare) once legacy dual-write is removed.
Verification:
- tests/seed-contract-probe.test.mjs → 15/15 pass
- npm run test:data → 5338/5338 (was 5322; +16 new incl. conformance)
- npm run typecheck:all → clean
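An illustrative core of the probe logic — this `checkProbe` is a sketch of the exported helper's described behavior (minRecords only enforced when set; bare shape fails on an unexpected envelope), not its real signature:

```javascript
function checkProbe(value, probe) {
  if (probe.shape === 'envelope') {
    if (!value || typeof value !== 'object' || !value._seed || !('data' in value)) {
      return { ok: false, reason: 'not-enveloped' };
    }
    for (const field of probe.dataHas ?? []) {
      if (!(field in value.data)) return { ok: false, reason: `missing:${field}` };
    }
    if (probe.minRecords != null && !(value._seed.recordCount >= probe.minRecords)) {
      return { ok: false, reason: `records<${probe.minRecords}` };
    }
    return { ok: true };
  }
  // shape 'bare': an enveloped seed-meta:* key violates the invariant.
  if (value && typeof value === 'object' && value._seed) {
    return { ok: false, reason: 'unexpected-envelope' };
  }
  return { ok: true };
}
```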
* fix(seed-contract): tighten probe — minRecords on AI/OTHER + cache-path source header
Review P2 findings: the probe's stated guards were weaker than advertised.
1. market:ai-tokens:v1 + market:other-tokens:v1 probes claimed to guard the
token-panels extra-key RETRY regression but only checked shape='envelope'
+ dataHas:['tokens']. If an extra-key declareRecords regressed to 0, both
probes would still pass because checkProbe() only inspects _seed.recordCount
when minRecords is set. Now both enforce minRecords: 1.
2. /api/product-catalog boundary check only asserted no '_seed' leak — which
is also true for the static fallback path. A broken cached reader
(getFromCache returning null or throwing) could serve fallback silently
and still pass this probe. Now:
- api/product-catalog.js emits X-Product-Catalog-Source: cache|dodo|fallback
on the response (the json() helper gained an optional source param wired
to each of the three branches).
- checkPublicBoundary declaratively requires that header's value match
'cache' for /api/product-catalog, so a fallback-serve fails the probe
with reason 'source:fallback!=cache' or 'source:missing!=cache'.
Test updates (tests/seed-contract-probe.test.mjs):
- Boundary check reworked to use a BOUNDARY_CHECKS config with optional
requireSourceHeader per endpoint.
- New cases: served-from-cache passes, served-from-fallback fails with source
mismatch, missing header fails, seed-leak still takes precedence, bad
status fails.
- Token-panels sanity test now asserts minRecords≥1 on all 3 panels.
Verification:
- tests/seed-contract-probe.test.mjs → 17/17 pass (was 15, +2 net)
- npm run test:data → 5340/5340
- npm run typecheck:all → clean
605 lines
24 KiB
JavaScript
#!/usr/bin/env node

// SAX streaming parser: response.body is piped chunk-by-chunk into the parser.
// The full XML string is never held in memory, which avoids the OOM crash that
// occurred when fast-xml-parser tried to build a ~300MB object tree from a
// 120MB XML download against Railway's 512MB container limit.
import sax from 'sax';

import { CHROME_UA, loadEnvFile, runSeed, verifySeedKey, writeExtraKeyWithMeta } from './_seed-utils.mjs';

loadEnvFile(import.meta.url);

const CANONICAL_KEY = 'sanctions:pressure:v1';
const STATE_KEY = 'sanctions:pressure:state:v1';
const ENTITY_INDEX_KEY = 'sanctions:entities:v1';
const COUNTRY_COUNTS_KEY = 'sanctions:country-counts:v1';
const CACHE_TTL = 15 * 60 * 60; // 15h — 3h buffer over 12h cron cadence (was 12h = 0 buffer)

// Compact entity type codes for the lookup index (saves space vs full enum strings)
const ET_CODE = {
  SANCTIONS_ENTITY_TYPE_VESSEL: 'vessel',
  SANCTIONS_ENTITY_TYPE_AIRCRAFT: 'aircraft',
  SANCTIONS_ENTITY_TYPE_INDIVIDUAL: 'individual',
  SANCTIONS_ENTITY_TYPE_ENTITY: 'entity',
};

const DEFAULT_RECENT_LIMIT = 60;
const OFAC_TIMEOUT_MS = 45_000;
const PROGRAM_CODE_RE = /^[A-Z0-9][A-Z0-9-]{1,24}$/;

const OFAC_SOURCES = [
  { label: 'SDN', url: 'https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/sdn_advanced.xml' },
  { label: 'CONSOLIDATED', url: 'https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/cons_advanced.xml' },
];

// Strip XML namespace prefix (e.g. "sanc:SanctionsEntry" → "SanctionsEntry")
function local(name) {
  const colon = name.indexOf(':');
  return colon === -1 ? name : name.slice(colon + 1);
}

function uniqueSorted(values) {
  return [...new Set(values.filter(Boolean).map((v) => String(v).trim()).filter(Boolean))].sort((a, b) => a.localeCompare(b));
}

function compactNote(value) {
  const note = String(value || '').replace(/\s+/g, ' ').trim();
  if (!note) return '';
  return note.length > 240 ? `${note.slice(0, 237)}...` : note;
}

function sortEntries(a, b) {
  return (Number(b.isNew) - Number(a.isNew))
    || (Number(b.effectiveAt) - Number(a.effectiveAt))
    || a.name.localeCompare(b.name);
}

function buildCountryPressure(entries) {
  const map = new Map();
  for (const entry of entries) {
    const codes = entry.countryCodes.length > 0 ? entry.countryCodes : ['XX'];
    const names = entry.countryNames.length > 0 ? entry.countryNames : ['Unknown'];
    codes.forEach((code, index) => {
      const key = `${code}:${names[index] || names[0] || 'Unknown'}`;
      const current = map.get(key) || {
        countryCode: code,
        countryName: names[index] || names[0] || 'Unknown',
        entryCount: 0,
        newEntryCount: 0,
        vesselCount: 0,
        aircraftCount: 0,
      };
      current.entryCount += 1;
      if (entry.isNew) current.newEntryCount += 1;
      if (entry.entityType === 'SANCTIONS_ENTITY_TYPE_VESSEL') current.vesselCount += 1;
      if (entry.entityType === 'SANCTIONS_ENTITY_TYPE_AIRCRAFT') current.aircraftCount += 1;
      map.set(key, current);
    });
  }
  return [...map.values()]
    .sort((a, b) => b.newEntryCount - a.newEntryCount || b.entryCount - a.entryCount || a.countryName.localeCompare(b.countryName))
    .slice(0, 12);
}

// Full ISO2 → entryCount map across ALL entries (not truncated like buildCountryPressure).
// Used by get-country-risk RPC for accurate per-country sanctions screening.
function buildCountryCounts(entries) {
  const map = {};
  for (const entry of entries) {
    for (const code of entry.countryCodes) {
      if (code && code !== 'XX') map[code] = (map[code] ?? 0) + 1;
    }
  }
  return map;
}

function buildProgramPressure(entries) {
  const map = new Map();
  for (const entry of entries) {
    const programs = entry.programs.length > 0 ? entry.programs : ['UNSPECIFIED'];
    for (const program of programs) {
      const current = map.get(program) || { program, entryCount: 0, newEntryCount: 0 };
      current.entryCount += 1;
      if (entry.isNew) current.newEntryCount += 1;
      map.set(program, current);
    }
  }
  return [...map.values()]
    .sort((a, b) => b.newEntryCount - a.newEntryCount || b.entryCount - a.entryCount || a.program.localeCompare(b.program))
    .slice(0, 12);
}

/**
 * Stream-parse one OFAC Advanced XML source via SAX.
 *
 * Memory model: response.body chunks → sax.parser (stateful, O(1) RAM per chunk)
 * → accumulate only the minimal data structures needed for output.
 * Peak heap is proportional to the number of entries/parties, not the XML size.
 */
async function fetchSource(source) {
  console.log(` Fetching OFAC ${source.label}...`);
  const t0 = Date.now();
  const response = await fetch(source.url, {
    headers: { 'User-Agent': CHROME_UA },
    signal: AbortSignal.timeout(OFAC_TIMEOUT_MS),
  });
  if (!response.ok) throw new Error(`OFAC ${source.label} HTTP ${response.status}`);

  return new Promise((resolve, reject) => {
    // strict=true: case-sensitive tag names. xmlns=false: we strip prefixes manually.
    const parser = sax.parser(true, { trim: false, normalize: false });

    // ── Reference maps (built first, small, kept for cross-reference) ──────────
    const areaCodes = new Map();    // ID → { code, name }
    const featureTypes = new Map(); // ID → label string
    const legalBasis = new Map();   // ID → shortRef string
    const locations = new Map();    // ID → { codes[], names[] }
    const parties = new Map();      // profileId → { name, entityType, countryCodes[], countryNames[] }
    const entries = [];
    let datasetDate = 0;
    let bytesReceived = 0;

    // ── Element stack & text buffer ────────────────────────────────────────────
    const stack = []; // local element names
    let text = '';    // accumulated character data for current leaf

    // ── Section flags ──────────────────────────────────────────────────────────
    let inDateOfIssue = false;
    let inAreaCodeValues = false;
    let inFeatureTypeValues = false;
    let inLegalBasisValues = false;
    let inLocations = false;
    let inDistinctParties = false;
    let inSanctionsEntries = false;

    // ── Current-object accumulators ────────────────────────────────────────────
    // DateOfIssue
    let doiYear = 0, doiMonth = 1, doiDay = 1;

    // AreaCode / FeatureType / LegalBasis (reference value section)
    let refId = '', refShortRef = '', refDescription = '';

    // Location
    let locId = '';
    let locAreaCodeIds = null; // string[] | null

    // DistinctParty / Profile
    let partyFixedRef = '';
    let profileId = '', profileSubTypeId = '';
    let aliases = null;          // Alias[]
    let curAlias = null;         // { primary, typeId, nameParts[] }
    let inDocumentedName = false;
    let namePartsBuf = null;     // string[] collecting NamePartValue text
    let profileFeatures = null;  // Feature[]
    let curFeature = null;       // { featureTypeId, locationIds[] }

    // SanctionsEntry
    let entryId = '', entryProfileId = '';
    let entryDates = null;        // number[] (epochs from EntryEvent.Date)
    let entryMeasureDates = null; // number[] (from SanctionsMeasure.DatePeriod)
    let entryPrograms = null;     // string[]
    let entryNoteComments = null; // string[] (non-program comments)
    let entryLegalIds = null;     // string[] (LegalBasisID from EntryEvent)

    // Date sub-elements (shared by multiple contexts)
    let dateYear = 0, dateMonth = 1, dateDay = 1;
    let inEntryEventDate = false;
    let inMeasureDatePeriod = false;

    // ── Helpers ────────────────────────────────────────────────────────────────
    function epoch(y, m, d) {
      if (!y) return 0;
      return Date.UTC(y, Math.max(1, m) - 1, Math.max(1, d));
    }

    function resolveLocation(locId) {
      const ids = locAreaCodeIds;
      const mapped = ids.map((id) => areaCodes.get(id)).filter(Boolean);
      const pairs = [...new Map(mapped.map((item) => [item.code, item.name])).entries()]
        .filter(([code]) => code.length > 0)
        .sort(([a], [b]) => a.localeCompare(b));
      return { codes: pairs.map(([c]) => c), names: pairs.map(([, n]) => n) };
    }

    function finalizeParty() {
      const primaryAlias = aliases?.find((a) => a.primary)
        || aliases?.find((a) => a.typeId === '1403')
        || aliases?.[0];
      const name = primaryAlias?.nameParts.join(' ') || 'Unnamed designation';

      let entityType = 'SANCTIONS_ENTITY_TYPE_ENTITY';
      if (profileSubTypeId === '1') entityType = 'SANCTIONS_ENTITY_TYPE_VESSEL';
      else if (profileSubTypeId === '2') entityType = 'SANCTIONS_ENTITY_TYPE_AIRCRAFT';
      else if (profileFeatures?.some((f) => /birth|citizenship|nationality/i.test(featureTypes.get(f.featureTypeId) || ''))) {
        entityType = 'SANCTIONS_ENTITY_TYPE_INDIVIDUAL';
      }

      const seen = new Map();
      for (const feat of profileFeatures ?? []) {
        if (!/location/i.test(featureTypes.get(feat.featureTypeId) || '')) continue;
        for (const lid of feat.locationIds) {
          const loc = locations.get(lid);
          if (!loc) continue;
          loc.codes.forEach((code, i) => { if (code && !seen.has(code)) seen.set(code, loc.names[i] ?? ''); });
        }
      }
      const sorted = [...seen.entries()].sort(([a], [b]) => a.localeCompare(b));

      parties.set(profileId, {
        name,
        entityType,
        countryCodes: sorted.map(([c]) => c),
        countryNames: sorted.map(([, n]) => n),
      });
    }

    function finalizeEntry() {
      const party = parties.get(entryProfileId);
      const name = party?.name || 'Unnamed designation';
      const programs = uniqueSorted((entryPrograms ?? []).filter((c) => PROGRAM_CODE_RE.test(c)));
      const allDates = [...(entryDates ?? []), ...(entryMeasureDates ?? [])];
      const effectiveAt = String(allDates.length > 0 ? Math.max(...allDates) : 0);

      const commentNote = (entryNoteComments ?? []).find((c) => c);
      const legalNote = (entryLegalIds ?? []).map((id) => legalBasis.get(id) || '').find((n) => n) || '';
      const note = compactNote(commentNote || legalNote);

      entries.push({
        id: `${source.label}:${entryId || entryProfileId}`,
        name,
        entityType: party?.entityType || 'SANCTIONS_ENTITY_TYPE_ENTITY',
        countryCodes: party?.countryCodes ?? [],
        countryNames: party?.countryNames ?? [],
        programs: programs.length > 0 ? programs : [source.label],
        sourceLists: [source.label],
        effectiveAt,
        isNew: false,
        note,
      });
    }

    // ── SAX event handlers ─────────────────────────────────────────────────────
    parser.onopentag = (node) => {
      const name = local(node.name);
      const attrs = node.attributes;
      stack.push(name);
      text = '';

      switch (name) {
        // ── Section markers ──
        case 'DateOfIssue': inDateOfIssue = true; break;
        case 'AreaCodeValues': inAreaCodeValues = true; break;
        case 'FeatureTypeValues': inFeatureTypeValues = true; break;
        case 'LegalBasisValues': inLegalBasisValues = true; break;
        case 'Locations': inLocations = true; break;
        case 'DistinctParties': inDistinctParties = true; break;
        case 'SanctionsEntries': inSanctionsEntries = true; break;

        // ── Reference values ──
        case 'AreaCode':
          if (inAreaCodeValues) { refId = attrs.ID || ''; refDescription = attrs.Description || ''; }
          break;
        case 'FeatureType':
          if (inFeatureTypeValues) refId = attrs.ID || '';
          break;
        case 'LegalBasis':
          if (inLegalBasisValues) { refId = attrs.ID || ''; refShortRef = attrs.LegalBasisShortRef || ''; }
          break;

        // ── Locations ──
        case 'Location':
          if (inLocations) { locId = attrs.ID || ''; locAreaCodeIds = []; }
          break;
        case 'LocationAreaCode':
          if (locAreaCodeIds && attrs.AreaCodeID) locAreaCodeIds.push(attrs.AreaCodeID);
          break;

        // ── DistinctParty / Profile ──
        case 'DistinctParty':
          if (inDistinctParties) { partyFixedRef = attrs.FixedRef || ''; aliases = []; profileFeatures = []; }
          break;
        case 'Profile':
          if (inDistinctParties) { profileId = attrs.ID || partyFixedRef; profileSubTypeId = attrs.PartySubTypeID || ''; }
          break;
        case 'Alias':
          if (inDistinctParties) curAlias = { primary: attrs.Primary === 'true', typeId: attrs.AliasTypeID || '', nameParts: [] };
          break;
        case 'DocumentedName':
          if (curAlias) { inDocumentedName = true; namePartsBuf = []; }
          break;
        case 'Feature':
          if (inDistinctParties) curFeature = { featureTypeId: attrs.FeatureTypeID || '', locationIds: [] };
          break;
        case 'VersionLocation':
          if (curFeature && attrs.LocationID) curFeature.locationIds.push(attrs.LocationID);
          break;

        // ── SanctionsEntry ──
        case 'SanctionsEntry':
          if (inSanctionsEntries) {
            entryId = attrs.ID || ''; entryProfileId = attrs.ProfileID || '';
            entryDates = []; entryMeasureDates = []; entryPrograms = []; entryNoteComments = []; entryLegalIds = [];
          }
          break;
        case 'EntryEvent':
          if (entryDates) inEntryEventDate = true;
          break;
        case 'SanctionsMeasure':
          if (entryDates) inMeasureDatePeriod = false; // reset, set when we see DatePeriod
          break;
        case 'DatePeriod':
          if (entryMeasureDates) inMeasureDatePeriod = true;
          break;
        case 'Date':
        case 'From':
          dateYear = 0; dateMonth = 1; dateDay = 1;
          break;
      }
    };

    parser.onclosetag = (rawName) => {
      const name = local(rawName);
      const t = text.trim();
      text = '';
      stack.pop();

      switch (name) {
        // ── DateOfIssue ──
        case 'DateOfIssue': inDateOfIssue = false; datasetDate = epoch(doiYear, doiMonth, doiDay); break;

        // ── Shared Year/Month/Day (context determined by flags) ──
        case 'Year':
          if (inDateOfIssue) doiYear = Number(t) || 0;
          else dateYear = Number(t) || 0;
          break;
        case 'Month':
          if (inDateOfIssue) doiMonth = Number(t) || 1;
          else dateMonth = Number(t) || 1;
          break;
        case 'Day':
          if (inDateOfIssue) doiDay = Number(t) || 1;
          else dateDay = Number(t) || 1;
          break;

        // ── Section close ──
        case 'AreaCodeValues': inAreaCodeValues = false; break;
        case 'FeatureTypeValues': inFeatureTypeValues = false; break;
        case 'LegalBasisValues': inLegalBasisValues = false; break;
        case 'Locations': inLocations = false; break;
        case 'DistinctParties': inDistinctParties = false; break;
        case 'SanctionsEntries': inSanctionsEntries = false; break;

        // ── Reference values ──
        case 'AreaCode':
          if (inAreaCodeValues && refId) areaCodes.set(refId, { code: t, name: refDescription });
          break;
        case 'FeatureType':
          if (inFeatureTypeValues && refId) featureTypes.set(refId, t);
          break;
        case 'LegalBasis':
          if (inLegalBasisValues && refId) legalBasis.set(refId, refShortRef || t);
          break;

        // ── Locations ──
        case 'Location':
          if (locAreaCodeIds !== null) {
            locations.set(locId, resolveLocation(locId));
            locId = ''; locAreaCodeIds = null;
          }
          break;

        // ── DistinctParty / Profile ──
        case 'NamePartValue':
          if (namePartsBuf !== null && t) namePartsBuf.push(t);
          break;
        case 'DocumentedName':
          if (curAlias && namePartsBuf !== null) { curAlias.nameParts = namePartsBuf; namePartsBuf = null; inDocumentedName = false; }
          break;
        case 'Alias':
          if (curAlias) { aliases.push(curAlias); curAlias = null; }
          break;
        case 'Feature':
          if (curFeature) { profileFeatures.push(curFeature); curFeature = null; }
          break;
        case 'Profile':
          if (inDistinctParties && profileId) finalizeParty();
          profileId = ''; profileSubTypeId = ''; aliases = []; profileFeatures = [];
          break;
        case 'DistinctParty':
          partyFixedRef = '';
          break;

        // ── SanctionsEntry date contexts ──
        case 'Date':
          if (inEntryEventDate && entryDates) {
            const e = epoch(dateYear, dateMonth, dateDay);
            if (e > 0) entryDates.push(e);
          }
          break;
        case 'From':
          if (inMeasureDatePeriod && entryMeasureDates) {
            const e = epoch(dateYear, dateMonth, dateDay);
            if (e > 0) entryMeasureDates.push(e);
          }
          break;
        case 'EntryEvent':
          inEntryEventDate = false;
          break;
        case 'SanctionsMeasure':
          inMeasureDatePeriod = false;
          break;
        case 'DatePeriod':
          inMeasureDatePeriod = false;
          break;

        // ── SanctionsEntry leaf data ──
        case 'LegalBasisID':
          if (entryLegalIds) entryLegalIds.push(t);
          break;
        case 'Comment':
          if (entryPrograms !== null) entryPrograms.push(t);
          if (entryNoteComments !== null && t && !PROGRAM_CODE_RE.test(t)) entryNoteComments.push(t);
          break;

        case 'SanctionsEntry':
          if (entryDates !== null) finalizeEntry();
          entryId = ''; entryProfileId = ''; entryDates = null; entryMeasureDates = null;
          entryPrograms = null; entryNoteComments = null; entryLegalIds = null;
          break;
      }
    };
  parser.ontext = (chunk) => { text += chunk; };
  parser.oncdata = (chunk) => { text += chunk; };

  parser.onerror = (err) => {
    parser.resume(); // keep streaming; log but don't abort — partial results are valid
    console.warn(`  ${source.label}: SAX parse warning: ${err.message}`);
  };

  parser.onend = () => {
    console.log(`  ${source.label}: ${(bytesReceived / 1024).toFixed(0)}KB streamed, ${entries.length} entries parsed (${Date.now() - t0}ms)`);
    resolve({ entries, datasetDate });
  };

  // Stream the response body through the SAX parser chunk by chunk.
  // response.body is a web ReadableStream (Node.js 20 native fetch).
  const decoder = new TextDecoder('utf-8');
  (async () => {
    try {
      for await (const chunk of response.body) {
        bytesReceived += chunk.byteLength;
        parser.write(decoder.decode(chunk, { stream: true }));
      }
      // Flush any bytes still buffered in the decoder.
      const tail = decoder.decode();
      if (tail) parser.write(tail);
      parser.close();
    } catch (err) {
      reject(err);
    }
  })();
  });
}

async function fetchSanctionsPressure() {
  const previousState = await verifySeedKey(STATE_KEY).catch(() => null);
  const previousIds = new Set(Array.isArray(previousState?.entryIds) ? previousState.entryIds.map((id) => String(id)) : []);
  const hasPrevious = previousIds.size > 0;
  console.log(`  Previous state: ${hasPrevious ? `${previousIds.size} known IDs` : 'none (first run or expired)'}`);

  // Sequential fetch: SDN then Consolidated. SAX streaming keeps peak RAM low
  // regardless of file size — no full XML string or DOM tree is ever built.
  const results = [];
  for (const source of OFAC_SOURCES) {
    results.push(await fetchSource(source));
  }
  const entries = results.flatMap((result) => result.entries);
  const datasetDate = results.reduce((max, result) => Math.max(max, result.datasetDate || 0), 0);

  if (hasPrevious) {
    for (const entry of entries) {
      entry.isNew = !previousIds.has(entry.id);
    }
  }

  const sortedEntries = [...entries].sort(sortEntries);
  const totalCount = entries.length;
  const newEntryCount = hasPrevious ? entries.filter((entry) => entry.isNew).length : 0;
  const vesselCount = entries.filter((entry) => entry.entityType === 'SANCTIONS_ENTITY_TYPE_VESSEL').length;
  const aircraftCount = entries.filter((entry) => entry.entityType === 'SANCTIONS_ENTITY_TYPE_AIRCRAFT').length;
  console.log(`  Merged: ${totalCount} total (${results[0]?.entries.length ?? 0} SDN + ${results[1]?.entries.length ?? 0} consolidated), ${newEntryCount} new, ${vesselCount} vessels, ${aircraftCount} aircraft`);

  // Build compact entity index for name-based lookup (Phase 1 — issue #2042).
  // Each record: { id, name, et (compact type), cc (country codes), pr (programs) }
  // Stored as a flat array in a single Redis key for O(N) in-memory search.
  const _entityIndex = entries.map((e) => ({
    id: e.id,
    name: e.name,
    et: ET_CODE[e.entityType] ?? 'entity',
    cc: e.countryCodes.slice(0, 3),
    pr: e.programs.slice(0, 3),
  }));
  console.log(`  Entity index: ${_entityIndex.length} records (~${Math.round(JSON.stringify(_entityIndex).length / 1024)}KB)`);

  return {
    fetchedAt: String(Date.now()),
    datasetDate: String(datasetDate),
    totalCount,
    sdnCount: results[0]?.entries.length ?? 0,
    consolidatedCount: results[1]?.entries.length ?? 0,
    newEntryCount,
    vesselCount,
    aircraftCount,
    countries: buildCountryPressure(entries),
    programs: buildProgramPressure(entries),
    entries: sortedEntries.slice(0, DEFAULT_RECENT_LIMIT),
    _entityIndex,
    _countryCounts: buildCountryCounts(entries),
    _state: {
      entryIds: entries.map((entry) => entry.id),
    },
  };
}

function validate(data) {
  return (data?.totalCount ?? 0) > 0;
}

export function declareRecords(data) {
  return data?.totalCount ?? 0;
}

runSeed('sanctions', 'pressure', CANONICAL_KEY, fetchSanctionsPressure, {
  ttlSeconds: CACHE_TTL,
  validateFn: validate,
  sourceVersion: 'ofac-sls-advanced-xml-v1',
  recordCount: (data) => data.totalCount ?? 0,
  // Strip internal-only fields before writing the main key so the pressure payload
  // does not include the entity index (~hundreds of KB) or state snapshot.
  publishTransform: (data) => {
    const { _entityIndex: _ei, _state: _s, _countryCounts: _cc, ...rest } = data;
    return rest;
  },
  extraKeys: [
    {
      key: STATE_KEY,
      ttl: CACHE_TTL,
      transform: (data) => data._state,
    },
    {
      key: COUNTRY_COUNTS_KEY,
      ttl: CACHE_TTL,
      transform: (data) => data._countryCounts,
    },
  ],
  afterPublish: async (data, _ctx) => {
    // Write the entity lookup index with seed-meta so health.js can monitor it.
    // Uses writeExtraKeyWithMeta rather than extraKeys because runSeed's extraKeys
    // calls writeExtraKey (no meta), and we need a seed-meta key for health tracking.
    if (data._entityIndex) {
      await writeExtraKeyWithMeta(
        ENTITY_INDEX_KEY,
        data._entityIndex,
        CACHE_TTL,
        data._entityIndex.length,
      );
    }
    // Write the full ISO2→count map for per-country sanctions lookup (no top-12 truncation).
    if (data._countryCounts) {
      await writeExtraKeyWithMeta(
        COUNTRY_COUNTS_KEY,
        data._countryCounts,
        CACHE_TTL,
        Object.keys(data._countryCounts).length,
      );
    }
    delete data._state;
    delete data._entityIndex;
    delete data._countryCounts;
  },

  declareRecords,
  schemaVersion: 1,
  maxStaleMin: 720,
});