diff --git a/scripts/seed-comtrade-bilateral-hs4.mjs b/scripts/seed-comtrade-bilateral-hs4.mjs index 3c29442b6..b7fcd4d8d 100644 --- a/scripts/seed-comtrade-bilateral-hs4.mjs +++ b/scripts/seed-comtrade-bilateral-hs4.mjs @@ -112,44 +112,78 @@ async function redisPipeline(commands) { * @param {string[]} hs4Batch * @returns {Promise>} */ -async function fetchBilateral(reporterCode, hs4Batch) { +// Comtrade's API regularly returns transient 5xx (500/502/503/504) on otherwise +// valid reporter fetches — observed 2026-04-14 with India (699) 503×2 and +// Iran (364) 500. Without a 5xx retry those reporters silently drop from +// the snapshot and the panel shows missing countries for a full cycle. +export function isTransientComtrade(status) { + return status === 500 || status === 502 || status === 503 || status === 504; +} + +// Retry sleep is indirected through a module-local binding so unit tests can +// swap in a no-op without changing production cadence. Production defaults +// to the real sleep import; tests call __setSleepForTests(() => Promise.resolve()). +let _retrySleep = sleep; +export function __setSleepForTests(fn) { _retrySleep = typeof fn === 'function' ? fn : sleep; } + +async function fetchBilateralOnce(url, timeoutMs = 45_000) { + return fetch(url, { + headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, + signal: AbortSignal.timeout(timeoutMs), + }); +} + +function buildFetchUrl(reporterCode, hs4Batch, key) { const url = new URL(COMTRADE_FETCH_URL); url.searchParams.set('reporterCode', reporterCode); url.searchParams.set('cmdCode', hs4Batch.join(',')); url.searchParams.set('flowCode', 'M'); - const key = getNextKey(); if (key) url.searchParams.set('subscription-key', key); + return url.toString(); +} - const resp = await fetch(url.toString(), { - headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, - signal: AbortSignal.timeout(45_000), - }); +/** + * Single classification loop so a post-429 5xx still consumes the bounded + * 5xx retries (and vice versa). Caps: one 429 wait (60s), then up to two + * transient-5xx retries (5s, 15s). Any non-transient non-OK status exits. + * + * @param {string} reporterCode + * @param {string[]} hs4Batch + * @returns {Promise>} + */ +export async function fetchBilateral(reporterCode, hs4Batch) { + let rateLimitedOnce = false; + let transientRetries = 0; + const MAX_TRANSIENT_RETRIES = 2; - if (resp.status === 429) { - console.warn(` 429 rate-limited for reporter ${reporterCode}, waiting 60s...`); - await sleep(60_000); - const retryKey = getNextKey(); - const retryUrl = new URL(COMTRADE_FETCH_URL); - retryUrl.searchParams.set('reporterCode', reporterCode); - retryUrl.searchParams.set('cmdCode', hs4Batch.join(',')); - retryUrl.searchParams.set('flowCode', 'M'); - if (retryKey) retryUrl.searchParams.set('subscription-key', retryKey); - const retry = await fetch(retryUrl.toString(), { - headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, - signal: AbortSignal.timeout(45_000), - }); - if (!retry.ok) { - console.warn(` Retry for reporter ${reporterCode} also failed (HTTP ${retry.status})`); - return []; + let resp; + while (true) { + resp = await fetchBilateralOnce(buildFetchUrl(reporterCode, hs4Batch, getNextKey())); + + if (resp.status === 429 && !rateLimitedOnce) { + console.warn(` 429 rate-limited for reporter ${reporterCode}, waiting 60s...`); + await _retrySleep(60_000); + rateLimitedOnce = true; + continue; } - const retryData = await retry.json(); - return parseRecords(retryData); + + if (isTransientComtrade(resp.status) && transientRetries < MAX_TRANSIENT_RETRIES) { + const delay = transientRetries === 0 ? 5_000 : 15_000; + console.warn(` transient HTTP ${resp.status} for reporter ${reporterCode}, retrying in ${delay / 1000}s...`); + await _retrySleep(delay); + transientRetries++; + continue; + } + + break; } if (!resp.ok) { - console.warn(` HTTP ${resp.status} for reporter ${reporterCode}`); + const tag = (rateLimitedOnce || transientRetries > 0) ? ' (after retries)' : ''; + console.warn(` HTTP ${resp.status} for reporter ${reporterCode}${tag}`); return []; } + const data = await resp.json(); const parsed = parseRecords(data); if (parsed.length === 0 && data?.count > 0) { diff --git a/scripts/seed-recovery-import-hhi.mjs b/scripts/seed-recovery-import-hhi.mjs index 7a67ad970..ee4c9647a 100644 --- a/scripts/seed-recovery-import-hhi.mjs +++ b/scripts/seed-recovery-import-hhi.mjs @@ -81,7 +81,20 @@ function parseRecords(data) { })); } -async function fetchImportsForReporter(reporterCode, apiKey) { +// Comtrade transient 5xx (500/502/503/504) must be retried or the reporter +// silently drops from the HHI calc. The seeder's resume cache picks up +// still-missing reporters on the next run, so we cap retries to keep the +// 30-min bundle budget viable. +export function isTransientComtrade(status) { + return status === 500 || status === 502 || status === 503 || status === 504; +} + +// Injectable sleep so unit tests can exercise the classification loop without +// real 15s/5s/10s waits. Production defaults to the real sleep. +let _retrySleep = sleep; +export function __setSleepForTests(fn) { _retrySleep = typeof fn === 'function' ? fn : sleep; } + +export async function fetchImportsForReporter(reporterCode, apiKey) { const url = new URL(COMTRADE_URL); url.searchParams.set('reporterCode', reporterCode); url.searchParams.set('flowCode', 'M'); @@ -89,22 +102,37 @@ async function fetchImportsForReporter(reporterCode, apiKey) { url.searchParams.set('period', `${new Date().getFullYear() - 1},${new Date().getFullYear() - 2}`); url.searchParams.set('subscription-key', apiKey); - const resp = await fetch(url.toString(), { - headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, - signal: AbortSignal.timeout(45_000), - }); - - if (resp.status === 429) { - // Short backoff on 429 — 60s is too long when the overall bundle budget is tight. - // We only retry once; subsequent 429s count as a skip and the resume cache picks - // them up on the next run. - await sleep(15_000); - const retry = await fetch(url.toString(), { + async function once() { + return fetch(url.toString(), { headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, signal: AbortSignal.timeout(45_000), }); - if (!retry.ok) return { records: [], status: retry.status }; - return { records: parseRecords(await retry.json()), status: retry.status }; + } + + // Single classification loop: 429 wait (15s — bundle budget is tight), then + // up to two transient-5xx retries (5s, 10s). Collapsed from branched retries + // so a post-429 5xx still consumes the bounded 5xx retries. + let rateLimitedOnce = false; + let transientRetries = 0; + const MAX_TRANSIENT_RETRIES = 2; + + let resp; + while (true) { + resp = await once(); + + if (resp.status === 429 && !rateLimitedOnce) { + await _retrySleep(15_000); + rateLimitedOnce = true; + continue; + } + + if (isTransientComtrade(resp.status) && transientRetries < MAX_TRANSIENT_RETRIES) { + await _retrySleep(transientRetries === 0 ? 5_000 : 10_000); + transientRetries++; + continue; + } + + break; } if (!resp.ok) return { records: [], status: resp.status }; diff --git a/scripts/seed-trade-flows.mjs b/scripts/seed-trade-flows.mjs index 8e4299223..a1e856341 100644 --- a/scripts/seed-trade-flows.mjs +++ b/scripts/seed-trade-flows.mjs @@ -8,14 +8,21 @@ loadEnvFile(import.meta.url); const CANONICAL_KEY = 'comtrade:flows:v1'; const CACHE_TTL = 259200; // 72h = 3× daily interval -const KEY_PREFIX = 'comtrade:flows'; +export const KEY_PREFIX = 'comtrade:flows'; const COMTRADE_BASE = 'https://comtradeapi.un.org/public/v1'; const INTER_REQUEST_DELAY_MS = 3_000; const ANOMALY_THRESHOLD = 0.30; // 30% YoY change // Require at least this fraction of (reporter × commodity) pairs to return // non-empty flows. Guards against an entire reporter silently flatlining // (e.g., wrong reporterCode → HTTP 200 with count:0 for every commodity). +// Global coverage floor — overall populated/total must be ≥ this. const MIN_COVERAGE_RATIO = 0.70; +// Per-reporter coverage floor — each reporter must have ≥ this fraction of +// its commodities populated. Prevents the "India/Taiwan flatlines entirely" +// failure mode: with 6 reporters × 5 commodities, losing one full reporter +// is only 5/30 missing (83% global coverage → passes MIN_COVERAGE_RATIO), +// but 0/5 per-reporter coverage for the dead one blocks publish here. +const MIN_PER_REPORTER_RATIO = 0.40; // at least 2 of 5 commodities per reporter // Strategic reporters: US, China, Russia, Iran, India, Taiwan const REPORTERS = [ @@ -36,16 +43,45 @@ const COMMODITIES = [ { code: '9301', desc: 'Arms / military equipment' }, ]; -async function fetchFlows(reporter, commodity) { +// Comtrade preview regularly hits transient 5xx (500/502/503/504). Without +// retry each (reporter,commodity) pair that drew a 5xx is silently lost. +export function isTransientComtrade(status) { + return status === 500 || status === 502 || status === 503 || status === 504; +} + +// Injectable sleep so unit tests can exercise the retry loop without real +// 5s/15s waits. Production defaults to the real sleep. +let _retrySleep = sleep; +export function __setSleepForTests(fn) { _retrySleep = typeof fn === 'function' ? fn : sleep; } + +export async function fetchFlows(reporter, commodity) { const url = new URL(`${COMTRADE_BASE}/preview/C/A/HS`); url.searchParams.set('reporterCode', reporter.code); url.searchParams.set('cmdCode', commodity.code); url.searchParams.set('flowCode', 'X,M'); // exports + imports - const resp = await fetch(url.toString(), { - headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, - signal: AbortSignal.timeout(15_000), - }); + async function once() { + return fetch(url.toString(), { + headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, + signal: AbortSignal.timeout(15_000), + }); + } + + // Classification loop: up to two transient-5xx retries (5s, 15s) then give up. + let transientRetries = 0; + const MAX_TRANSIENT_RETRIES = 2; + let resp; + while (true) { + resp = await once(); + if (isTransientComtrade(resp.status) && transientRetries < MAX_TRANSIENT_RETRIES) { + const delay = transientRetries === 0 ? 5_000 : 15_000; + console.warn(` transient HTTP ${resp.status} for reporter ${reporter.code} cmd ${commodity.code}, retrying in ${delay / 1000}s...`); + await _retrySleep(delay); + transientRetries++; + continue; + } + break; + } if (!resp.ok) throw new Error(`HTTP ${resp.status}`); const data = await resp.json(); @@ -130,17 +166,48 @@ async function fetchAllFlows() { } } - const total = REPORTERS.length * COMMODITIES.length; - const populated = Object.values(perKeyFlows).filter((v) => (v.flows?.length ?? 0) > 0).length; - const coverage = populated / total; - console.log(` Coverage: ${populated}/${total} (${(coverage * 100).toFixed(0)}%) reporter×commodity pairs populated`); - if (coverage < MIN_COVERAGE_RATIO) { - throw new Error(`coverage ${populated}/${total} below floor ${MIN_COVERAGE_RATIO}; refusing to publish partial snapshot`); + const gate = checkCoverage(perKeyFlows, REPORTERS, COMMODITIES); + console.log(` Coverage: ${gate.populated}/${gate.total} (${(gate.globalRatio * 100).toFixed(0)}%) reporter×commodity pairs populated`); + for (const r of gate.perReporter) { + if (r.ratio < MIN_PER_REPORTER_RATIO) { + console.warn(` ${r.reporter} reporter ${r.code}: ${r.populated}/${r.total} (${(r.ratio * 100).toFixed(0)}%) — below per-reporter floor ${MIN_PER_REPORTER_RATIO}`); + } } + if (!gate.ok) throw new Error(gate.reason); return { flows: allFlows, perKeyFlows, fetchedAt: new Date().toISOString() }; } +/** + * Pure coverage gate. Returns pass/fail + per-reporter breakdown. + * Exported for unit testing — mocking 30+ fetches in fetchAllFlows is fragile, + * and the failure mode the PR is trying to block lives here, not in fetchFlows. + * + * Blocks publish when EITHER: global ratio < MIN_COVERAGE_RATIO, OR any single + * reporter's commodity coverage < MIN_PER_REPORTER_RATIO. The latter catches + * the India/Taiwan-style "one reporter flatlines completely" case that passes + * a global-only gate. + */ +export function checkCoverage(perKeyFlows, reporters, commodities) { + const total = reporters.length * commodities.length; + const populated = Object.values(perKeyFlows).filter((v) => (v.flows?.length ?? 0) > 0).length; + const globalRatio = total > 0 ? populated / total : 0; + + const perReporter = reporters.map((r) => { + const pop = commodities.filter((c) => (perKeyFlows[`${KEY_PREFIX}:${r.code}:${c.code}`]?.flows?.length ?? 0) > 0).length; + return { reporter: r.name, code: r.code, populated: pop, total: commodities.length, ratio: commodities.length > 0 ? pop / commodities.length : 0 }; + }); + + if (globalRatio < MIN_COVERAGE_RATIO) { + return { ok: false, populated, total, globalRatio, perReporter, reason: `coverage ${populated}/${total} below global floor ${MIN_COVERAGE_RATIO}; refusing to publish partial snapshot` }; + } + const dead = perReporter.find((r) => r.ratio < MIN_PER_REPORTER_RATIO); + if (dead) { + return { ok: false, populated, total, globalRatio, perReporter, reason: `reporter ${dead.reporter} (${dead.code}) only ${dead.populated}/${dead.total} commodities — below per-reporter floor ${MIN_PER_REPORTER_RATIO}; refusing to publish snapshot with a flatlined reporter` }; + } + return { ok: true, populated, total, globalRatio, perReporter, reason: null }; +} + function validate(data) { return Array.isArray(data?.flows) && data.flows.length > 0; } @@ -158,14 +225,17 @@ async function afterPublish(data, _meta) { } } -runSeed('trade', 'comtrade-flows', CANONICAL_KEY, fetchAllFlows, { - validateFn: validate, - ttlSeconds: CACHE_TTL, - sourceVersion: 'comtrade-preview-v1', - publishTransform, - afterPublish, -}).catch((err) => { - const _cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : ''; - console.error('FATAL:', (err.message || err) + _cause); - process.exit(0); -}); +// isMain guard so tests can import fetchFlows without triggering a real seed run. +if (process.argv[1]?.endsWith('seed-trade-flows.mjs')) { + runSeed('trade', 'comtrade-flows', CANONICAL_KEY, fetchAllFlows, { + validateFn: validate, + ttlSeconds: CACHE_TTL, + sourceVersion: 'comtrade-preview-v1', + publishTransform, + afterPublish, + }).catch((err) => { + const _cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : ''; + console.error('FATAL:', (err.message || err) + _cause); + process.exit(0); + }); +} diff --git a/tests/comtrade-bilateral-hs4.test.mjs b/tests/comtrade-bilateral-hs4.test.mjs index b7f0d00e5..1aec47c5b 100644 --- a/tests/comtrade-bilateral-hs4.test.mjs +++ b/tests/comtrade-bilateral-hs4.test.mjs @@ -262,8 +262,11 @@ describe('Comtrade bilateral HS4 seeder (scripts/seed-comtrade-bilateral-hs4.mjs src.includes('rate-limited'), 'seeder: must log rate limit events', ); + // Matches bare `sleep(60_000)` or indirected `_retrySleep(60_000)` — the + // latter is the test-injectable form used so retry unit tests don't + // actually sleep 60s. Either form preserves the 60s production cadence. assert.ok( - src.includes('sleep(60_000)') || src.includes('sleep(60000)'), + /\b(?:_retrySleep|sleep)\(60[_]?000\)/.test(src), 'seeder: must wait 60 seconds on 429 before retrying', ); }); diff --git a/tests/seed-comtrade-5xx-retry.test.mjs b/tests/seed-comtrade-5xx-retry.test.mjs new file mode 100644 index 000000000..f0848d329 --- /dev/null +++ b/tests/seed-comtrade-5xx-retry.test.mjs @@ -0,0 +1,304 @@ +// Regression test for comtrade seeders' 5xx retry behavior. +// See Railway log 2026-04-14 bilateral-hs4: India (699) hit HTTP 503 on both +// batches with no retry → dropped silently from the snapshot. This test pins +// the retry contract. + +import { test, beforeEach, afterEach } from 'node:test'; +import assert from 'node:assert/strict'; + +import { isTransientComtrade, fetchBilateral, __setSleepForTests } from '../scripts/seed-comtrade-bilateral-hs4.mjs'; +import { fetchFlows, checkCoverage, KEY_PREFIX, __setSleepForTests as __setFlowsSleep } from '../scripts/seed-trade-flows.mjs'; +import { fetchImportsForReporter, __setSleepForTests as __setHhiSleep } from '../scripts/seed-recovery-import-hhi.mjs'; + +const ORIGINAL_FETCH = globalThis.fetch; + +let fetchCalls; +let fetchResponses; // queue of { status, body } per call +let sleepCalls; + +beforeEach(() => { + fetchCalls = []; + fetchResponses = []; + sleepCalls = []; + globalThis.fetch = async (url) => { + fetchCalls.push(String(url)); + const next = fetchResponses.shift() ?? { status: 200, body: { data: [] } }; + return new Response(JSON.stringify(next.body ?? {}), { status: next.status }); + }; + // Swap the retry sleep for a no-op that records the requested delay across + // all three seeders so tests can assert the production backoff cadence + // without actually waiting. + const stub = (ms) => { sleepCalls.push(ms); return Promise.resolve(); }; + __setSleepForTests(stub); + __setFlowsSleep(stub); + __setHhiSleep(stub); +}); + +afterEach(() => { + globalThis.fetch = ORIGINAL_FETCH; + __setSleepForTests(null); + __setFlowsSleep(null); + __setHhiSleep(null); +}); + +test('isTransientComtrade: recognizes 500/502/503/504 only', () => { + for (const s of [500, 502, 503, 504]) { + assert.equal(isTransientComtrade(s), true, `${s} should be transient`); + } + for (const s of [200, 400, 401, 403, 404, 429, 418, 499, 505]) { + assert.equal(isTransientComtrade(s), false, `${s} should NOT be transient`); + } +}); + +test('fetchBilateral: succeeds on first attempt with 200', async () => { + fetchResponses = [ + { status: 200, body: { data: [{ cmdCode: '2709', partnerCode: '156', primaryValue: 1000, period: 2024 }] } }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 1, 'one fetch, no retries'); + assert.equal(result.length, 1); + assert.equal(result[0].cmdCode, '2709'); +}); + +test('fetchBilateral: retries once after a single 503, succeeds on second attempt', async () => { + fetchResponses = [ + { status: 503, body: {} }, + { status: 200, body: { data: [{ cmdCode: '2709', partnerCode: '156', primaryValue: 500, period: 2024 }] } }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 2, 'one initial + one retry'); + assert.equal(result.length, 1, 'data recovered on retry'); +}); + +test('fetchBilateral: retries twice on consecutive 503s, succeeds on third', async () => { + fetchResponses = [ + { status: 503, body: {} }, + { status: 503, body: {} }, + // Real partner code (China=156), NOT '000': groupByProduct() downstream + // filters 0/000 partners, so a test asserting "data recovered" with '000' + // would pass here while the user-visible seeder would still drop the row. + { status: 200, body: { data: [{ cmdCode: '2709', partnerCode: '156', primaryValue: 999, period: 2024 }] } }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 3, 'initial + two retries'); + assert.equal(result.length, 1); + assert.deepEqual(sleepCalls, [5_000, 15_000]); +}); + +test('fetchBilateral: gives up (returns []) after 3 consecutive 5xx', async () => { + fetchResponses = [ + { status: 503, body: {} }, + { status: 502, body: {} }, + { status: 500, body: {} }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 3, 'caps at 3 attempts'); + assert.deepEqual(result, [], 'empty array after exhausting retries — caller can skip write'); + assert.deepEqual(sleepCalls, [5_000, 15_000], 'no sleep after final attempt'); +}); + +test('fetchBilateral: does NOT retry on 4xx (non-transient)', async () => { + fetchResponses = [{ status: 403, body: {} }]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 1, 'no retry on client error'); + assert.deepEqual(result, []); +}); + +test('fetchBilateral: 429 then 503 still consumes the 5xx retries (regression for PR review)', async () => { + // Previously the 429 branch would return immediately if its retry came back + // 5xx, bypassing the bounded transient retries. Now the classification loop + // reclassifies each response: 429 waits → retry hits 503 → 5s backoff → 15s + // backoff → 200 success. + fetchResponses = [ + { status: 429, body: {} }, + { status: 503, body: {} }, + { status: 502, body: {} }, + { status: 200, body: { data: [{ cmdCode: '2709', partnerCode: '156', primaryValue: 42, period: 2024 }] } }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 4, '1 initial 429 + 1 post-429 retry + 2 transient-5xx retries'); + assert.equal(result.length, 1, 'recovered after mixed 429+5xx sequence'); + // Pin the production backoff cadence so a future refactor that changes + // these numbers has to update the test too. + assert.deepEqual(sleepCalls, [60_000, 5_000, 15_000], '60s 429 wait, then 5s and 15s transient backoffs'); +}); + +test('fetchBilateral: 429 once → 429 again does NOT re-wait 60s (one 429 cap)', async () => { + fetchResponses = [ + { status: 429, body: {} }, + { status: 429, body: {} }, + ]; + const result = await fetchBilateral('699', ['2709']); + assert.equal(fetchCalls.length, 2, 'cap 429 retries at one wait'); + assert.deepEqual(result, []); + assert.deepEqual(sleepCalls, [60_000], 'only one 60s wait, no second 429 backoff'); +}); + +// ----------------------------------------------------------------------------- +// seed-trade-flows.mjs — fetchFlows +// ----------------------------------------------------------------------------- + +test('fetchFlows: succeeds on first 200', async () => { + fetchResponses = [{ status: 200, body: { data: [{ period: 2024, flowCode: 'M', primaryValue: 100, partnerCode: '156' }] } }]; + const result = await fetchFlows({ code: '699', name: 'India' }, { code: '2709', desc: 'Crude' }); + assert.equal(fetchCalls.length, 1); + assert.ok(result.length >= 1, 'returns aggregated flows'); + assert.deepEqual(sleepCalls, []); +}); + +test('fetchFlows: retries twice on 503s, succeeds on third', async () => { + fetchResponses = [ + { status: 503, body: {} }, + { status: 502, body: {} }, + { status: 200, body: { data: [{ period: 2024, flowCode: 'X', primaryValue: 500, partnerCode: '156' }] } }, + ]; + const result = await fetchFlows({ code: '699', name: 'India' }, { code: '2709', desc: 'Crude' }); + assert.equal(fetchCalls.length, 3); + assert.ok(result.length >= 1, 'recovered after transient 5xx'); + assert.deepEqual(sleepCalls, [5_000, 15_000]); +}); + +test('fetchFlows: throws after 3 consecutive 5xx (caller catches via allSettled)', async () => { + fetchResponses = [{ status: 503 }, { status: 502 }, { status: 500 }]; + await assert.rejects( + () => fetchFlows({ code: '699', name: 'India' }, { code: '2709', desc: 'Crude' }), + /HTTP 500/, + ); + assert.equal(fetchCalls.length, 3, 'caps at 3 attempts'); + assert.deepEqual(sleepCalls, [5_000, 15_000]); +}); + +// ----------------------------------------------------------------------------- +// seed-recovery-import-hhi.mjs — fetchImportsForReporter +// ----------------------------------------------------------------------------- + +test('fetchImportsForReporter: succeeds on first 200', async () => { + fetchResponses = [{ status: 200, body: { data: [{ period: 2024, primaryValue: 1_000_000, partnerCode: '156' }] } }]; + const { records, status } = await fetchImportsForReporter('699', 'fake-key'); + assert.equal(fetchCalls.length, 1); + assert.equal(status, 200); + assert.ok(records.length >= 0); + assert.deepEqual(sleepCalls, []); +}); + +test('fetchImportsForReporter: retries twice on 503s, succeeds on third', async () => { + fetchResponses = [ + { status: 503, body: {} }, + { status: 503, body: {} }, + { status: 200, body: { data: [{ period: 2024, primaryValue: 999, partnerCode: '156' }] } }, + ]; + const { records, status } = await fetchImportsForReporter('699', 'fake-key'); + assert.equal(fetchCalls.length, 3); + assert.equal(status, 200); + assert.ok(records.length >= 0); + assert.deepEqual(sleepCalls, [5_000, 10_000], 'import-hhi uses 10s not 15s for second retry (tighter bundle budget)'); +}); + +test('fetchImportsForReporter: 429 then 503 still consumes the 5xx retries', async () => { + fetchResponses = [ + { status: 429, body: {} }, + { status: 503, body: {} }, + { status: 502, body: {} }, + { status: 200, body: { data: [{ period: 2024, primaryValue: 42, partnerCode: '156' }] } }, + ]; + const { records, status } = await fetchImportsForReporter('699', 'fake-key'); + assert.equal(fetchCalls.length, 4, 'classification loop: 429 + 3 transient 5xx attempts (of which 2 retried)'); + assert.equal(status, 200); + assert.ok(records.length >= 0); + assert.deepEqual(sleepCalls, [15_000, 5_000, 10_000], '15s 429 + 5s/10s transient backoffs'); +}); + +test('fetchImportsForReporter: gives up ({records:[], status:503}) after 3 consecutive 5xx', async () => { + fetchResponses = [{ status: 503 }, { status: 502 }, { status: 500 }]; + const { records, status } = await fetchImportsForReporter('699', 'fake-key'); + assert.deepEqual(records, []); + assert.equal(status, 500, 'returns the final upstream status so caller can log it'); + assert.equal(fetchCalls.length, 3); +}); + +// ----------------------------------------------------------------------------- +// seed-trade-flows — checkCoverage (publish gate) +// Regression for the India/Taiwan-style "entire reporter flatlines" case. +// 6 reporters × 5 commodities = 30 pairs. MIN_COVERAGE_RATIO = 0.70 means +// >=21 pairs pass the global gate. Losing one full reporter (5 pairs) yields +// 25/30 = 83% — which passes the global ratio but should fail per-reporter. +// ----------------------------------------------------------------------------- + +const FLOWS_REPORTERS = [ + { code: '842', name: 'USA' }, { code: '156', name: 'China' }, { code: '643', name: 'Russia' }, + { code: '364', name: 'Iran' }, { code: '699', name: 'India' }, { code: '490', name: 'Taiwan' }, +]; +const FLOWS_COMMODITIES = [ + { code: '2709', desc: 'Crude' }, { code: '7108', desc: 'Gold' }, + { code: '7112', desc: 'Rare earths' }, { code: '8542', desc: 'Semis' }, + { code: '9301', desc: 'Arms' }, +]; + +function buildPerKey(populatedPairs /* Array<[reporterCode, commodityCode]> */) { + const out = {}; + for (const r of FLOWS_REPORTERS) { + for (const c of FLOWS_COMMODITIES) { + const key = `${KEY_PREFIX}:${r.code}:${c.code}`; + const isPop = populatedPairs.some(([rc, cc]) => rc === r.code && cc === c.code); + out[key] = { flows: isPop ? [{ year: 2024 }] : [], fetchedAt: '2026-04-14T00:00Z' }; + } + } + return out; +} + +test('checkCoverage: all 30/30 pairs populated → ok', () => { + const pairs = []; + for (const r of FLOWS_REPORTERS) for (const c of FLOWS_COMMODITIES) pairs.push([r.code, c.code]); + const res = checkCoverage(buildPerKey(pairs), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.ok, true); + assert.equal(res.populated, 30); +}); + +test('checkCoverage: India flatlines (0/5 commodities) → REJECT despite 83% global coverage', () => { + // 25/30 populated = 83% global (passes MIN_COVERAGE_RATIO 0.70) but India + // has 0/5 per-reporter coverage. Prior gate published this silently. + const pairs = []; + for (const r of FLOWS_REPORTERS) { + if (r.code === '699') continue; // India flatlines + for (const c of FLOWS_COMMODITIES) pairs.push([r.code, c.code]); + } + const res = checkCoverage(buildPerKey(pairs), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.ok, false, 'per-reporter gate must block full-reporter flatline'); + assert.match(res.reason, /India.*per-reporter/); + assert.equal(res.populated, 25); + assert.equal(Math.round(res.globalRatio * 100), 83, 'global ratio alone would have allowed this'); +}); + +test('checkCoverage: Taiwan flatlines → REJECT by reporter name', () => { + const pairs = []; + for (const r of FLOWS_REPORTERS) { + if (r.code === '490') continue; // Taiwan + for (const c of FLOWS_COMMODITIES) pairs.push([r.code, c.code]); + } + const res = checkCoverage(buildPerKey(pairs), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.ok, false); + assert.match(res.reason, /Taiwan/); +}); + +test('checkCoverage: each reporter missing 3/5 commodities → global 12/30 = 40% → REJECT global', () => { + // Failure mode: broad upstream outage. Global ratio catches this. + const pairs = []; + for (const r of FLOWS_REPORTERS) pairs.push([r.code, FLOWS_COMMODITIES[0].code], [r.code, FLOWS_COMMODITIES[1].code]); + const res = checkCoverage(buildPerKey(pairs), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.ok, false); + assert.match(res.reason, /below global floor/); +}); + +test('checkCoverage: each reporter has 4/5 (global 80%) → passes both gates', () => { + const pairs = []; + for (const r of FLOWS_REPORTERS) for (const c of FLOWS_COMMODITIES.slice(0, 4)) pairs.push([r.code, c.code]); + // 6 × 4 = 24/30 = 80% global (≥70%), each reporter 4/5 = 80% (≥40%) + const res = checkCoverage(buildPerKey(pairs), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.ok, true, `expected ok, got: ${res.reason}`); +}); + +test('checkCoverage: per-reporter breakdown includes every reporter', () => { + const res = checkCoverage(buildPerKey([]), FLOWS_REPORTERS, FLOWS_COMMODITIES); + assert.equal(res.perReporter.length, FLOWS_REPORTERS.length); + assert.ok(res.perReporter.every((r) => r.populated === 0 && r.total === 5)); +});