feat(energy-atlas): GEM pipeline import infrastructure (parity PR 1, plan U1-U4) (#3397)

* feat(energy-atlas): GEM pipeline import infrastructure (PR 1, plan U1-U4)

Lands the parser, dedup helper, validator extensions, and operator runbook
for the Global Energy Monitor (CC-BY 4.0) pipeline-data refresh — closing
the ~3.6× Energy Atlas pipeline-scale gap once the operator runs the
import.

Per docs/plans/2026-04-25-003-feat-energy-parity-pushup-plan.md PR 1.

U1 — Validator + schema extensions:
- Add `'gem'` to VALID_SOURCES in scripts/_pipeline-registry.mjs and to the
  evidence-bearing-source whitelist in derivePipelinePublicBadge so GEM-
  sourced offline rows derive a `disputed` badge via the external-signal
  rule (parity with `press`/`satellite`/`ais-relay`).
- Export VALID_SOURCES so tests assert against the same source-of-truth
  the validator uses (matches the VALID_OIL_PRODUCT_CLASSES pattern from
  PR #3383).
- Floor bump (MIN_PIPELINES_PER_REGISTRY 8→200) intentionally DEFERRED
  to the follow-up data PR — bumping it now would gate the existing 75+75
  hand-curated rows below the new floor and break seeder publishes
  before the GEM data lands.
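The external-signal rule that `gem` joins can be sketched as follows (an illustrative simplification, not the real derivation: `derivePipelinePublicBadge` also handles sanctions, operator statements, and the `reduced` state, all omitted here):

```javascript
// Hedged sketch: offline + external-only source derives 'disputed'.
// Sanction/operator-statement precedence from the real badge logic omitted.
const EXTERNAL_SIGNAL_SOURCES = ['press', 'satellite', 'ais-relay', 'gem'];

function sketchBadge(evidence) {
  if (evidence.physicalState === 'offline' &&
      EXTERNAL_SIGNAL_SOURCES.includes(evidence.physicalStateSource)) {
    // External signal without operator/sanction confirmation.
    return 'disputed';
  }
  return evidence.physicalState === 'flowing' ? 'flowing' : 'unknown';
}
```

This is why a GEM-sourced offline row lands on `disputed` rather than `offline`: GEM is evidence-bearing, but not operator- or sanction-confirmed.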

U2 — GEM parser (test-first):
- scripts/import-gem-pipelines.mjs reads a local JSON file (operator pre-
  converts GEM Excel externally — no `xlsx` dependency added). Schema-
  drift sentinel throws on missing columns. Status mapping covers
  Operating/Construction/Cancelled/Mothballed/Idle/Shut-in. ProductClass
  mapping covers Crude Oil / Refined Products / mixed-flow notes.
  Capacity-unit conversion handles bcm/y, bbl/d, Mbd, kbd.
- 22 tests in tests/import-gem-pipelines.test.mjs cover schema sentinel,
  fuel split, status mapping, productClass mapping, capacity conversion,
  minimum-viable-evidence shape, registry-shape conformance, and bad-
  coordinate rejection.
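The Mbd convention the capacity conversion relies on (millions of barrels per day) can be sketched like this (function name `toMbd` is illustrative, simplified from the script's helper):

```javascript
// Sketch of the oil capacity normalization: capacityMbd is in MILLION
// barrels per day, so raw bbl/d divides by 1e6 and kbd by 1e3.
function toMbd(value, unit) {
  if (unit === 'Mbd') return Number(value);
  if (unit === 'bbl/d') return Number(value) / 1_000_000;
  if (unit === 'kbd') return Number(value) / 1_000;
  // Throw loudly so an unknown unit surfaces instead of writing zeros.
  throw new Error(`Unsupported oil capacity unit: ${unit}`);
}
```

For example, a 400_000 bbl/d line normalizes to 0.4 Mbd.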

U3 — Deduplication (pure deterministic):
- scripts/_pipeline-dedup.mjs: dedupePipelines(existing, candidates) →
  { toAdd, skippedDuplicates }. Match rule: haversine ≤5km AND name
  Jaccard ≥0.6 (BOTH required). Reverse-direction-pair-aware.
- 19 tests cover internal helpers, match logic, id collision, determinism,
  and empty inputs.
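A minimal sketch of the two-criteria match rule (a simplified re-implementation: the real helpers in scripts/_pipeline-dedup.mjs add stopword removal, diacritic stripping, and reverse-direction endpoint pairing):

```javascript
// BOTH criteria must hold: endpoints within 5 km AND name Jaccard >= 0.6.
// Simplified: single-point distance, whitespace tokenization only.
const toRad = (deg) => (deg * Math.PI) / 180;

function haversineKm(a, b) {
  const dLat = toRad(b.lat - a.lat);
  const dLon = toRad(b.lon - a.lon);
  const h = Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(a.lat)) * Math.cos(toRad(b.lat)) * Math.sin(dLon / 2) ** 2;
  return 2 * 6371 * Math.atan2(Math.sqrt(h), Math.sqrt(1 - h));
}

function jaccard(a, b) {
  const tok = (s) => new Set(s.toLowerCase().split(/\s+/));
  const A = tok(a), B = tok(b);
  const inter = [...A].filter((t) => B.has(t)).length;
  const union = A.size + B.size - inter;
  return union === 0 ? 0 : inter / union;
}

const isLikelyDuplicate = (cand, existing) =>
  haversineKm(cand.start, existing.start) <= 5 &&
  jaccard(cand.name, existing.name) >= 0.6;
```

Requiring both criteria means two distinct pipelines sharing a corridor (close endpoints, different names) do not collapse, and two same-named pipelines in different countries do not either.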

U4 — Operator runbook (data import deferred):
- docs/methodology/pipelines.mdx: 7-step runbook for the operator to
  download GEM, pre-convert Excel→JSON, dry-run with --print-candidates,
  merge with --merge, bump the registry floor, and commit with
  provenance metadata.
- The actual data import is intentionally OUT OF SCOPE for this agent-
  authored PR because GEM downloads are registration-gated. A follow-up
  PR will commit the imported scripts/data/pipelines-{gas,oil}.json +
  bump MIN_PIPELINES_PER_REGISTRY → 200 + record the GEM release SHA256.

Tests: typecheck clean; 67 tests pass across the three test files.

Codex-approved through 8 review rounds against origin/main @ 050073354.

* fix(energy-atlas): wire --merge to dedupePipelines + within-batch dedup (PR1 review)

P1 — --merge was a TODO no-op (import-gem-pipelines.mjs:291):
- Previously exited with code 2 + a "TODO: wire dedup once U3 lands"
  message. The PR body and the methodology runbook both advertised
  --merge as the operator path.
- Add mergeIntoRegistry(filename, candidates) helper that loads the
  existing envelope, runs dedupePipelines() against the candidate
  list, sorts new entries alphabetically by id (stable diff on rerun),
  validates the merged registry via validateRegistry(), and writes
  to disk only after validation passes. CLI --merge now invokes it
  for both gas and oil + prints a per-fuel summary.
- Source attribution: the registry envelope's `source` field is
  upgraded to mention GEM (CC-BY 4.0) on first merge so the data file
  itself documents provenance.

P2 — dedup transitive-match bug (_pipeline-dedup.mjs:120):
- Pre-fix loop checked each candidate ONLY against the original
  `existing` array. Two GEM rows that match each other but not anything
  in `existing` would BOTH be added, defeating the dedup contract for
  same-batch duplicates (real example: a primary GEM entry plus a
  duplicate row from a regional supplemental sheet).
- Now compares against existing FIRST (existing wins on cross-set
  match — preserves richer hand-curated evidence), then falls back to
  the already-accepted toAdd set. Within-batch matches retain the FIRST
  accepted candidate (deterministic by candidate-list order).

Tests: 22 in tests/pipeline-dedup.test.mjs (3 new) cover the
within-batch dedup, transitive collapse, and existing-wins-over-
already-accepted scenarios. typecheck clean.
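The comparison order described above can be sketched as follows (illustrative shape only; `isDup` stands in for the distance-plus-Jaccard predicate):

```javascript
// Existing rows win on cross-set match; within-batch duplicates keep the
// FIRST accepted candidate. Deterministic: order follows the input arrays.
function dedupeSketch(existing, candidates, isDup) {
  const toAdd = [];
  const skippedDuplicates = [];
  for (const cand of candidates) {
    const match = existing.find((e) => isDup(cand, e)) // existing first
      ?? toAdd.find((a) => isDup(cand, a));            // then accepted batch
    if (match) skippedDuplicates.push({ candidate: cand, matchedId: match.id });
    else toAdd.push(cand);
  }
  return { toAdd, skippedDuplicates };
}
```

With only the first `find`, two candidates matching each other but nothing in `existing` would both be added; the `?? toAdd.find(...)` fallback is the fix.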

* fix(energy-atlas): cross-file-atomic --merge (PR1 review #2)

P1 — partial-import on disk if oil validation fails after gas writes
(import-gem-pipelines.mjs:329 / :350):
- Previous flow ran `mergeIntoRegistry('pipelines-gas.json', gas)` which
  wrote to disk, then `mergeIntoRegistry('pipelines-oil.json', oil)`. If
  oil validation failed, the operator was left with a half-imported
  state: gas had GEM rows committed to disk but oil didn't.
- Refactor into a two-phase API:
  1. prepareMerge(filename, candidates) — pure, no disk I/O. Builds the
     merged envelope, validates it, throws on validation failure.
  2. mergeBothRegistries(gasCandidates, oilCandidates) — calls
     prepareMerge for BOTH fuels first; only writes to disk after BOTH
     pass validation. If oil's prepareMerge throws, gas was never
     touched on disk.
- CLI --merge now invokes mergeBothRegistries. The atomicity guarantee
  is documented inline in the helper.

typecheck clean. No new tests because the existing dedup + validate
suites cover the underlying logic; the change is purely about call
ordering for atomicity.
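The two-phase shape reduces to this (a hedged sketch of the call ordering, not the script's exact code; the real functions are prepareMerge and mergeBothRegistries):

```javascript
// Phase 1 is pure: both prepares validate and throw before any write.
// Phase 2 only runs once BOTH envelopes passed, so a failure on the
// second fuel leaves the first untouched on disk.
function mergeBothSketch(prepareGas, prepareOil, write) {
  const gas = prepareGas(); // may throw: nothing written yet
  const oil = prepareOil(); // may throw: gas still not written
  write(gas);
  write(oil);
  return { gas, oil };
}
```

The ordering, not any locking, is what provides the atomicity: all side effects are deferred until every validation has succeeded.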

* fix(energy-atlas): deterministic lastEvidenceUpdate + clarify test comment (PR1 review #3)

P2 — lastEvidenceUpdate was non-deterministic (Greptile P2):
- Previous code used new Date().toISOString() per parser run, so two runs
  of parseGemPipelines on the same input on different days produced
  byte-different output. Quarterly re-imports would produce noisy
  full-row diffs even when the upstream GEM data hadn't changed.
- New: resolveEvidenceTimestamp(envelope) derives the timestamp from
  envelope.downloadedAt (the operator-recorded date) or sourceVersion
  if it parses as ISO. Falls back to 1970-01-01 sentinel when neither
  is set — deliberately ugly so reviewers spot the missing field in
  the data file diff rather than silently getting today's date.
- Computed once per parse run so every emitted candidate gets the
  same timestamp.

P2 — misleading test comment (Greptile P2):
- Comment in tests/import-gem-pipelines.test.mjs:136 said "400_000 bbl/d
  ÷ 1000 = 400 Mbd" while the assertion correctly expects 0.4 (because
  the convention is millions, not thousands). Rewrote the comment to
  state the actual rule + arithmetic clearly.

3 new tests for determinism: (a) two parser runs produce identical
output, (b) timestamp derives from downloadedAt, (c) missing date
yields the epoch sentinel (loud failure mode).
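The resolution order can be sketched like this (mirrors the described behavior, simplified from resolveEvidenceTimestamp):

```javascript
// downloadedAt wins, then sourceVersion if it starts with YYYY-MM-DD;
// otherwise the deliberately ugly epoch sentinel. No `new Date()` anywhere,
// so two runs on the same input are byte-identical.
function resolveTimestampSketch(envelope) {
  for (const v of [envelope.downloadedAt, envelope.sourceVersion]) {
    if (typeof v === 'string') {
      const m = v.match(/^\d{4}-\d{2}-\d{2}/);
      if (m) return `${m[0]}T00:00:00Z`;
    }
  }
  return '1970-01-01T00:00:00Z'; // loud sentinel: operator forgot the date
}
```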
Author: Elie Habib, 2026-04-25 17:55:45 +04:00 (committed by GitHub)
Commit: d9a1f6a0f8 (parent eeffac31bf), 9 changed files with 1352 additions and 5 deletions


@@ -65,6 +65,25 @@ No human review queue gates the transition — quality comes from the tiered evi
Pipeline-registry data derived from [Global Energy Monitor](https://globalenergymonitor.org) (CC-BY 4.0), with additional operator and regulator material incorporated under fair-use for news reporting.
The hand-curated subset (operator/regulator/sanctions-bearing rows with classifier confidence ≥ 0.7) ships with full evidence bundles: operator statements, sanction references, last-evidence-update timestamps, and named source authorities. The GEM-imported subset (long-tail coverage rows) ships with minimum-viable evidence — `physicalStateSource: gem`, `classifierConfidence ≤ 0.5`, no operator statement, no sanction references. Both subsets pass the same registry validator and feed the same public-badge derivation.
## Operator runbook — GEM import refresh
GEM publishes new releases of the Oil & Gas Infrastructure Trackers roughly quarterly. The refresh is operator-mediated rather than cron-driven because the GEM download URL changes per release; a hardcoded URL would silently fetch a different version than the one we attribute. Steps:
1. Visit the [GEM Oil & Gas Infrastructure Trackers](https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/) page. Registration is required for direct download even though the data itself is CC-BY 4.0.
2. Download the latest gas + oil tracker Excel workbooks. Record the release date and download URL.
3. Pre-convert each workbook to JSON externally (Numbers / pandas / csvkit), normalizing column names to the canonical set documented in `scripts/import-gem-pipelines.mjs::REQUIRED_COLUMNS` and country names to ISO 3166-1 alpha-2 codes.
4. Run a dry pass to inspect the candidate diff:
```bash
GEM_PIPELINES_FILE=/tmp/gem.json node scripts/import-gem-pipelines.mjs --print-candidates | jq '.gas | length, .oil | length'
```
5. Run the merge to write the deduplicated rows into `scripts/data/pipelines-{gas,oil}.json`. Spot-check 5-10 random GEM-sourced rows manually before committing.
6. Commit the data + bump `MIN_PIPELINES_PER_REGISTRY` in `scripts/_pipeline-registry.mjs` to a sensible new floor (e.g. 200) so future partial imports fail loud. Record the GEM release date, download URL, and SHA256 of the source workbook in the commit message.
7. Verify `npm run test:data` is green before pushing.
Schema-drift sentinel guards against silent failures when GEM renames columns between releases — the parser throws with a clear message naming the missing column rather than producing zero-data rows.
## Corrections
See [`/corrections`](/corrections) for the planned revision-log shape

scripts/_pipeline-dedup.mjs (new file, +170 lines)

@@ -0,0 +1,170 @@
// @ts-check
//
// Pure deterministic deduplication for the GEM pipeline import. NOT an entry
// point — see scripts/import-gem-pipelines.mjs for the orchestrator.
//
// Match rule (BOTH must hold):
// 1. Endpoint distance ≤ 5 km (haversine, route-direction-flipped pair-aware
// so Mozyr→Adamowo and Adamowo→Mozyr count as the same).
// 2. Name token Jaccard ≥ 0.6 (lowercased word tokens, stopwords removed).
//
// Conflict resolution: existing row WINS. Hand-curated rows have richer
// evidence (operator statements, sanction refs, classifier confidence ≥ 0.7)
// that GEM's minimum-viable evidence shouldn't overwrite. The dedup function
// returns { toAdd, skippedDuplicates } so the caller can audit which GEM
// candidates were absorbed by existing rows.
//
// Determinism: zero Date.now() / Math.random() / Set ordering reliance. Two
// invocations on identical inputs produce identical outputs.
const STOPWORDS = new Set([
'pipeline', 'pipelines', 'system', 'systems', 'line', 'lines', 'network',
'route', 'project', 'the', 'and', 'of', 'a', 'an',
]);
const MATCH_DISTANCE_KM = 5;
const MATCH_JACCARD_MIN = 0.6;
const EARTH_RADIUS_KM = 6371;
/**
* Haversine great-circle distance in km between two lat/lon points.
*/
function haversineKm(a, b) {
const toRad = (deg) => (deg * Math.PI) / 180;
const dLat = toRad(b.lat - a.lat);
const dLon = toRad(b.lon - a.lon);
const lat1 = toRad(a.lat);
const lat2 = toRad(b.lat);
const x =
Math.sin(dLat / 2) ** 2 +
Math.sin(dLon / 2) ** 2 * Math.cos(lat1) * Math.cos(lat2);
const c = 2 * Math.atan2(Math.sqrt(x), Math.sqrt(1 - x));
return EARTH_RADIUS_KM * c;
}
/**
* Average endpoint distance between two pipelines, considering both forward
* and reversed pairings. The smaller of the two is returned so a route
* direction flip doesn't appear as a different pipeline.
*/
function averageEndpointDistanceKm(a, b) {
const forward =
(haversineKm(a.startPoint, b.startPoint) + haversineKm(a.endPoint, b.endPoint)) / 2;
const reversed =
(haversineKm(a.startPoint, b.endPoint) + haversineKm(a.endPoint, b.startPoint)) / 2;
return Math.min(forward, reversed);
}
/**
* Tokenize a name: lowercased word tokens, ASCII-only word boundaries,
* stopwords removed. Stable across invocations.
*/
function tokenize(name) {
return name
.toLowerCase()
.normalize('NFKD')
// Strip combining marks (diacritics) so "Limón" → "limon", not "limo'n".
// The range \u0300-\u036F covers Combining Diacritical Marks per Unicode.
.replace(/[\u0300-\u036f]/g, '')
.replace(/[^a-z0-9 ]+/g, ' ')
.split(/\s+/)
.filter((t) => t.length > 0 && !STOPWORDS.has(t));
}
/**
* Jaccard similarity = |A ∩ B| / |A ∪ B| over token sets.
*/
function jaccard(a, b) {
const setA = new Set(tokenize(a));
const setB = new Set(tokenize(b));
if (setA.size === 0 && setB.size === 0) return 0;
let intersection = 0;
for (const t of setA) if (setB.has(t)) intersection++;
const unionSize = setA.size + setB.size - intersection;
return unionSize === 0 ? 0 : intersection / unionSize;
}
/**
* Decide if a candidate matches an existing row. Both criteria required.
*/
function isDuplicate(candidate, existing) {
const dist = averageEndpointDistanceKm(candidate, existing);
if (dist > MATCH_DISTANCE_KM) return false;
const sim = jaccard(candidate.name, existing.name);
return sim >= MATCH_JACCARD_MIN;
}
/**
* Disambiguate a candidate's id against existing ids by appending -2, -3, ...
* until unique. Stable: same input → same output.
*/
function uniqueId(baseId, takenIds) {
if (!takenIds.has(baseId)) return baseId;
let n = 2;
while (takenIds.has(`${baseId}-${n}`)) n++;
return `${baseId}-${n}`;
}
/**
* Pure dedup function.
*
* @param {Array<{ id: string, name: string, startPoint: {lat:number,lon:number}, endPoint: {lat:number,lon:number} }>} existing
* @param {Array<{ id: string, name: string, startPoint: {lat:number,lon:number}, endPoint: {lat:number,lon:number} }>} candidates
* @returns {{ toAdd: any[], skippedDuplicates: Array<{ candidate: any, matchedExistingId: string, distanceKm: number, jaccard: number }> }}
*/
export function dedupePipelines(existing, candidates) {
const taken = new Set(existing.map((p) => p.id));
const toAdd = [];
const skippedDuplicates = [];
for (const cand of candidates) {
// Compare against BOTH existing rows AND candidates already accepted
// into toAdd. Without this, two GEM rows that match each other but
// not anything in `existing` would both be added — duplicate-import
// bug. Existing rows still win on cross-set match (they have richer
// hand-curated evidence); within-toAdd matches retain the FIRST
// accepted candidate (deterministic by candidate-list order).
let matched = null;
for (const ex of existing) {
if (isDuplicate(cand, ex)) {
matched = ex;
break;
}
}
if (!matched) {
for (const earlier of toAdd) {
if (isDuplicate(cand, earlier)) {
matched = earlier;
break;
}
}
}
if (matched) {
skippedDuplicates.push({
candidate: cand,
matchedExistingId: matched.id,
distanceKm: averageEndpointDistanceKm(cand, matched),
jaccard: jaccard(cand.name, matched.name),
});
continue;
}
const finalId = uniqueId(cand.id, taken);
taken.add(finalId);
toAdd.push({ ...cand, id: finalId });
}
return { toAdd, skippedDuplicates };
}
// Internal exports for test coverage; not part of the public surface.
export const _internal = {
haversineKm,
averageEndpointDistanceKm,
tokenize,
jaccard,
isDuplicate,
uniqueId,
STOPWORDS,
MATCH_DISTANCE_KM,
MATCH_JACCARD_MIN,
};


@@ -29,7 +29,13 @@ export const PIPELINES_TTL_SECONDS = 21 * 24 * 3600;
const VALID_PHYSICAL_STATES = new Set(['flowing', 'reduced', 'offline', 'unknown']);
const VALID_COMMERCIAL_STATES = new Set(['under_contract', 'expired', 'suspended', 'unknown']);
const VALID_SOURCES = new Set(['operator', 'regulator', 'press', 'satellite', 'ais-relay']);
// `gem` covers rows imported from Global Energy Monitor's Oil & Gas
// Infrastructure Trackers (CC-BY 4.0). Treated as an evidence-bearing source
// for non-flowing badges in the same way as `press` / `satellite` / `ais-relay`,
// since GEM is an academic/curated dataset with traceable provenance — not a
// silent default. Exported alongside VALID_OIL_PRODUCT_CLASSES so test suites
// can assert against the same source of truth the validator uses.
export const VALID_SOURCES = new Set(['operator', 'regulator', 'press', 'satellite', 'ais-relay', 'gem']);
// Required on every oil pipeline. `crude` = crude-oil lines (default),
// `products` = refined-product lines (gasoline/diesel/jet), `mixed` =
// dual-use bridges moving both. Gas pipelines don't carry this field
@@ -104,13 +110,16 @@ export function validateRegistry(data) {
// Every non-`flowing` badge requires at least one evidence field with signal.
// This prevents shipping an `offline` label with zero supporting evidence.
// `gem` joins the evidence-bearing sources because GEM is a curated
// academic dataset with traceable provenance, not a silent default.
if (ev.physicalState !== 'flowing') {
const hasEvidence =
ev.operatorStatement != null ||
ev.sanctionRefs.length > 0 ||
ev.physicalStateSource === 'ais-relay' ||
ev.physicalStateSource === 'satellite' ||
ev.physicalStateSource === 'press';
ev.physicalStateSource === 'press' ||
ev.physicalStateSource === 'gem';
if (!hasEvidence) return false;
}
}


@@ -0,0 +1,429 @@
// @ts-check
//
// One-shot import: GEM Oil & Gas Infrastructure Trackers (CC-BY 4.0) →
// scripts/data/pipelines-{gas,oil}.json shape.
//
// PROVENANCE / OPERATOR-MEDIATED:
// This script is INTENTIONALLY local-file-only — it does NOT fetch GEM at
// runtime. The GEM download URL changes per release; a hardcoded URL would
// silently fetch a different version than the one we attribute. The
// operator runs:
//
// 1. Visit https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/
// (registration required for direct download even though the data
// itself is CC-BY 4.0 licensed).
// 2. Download the latest gas + oil tracker Excel workbooks.
// 3. Pre-convert each workbook's primary sheet to JSON (Numbers /
// pandas / csvkit / equivalent) using the canonical column names
// documented in REQUIRED_COLUMNS below. Country names should be
// pre-mapped to ISO 3166-1 alpha-2 codes during conversion.
// 4. Save the JSON to a local path and run this script with:
// GEM_PIPELINES_FILE=/path/to/gem.json node scripts/import-gem-pipelines.mjs --merge
// 5. Record the GEM release date + download URL + file SHA256 in the
// commit message and docs/methodology/pipelines.mdx, per the
// seed-imf-external.mjs provenance pattern.
//
// EXECUTION MODES:
// --print-candidates : parse + print candidates as JSON to stdout (dry run)
// --merge : parse, dedupe against existing pipelines-{gas,oil}.json,
// write merged JSON to disk, abort on validate failure
//
// NO xlsx DEPENDENCY: the operator pre-converts externally; this keeps the
// runtime dependency surface tight and avoids the known CVE history of the
// xlsx package for a quarterly one-shot operation.
import { readFileSync, writeFileSync } from 'node:fs';
import { dirname, resolve as resolvePath } from 'node:path';
import { fileURLToPath } from 'node:url';
import { dedupePipelines } from './_pipeline-dedup.mjs';
import { validateRegistry } from './_pipeline-registry.mjs';
/**
* Canonical input columns. The operator's Excel-to-JSON conversion must
* preserve these EXACT key names for each row in `pipelines[]`. Schema-drift
* sentinel below throws on missing keys before any data is emitted.
*/
export const REQUIRED_COLUMNS = [
'name',
'operator',
'fuel', // 'Natural Gas' | 'Oil'
'fromCountry', // ISO 3166-1 alpha-2
'toCountry', // ISO 3166-1 alpha-2
'transitCountries', // string[] (may be empty)
'capacity',
'capacityUnit', // 'bcm/y' | 'bbl/d' | 'Mbd'
'lengthKm',
'status', // GEM Status string (mapped below)
'startLat',
'startLon',
'endLat',
'endLon',
];
/**
* Maps GEM status strings to our `physicalState` enum.
* Default: 'unknown' — falls into the "treat as not commissioned" bucket.
*/
const STATUS_MAP = {
Operating: 'flowing',
Operational: 'flowing',
Construction: 'unknown',
Proposed: 'unknown',
Cancelled: 'offline',
Mothballed: 'offline',
Idle: 'offline',
'Shut-in': 'offline',
};
/**
* Maps GEM `product` field to our `productClass` enum (oil only).
*/
const PRODUCT_CLASS_MAP = {
'Crude Oil': 'crude',
Crude: 'crude',
'Refined Products': 'products',
'Petroleum Products': 'products',
Products: 'products',
Mixed: 'mixed',
'Crude/Products': 'mixed',
};
const VALID_LAT = (v) => Number.isFinite(v) && v >= -90 && v <= 90;
const VALID_LON = (v) => Number.isFinite(v) && v >= -180 && v <= 180;
function slugify(name, country) {
const base = name.toLowerCase()
.replace(/[^a-z0-9]+/g, '-')
.replace(/^-+|-+$/g, '')
.slice(0, 60);
return `${base}-${country.toLowerCase()}`;
}
function inferFuel(row) {
const f = String(row.fuel ?? '').toLowerCase();
if (f.includes('gas')) return 'gas';
if (f.includes('oil') || f.includes('crude') || f.includes('petroleum')) return 'oil';
return null;
}
function mapStatus(gemStatus) {
return STATUS_MAP[gemStatus] ?? 'unknown';
}
function mapProductClass(rawProduct) {
if (!rawProduct) return 'crude'; // conservative default per plan U2
const cls = PRODUCT_CLASS_MAP[rawProduct];
if (cls) return cls;
// Best-effort substring match for Excel column variations
const lower = rawProduct.toLowerCase();
if (lower.includes('crude') && lower.includes('product')) return 'mixed';
if (lower.includes('crude')) return 'crude';
if (lower.includes('product') || lower.includes('refined')) return 'products';
return 'crude';
}
function convertCapacityToBcmYr(value, unit) {
if (unit === 'bcm/y' || unit === 'bcm/yr') return Number(value);
// Future: add bcf/d → bcm/y conversion if needed. Throw loudly so the
// operator notices instead of silently writing zeros.
throw new Error(`Unsupported gas capacity unit: ${unit}. Expected 'bcm/y'.`);
}
function convertCapacityToMbd(value, unit) {
// Schema convention: capacityMbd is in MILLION barrels per day (e.g. CPC
// pipeline = 1.4 Mbd = 1.4M bbl/day). So conversions:
// 'Mbd' → preserved
// 'bbl/d' → divide by 1_000_000
// 'kbd' → divide by 1_000 (rare)
if (unit === 'Mbd') return Number(value);
if (unit === 'bbl/d') return Number(value) / 1_000_000;
if (unit === 'kbd') return Number(value) / 1_000;
throw new Error(`Unsupported oil capacity unit: ${unit}. Expected 'Mbd' / 'bbl/d' / 'kbd'.`);
}
/**
* Resolve `lastEvidenceUpdate` for emitted candidates. Prefers the
* operator-recorded `downloadedAt` (or `sourceVersion` if it parses) so
* two parser runs on the same input produce byte-identical output.
* Falls back to the unix-epoch sentinel `1970-01-01` rather than
* `new Date()` — the fallback is deliberately ugly so anyone reviewing
* the data file sees that the operator forgot to set the date and re-runs.
*
* @param {Record<string, unknown>} envelope
*/
function resolveEvidenceTimestamp(envelope) {
const candidates = [envelope.downloadedAt, envelope.sourceVersion];
for (const v of candidates) {
if (typeof v === 'string') {
// Accept full ISO strings OR bare YYYY-MM-DD; coerce to midnight-UTC.
const isoMatch = v.match(/^\d{4}-\d{2}-\d{2}/);
if (isoMatch) return `${isoMatch[0]}T00:00:00Z`;
}
}
// Sentinel: GEM data SHOULD always carry downloadedAt per the operator
// runbook. If neither field is present, surface the gap loudly via the
// epoch date — it'll show up obviously in the diff.
return '1970-01-01T00:00:00Z';
}
/**
* Parse a GEM-shape JSON object into our two-registry candidate arrays.
*
* @param {unknown} data
* @returns {{ gas: any[], oil: any[] }}
* @throws {Error} on schema drift, malformed input, or unknown capacity units.
*/
export function parseGemPipelines(data) {
if (!data || typeof data !== 'object' || Array.isArray(data)) {
throw new Error('parseGemPipelines: input must be an object');
}
const obj = /** @type {Record<string, unknown>} */ (data);
if (!Array.isArray(obj.pipelines)) {
throw new Error('parseGemPipelines: input must contain pipelines[] array');
}
// Compute once per parse run so every emitted candidate gets the SAME
// timestamp — and so two runs on identical input produce byte-identical
// JSON (Greptile P2 on PR #3397: previous use of `new Date().toISOString()`
// made re-running the parser produce a noisy diff every time).
const evidenceTimestamp = resolveEvidenceTimestamp(obj);
// Schema sentinel: assert every required column is present on every row.
// GEM occasionally renames columns between releases; the operator's
// conversion step is supposed to normalize, but we double-check here so
// a missed rename fails loud instead of producing silent zero-data.
for (const [i, row] of obj.pipelines.entries()) {
if (!row || typeof row !== 'object') {
throw new Error(`parseGemPipelines: pipelines[${i}] is not an object`);
}
const r = /** @type {Record<string, unknown>} */ (row);
for (const col of REQUIRED_COLUMNS) {
if (!(col in r)) {
throw new Error(
`parseGemPipelines: schema drift — pipelines[${i}] missing column "${col}". ` +
`Re-run the operator's Excel→JSON conversion using the canonical ` +
`column names documented in scripts/import-gem-pipelines.mjs::REQUIRED_COLUMNS.`,
);
}
}
}
const gas = [];
const oil = [];
const droppedReasons = { fuel: 0, coords: 0, capacity: 0 };
for (const row of obj.pipelines) {
const r = /** @type {Record<string, any>} */ (row);
const fuel = inferFuel(r);
if (!fuel) {
droppedReasons.fuel++;
continue;
}
const startLat = Number(r.startLat);
const startLon = Number(r.startLon);
const endLat = Number(r.endLat);
const endLon = Number(r.endLon);
if (!VALID_LAT(startLat) || !VALID_LON(startLon) || !VALID_LAT(endLat) || !VALID_LON(endLon)) {
droppedReasons.coords++;
continue;
}
let capacityField, capacityValue;
try {
if (fuel === 'gas') {
capacityField = 'capacityBcmYr';
capacityValue = convertCapacityToBcmYr(r.capacity, r.capacityUnit);
} else {
capacityField = 'capacityMbd';
capacityValue = convertCapacityToMbd(r.capacity, r.capacityUnit);
}
} catch (err) {
// Unsupported unit → drop the row; let the operator notice via the count
// delta in dry-run output. Throwing would abort the entire run on a
// single bad row, which is too brittle.
droppedReasons.capacity++;
continue;
}
if (!Number.isFinite(capacityValue) || capacityValue <= 0) {
droppedReasons.capacity++;
continue;
}
const id = slugify(r.name, r.fromCountry);
const transitCountries = Array.isArray(r.transitCountries)
? r.transitCountries.filter((c) => typeof c === 'string')
: [];
const candidate = {
id,
name: r.name,
operator: r.operator,
commodityType: fuel,
fromCountry: r.fromCountry,
toCountry: r.toCountry,
transitCountries,
[capacityField]: capacityValue,
lengthKm: Number(r.lengthKm) || 0,
inService: Number(r.startYear) || 0,
startPoint: { lat: startLat, lon: startLon },
endPoint: { lat: endLat, lon: endLon },
evidence: {
physicalState: mapStatus(r.status),
physicalStateSource: 'gem',
operatorStatement: null,
commercialState: 'unknown',
sanctionRefs: [],
lastEvidenceUpdate: evidenceTimestamp,
classifierVersion: 'gem-import-v1',
classifierConfidence: 0.4,
},
};
if (fuel === 'oil') {
candidate.productClass = mapProductClass(r.product);
}
(fuel === 'gas' ? gas : oil).push(candidate);
}
return { gas, oil };
}
/**
* Read a GEM-shape JSON file and return parsed candidates. Returns the same
* shape as parseGemPipelines but accepts a file path instead of an in-memory
* object — useful for CLI and dedup pipelines.
*
* @param {string} filePath
* @returns {{ gas: any[], oil: any[] }}
*/
export function loadGemPipelinesFromFile(filePath) {
const raw = readFileSync(filePath, 'utf-8');
let data;
try {
data = JSON.parse(raw);
} catch (err) {
throw new Error(
`parseGemPipelines: file at ${filePath} is not valid JSON. ` +
`Did the operator pre-convert the GEM Excel correctly?`,
);
}
return parseGemPipelines(data);
}
/**
* Read an existing registry file and return its parsed envelope.
* @param {string} filename
*/
function loadExistingRegistry(filename) {
const __dirname = dirname(fileURLToPath(import.meta.url));
const path = resolvePath(__dirname, 'data', filename);
const raw = readFileSync(path, 'utf-8');
return { path, envelope: JSON.parse(raw) };
}
/**
* Build (but do NOT write) a merged registry envelope. Pure: no disk I/O.
* Throws on validation failure so the caller can short-circuit before any
* file is written.
*
* @param {string} filename - 'pipelines-gas.json' or 'pipelines-oil.json'
* @param {any[]} candidates - parser output for that fuel
* @returns {{ path: string, mergedEnvelope: any, added: number, skipped: number, total: number }}
*/
function prepareMerge(filename, candidates) {
const { path, envelope } = loadExistingRegistry(filename);
const existing = Object.values(envelope.pipelines ?? {});
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
// Append in a stable order (alphabetical-by-id) so repeated runs produce
// a clean diff. Hand-curated rows keep their original ordering at the top.
const appended = [...toAdd].sort((a, b) => a.id.localeCompare(b.id));
const mergedPipelines = { ...envelope.pipelines };
for (const p of appended) mergedPipelines[p.id] = p;
const mergedEnvelope = {
...envelope,
source: envelope.source?.includes('Global Energy Monitor')
? envelope.source
: `${envelope.source ?? 'Hand-curated'} + Global Energy Monitor (CC-BY 4.0)`,
pipelines: mergedPipelines,
};
if (!validateRegistry(mergedEnvelope)) {
throw new Error(
`prepareMerge: merged ${filename} would FAIL validateRegistry. ` +
`Aborting before writing to disk. Inspect the diff with --print-candidates first.`,
);
}
return {
path,
mergedEnvelope,
added: toAdd.length,
skipped: skippedDuplicates.length,
total: Object.keys(mergedPipelines).length,
};
}
/**
* Cross-file-atomic merge: builds AND validates BOTH gas + oil envelopes
* before writing EITHER file. If oil validation fails after gas already
* succeeded, neither is written — prevents the half-imported state where
* gas has GEM rows on disk but oil doesn't.
*
* Two-phase: prepare both → write both. Pure prepare phase, side-effecting
* write phase. Order of writes is stable (gas first, oil second), but the
* "validate everything before any write" guarantee is what prevents
* partial state on failure.
*
* @returns {{ gas: ReturnType<typeof prepareMerge>, oil: ReturnType<typeof prepareMerge> }}
*/
function mergeBothRegistries(gasCandidates, oilCandidates) {
// Phase 1: prepare + validate BOTH. If either throws, neither file is
// touched on disk.
const gas = prepareMerge('pipelines-gas.json', gasCandidates);
const oil = prepareMerge('pipelines-oil.json', oilCandidates);
// Phase 2: both validated → write both.
writeFileSync(gas.path, JSON.stringify(gas.mergedEnvelope, null, 2) + '\n');
writeFileSync(oil.path, JSON.stringify(oil.mergedEnvelope, null, 2) + '\n');
return { gas, oil };
}
// CLI entry point: only fires when this file is the entry script.
if (process.argv[1] && process.argv[1].endsWith('import-gem-pipelines.mjs')) {
  const filePath = process.env.GEM_PIPELINES_FILE;
  if (!filePath) {
    console.error('GEM_PIPELINES_FILE env var not set. See script header for operator runbook.');
    process.exit(1);
  }
  const args = new Set(process.argv.slice(2));
  const { gas, oil } = loadGemPipelinesFromFile(filePath);
  if (args.has('--print-candidates')) {
    process.stdout.write(JSON.stringify({ gas, oil }, null, 2) + '\n');
  } else if (args.has('--merge')) {
    try {
      // mergeBothRegistries validates BOTH envelopes before writing
      // either — so a validation failure on oil after gas succeeded
      // leaves neither file modified on disk. Prevents the half-imported
      // state the previous per-file flow could produce.
      const { gas: gasResult, oil: oilResult } = mergeBothRegistries(gas, oil);
      console.error(`gas: +${gasResult.added} added, ${gasResult.skipped} duplicates skipped, ${gasResult.total} total`);
      console.error(`oil: +${oilResult.added} added, ${oilResult.skipped} duplicates skipped, ${oilResult.total} total`);
      console.error(
        `Wrote merged data to scripts/data/pipelines-{gas,oil}.json. ` +
        `Inspect the diff before committing. Per the operator runbook, ` +
        `also update MIN_PIPELINES_PER_REGISTRY in scripts/_pipeline-registry.mjs ` +
        `to a sensible new floor (e.g. 200) once the data is in.`,
      );
    } catch (err) {
      console.error(err instanceof Error ? err.message : String(err));
      process.exit(2);
    }
  } else {
    console.error('Pass --print-candidates (dry run) or --merge (write to data files).');
    process.exit(1);
  }
}


@@ -24,7 +24,7 @@ export type PipelinePublicBadge = 'flowing' | 'reduced' | 'offline' | 'disputed'
export interface PipelineEvidenceInput {
physicalState?: string; // 'flowing'|'reduced'|'offline'|'unknown'
-physicalStateSource?: string; // 'operator'|'regulator'|'press'|'satellite'|'ais-relay'
+physicalStateSource?: string; // 'operator'|'regulator'|'press'|'satellite'|'ais-relay'|'gem'
operatorStatement?: { text?: string; url?: string; date?: string } | null;
commercialState?: string; // 'under_contract'|'expired'|'suspended'|'unknown'
sanctionRefs?: ReadonlyArray<{ authority?: string; listId?: string; date?: string; url?: string }>;
@@ -49,7 +49,7 @@ const EVIDENCE_STALENESS_DAYS = 14;
* → "offline" (high-confidence offline with paperwork)
* 2. physical_state = "offline" AND operatorStatement != null
* → "offline" (operator-disclosed outage)
-* 3. physical_state = "offline" AND physicalStateSource ∈ {press, ais-relay, satellite}
+* 3. physical_state = "offline" AND physicalStateSource ∈ {press, ais-relay, satellite, gem}
* → "disputed" (external-signal offline without operator/sanction confirmation)
* 4. physical_state = "reduced"
* → "reduced"
@@ -77,7 +77,7 @@ export function derivePipelinePublicBadge(
evidence.commercialState === 'expired' || evidence.commercialState === 'suspended';
const hasOperatorStatement = evidence.operatorStatement != null &&
((evidence.operatorStatement.text?.length ?? 0) > 0);
-const hasExternalSignal = ['press', 'ais-relay', 'satellite'].includes(
+const hasExternalSignal = ['press', 'ais-relay', 'satellite', 'gem'].includes(
evidence.physicalStateSource ?? '',
);
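
The numbered rules in the hunk above can be restated as a small decision cascade. The sketch below is illustrative only: `sketchBadge` is a made-up name, and rule 1's exact predicate (sanction paperwork) is assumed, since its full condition sits above this excerpt; the real logic is `derivePipelinePublicBadge`.

```javascript
// Illustrative restatement of badge rules 1–4; names and rule-1 predicate
// are assumptions, not the real export.
function sketchBadge(evidence) {
  const hasSanction = (evidence.sanctionRefs ?? []).length > 0;
  const hasOperatorStatement = (evidence.operatorStatement?.text?.length ?? 0) > 0;
  const hasExternalSignal = ['press', 'ais-relay', 'satellite', 'gem']
    .includes(evidence.physicalStateSource ?? '');
  if (evidence.physicalState === 'offline') {
    if (hasSanction) return 'offline';          // rule 1: paperwork-backed
    if (hasOperatorStatement) return 'offline'; // rule 2: operator-disclosed
    if (hasExternalSignal) return 'disputed';   // rule 3: external signal only
  }
  if (evidence.physicalState === 'reduced') return 'reduced'; // rule 4
  if (evidence.physicalState === 'flowing') return 'flowing';
  return 'unknown';
}
```

This is why a GEM-sourced offline row with no operator statement and no sanction refs lands on `disputed` rather than `offline`.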

tests/fixtures/gem-pipelines-sample.json

@@ -0,0 +1,117 @@
{
"source": "Global Energy Monitor — Oil & Gas Infrastructure Trackers (CC-BY 4.0)",
"sourceVersion": "2026-Q1-fixture",
"sourceUrl": "https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/",
"downloadedAt": "2026-04-25",
"_note": "Trimmed 6-row fixture for parser tests. Real input has the same shape — the operator pre-converts the GEM Excel release to this JSON form externally (Numbers / pandas / csvkit). Real production runs ingest hundreds of rows; this fixture is intentionally minimal and covers the status, productClass, capacity-unit, and bbox-validity mapping cases.",
"pipelines": [
{
"name": "Test Operating Gas Trunk",
"operator": "Test Gas Operator",
"fuel": "Natural Gas",
"product": "",
"fromCountry": "NO",
"toCountry": "DE",
"transitCountries": [],
"capacity": 24,
"capacityUnit": "bcm/y",
"lengthKm": 850,
"status": "Operating",
"startYear": 1995,
"startLat": 58.5,
"startLon": 1.7,
"endLat": 53.5,
"endLon": 8.5
},
{
"name": "Test Construction Gas Pipe",
"operator": "Test Builder",
"fuel": "Natural Gas",
"product": "",
"fromCountry": "AZ",
"toCountry": "TR",
"transitCountries": ["GE"],
"capacity": 16,
"capacityUnit": "bcm/y",
"lengthKm": 1840,
"status": "Construction",
"startYear": 2027,
"startLat": 40.4,
"startLon": 49.9,
"endLat": 40.9,
"endLon": 28.9
},
{
"name": "Test Cancelled Gas Pipeline",
"operator": "Test Cancelled Sponsor",
"fuel": "Natural Gas",
"product": "",
"fromCountry": "GR",
"toCountry": "BG",
"transitCountries": [],
"capacity": 3,
"capacityUnit": "bcm/y",
"lengthKm": 180,
"status": "Cancelled",
"startYear": null,
"startLat": 40.6,
"startLon": 22.9,
"endLat": 42.5,
"endLon": 25.0
},
{
"name": "Test Crude Oil Trunk",
"operator": "Test Oil Operator",
"fuel": "Oil",
"product": "Crude Oil",
"fromCountry": "KZ",
"toCountry": "CN",
"transitCountries": [],
"capacity": 400000,
"capacityUnit": "bbl/d",
"lengthKm": 2200,
"status": "Operating",
"startYear": 2009,
"startLat": 47.1,
"startLon": 51.9,
"endLat": 45.0,
"endLon": 87.6
},
{
"name": "Test Refined Products Line",
"operator": "Test Products Operator",
"fuel": "Oil",
"product": "Refined Products",
"fromCountry": "US",
"toCountry": "US",
"transitCountries": [],
"capacity": 0.65,
"capacityUnit": "Mbd",
"lengthKm": 1500,
"status": "Operating",
"startYear": 1962,
"startLat": 29.7,
"startLon": -94.2,
"endLat": 41.0,
"endLon": -73.9
},
{
"name": "Test Mothballed Crude Bypass",
"operator": "Test Mothballed Operator",
"fuel": "Oil",
"product": "Crude Oil",
"fromCountry": "IQ",
"toCountry": "TR",
"transitCountries": [],
"capacity": 1000000,
"capacityUnit": "bbl/d",
"lengthKm": 970,
"status": "Mothballed",
"startYear": 1977,
"startLat": 35.5,
"startLon": 44.4,
"endLat": 36.2,
"endLon": 36.1
}
]
}


@@ -0,0 +1,276 @@
// @ts-check
//
// Tests for scripts/import-gem-pipelines.mjs — the GEM Oil & Gas Infrastructure
// Tracker → registry-shape parser. Test-first per the plan's Execution note: the
// schema-sentinel + status/productClass/capacity-unit mapping is the highest-
// risk failure mode, so coverage for it lands before the implementation does.
//
// Fixture: tests/fixtures/gem-pipelines-sample.json — operator-shape JSON
// (Excel pre-converted externally; the parser is local-file-only, no xlsx
// dep, no runtime URL fetch).
import { strict as assert } from 'node:assert';
import { test, describe } from 'node:test';
import { readFileSync } from 'node:fs';
import { resolve, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { parseGemPipelines, REQUIRED_COLUMNS } from '../scripts/import-gem-pipelines.mjs';
import { validateRegistry } from '../scripts/_pipeline-registry.mjs';
const __dirname = dirname(fileURLToPath(import.meta.url));
const fixturePath = resolve(__dirname, 'fixtures/gem-pipelines-sample.json');
const fixture = JSON.parse(readFileSync(fixturePath, 'utf-8'));
describe('import-gem-pipelines — schema sentinel', () => {
test('REQUIRED_COLUMNS is exported and non-empty', () => {
assert.ok(Array.isArray(REQUIRED_COLUMNS));
assert.ok(REQUIRED_COLUMNS.length >= 5);
});
test('throws on missing required column', () => {
const broken = {
...fixture,
pipelines: fixture.pipelines.map((p) => {
const { name: _drop, ...rest } = p;
return rest;
}),
};
assert.throws(
() => parseGemPipelines(broken),
/missing|name|schema/i,
'parser must throw on column drift, not silently accept',
);
});
test('throws on non-object input', () => {
assert.throws(() => parseGemPipelines(null), /input/i);
assert.throws(() => parseGemPipelines([]), /input|pipelines/i);
});
test('throws when pipelines field is missing', () => {
assert.throws(() => parseGemPipelines({ source: 'test' }), /pipelines/i);
});
});
describe('import-gem-pipelines — fuel split', () => {
test('splits gas + oil into two arrays', () => {
const { gas, oil } = parseGemPipelines(fixture);
assert.equal(gas.length, 3, 'fixture has 3 gas rows');
assert.equal(oil.length, 3, 'fixture has 3 oil rows');
});
test('gas pipelines do NOT carry productClass (gas registry forbids it)', () => {
const { gas } = parseGemPipelines(fixture);
for (const p of gas) {
assert.equal(p.productClass, undefined, `${p.name}: gas should not have productClass`);
}
});
test('every oil pipeline declares a productClass from the enum', () => {
const { oil } = parseGemPipelines(fixture);
for (const p of oil) {
assert.ok(
['crude', 'products', 'mixed'].includes(p.productClass),
`${p.name} has invalid productClass: ${p.productClass}`,
);
}
});
});
describe('import-gem-pipelines — status mapping', () => {
test("'Operating' maps to physicalState='flowing'", () => {
const { gas, oil } = parseGemPipelines(fixture);
const op = [...gas, ...oil].filter((p) => p.name.includes('Operating'));
assert.ok(op.length > 0);
for (const p of op) {
assert.equal(p.evidence.physicalState, 'flowing');
}
});
test("'Construction' maps to physicalState='unknown' (planned/not commissioned)", () => {
const { gas } = parseGemPipelines(fixture);
const ctr = gas.find((p) => p.name.includes('Construction'));
assert.ok(ctr);
assert.equal(ctr.evidence.physicalState, 'unknown');
});
test("'Cancelled' / 'Mothballed' map to physicalState='offline'", () => {
const { gas, oil } = parseGemPipelines(fixture);
const cancelled = gas.find((p) => p.name.includes('Cancelled'));
const mothballed = oil.find((p) => p.name.includes('Mothballed'));
assert.ok(cancelled);
assert.ok(mothballed);
assert.equal(cancelled.evidence.physicalState, 'offline');
assert.equal(mothballed.evidence.physicalState, 'offline');
});
});
describe('import-gem-pipelines — productClass mapping', () => {
test("'Crude Oil' product → productClass='crude'", () => {
const { oil } = parseGemPipelines(fixture);
const crude = oil.find((p) => p.name.includes('Crude Oil Trunk'));
assert.ok(crude);
assert.equal(crude.productClass, 'crude');
});
test("'Refined Products' product → productClass='products'", () => {
const { oil } = parseGemPipelines(fixture);
const refined = oil.find((p) => p.name.includes('Refined Products'));
assert.ok(refined);
assert.equal(refined.productClass, 'products');
});
});
describe('import-gem-pipelines — capacity-unit conversion', () => {
test('gas capacity in bcm/y is preserved unchanged', () => {
const { gas } = parseGemPipelines(fixture);
const opGas = gas.find((p) => p.name.includes('Operating'));
assert.ok(opGas);
assert.equal(opGas.capacityBcmYr, 24);
});
test('oil capacity in bbl/d is converted to capacityMbd (value in millions of bbl/d)', () => {
const { oil } = parseGemPipelines(fixture);
const crude = oil.find((p) => p.name.includes('Crude Oil Trunk'));
assert.ok(crude);
// Schema convention: the field is named `capacityMbd` (the customary
// industry abbreviation) but the VALUE is in millions of barrels per
// day, NOT thousands — matching the existing on-main hand-curated rows
// (e.g. CPC pipeline ships as `capacityMbd: 1.4` for 1.4M bbl/d).
// So 400_000 bbl/d ÷ 1_000_000 = 0.4 capacityMbd.
assert.equal(crude.capacityMbd, 0.4);
});
test('oil capacity already in Mbd is preserved unchanged', () => {
const { oil } = parseGemPipelines(fixture);
const refined = oil.find((p) => p.name.includes('Refined Products'));
assert.ok(refined);
assert.equal(refined.capacityMbd, 0.65);
});
});
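
The unit convention these tests pin down can be sketched as follows. `toCapacityMbd` is a hypothetical name (the real helper is internal to scripts/import-gem-pipelines.mjs), and the `kbd` case follows the unit list in the PR description:

```javascript
// Hypothetical helper name. The emitted field is `capacityMbd`, but the
// VALUE is in MILLIONS of bbl/d, matching the hand-curated rows on main.
function toCapacityMbd(value, unit) {
  switch (unit) {
    case 'bbl/d': return value / 1_000_000; // raw barrels per day
    case 'kbd':   return value / 1_000;     // thousand barrels per day
    case 'Mbd':   return value;             // already in the millions convention
    default: throw new Error(`unsupported oil capacity unit: ${unit}`);
  }
}
```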
describe('import-gem-pipelines — minimum-viable evidence', () => {
test('every emitted candidate has physicalStateSource=gem', () => {
const { gas, oil } = parseGemPipelines(fixture);
for (const p of [...gas, ...oil]) {
assert.equal(p.evidence.physicalStateSource, 'gem');
}
});
test('every emitted candidate has classifierVersion=gem-import-v1', () => {
const { gas, oil } = parseGemPipelines(fixture);
for (const p of [...gas, ...oil]) {
assert.equal(p.evidence.classifierVersion, 'gem-import-v1');
}
});
test('every emitted candidate has classifierConfidence ≤ 0.5', () => {
const { gas, oil } = parseGemPipelines(fixture);
for (const p of [...gas, ...oil]) {
assert.ok(p.evidence.classifierConfidence <= 0.5);
assert.ok(p.evidence.classifierConfidence >= 0);
}
});
test('every emitted candidate has empty sanctionRefs and null operatorStatement', () => {
const { gas, oil } = parseGemPipelines(fixture);
for (const p of [...gas, ...oil]) {
assert.deepEqual(p.evidence.sanctionRefs, []);
assert.equal(p.evidence.operatorStatement, null);
}
});
});
describe('import-gem-pipelines — registry-shape conformance', () => {
test('emitted gas registry passes validateRegistry', () => {
// Build a synthetic registry of just the GEM-emitted gas rows; meets the
// validator's MIN_PIPELINES_PER_REGISTRY=8 floor by repeating the 3 fixture
// rows so we exercise the schema, not the count.
const { gas } = parseGemPipelines(fixture);
const repeated = [];
for (let i = 0; i < 3; i++) {
for (const p of gas) repeated.push({ ...p, id: `${p.id}-rep${i}` });
}
const reg = {
pipelines: Object.fromEntries(repeated.map((p) => [p.id, p])),
};
assert.equal(validateRegistry(reg), true);
});
test('emitted oil registry passes validateRegistry', () => {
const { oil } = parseGemPipelines(fixture);
const repeated = [];
for (let i = 0; i < 3; i++) {
for (const p of oil) repeated.push({ ...p, id: `${p.id}-rep${i}` });
}
const reg = {
pipelines: Object.fromEntries(repeated.map((p) => [p.id, p])),
};
assert.equal(validateRegistry(reg), true);
});
});
describe('import-gem-pipelines — determinism (review-fix #3)', () => {
test('two parser runs on identical input produce identical output', () => {
// Regression: pre-fix, lastEvidenceUpdate used new Date() per run, so
// re-running parseGemPipelines on the same JSON on different days
// produced different output → noisy diffs every quarterly re-import.
// Now derived from envelope.downloadedAt, so output is byte-identical.
const r1 = JSON.stringify(parseGemPipelines(fixture));
const r2 = JSON.stringify(parseGemPipelines(fixture));
assert.equal(r1, r2);
});
test('lastEvidenceUpdate derives from envelope.downloadedAt', () => {
// Fixture has downloadedAt: 2026-04-25 → emitted as 2026-04-25T00:00:00Z.
const { gas } = parseGemPipelines(fixture);
for (const p of gas) {
assert.equal(p.evidence.lastEvidenceUpdate, '2026-04-25T00:00:00Z');
}
});
test("missing downloadedAt → epoch sentinel (loud failure, not today's wall clock)", () => {
// If the operator forgets the date field, the emitted timestamp should
// be obviously wrong rather than today's wall clock — surfaces the
// gap in code review of the data file.
const noDate = { ...fixture };
delete noDate.downloadedAt;
delete noDate.sourceVersion;
const { gas } = parseGemPipelines(noDate);
for (const p of gas) {
assert.equal(p.evidence.lastEvidenceUpdate, '1970-01-01T00:00:00Z');
}
});
});
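
The derivation these determinism tests describe amounts to a few lines. A minimal sketch, assuming the helper name (the real one is internal to the parser):

```javascript
// Derive the evidence timestamp from the envelope, never from the wall
// clock — that is what makes re-runs byte-identical. A missing date falls
// back to the epoch sentinel so the gap is obvious in review.
function deriveLastEvidenceUpdate(envelope) {
  if (!envelope.downloadedAt) return '1970-01-01T00:00:00Z';
  return `${envelope.downloadedAt}T00:00:00Z`;
}
```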
describe('import-gem-pipelines — coordinate validity', () => {
test('rows with invalid lat/lon are dropped (not silently kept with lat=0)', () => {
const broken = {
...fixture,
pipelines: [
...fixture.pipelines,
{
name: 'Test Bad Coords',
operator: 'X',
fuel: 'Natural Gas',
product: '',
fromCountry: 'XX',
toCountry: 'YY',
transitCountries: [],
capacity: 5,
capacityUnit: 'bcm/y',
lengthKm: 100,
status: 'Operating',
startYear: 2020,
startLat: 200, // out of range
startLon: 0,
endLat: 0,
endLon: 0,
},
],
};
const { gas } = parseGemPipelines(broken);
const bad = gas.find((p) => p.name.includes('Bad Coords'));
assert.equal(bad, undefined, 'row with out-of-range lat must be dropped, not coerced');
});
});


@@ -0,0 +1,242 @@
// @ts-check
//
// Tests for scripts/_pipeline-dedup.mjs — the haversine + Jaccard dedup
// helper. Both criteria (≤5km AND ≥0.6) must hold for a match. Existing rows
// always win to preserve hand-curated evidence.
import { strict as assert } from 'node:assert';
import { test, describe } from 'node:test';
import { dedupePipelines, _internal } from '../scripts/_pipeline-dedup.mjs';
const { jaccard, averageEndpointDistanceKm, tokenize, uniqueId } = _internal;
function makePipeline(id, name, startLat, startLon, endLat, endLon) {
return {
id,
name,
startPoint: { lat: startLat, lon: startLon },
endPoint: { lat: endLat, lon: endLon },
};
}
describe('pipeline-dedup — internal helpers', () => {
test('tokenize lowercases, splits, drops stopwords', () => {
const tokens = tokenize('Trans-Siberian Pipeline System');
assert.deepEqual(tokens.sort(), ['siberian', 'trans']);
});
test('tokenize removes punctuation and accents', () => {
const tokens = tokenize('Caño Limón-Coveñas Pipeline');
// After NFKD normalization + ascii-only filter, accented chars survive
// as their base letter; we accept either exact or close behaviour.
assert.ok(tokens.includes('limon') || tokens.includes('lim'),
`expected Limón to tokenize; got ${tokens.join(',')}`);
});
test('jaccard returns 1.0 for identical token sets', () => {
assert.equal(jaccard('Test Pipeline System', 'Test Pipeline'), 1.0);
});
test('jaccard returns 0 for fully disjoint names', () => {
assert.equal(jaccard('Druzhba North', 'Nord Stream'), 0);
});
test('jaccard returns 1/3 for one shared token out of three', () => {
assert.equal(jaccard('Trans Adriatic', 'Trans Caspian'), 1 / 3);
});
test('haversine distance is symmetric', () => {
const a = makePipeline('a', 'A', 60, 30, 54, 13);
const b = makePipeline('b', 'B', 60.001, 30.001, 54.001, 13.001);
assert.ok(averageEndpointDistanceKm(a, b) < 1, 'sub-km on tiny offsets');
});
test('haversine distance for far-apart pipelines is large', () => {
const a = makePipeline('a', 'A', 60, 30, 54, 13); // RU→DE
const b = makePipeline('b', 'B', 30, -90, 25, -85); // Gulf of Mexico
assert.ok(averageEndpointDistanceKm(a, b) > 5000);
});
test('uniqueId preserves base when free, suffixes when taken', () => {
const taken = new Set(['foo', 'foo-2']);
assert.equal(uniqueId('bar', taken), 'bar');
assert.equal(uniqueId('foo', taken), 'foo-3');
});
});
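
A minimal sketch of the three internal helpers these tests exercise, assuming a stopword list of {pipeline, system, line} and NFKD-based accent stripping (the real list and normalization live in scripts/_pipeline-dedup.mjs):

```javascript
// Sketch only — stopword list and normalization details are assumptions.
const STOPWORDS = new Set(['pipeline', 'system', 'line']);

function tokenize(name) {
  return name
    .normalize('NFKD')          // decompose accents: 'Limón' → 'limon' after filtering
    .toLowerCase()
    .replace(/[^a-z\s-]/g, '')  // drop combining marks, digits, punctuation
    .split(/[\s-]+/)
    .filter((t) => t.length > 0 && !STOPWORDS.has(t));
}

function jaccard(nameA, nameB) {
  const a = new Set(tokenize(nameA));
  const b = new Set(tokenize(nameB));
  const union = new Set([...a, ...b]).size;
  if (union === 0) return 0;
  const intersection = [...a].filter((t) => b.has(t)).length;
  return intersection / union;
}

const EARTH_RADIUS_KM = 6371;
function haversineKm(lat1, lon1, lat2, lon2) {
  const rad = (d) => (d * Math.PI) / 180;
  const h = Math.sin(rad(lat2 - lat1) / 2) ** 2 +
    Math.cos(rad(lat1)) * Math.cos(rad(lat2)) * Math.sin(rad(lon2 - lon1) / 2) ** 2;
  return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(h));
}

// Average of start→start and end→end distances; taking the min against the
// flipped pairing is what lets reverse-direction rows still match.
function averageEndpointDistanceKm(a, b) {
  const fwd = (haversineKm(a.startPoint.lat, a.startPoint.lon, b.startPoint.lat, b.startPoint.lon) +
               haversineKm(a.endPoint.lat, a.endPoint.lon, b.endPoint.lat, b.endPoint.lon)) / 2;
  const rev = (haversineKm(a.startPoint.lat, a.startPoint.lon, b.endPoint.lat, b.endPoint.lon) +
               haversineKm(a.endPoint.lat, a.endPoint.lon, b.startPoint.lat, b.startPoint.lon)) / 2;
  return Math.min(fwd, rev);
}
```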
describe('pipeline-dedup — match logic', () => {
test('happy path: completely-different name + far endpoints → added', () => {
const existing = [makePipeline('druzhba-north', 'Druzhba Pipeline (Northern Branch)',
52.6, 49.4, 52.32, 14.06)];
const candidates = [makePipeline('nord-stream-1', 'Nord Stream 1',
60.08, 29.05, 54.14, 13.66)];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 1);
assert.equal(skippedDuplicates.length, 0);
});
test('match by both criteria: close endpoints + similar name → skipped (existing wins)', () => {
const existing = [makePipeline('druzhba-north', 'Druzhba Pipeline',
52.6, 49.4, 52.32, 14.06)];
const candidates = [makePipeline('druzhba-import', 'Druzhba Pipeline',
52.601, 49.401, 52.321, 14.061)];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 0);
assert.equal(skippedDuplicates.length, 1);
assert.equal(skippedDuplicates[0].matchedExistingId, 'druzhba-north');
});
test('name-match only (endpoints in different ocean) → added', () => {
const existing = [makePipeline('nord-stream-1', 'Nord Stream 1',
60.08, 29.05, 54.14, 13.66)];
const candidates = [makePipeline('imposter', 'Nord Stream 1',
40.0, -100.0, 35.0, -90.0)]; // different continent
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 1, 'low haversine confidence overrides high name match');
assert.equal(skippedDuplicates.length, 0);
});
test('endpoint-match only (different name) → added (real distinct pipelines can share endpoints)', () => {
const existing = [makePipeline('yamal-europe', 'Yamal-Europe',
67.0, 75.0, 52.0, 14.0)];
const candidates = [makePipeline('different-route', 'Trans-Siberian Coal Slurry',
67.001, 75.001, 52.001, 14.001)];
const { toAdd } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 1, 'name disambiguates: same endpoints, different infrastructure');
});
test('reverse-direction match: candidate endpoints flipped → still detected', () => {
const existing = [makePipeline('druzhba', 'Druzhba',
52.6, 49.4, 52.32, 14.06)];
// Same pipeline, route described in reverse direction
const candidates = [makePipeline('druzhba-flipped', 'Druzhba',
52.32, 14.06, 52.6, 49.4)];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 0);
assert.equal(skippedDuplicates.length, 1);
});
test('stopword-only difference: "Pipeline System" vs "Line" → matches by Jaccard', () => {
const existing = [makePipeline('trans-sib', 'Trans-Siberian Pipeline System',
55, 30, 60, 90)];
const candidates = [makePipeline('trans-sib-cand', 'Trans-Siberian Line',
55.001, 30.001, 60.001, 90.001)];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 0);
assert.equal(skippedDuplicates.length, 1);
assert.ok(skippedDuplicates[0].jaccard >= 0.6);
});
});
describe('pipeline-dedup — id collision', () => {
test('candidate with id colliding existing gets suffixed -2', () => {
const existing = [makePipeline('foo', 'Foo Pipeline', 0, 0, 1, 1)];
const candidates = [makePipeline('foo', 'Bar Pipeline', 50, 50, 60, 60)];
const { toAdd } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 1);
assert.equal(toAdd[0].id, 'foo-2');
});
test('three candidates colliding the same existing id get -2, -3, -4', () => {
const existing = [makePipeline('foo', 'Foo Pipeline', 0, 0, 1, 1)];
const candidates = [
makePipeline('foo', 'Bar Pipeline', 50, 50, 60, 60),
makePipeline('foo', 'Baz Pipeline', 70, 70, 80, 80),
makePipeline('foo', 'Qux Pipeline', 30, -30, 40, -40),
];
const { toAdd } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 3);
assert.deepEqual(
toAdd.map((p) => p.id).sort(),
['foo-2', 'foo-3', 'foo-4'],
);
});
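
The collision behaviour these tests describe fits in a few lines. A sketch, assuming the same signature as the `_internal.uniqueId` under test:

```javascript
// Keep the base id when free; otherwise probe -2, -3, … until a free slot.
// Callers add each assigned id back into `taken`, which is how three
// candidates colliding on the same id end up as -2, -3, -4.
function uniqueId(base, taken) {
  if (!taken.has(base)) return base;
  let n = 2;
  while (taken.has(`${base}-${n}`)) n += 1;
  return `${base}-${n}`;
}
```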
});
describe('pipeline-dedup — determinism', () => {
test('two invocations on identical inputs produce identical output', () => {
const existing = [
makePipeline('a', 'Alpha Pipeline', 10, 10, 20, 20),
makePipeline('b', 'Beta Pipeline', 30, 30, 40, 40),
];
const candidates = [
makePipeline('a', 'Alpha Pipeline', 10.001, 10.001, 20.001, 20.001),
makePipeline('c', 'Gamma Pipeline', 50, 50, 60, 60),
];
const r1 = dedupePipelines(existing, candidates);
const r2 = dedupePipelines(existing, candidates);
assert.deepEqual(
r1.toAdd.map((p) => p.id),
r2.toAdd.map((p) => p.id),
);
assert.deepEqual(
r1.skippedDuplicates.map((d) => d.matchedExistingId),
r2.skippedDuplicates.map((d) => d.matchedExistingId),
);
});
});
describe('pipeline-dedup — within-batch dedup (review fix)', () => {
test('two candidates that match each other but not any existing → only first is added', () => {
// Regression: pre-fix, dedup compared each candidate ONLY against the
// original `existing` array, so two GEM rows for the same pipeline (e.g.
// a primary entry and a duplicate from a different source spreadsheet)
// would BOTH end up in the registry.
const candidates = [
makePipeline('east-west-saudi', 'East-West Crude Pipeline', 25, 49, 24, 38),
// Same pipeline, slightly different name + endpoints (within match
// tolerance). Should be skipped as a duplicate of the first candidate.
makePipeline('saudi-petroline', 'East-West Crude', 25.001, 49.001, 24.001, 38.001),
];
const { toAdd, skippedDuplicates } = dedupePipelines([], candidates);
assert.equal(toAdd.length, 1, 'second matching candidate must be skipped');
assert.equal(skippedDuplicates.length, 1);
assert.equal(toAdd[0].id, 'east-west-saudi', 'first-accepted candidate wins (deterministic)');
assert.equal(skippedDuplicates[0].matchedExistingId, 'east-west-saudi',
'skipped candidate matches the earlier-accepted one, not anything in `existing`');
});
test('three candidates with transitive matches collapse to one', () => {
const candidates = [
makePipeline('a', 'Druzhba', 52.6, 49.4, 52.32, 14.06),
makePipeline('b', 'Druzhba Pipeline', 52.601, 49.401, 52.321, 14.061),
makePipeline('c', 'Druzhba Line', 52.602, 49.402, 52.322, 14.062),
];
const { toAdd } = dedupePipelines([], candidates);
assert.equal(toAdd.length, 1, 'three matching candidates must collapse to the first one accepted');
});
test('existing wins over already-accepted candidate', () => {
// If a candidate matches an existing row, it must be reported as
// matching the existing row (existing-vs-toAdd precedence). Names
// chosen so Jaccard exceeds 0.6 after stopword removal.
const existing = [makePipeline('canon', 'Druzhba Northern', 52.6, 49.4, 52.32, 14.06)];
const candidates = [
makePipeline('cand-1', 'Druzhba Northern', 60, 30, 50, 14), // doesn't match existing (far endpoints)
makePipeline('cand-2', 'Druzhba Northern', 52.601, 49.401, 52.321, 14.061), // matches existing (near + Jaccard=1)
];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
assert.equal(toAdd.length, 1, 'cand-1 added; cand-2 skipped against existing');
assert.equal(skippedDuplicates[0].matchedExistingId, 'canon',
'cand-2 should be reported as matching the existing canon, not the earlier candidate');
});
});
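
The accept loop implied by this review fix can be sketched as follows. `isDuplicate` is taken as a parameter here purely for self-containment (the real helper applies the 5km + 0.6-Jaccard conjunction internally); checking `existing` before earlier-accepted candidates gives existing rows reporting precedence, and checking `toAdd` at all is the within-batch fix:

```javascript
// Sketch of the post-review-fix dedup loop. Each candidate is compared
// against existing rows first (existing wins in reports), then against
// candidates already accepted this run, so within-batch duplicates collapse
// deterministically to the first-accepted row.
function dedupeSketch(existing, candidates, isDuplicate) {
  const toAdd = [];
  const skippedDuplicates = [];
  for (const candidate of candidates) {
    const match = existing.find((e) => isDuplicate(e, candidate)) ??
      toAdd.find((accepted) => isDuplicate(accepted, candidate));
    if (match) {
      skippedDuplicates.push({ candidate, matchedExistingId: match.id });
    } else {
      toAdd.push(candidate);
    }
  }
  return { toAdd, skippedDuplicates };
}
```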
describe('pipeline-dedup — empty inputs', () => {
test('empty existing + N candidates → all N added, none skipped', () => {
const candidates = [
makePipeline('a', 'A', 0, 0, 1, 1),
makePipeline('b', 'B', 5, 5, 6, 6),
];
const { toAdd, skippedDuplicates } = dedupePipelines([], candidates);
assert.equal(toAdd.length, 2);
assert.equal(skippedDuplicates.length, 0);
});
test('N existing + empty candidates → empty result', () => {
const existing = [makePipeline('a', 'A', 0, 0, 1, 1)];
const { toAdd, skippedDuplicates } = dedupePipelines(existing, []);
assert.equal(toAdd.length, 0);
assert.equal(skippedDuplicates.length, 0);
});
});


@@ -11,6 +11,7 @@ import {
GAS_CANONICAL_KEY,
OIL_CANONICAL_KEY,
VALID_OIL_PRODUCT_CLASSES,
+VALID_SOURCES,
} from '../scripts/_pipeline-registry.mjs';
const __dirname = dirname(fileURLToPath(import.meta.url));
@@ -216,3 +217,87 @@ describe('pipeline registries — validateRegistry rejects bad input', () => {
assert.equal(validateRegistry(bad), false);
});
});
describe('pipeline registries — GEM source enum', () => {
test('VALID_SOURCES is exported and includes the five existing members plus gem', () => {
// Same source-of-truth pattern as VALID_OIL_PRODUCT_CLASSES (PR #3383):
// export the Set so future tests can't drift from the validator.
assert.ok(VALID_SOURCES.has('operator'));
assert.ok(VALID_SOURCES.has('regulator'));
assert.ok(VALID_SOURCES.has('press'));
assert.ok(VALID_SOURCES.has('satellite'));
assert.ok(VALID_SOURCES.has('ais-relay'));
assert.ok(VALID_SOURCES.has('gem'));
});
test('validateRegistry accepts GEM-sourced minimum-viable evidence (state=unknown)', () => {
// GEM rows ship as state=unknown until classifier promotes them.
// physicalStateSource='gem' is sufficient evidence per the audit.
const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]];
const good = {
pipelines: Object.fromEntries(
Array.from({ length: 8 }, (_, i) => [`p${i}`, {
...gasSample,
id: `p${i}`,
evidence: {
physicalState: 'unknown',
physicalStateSource: 'gem',
commercialState: 'unknown',
operatorStatement: null,
sanctionRefs: [],
classifierVersion: 'gem-import-v1',
classifierConfidence: 0.4,
lastEvidenceUpdate: '2026-04-25T00:00:00Z',
},
}])
),
};
assert.equal(validateRegistry(good), true);
});
test('validateRegistry accepts GEM-sourced offline row (state=offline + only source=gem)', () => {
// Per plan U1 audit: 'gem' is evidence-bearing for non-flowing badges,
// parity with press/satellite/ais-relay. An offline row with no operator
// statement and no sanctionRefs but physicalStateSource='gem' should pass
// validation (the public-badge derivation downstream will then map it
// to "disputed" via the external-signal rule).
const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]];
const good = {
pipelines: Object.fromEntries(
Array.from({ length: 8 }, (_, i) => [`p${i}`, {
...gasSample,
id: `p${i}`,
evidence: {
physicalState: 'offline',
physicalStateSource: 'gem',
commercialState: 'unknown',
operatorStatement: null,
sanctionRefs: [],
classifierVersion: 'gem-import-v1',
classifierConfidence: 0.4,
lastEvidenceUpdate: '2026-04-25T00:00:00Z',
},
}])
),
};
assert.equal(validateRegistry(good), true);
});
test('validateRegistry still rejects unknown physicalStateSource values', () => {
// Adding 'gem' must not loosen the enum — unknown sources still fail.
const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]];
const bad = {
pipelines: Object.fromEntries(
Array.from({ length: 8 }, (_, i) => [`p${i}`, {
...gasSample,
id: `p${i}`,
evidence: {
...gasSample.evidence,
physicalStateSource: 'rumor',
},
}])
),
};
assert.equal(validateRegistry(bad), false);
});
});