diff --git a/docs/methodology/pipelines.mdx b/docs/methodology/pipelines.mdx index 2721636d6..55144cda1 100644 --- a/docs/methodology/pipelines.mdx +++ b/docs/methodology/pipelines.mdx @@ -65,6 +65,25 @@ No human review queue gates the transition — quality comes from the tiered evi Pipeline-registry data derived from [Global Energy Monitor](https://globalenergymonitor.org) (CC-BY 4.0), with additional operator and regulator material incorporated under fair-use for news reporting. +The hand-curated subset (operator/regulator/sanctions-bearing rows with classifier confidence ≥ 0.7) ships with full evidence bundles: operator statements, sanction references, last-evidence-update timestamps, and named source authorities. The GEM-imported subset (long-tail coverage rows) ships with minimum-viable evidence — `physicalStateSource: gem`, `classifierConfidence ≤ 0.5`, no operator statement, no sanction references. Both subsets pass the same registry validator and feed the same public-badge derivation. + +## Operator runbook — GEM import refresh + +GEM publishes new releases of the Oil & Gas Infrastructure Trackers roughly quarterly. The refresh is operator-mediated rather than cron-driven because the GEM download URL changes per release; a hardcoded URL would silently fetch a different version than the one we attribute. Steps: + +1. Visit the [GEM Oil & Gas Infrastructure Trackers](https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/) page. Registration is required for direct download even though the data itself is CC-BY 4.0. +2. Download the latest gas + oil tracker Excel workbooks. Record the release date and download URL. +3. Pre-convert each workbook to JSON externally (Numbers / pandas / csvkit), normalizing column names to the canonical set documented in `scripts/import-gem-pipelines.mjs::REQUIRED_COLUMNS` and country names to ISO 3166-1 alpha-2 codes. +4. 
Run a dry pass to inspect the candidate diff: + ```bash + GEM_PIPELINES_FILE=/tmp/gem.json node scripts/import-gem-pipelines.mjs --print-candidates | jq '.gas | length, .oil | length' + ``` +5. Run the merge to write the deduplicated rows into `scripts/data/pipelines-{gas,oil}.json`. Spot-check 5-10 random GEM-sourced rows manually before committing. +6. Commit the data + bump `MIN_PIPELINES_PER_REGISTRY` in `scripts/_pipeline-registry.mjs` to a sensible new floor (e.g. 200) so future partial imports fail loud. Record the GEM release date, download URL, and SHA256 of the source workbook in the commit message. +7. Verify `npm run test:data` is green before pushing. + +Schema-drift sentinel guards against silent failures when GEM renames columns between releases — the parser throws with a clear message naming the missing column rather than producing zero-data rows. + ## Corrections See [`/corrections`](/corrections) for the planned revision-log shape diff --git a/scripts/_pipeline-dedup.mjs b/scripts/_pipeline-dedup.mjs new file mode 100644 index 000000000..99fbc991b --- /dev/null +++ b/scripts/_pipeline-dedup.mjs @@ -0,0 +1,170 @@ +// @ts-check +// +// Pure deterministic deduplication for the GEM pipeline import. NOT an entry +// point — see scripts/import-gem-pipelines.mjs for the orchestrator. +// +// Match rule (BOTH must hold): +// 1. Endpoint distance ≤ 5 km (haversine, route-direction-flipped pair-aware +// so Mozyr→Adamowo and Adamowo→Mozyr count as the same). +// 2. Name token Jaccard ≥ 0.6 (lowercased word tokens, stopwords removed). +// +// Conflict resolution: existing row WINS. Hand-curated rows have richer +// evidence (operator statements, sanction refs, classifier confidence ≥ 0.7) +// that GEM's minimum-viable evidence shouldn't overwrite. The dedup function +// returns { toAdd, skippedDuplicates } so the caller can audit which GEM +// candidates were absorbed by existing rows. 
+// +// Determinism: zero Date.now() / Math.random() / Set ordering reliance. Two +// invocations on identical inputs produce identical outputs. + +const STOPWORDS = new Set([ + 'pipeline', 'pipelines', 'system', 'systems', 'line', 'lines', 'network', + 'route', 'project', 'the', 'and', 'of', 'a', 'an', +]); + +const MATCH_DISTANCE_KM = 5; +const MATCH_JACCARD_MIN = 0.6; +const EARTH_RADIUS_KM = 6371; + +/** + * Haversine great-circle distance in km between two lat/lon points. + */ +function haversineKm(a, b) { + const toRad = (deg) => (deg * Math.PI) / 180; + const dLat = toRad(b.lat - a.lat); + const dLon = toRad(b.lon - a.lon); + const lat1 = toRad(a.lat); + const lat2 = toRad(b.lat); + const x = + Math.sin(dLat / 2) ** 2 + + Math.sin(dLon / 2) ** 2 * Math.cos(lat1) * Math.cos(lat2); + const c = 2 * Math.atan2(Math.sqrt(x), Math.sqrt(1 - x)); + return EARTH_RADIUS_KM * c; +} + +/** + * Average endpoint distance between two pipelines, considering both forward + * and reversed pairings. The smaller of the two is returned so a route + * direction flip doesn't appear as a different pipeline. + */ +function averageEndpointDistanceKm(a, b) { + const forward = + (haversineKm(a.startPoint, b.startPoint) + haversineKm(a.endPoint, b.endPoint)) / 2; + const reversed = + (haversineKm(a.startPoint, b.endPoint) + haversineKm(a.endPoint, b.startPoint)) / 2; + return Math.min(forward, reversed); +} + +/** + * Tokenize a name: lowercased word tokens, ASCII-only word boundaries, + * stopwords removed. Stable across invocations. + */ +function tokenize(name) { + return name + .toLowerCase() + .normalize('NFKD') + // Strip combining marks (diacritics) so "Limón" → "limon", not "limo'n". + // Range ̀-ͯ covers Combining Diacritical Marks per Unicode. + .replace(/[̀-ͯ]/g, '') + .replace(/[^a-z0-9 ]+/g, ' ') + .split(/\s+/) + .filter((t) => t.length > 0 && !STOPWORDS.has(t)); +} + +/** + * Jaccard similarity = |A ∩ B| / |A ∪ B| over token sets. 
+ */ +function jaccard(a, b) { + const setA = new Set(tokenize(a)); + const setB = new Set(tokenize(b)); + if (setA.size === 0 && setB.size === 0) return 0; + let intersection = 0; + for (const t of setA) if (setB.has(t)) intersection++; + const unionSize = setA.size + setB.size - intersection; + return unionSize === 0 ? 0 : intersection / unionSize; +} + +/** + * Decide if a candidate matches an existing row. Both criteria required. + */ +function isDuplicate(candidate, existing) { + const dist = averageEndpointDistanceKm(candidate, existing); + if (dist > MATCH_DISTANCE_KM) return false; + const sim = jaccard(candidate.name, existing.name); + return sim >= MATCH_JACCARD_MIN; +} + +/** + * Disambiguate a candidate's id against existing ids by appending -2, -3, ... + * until unique. Stable: same input → same output. + */ +function uniqueId(baseId, takenIds) { + if (!takenIds.has(baseId)) return baseId; + let n = 2; + while (takenIds.has(`${baseId}-${n}`)) n++; + return `${baseId}-${n}`; +} + +/** + * Pure dedup function. + * + * @param {Array<{ id: string, name: string, startPoint: {lat:number,lon:number}, endPoint: {lat:number,lon:number} }>} existing + * @param {Array<{ id: string, name: string, startPoint: {lat:number,lon:number}, endPoint: {lat:number,lon:number} }>} candidates + * @returns {{ toAdd: any[], skippedDuplicates: Array<{ candidate: any, matchedExistingId: string, distanceKm: number, jaccard: number }> }} + */ +export function dedupePipelines(existing, candidates) { + const taken = new Set(existing.map((p) => p.id)); + const toAdd = []; + const skippedDuplicates = []; + + for (const cand of candidates) { + // Compare against BOTH existing rows AND candidates already accepted + // into toAdd. Without this, two GEM rows that match each other but + // not anything in `existing` would both be added — duplicate-import + // bug. 
Existing rows still win on cross-set match (they have richer + // hand-curated evidence); within-toAdd matches retain the FIRST + // accepted candidate (deterministic by candidate-list order). + let matched = null; + for (const ex of existing) { + if (isDuplicate(cand, ex)) { + matched = ex; + break; + } + } + if (!matched) { + for (const earlier of toAdd) { + if (isDuplicate(cand, earlier)) { + matched = earlier; + break; + } + } + } + if (matched) { + skippedDuplicates.push({ + candidate: cand, + matchedExistingId: matched.id, + distanceKm: averageEndpointDistanceKm(cand, matched), + jaccard: jaccard(cand.name, matched.name), + }); + continue; + } + const finalId = uniqueId(cand.id, taken); + taken.add(finalId); + toAdd.push({ ...cand, id: finalId }); + } + + return { toAdd, skippedDuplicates }; +} + +// Internal exports for test coverage; not part of the public surface. +export const _internal = { + haversineKm, + averageEndpointDistanceKm, + tokenize, + jaccard, + isDuplicate, + uniqueId, + STOPWORDS, + MATCH_DISTANCE_KM, + MATCH_JACCARD_MIN, +}; diff --git a/scripts/_pipeline-registry.mjs b/scripts/_pipeline-registry.mjs index ff1a769b9..a74b5aef3 100644 --- a/scripts/_pipeline-registry.mjs +++ b/scripts/_pipeline-registry.mjs @@ -29,7 +29,13 @@ export const PIPELINES_TTL_SECONDS = 21 * 24 * 3600; const VALID_PHYSICAL_STATES = new Set(['flowing', 'reduced', 'offline', 'unknown']); const VALID_COMMERCIAL_STATES = new Set(['under_contract', 'expired', 'suspended', 'unknown']); -const VALID_SOURCES = new Set(['operator', 'regulator', 'press', 'satellite', 'ais-relay']); +// `gem` covers rows imported from Global Energy Monitor's Oil & Gas +// Infrastructure Trackers (CC-BY 4.0). Treated as an evidence-bearing source +// for non-flowing badges in the same way as `press` / `satellite` / `ais-relay`, +// since GEM is an academic/curated dataset with traceable provenance — not a +// silent default. 
Exported alongside VALID_OIL_PRODUCT_CLASSES so test suites +// can assert against the same source of truth the validator uses. +export const VALID_SOURCES = new Set(['operator', 'regulator', 'press', 'satellite', 'ais-relay', 'gem']); // Required on every oil pipeline. `crude` = crude-oil lines (default), // `products` = refined-product lines (gasoline/diesel/jet), `mixed` = // dual-use bridges moving both. Gas pipelines don't carry this field @@ -104,13 +110,16 @@ export function validateRegistry(data) { // Every non-`flowing` badge requires at least one evidence field with signal. // This prevents shipping an `offline` label with zero supporting evidence. + // `gem` joins the evidence-bearing sources because GEM is a curated + // academic dataset with traceable provenance, not a silent default. if (ev.physicalState !== 'flowing') { const hasEvidence = ev.operatorStatement != null || ev.sanctionRefs.length > 0 || ev.physicalStateSource === 'ais-relay' || ev.physicalStateSource === 'satellite' || - ev.physicalStateSource === 'press'; + ev.physicalStateSource === 'press' || + ev.physicalStateSource === 'gem'; if (!hasEvidence) return false; } } diff --git a/scripts/import-gem-pipelines.mjs b/scripts/import-gem-pipelines.mjs new file mode 100644 index 000000000..cbea7e2a6 --- /dev/null +++ b/scripts/import-gem-pipelines.mjs @@ -0,0 +1,429 @@ +// @ts-check +// +// One-shot import: GEM Oil & Gas Infrastructure Trackers (CC-BY 4.0) → +// scripts/data/pipelines-{gas,oil}.json shape. +// +// PROVENANCE / OPERATOR-MEDIATED: +// This script is INTENTIONALLY local-file-only — it does NOT fetch GEM at +// runtime. The GEM download URL changes per release; a hardcoded URL would +// silently fetch a different version than the one we attribute. The +// operator runs: +// +// 1. Visit https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/ +// (registration required for direct download even though the data +// itself is CC-BY 4.0 licensed). +// 2. 
Download the latest gas + oil tracker Excel workbooks. +// 3. Pre-convert each workbook's primary sheet to JSON (Numbers / +// pandas / csvkit / equivalent) using the canonical column names +// documented in REQUIRED_COLUMNS below. Country names should be +// pre-mapped to ISO 3166-1 alpha-2 codes during conversion. +// 4. Save the JSON to a local path and run this script with: +// GEM_PIPELINES_FILE=/path/to/gem.json node scripts/import-gem-pipelines.mjs --merge +// 5. Record the GEM release date + download URL + file SHA256 in the +// commit message and docs/methodology/pipelines.mdx, per the +// seed-imf-external.mjs provenance pattern. +// +// EXECUTION MODES: +// --print-candidates : parse + print candidates as JSON to stdout (dry run) +// --merge : parse, dedupe against existing pipelines-{gas,oil}.json, +// write merged JSON to disk, abort on validate failure +// +// NO xlsx DEPENDENCY: the operator pre-converts externally; this keeps the +// runtime dependency surface tight and avoids the known CVE history of the +// xlsx package for a quarterly one-shot operation. + +import { readFileSync, writeFileSync } from 'node:fs'; +import { dirname, resolve as resolvePath } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { dedupePipelines } from './_pipeline-dedup.mjs'; +import { validateRegistry } from './_pipeline-registry.mjs'; + +/** + * Canonical input columns. The operator's Excel-to-JSON conversion must + * preserve these EXACT key names for each row in `pipelines[]`. Schema-drift + * sentinel below throws on missing keys before any data is emitted. 
+ */ +export const REQUIRED_COLUMNS = [ + 'name', + 'operator', + 'fuel', // 'Natural Gas' | 'Oil' + 'fromCountry', // ISO 3166-1 alpha-2 + 'toCountry', // ISO 3166-1 alpha-2 + 'transitCountries', // string[] (may be empty) + 'capacity', + 'capacityUnit', // 'bcm/y' | 'bbl/d' | 'Mbd' + 'lengthKm', + 'status', // GEM Status string (mapped below) + 'startLat', + 'startLon', + 'endLat', + 'endLon', +]; + +/** + * Maps GEM status strings to our `physicalState` enum. + * Default: 'unknown' — falls into the "treat as not commissioned" bucket. + */ +const STATUS_MAP = { + Operating: 'flowing', + Operational: 'flowing', + Construction: 'unknown', + Proposed: 'unknown', + Cancelled: 'offline', + Mothballed: 'offline', + Idle: 'offline', + 'Shut-in': 'offline', +}; + +/** + * Maps GEM `product` field to our `productClass` enum (oil only). + */ +const PRODUCT_CLASS_MAP = { + 'Crude Oil': 'crude', + Crude: 'crude', + 'Refined Products': 'products', + 'Petroleum Products': 'products', + Products: 'products', + Mixed: 'mixed', + 'Crude/Products': 'mixed', +}; + +const VALID_LAT = (v) => Number.isFinite(v) && v >= -90 && v <= 90; +const VALID_LON = (v) => Number.isFinite(v) && v >= -180 && v <= 180; + +function slugify(name, country) { + const base = name.toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-+|-+$/g, '') + .slice(0, 60); + return `${base}-${country.toLowerCase()}`; +} + +function inferFuel(row) { + const f = String(row.fuel ?? '').toLowerCase(); + if (f.includes('gas')) return 'gas'; + if (f.includes('oil') || f.includes('crude') || f.includes('petroleum')) return 'oil'; + return null; +} + +function mapStatus(gemStatus) { + return STATUS_MAP[gemStatus] ?? 
'unknown'; +} + +function mapProductClass(rawProduct) { + if (!rawProduct) return 'crude'; // conservative default per plan U2 + const cls = PRODUCT_CLASS_MAP[rawProduct]; + if (cls) return cls; + // Best-effort substring match for Excel column variations + const lower = rawProduct.toLowerCase(); + if (lower.includes('crude') && lower.includes('product')) return 'mixed'; + if (lower.includes('crude')) return 'crude'; + if (lower.includes('product') || lower.includes('refined')) return 'products'; + return 'crude'; +} + +function convertCapacityToBcmYr(value, unit) { + if (unit === 'bcm/y' || unit === 'bcm/yr') return Number(value); + // Future: add bcf/d → bcm/y conversion if needed. Throw loudly so the + // operator notices instead of silently writing zeros. + throw new Error(`Unsupported gas capacity unit: ${unit}. Expected 'bcm/y'.`); +} + +function convertCapacityToMbd(value, unit) { + // Schema convention: capacityMbd is in MILLION barrels per day (e.g. CPC + // pipeline = 1.4 Mbd = 1.4M bbl/day). So conversions: + // 'Mbd' → preserved + // 'bbl/d' → divide by 1_000_000 + // 'kbd' → divide by 1_000 (rare) + if (unit === 'Mbd') return Number(value); + if (unit === 'bbl/d') return Number(value) / 1_000_000; + if (unit === 'kbd') return Number(value) / 1_000; + throw new Error(`Unsupported oil capacity unit: ${unit}. Expected 'Mbd' / 'bbl/d' / 'kbd'.`); +} + +/** + * Resolve `lastEvidenceUpdate` for emitted candidates. Prefers the + * operator-recorded `downloadedAt` (or `sourceVersion` if it parses) so + * two parser runs on the same input produce byte-identical output. + * Falls back to the unix-epoch sentinel `1970-01-01` rather than + * `new Date()` — the fallback is deliberately ugly so anyone reviewing + * the data file sees that the operator forgot to set the date and re-runs. 
+ * + * @param {Record} envelope + */ +function resolveEvidenceTimestamp(envelope) { + const candidates = [envelope.downloadedAt, envelope.sourceVersion]; + for (const v of candidates) { + if (typeof v === 'string') { + // Accept full ISO strings OR bare YYYY-MM-DD; coerce to midnight-UTC. + const isoMatch = v.match(/^\d{4}-\d{2}-\d{2}/); + if (isoMatch) return `${isoMatch[0]}T00:00:00Z`; + } + } + // Sentinel: GEM data SHOULD always carry downloadedAt per the operator + // runbook. If neither field is present, surface the gap loudly via the + // epoch date — it'll show up obviously in the diff. + return '1970-01-01T00:00:00Z'; +} + +/** + * Parse a GEM-shape JSON object into our two-registry candidate arrays. + * + * @param {unknown} data + * @returns {{ gas: any[], oil: any[] }} + * @throws {Error} on schema drift, malformed input, or unknown capacity units. + */ +export function parseGemPipelines(data) { + if (!data || typeof data !== 'object' || Array.isArray(data)) { + throw new Error('parseGemPipelines: input must be an object'); + } + const obj = /** @type {Record} */ (data); + if (!Array.isArray(obj.pipelines)) { + throw new Error('parseGemPipelines: input must contain pipelines[] array'); + } + // Compute once per parse run so every emitted candidate gets the SAME + // timestamp — and so two runs on identical input produce byte-identical + // JSON (Greptile P2 on PR #3397: previous use of `new Date().toISOString()` + // made re-running the parser produce a noisy diff every time). + const evidenceTimestamp = resolveEvidenceTimestamp(obj); + + // Schema sentinel: assert every required column is present on every row. + // GEM occasionally renames columns between releases; the operator's + // conversion step is supposed to normalize, but we double-check here so + // a missed rename fails loud instead of producing silent zero-data. 
+ for (const [i, row] of obj.pipelines.entries()) { + if (!row || typeof row !== 'object') { + throw new Error(`parseGemPipelines: pipelines[${i}] is not an object`); + } + const r = /** @type {Record} */ (row); + for (const col of REQUIRED_COLUMNS) { + if (!(col in r)) { + throw new Error( + `parseGemPipelines: schema drift — pipelines[${i}] missing column "${col}". ` + + `Re-run the operator's Excel→JSON conversion using the canonical ` + + `column names documented in scripts/import-gem-pipelines.mjs::REQUIRED_COLUMNS.`, + ); + } + } + } + + const gas = []; + const oil = []; + const droppedReasons = { fuel: 0, coords: 0, capacity: 0 }; + + for (const row of obj.pipelines) { + const r = /** @type {Record} */ (row); + const fuel = inferFuel(r); + if (!fuel) { + droppedReasons.fuel++; + continue; + } + + const startLat = Number(r.startLat); + const startLon = Number(r.startLon); + const endLat = Number(r.endLat); + const endLon = Number(r.endLon); + if (!VALID_LAT(startLat) || !VALID_LON(startLon) || !VALID_LAT(endLat) || !VALID_LON(endLon)) { + droppedReasons.coords++; + continue; + } + + let capacityField, capacityValue; + try { + if (fuel === 'gas') { + capacityField = 'capacityBcmYr'; + capacityValue = convertCapacityToBcmYr(r.capacity, r.capacityUnit); + } else { + capacityField = 'capacityMbd'; + capacityValue = convertCapacityToMbd(r.capacity, r.capacityUnit); + } + } catch (err) { + // Unsupported unit → drop the row; let the operator notice via the count + // delta in dry-run output. Throwing would abort the entire run on a + // single bad row, which is too brittle. + droppedReasons.capacity++; + continue; + } + if (!Number.isFinite(capacityValue) || capacityValue <= 0) { + droppedReasons.capacity++; + continue; + } + + const id = slugify(r.name, r.fromCountry); + const transitCountries = Array.isArray(r.transitCountries) + ? 
r.transitCountries.filter((c) => typeof c === 'string') + : []; + + const candidate = { + id, + name: r.name, + operator: r.operator, + commodityType: fuel, + fromCountry: r.fromCountry, + toCountry: r.toCountry, + transitCountries, + [capacityField]: capacityValue, + lengthKm: Number(r.lengthKm) || 0, + inService: Number(r.startYear) || 0, + startPoint: { lat: startLat, lon: startLon }, + endPoint: { lat: endLat, lon: endLon }, + evidence: { + physicalState: mapStatus(r.status), + physicalStateSource: 'gem', + operatorStatement: null, + commercialState: 'unknown', + sanctionRefs: [], + lastEvidenceUpdate: evidenceTimestamp, + classifierVersion: 'gem-import-v1', + classifierConfidence: 0.4, + }, + }; + + if (fuel === 'oil') { + candidate.productClass = mapProductClass(r.product); + } + + (fuel === 'gas' ? gas : oil).push(candidate); + } + + return { gas, oil }; +} + +/** + * Read a GEM-shape JSON file and return parsed candidates. Returns the same + * shape as parseGemPipelines but accepts a file path instead of an in-memory + * object — useful for CLI and dedup pipelines. + * + * @param {string} filePath + * @returns {{ gas: any[], oil: any[] }} + */ +export function loadGemPipelinesFromFile(filePath) { + const raw = readFileSync(filePath, 'utf-8'); + let data; + try { + data = JSON.parse(raw); + } catch (err) { + throw new Error( + `parseGemPipelines: file at ${filePath} is not valid JSON. ` + + `Did the operator pre-convert the GEM Excel correctly?`, + ); + } + return parseGemPipelines(data); +} + +/** + * Read an existing registry file and return its parsed envelope. + * @param {string} filename + */ +function loadExistingRegistry(filename) { + const __dirname = dirname(fileURLToPath(import.meta.url)); + const path = resolvePath(__dirname, 'data', filename); + const raw = readFileSync(path, 'utf-8'); + return { path, envelope: JSON.parse(raw) }; +} + +/** + * Build (but do NOT write) a merged registry envelope. Pure: no disk I/O. 
+ * Throws on validation failure so the caller can short-circuit before any
+ * file is written.
+ *
+ * @param {string} filename - 'pipelines-gas.json' or 'pipelines-oil.json'
+ * @param {any[]} candidates - parser output for that fuel
+ * @returns {{ path: string, mergedEnvelope: any, added: number, skipped: number, total: number }}
+ */
+function prepareMerge(filename, candidates) {
+  const { path, envelope } = loadExistingRegistry(filename);
+  const existing = Object.values(envelope.pipelines ?? {});
+  const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates);
+
+  // Append in a stable order (alphabetical-by-id) so repeated runs produce
+  // a clean diff. Hand-curated rows keep their original ordering at the top.
+  const appended = [...toAdd].sort((a, b) => a.id.localeCompare(b.id));
+  const mergedPipelines = { ...envelope.pipelines };
+  for (const p of appended) mergedPipelines[p.id] = p;
+
+  const mergedEnvelope = {
+    ...envelope,
+    source: envelope.source?.includes('Global Energy Monitor')
+      ? envelope.source
+      : `${envelope.source ?? 'Hand-curated'} + Global Energy Monitor (CC-BY 4.0)`,
+    pipelines: mergedPipelines,
+  };
+
+  if (!validateRegistry(mergedEnvelope)) {
+    throw new Error(
+      `prepareMerge: merged ${filename} would FAIL validateRegistry. ` +
+        `Aborting before writing to disk. Inspect the diff with --print-candidates first.`,
+    );
+  }
+
+  return {
+    path,
+    mergedEnvelope,
+    added: toAdd.length,
+    skipped: skippedDuplicates.length,
+    total: Object.keys(mergedPipelines).length,
+  };
+}
+
+/**
+ * Cross-file-atomic merge: builds AND validates BOTH gas + oil envelopes
+ * before writing EITHER file. If oil validation fails after gas already
+ * succeeded, neither is written — prevents the half-imported state where
+ * gas has GEM rows on disk but oil doesn't.
+ *
+ * Two-phase: prepare both → write both. Pure prepare phase, side-effecting
+ * write phase.
Order of writes is stable (gas first, oil second), but the + * "validate everything before any write" guarantee is what prevents + * partial state on failure. + * + * @returns {{ gas: ReturnType, oil: ReturnType }} + */ +function mergeBothRegistries(gasCandidates, oilCandidates) { + // Phase 1: prepare + validate BOTH. If either throws, neither file is + // touched on disk. + const gas = prepareMerge('pipelines-gas.json', gasCandidates); + const oil = prepareMerge('pipelines-oil.json', oilCandidates); + + // Phase 2: both validated → write both. + writeFileSync(gas.path, JSON.stringify(gas.mergedEnvelope, null, 2) + '\n'); + writeFileSync(oil.path, JSON.stringify(oil.mergedEnvelope, null, 2) + '\n'); + + return { gas, oil }; +} + +// CLI entry point: only fires when this file is the entry script. +if (process.argv[1] && process.argv[1].endsWith('import-gem-pipelines.mjs')) { + const filePath = process.env.GEM_PIPELINES_FILE; + if (!filePath) { + console.error('GEM_PIPELINES_FILE env var not set. See script header for operator runbook.'); + process.exit(1); + } + const args = new Set(process.argv.slice(2)); + const { gas, oil } = loadGemPipelinesFromFile(filePath); + if (args.has('--print-candidates')) { + process.stdout.write(JSON.stringify({ gas, oil }, null, 2) + '\n'); + } else if (args.has('--merge')) { + try { + // mergeBothRegistries validates BOTH envelopes before writing + // either — so a validation failure on oil after gas succeeded + // leaves neither file modified on disk. Prevents the half-imported + // state the previous per-file flow could produce. + const { gas: gasResult, oil: oilResult } = mergeBothRegistries(gas, oil); + console.error(`gas: +${gasResult.added} added, ${gasResult.skipped} duplicates skipped, ${gasResult.total} total`); + console.error(`oil: +${oilResult.added} added, ${oilResult.skipped} duplicates skipped, ${oilResult.total} total`); + console.error( + `Wrote merged data to scripts/data/pipelines-{gas,oil}.json. 
` + + `Inspect the diff before committing. Per the operator runbook, ` + + `also update MIN_PIPELINES_PER_REGISTRY in scripts/_pipeline-registry.mjs ` + + `to a sensible new floor (e.g. 200) once the data is in.`, + ); + } catch (err) { + console.error(err instanceof Error ? err.message : String(err)); + process.exit(2); + } + } else { + console.error('Pass --print-candidates (dry run) or --merge (write to data files).'); + process.exit(1); + } +} diff --git a/src/shared/pipeline-evidence.ts b/src/shared/pipeline-evidence.ts index 5796c3e29..581d9df34 100644 --- a/src/shared/pipeline-evidence.ts +++ b/src/shared/pipeline-evidence.ts @@ -24,7 +24,7 @@ export type PipelinePublicBadge = 'flowing' | 'reduced' | 'offline' | 'disputed' export interface PipelineEvidenceInput { physicalState?: string; // 'flowing'|'reduced'|'offline'|'unknown' - physicalStateSource?: string; // 'operator'|'regulator'|'press'|'satellite'|'ais-relay' + physicalStateSource?: string; // 'operator'|'regulator'|'press'|'satellite'|'ais-relay'|'gem' operatorStatement?: { text?: string; url?: string; date?: string } | null; commercialState?: string; // 'under_contract'|'expired'|'suspended'|'unknown' sanctionRefs?: ReadonlyArray<{ authority?: string; listId?: string; date?: string; url?: string }>; @@ -49,7 +49,7 @@ const EVIDENCE_STALENESS_DAYS = 14; * → "offline" (high-confidence offline with paperwork) * 2. physical_state = "offline" AND operatorStatement != null * → "offline" (operator-disclosed outage) - * 3. physical_state = "offline" AND physicalStateSource ∈ {press, ais-relay, satellite} + * 3. physical_state = "offline" AND physicalStateSource ∈ {press, ais-relay, satellite, gem} * → "disputed" (external-signal offline without operator/sanction confirmation) * 4. 
physical_state = "reduced" * → "reduced" @@ -77,7 +77,7 @@ export function derivePipelinePublicBadge( evidence.commercialState === 'expired' || evidence.commercialState === 'suspended'; const hasOperatorStatement = evidence.operatorStatement != null && ((evidence.operatorStatement.text?.length ?? 0) > 0); - const hasExternalSignal = ['press', 'ais-relay', 'satellite'].includes( + const hasExternalSignal = ['press', 'ais-relay', 'satellite', 'gem'].includes( evidence.physicalStateSource ?? '', ); diff --git a/tests/fixtures/gem-pipelines-sample.json b/tests/fixtures/gem-pipelines-sample.json new file mode 100644 index 000000000..43720cdfa --- /dev/null +++ b/tests/fixtures/gem-pipelines-sample.json @@ -0,0 +1,117 @@ +{ + "source": "Global Energy Monitor — Oil & Gas Infrastructure Trackers (CC-BY 4.0)", + "sourceVersion": "2026-Q1-fixture", + "sourceUrl": "https://globalenergymonitor.org/projects/global-oil-gas-infrastructure-tracker/", + "downloadedAt": "2026-04-25", + "_note": "Trimmed 6-row fixture for parser tests. Real input has the same shape — the operator pre-converts the GEM Excel release to this JSON form externally (Numbers / pandas / csvkit). 
Real production runs ingest hundreds of rows; this fixture is intentionally minimal and covers the status, productClass, capacity-unit, and bbox-validity mapping cases.", + "pipelines": [ + { + "name": "Test Operating Gas Trunk", + "operator": "Test Gas Operator", + "fuel": "Natural Gas", + "product": "", + "fromCountry": "NO", + "toCountry": "DE", + "transitCountries": [], + "capacity": 24, + "capacityUnit": "bcm/y", + "lengthKm": 850, + "status": "Operating", + "startYear": 1995, + "startLat": 58.5, + "startLon": 1.7, + "endLat": 53.5, + "endLon": 8.5 + }, + { + "name": "Test Construction Gas Pipe", + "operator": "Test Builder", + "fuel": "Natural Gas", + "product": "", + "fromCountry": "AZ", + "toCountry": "TR", + "transitCountries": ["GE"], + "capacity": 16, + "capacityUnit": "bcm/y", + "lengthKm": 1840, + "status": "Construction", + "startYear": 2027, + "startLat": 40.4, + "startLon": 49.9, + "endLat": 40.9, + "endLon": 28.9 + }, + { + "name": "Test Cancelled Gas Pipeline", + "operator": "Test Cancelled Sponsor", + "fuel": "Natural Gas", + "product": "", + "fromCountry": "GR", + "toCountry": "BG", + "transitCountries": [], + "capacity": 3, + "capacityUnit": "bcm/y", + "lengthKm": 180, + "status": "Cancelled", + "startYear": null, + "startLat": 40.6, + "startLon": 22.9, + "endLat": 42.5, + "endLon": 25.0 + }, + { + "name": "Test Crude Oil Trunk", + "operator": "Test Oil Operator", + "fuel": "Oil", + "product": "Crude Oil", + "fromCountry": "KZ", + "toCountry": "CN", + "transitCountries": [], + "capacity": 400000, + "capacityUnit": "bbl/d", + "lengthKm": 2200, + "status": "Operating", + "startYear": 2009, + "startLat": 47.1, + "startLon": 51.9, + "endLat": 45.0, + "endLon": 87.6 + }, + { + "name": "Test Refined Products Line", + "operator": "Test Products Operator", + "fuel": "Oil", + "product": "Refined Products", + "fromCountry": "US", + "toCountry": "US", + "transitCountries": [], + "capacity": 0.65, + "capacityUnit": "Mbd", + "lengthKm": 1500, + "status": 
"Operating", + "startYear": 1962, + "startLat": 29.7, + "startLon": -94.2, + "endLat": 41.0, + "endLon": -73.9 + }, + { + "name": "Test Mothballed Crude Bypass", + "operator": "Test Mothballed Operator", + "fuel": "Oil", + "product": "Crude Oil", + "fromCountry": "IQ", + "toCountry": "TR", + "transitCountries": [], + "capacity": 1000000, + "capacityUnit": "bbl/d", + "lengthKm": 970, + "status": "Mothballed", + "startYear": 1977, + "startLat": 35.5, + "startLon": 44.4, + "endLat": 36.2, + "endLon": 36.1 + } + ] +} diff --git a/tests/import-gem-pipelines.test.mjs b/tests/import-gem-pipelines.test.mjs new file mode 100644 index 000000000..e68adc2f7 --- /dev/null +++ b/tests/import-gem-pipelines.test.mjs @@ -0,0 +1,276 @@ +// @ts-check +// +// Tests for scripts/import-gem-pipelines.mjs — the GEM Oil & Gas Infrastructure +// Tracker → registry-shape parser. Test-first per the plan's Execution note: the +// schema-sentinel + status/productClass/capacity-unit mapping is the highest- +// risk failure mode, so coverage for it lands before the implementation does. +// +// Fixture: tests/fixtures/gem-pipelines-sample.json — operator-shape JSON +// (Excel pre-converted externally; the parser is local-file-only, no xlsx +// dep, no runtime URL fetch). 
+ +import { strict as assert } from 'node:assert'; +import { test, describe } from 'node:test'; +import { readFileSync } from 'node:fs'; +import { resolve, dirname } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { parseGemPipelines, REQUIRED_COLUMNS } from '../scripts/import-gem-pipelines.mjs'; +import { validateRegistry } from '../scripts/_pipeline-registry.mjs'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const fixturePath = resolve(__dirname, 'fixtures/gem-pipelines-sample.json'); +const fixture = JSON.parse(readFileSync(fixturePath, 'utf-8')); + +describe('import-gem-pipelines — schema sentinel', () => { + test('REQUIRED_COLUMNS is exported and non-empty', () => { + assert.ok(Array.isArray(REQUIRED_COLUMNS)); + assert.ok(REQUIRED_COLUMNS.length >= 5); + }); + + test('throws on missing required column', () => { + const broken = { + ...fixture, + pipelines: fixture.pipelines.map((p) => { + const { name: _drop, ...rest } = p; + return rest; + }), + }; + assert.throws( + () => parseGemPipelines(broken), + /missing|name|schema/i, + 'parser must throw on column drift, not silently accept', + ); + }); + + test('throws on non-object input', () => { + assert.throws(() => parseGemPipelines(null), /input/i); + assert.throws(() => parseGemPipelines([]), /input|pipelines/i); + }); + + test('throws when pipelines field is missing', () => { + assert.throws(() => parseGemPipelines({ source: 'test' }), /pipelines/i); + }); +}); + +describe('import-gem-pipelines — fuel split', () => { + test('splits gas + oil into two arrays', () => { + const { gas, oil } = parseGemPipelines(fixture); + assert.equal(gas.length, 3, 'fixture has 3 gas rows'); + assert.equal(oil.length, 3, 'fixture has 3 oil rows'); + }); + + test('gas pipelines do NOT carry productClass (gas registry forbids it)', () => { + const { gas } = parseGemPipelines(fixture); + for (const p of gas) { + assert.equal(p.productClass, undefined, `${p.name}: gas should not have 
productClass`); + } + }); + + test('every oil pipeline declares a productClass from the enum', () => { + const { oil } = parseGemPipelines(fixture); + for (const p of oil) { + assert.ok( + ['crude', 'products', 'mixed'].includes(p.productClass), + `${p.name} has invalid productClass: ${p.productClass}`, + ); + } + }); +}); + +describe('import-gem-pipelines — status mapping', () => { + test("'Operating' maps to physicalState='flowing'", () => { + const { gas, oil } = parseGemPipelines(fixture); + const op = [...gas, ...oil].filter((p) => p.name.includes('Operating')); + assert.ok(op.length > 0); + for (const p of op) { + assert.equal(p.evidence.physicalState, 'flowing'); + } + }); + + test("'Construction' maps to physicalState='unknown' (planned/not commissioned)", () => { + const { gas } = parseGemPipelines(fixture); + const ctr = gas.find((p) => p.name.includes('Construction')); + assert.ok(ctr); + assert.equal(ctr.evidence.physicalState, 'unknown'); + }); + + test("'Cancelled' / 'Mothballed' map to physicalState='offline'", () => { + const { gas, oil } = parseGemPipelines(fixture); + const cancelled = gas.find((p) => p.name.includes('Cancelled')); + const mothballed = oil.find((p) => p.name.includes('Mothballed')); + assert.ok(cancelled); + assert.ok(mothballed); + assert.equal(cancelled.evidence.physicalState, 'offline'); + assert.equal(mothballed.evidence.physicalState, 'offline'); + }); +}); + +describe('import-gem-pipelines — productClass mapping', () => { + test("'Crude Oil' product → productClass='crude'", () => { + const { oil } = parseGemPipelines(fixture); + const crude = oil.find((p) => p.name.includes('Crude Oil Trunk')); + assert.ok(crude); + assert.equal(crude.productClass, 'crude'); + }); + + test("'Refined Products' product → productClass='products'", () => { + const { oil } = parseGemPipelines(fixture); + const refined = oil.find((p) => p.name.includes('Refined Products')); + assert.ok(refined); + assert.equal(refined.productClass, 'products'); + 
}); +}); + +describe('import-gem-pipelines — capacity-unit conversion', () => { + test('gas capacity in bcm/y is preserved unchanged', () => { + const { gas } = parseGemPipelines(fixture); + const opGas = gas.find((p) => p.name.includes('Operating')); + assert.ok(opGas); + assert.equal(opGas.capacityBcmYr, 24); + }); + + test('oil capacity in bbl/d is converted to capacityMbd (stored in millions of barrels per day)', () => { + const { oil } = parseGemPipelines(fixture); + const crude = oil.find((p) => p.name.includes('Crude Oil Trunk')); + assert.ok(crude); + // Schema convention: the field is named `capacityMbd` (the customary + // industry abbreviation) but the VALUE is in millions of barrels per + // day, NOT thousands — matching the existing on-main hand-curated rows + // (e.g. CPC pipeline ships as `capacityMbd: 1.4` for 1.4M bbl/d). + // So 400_000 bbl/d ÷ 1_000_000 = 0.4 capacityMbd. + assert.equal(crude.capacityMbd, 0.4); + }); + + test('oil capacity already in Mbd is preserved unchanged', () => { + const { oil } = parseGemPipelines(fixture); + const refined = oil.find((p) => p.name.includes('Refined Products')); + assert.ok(refined); + assert.equal(refined.capacityMbd, 0.65); + }); +}); + +describe('import-gem-pipelines — minimum-viable evidence', () => { + test('every emitted candidate has physicalStateSource=gem', () => { + const { gas, oil } = parseGemPipelines(fixture); + for (const p of [...gas, ...oil]) { + assert.equal(p.evidence.physicalStateSource, 'gem'); + } + }); + + test('every emitted candidate has classifierVersion=gem-import-v1', () => { + const { gas, oil } = parseGemPipelines(fixture); + for (const p of [...gas, ...oil]) { + assert.equal(p.evidence.classifierVersion, 'gem-import-v1'); + } + }); + + test('every emitted candidate has classifierConfidence ≤ 0.5', () => { + const { gas, oil } = parseGemPipelines(fixture); + for (const p of [...gas, ...oil]) { + assert.ok(p.evidence.classifierConfidence <= 0.5); + assert.ok(p.evidence.classifierConfidence >= 0); + 
} + }); + + test('every emitted candidate has empty sanctionRefs and null operatorStatement', () => { + const { gas, oil } = parseGemPipelines(fixture); + for (const p of [...gas, ...oil]) { + assert.deepEqual(p.evidence.sanctionRefs, []); + assert.equal(p.evidence.operatorStatement, null); + } + }); +}); + +describe('import-gem-pipelines — registry-shape conformance', () => { + test('emitted gas registry passes validateRegistry', () => { + // Build a synthetic registry of just the GEM-emitted gas rows; meets the + // validator's MIN_PIPELINES_PER_REGISTRY=8 floor by repeating the 3 fixture + // rows so we exercise the schema, not the count. + const { gas } = parseGemPipelines(fixture); + const repeated = []; + for (let i = 0; i < 3; i++) { + for (const p of gas) repeated.push({ ...p, id: `${p.id}-rep${i}` }); + } + const reg = { + pipelines: Object.fromEntries(repeated.map((p) => [p.id, p])), + }; + assert.equal(validateRegistry(reg), true); + }); + + test('emitted oil registry passes validateRegistry', () => { + const { oil } = parseGemPipelines(fixture); + const repeated = []; + for (let i = 0; i < 3; i++) { + for (const p of oil) repeated.push({ ...p, id: `${p.id}-rep${i}` }); + } + const reg = { + pipelines: Object.fromEntries(repeated.map((p) => [p.id, p])), + }; + assert.equal(validateRegistry(reg), true); + }); +}); + +describe('import-gem-pipelines — determinism (review-fix #3)', () => { + test('two parser runs on identical input produce identical output', () => { + // Regression: pre-fix, lastEvidenceUpdate used new Date() per run, so + // re-running parseGemPipelines on the same JSON on different days + // produced different output → noisy diffs every quarterly re-import. + // Now derived from envelope.downloadedAt, so output is byte-identical. 
+ const r1 = JSON.stringify(parseGemPipelines(fixture)); + const r2 = JSON.stringify(parseGemPipelines(fixture)); + assert.equal(r1, r2); + }); + + test('lastEvidenceUpdate derives from envelope.downloadedAt', () => { + // Fixture has downloadedAt: 2026-04-25 → emitted as 2026-04-25T00:00:00Z. + const { gas } = parseGemPipelines(fixture); + for (const p of gas) { + assert.equal(p.evidence.lastEvidenceUpdate, '2026-04-25T00:00:00Z'); + } + }); + + test('missing downloadedAt → epoch sentinel (loud failure, not silent today)', () => { + // If the operator forgets the date field, the emitted timestamp should + // be obviously wrong rather than today's wall clock — surfaces the + // gap in code review of the data file. + const noDate = { ...fixture }; + delete noDate.downloadedAt; + delete noDate.sourceVersion; + const { gas } = parseGemPipelines(noDate); + for (const p of gas) { + assert.equal(p.evidence.lastEvidenceUpdate, '1970-01-01T00:00:00Z'); + } + }); +}); + +describe('import-gem-pipelines — coordinate validity', () => { + test('rows with invalid lat/lon are dropped (not silently kept with lat=0)', () => { + const broken = { + ...fixture, + pipelines: [ + ...fixture.pipelines, + { + name: 'Test Bad Coords', + operator: 'X', + fuel: 'Natural Gas', + product: '', + fromCountry: 'XX', + toCountry: 'YY', + transitCountries: [], + capacity: 5, + capacityUnit: 'bcm/y', + lengthKm: 100, + status: 'Operating', + startYear: 2020, + startLat: 200, // out of range + startLon: 0, + endLat: 0, + endLon: 0, + }, + ], + }; + const { gas } = parseGemPipelines(broken); + const bad = gas.find((p) => p.name.includes('Bad Coords')); + assert.equal(bad, undefined, 'row with out-of-range lat must be dropped, not coerced'); + }); +}); diff --git a/tests/pipeline-dedup.test.mjs b/tests/pipeline-dedup.test.mjs new file mode 100644 index 000000000..31fdfd377 --- /dev/null +++ b/tests/pipeline-dedup.test.mjs @@ -0,0 +1,242 @@ +// @ts-check +// +// Tests for scripts/_pipeline-dedup.mjs — 
the haversine + Jaccard dedup +// helper. Both criteria (≤5km AND ≥0.6) must hold for a match. Existing rows +// always win to preserve hand-curated evidence. + +import { strict as assert } from 'node:assert'; +import { test, describe } from 'node:test'; +import { dedupePipelines, _internal } from '../scripts/_pipeline-dedup.mjs'; + +const { jaccard, averageEndpointDistanceKm, tokenize, uniqueId } = _internal; + +function makePipeline(id, name, startLat, startLon, endLat, endLon) { + return { + id, + name, + startPoint: { lat: startLat, lon: startLon }, + endPoint: { lat: endLat, lon: endLon }, + }; +} + +describe('pipeline-dedup — internal helpers', () => { + test('tokenize lowercases, splits, drops stopwords', () => { + const tokens = tokenize('Trans-Siberian Pipeline System'); + assert.deepEqual(tokens.sort(), ['siberian', 'trans']); + }); + + test('tokenize removes punctuation and accents', () => { + const tokens = tokenize('Caño Limón–Coveñas Pipeline'); + // After NFKD normalization + ascii-only filter, accented chars survive + // as their base letter; we accept either exact or close behaviour. 
+ assert.ok(tokens.includes('limon') || tokens.includes('lim'), + `expected Limón to tokenize; got ${tokens.join(',')}`); + }); + + test('jaccard returns 1.0 for identical token sets', () => { + assert.equal(jaccard('Test Pipeline System', 'Test Pipeline'), 1.0); + }); + + test('jaccard returns 0 for fully disjoint names', () => { + assert.equal(jaccard('Druzhba North', 'Nord Stream'), 0); + }); + + test('jaccard 1/3 when one token of the three-token union is shared', () => { + assert.equal(jaccard('Trans Adriatic', 'Trans Caspian'), 1 / 3); + }); + + test('haversine distance is symmetric', () => { + const a = makePipeline('a', 'A', 60, 30, 54, 13); + const b = makePipeline('b', 'B', 60.001, 30.001, 54.001, 13.001); + assert.ok(averageEndpointDistanceKm(a, b) < 1, 'sub-km on tiny offsets'); + }); + + test('haversine distance for far-apart pipelines is large', () => { + const a = makePipeline('a', 'A', 60, 30, 54, 13); // RU→DE + const b = makePipeline('b', 'B', 30, -90, 25, -85); // Gulf of Mexico + assert.ok(averageEndpointDistanceKm(a, b) > 5000); + }); + + test('uniqueId preserves base when free, suffixes when taken', () => { + const taken = new Set(['foo', 'foo-2']); + assert.equal(uniqueId('bar', taken), 'bar'); + assert.equal(uniqueId('foo', taken), 'foo-3'); + }); +}); + +describe('pipeline-dedup — match logic', () => { + test('happy path: completely-different name + far endpoints → added', () => { + const existing = [makePipeline('druzhba-north', 'Druzhba Pipeline (Northern Branch)', + 52.6, 49.4, 52.32, 14.06)]; + const candidates = [makePipeline('nord-stream-1', 'Nord Stream 1', + 60.08, 29.05, 54.14, 13.66)]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 1); + assert.equal(skippedDuplicates.length, 0); + }); + + test('match by both criteria: close endpoints + similar name → skipped (existing wins)', () => { + const existing = [makePipeline('druzhba-north', 'Druzhba Pipeline', + 52.6, 49.4, 52.32, 14.06)]; + const candidates = 
[makePipeline('druzhba-import', 'Druzhba Pipeline', + 52.601, 49.401, 52.321, 14.061)]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 0); + assert.equal(skippedDuplicates.length, 1); + assert.equal(skippedDuplicates[0].matchedExistingId, 'druzhba-north'); + }); + + test('name-match only (endpoints in different ocean) → added', () => { + const existing = [makePipeline('nord-stream-1', 'Nord Stream 1', + 60.08, 29.05, 54.14, 13.66)]; + const candidates = [makePipeline('imposter', 'Nord Stream 1', + 40.0, -100.0, 35.0, -90.0)]; // different continent + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 1, 'low haversine confidence overrides high name match'); + assert.equal(skippedDuplicates.length, 0); + }); + + test('endpoint-match only (different name) → added (real distinct pipelines can share endpoints)', () => { + const existing = [makePipeline('yamal-europe', 'Yamal–Europe', + 67.0, 75.0, 52.0, 14.0)]; + const candidates = [makePipeline('different-route', 'Trans-Siberian Coal Slurry', + 67.001, 75.001, 52.001, 14.001)]; + const { toAdd } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 1, 'name disambiguates: same endpoints, different infrastructure'); + }); + + test('reverse-direction match: candidate endpoints flipped → still detected', () => { + const existing = [makePipeline('druzhba', 'Druzhba', + 52.6, 49.4, 52.32, 14.06)]; + // Same pipeline, route described in reverse direction + const candidates = [makePipeline('druzhba-flipped', 'Druzhba', + 52.32, 14.06, 52.6, 49.4)]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 0); + assert.equal(skippedDuplicates.length, 1); + }); + + test('stopword-only difference: "Pipeline System" vs "Line" → matches by Jaccard', () => { + const existing = [makePipeline('trans-sib', 'Trans-Siberian Pipeline System', + 55, 30, 60, 
90)]; + const candidates = [makePipeline('trans-sib-cand', 'Trans-Siberian Line', + 55.001, 30.001, 60.001, 90.001)]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 0); + assert.equal(skippedDuplicates.length, 1); + assert.ok(skippedDuplicates[0].jaccard >= 0.6); + }); +}); + +describe('pipeline-dedup — id collision', () => { + test('candidate with id colliding existing gets suffixed -2', () => { + const existing = [makePipeline('foo', 'Foo Pipeline', 0, 0, 1, 1)]; + const candidates = [makePipeline('foo', 'Bar Pipeline', 50, 50, 60, 60)]; + const { toAdd } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 1); + assert.equal(toAdd[0].id, 'foo-2'); + }); + + test('three candidates colliding the same existing id get -2, -3, -4', () => { + const existing = [makePipeline('foo', 'Foo Pipeline', 0, 0, 1, 1)]; + const candidates = [ + makePipeline('foo', 'Bar Pipeline', 50, 50, 60, 60), + makePipeline('foo', 'Baz Pipeline', 70, 70, 80, 80), + makePipeline('foo', 'Qux Pipeline', 30, -30, 40, -40), + ]; + const { toAdd } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 3); + assert.deepEqual( + toAdd.map((p) => p.id).sort(), + ['foo-2', 'foo-3', 'foo-4'], + ); + }); +}); + +describe('pipeline-dedup — determinism', () => { + test('two invocations on identical inputs produce identical output', () => { + const existing = [ + makePipeline('a', 'Alpha Pipeline', 10, 10, 20, 20), + makePipeline('b', 'Beta Pipeline', 30, 30, 40, 40), + ]; + const candidates = [ + makePipeline('a', 'Alpha Pipeline', 10.001, 10.001, 20.001, 20.001), + makePipeline('c', 'Gamma Pipeline', 50, 50, 60, 60), + ]; + const r1 = dedupePipelines(existing, candidates); + const r2 = dedupePipelines(existing, candidates); + assert.deepEqual( + r1.toAdd.map((p) => p.id), + r2.toAdd.map((p) => p.id), + ); + assert.deepEqual( + r1.skippedDuplicates.map((d) => d.matchedExistingId), + r2.skippedDuplicates.map((d) => 
d.matchedExistingId), + ); + }); +}); + +describe('pipeline-dedup — within-batch dedup (review fix)', () => { + test('two candidates that match each other but not any existing → only first is added', () => { + // Regression: pre-fix, dedup compared each candidate ONLY against the + // original `existing` array, so two GEM rows for the same pipeline (e.g. + // a primary entry and a duplicate from a different source spreadsheet) + // would BOTH end up in the registry. + const candidates = [ + makePipeline('east-west-saudi', 'East-West Crude Pipeline', 25, 49, 24, 38), + // Same pipeline, slightly different name + endpoints (within match + // tolerance). Should be skipped as a duplicate of the first candidate. + makePipeline('saudi-petroline', 'East-West Crude', 25.001, 49.001, 24.001, 38.001), + ]; + const { toAdd, skippedDuplicates } = dedupePipelines([], candidates); + assert.equal(toAdd.length, 1, 'second matching candidate must be skipped'); + assert.equal(skippedDuplicates.length, 1); + assert.equal(toAdd[0].id, 'east-west-saudi', 'first-accepted candidate wins (deterministic)'); + assert.equal(skippedDuplicates[0].matchedExistingId, 'east-west-saudi', + 'skipped candidate matches the earlier-accepted one, not anything in `existing`'); + }); + + test('three candidates with transitive matches collapse to one', () => { + const candidates = [ + makePipeline('a', 'Druzhba', 52.6, 49.4, 52.32, 14.06), + makePipeline('b', 'Druzhba Pipeline', 52.601, 49.401, 52.321, 14.061), + makePipeline('c', 'Druzhba Line', 52.602, 49.402, 52.322, 14.062), + ]; + const { toAdd } = dedupePipelines([], candidates); + assert.equal(toAdd.length, 1, 'three matching candidates must collapse to the first one accepted'); + }); + + test('existing wins over already-accepted candidate', () => { + // If a candidate matches an existing row, it must be reported as + // matching the existing row (existing-vs-toAdd precedence). Names + // chosen so Jaccard exceeds 0.6 after stopword removal. 
+ const existing = [makePipeline('canon', 'Druzhba Northern', 52.6, 49.4, 52.32, 14.06)]; + const candidates = [ + makePipeline('cand-1', 'Druzhba Northern', 60, 30, 50, 14), // doesn't match existing (far endpoints) + makePipeline('cand-2', 'Druzhba Northern', 52.601, 49.401, 52.321, 14.061), // matches existing (near + Jaccard=1) + ]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, candidates); + assert.equal(toAdd.length, 1, 'cand-1 added; cand-2 skipped against existing'); + assert.equal(skippedDuplicates[0].matchedExistingId, 'canon', + 'cand-2 should be reported as matching the existing canon, not the earlier candidate'); + }); +}); + +describe('pipeline-dedup — empty inputs', () => { + test('empty existing + N candidates → all N added, none skipped', () => { + const candidates = [ + makePipeline('a', 'A', 0, 0, 1, 1), + makePipeline('b', 'B', 5, 5, 6, 6), + ]; + const { toAdd, skippedDuplicates } = dedupePipelines([], candidates); + assert.equal(toAdd.length, 2); + assert.equal(skippedDuplicates.length, 0); + }); + + test('N existing + empty candidates → empty result', () => { + const existing = [makePipeline('a', 'A', 0, 0, 1, 1)]; + const { toAdd, skippedDuplicates } = dedupePipelines(existing, []); + assert.equal(toAdd.length, 0); + assert.equal(skippedDuplicates.length, 0); + }); +}); diff --git a/tests/pipelines-registry.test.mts b/tests/pipelines-registry.test.mts index 122a91934..dd47e9e2f 100644 --- a/tests/pipelines-registry.test.mts +++ b/tests/pipelines-registry.test.mts @@ -11,6 +11,7 @@ import { GAS_CANONICAL_KEY, OIL_CANONICAL_KEY, VALID_OIL_PRODUCT_CLASSES, + VALID_SOURCES, } from '../scripts/_pipeline-registry.mjs'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -216,3 +217,87 @@ describe('pipeline registries — validateRegistry rejects bad input', () => { assert.equal(validateRegistry(bad), false); }); }); + +describe('pipeline registries — GEM source enum', () => { + test('VALID_SOURCES exported and includes 
the existing six members plus gem', () => { + // Same source-of-truth pattern as VALID_OIL_PRODUCT_CLASSES (PR #3383): + // export the Set so future tests can't drift from the validator. + assert.ok(VALID_SOURCES.has('operator')); + assert.ok(VALID_SOURCES.has('regulator')); + assert.ok(VALID_SOURCES.has('press')); + assert.ok(VALID_SOURCES.has('satellite')); + assert.ok(VALID_SOURCES.has('ais-relay')); + assert.ok(VALID_SOURCES.has('gem')); + }); + + test('validateRegistry accepts GEM-sourced minimum-viable evidence (state=unknown)', () => { + // GEM rows ship as state=unknown until classifier promotes them. + // physicalStateSource='gem' is sufficient evidence per the audit. + const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]!]; + const good = { + pipelines: Object.fromEntries( + Array.from({ length: 8 }, (_, i) => [`p${i}`, { + ...gasSample, + id: `p${i}`, + evidence: { + physicalState: 'unknown', + physicalStateSource: 'gem', + commercialState: 'unknown', + operatorStatement: null, + sanctionRefs: [], + classifierVersion: 'gem-import-v1', + classifierConfidence: 0.4, + lastEvidenceUpdate: '2026-04-25T00:00:00Z', + }, + }]) + ), + }; + assert.equal(validateRegistry(good), true); + }); + + test('validateRegistry accepts GEM-sourced offline row (state=offline + only source=gem)', () => { + // Per plan U1 audit: 'gem' is evidence-bearing for non-flowing badges, + // parity with press/satellite/ais-relay. An offline row with no operator + // statement and no sanctionRefs but physicalStateSource='gem' should pass + // validation (the public-badge derivation downstream will then map it + // to "disputed" via the external-signal rule). 
+ const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]!]; + const good = { + pipelines: Object.fromEntries( + Array.from({ length: 8 }, (_, i) => [`p${i}`, { + ...gasSample, + id: `p${i}`, + evidence: { + physicalState: 'offline', + physicalStateSource: 'gem', + commercialState: 'unknown', + operatorStatement: null, + sanctionRefs: [], + classifierVersion: 'gem-import-v1', + classifierConfidence: 0.4, + lastEvidenceUpdate: '2026-04-25T00:00:00Z', + }, + }]) + ), + }; + assert.equal(validateRegistry(good), true); + }); + + test('validateRegistry still rejects unknown physicalStateSource values', () => { + // Adding 'gem' must not loosen the enum — unknown sources still fail. + const gasSample = gas.pipelines[Object.keys(gas.pipelines)[0]!]; + const bad = { + pipelines: Object.fromEntries( + Array.from({ length: 8 }, (_, i) => [`p${i}`, { + ...gasSample, + id: `p${i}`, + evidence: { + ...gasSample.evidence, + physicalStateSource: 'rumor', + }, + }]) + ), + }; + assert.equal(validateRegistry(bad), false); + }); +});