From 5c1fcc18fe8598d9419d81ab80d63d32799049b2 Mon Sep 17 00:00:00 2001 From: Elie Habib Date: Sun, 12 Apr 2026 20:29:50 +0400 Subject: [PATCH] fix(supply-chain): flow-weighted per-sector chokepoint exposure (#3017) * fix(supply-chain): flow-weighted per-sector chokepoint exposure (#2968) COUNTRY_PORT_CLUSTERS is country-level, producing identical scores for all sectors. Replace with flow-weighted model that reads bilateral HS4 trade data (exporter shares) to differentiate sector exposure by actual supplier routing through chokepoints. * fix(chokepoint): short-cache fallback when bilateral data is transiently unavailable loadBilateralProducts() now returns { products, transient } instead of just products|null. When bilateral HS4 data is unavailable due to a transient state (lazy fetch in-flight or Comtrade 429), the fallback exposures are cached for 60s instead of 24h. This prevents nine sectors from being stuck on country-level fallback scores for a full day when the bilateral payload arrives moments after the first lazy fetch. Replaced cachedFetchJson with manual getCachedJson + setCachedJson to vary the TTL based on whether bilateral data was available. * fix(lazy-hs4): distinguish 429 from server errors, fix raw key writes Two fixes in _bilateral-hs4-lazy.ts: 1. fetchComtradeBilateral now returns { products, rateLimited, serverError } instead of null for all non-2xx. Transient 500/503 no longer writes a 24h rateLimited sentinel, allowing immediate retry on next request. Only real 429s write the sentinel. 2. All setCachedJson calls now pass raw=true to match the raw=true reads. Seeds write these keys unprefixed; without raw=true on writes, lazy-fetch results were invisible to readers in prefixed environments. * fix(chokepoint): treat lazy server-error path as transient comtradeSource 'lazy' with empty products is the upstream 500/timeout path from _bilateral-hs4-lazy.ts. Without including it in the transient check, fallback exposures were still cached for 24h on server errors. --- .../supply-chain/v1/_bilateral-hs4-lazy.ts | 38 ++-- .../v1/chokepoint-exposure-utils.ts | 127 ++++++++++++ .../v1/get-country-chokepoint-index.ts | 153 +++++++------- tests/country-chokepoint-index.test.mts | 195 ++++++++++++++++++ 4 files changed, 420 insertions(+), 93 deletions(-) create mode 100644 server/worldmonitor/supply-chain/v1/chokepoint-exposure-utils.ts create mode 100644 tests/country-chokepoint-index.test.mts diff --git a/server/worldmonitor/supply-chain/v1/_bilateral-hs4-lazy.ts b/server/worldmonitor/supply-chain/v1/_bilateral-hs4-lazy.ts index 65bed1af7..33681a9e5 100644 --- a/server/worldmonitor/supply-chain/v1/_bilateral-hs4-lazy.ts +++ b/server/worldmonitor/supply-chain/v1/_bilateral-hs4-lazy.ts @@ -120,7 +120,13 @@ function groupByProduct(records: ParsedRecord[]): CountryProduct[] { return products; } -async function fetchComtradeBilateral(reporterCode: string): Promise { +interface ComtradeResult { + products: CountryProduct[]; + rateLimited: boolean; + serverError: boolean; +} + +async function fetchComtradeBilateral(reporterCode: string): Promise { const url = new URL(COMTRADE_BASE); url.searchParams.set('reporterCode', reporterCode); url.searchParams.set('cmdCode', HS4_CODES.join(',')); @@ -131,12 +137,12 @@ async function fetchComtradeBilateral(reporterCode: string): Promise= 500 }; const data = await resp.json(); const records = parseRecords(data); - return groupByProduct(records); + return { products: groupByProduct(records), rateLimited: false, serverError: false }; } export interface LazyFetchResult { @@ -167,27 +173,33 @@ export async function lazyFetchBilateralHs4(iso2: string): Promise; + +export function getRouteIdsForCountry(iso2: string): string[] { + return clusters[iso2]?.nearestRouteIds ?? []; +} + +export function getCoastSide(iso2: string): string { + return clusters[iso2]?.coastSide ?? 'unknown'; +} + +export function hs4ToHs2(hs4: string): string { + return String(Number.parseInt(hs4.slice(0, 2), 10)); +} + +export function computeFlowWeightedExposures( + importerIso2: string, + hs2: string, + products: CountryProduct[], +): ExposureEntry[] { + const isEnergy = hs2 === '27'; + const normalizedHs2 = String(Number.parseInt(hs2, 10)); + const matchingProducts = products.filter(p => hs4ToHs2(p.hs4) === normalizedHs2); + + if (matchingProducts.length === 0) return []; + + const importerRoutes = new Set(getRouteIdsForCountry(importerIso2)); + const totalSectorValue = matchingProducts.reduce((s, p) => s + p.totalValue, 0); + + const cpScores = new Map(); + for (const cp of CHOKEPOINT_REGISTRY) cpScores.set(cp.id, 0); + + for (const product of matchingProducts) { + const productWeight = totalSectorValue > 0 ? product.totalValue / totalSectorValue : 0; + + for (const exporter of product.topExporters) { + if (!exporter.partnerIso2) continue; + const exporterRoutes = new Set(getRouteIdsForCountry(exporter.partnerIso2)); + + for (const cp of CHOKEPOINT_REGISTRY) { + const cpRoutes = cp.routeIds; + let overlap = 0; + for (const r of cpRoutes) { + if (importerRoutes.has(r) || exporterRoutes.has(r)) overlap++; + } + const routeCoverage = overlap / Math.max(cpRoutes.length, 1); + const contribution = routeCoverage * exporter.share * productWeight * 100; + cpScores.set(cp.id, (cpScores.get(cp.id) ?? 0) + contribution); + } + } + } + + const entries: ExposureEntry[] = CHOKEPOINT_REGISTRY.map(cp => { + let score = cpScores.get(cp.id) ?? 0; + if (isEnergy && cp.shockModelSupported) score = Math.min(score * 1.5, 100); + score = Math.min(score, 100); + return { + chokepointId: cp.id, + chokepointName: cp.displayName, + exposureScore: Math.round(score * 10) / 10, + coastSide: '', + shockSupported: cp.shockModelSupported, + }; + }); + + return entries.sort((a, b) => b.exposureScore - a.exposureScore); +} + +export function computeFallbackExposures( + nearestRouteIds: string[], + hs2: string, +): ExposureEntry[] { + const isEnergy = hs2 === '27'; + const routeSet = new Set(nearestRouteIds); + + const entries: ExposureEntry[] = CHOKEPOINT_REGISTRY.map(cp => { + const overlap = cp.routeIds.filter(r => routeSet.has(r)).length; + const maxRoutes = Math.max(cp.routeIds.length, 1); + let score = (overlap / maxRoutes) * 100; + if (isEnergy && cp.shockModelSupported) score = Math.min(score * 1.5, 100); + return { + chokepointId: cp.id, + chokepointName: cp.displayName, + exposureScore: Math.round(score * 10) / 10, + coastSide: '', + shockSupported: cp.shockModelSupported, + }; + }); + + return entries.sort((a, b) => b.exposureScore - a.exposureScore); +} + +export function vulnerabilityIndex(sorted: ExposureEntry[]): number { + const weights = [0.5, 0.3, 0.2]; + const total = sorted.slice(0, 3).reduce((sum, e, i) => sum + e.exposureScore * weights[i]!, 0); + return Math.round(total * 10) / 10; +} diff --git a/server/worldmonitor/supply-chain/v1/get-country-chokepoint-index.ts b/server/worldmonitor/supply-chain/v1/get-country-chokepoint-index.ts index 084f1bb22..3ac1d4856 100644 --- a/server/worldmonitor/supply-chain/v1/get-country-chokepoint-index.ts +++ b/server/worldmonitor/supply-chain/v1/get-country-chokepoint-index.ts @@ -2,51 +2,60 @@ import type { ServerContext, GetCountryChokepointIndexRequest, GetCountryChokepointIndexResponse, - ChokepointExposureEntry, } from '../../../../src/generated/server/worldmonitor/supply_chain/v1/service_server'; -import { cachedFetchJson } from '../../../_shared/redis'; +import { getCachedJson, setCachedJson } from '../../../_shared/redis'; import { isCallerPremium } from '../../../_shared/premium-check'; import { CHOKEPOINT_EXPOSURE_KEY } from '../../../_shared/cache-keys'; -import { CHOKEPOINT_REGISTRY } from '../../../../src/config/chokepoint-registry'; -import COUNTRY_PORT_CLUSTERS from '../../../../scripts/shared/country-port-clusters.json'; +import { lazyFetchBilateralHs4 } from './_bilateral-hs4-lazy'; +import { + computeFlowWeightedExposures, + computeFallbackExposures, + vulnerabilityIndex, + getRouteIdsForCountry, + getCoastSide, + type CountryProduct, +} from './chokepoint-exposure-utils'; const CACHE_TTL = 86400; // 24 hours +const TRANSIENT_CACHE_TTL = 60; // 60s when bilateral data is still loading -interface PortClusterEntry { - nearestRouteIds: string[]; - coastSide: string; +interface BilateralHs4Payload { + iso2: string; + products: CountryProduct[]; + fetchedAt: string; } -function computeExposures( - nearestRouteIds: string[], - hs2: string, -): ChokepointExposureEntry[] { - const isEnergy = hs2 === '27'; - const routeSet = new Set(nearestRouteIds); - - const entries: ChokepointExposureEntry[] = CHOKEPOINT_REGISTRY.map(cp => { - const overlap = cp.routeIds.filter(r => routeSet.has(r)).length; - const maxRoutes = Math.max(cp.routeIds.length, 1); - let score = (overlap / maxRoutes) * 100; - // Energy sector: boost shock-model chokepoints by 50% (oil + LNG dependency) - if (isEnergy && cp.shockModelSupported) score = Math.min(score * 1.5, 100); - return { - chokepointId: cp.id, - chokepointName: cp.displayName, - exposureScore: Math.round(score * 10) / 10, - coastSide: '', - shockSupported: cp.shockModelSupported, - }; - }); - - return entries.sort((a, b) => b.exposureScore - a.exposureScore); +interface BilateralResult { + products: CountryProduct[] | null; + transient: boolean; } -function vulnerabilityIndex(sorted: ChokepointExposureEntry[]): number { - const weights = [0.5, 0.3, 0.2]; - const total = sorted.slice(0, 3).reduce((sum, e, i) => sum + e.exposureScore * weights[i]!, 0); - return Math.round(total * 10) / 10; +async function loadBilateralProducts(iso2: string): Promise { + const bilateralKey = `comtrade:bilateral-hs4:${iso2}:v1`; + const rawPayload = await getCachedJson(bilateralKey, true).catch(() => null) as BilateralHs4Payload | null; + if (rawPayload?.products?.length) return { products: rawPayload.products, transient: false }; + + const lazyResult = await lazyFetchBilateralHs4(iso2); + if (lazyResult && lazyResult.products.length > 0) return { products: lazyResult.products, transient: false }; + + // Transient states: null = in-flight concurrent fetch, rateLimited = 429, + // comtradeSource 'lazy' with no products = upstream server error / timeout + const isTransient = lazyResult === null + || lazyResult.rateLimited === true + || (lazyResult.comtradeSource === 'lazy' && lazyResult.products.length === 0); + return { products: null, transient: isTransient }; +} + +function emptyResponse(iso2: string, hs2: string): GetCountryChokepointIndexResponse { + return { + iso2, + hs2, + exposures: [], + primaryChokepointId: '', + vulnerabilityIndex: 0, + fetchedAt: new Date().toISOString(), + }; } export async function getCountryChokepointIndex( @@ -54,70 +63,54 @@ export async function getCountryChokepointIndex( req: GetCountryChokepointIndexRequest, ): Promise { const isPro = await isCallerPremium(ctx.request); - if (!isPro) { - return { - iso2: req.iso2, - hs2: req.hs2 || '27', - exposures: [], - primaryChokepointId: '', - vulnerabilityIndex: 0, - fetchedAt: new Date().toISOString(), - }; - } + if (!isPro) return emptyResponse(req.iso2, req.hs2 || '27'); const iso2 = req.iso2.trim().toUpperCase(); const hs2 = (req.hs2?.trim() || '27').replace(/\D/g, '') || '27'; if (!/^[A-Z]{2}$/.test(iso2) || !/^\d{1,2}$/.test(hs2)) { - return { iso2: req.iso2, hs2: req.hs2 || '27', exposures: [], primaryChokepointId: '', vulnerabilityIndex: 0, fetchedAt: new Date().toISOString() }; + return emptyResponse(req.iso2, req.hs2 || '27'); } const cacheKey = CHOKEPOINT_EXPOSURE_KEY(iso2, hs2); try { - const result = await cachedFetchJson( - cacheKey, - CACHE_TTL, - async () => { - const clusters = COUNTRY_PORT_CLUSTERS as unknown as Record; - const cluster = clusters[iso2]; - const nearestRouteIds = cluster?.nearestRouteIds ?? []; - const coastSide = cluster?.coastSide ?? 'unknown'; + const cached = await getCachedJson(cacheKey) as GetCountryChokepointIndexResponse | null; + if (cached) return cached; - const exposures = computeExposures(nearestRouteIds, hs2); - // Attach coastSide only to the top entry - if (exposures[0]) exposures[0] = { ...exposures[0], coastSide }; + const { products, transient } = await loadBilateralProducts(iso2); - const primaryId = exposures[0]?.chokepointId ?? ''; - const vulnIndex = vulnerabilityIndex(exposures); + let exposures; + if (products) { + exposures = computeFlowWeightedExposures(iso2, hs2, products); + } else { + exposures = computeFallbackExposures(getRouteIdsForCountry(iso2), hs2); + } - return { - iso2, - hs2, - exposures, - primaryChokepointId: primaryId, - vulnerabilityIndex: vulnIndex, - fetchedAt: new Date().toISOString(), - }; - }, - ); + if (exposures.length === 0) { + exposures = computeFallbackExposures(getRouteIdsForCountry(iso2), hs2); + } - return result ?? { + const coastSide = getCoastSide(iso2); + if (exposures[0]) exposures[0] = { ...exposures[0], coastSide }; + + const primaryId = exposures[0]?.chokepointId ?? ''; + const vulnIndex = vulnerabilityIndex(exposures); + + const result: GetCountryChokepointIndexResponse = { iso2, hs2, - exposures: [], - primaryChokepointId: '', - vulnerabilityIndex: 0, + exposures, + primaryChokepointId: primaryId, + vulnerabilityIndex: vulnIndex, fetchedAt: new Date().toISOString(), }; + + const ttl = transient ? TRANSIENT_CACHE_TTL : CACHE_TTL; + await setCachedJson(cacheKey, result, ttl); + + return result; } catch { - return { - iso2, - hs2, - exposures: [], - primaryChokepointId: '', - vulnerabilityIndex: 0, - fetchedAt: new Date().toISOString(), - }; + return emptyResponse(iso2, hs2); } } diff --git a/tests/country-chokepoint-index.test.mts b/tests/country-chokepoint-index.test.mts new file mode 100644 index 000000000..a40ecc985 --- /dev/null +++ b/tests/country-chokepoint-index.test.mts @@ -0,0 +1,195 @@ +import { describe, it } from 'node:test'; +import assert from 'node:assert/strict'; +import { + computeFlowWeightedExposures, + computeFallbackExposures, + vulnerabilityIndex, + type CountryProduct, + type ExposureEntry, +} from '../server/worldmonitor/supply-chain/v1/chokepoint-exposure-utils.js'; + +function makeProduct(hs4: string, exporterIso2: string, share: number, value = 1_000_000): CountryProduct { + return { + hs4, + description: `Product ${hs4}`, + totalValue: value, + topExporters: [{ partnerCode: 0, partnerIso2: exporterIso2, value, share }], + year: 2024, + }; +} + +function scoreMap(entries: ExposureEntry[]): Map { + return new Map(entries.map(e => [e.chokepointId, e.exposureScore])); +} + +describe('Flow-weighted chokepoint exposure (#2968)', () => { + describe('Turkey (TR)', () => { + it('Energy (HS2=27) from SA scores differently than Pharma (HS2=30) from DE', () => { + const energyProducts = [makeProduct('2709', 'SA', 0.6), makeProduct('2711', 'SA', 0.4)]; + const pharmaProducts = [makeProduct('3004', 'DE', 0.5), makeProduct('3004', 'FR', 0.3)]; + + const energyExposures = computeFlowWeightedExposures('TR', '27', energyProducts); + const pharmaExposures = computeFlowWeightedExposures('TR', '30', pharmaProducts); + + assert.ok(energyExposures.length > 0, 'energy exposures should not be empty'); + assert.ok(pharmaExposures.length > 0, 'pharma exposures should not be empty'); + + const energyScores = scoreMap(energyExposures); + const pharmaScores = scoreMap(pharmaExposures); + + let hasDifference = false; + for (const [cpId, eScore] of energyScores) { + const pScore = pharmaScores.get(cpId) ?? 0; + if (eScore !== pScore) { hasDifference = true; break; } + } + assert.ok(hasDifference, 'Energy and Pharma must have different chokepoint scores for Turkey'); + }); + + it('Energy from SA should have higher Hormuz exposure than Pharma from DE', () => { + const energyProducts = [makeProduct('2709', 'SA', 0.8)]; + const pharmaProducts = [makeProduct('3004', 'DE', 0.8)]; + + const energyScores = scoreMap(computeFlowWeightedExposures('TR', '27', energyProducts)); + const pharmaScores = scoreMap(computeFlowWeightedExposures('TR', '30', pharmaProducts)); + + const hormuzEnergy = energyScores.get('hormuz_strait') ?? 0; + const hormuzPharma = pharmaScores.get('hormuz_strait') ?? 0; + assert.ok( + hormuzEnergy > hormuzPharma, + `Hormuz energy (${hormuzEnergy}) should exceed Hormuz pharma (${hormuzPharma})`, + ); + }); + }); + + describe('United States (US)', () => { + it('Electronics (HS2=85) from CN scores differently than Vehicles (HS2=87) from DE', () => { + const electronicsProducts = [makeProduct('8542', 'CN', 0.6), makeProduct('8517', 'TW', 0.3)]; + const vehicleProducts = [makeProduct('8703', 'DE', 0.5), makeProduct('8708', 'JP', 0.3)]; + + const elecExposures = computeFlowWeightedExposures('US', '85', electronicsProducts); + const vehExposures = computeFlowWeightedExposures('US', '87', vehicleProducts); + + const elecScores = scoreMap(elecExposures); + const vehScores = scoreMap(vehExposures); + + let hasDifference = false; + for (const [cpId, eScore] of elecScores) { + const vScore = vehScores.get(cpId) ?? 0; + if (eScore !== vScore) { hasDifference = true; break; } + } + assert.ok(hasDifference, 'Electronics and Vehicles must have different scores for the US'); + }); + + it('Top chokepoints differ between Energy (SA/QA suppliers) and Electronics (CN/TW suppliers)', () => { + const energyProducts = [makeProduct('2709', 'SA', 0.5), makeProduct('2711', 'QA', 0.3)]; + const elecProducts = [makeProduct('8542', 'CN', 0.7), makeProduct('8517', 'TW', 0.2)]; + + const energyExposures = computeFlowWeightedExposures('US', '27', energyProducts); + const elecExposures = computeFlowWeightedExposures('US', '85', elecProducts); + + const energyTop = energyExposures[0]?.chokepointId; + const elecTop = elecExposures[0]?.chokepointId; + + assert.ok(energyTop, 'Energy should have a top chokepoint'); + assert.ok(elecTop, 'Electronics should have a top chokepoint'); + assert.notEqual(energyTop, elecTop, `Top chokepoint should differ: energy=${energyTop}, elec=${elecTop}`); + }); + }); + + describe('Cross-country differentiation', () => { + const testCountries = ['TR', 'US', 'CN', 'DE', 'JP', 'IN', 'BR', 'GB', 'FR', 'SA']; + + it('At least 8 of 10 test countries produce differentiated Energy vs Pharma scores', () => { + let differentiated = 0; + for (const iso2 of testCountries) { + const energyProducts = [makeProduct('2709', 'SA', 0.5), makeProduct('2711', 'RU', 0.3)]; + const pharmaProducts = [makeProduct('3004', 'DE', 0.4), makeProduct('3004', 'IN', 0.3)]; + + const energyScores = scoreMap(computeFlowWeightedExposures(iso2, '27', energyProducts)); + const pharmaScores = scoreMap(computeFlowWeightedExposures(iso2, '30', pharmaProducts)); + + for (const [cpId, eScore] of energyScores) { + if (eScore !== (pharmaScores.get(cpId) ?? 0)) { differentiated++; break; } + } + } + assert.ok( + differentiated >= 8, + `Only ${differentiated}/10 countries showed differentiation (need ≥8)`, + ); + }); + }); + + describe('Edge cases', () => { + it('Empty products list returns empty exposures', () => { + const result = computeFlowWeightedExposures('TR', '27', []); + assert.equal(result.length, 0); + }); + + it('Products with no matching HS2 return empty exposures', () => { + const products = [makeProduct('8542', 'CN', 0.8)]; + const result = computeFlowWeightedExposures('TR', '27', products); + assert.equal(result.length, 0, 'HS2=27 should not match HS4=8542'); + }); + + it('Unknown exporter country falls back gracefully (no routes)', () => { + const products = [makeProduct('2709', 'ZZ', 1.0)]; + const result = computeFlowWeightedExposures('TR', '27', products); + assert.ok(result.length > 0, 'should still return entries even for unknown exporter'); + }); + + it('Scores are capped at 100', () => { + const heavyProducts = [ + makeProduct('2709', 'SA', 0.9, 10_000_000), + makeProduct('2710', 'SA', 0.9, 10_000_000), + makeProduct('2711', 'SA', 0.9, 10_000_000), + ]; + const result = computeFlowWeightedExposures('TR', '27', heavyProducts); + for (const e of result) { + assert.ok(e.exposureScore <= 100, `${e.chokepointId} scored ${e.exposureScore} > 100`); + } + }); + }); + + describe('Fallback scoring', () => { + it('Produces identical scores for different HS2 (except energy boost)', () => { + const routes = ['gulf-europe-oil', 'russia-med-oil']; + const pharma = computeFallbackExposures(routes, '30'); + const textiles = computeFallbackExposures(routes, '62'); + + const pharmaScores = scoreMap(pharma); + const textileScores = scoreMap(textiles); + + for (const [cpId, pScore] of pharmaScores) { + assert.equal(pScore, textileScores.get(cpId) ?? 0, `Fallback: ${cpId} should be identical across non-energy sectors`); + } + }); + + it('Energy boost differentiates HS2=27 from others in fallback mode', () => { + const routes = ['gulf-europe-oil', 'gulf-asia-oil']; + const energy = computeFallbackExposures(routes, '27'); + const pharma = computeFallbackExposures(routes, '30'); + + const energyScores = scoreMap(energy); + const pharmaScores = scoreMap(pharma); + + let hasDifference = false; + for (const [cpId, eScore] of energyScores) { + if (eScore !== (pharmaScores.get(cpId) ?? 0)) { hasDifference = true; break; } + } + assert.ok(hasDifference, 'Energy fallback should differ from non-energy due to 1.5x boost'); + }); + }); + + describe('Vulnerability index', () => { + it('Computes weighted average of top 3 scores', () => { + const entries: ExposureEntry[] = [ + { chokepointId: 'a', chokepointName: 'A', exposureScore: 100, coastSide: '', shockSupported: false }, + { chokepointId: 'b', chokepointName: 'B', exposureScore: 80, coastSide: '', shockSupported: false }, + { chokepointId: 'c', chokepointName: 'C', exposureScore: 60, coastSide: '', shockSupported: false }, + ]; + const result = vulnerabilityIndex(entries); + const expected = Math.round((100 * 0.5 + 80 * 0.3 + 60 * 0.2) * 10) / 10; + assert.equal(result, expected); + }); + }); +});