diff --git a/proto/worldmonitor/supply_chain/v1/list_energy_disruptions.proto b/proto/worldmonitor/supply_chain/v1/list_energy_disruptions.proto index ac8c61f85..3a5650c0f 100644 --- a/proto/worldmonitor/supply_chain/v1/list_energy_disruptions.proto +++ b/proto/worldmonitor/supply_chain/v1/list_energy_disruptions.proto @@ -54,6 +54,12 @@ message EnergyDisruptionEntry { string classifier_version = 12; double classifier_confidence = 13; // 0..1 string last_evidence_update = 14; // ISO8601 + // Countries touched by the referenced asset (pipeline: fromCountry + + // toCountry + transitCountries; storage: country). Denormalised at seed + // time so CountryDeepDivePanel can filter events without a second RPC. + // Always non-empty in well-formed payloads; empty only if the upstream + // asset was removed without an event update. Per plan §R/#5 decision B. + repeated string countries = 15; } message EnergyDisruptionSource { diff --git a/scripts/_energy-disruption-registry.mjs b/scripts/_energy-disruption-registry.mjs index bf1038402..264298467 100644 --- a/scripts/_energy-disruption-registry.mjs +++ b/scripts/_energy-disruption-registry.mjs @@ -38,6 +38,61 @@ function loadRegistry() { return JSON.parse(raw); } +/** + * Load the pipeline + storage registries so `buildPayload` can join each + * disruption event to its referenced asset and compute the `countries[]` + * denorm field (plan §R/#5 decision B). + * + * Pipelines contribute fromCountry, toCountry, and transitCountries[]. + * Storage facilities contribute their single country code. Duplicates are + * deduped and sorted so the seed output is stable across runs — unstable + * ordering would churn the seeded payload bytes on every cron tick and + * defeat envelope diffing. + * + * @returns {{ + * pipelines: Record, + * storage: Record, + * }} + */ +function loadAssetRegistries() { + const __dirname = dirname(fileURLToPath(import.meta.url)); + const gas = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-gas.json'), 'utf-8')); + const oil = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-oil.json'), 'utf-8')); + const storageRaw = JSON.parse(readFileSync(resolve(__dirname, 'data', 'storage-facilities.json'), 'utf-8')); + return { + pipelines: { ...(gas.pipelines ?? {}), ...(oil.pipelines ?? {}) }, + storage: storageRaw.facilities ?? {}, + }; +} + +/** + * Compute the denormalised country set for a single event. + * + * @param {{ assetId: string; assetType: string }} event + * @param {ReturnType} registries + * @returns {string[]} ISO2 codes, deduped + alpha-sorted. Empty array when + * the referenced asset cannot be resolved — callers (seeder) should + * treat empty as a hard validation failure so stale references surface + * loudly on the next cron tick rather than silently corrupt the filter. + */ +function deriveCountriesForEvent(event, registries) { + const out = new Set(); + if (event.assetType === 'pipeline') { + const p = registries.pipelines[event.assetId]; + if (p) { + if (typeof p.fromCountry === 'string') out.add(p.fromCountry); + if (typeof p.toCountry === 'string') out.add(p.toCountry); + if (Array.isArray(p.transitCountries)) { + for (const c of p.transitCountries) if (typeof c === 'string') out.add(c); + } + } + } else if (event.assetType === 'storage') { + const s = registries.storage[event.assetId]; + if (s && typeof s.country === 'string') out.add(s.country); + } + return Array.from(out).sort(); +} + /** * @param {unknown} data * @returns {boolean} @@ -83,6 +138,16 @@ export function validateRegistry(data) { const end = Date.parse(e.endAt); if (end < start) return false; } + // countries[] is the denorm introduced in plan §R/#5 (decision B). Every + // event must resolve to ≥1 country code from its referenced asset. An + // empty array here means the upstream asset was removed or the assetId + // is misspelled — both are hard errors the cron should surface by + // failing validation (emptyDataIsFailure upstream preserves seed-meta + // staleness so health alarms fire). + if (!Array.isArray(e.countries) || e.countries.length === 0) return false; + for (const c of e.countries) { + if (typeof c !== 'string' || !/^[A-Z]{2}$/.test(c)) return false; + } } return true; } @@ -94,7 +159,22 @@ function isIsoDate(v) { export function buildPayload() { const registry = loadRegistry(); - return { ...registry, updatedAt: new Date().toISOString() }; + const assets = loadAssetRegistries(); + + // Denormalise countries[] on every event so CountryDeepDivePanel can + // filter by country without an asset-registry round trip. If an event's + // assetId cannot be resolved we leave countries[] empty — validateRegistry + // rejects that shape, which fails the seed (emptyDataIsFailure: true) + // and keeps seed-meta stale until the curator fixes the orphaned id. + const rawEvents = /** @type {Record} */ (registry.events ?? {}); + const events = Object.fromEntries( + Object.entries(rawEvents).map(([id, event]) => [ + id, + { ...event, countries: deriveCountriesForEvent(event, assets) }, + ]), + ); + + return { ...registry, events, updatedAt: new Date().toISOString() }; } /** diff --git a/scripts/data/energy-disruptions.json b/scripts/data/energy-disruptions.json index e1550dbe4..3dac49f62 100644 --- a/scripts/data/energy-disruptions.json +++ b/scripts/data/energy-disruptions.json @@ -63,7 +63,7 @@ }, "cpc-force-majeure-2022": { "id": "cpc-force-majeure-2022", - "assetId": "cpc-pipeline", + "assetId": "cpc", "assetType": "pipeline", "eventType": "mechanical", "startAt": "2022-03-22T00:00:00Z", @@ -275,7 +275,7 @@ "classifierVersion": "v1", "classifierConfidence": 0.96, "lastEvidenceUpdate": "2026-04-22T00:00:00Z" }, "pdvsa-designation-2019": { - "id": "pdvsa-designation-2019", "assetId": "ve-petrol-2026-q1", "assetType": "pipeline", + "id": "pdvsa-designation-2019", "assetId": "venezuela-anzoategui-puerto-la-cruz", "assetType": "pipeline", "eventType": "sanction", "startAt": "2019-01-28T00:00:00Z", "endAt": null, "capacityOfflineBcmYr": 0, "capacityOfflineMbd": 0.5, "causeChain": ["sanction"], diff --git a/server/worldmonitor/supply-chain/v1/list-energy-disruptions.ts b/server/worldmonitor/supply-chain/v1/list-energy-disruptions.ts index ae3509943..38a80734e 100644 --- a/server/worldmonitor/supply-chain/v1/list-energy-disruptions.ts +++ b/server/worldmonitor/supply-chain/v1/list-energy-disruptions.ts @@ -59,6 +59,13 @@ export function projectDisruption(raw: unknown): EnergyDisruptionEntry | null { classifierVersion: coerceString(r.classifierVersion, 'v1'), classifierConfidence: coerceNumber(r.classifierConfidence), lastEvidenceUpdate: coerceString(r.lastEvidenceUpdate), + // Seed-denormalised countries[] (plan §R/#5 decision B). The registry + // seeder joins each event's assetId against the pipeline/storage + // registries and emits the touched ISO2 set. Legacy rows written + // before the denorm shipped can still exist in Redis transiently; we + // surface an empty array there so the field is always present on the + // wire but consumers can detect pre-denorm data by checking length. + countries: coerceStringArray(r.countries), }; } diff --git a/src/components/CountryDeepDivePanel.ts b/src/components/CountryDeepDivePanel.ts index f8e8e2144..16296f874 100644 --- a/src/components/CountryDeepDivePanel.ts +++ b/src/components/CountryDeepDivePanel.ts @@ -1255,6 +1255,62 @@ export class CountryDeepDivePanel implements CountryBriefPanel { ); } }).catch(() => {}); + + // Disruptions filter (plan §R/#5 decision B). The seeded registry carries + // denormalised `countries[]` on every event, populated from the referenced + // pipeline or storage facility. We fetch the full list once (no asset + // filter) and narrow client-side; the bootstrap payload already contains + // the registry so this is usually cache-hot. If the RPC round-trip returns + // nothing, we silently skip — CountryDeepDive is not the primary + // disruption surface (EnergyDisruptionsPanel is), so an empty row is + // preferable to a spurious error. + this.loadDisruptionsForCountry(iso2); + } + + private async loadDisruptionsForCountry(iso2: string): Promise { + try { + const { SupplyChainServiceClient } = await import( + '@/generated/client/worldmonitor/supply_chain/v1/service_client' + ); + const { getRpcBaseUrl } = await import('@/services/rpc-client'); + const client = new SupplyChainServiceClient(getRpcBaseUrl(), { + fetch: (...args: Parameters) => globalThis.fetch(...args), + }); + const res = await client.listEnergyDisruptions({ + assetId: '', + assetType: '', + ongoingOnly: false, + }); + if (!res || !Array.isArray(res.events) || this.currentCode !== iso2) return; + const events = res.events.filter(e => + Array.isArray(e.countries) && e.countries.includes(iso2), + ); + if (events.length === 0) return; + const ongoing = events.filter(e => !e.endAt).length; + const summary = ongoing > 0 + ? `${ongoing} ongoing · ${events.length - ongoing} resolved` + : `${events.length} resolved`; + this.appendAtlasRow( + `Energy disruptions in ${iso2}`, + summary, + events.map(e => ({ + id: e.id, + label: `${e.eventType} — ${e.shortDescription}`, + // Event type mirrors the existing asset-detail events (pipeline / + // storage) because disruptions reference the underlying asset; the + // panel-layout listener routes to the matching asset panel. + event: e.assetType === 'storage' + ? 'energy:open-storage-facility-detail' + : 'energy:open-pipeline-detail', + detail: e.assetType === 'storage' + ? { facilityId: e.assetId, highlightEventId: e.id } + : { pipelineId: e.assetId, highlightEventId: e.id }, + })), + ); + } catch { + // Silent — disruptions row is supplementary; failures elsewhere + // surface via the dedicated EnergyDisruptionsPanel. + } } private appendAtlasRow( diff --git a/src/generated/client/worldmonitor/supply_chain/v1/service_client.ts b/src/generated/client/worldmonitor/supply_chain/v1/service_client.ts index 81bcd350f..5df2b0821 100644 --- a/src/generated/client/worldmonitor/supply_chain/v1/service_client.ts +++ b/src/generated/client/worldmonitor/supply_chain/v1/service_client.ts @@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry { classifierVersion: string; classifierConfidence: number; lastEvidenceUpdate: string; + countries: string[]; } export interface EnergyDisruptionSource { diff --git a/src/generated/server/worldmonitor/supply_chain/v1/service_server.ts b/src/generated/server/worldmonitor/supply_chain/v1/service_server.ts index f0ae00db7..52f5fb6f1 100644 --- a/src/generated/server/worldmonitor/supply_chain/v1/service_server.ts +++ b/src/generated/server/worldmonitor/supply_chain/v1/service_server.ts @@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry { classifierVersion: string; classifierConfidence: number; lastEvidenceUpdate: string; + countries: string[]; } export interface EnergyDisruptionSource { diff --git a/tests/energy-disruptions-registry.test.mts b/tests/energy-disruptions-registry.test.mts index 7fa3a190d..1c2fa8a72 100644 --- a/tests/energy-disruptions-registry.test.mts +++ b/tests/energy-disruptions-registry.test.mts @@ -7,13 +7,20 @@ import { fileURLToPath } from 'node:url'; import { validateRegistry, recordCount, + buildPayload, ENERGY_DISRUPTIONS_CANONICAL_KEY, MAX_STALE_MIN, } from '../scripts/_energy-disruption-registry.mjs'; const __dirname = dirname(fileURLToPath(import.meta.url)); const raw = readFileSync(resolve(__dirname, '../scripts/data/energy-disruptions.json'), 'utf-8'); -const registry = JSON.parse(raw) as { events: Record }; +const rawRegistry = JSON.parse(raw) as { events: Record }; +// validateRegistry checks the buildPayload output (the denormalised shape +// the seeder actually writes to Redis), not the raw JSON on disk. Since +// plan §R/#5 decision B, buildPayload attaches countries[] per event; the +// raw file intentionally omits that field so a curator can edit events +// without manually computing affected countries. +const registry = buildPayload() as { events: Record }; describe('energy-disruptions registry — schema', () => { test('registry passes validateRegistry', () => { @@ -94,6 +101,37 @@ describe('energy-disruptions registry — evidence', () => { }); }); +describe('energy-disruptions registry — countries[] denorm (§R #5 B)', () => { + test('every event in buildPayload output has non-empty countries[]', () => { + for (const e of Object.values(registry.events)) { + assert.ok( + Array.isArray(e.countries) && e.countries.length > 0, + `${e.id}: empty countries[] — assetId may be orphaned`, + ); + } + }); + + test('every country code is ISO-3166-1 alpha-2 uppercase', () => { + for (const e of Object.values(registry.events)) { + for (const c of e.countries) { + assert.ok(/^[A-Z]{2}$/.test(c), `${e.id}: bad country code ${c}`); + } + } + }); + + test('raw JSON on disk does NOT carry countries[] (source of truth is the join)', () => { + for (const e of Object.values(rawRegistry.events)) { + assert.equal(e.countries, undefined, `${e.id}: raw JSON should not pre-compute countries[]`); + } + }); + + test('nord-stream-1-sabotage-2022 resolves to [DE, RU]', () => { + const nord = registry.events['nord-stream-1-sabotage-2022']; + assert.ok(nord, 'nord-stream-1-sabotage-2022 missing from registry'); + assert.deepEqual(nord.countries, ['DE', 'RU']); + }); +}); + describe('energy-disruptions registry — validateRegistry rejects bad input', () => { test('rejects empty object', () => { assert.equal(validateRegistry({}), false);