Files
worldmonitor/scripts/_energy-disruption-registry.mjs
Elie Habib 80797e7cc8 feat(energy-atlas): seed-side countries[] denorm + CountryDeepDive row (§R #5 = B)
Per plan §R/#5 decision B: denormalise countries[] at seed time on each
disruption event so CountryDeepDivePanel can filter events per country
without an asset-registry round trip. Schema join (pipeline/storage
→ event.assetId) happens once in the weekly cron, not on every panel
render. The alternative (client-side join) was rejected because it
couples UI logic to asset-registry internals and duplicates the join
for every surface that wants a per-country filter.

Changes:
- `proto/.../list_energy_disruptions.proto`: add `repeated string
  countries = 15` to EnergyDisruptionEntry with doc comment tying it
  to the plan decision and the always-non-empty invariant.
- `scripts/_energy-disruption-registry.mjs`:
    • Load pipeline-gas + pipeline-oil + storage-facilities registries
      once per seed cycle; index by id.
    • `deriveCountriesForEvent()` resolves assetId to {fromCountry,
      toCountry, transitCountries} (pipeline) or {country} (storage),
      deduped + alpha-sorted so byte-diff stability holds.
    • `buildPayload()` attaches the computed countries[] to every
      event before writing.
    • `validateRegistry()` now requires non-empty countries[] of
      ISO2 codes. Combined with the seeder's `emptyDataIsFailure:
      true`, this surfaces orphaned assetIds loudly — the next cron
      tick fails validation and seed-meta stays stale, tripping
      health alarms.
- `scripts/data/energy-disruptions.json`: fix two orphaned assetIds
  that the new join caught:
    • `cpc-force-majeure-2022`: `cpc-pipeline` → `cpc` (matches the
      entry in pipelines-oil.json).
    • `pdvsa-designation-2019`: `ve-petrol-2026-q1` (non-existent) →
      `venezuela-anzoategui-puerto-la-cruz`.
- `server/.../list-energy-disruptions.ts`: project countries[] into
  the RPC response via coerceStringArray. Legacy pre-denorm rows
  surface as empty array (always present on wire, length 0 => old).
- `src/components/CountryDeepDivePanel.ts`: add 4th Atlas row —
  "Energy disruptions in {iso2}" — filtered by `iso2 ∈ countries[]`.
  Failure is silent; EnergyDisruptionsPanel (upcoming) is the
  primary disruption surface.
- `tests/energy-disruptions-registry.test.mts`: switch to validating
  the buildPayload output (post-denorm), add §R #5 B invariant
  tests, plus a raw-JSON invariant ensuring curators don't hand-edit
  countries[] (it's derived, not declared).

Proto regen note: `make generate` currently fails with a duplicate
openapi plugin collision in buf.gen.yaml (unrelated bug — 3 plugin
entries emit to the same out dir). Worked around by temporarily
trimming buf.gen.yaml to just the TS plugins for this regen. Added
only the `countries: string[]` wire field to both service_client and
service_server; no other generated-file drift in this PR.
2026-04-24 10:22:01 +04:00

197 lines
7.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// @ts-check
//
// Shared utility for the energy-disruption event log. NOT an entry point —
// see seed-energy-disruptions.mjs.
//
// Each event ties back to an asset seeded by the pipeline or storage
// registry (by assetId + assetType). Events are curated in
// scripts/data/energy-disruptions.json today; a state-transition
// classifier was scoped but not shipped.
//
// Schema documented in docs/methodology/disruptions.mdx.
import { readFileSync } from 'node:fs';
import { dirname, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
/** Canonical key string for the energy-disruption payload. */
export const ENERGY_DISRUPTIONS_CANONICAL_KEY = 'energy:disruptions:v1';

/** Payload time-to-live: 21 days, expressed in seconds. */
export const ENERGY_DISRUPTIONS_TTL_SECONDS = 21 * 86_400;

/** Values accepted for an event's `assetType`. */
const VALID_ASSET_TYPES = new Set([
  'pipeline',
  'storage',
]);

/** Values accepted for an event's `eventType`. */
const VALID_EVENT_TYPES = new Set([
  'sabotage',
  'sanction',
  'maintenance',
  'mechanical',
  'weather',
  'commercial',
  'war',
  'other',
]);

/** Values accepted for each entry of an event's `causeChain`. */
const VALID_CAUSES = new Set([
  'sabotage',
  'sanction',
  'logistics',
  'policy',
  'war',
  'upstream_refinery',
  'chokepoint',
  'import_cut',
]);

/** Values accepted for the `sourceType` of each entry in an event's `sources`. */
const VALID_SOURCE_TYPES = new Set([
  'regulator',
  'operator',
  'press',
  'ais-relay',
  'satellite',
]);

/** A registry holding fewer events than this fails validation. */
const MIN_EVENTS = 8;
/**
 * Read and parse the curated disruption log from
 * `data/energy-disruptions.json`, resolved relative to this module's own
 * directory. No validation is performed here.
 *
 * @returns {any} The parsed JSON document.
 */
function loadRegistry() {
  const moduleDir = dirname(fileURLToPath(import.meta.url));
  const jsonPath = resolve(moduleDir, 'data', 'energy-disruptions.json');
  return JSON.parse(readFileSync(jsonPath, 'utf-8'));
}
/**
 * Read the gas-pipeline, oil-pipeline and storage-facility registries from
 * the `data/` directory beside this module and expose them as two lookup
 * maps keyed by asset id, so disruption events can be joined to the asset
 * they reference (plan §R/#5 decision B).
 *
 * Gas and oil pipelines are merged into one map; oil entries are spread
 * last, so an id present in both files resolves to the oil record. A file
 * that lacks its top-level `pipelines` / `facilities` map contributes an
 * empty map.
 *
 * @returns {{
 *   pipelines: Record<string, { fromCountry?: string; toCountry?: string; transitCountries?: string[] }>,
 *   storage: Record<string, { country?: string }>,
 * }}
 */
function loadAssetRegistries() {
  const dataDir = resolve(dirname(fileURLToPath(import.meta.url)), 'data');
  /** @param {string} fileName */
  const readJson = (fileName) => JSON.parse(readFileSync(resolve(dataDir, fileName), 'utf-8'));

  const gasDoc = readJson('pipelines-gas.json');
  const oilDoc = readJson('pipelines-oil.json');
  const storageDoc = readJson('storage-facilities.json');

  const pipelines = { ...(gasDoc.pipelines ?? {}), ...(oilDoc.pipelines ?? {}) };
  const storage = storageDoc.facilities ?? {};
  return { pipelines, storage };
}
/**
 * Resolve an event's referenced asset and collect the country codes that
 * asset carries.
 *
 * A pipeline contributes `fromCountry`, `toCountry` and every entry of
 * `transitCountries`; a storage facility contributes its `country`. Values
 * that are not strings are ignored.
 *
 * @param {{ assetId: string; assetType: string }} event
 * @param {ReturnType<typeof loadAssetRegistries>} registries
 * @returns {string[]} Country codes, deduplicated and sorted with the
 *   default string sort so the result does not depend on registry field
 *   order. Empty when the asset id is not found or the asset type is
 *   neither `pipeline` nor `storage`; `validateRegistry` rejects an empty
 *   `countries[]`, so an unresolved reference fails validation.
 */
function deriveCountriesForEvent(event, registries) {
  const { assetId, assetType } = event;
  /** @type {unknown[]} */
  let candidates = [];

  if (assetType === 'pipeline') {
    const pipeline = registries.pipelines[assetId];
    if (pipeline) {
      const transit = Array.isArray(pipeline.transitCountries) ? pipeline.transitCountries : [];
      candidates = [pipeline.fromCountry, pipeline.toCountry, ...transit];
    }
  } else if (assetType === 'storage') {
    const facility = registries.storage[assetId];
    if (facility) candidates = [facility.country];
  }

  const codes = /** @type {string[]} */ (candidates.filter((c) => typeof c === 'string'));
  return [...new Set(codes)].sort();
}
/**
 * Check one entry of an event's `sources[]`.
 *
 * @param {any} s
 * @returns {boolean} `true` when the entry is an object with string
 *   `authority` and `title`, a `url` starting with `http`, a parseable
 *   `date`, and a recognised `sourceType`.
 */
function isValidSource(s) {
  if (!s || typeof s !== 'object') return false;
  if (typeof s.authority !== 'string' || typeof s.title !== 'string') return false;
  if (typeof s.url !== 'string' || !s.url.startsWith('http')) return false;
  if (typeof s.date !== 'string' || !isIsoDate(s.date)) return false;
  return VALID_SOURCE_TYPES.has(s.sourceType);
}

/**
 * Check a single disruption event against the registry schema.
 *
 * @param {string} key The key the event is stored under in `events`.
 * @param {any} e The event value.
 * @returns {boolean}
 */
function isValidEvent(key, e) {
  // A null or primitive entry is malformed data, not a programming error:
  // report it as invalid instead of throwing on the property reads below.
  if (!e || typeof e !== 'object') return false;

  // The map key and the embedded id must agree. (Keys of a plain object are
  // already unique, so no separate duplicate-id pass is needed.)
  if (e.id !== key) return false;
  if (typeof e.assetId !== 'string' || e.assetId.length === 0) return false;
  if (!VALID_ASSET_TYPES.has(e.assetType)) return false;
  if (!VALID_EVENT_TYPES.has(e.eventType)) return false;
  if (typeof e.startAt !== 'string' || !isIsoDate(e.startAt)) return false;
  // endAt must be present: either an explicit null or a parseable date.
  if (e.endAt !== null && (typeof e.endAt !== 'string' || !isIsoDate(e.endAt))) return false;

  // Number.isFinite rejects non-numbers as well as NaN/Infinity, which a
  // bare `< 0` comparison would let through.
  if (!Number.isFinite(e.capacityOfflineBcmYr) || e.capacityOfflineBcmYr < 0) return false;
  if (!Number.isFinite(e.capacityOfflineMbd) || e.capacityOfflineMbd < 0) return false;

  if (!Array.isArray(e.causeChain) || e.causeChain.length === 0) return false;
  for (const cause of e.causeChain) {
    if (!VALID_CAUSES.has(cause)) return false;
  }

  if (typeof e.shortDescription !== 'string' || e.shortDescription.length === 0) return false;

  if (!Array.isArray(e.sources) || e.sources.length === 0) return false;
  for (const source of e.sources) {
    if (!isValidSource(source)) return false;
  }

  if (typeof e.classifierVersion !== 'string') return false;
  if (!Number.isFinite(e.classifierConfidence) ||
      e.classifierConfidence < 0 || e.classifierConfidence > 1) return false;
  if (typeof e.lastEvidenceUpdate !== 'string' || !isIsoDate(e.lastEvidenceUpdate)) return false;

  // endAt must not be earlier than startAt. Both are known to parse here.
  if (e.endAt !== null && Date.parse(e.endAt) < Date.parse(e.startAt)) return false;

  // countries[] is the denorm introduced in plan §R/#5 (decision B). Every
  // event must resolve to ≥1 country code from its referenced asset. An
  // empty array here means the upstream asset was removed or the assetId
  // is misspelled — both are hard errors the cron should surface by
  // failing validation (emptyDataIsFailure upstream preserves seed-meta
  // staleness so health alarms fire).
  if (!Array.isArray(e.countries) || e.countries.length === 0) return false;
  for (const code of e.countries) {
    if (typeof code !== 'string' || !/^[A-Z]{2}$/.test(code)) return false;
  }
  return true;
}

/**
 * Validate a built disruption payload (events with `countries[]` already
 * attached). Never throws on malformed input; any schema violation yields
 * `false`.
 *
 * @param {unknown} data Expected shape: `{ events: Record<string, object> }`
 *   holding at least `MIN_EVENTS` entries.
 * @returns {boolean} `true` only when every event passes every check.
 */
export function validateRegistry(data) {
  if (!data || typeof data !== 'object') return false;
  const { events } = /** @type {Record<string, unknown>} */ (data);
  if (!events || typeof events !== 'object') return false;

  const entries = Object.entries(/** @type {Record<string, any>} */ (events));
  if (entries.length < MIN_EVENTS) return false;

  return entries.every(([key, event]) => isValidEvent(key, event));
}
/**
 * Report whether `v` is a string that `Date.parse` can turn into a
 * timestamp.
 *
 * NOTE(review): despite the name, no ISO-8601 format check is done beyond
 * what `Date.parse` itself accepts.
 *
 * @param {unknown} v
 * @returns {boolean}
 */
function isIsoDate(v) {
  return typeof v === 'string' && Number.isFinite(Date.parse(v));
}
/**
 * Assemble the payload to seed: the curated registry with a derived
 * `countries[]` attached to every event and a fresh `updatedAt` timestamp.
 *
 * `countries[]` is denormalised here so CountryDeepDivePanel can filter by
 * country without an asset-registry round trip. An event whose assetId
 * cannot be resolved gets an empty `countries[]`; validateRegistry rejects
 * that shape, which fails the seed (emptyDataIsFailure: true) and keeps
 * seed-meta stale until the curator fixes the orphaned id.
 *
 * @returns {Record<string, any>} The registry document with `events`
 *   replaced by the enriched copies and `updatedAt` set to the current time
 *   as an ISO string.
 */
export function buildPayload() {
  const registry = loadRegistry();
  const assets = loadAssetRegistries();
  const sourceEvents = /** @type {Record<string, any>} */ (registry.events ?? {});

  /** @type {Array<[string, any]>} */
  const enriched = [];
  for (const [id, event] of Object.entries(sourceEvents)) {
    const countries = deriveCountriesForEvent(event, assets);
    enriched.push([id, { ...event, countries }]);
  }

  const updatedAt = new Date().toISOString();
  return { ...registry, events: Object.fromEntries(enriched), updatedAt };
}
/**
 * Count the events in a payload.
 *
 * @param {any} data Payload whose `events` map is counted; a nullish
 *   payload or a missing `events` counts as zero.
 * @returns {number} Number of own enumerable keys in `data.events`.
 */
export function recordCount(data) {
  const events = data?.events ?? {};
  return Object.keys(events).length;
}
/**
 * Report how many records a payload holds. Delegates to `recordCount`, so
 * the two always agree.
 *
 * @param {any} data
 * @returns {number}
 */
export function declareRecords(data) {
  const total = recordCount(data);
  return total;
}
/** Staleness ceiling in minutes: twice the weekly cron interval (14 days). */
export const MAX_STALE_MIN = 2 * 7 * 24 * 60;