feat(energy-atlas): seed-side countries[] denorm on disruptions + CountryDeepDive row (§R #5 = B) (#3377)

* feat(energy-atlas): seed-side countries[] denorm + CountryDeepDive row (§R #5 = B)

Per plan §R/#5 decision B: denormalise countries[] at seed time on each
disruption event so CountryDeepDivePanel can filter events per country
without an asset-registry round trip. Schema join (pipeline/storage
→ event.assetId) happens once in the weekly cron, not on every panel
render. The alternative (client-side join) was rejected because it
couples UI logic to asset-registry internals and duplicates the join
for every surface that wants a per-country filter.

Changes:
- `proto/.../list_energy_disruptions.proto`: add `repeated string
  countries = 15` to EnergyDisruptionEntry with doc comment tying it
  to the plan decision and the always-non-empty invariant.
- `scripts/_energy-disruption-registry.mjs`:
    • Load pipeline-gas + pipeline-oil + storage-facilities registries
      once per seed cycle; index by id.
    • `deriveCountriesForEvent()` resolves assetId to {fromCountry,
      toCountry, transitCountries} (pipeline) or {country} (storage),
      deduped + alpha-sorted so byte-diff stability holds.
    • `buildPayload()` attaches the computed countries[] to every
      event before writing.
    • `validateRegistry()` now requires non-empty countries[] of
      ISO2 codes. Combined with the seeder's `emptyDataIsFailure:
      true`, this surfaces orphaned assetIds loudly — the next cron
      tick fails validation and seed-meta stays stale, tripping
      health alarms.
- `scripts/data/energy-disruptions.json`: fix two orphaned assetIds
  that the new join caught:
    • `cpc-force-majeure-2022`: `cpc-pipeline` → `cpc` (matches the
      entry in pipelines-oil.json).
    • `pdvsa-designation-2019`: `ve-petrol-2026-q1` (non-existent) →
      `venezuela-anzoategui-puerto-la-cruz`.
- `server/.../list-energy-disruptions.ts`: project countries[] into
  the RPC response via coerceStringArray. Legacy pre-denorm rows
  surface as empty array (always present on wire, length 0 => old).
- `src/components/CountryDeepDivePanel.ts`: add 4th Atlas row —
  "Energy disruptions in {iso2}" — filtered by `iso2 ∈ countries[]`.
  Failure is silent; EnergyDisruptionsPanel (upcoming) is the
  primary disruption surface.
- `tests/energy-disruptions-registry.test.mts`: switch to validating
  the buildPayload output (post-denorm), add §R #5 B invariant
  tests, plus a raw-JSON invariant ensuring curators don't hand-edit
  countries[] (it's derived, not declared).

Proto regen note: `make generate` currently fails with a duplicate
openapi plugin collision in buf.gen.yaml (unrelated bug — 3 plugin
entries emit to the same out dir). Worked around by temporarily
trimming buf.gen.yaml to just the TS plugins for this regen. Added
only the `countries: string[]` wire field to both service_client and
service_server; no other generated-file drift in this PR.

* chore(proto): regenerate openapi specs for countries[] field

Runs `make generate` with the sebuf v0.11.1 plugin now correctly
resolved via the PATH fix (cherry-picked from fix/makefile-generate-path-prefix).
The new `countries` field on EnergyDisruptionEntry propagates into:

- docs/api/SupplyChainService.openapi.yaml (primary per-service spec)
- docs/api/SupplyChainService.openapi.json (machine-readable variant)
- docs/api/worldmonitor.openapi.yaml (consolidated bundle)

No TypeScript drift beyond the already-committed service_client.ts /
service_server.ts updates in 80797e7cc.

* fix(energy-atlas): drop highlightEventId emission (review P2)

Codex P2: loadDisruptionsForCountry dispatched `highlightEventId` but
neither PipelineStatusPanel nor StorageFacilityMapPanel consumes it
(the openDetailHandler reads only pipelineId / facilityId). The UI's
implicit promise (event-specific highlighting) wasn't delivered —
clickthrough was asset-generic, and the extra wire field was a
misleading API surface.

Fix: emit only {pipelineId, facilityId} in the dispatched detail.
Row click opens the asset drawer; user sees the full per-asset
disruption timeline and locates the event visually.

Symmetric fix for PR #3378's EnergyDisruptionsPanel — both emitters
now match the drawer contract exactly. Re-add `highlightEventId`
here when the drawer panels ship matching consumer code
(openDetailHandler accepts it, loadDetail stores it,
renderDisruptionTimeline scrolls + emphasises the matching event).

Typecheck clean, test:data 6698/6698 pass.

* fix(energy-atlas): collision detection + abort signal + label clamp (review P2)

Three Codex P2 findings on PR #3377:

1. `loadAssetRegistries()` spread-merged gas + oil pipelines, silently
   overwriting entries on id collision. No collision today, but a
   curator adding a pipeline under the same id to both files would
   cause `deriveCountriesForEvent` to return wrong-commodity country
   data with no test flagging it.

   Fix: explicit merge loop that throws on duplicate id. The next
   cron tick fails validation, seed-meta stays stale, health alarms
   fire — same loud-failure pattern the rest of the seeder uses.

2. `loadDisruptionsForCountry` didn't thread `this.signal` through
   the RPC fetch shim. The stale-closure guard (`currentCode !== iso2`)
   discarded stale RESULTS, but the in-flight request couldn't be
   cancelled when the user switched countries or closed the panel.

   Fix: wrap globalThis.fetch with { signal: this.signal } in the
   client factory, matching the signal lifecycle the rest of the
   panel already uses.

3. `shortDescription` values up to 200 chars rendered without
   ellipsis in the compact Atlas row, overflowing the row layout.

   Fix: new `truncateDisruptionLabel` helper clamps to 80 chars with
   ellipsis. Full text still accessible via click-through to the
   asset drawer.

Typecheck clean, test:data 6698/6698 pass.
This commit is contained in:
Elie Habib
2026-04-24 19:08:07 +04:00
committed by GitHub
parent a04c53fe26
commit 7c0c08ad89
11 changed files with 267 additions and 5 deletions

File diff suppressed because one or more lines are too long

View File

@@ -2319,6 +2319,16 @@ components:
format: double
lastEvidenceUpdate:
type: string
countries:
type: array
items:
type: string
description: |-
Countries touched by the referenced asset (pipeline: fromCountry +
toCountry + transitCountries; storage: country). Denormalised at seed
time so CountryDeepDivePanel can filter events without a second RPC.
Always non-empty in well-formed payloads; empty only if the upstream
asset was removed without an event update. Per plan §R/#5 decision B.
EnergyDisruptionSource:
type: object
properties:

View File

@@ -20518,6 +20518,16 @@ components:
format: double
lastEvidenceUpdate:
type: string
countries:
type: array
items:
type: string
description: |-
Countries touched by the referenced asset (pipeline: fromCountry +
toCountry + transitCountries; storage: country). Denormalised at seed
time so CountryDeepDivePanel can filter events without a second RPC.
Always non-empty in well-formed payloads; empty only if the upstream
asset was removed without an event update. Per plan §R/#5 decision B.
worldmonitor_supply_chain_v1_EnergyDisruptionSource:
type: object
properties:

View File

@@ -54,6 +54,12 @@ message EnergyDisruptionEntry {
string classifier_version = 12;
double classifier_confidence = 13; // 0..1
string last_evidence_update = 14; // ISO8601
// Countries touched by the referenced asset (pipeline: fromCountry +
// toCountry + transitCountries; storage: country). Denormalised at seed
// time so CountryDeepDivePanel can filter events without a second RPC.
// Always non-empty in well-formed payloads; empty only if the upstream
// asset was removed without an event update. Per plan §R/#5 decision B.
repeated string countries = 15;
}
message EnergyDisruptionSource {

View File

@@ -38,6 +38,83 @@ function loadRegistry() {
return JSON.parse(raw);
}
/**
* Load the pipeline + storage registries so `buildPayload` can join each
* disruption event to its referenced asset and compute the `countries[]`
* denorm field (plan §R/#5 decision B).
*
* Pipelines contribute fromCountry, toCountry, and transitCountries[].
* Storage facilities contribute their single country code. Duplicates are
* deduped and sorted so the seed output is stable across runs — unstable
* ordering would churn the seeded payload bytes on every cron tick and
* defeat envelope diffing.
*
* @returns {{
* pipelines: Record<string, { fromCountry?: string; toCountry?: string; transitCountries?: string[] }>,
* storage: Record<string, { country?: string }>,
* }}
*/
function loadAssetRegistries() {
const __dirname = dirname(fileURLToPath(import.meta.url));
const gas = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-gas.json'), 'utf-8'));
const oil = JSON.parse(readFileSync(resolve(__dirname, 'data', 'pipelines-oil.json'), 'utf-8'));
const storageRaw = JSON.parse(readFileSync(resolve(__dirname, 'data', 'storage-facilities.json'), 'utf-8'));
// Merge with explicit collision detection. A spread like
// { ...gas.pipelines, ...oil.pipelines } would silently let an oil
// entry overwrite a gas entry if a curator ever added a pipeline
// under the same id to both files — `deriveCountriesForEvent` would
// then return data for whichever side won the spread regardless of
// which commodity the disruption actually references, and the
// collision would surface as mysterious wrong-country filter
// results with no test or validator flagging it. Codex P2 on
// PR #3377. Throw loudly so the next cron tick fails validation
// and health alarms fire.
/** @type {Record<string, any>} */
const pipelines = {};
for (const [id, p] of Object.entries(gas.pipelines ?? {})) pipelines[id] = p;
for (const [id, p] of Object.entries(oil.pipelines ?? {})) {
if (pipelines[id]) {
throw new Error(
`Duplicate pipeline id "${id}" present in both pipelines-gas.json ` +
`and pipelines-oil.json — an event referencing this id would resolve ` +
`ambiguously. Rename one of them before re-running the seeder.`,
);
}
pipelines[id] = p;
}
return { pipelines, storage: storageRaw.facilities ?? {} };
}
/**
* Compute the denormalised country set for a single event.
*
* @param {{ assetId: string; assetType: string }} event
* @param {ReturnType<typeof loadAssetRegistries>} registries
* @returns {string[]} ISO2 codes, deduped + alpha-sorted. Empty array when
* the referenced asset cannot be resolved — callers (seeder) should
* treat empty as a hard validation failure so stale references surface
* loudly on the next cron tick rather than silently corrupt the filter.
*/
function deriveCountriesForEvent(event, registries) {
const out = new Set();
if (event.assetType === 'pipeline') {
const p = registries.pipelines[event.assetId];
if (p) {
if (typeof p.fromCountry === 'string') out.add(p.fromCountry);
if (typeof p.toCountry === 'string') out.add(p.toCountry);
if (Array.isArray(p.transitCountries)) {
for (const c of p.transitCountries) if (typeof c === 'string') out.add(c);
}
}
} else if (event.assetType === 'storage') {
const s = registries.storage[event.assetId];
if (s && typeof s.country === 'string') out.add(s.country);
}
return Array.from(out).sort();
}
/**
* @param {unknown} data
* @returns {boolean}
@@ -83,6 +160,16 @@ export function validateRegistry(data) {
const end = Date.parse(e.endAt);
if (end < start) return false;
}
// countries[] is the denorm introduced in plan §R/#5 (decision B). Every
// event must resolve to ≥1 country code from its referenced asset. An
// empty array here means the upstream asset was removed or the assetId
// is misspelled — both are hard errors the cron should surface by
// failing validation (emptyDataIsFailure upstream preserves seed-meta
// staleness so health alarms fire).
if (!Array.isArray(e.countries) || e.countries.length === 0) return false;
for (const c of e.countries) {
if (typeof c !== 'string' || !/^[A-Z]{2}$/.test(c)) return false;
}
}
return true;
}
@@ -94,7 +181,22 @@ function isIsoDate(v) {
export function buildPayload() {
const registry = loadRegistry();
return { ...registry, updatedAt: new Date().toISOString() };
const assets = loadAssetRegistries();
// Denormalise countries[] on every event so CountryDeepDivePanel can
// filter by country without an asset-registry round trip. If an event's
// assetId cannot be resolved we leave countries[] empty — validateRegistry
// rejects that shape, which fails the seed (emptyDataIsFailure: true)
// and keeps seed-meta stale until the curator fixes the orphaned id.
const rawEvents = /** @type {Record<string, any>} */ (registry.events ?? {});
const events = Object.fromEntries(
Object.entries(rawEvents).map(([id, event]) => [
id,
{ ...event, countries: deriveCountriesForEvent(event, assets) },
]),
);
return { ...registry, events, updatedAt: new Date().toISOString() };
}
/**

View File

@@ -63,7 +63,7 @@
},
"cpc-force-majeure-2022": {
"id": "cpc-force-majeure-2022",
"assetId": "cpc-pipeline",
"assetId": "cpc",
"assetType": "pipeline",
"eventType": "mechanical",
"startAt": "2022-03-22T00:00:00Z",
@@ -275,7 +275,7 @@
"classifierVersion": "v1", "classifierConfidence": 0.96, "lastEvidenceUpdate": "2026-04-22T00:00:00Z"
},
"pdvsa-designation-2019": {
"id": "pdvsa-designation-2019", "assetId": "ve-petrol-2026-q1", "assetType": "pipeline",
"id": "pdvsa-designation-2019", "assetId": "venezuela-anzoategui-puerto-la-cruz", "assetType": "pipeline",
"eventType": "sanction", "startAt": "2019-01-28T00:00:00Z", "endAt": null,
"capacityOfflineBcmYr": 0, "capacityOfflineMbd": 0.5,
"causeChain": ["sanction"],

View File

@@ -59,6 +59,13 @@ export function projectDisruption(raw: unknown): EnergyDisruptionEntry | null {
classifierVersion: coerceString(r.classifierVersion, 'v1'),
classifierConfidence: coerceNumber(r.classifierConfidence),
lastEvidenceUpdate: coerceString(r.lastEvidenceUpdate),
// Seed-denormalised countries[] (plan §R/#5 decision B). The registry
// seeder joins each event's assetId against the pipeline/storage
// registries and emits the touched ISO2 set. Legacy rows written
// before the denorm shipped can still exist in Redis transiently; we
// surface an empty array there so the field is always present on the
// wire but consumers can detect pre-denorm data by checking length.
countries: coerceStringArray(r.countries),
};
}

View File

@@ -74,6 +74,19 @@ const SEVERITY_ORDER: Record<ThreatLevel, number> = {
info: 0,
};
// Clamp long disruption shortDescriptions when rendered in the compact
// CountryDeepDive Atlas row. Some registry entries (OFAC designations,
// multi-clause sanctions summaries) run 100200 chars; without a clamp
// they overflow the row. 80 chars is a balance between scannability and
// information density; full detail stays accessible by clicking through
// to the asset drawer.
const DISRUPTION_LABEL_MAX_LEN = 80;
function truncateDisruptionLabel(eventType: string, shortDescription: string): string {
const base = `${eventType}${shortDescription}`;
if (base.length <= DISRUPTION_LABEL_MAX_LEN) return base;
return base.slice(0, DISRUPTION_LABEL_MAX_LEN - 1) + '…';
}
export class CountryDeepDivePanel implements CountryBriefPanel {
private panel: HTMLElement;
private content: HTMLElement;
@@ -1255,6 +1268,80 @@ export class CountryDeepDivePanel implements CountryBriefPanel {
);
}
}).catch(() => {});
// Disruptions filter (plan §R/#5 decision B). The seeded registry carries
// denormalised `countries[]` on every event, populated from the referenced
// pipeline or storage facility. We fetch the full list once (no asset
// filter) and narrow client-side; the bootstrap payload already contains
// the registry so this is usually cache-hot. If the RPC round-trip returns
// nothing, we silently skip — CountryDeepDive is not the primary
// disruption surface (EnergyDisruptionsPanel is), so an empty row is
// preferable to a spurious error.
this.loadDisruptionsForCountry(iso2);
}
private async loadDisruptionsForCountry(iso2: string): Promise<void> {
try {
const { SupplyChainServiceClient } = await import(
'@/generated/client/worldmonitor/supply_chain/v1/service_client'
);
const { getRpcBaseUrl } = await import('@/services/rpc-client');
// Thread the panel's `signal` into the fetch shim so a country
// switch or panel close cancels the in-flight request, not just
// discards the result via the `this.currentCode !== iso2` guard
// below. Codex P2 on PR #3377.
const abortSignal = this.signal;
const client = new SupplyChainServiceClient(getRpcBaseUrl(), {
fetch: (input, init) => globalThis.fetch(input, { ...(init ?? {}), signal: abortSignal }),
});
const res = await client.listEnergyDisruptions({
assetId: '',
assetType: '',
ongoingOnly: false,
});
if (!res || !Array.isArray(res.events) || this.currentCode !== iso2) return;
const events = res.events.filter(e =>
Array.isArray(e.countries) && e.countries.includes(iso2),
);
if (events.length === 0) return;
const ongoing = events.filter(e => !e.endAt).length;
const summary = ongoing > 0
? `${ongoing} ongoing · ${events.length - ongoing} resolved`
: `${events.length} resolved`;
this.appendAtlasRow(
`Energy disruptions in ${iso2}`,
summary,
events.map(e => ({
id: e.id,
// Clamp long descriptions (some registry entries run 100-200
// chars, e.g. OFAC designation paragraphs) so the row layout
// stays compact. 80-char limit + ellipsis. Codex P2 on PR #3377.
label: truncateDisruptionLabel(e.eventType, e.shortDescription),
// Event type mirrors the existing asset-detail events (pipeline /
// storage) because disruptions reference the underlying asset; the
// panel-layout listener routes to the matching asset panel.
event: e.assetType === 'storage'
? 'energy:open-storage-facility-detail'
: 'energy:open-pipeline-detail',
// Emit ONLY the {pipelineId, facilityId} the drawers consume today
// (see PipelineStatusPanel + StorageFacilityMapPanel
// openDetailHandler). Previously this detail included a
// `highlightEventId` that no receiver read — Codex P2 flagged the
// misleading API surface. Clicking a row jumps to the asset
// drawer; the user sees the full per-asset timeline and locates
// the event visually. Re-add `highlightEventId` here and in
// EnergyDisruptionsPanel's dispatchOpenAsset only when the
// drawer panels ship matching consumer code.
detail: e.assetType === 'storage'
? { facilityId: e.assetId }
: { pipelineId: e.assetId },
})),
);
} catch {
// Silent — disruptions row is supplementary; failures elsewhere
// surface via the dedicated EnergyDisruptionsPanel. Abort errors
// from signal cancellation are also swallowed here intentionally.
}
}
private appendAtlasRow(

View File

@@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry {
classifierVersion: string;
classifierConfidence: number;
lastEvidenceUpdate: string;
countries: string[];
}
export interface EnergyDisruptionSource {

View File

@@ -621,6 +621,7 @@ export interface EnergyDisruptionEntry {
classifierVersion: string;
classifierConfidence: number;
lastEvidenceUpdate: string;
countries: string[];
}
export interface EnergyDisruptionSource {

View File

@@ -7,13 +7,20 @@ import { fileURLToPath } from 'node:url';
import {
validateRegistry,
recordCount,
buildPayload,
ENERGY_DISRUPTIONS_CANONICAL_KEY,
MAX_STALE_MIN,
} from '../scripts/_energy-disruption-registry.mjs';
const __dirname = dirname(fileURLToPath(import.meta.url));
const raw = readFileSync(resolve(__dirname, '../scripts/data/energy-disruptions.json'), 'utf-8');
const registry = JSON.parse(raw) as { events: Record<string, any> };
const rawRegistry = JSON.parse(raw) as { events: Record<string, any> };
// validateRegistry checks the buildPayload output (the denormalised shape
// the seeder actually writes to Redis), not the raw JSON on disk. Since
// plan §R/#5 decision B, buildPayload attaches countries[] per event; the
// raw file intentionally omits that field so a curator can edit events
// without manually computing affected countries.
const registry = buildPayload() as { events: Record<string, any> };
describe('energy-disruptions registry — schema', () => {
test('registry passes validateRegistry', () => {
@@ -94,6 +101,37 @@ describe('energy-disruptions registry — evidence', () => {
});
});
describe('energy-disruptions registry — countries[] denorm (§R #5 B)', () => {
test('every event in buildPayload output has non-empty countries[]', () => {
for (const e of Object.values(registry.events)) {
assert.ok(
Array.isArray(e.countries) && e.countries.length > 0,
`${e.id}: empty countries[] — assetId may be orphaned`,
);
}
});
test('every country code is ISO-3166-1 alpha-2 uppercase', () => {
for (const e of Object.values(registry.events)) {
for (const c of e.countries) {
assert.ok(/^[A-Z]{2}$/.test(c), `${e.id}: bad country code ${c}`);
}
}
});
test('raw JSON on disk does NOT carry countries[] (source of truth is the join)', () => {
for (const e of Object.values(rawRegistry.events)) {
assert.equal(e.countries, undefined, `${e.id}: raw JSON should not pre-compute countries[]`);
}
});
test('nord-stream-1-sabotage-2022 resolves to [DE, RU]', () => {
const nord = registry.events['nord-stream-1-sabotage-2022'];
assert.ok(nord, 'nord-stream-1-sabotage-2022 missing from registry');
assert.deepEqual(nord.countries, ['DE', 'RU']);
});
});
describe('energy-disruptions registry — validateRegistry rejects bad input', () => {
test('rejects empty object', () => {
assert.equal(validateRegistry({}), false);