feat(energy-atlas): seed-side countries[] denorm + CountryDeepDive row (§R #5 = B)

Per plan §R/#5 decision B: denormalise countries[] at seed time on each
disruption event so CountryDeepDivePanel can filter events per country
without an asset-registry round trip. Schema join (pipeline/storage
→ event.assetId) happens once in the weekly cron, not on every panel
render. The alternative (client-side join) was rejected because it
couples UI logic to asset-registry internals and duplicates the join
for every surface that wants a per-country filter.

Changes:
- `proto/.../list_energy_disruptions.proto`: add `repeated string
  countries = 15` to EnergyDisruptionEntry with doc comment tying it
  to the plan decision and the always-non-empty invariant.
- `scripts/_energy-disruption-registry.mjs`:
    • Load pipeline-gas + pipeline-oil + storage-facilities registries
      once per seed cycle; index by id.
    • `deriveCountriesForEvent()` resolves assetId to {fromCountry,
      toCountry, transitCountries} (pipeline) or {country} (storage),
      deduped + alpha-sorted so byte-diff stability holds.
    • `buildPayload()` attaches the computed countries[] to every
      event before writing.
    • `validateRegistry()` now requires non-empty countries[] of
      ISO2 codes. Combined with the seeder's `emptyDataIsFailure:
      true`, this surfaces orphaned assetIds loudly — the next cron
      tick fails validation and seed-meta stays stale, tripping
      health alarms.
- `scripts/data/energy-disruptions.json`: fix two orphaned assetIds
  that the new join caught:
    • `cpc-force-majeure-2022`: `cpc-pipeline` → `cpc` (matches the
      entry in pipelines-oil.json).
    • `pdvsa-designation-2019`: `ve-petrol-2026-q1` (non-existent) →
      `venezuela-anzoategui-puerto-la-cruz`.
- `server/.../list-energy-disruptions.ts`: project countries[] into
  the RPC response via coerceStringArray. Legacy pre-denorm rows
  surface as empty array (always present on wire, length 0 => old).
- `src/components/CountryDeepDivePanel.ts`: add 4th Atlas row —
  "Energy disruptions in {iso2}" — filtered by `iso2 ∈ countries[]`.
  Failure is silent; EnergyDisruptionsPanel (upcoming) is the
  primary disruption surface.
- `tests/energy-disruptions-registry.test.mts`: switch to validating
  the buildPayload output (post-denorm), add §R #5 B invariant
  tests, plus a raw-JSON invariant ensuring curators don't hand-edit
  countries[] (it's derived, not declared).

Proto regen note: `make generate` currently fails with a duplicate
openapi plugin collision in buf.gen.yaml (unrelated bug — 3 plugin
entries emit to the same out dir). Worked around by temporarily
trimming buf.gen.yaml to just the TS plugins for this regen. Added
only the `countries: string[]` wire field to both service_client and
service_server; no other generated-file drift in this PR.
This commit is contained in:
Elie Habib
2026-04-24 10:22:01 +04:00
parent 959086fd45
commit 80797e7cc8
8 changed files with 193 additions and 4 deletions

View File

@@ -54,6 +54,12 @@ message EnergyDisruptionEntry {
string classifier_version = 12;
double classifier_confidence = 13; // 0..1
string last_evidence_update = 14; // ISO8601
// Countries touched by the referenced asset (pipeline: fromCountry +
// toCountry + transitCountries; storage: country). Denormalised at seed
// time so CountryDeepDivePanel can filter events without a second RPC.
// Always non-empty in well-formed payloads; empty only if the upstream
// asset was removed without an event update. Per plan §R/#5 decision B.
repeated string countries = 15;
}
message EnergyDisruptionSource {

View File

@@ -38,6 +38,61 @@ function loadRegistry() {
return JSON.parse(raw);
}
/**
* Load the pipeline + storage registries so `buildPayload` can join each
* disruption event to its referenced asset and compute the `countries[]`
* denorm field (plan §R/#5 decision B).
*
* Pipelines contribute fromCountry, toCountry, and transitCountries[].
* Storage facilities contribute their single country code. Duplicates are
* deduped and sorted so the seed output is stable across runs — unstable
* ordering would churn the seeded payload bytes on every cron tick and
* defeat envelope diffing.
*
* @returns {{
* pipelines: Record<string, { fromCountry?: string; toCountry?: string; transitCountries?: string[] }>,
* storage: Record<string, { country?: string }>,
* }}
*/
function loadAssetRegistries() {
const __dirname = dirname(fileURLToPath(import.meta.url));
const gas = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-gas.json'), 'utf-8'));
const oil = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-oil.json'), 'utf-8'));
const storageRaw = JSON.parse(readFileSync(resolve(__dirname, 'data', 'storage-facilities.json'), 'utf-8'));
return {
pipelines: { ...(gas.pipelines ?? {}), ...(oil.pipelines ?? {}) },
storage: storageRaw.facilities ?? {},
};
}
/**
* Compute the denormalised country set for a single event.
*
* @param {{ assetId: string; assetType: string }} event
* @param {ReturnType<typeof loadAssetRegistries>} registries
* @returns {string[]} ISO2 codes, deduped + alpha-sorted. Empty array when
* the referenced asset cannot be resolved — callers (seeder) should
* treat empty as a hard validation failure so stale references surface
* loudly on the next cron tick rather than silently corrupt the filter.
*/
function deriveCountriesForEvent(event, registries) {
const out = new Set();
if (event.assetType === 'pipeline') {
const p = registries.pipelines[event.assetId];
if (p) {
if (typeof p.fromCountry === 'string') out.add(p.fromCountry);
if (typeof p.toCountry === 'string') out.add(p.toCountry);
if (Array.isArray(p.transitCountries)) {
for (const c of p.transitCountries) if (typeof c === 'string') out.add(c);
}
}
} else if (event.assetType === 'storage') {
const s = registries.storage[event.assetId];
if (s && typeof s.country === 'string') out.add(s.country);
}
return Array.from(out).sort();
}
/**
* @param {unknown} data
* @returns {boolean}
@@ -83,6 +138,16 @@ export function validateRegistry(data) {
const end = Date.parse(e.endAt);
if (end < start) return false;
}
// countries[] is the denorm introduced in plan §R/#5 (decision B). Every
// event must resolve to ≥1 country code from its referenced asset. An
// empty array here means the upstream asset was removed or the assetId
// is misspelled — both are hard errors the cron should surface by
// failing validation (emptyDataIsFailure upstream preserves seed-meta
// staleness so health alarms fire).
if (!Array.isArray(e.countries) || e.countries.length === 0) return false;
for (const c of e.countries) {
if (typeof c !== 'string' || !/^[A-Z]{2}$/.test(c)) return false;
}
}
return true;
}
@@ -94,7 +159,22 @@ function isIsoDate(v) {
export function buildPayload() {
const registry = loadRegistry();
return { ...registry, updatedAt: new Date().toISOString() };
const assets = loadAssetRegistries();
// Denormalise countries[] on every event so CountryDeepDivePanel can
// filter by country without an asset-registry round trip. If an event's
// assetId cannot be resolved we leave countries[] empty — validateRegistry
// rejects that shape, which fails the seed (emptyDataIsFailure: true)
// and keeps seed-meta stale until the curator fixes the orphaned id.
const rawEvents = /** @type {Record<string, any>} */ (registry.events ?? {});
const events = Object.fromEntries(
Object.entries(rawEvents).map(([id, event]) => [
id,
{ ...event, countries: deriveCountriesForEvent(event, assets) },
]),
);
return { ...registry, events, updatedAt: new Date().toISOString() };
}
/**

View File

@@ -63,7 +63,7 @@
},
"cpc-force-majeure-2022": {
"id": "cpc-force-majeure-2022",
"assetId": "cpc-pipeline",
"assetId": "cpc",
"assetType": "pipeline",
"eventType": "mechanical",
"startAt": "2022-03-22T00:00:00Z",
@@ -275,7 +275,7 @@
"classifierVersion": "v1", "classifierConfidence": 0.96, "lastEvidenceUpdate": "2026-04-22T00:00:00Z"
},
"pdvsa-designation-2019": {
"id": "pdvsa-designation-2019", "assetId": "ve-petrol-2026-q1", "assetType": "pipeline",
"id": "pdvsa-designation-2019", "assetId": "venezuela-anzoategui-puerto-la-cruz", "assetType": "pipeline",
"eventType": "sanction", "startAt": "2019-01-28T00:00:00Z", "endAt": null,
"capacityOfflineBcmYr": 0, "capacityOfflineMbd": 0.5,
"causeChain": ["sanction"],

View File

@@ -59,6 +59,13 @@ export function projectDisruption(raw: unknown): EnergyDisruptionEntry | null {
classifierVersion: coerceString(r.classifierVersion, 'v1'),
classifierConfidence: coerceNumber(r.classifierConfidence),
lastEvidenceUpdate: coerceString(r.lastEvidenceUpdate),
// Seed-denormalised countries[] (plan §R/#5 decision B). The registry
// seeder joins each event's assetId against the pipeline/storage
// registries and emits the touched ISO2 set. Legacy rows written
// before the denorm shipped can still exist in Redis transiently; we
// surface an empty array there so the field is always present on the
// wire but consumers can detect pre-denorm data by checking length.
countries: coerceStringArray(r.countries),
};
}

View File

@@ -1255,6 +1255,62 @@ export class CountryDeepDivePanel implements CountryBriefPanel {
);
}
}).catch(() => {});
// Disruptions filter (plan §R/#5 decision B). The seeded registry carries
// denormalised `countries[]` on every event, populated from the referenced
// pipeline or storage facility. We fetch the full list once (no asset
// filter) and narrow client-side; the bootstrap payload already contains
// the registry so this is usually cache-hot. If the RPC round-trip returns
// nothing, we silently skip — CountryDeepDive is not the primary
// disruption surface (EnergyDisruptionsPanel is), so an empty row is
// preferable to a spurious error.
this.loadDisruptionsForCountry(iso2);
}
private async loadDisruptionsForCountry(iso2: string): Promise<void> {
try {
const { SupplyChainServiceClient } = await import(
'@/generated/client/worldmonitor/supply_chain/v1/service_client'
);
const { getRpcBaseUrl } = await import('@/services/rpc-client');
const client = new SupplyChainServiceClient(getRpcBaseUrl(), {
fetch: (...args: Parameters<typeof fetch>) => globalThis.fetch(...args),
});
const res = await client.listEnergyDisruptions({
assetId: '',
assetType: '',
ongoingOnly: false,
});
if (!res || !Array.isArray(res.events) || this.currentCode !== iso2) return;
const events = res.events.filter(e =>
Array.isArray(e.countries) && e.countries.includes(iso2),
);
if (events.length === 0) return;
const ongoing = events.filter(e => !e.endAt).length;
const summary = ongoing > 0
? `${ongoing} ongoing · ${events.length - ongoing} resolved`
: `${events.length} resolved`;
this.appendAtlasRow(
`Energy disruptions in ${iso2}`,
summary,
events.map(e => ({
id: e.id,
label: `${e.eventType}${e.shortDescription}`,
// Event type mirrors the existing asset-detail events (pipeline /
// storage) because disruptions reference the underlying asset; the
// panel-layout listener routes to the matching asset panel.
event: e.assetType === 'storage'
? 'energy:open-storage-facility-detail'
: 'energy:open-pipeline-detail',
detail: e.assetType === 'storage'
? { facilityId: e.assetId, highlightEventId: e.id }
: { pipelineId: e.assetId, highlightEventId: e.id },
})),
);
} catch {
// Silent — disruptions row is supplementary; failures elsewhere
// surface via the dedicated EnergyDisruptionsPanel.
}
}
private appendAtlasRow(

View File

@@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry {
classifierVersion: string;
classifierConfidence: number;
lastEvidenceUpdate: string;
countries: string[];
}
export interface EnergyDisruptionSource {

View File

@@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry {
classifierVersion: string;
classifierConfidence: number;
lastEvidenceUpdate: string;
countries: string[];
}
export interface EnergyDisruptionSource {

View File

@@ -7,13 +7,20 @@ import { fileURLToPath } from 'node:url';
import {
validateRegistry,
recordCount,
buildPayload,
ENERGY_DISRUPTIONS_CANONICAL_KEY,
MAX_STALE_MIN,
} from '../scripts/_energy-disruption-registry.mjs';
const __dirname = dirname(fileURLToPath(import.meta.url));
const raw = readFileSync(resolve(__dirname, '../scripts/data/energy-disruptions.json'), 'utf-8');
const registry = JSON.parse(raw) as { events: Record<string, any> };
const rawRegistry = JSON.parse(raw) as { events: Record<string, any> };
// validateRegistry checks the buildPayload output (the denormalised shape
// the seeder actually writes to Redis), not the raw JSON on disk. Since
// plan §R/#5 decision B, buildPayload attaches countries[] per event; the
// raw file intentionally omits that field so a curator can edit events
// without manually computing affected countries.
const registry = buildPayload() as { events: Record<string, any> };
describe('energy-disruptions registry — schema', () => {
test('registry passes validateRegistry', () => {
@@ -94,6 +101,37 @@ describe('energy-disruptions registry — evidence', () => {
});
});
describe('energy-disruptions registry — countries[] denorm (§R #5 B)', () => {
test('every event in buildPayload output has non-empty countries[]', () => {
for (const e of Object.values(registry.events)) {
assert.ok(
Array.isArray(e.countries) && e.countries.length > 0,
`${e.id}: empty countries[] — assetId may be orphaned`,
);
}
});
test('every country code is ISO-3166-1 alpha-2 uppercase', () => {
for (const e of Object.values(registry.events)) {
for (const c of e.countries) {
assert.ok(/^[A-Z]{2}$/.test(c), `${e.id}: bad country code ${c}`);
}
}
});
test('raw JSON on disk does NOT carry countries[] (source of truth is the join)', () => {
for (const e of Object.values(rawRegistry.events)) {
assert.equal(e.countries, undefined, `${e.id}: raw JSON should not pre-compute countries[]`);
}
});
test('nord-stream-1-sabotage-2022 resolves to [DE, RU]', () => {
const nord = registry.events['nord-stream-1-sabotage-2022'];
assert.ok(nord, 'nord-stream-1-sabotage-2022 missing from registry');
assert.deepEqual(nord.countries, ['DE', 'RU']);
});
});
describe('energy-disruptions registry — validateRegistry rejects bad input', () => {
test('rejects empty object', () => {
assert.equal(validateRegistry({}), false);