Files
worldmonitor/scripts/seed-recovery-reexport-share.mjs
Elie Habib 8cca8d19e3 feat(resilience): Comtrade-backed re-export-share seeder + SWF Redis read (#3385)
* feat(seed): BUNDLE_RUN_STARTED_AT_MS env + runSeed SIGTERM cleanup

Prereq for the re-export-share Comtrade seeder (plan 2026-04-24-003),
usable by any cohort seeder whose consumer needs bundle-level freshness.

Two coupled changes:

1. `_bundle-runner.mjs` injects `BUNDLE_RUN_STARTED_AT_MS` into every
   spawned child. All siblings in a single bundle run share one value
   (captured at `runBundle` start, not spawn time). Consumers use this
   to detect stale peer keys — if a peer's seed-meta predates the
   current bundle run, fall back to a hard default rather than read
   a cohort-peer's last-week output.

2. `_seed-utils.mjs::runSeed` registers a `process.once('SIGTERM')`
   handler that releases the acquired lock and extends existing-data
   TTL before exiting 143. `_bundle-runner.mjs` sends SIGTERM on
   section timeout, then SIGKILL after KILL_GRACE_MS (5s). Without
   this handler the `finally` path never runs on SIGKILL, leaving
   the 30-min acquireLock reservation in place until its own TTL
   expires — the next cron tick silently skips the resource.
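The coupling above can be sketched in two tiny functions — a minimal sketch with illustrative names, not the actual `_bundle-runner.mjs` / `_seed-utils.mjs` internals:

```javascript
// 1. Bundle runner side: capture ONE timestamp at runBundle start and
//    inject it into every spawned child's env (all siblings share it).
function buildChildEnv(baseEnv, bundleRunStartedAtMs) {
  return { ...baseEnv, BUNDLE_RUN_STARTED_AT_MS: String(bundleRunStartedAtMs) };
}

// 2. Consumer side: a peer's seed-meta written before the current
//    bundle run is stale — fall back to a hard default instead.
function peerIsFresh(peerFetchedAtMs, bundleStartMs) {
  return peerFetchedAtMs >= bundleStartMs;
}
```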

Regression guard memory: `bundle-runner-sigkill-leaks-child-lock` (PR
#3128 root cause).

Tests added:
- bundle-runner env injection (value within run bounds)
- sibling sections share the same timestamp (critical for the
  consumer freshness guard)
- runSeed SIGTERM path: exit 143 + cleanup log
- process.once contract: second SIGTERM does not re-enter handler

* fix(seed): address P1/P2 review findings on SIGTERM + bundle contracts

Addresses PR #3384 review findings (todos 256, 257, 259, 260):

#256 (P1) — SIGTERM handler narrowed to the fetch phase only. It was
installed at runSeed entry and stayed armed through every
`process.exit` path; it could race `emptyDataIsFailure: true`
strict-floor exits (IMF-External, WB-bulk) and extend seed-meta TTL
when the contract forbids it — silently re-masking 30-day outages.
Now the handler is attached immediately before `withRetry(fetchFn)`
and removed in a try/finally that covers all fetch-phase exit branches.
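A minimal sketch of the narrowed-scope pattern (illustrative names; the real wiring lives in `_seed-utils.mjs::runSeed`):

```javascript
async function fetchWithSigtermCleanup(fetchFn, cleanup) {
  const onSigterm = () => {
    // Release the lock / extend TTLs, then exit 143 (SIGTERM convention).
    Promise.resolve(cleanup()).finally(() => process.exit(143));
  };
  process.once('SIGTERM', onSigterm);
  try {
    // Handler is armed ONLY for the duration of the fetch phase.
    return await fetchFn();
  } finally {
    // Runs on success AND on throw — no exit branch leaves it armed.
    process.removeListener('SIGTERM', onSigterm);
  }
}
```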

#257 (P1) — `BUNDLE_RUN_STARTED_AT_MS` now has a first-class helper.
Exported `getBundleRunStartedAtMs()` from `_seed-utils.mjs` with JSDoc
describing the bundle-freshness contract. Fleet-wide helper so the
next consumer seeder imports instead of rediscovering the idiom.

#259 (P2) — SIGTERM cleanup runs `Promise.allSettled` on the
disjoint-key ops (`releaseLock` + `extendExistingTtl`). Serialising
them compounded Upstash latency during the exact failure mode (Redis
degraded) that this handler exists to handle, risking a breach of the
5s SIGKILL grace.

#260 (P2) — `_bundle-runner.mjs` asserts topological order on
optional `dependsOn` section field. Throws on unknown-label refs and
on deps appearing at a later index. Fleet-wide contract replacing
the previous prose-comment ordering guarantee.
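The load-time assertion might look like this — a sketch assuming section objects carry `label` and optional `dependsOn` fields as the commit message describes; the real `_bundle-runner.mjs` may differ:

```javascript
// Throws on unknown-label refs and on a dep appearing at a LATER
// index than its dependent — i.e. enforces topological order.
function assertTopologicalOrder(sections) {
  const labels = new Set(sections.map((s) => s.label));
  const seen = new Set();
  for (const s of sections) {
    for (const dep of s.dependsOn ?? []) {
      if (!labels.has(dep)) throw new Error(`unknown dependsOn label: ${dep}`);
      if (!seen.has(dep)) throw new Error(`${s.label} depends on ${dep}, which runs later`);
    }
    seen.add(s.label);
  }
}
```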

Tests added/updated:
- New: SIGTERM handler removed after fetchFn completes (narrowed-scope
  contract — post-fetch SIGTERM must NOT trigger TTL extension)
- New: dependsOn unknown-label + out-of-order + happy-path (3 tests)

Full test suite: 6,866 tests pass (+4 net).

* fix(seed): getBundleRunStartedAtMs returns null outside a bundle run

Review follow-up: the earlier `Math.floor(Date.now()/1000)*1000` fallback
regressed standalone (non-bundle) runs. A consumer seeder invoked
manually just after its peer wrote `fetchedAt = (now - 5s)` would see
`bundleStartMs = Date.now()`, reject the perfectly-fresh peer envelope
as "stale", and fall back to defaults — defeating the point of the
peer-read path outside the bundle.

Returning null when `BUNDLE_RUN_STARTED_AT_MS` is unset/invalid keeps
the freshness gate scoped to its real purpose (across-bundle-tick
staleness) and lets standalone runs skip the gate entirely. Consumers
check `bundleStartMs != null` before applying the comparison; see the
companion `seed-sovereign-wealth.mjs` change on the stacked PR.
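The contract can be sketched as a pair of small functions — a sketch of the described behavior, not the actual `_seed-utils.mjs` export:

```javascript
// Unset or invalid env → null, so standalone runs skip the gate.
function getBundleRunStartedAtMs(env = process.env) {
  const raw = env.BUNDLE_RUN_STARTED_AT_MS;
  const ms = Number(raw);
  return raw != null && raw !== '' && Number.isFinite(ms) && ms > 0 ? ms : null;
}

// Consumer side: apply the freshness gate only inside a bundle run.
function peerMetaIsAcceptable(fetchedAtMs, bundleStartMs) {
  if (bundleStartMs == null) return true; // standalone: operator orders runs
  return fetchedAtMs >= bundleStartMs;    // inclusive boundary counts as fresh
}
```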

* test(seed): SIGTERM cleanup test now verifies Redis DEL + EXPIRE calls

Greptile review P2 on PR #3384: the existing test only asserted exit
code + log line, not that the Redis ops were actually issued. The
log claim was ahead of the test.

Fixture now logs every Upstash fetch call's shape (EVAL / pipeline-
EXPIRE / other) to stderr. Test asserts:

- >=1 EVAL op was issued during SIGTERM cleanup (releaseLock Lua
  script on the lock key)
- >=1 pipeline-EXPIRE op was issued (extendExistingTtl on canonical
  + seed-meta keys)
- The EVAL body carries the runSeed-generated runId (proves it's
  THIS run's release, not a phantom op)
- The EXPIRE pipeline touches both the canonicalKey AND the
  seed-meta key (proves the keys[] array was built correctly
  including the extraKeys merge path)

Full test suite: 6,866 tests pass, typecheck clean.

* feat(resilience): Comtrade-backed re-export-share seeder + SWF Redis read

Plan ref: docs/plans/2026-04-24-003-feat-reexport-share-comtrade-seeder-plan.md

Motivating case. Before this PR, the SWF `rawMonths` denominator for
the `sovereignFiscalBuffer` dimension used GROSS annual imports for
every country. For re-export hubs (goods transiting without domestic
settlement), this structurally under-reports resilience: UAE's 2023
$941B of imports include $334B of transit flow that never represents
domestic consumption. Net imports = gross × (1 − reexport_share).
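The correction itself is one line of arithmetic. A sketch of `computeNetImports` (the name appears in the seeder's comments; the exact signature here is assumed):

```javascript
// netAnnualImports = grossAnnualImports × (1 − reexportShareOfImports)
// The seeder's clamp guarantees share < 1, so the denominator derived
// from net imports can never be zeroed.
function computeNetImports(grossAnnualImports, reexportShareOfImports) {
  if (!(reexportShareOfImports >= 0 && reexportShareOfImports < 1)) {
    throw new RangeError('reexportShareOfImports must be in [0, 1)');
  }
  return grossAnnualImports * (1 - reexportShareOfImports);
}
// UAE example from above: $941B gross with $334B transit flow
// → share ≈ 334/941 ≈ 0.355 → net ≈ $607B
```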

The previous (PR 3A) design flattened a hand-curated YAML into Redis;
the YAML shipped empty and never populated, so the correction never
applied and the cohort audit showed no movement.

Gap #2 (this PR). Two coupled changes to make the correction actually
apply:

1. Comtrade-backed seeder (`scripts/seed-recovery-reexport-share.mjs`).
   Rewritten to fetch UN Comtrade `flowCode=RX` (re-exports) and
   `flowCode=M` (imports) per cohort member, compute share = RX/M at
   the latest co-populated year, clamp to [0.05, 0.95], publish the
   envelope. Header auth (`Ocp-Apim-Subscription-Key`) — subscription
   key never reaches URL/logs/Redis. `maxRecords=250000` cap with
   truncation detection. Sequential + retry-on-429 with backoff.

   Hub cohort resolved by Phase 0 empirical probe (plan §Phase 0):
   ['AE', 'PA']. Six candidates (SG/HK/NL/BE/MY/LT) return HTTP 200
   with zero RX rows — Comtrade doesn't expose RX for those reporters.

2. SWF seeder reads from Redis (`scripts/seed-sovereign-wealth.mjs`).
   Swaps `loadReexportShareByCountry()` (YAML) for
   `loadReexportShareFromRedis()` (Redis key written by #1). Guarded
   by bundle-run freshness: if the sibling Reexport-Share seeder's
   `seed-meta` predates `BUNDLE_RUN_STARTED_AT_MS` (set by the
   prereq PR's `_bundle-runner.mjs` env-injection), HARD fallback
   to gross imports rather than apply last-month's stale share.
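The guarded read in #2 can be sketched like so — hypothetical key names and a generic async getter; the real `loadReexportShareFromRedis` in `seed-sovereign-wealth.mjs` may be shaped differently:

```javascript
// Returns the peer's countries map, or null → caller falls back to
// gross imports. Two fallback triggers: missing meta (peer outage)
// and, in-bundle, meta predating BUNDLE_RUN_STARTED_AT_MS.
async function loadReexportShareFromRedis(redisGet, bundleStartMs) {
  const meta = await redisGet('seed-meta:resilience:recovery:reexport-share:v1');
  if (!meta) return null;                                   // peer-outage fail-safe
  if (bundleStartMs != null && meta.fetchedAtMs < bundleStartMs) {
    return null;                                            // stale sibling output
  }
  const data = await redisGet('resilience:recovery:reexport-share:v1');
  return data?.countries ?? null;
}
```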

Health registries. Both new keys registered in BOTH `api/health.js`
SEED_META (60-day alert threshold) and `api/seed-health.js`
SEED_DOMAINS (43200min interval). feedback_two_health_endpoints_must_match.

Bundle wiring. `seed-bundle-resilience-recovery` Reexport-Share
timeout bumped 60s → 300s (Comtrade + retry can take 2-3 min
worst-case). Ordering preserved: Reexport-Share before Sovereign-
Wealth so the SWF seeder reads a freshly-written key in the same
cron tick.

Deletions. YAML + loader + 7 obsolete loader tests removed; single
source of truth is now Comtrade → Redis.

Prereq. Stacks on PR #3384 (feat/bundle-runner-env-sigterm)
which adds BUNDLE_RUN_STARTED_AT_MS env injection + runSeed
SIGTERM cleanup. This PR's bundle-freshness guard depends on
that env variable.

Tests (19 new, 7 deleted, +12 net):
- Pure math: parseComtradeFlowResponse, computeShareFromFlows,
  clampShare, declareRecords + credential-leak source scan (15)
- Integration (Gap #2 regression guards): SWF seeder
  loadReexportShareFromRedis — fresh/absent/malformed/stale-meta/missing-meta (5)
- Health registry dual-registry drift guard — scoped to this PR's
  keys, respecting pre-existing asymmetry (4)
- Bundle-ordering + timeout assertions (2)

Phase 0 cohort validation committed to plan. Full test suite
passes: 6,881 tests.

* fix(resilience): address P1/P2 review findings — adopt shared helpers, pin freshness boundary

Addresses PR #3385 review findings:

#257 (P1) consumer — `seed-sovereign-wealth.mjs` imports the shared
`getBundleRunStartedAtMs` helper from `_seed-utils.mjs` (added in the
prereq commit) instead of its own `getBundleStartMs`. Single source of
truth for the bundle-freshness contract.

#258 (P2) — `seed-recovery-reexport-share.mjs` isMain guard uses the
canonical `pathToFileURL(process.argv[1]).href === import.meta.url`
form instead of basename-suffix matching. Handles symlinks, case-
different paths on macOS HFS+, and Windows path separators without
string munging.

#260 (P2) consumer — Sovereign-Wealth declares `dependsOn:
['Reexport-Share']` in the bundle spec. `_bundle-runner.mjs` (prereq
commit) now enforces topological order on load and throws on
violation — replaces the previous prose-comment ordering contract.

#261 (P2) — added a test to `tests/seed-sovereign-wealth-reads-redis-
reexport-share.test.mts` pinning the inclusive-boundary semantic:
`fetchedAtMs === bundleStartMs` must be treated as FRESH. Guards
against a future refactor to `<=` that would silently reject peers
writing at the very first millisecond of the bundle run.

Rebased onto updated prereq. Full test suite: 6,886 tests pass (+5 net).

* fix(resilience): freshness gate skipped in standalone mode; meta still required

Review catch: the previous `bundleStartMs = Date.now()` fallback made
standalone/manual `seed-sovereign-wealth.mjs` runs ALWAYS reject any
previously-seeded re-export-share meta as "stale" — even when the
operator ran the Reexport seeder milliseconds beforehand. Defeated
the point of the peer-read path outside the bundle.

With `getBundleRunStartedAtMs()` now returning null outside a bundle
(companion commit on the prereq branch), the consumer only applies
the freshness gate when `bundleStartMs != null`. Standalone runs
accept any `fetchedAt` — the operator is responsible for ordering.

Two guards survive the change:
- Meta MUST exist (absence = peer-outage fail-safe, both modes)
- In-bundle: meta MUST be at or after `BUNDLE_RUN_STARTED_AT_MS`

Two new tests pin both modes:
- standalone: accepts meta written 10 min before this process started
- standalone: still rejects missing meta (peer-outage fail-safe
  survives gate bypass)

Rebased onto updated prereq. Full test suite: 6,888 tests (+2 net).

* fix(resilience): filter world-aggregate Comtrade rows + skip final-retry sleep

Greptile review of PR #3385 flagged two P2s in the Comtrade seeder.

Finding #3 (parseComtradeFlowResponse double-count risk):
`cmdCode=TOTAL` without a partner filter currently returns only
world-aggregate rows in practice — but `parseComtradeFlowResponse`
summed every row unconditionally. A future refactor adding per-
partner querying would silently double-count (world-aggregate row +
partner-level rows for the same year), cutting the derived share in
half with no test signal.

Fix: explicit `partnerCode ∈ {'0', 0, null/undefined}` filter. Matches
current empirical behavior (aggregate-only responses) and makes the
construct robust to a future partner-level query.

Finding #4 (wasted backoff on final retry):
429 and 5xx branches slept `backoffMs` before `continue`, but on
`attempt === RETRY_MAX_ATTEMPTS` the loop condition fails immediately
after — the sleep was pure waste. Added early-return (parallel to the
existing pattern in the network-error catch branch) so the final
attempt exits the retry loop at the first non-success response
without extra latency.

Tests:
- 3 new `parseComtradeFlowResponse` variants: world-only filter,
  numeric-0 partnerCode shape, rows without partnerCode field
- Existing tests updated: the double-count assertion replaced with
  a "per-partner rows must NOT sum into the world-aggregate total"
  assertion that pins the new contract

Rebased onto updated prereq. Full test suite: 6,890 tests (+2 net).
2026-04-25 00:14:17 +04:00

#!/usr/bin/env node
// seed-recovery-reexport-share
// ============================
//
// Publishes `resilience:recovery:reexport-share:v1` from UN Comtrade,
// computing each country's re-export-share-of-imports as a live ratio
// of `flowCode=RX` over `flowCode=M` aggregate merchandise trade.
//
// Consumed by `scripts/seed-sovereign-wealth.mjs` to convert GROSS
// annual imports into NET annual imports when computing the SWF
// `rawMonths` denominator for the `sovereignFiscalBuffer` dimension.
//
// netAnnualImports = grossAnnualImports × (1 − reexportShareOfImports)
//
// Design decisions — see plan §Phase 1 at
// `docs/plans/2026-04-24-003-feat-reexport-share-comtrade-seeder-plan.md`:
//
// - Hub cohort resolved by Phase 0 empirical RX+M co-population probe
//   (see the plan's §"Phase 0 cohort validation results"). As of the
//   2026-04-24 probe: AE + PA. Six other candidates (SG, HK, NL, BE,
//   MY, LT) return HTTP 200 with zero RX rows and are excluded until
//   Comtrade exposes RX for those reporters.
// - Header auth (`Ocp-Apim-Subscription-Key`) — key never leaks into
//   the URL → logs → Redis payload → clipboard.
// - `maxRecords=250000` cap with truncation detection: a full-cap
//   response triggers per-country omission so partial data never
//   under-reports the share.
// - 4-year period window (Y-1..Y-4), matching the HHI seeder PR #3372.
// - Clamps: share < 0.05 → omit (per-run discipline); share > 0.95 →
//   cap at 0.95. computeNetImports requires share < 1.
// - Envelope schema v2 (bumped from manifestVersion=1 manifest flattener).
// Revision cadence: none — the monthly bundle cron re-seeds from Comtrade.
//
// Duplication policy: the retry-classification loop is duplicated here
// rather than extracted into a `_comtrade.mjs` helper. Per CLAUDE.md,
// duplication is cheaper than a premature abstraction — a second
// Comtrade caller in the future can extract then.
import { pathToFileURL } from 'node:url';
import { CHROME_UA, loadEnvFile, runSeed, sleep } from './_seed-utils.mjs';
loadEnvFile(import.meta.url);
const CANONICAL_KEY = 'resilience:recovery:reexport-share:v1';
// Monthly bundle cron. TTL large enough that one missed tick doesn't
// evict (the SWF seeder's bundle-freshness guard falls back to gross
// imports if seed-meta predates the current bundle run, independent
// of data-key TTL).
const CACHE_TTL_SECONDS = 35 * 24 * 3600;
const COMTRADE_URL = 'https://comtradeapi.un.org/data/v1/get/C/A/HS';
const MAX_RECORDS = 250_000;
const FETCH_TIMEOUT_MS = 45_000;
const RETRY_MAX_ATTEMPTS = 3;
const INTER_CALL_PACING_MS = 750;
// Share bounds. Floor 0.05 drops commercially-immaterial contributions
// (Panama's 1.4% observed in Phase 0). Ceiling 0.95 prevents pathological
// share=1 reporters from zeroing the denominator via computeNetImports.
const MIN_MATERIAL_SHARE = 0.05;
const MAX_SHARE_CAP = 0.95;
// Phase 0 resolved cohort — commit 2026-04-24, candidates AE, SG, HK,
// NL, BE, PA, MY, LT probed sequentially via railway run. Only AE and
// PA returned co-populated RX+M rows; see plan §"Phase 0 cohort
// validation results" for full table and HTTP status per candidate.
const REEXPORT_HUB_COHORT = [
  { iso2: 'AE', reporterCode: '784', name: 'United Arab Emirates' },
  { iso2: 'PA', reporterCode: '591', name: 'Panama' },
];
function buildPeriodYears() {
  // Y-1..Y-4. Same window as the HHI seeder (PR #3372). Excludes the
  // current calendar year (Comtrade lag for annual aggregates).
  const now = new Date().getFullYear();
  return [now - 1, now - 2, now - 3, now - 4];
}
function auditSafeSourceUrl(reporterCode, flowCode, years) {
  // Belt-and-suspenders: even though header auth means the
  // subscription-key never gets appended to the URL, construct the
  // displayed source string WITHOUT any credential query-params. If
  // a future refactor ever adds subscription-key to the URL again,
  // this function strips it before it reaches the Redis envelope.
  const u = new URL(COMTRADE_URL);
  u.searchParams.set('reporterCode', reporterCode);
  u.searchParams.set('flowCode', flowCode);
  u.searchParams.set('cmdCode', 'TOTAL');
  u.searchParams.set('period', years.join(','));
  u.searchParams.delete('subscription-key');
  return u.toString();
}
async function fetchComtradeFlow(apiKey, reporterCode, flowCode, years, { iso2 }) {
  const u = new URL(COMTRADE_URL);
  u.searchParams.set('reporterCode', reporterCode);
  u.searchParams.set('flowCode', flowCode);
  u.searchParams.set('cmdCode', 'TOTAL');
  u.searchParams.set('period', years.join(','));
  u.searchParams.set('maxRecords', String(MAX_RECORDS));
  const urlStr = u.toString();
  for (let attempt = 1; attempt <= RETRY_MAX_ATTEMPTS; attempt += 1) {
    try {
      const resp = await fetch(urlStr, {
        headers: {
          'Ocp-Apim-Subscription-Key': apiKey,
          'User-Agent': CHROME_UA,
          Accept: 'application/json',
        },
        signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
      });
      if (resp.status === 429) {
        if (attempt === RETRY_MAX_ATTEMPTS) {
          console.warn(`[reexport-share] ${iso2} ${flowCode}: 429 after ${RETRY_MAX_ATTEMPTS} attempts; omitting`);
          return { rows: [], truncated: false, status: 429 };
        }
        const backoffMs = 2000 * attempt;
        console.warn(`[reexport-share] ${iso2} ${flowCode}: 429 rate-limited, backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
        await sleep(backoffMs);
        continue;
      }
      if (resp.status >= 500) {
        if (attempt === RETRY_MAX_ATTEMPTS) {
          console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status} after ${RETRY_MAX_ATTEMPTS} attempts; omitting`);
          return { rows: [], truncated: false, status: resp.status };
        }
        const backoffMs = 5000 * attempt;
        console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status}, backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
        await sleep(backoffMs);
        continue;
      }
      if (!resp.ok) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status}; omitting`);
        return { rows: [], truncated: false, status: resp.status };
      }
      const json = await resp.json();
      const rows = Array.isArray(json?.data) ? json.data : [];
      if (rows.length >= MAX_RECORDS) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: response at cap (${rows.length}>=${MAX_RECORDS}); possible truncation — omitting country`);
        return { rows: [], truncated: true, status: 200 };
      }
      return { rows, truncated: false, status: 200 };
    } catch (err) {
      if (attempt === RETRY_MAX_ATTEMPTS) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: exhausted retries (${err?.message || err}); omitting`);
        return { rows: [], truncated: false, status: null, error: err?.message || String(err) };
      }
      const backoffMs = 3000 * attempt;
      console.warn(`[reexport-share] ${iso2} ${flowCode}: fetch error "${err?.message || err}", backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
      await sleep(backoffMs);
    }
  }
  return { rows: [], truncated: false, status: null };
}
/**
 * Sum primaryValue per year from a Comtrade flow response.
 * USES world-aggregate rows only (partnerCode='0' / 0 / absent) —
 * this construct wants the country-total flow as a single figure, not
 * a partner-level breakdown. The `cmdCode=TOTAL` query without a
 * partner filter defaults to returning only world-aggregate rows in
 * practice, but this filter is defensive: if a future refactor asks
 * Comtrade for partner-level decomposition (e.g. to cross-check),
 * summing partner rows ON TOP of the world-aggregate row would
 * silently double-count and cut the derived share in half.
 *
 * Pure function — exported for tests.
 *
 * @param {Array} rows
 * @returns {Map<number, number>} year → summed primaryValue in USD
 */
export function parseComtradeFlowResponse(rows) {
  const byYear = new Map();
  for (const r of rows) {
    // Accept world-aggregate rows only: string '0', numeric 0, or
    // the field absent entirely (older response shapes). Any specific
    // partnerCode (e.g. '842' for US, '826' for UK) is a per-partner
    // breakdown row and must be excluded to avoid double-counting
    // against the world-aggregate row for the same year.
    const partnerCode = r?.partnerCode;
    const isWorldAggregate = partnerCode == null
      || partnerCode === '0'
      || partnerCode === 0;
    if (!isWorldAggregate) continue;
    const yRaw = r?.period ?? r?.refPeriodId;
    const y = Number(yRaw);
    const v = Number(r?.primaryValue ?? 0);
    if (!Number.isInteger(y) || !Number.isFinite(v) || v <= 0) continue;
    byYear.set(y, (byYear.get(y) ?? 0) + v);
  }
  return byYear;
}
/**
 * Given per-year RX and M sums, pick the latest year where BOTH are
 * populated (>0), and return the share = RX / M plus metadata.
 *
 * Returns null if no co-populated year exists.
 *
 * Pure function — exported for tests.
 *
 * @param {Map<number, number>} rxByYear
 * @param {Map<number, number>} mByYear
 * @returns {{ year: number, share: number, reexportsUsd: number, importsUsd: number } | null}
 */
export function computeShareFromFlows(rxByYear, mByYear) {
  const coPopulated = [];
  for (const y of rxByYear.keys()) {
    if (mByYear.has(y)) coPopulated.push(y);
  }
  if (coPopulated.length === 0) return null;
  coPopulated.sort((a, b) => b - a);
  const year = coPopulated[0];
  const reexportsUsd = rxByYear.get(year);
  const importsUsd = mByYear.get(year);
  if (!(importsUsd > 0)) return null;
  const rawShare = reexportsUsd / importsUsd;
  return { year, share: rawShare, reexportsUsd, importsUsd };
}
/**
 * Clamp a raw share into the material-and-safe range. Returns null for
 * sub-floor shares (caller omits the country); caps at MAX_SHARE_CAP
 * for above-ceiling shares. Pure function — exported for tests.
 *
 * @param {number} rawShare
 * @returns {number | null} clamped share, or null if sub-floor
 */
export function clampShare(rawShare) {
  if (!Number.isFinite(rawShare) || rawShare < 0) return null;
  if (rawShare < MIN_MATERIAL_SHARE) return null;
  if (rawShare > MAX_SHARE_CAP) return MAX_SHARE_CAP;
  return rawShare;
}
async function fetchReexportShare() {
  const apiKey = (process.env.COMTRADE_API_KEYS || '').split(',').filter(Boolean)[0];
  if (!apiKey) {
    throw new Error('[reexport-share] COMTRADE_API_KEYS not set — cannot fetch');
  }
  const years = buildPeriodYears();
  const countries = {};
  for (const { iso2, reporterCode } of REEXPORT_HUB_COHORT) {
    const mResult = await fetchComtradeFlow(apiKey, reporterCode, 'M', years, { iso2 });
    await sleep(INTER_CALL_PACING_MS);
    const rxResult = await fetchComtradeFlow(apiKey, reporterCode, 'RX', years, { iso2 });
    await sleep(INTER_CALL_PACING_MS);
    if (mResult.truncated || rxResult.truncated) {
      console.warn(`[reexport-share] ${iso2}: skipping due to truncation`);
      continue;
    }
    const mByYear = parseComtradeFlowResponse(mResult.rows);
    const rxByYear = parseComtradeFlowResponse(rxResult.rows);
    const picked = computeShareFromFlows(rxByYear, mByYear);
    if (!picked) {
      console.warn(`[reexport-share] ${iso2}: no co-populated RX+M year in window ${years.join(',')}; omitting`);
      continue;
    }
    const clamped = clampShare(picked.share);
    if (clamped == null) {
      console.log(`[reexport-share] ${iso2}: raw share ${(picked.share * 100).toFixed(2)}% below floor (${MIN_MATERIAL_SHARE * 100}%) at Y=${picked.year}; omitting`);
      continue;
    }
    countries[iso2] = {
      reexportShareOfImports: clamped,
      year: picked.year,
      reexportsUsd: picked.reexportsUsd,
      grossImportsUsd: picked.importsUsd,
      source: 'comtrade',
      sources: [
        auditSafeSourceUrl(reporterCode, 'RX', years),
        auditSafeSourceUrl(reporterCode, 'M', years),
      ],
    };
    console.log(`[reexport-share] ${iso2}: share=${(clamped * 100).toFixed(1)}% at Y=${picked.year} (RX $${(picked.reexportsUsd / 1e9).toFixed(1)}B / M $${(picked.importsUsd / 1e9).toFixed(1)}B)`);
  }
  const payload = {
    manifestVersion: 2,
    lastReviewed: new Date().toISOString().slice(0, 10),
    externalReviewStatus: 'REVIEWED',
    countries,
    seededAt: new Date().toISOString(),
  };
  // Hard guarantee: no serialized field may contain the subscription-
  // key query param. If any future refactor leaks it into the sources
  // array or anywhere else in the envelope, fail the run loudly
  // instead of publishing the credential.
  const serialized = JSON.stringify(payload);
  if (/subscription-key=/i.test(serialized)) {
    throw new Error('[reexport-share] serialized payload contains subscription-key — refusing to publish');
  }
  return payload;
}
function validate(data) {
  if (!data || typeof data !== 'object') return false;
  if (data.manifestVersion !== 2) return false;
  if (!data.countries || typeof data.countries !== 'object') return false;
  return true;
}
export function declareRecords(data) {
  return Object.keys(data?.countries ?? {}).length;
}
// Guard top-level runSeed so the module can be imported by tests without
// triggering the full fetch/publish flow. Uses the canonical
// `pathToFileURL` comparison — unambiguous across path forms (symlink,
// case-different on macOS HFS+, Windows backslash vs slash) — rather
// than the basename-suffix matching pattern used by some older seeders.
const isMain = process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href;
if (isMain) {
  runSeed('resilience', 'recovery:reexport-share', CANONICAL_KEY, fetchReexportShare, {
    validateFn: validate,
    ttlSeconds: CACHE_TTL_SECONDS,
    sourceVersion: 'comtrade-rx-m-ratio-v2',
    declareRecords,
    schemaVersion: 2,
    // Empty-countries is ACCEPTABLE if every cohort member omits (Phase 0
    // may prune all; per-country floor may omit all). Downstream SWF
    // seeder handles an empty map as "all gross imports". Not strict.
    zeroIsValid: true,
    maxStaleMin: 10080,
  }).catch((err) => {
    const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
    console.error('FATAL:', (err.message || err) + cause);
    process.exit(1);
  });
}