mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(energy-atlas): seed-side countries[] denorm + CountryDeepDive row (§R #5 = B)
Per plan §R/#5 decision B: denormalise countries[] at seed time on each
disruption event so CountryDeepDivePanel can filter events per country
without an asset-registry round trip. Schema join (pipeline/storage
→ event.assetId) happens once in the weekly cron, not on every panel
render. The alternative (client-side join) was rejected because it
couples UI logic to asset-registry internals and duplicates the join
for every surface that wants a per-country filter.
Changes:
- `proto/.../list_energy_disruptions.proto`: add `repeated string
  countries = 15` to EnergyDisruptionEntry with doc comment tying it
  to the plan decision and the always-non-empty invariant.
- `scripts/_energy-disruption-registry.mjs`:
  • Load pipeline-gas + pipeline-oil + storage-facilities registries
    once per seed cycle; index by id.
  • `deriveCountriesForEvent()` resolves assetId to {fromCountry,
    toCountry, transitCountries} (pipeline) or {country} (storage),
    deduped + alpha-sorted so byte-diff stability holds.
  • `buildPayload()` attaches the computed countries[] to every
    event before writing.
  • `validateRegistry()` now requires non-empty countries[] of
    ISO2 codes. Combined with the seeder's `emptyDataIsFailure:
    true`, this surfaces orphaned assetIds loudly — the next cron
    tick fails validation and seed-meta stays stale, tripping
    health alarms.
- `scripts/data/energy-disruptions.json`: fix two orphaned assetIds
  that the new join caught:
  • `cpc-force-majeure-2022`: `cpc-pipeline` → `cpc` (matches the
    entry in pipelines-oil.json).
  • `pdvsa-designation-2019`: `ve-petrol-2026-q1` (non-existent) →
    `venezuela-anzoategui-puerto-la-cruz`.
- `server/.../list-energy-disruptions.ts`: project countries[] into
  the RPC response via coerceStringArray. Legacy pre-denorm rows
  surface as empty array (always present on wire, length 0 => old).
- `src/components/CountryDeepDivePanel.ts`: add 4th Atlas row —
  "Energy disruptions in {iso2}" — filtered by `iso2 ∈ countries[]`.
  Failure is silent; EnergyDisruptionsPanel (upcoming) is the
  primary disruption surface.
- `tests/energy-disruptions-registry.test.mts`: switch to validating
  the buildPayload output (post-denorm), add §R #5 B invariant
  tests, plus a raw-JSON invariant ensuring curators don't hand-edit
  countries[] (it's derived, not declared).
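As a rough illustration of the denorm described in the list above, the sketch below derives countries[] for a few events from an in-memory registry and then applies the `iso2 ∈ countries[]` filter. The registry contents, the event ids, and the `deriveCountries` / `eventsForCountry` helpers are invented for this example; the real registries live under scripts/data/ and the panel's actual filter code is not shown here.

```javascript
// Hypothetical in-memory registries; shapes follow the commit description.
const registries = {
  pipelines: {
    'demo-pipe': { fromCountry: 'RU', toCountry: 'DE', transitCountries: ['PL', 'BY', 'PL'] },
  },
  storage: {
    'demo-store': { country: 'NL' },
  },
};

// Same idea as deriveCountriesForEvent(): collect, dedupe, alpha-sort.
function deriveCountries(event, regs) {
  const out = new Set();
  if (event.assetType === 'pipeline') {
    const p = regs.pipelines[event.assetId];
    if (p) {
      for (const c of [p.fromCountry, p.toCountry, ...(p.transitCountries ?? [])]) {
        if (typeof c === 'string') out.add(c);
      }
    }
  } else if (event.assetType === 'storage') {
    const s = regs.storage[event.assetId];
    if (s && typeof s.country === 'string') out.add(s.country);
  }
  return Array.from(out).sort();
}

// Hypothetical helper: the per-country filter a panel could apply.
function eventsForCountry(events, iso2) {
  return events.filter((e) => Array.isArray(e.countries) && e.countries.includes(iso2));
}

const events = [
  { id: 'evt-a', assetId: 'demo-pipe', assetType: 'pipeline' },
  { id: 'evt-b', assetId: 'demo-store', assetType: 'storage' },
  { id: 'evt-c', assetId: 'missing', assetType: 'pipeline' }, // orphaned assetId
].map((e) => ({ ...e, countries: deriveCountries(e, registries) }));

console.log(events.map((e) => e.countries));
// [ [ 'BY', 'DE', 'PL', 'RU' ], [ 'NL' ], [] ]
console.log(eventsForCountry(events, 'PL').map((e) => e.id));
// [ 'evt-a' ]
```

The orphaned event ends up with an empty countries[] array, which is the shape the validator is described as rejecting.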
Proto regen note: `make generate` currently fails with a duplicate
openapi plugin collision in buf.gen.yaml (unrelated bug — 3 plugin
entries emit to the same out dir). Worked around by temporarily
trimming buf.gen.yaml to just the TS plugins for this regen. Added
only the `countries: string[]` wire field to both service_client and
service_server; no other generated-file drift in this PR.
* chore(proto): regenerate openapi specs for countries[] field
Runs `make generate` with the sebuf v0.11.1 plugin now correctly
resolved via the PATH fix (cherry-picked from fix/makefile-generate-path-prefix).
The new `countries` field on EnergyDisruptionEntry propagates into:
- docs/api/SupplyChainService.openapi.yaml (primary per-service spec)
- docs/api/SupplyChainService.openapi.json (machine-readable variant)
- docs/api/worldmonitor.openapi.yaml (consolidated bundle)
No TypeScript drift beyond the already-committed service_client.ts /
service_server.ts updates in 80797e7cc.
* fix(energy-atlas): drop highlightEventId emission (review P2)
Codex P2: loadDisruptionsForCountry dispatched `highlightEventId` but
neither PipelineStatusPanel nor StorageFacilityMapPanel consumes it
(the openDetailHandler reads only pipelineId / facilityId). The UI's
implicit promise (event-specific highlighting) wasn't delivered —
clickthrough was asset-generic, and the extra wire field was a
misleading API surface.
Fix: emit only {pipelineId, facilityId} in the dispatched detail.
Row click opens the asset drawer; user sees the full per-asset
disruption timeline and locates the event visually.
Symmetric fix for PR #3378's EnergyDisruptionsPanel — both emitters
now match the drawer contract exactly. Re-add `highlightEventId`
here when the drawer panels ship matching consumer code
(openDetailHandler accepts it, loadDetail stores it,
renderDisruptionTimeline scrolls + emphasises the matching event).
Typecheck clean, test:data 6698/6698 pass.
* fix(energy-atlas): collision detection + abort signal + label clamp (review P2)
Three Codex P2 findings on PR #3377:
1. `loadAssetRegistries()` spread-merged gas + oil pipelines, silently
   overwriting entries on id collision. No collision today, but a
   curator adding a pipeline under the same id to both files would
   cause `deriveCountriesForEvent` to return wrong-commodity country
   data with no test flagging it.
   Fix: explicit merge loop that throws on duplicate id. The next
   cron tick fails validation, seed-meta stays stale, health alarms
   fire — same loud-failure pattern the rest of the seeder uses.
2. `loadDisruptionsForCountry` didn't thread `this.signal` through
   the RPC fetch shim. The stale-closure guard (`currentCode !== iso2`)
   discarded stale RESULTS, but the in-flight request couldn't be
   cancelled when the user switched countries or closed the panel.
   Fix: wrap globalThis.fetch with { signal: this.signal } in the
   client factory, matching the signal lifecycle the rest of the
   panel already uses.
3. `shortDescription` values up to 200 chars rendered without
   ellipsis in the compact Atlas row, overflowing the row layout.
   Fix: new `truncateDisruptionLabel` helper clamps to 80 chars with
   ellipsis. Full text still accessible via click-through to the
   asset drawer.
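For finding 3, a label clamp along these lines would satisfy the description. This is only a sketch: the name `clampLabel` is a stand-in, and whether the shipped `truncateDisruptionLabel` counts the ellipsis inside the 80-char limit is an assumption made here.

```javascript
// Assumption: the limit includes the ellipsis, so output never exceeds maxLen.
const MAX_LABEL_CHARS = 80;

// Hypothetical stand-in for the truncateDisruptionLabel helper.
function clampLabel(text, maxLen = MAX_LABEL_CHARS) {
  if (typeof text !== 'string') return '';
  if (text.length <= maxLen) return text;
  // Reserve one character for the ellipsis; trim whitespace left dangling before it.
  return text.slice(0, maxLen - 1).trimEnd() + '…';
}

const longLabel = 'x'.repeat(200);
console.log(clampLabel(longLabel).length); // 80
console.log(clampLabel('short label'));    // short label
```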
Typecheck clean, test:data 6698/6698 pass.
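For finding 2, the fetch wrapping could look roughly like the following. The factory name `fetchWithSignal` and the stub fetch are hypothetical and exist only so the sketch runs without network access; the real client factory is not shown in this commit message.

```javascript
// Hypothetical factory: returns a fetch-compatible function bound to one AbortSignal.
function fetchWithSignal(signal, baseFetch = globalThis.fetch) {
  // Caller-supplied init is preserved; the bound signal is applied last.
  return (input, init = {}) => baseFetch(input, { ...init, signal });
}

// Stub fetch that records the init it receives instead of hitting the network.
const seenInits = [];
const stubFetch = (input, init) => {
  seenInits.push(init);
  return Promise.resolve({ ok: true });
};

const controller = new AbortController();
const boundFetch = fetchWithSignal(controller.signal, stubFetch);
boundFetch('/hypothetical-rpc', { method: 'POST' });

// Aborting the controller (e.g. on country switch or panel close) is visible
// to every request issued through boundFetch.
controller.abort();
console.log(seenInits[0].signal.aborted); // true
```

With a real `fetch`, aborting the controller rejects the in-flight promise with an `AbortError`, so the caller still needs to handle or ignore that rejection.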
219 lines
8.9 KiB
JavaScript
// @ts-check
//
// Shared utility for the energy-disruption event log. NOT an entry point —
// see seed-energy-disruptions.mjs.
//
// Each event ties back to an asset seeded by the pipeline or storage
// registry (by assetId + assetType). Events are curated in
// scripts/data/energy-disruptions.json today; a state-transition
// classifier was scoped but not shipped.
//
// Schema documented in docs/methodology/disruptions.mdx.

import { readFileSync } from 'node:fs';
import { dirname, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';

export const ENERGY_DISRUPTIONS_CANONICAL_KEY = 'energy:disruptions:v1';
export const ENERGY_DISRUPTIONS_TTL_SECONDS = 21 * 24 * 3600;

const VALID_ASSET_TYPES = new Set(['pipeline', 'storage']);
const VALID_EVENT_TYPES = new Set([
  'sabotage', 'sanction', 'maintenance', 'mechanical',
  'weather', 'commercial', 'war', 'other',
]);
const VALID_CAUSES = new Set([
  'sabotage', 'sanction', 'logistics', 'policy', 'war',
  'upstream_refinery', 'chokepoint', 'import_cut',
]);
const VALID_SOURCE_TYPES = new Set([
  'regulator', 'operator', 'press', 'ais-relay', 'satellite',
]);

const MIN_EVENTS = 8;

function loadRegistry() {
  const __dirname = dirname(fileURLToPath(import.meta.url));
  const raw = readFileSync(resolve(__dirname, 'data', 'energy-disruptions.json'), 'utf-8');
  return JSON.parse(raw);
}

/**
 * Load the pipeline + storage registries so `buildPayload` can join each
 * disruption event to its referenced asset and compute the `countries[]`
 * denorm field (plan §R/#5 decision B).
 *
 * Pipelines contribute fromCountry, toCountry, and transitCountries[].
 * Storage facilities contribute their single country code. Duplicates are
 * deduped and sorted so the seed output is stable across runs — unstable
 * ordering would churn the seeded payload bytes on every cron tick and
 * defeat envelope diffing.
 *
 * @returns {{
 *   pipelines: Record<string, { fromCountry?: string; toCountry?: string; transitCountries?: string[] }>,
 *   storage: Record<string, { country?: string }>,
 * }}
 */
function loadAssetRegistries() {
  const __dirname = dirname(fileURLToPath(import.meta.url));
  const gas = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-gas.json'), 'utf-8'));
  const oil = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-oil.json'), 'utf-8'));
  const storageRaw = JSON.parse(readFileSync(resolve(__dirname, 'data', 'storage-facilities.json'), 'utf-8'));

  // Merge with explicit collision detection. A spread like
  // { ...gas.pipelines, ...oil.pipelines } would silently let an oil
  // entry overwrite a gas entry if a curator ever added a pipeline
  // under the same id to both files — `deriveCountriesForEvent` would
  // then return data for whichever side won the spread regardless of
  // which commodity the disruption actually references, and the
  // collision would surface as mysterious wrong-country filter
  // results with no test or validator flagging it. Codex P2 on
  // PR #3377. Throw loudly so the next cron tick fails validation
  // and health alarms fire.
  /** @type {Record<string, any>} */
  const pipelines = {};
  for (const [id, p] of Object.entries(gas.pipelines ?? {})) pipelines[id] = p;
  for (const [id, p] of Object.entries(oil.pipelines ?? {})) {
    if (pipelines[id]) {
      throw new Error(
        `Duplicate pipeline id "${id}" present in both pipelines-gas.json ` +
        `and pipelines-oil.json — an event referencing this id would resolve ` +
        `ambiguously. Rename one of them before re-running the seeder.`,
      );
    }
    pipelines[id] = p;
  }

  return { pipelines, storage: storageRaw.facilities ?? {} };
}

/**
 * Compute the denormalised country set for a single event.
 *
 * @param {{ assetId: string; assetType: string }} event
 * @param {ReturnType<typeof loadAssetRegistries>} registries
 * @returns {string[]} ISO2 codes, deduped + alpha-sorted. Empty array when
 *   the referenced asset cannot be resolved — callers (seeder) should
 *   treat empty as a hard validation failure so stale references surface
 *   loudly on the next cron tick rather than silently corrupt the filter.
 */
function deriveCountriesForEvent(event, registries) {
  const out = new Set();
  if (event.assetType === 'pipeline') {
    const p = registries.pipelines[event.assetId];
    if (p) {
      if (typeof p.fromCountry === 'string') out.add(p.fromCountry);
      if (typeof p.toCountry === 'string') out.add(p.toCountry);
      if (Array.isArray(p.transitCountries)) {
        for (const c of p.transitCountries) if (typeof c === 'string') out.add(c);
      }
    }
  } else if (event.assetType === 'storage') {
    const s = registries.storage[event.assetId];
    if (s && typeof s.country === 'string') out.add(s.country);
  }
  return Array.from(out).sort();
}

/**
 * @param {unknown} data
 * @returns {boolean}
 */
export function validateRegistry(data) {
  if (!data || typeof data !== 'object') return false;
  const obj = /** @type {Record<string, unknown>} */ (data);
  if (!obj.events || typeof obj.events !== 'object') return false;
  const events = /** @type {Record<string, any>} */ (obj.events);
  const entries = Object.entries(events);
  if (entries.length < MIN_EVENTS) return false;

  const seenIds = new Set();
  for (const [key, e] of entries) {
    if (seenIds.has(key)) return false;
    seenIds.add(key);
    if (e.id !== key) return false;
    if (typeof e.assetId !== 'string' || e.assetId.length === 0) return false;
    if (!VALID_ASSET_TYPES.has(e.assetType)) return false;
    if (!VALID_EVENT_TYPES.has(e.eventType)) return false;
    if (typeof e.startAt !== 'string' || !isIsoDate(e.startAt)) return false;
    if (e.endAt !== null && (typeof e.endAt !== 'string' || !isIsoDate(e.endAt))) return false;
    if (typeof e.capacityOfflineBcmYr !== 'number' || e.capacityOfflineBcmYr < 0) return false;
    if (typeof e.capacityOfflineMbd !== 'number' || e.capacityOfflineMbd < 0) return false;
    if (!Array.isArray(e.causeChain) || e.causeChain.length === 0) return false;
    for (const c of e.causeChain) if (!VALID_CAUSES.has(c)) return false;
    if (typeof e.shortDescription !== 'string' || e.shortDescription.length === 0) return false;
    if (!Array.isArray(e.sources) || e.sources.length === 0) return false;
    for (const s of e.sources) {
      if (!s || typeof s !== 'object') return false;
      if (typeof s.authority !== 'string' || typeof s.title !== 'string') return false;
      if (typeof s.url !== 'string' || !s.url.startsWith('http')) return false;
      if (typeof s.date !== 'string' || !isIsoDate(s.date)) return false;
      if (!VALID_SOURCE_TYPES.has(s.sourceType)) return false;
    }
    if (typeof e.classifierVersion !== 'string') return false;
    if (typeof e.classifierConfidence !== 'number' ||
        e.classifierConfidence < 0 || e.classifierConfidence > 1) return false;
    if (typeof e.lastEvidenceUpdate !== 'string' || !isIsoDate(e.lastEvidenceUpdate)) return false;
    // endAt must not be earlier than startAt.
    if (e.endAt) {
      const start = Date.parse(e.startAt);
      const end = Date.parse(e.endAt);
      if (end < start) return false;
    }
    // countries[] is the denorm introduced in plan §R/#5 (decision B). Every
    // event must resolve to ≥1 country code from its referenced asset. An
    // empty array here means the upstream asset was removed or the assetId
    // is misspelled — both are hard errors the cron should surface by
    // failing validation (emptyDataIsFailure upstream preserves seed-meta
    // staleness so health alarms fire).
    if (!Array.isArray(e.countries) || e.countries.length === 0) return false;
    for (const c of e.countries) {
      if (typeof c !== 'string' || !/^[A-Z]{2}$/.test(c)) return false;
    }
  }
  return true;
}

function isIsoDate(v) {
  if (typeof v !== 'string') return false;
  return Number.isFinite(Date.parse(v));
}

export function buildPayload() {
  const registry = loadRegistry();
  const assets = loadAssetRegistries();

  // Denormalise countries[] on every event so CountryDeepDivePanel can
  // filter by country without an asset-registry round trip. If an event's
  // assetId cannot be resolved we leave countries[] empty — validateRegistry
  // rejects that shape, which fails the seed (emptyDataIsFailure: true)
  // and keeps seed-meta stale until the curator fixes the orphaned id.
  const rawEvents = /** @type {Record<string, any>} */ (registry.events ?? {});
  const events = Object.fromEntries(
    Object.entries(rawEvents).map(([id, event]) => [
      id,
      { ...event, countries: deriveCountriesForEvent(event, assets) },
    ]),
  );

  return { ...registry, events, updatedAt: new Date().toISOString() };
}

/**
 * @param {any} data
 * @returns {number}
 */
export function recordCount(data) {
  return Object.keys(data?.events ?? {}).length;
}

/**
 * @param {any} data
 * @returns {number}
 */
export function declareRecords(data) {
  return recordCount(data);
}

export const MAX_STALE_MIN = 20_160; // weekly cron × 2 headroom