mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(seed): BUNDLE_RUN_STARTED_AT_MS env + runSeed SIGTERM cleanup
Prereq for the re-export-share Comtrade seeder (plan 2026-04-24-003),
usable by any cohort seeder whose consumer needs bundle-level freshness.
Two coupled changes:
1. `_bundle-runner.mjs` injects `BUNDLE_RUN_STARTED_AT_MS` into every
spawned child. All siblings in a single bundle run share one value
(captured at `runBundle` start, not spawn time). Consumers use this
to detect stale peer keys — if a peer's seed-meta predates the
current bundle run, fall back to a hard default rather than read
a cohort-peer's last-week output.
2. `_seed-utils.mjs::runSeed` registers a `process.once('SIGTERM')`
handler that releases the acquired lock and extends existing-data
TTL before exiting 143. `_bundle-runner.mjs` sends SIGTERM on
section timeout, then SIGKILL after KILL_GRACE_MS (5s). Without
this handler, SIGTERM's default action terminates the child before its
`finally` path runs (and SIGKILL cannot be caught at all), leaving
the 30-min acquireLock reservation in place until its own TTL
expires — the next cron tick silently skips the resource.
Regression guard memory: `bundle-runner-sigkill-leaks-child-lock` (PR
#3128 root cause).
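The env-injection contract in item 1 above can be sketched as follows. This is a minimal illustration, not `_bundle-runner.mjs` itself: `buildSectionEnvs` and the `{ label }` section shape are hypothetical, and a real runner would hand each `env` to `child_process.spawn` rather than return it. The property that matters is that the clock is read once per bundle run, not once per spawn.

```javascript
// Hypothetical sketch of bundle-level env injection (not the real runner).
// The timestamp is captured ONCE at bundle start, so every sibling section
// sees the same BUNDLE_RUN_STARTED_AT_MS regardless of when it is spawned.
function buildSectionEnvs(sections, baseEnv, now = Date.now) {
  const runStartedAtMs = now(); // read once per bundle run, never per spawn
  return sections.map((section) => ({
    label: section.label,
    // Env values must be strings; consumers parse the value back to a number.
    env: { ...baseEnv, BUNDLE_RUN_STARTED_AT_MS: String(runStartedAtMs) },
  }));
}

const before = Date.now();
const planned = buildSectionEnvs(
  [{ label: 'Reexport-Share' }, { label: 'Sovereign-Wealth' }],
  { PATH: '/usr/bin' },
);
const after = Date.now();
console.log(planned.map((p) => `${p.label}=${p.env.BUNDLE_RUN_STARTED_AT_MS}`));
```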
Tests added:
- bundle-runner env injection (value within run bounds)
- sibling sections share the same timestamp (critical for the
consumer freshness guard)
- runSeed SIGTERM path: exit 143 + cleanup log
- process.once contract: second SIGTERM does not re-enter handler
* fix(seed): address P1/P2 review findings on SIGTERM + bundle contracts
Addresses PR #3384 review findings (todos 256, 257, 259, 260):
#256 (P1) — SIGTERM handler narrowed to fetch phase only. Was installed
at runSeed entry and armed through every `process.exit` path; could
race `emptyDataIsFailure: true` strict-floor exits (IMF-External,
WB-bulk) and extend seed-meta TTL when the contract forbids it —
silently re-masking 30-day outages. Now the handler is attached
immediately before `withRetry(fetchFn)` and removed in a try/finally
that covers all fetch-phase exit branches.
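A minimal sketch of the narrowed scope in #256, assuming a hypothetical `runFetchPhase` wrapper and an injected `onSigterm` callback (inside `runSeed` that callback would release the lock, extend TTLs and exit 143). The listener is armed immediately before the fetch and removed in `finally`, so any `process.exit` after the fetch phase runs with no cleanup handler attached.

```javascript
// Hypothetical sketch: SIGTERM cleanup armed only for the fetch phase.
async function runFetchPhase(fetchFn, onSigterm) {
  const handler = () => onSigterm();
  // `once` guarantees a second SIGTERM cannot re-enter the handler.
  process.once('SIGTERM', handler);
  try {
    return await fetchFn();
  } finally {
    // Runs on success AND on throw, so later exits (e.g. strict-floor
    // `emptyDataIsFailure` failures) never see this handler.
    process.removeListener('SIGTERM', handler);
  }
}

// Usage: simulate SIGTERM mid-fetch with process.emit (no real signal is sent).
const baseline = process.listenerCount('SIGTERM');
let cleanups = 0;
const phase = runFetchPhase(async () => {
  process.emit('SIGTERM');
  process.emit('SIGTERM'); // second signal: the `once` listener is already gone
  return 'data';
}, () => { cleanups += 1; });
```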
#257 (P1) — `BUNDLE_RUN_STARTED_AT_MS` now has a first-class helper.
Exported `getBundleRunStartedAtMs()` from `_seed-utils.mjs` with JSDoc
describing the bundle-freshness contract. Fleet-wide helper so the
next consumer seeder imports instead of rediscovering the idiom.
#259 (P2) — SIGTERM cleanup runs `Promise.allSettled` on disjoint-key
ops (`releaseLock` + `extendExistingTtl`). Serialising compounded
Upstash latency during the exact failure mode (Redis degraded) this
handler exists to handle, risking breach of the 5s SIGKILL grace.
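The change in #259 amounts to the pattern below, sketched under the assumption that `releaseLock` and `extendExistingTtl` are injected promise-returning functions (the real ones talk to Upstash). Both calls start before either is awaited, so cleanup latency is roughly the slower of the two rather than their sum, and a rejection in one never blocks the other.

```javascript
// Hypothetical sketch of the SIGTERM cleanup body. The two ops touch
// disjoint keys (lock key vs data/seed-meta keys), so they can run concurrently.
async function sigtermCleanup({ releaseLock, extendExistingTtl, log = console.warn }) {
  const [lock, ttl] = await Promise.allSettled([releaseLock(), extendExistingTtl()]);
  if (lock.status === 'rejected') log(`releaseLock failed: ${lock.reason?.message ?? lock.reason}`);
  if (ttl.status === 'rejected') log(`extendExistingTtl failed: ${ttl.reason?.message ?? ttl.reason}`);
  return { lockReleased: lock.status === 'fulfilled', ttlExtended: ttl.status === 'fulfilled' };
}

// Usage: the lock release fails, yet the TTL extension still runs.
const started = [];
const logged = [];
const cleanup = sigtermCleanup({
  releaseLock: async () => { started.push('lock'); throw new Error('Redis degraded'); },
  extendExistingTtl: async () => { started.push('ttl'); return 'OK'; },
  log: (msg) => logged.push(msg),
});
```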
#260 (P2) — `_bundle-runner.mjs` asserts topological order on
optional `dependsOn` section field. Throws on unknown-label refs and
on deps appearing at a later index. Fleet-wide contract replacing
the previous prose-comment ordering guarantee.
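A load-time check with the two failure modes named in #260 could look like this. It is a sketch only: the section shape (`{ label, dependsOn? }`) is assumed, and treating a self-dependency as out of order is an extra choice made here for illustration.

```javascript
// Hypothetical sketch of the dependsOn topological-order assertion.
function assertTopologicalOrder(sections) {
  const indexByLabel = new Map(sections.map((s, i) => [s.label, i]));
  sections.forEach((section, i) => {
    for (const dep of section.dependsOn ?? []) {
      if (!indexByLabel.has(dep)) {
        throw new Error(`section "${section.label}" dependsOn unknown label "${dep}"`);
      }
      // A dependency must run strictly earlier (this also rejects self-deps).
      if (indexByLabel.get(dep) >= i) {
        throw new Error(`section "${section.label}" dependsOn "${dep}", which is not scheduled before it`);
      }
    }
  });
}

// Usage: the recovery bundle's ordering passes; reversing it would throw.
assertTopologicalOrder([
  { label: 'Reexport-Share' },
  { label: 'Sovereign-Wealth', dependsOn: ['Reexport-Share'] },
]);
```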
Tests added/updated:
- New: SIGTERM handler removed after fetchFn completes (narrowed-scope
contract — post-fetch SIGTERM must NOT trigger TTL extension)
- New: dependsOn unknown-label + out-of-order + happy-path (3 tests)
Full test suite: 6,866 tests pass (+4 net).
* fix(seed): getBundleRunStartedAtMs returns null outside a bundle run
Review follow-up: the earlier `Math.floor(Date.now()/1000)*1000` fallback
regressed standalone (non-bundle) runs. A consumer seeder invoked
manually just after its peer wrote `fetchedAt = (now - 5s)` would see
`bundleStartMs = Date.now()`, reject the perfectly-fresh peer envelope
as "stale", and fall back to defaults — defeating the point of the
peer-read path outside the bundle.
Returning null when `BUNDLE_RUN_STARTED_AT_MS` is unset/invalid keeps
the freshness gate scoped to its real purpose (across-bundle-tick
staleness) and lets standalone runs skip the gate entirely. Consumers
check `bundleStartMs != null` before applying the comparison; see the
companion `seed-sovereign-wealth.mjs` change on the stacked PR.
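The null-outside-a-bundle contract can be sketched as below. This mirrors the behavior described above but is not the `_seed-utils.mjs` source: the injectable `env` parameter and the exact validity check (positive safe integer) are assumptions made for illustration.

```javascript
// Hypothetical sketch of the helper's contract; the real parsing may differ.
function getBundleRunStartedAtMs(env = process.env) {
  const raw = env.BUNDLE_RUN_STARTED_AT_MS;
  if (raw == null || raw === '') return null; // standalone run: no bundle
  const ms = Number(raw);
  // Invalid values also mean "no bundle context", never a Date.now() fallback.
  return Number.isSafeInteger(ms) && ms > 0 ? ms : null;
}

const bundleStartMs = getBundleRunStartedAtMs({ BUNDLE_RUN_STARTED_AT_MS: '1714000000000' });
// Consumers apply the freshness comparison only inside a bundle run.
const gateApplies = bundleStartMs != null;
```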
* test(seed): SIGTERM cleanup test now verifies Redis DEL + EXPIRE calls
Greptile review P2 on PR #3384: the existing test only asserted exit
code + log line, not that the Redis ops were actually issued. The
log line claimed a cleanup that no test verified.
Fixture now logs every Upstash fetch call's shape (EVAL / pipeline-
EXPIRE / other) to stderr. Test asserts:
- >=1 EVAL op was issued during SIGTERM cleanup (releaseLock Lua
script on the lock key)
- >=1 pipeline-EXPIRE op was issued (extendExistingTtl on canonical
+ seed-meta keys)
- The EVAL body carries the runSeed-generated runId (proves it's
THIS run's release, not a phantom op)
- The EXPIRE pipeline touches both the canonicalKey AND the
seed-meta key (proves the keys[] array was built correctly
including the extraKeys merge path)
Full test suite: 6,866 tests pass, typecheck clean.
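One way to produce the call-shape log described above is a classifier that the fixture's `fetch` stub runs on each request. This sketch assumes Upstash's REST conventions (a single command POSTed as a JSON array such as `["EVAL", script, numkeys, ...]`, and batches POSTed to a `/pipeline` path as an array of command arrays); `classifyUpstashCall`, the example URL and the key names are hypothetical.

```javascript
// Hypothetical classifier for a test fixture's fetch stub. Assumes the
// Upstash REST shapes noted above; anything else is reported as 'other'.
function classifyUpstashCall(url, bodyText) {
  let body;
  try {
    body = JSON.parse(bodyText ?? 'null');
  } catch {
    return { kind: 'other' };
  }
  const isPipeline = new URL(url).pathname.endsWith('/pipeline');
  if (isPipeline && Array.isArray(body) && body.every(Array.isArray)) {
    const expires = body.filter((cmd) => String(cmd[0]).toUpperCase() === 'EXPIRE');
    if (expires.length > 0) return { kind: 'pipeline-EXPIRE', keys: expires.map((cmd) => cmd[1]) };
  }
  if (Array.isArray(body) && String(body[0]).toUpperCase() === 'EVAL') {
    return { kind: 'EVAL', args: body.slice(1).map(String) };
  }
  return { kind: 'other' };
}

const base = 'https://example-redis.upstash.io';
const evalCall = classifyUpstashCall(base, JSON.stringify(['EVAL', 'return 1', '1', 'lock:example', 'run-123']));
const expireCall = classifyUpstashCall(`${base}/pipeline`, JSON.stringify([
  ['EXPIRE', 'example:canonical', 3600],
  ['EXPIRE', 'seed-meta:example', 3600],
]));
```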
* feat(resilience): Comtrade-backed re-export-share seeder + SWF Redis read
Plan ref: docs/plans/2026-04-24-003-feat-reexport-share-comtrade-seeder-plan.md
Motivating case. Before this PR, the SWF `rawMonths` denominator for
the `sovereignFiscalBuffer` dimension used GROSS annual imports for
every country. For re-export hubs (goods transiting without domestic
settlement), this structurally under-reports resilience: UAE's 2023
$941B of imports include $334B of transit flow that does not represent
domestic consumption. Net imports = gross × (1 − reexport_share).
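Worked through with the UAE figures above (in $B): share = 334 / 941 ≈ 0.355, so net imports = 941 × (1 − 0.355) ≈ 607, which equals gross minus re-exports whenever the share is unclamped. The sketch is illustrative only; `netAnnualImports` is a hypothetical name, and its share < 1 guard reflects the fact that a share of 1 would zero the denominator.

```javascript
// Hypothetical sketch of the net-imports correction (not the SWF seeder's code).
function netAnnualImports(grossImportsUsd, reexportShare) {
  // The share must stay below 1, otherwise the denominator collapses to zero.
  if (!(Number.isFinite(reexportShare) && reexportShare >= 0 && reexportShare < 1)) {
    throw new RangeError(`reexportShare must be in [0, 1), got ${reexportShare}`);
  }
  return grossImportsUsd * (1 - reexportShare);
}

// UAE 2023, figures from the paragraph above.
const grossUsd = 941e9;
const share = 334e9 / 941e9; // about 0.355
const netUsd = netAnnualImports(grossUsd, share);
console.log(`share=${share.toFixed(3)} net=$${(netUsd / 1e9).toFixed(0)}B`); // → share=0.355 net=$607B
```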
The previous (PR 3A) design flattened a hand-curated YAML into Redis;
the YAML shipped empty and never populated, so the correction never
applied and the cohort audit showed no movement.
Gap #2 (this PR). Two coupled changes to make the correction actually
apply:
1. Comtrade-backed seeder (`scripts/seed-recovery-reexport-share.mjs`).
Rewritten to fetch UN Comtrade `flowCode=RX` (re-exports) and
`flowCode=M` (imports) per cohort member, compute share = RX/M at
the latest co-populated year, clamp to [0.05, 0.95], publish the
envelope. Header auth (`Ocp-Apim-Subscription-Key`) — subscription
key never reaches URL/logs/Redis. `maxRecords=250000` cap with
truncation detection. Sequential + retry-on-429 with backoff.
Hub cohort resolved by Phase 0 empirical probe (plan §Phase 0):
['AE', 'PA']. Six candidates (SG/HK/NL/BE/MY/LT) return HTTP 200
with zero RX rows — Comtrade doesn't expose RX for those reporters.
2. SWF seeder reads from Redis (`scripts/seed-sovereign-wealth.mjs`).
Swaps `loadReexportShareByCountry()` (YAML) for
`loadReexportShareFromRedis()` (Redis key written by #1). Guarded
by bundle-run freshness: if the sibling Reexport-Share seeder's
`seed-meta` predates `BUNDLE_RUN_STARTED_AT_MS` (set by the
prereq PR's `_bundle-runner.mjs` env-injection), HARD fallback
to gross imports rather than apply last-month's stale share.
Health registries. Both new keys registered in BOTH `api/health.js`
SEED_META (60-day alert threshold) and `api/seed-health.js`
SEED_DOMAINS (43200min interval). Memory: `feedback_two_health_endpoints_must_match`.
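A drift guard like the dual-registry test listed in this commit can be very small. The sketch assumes both registries can be read as plain objects keyed by the same seed identifier; `findRegistryDrift` and the key names are hypothetical, and the real `SEED_META` / `SEED_DOMAINS` shapes may differ. Checking only an explicit key list is what lets such a guard ignore pre-existing asymmetry.

```javascript
// Hypothetical dual-registry drift check. Only the keys passed in are
// checked, so pre-existing asymmetries elsewhere in the registries are ignored.
function findRegistryDrift(keys, healthSeedMeta, seedHealthDomains) {
  const has = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);
  return keys
    .map((key) => ({ key, inHealth: has(healthSeedMeta, key), inSeedHealth: has(seedHealthDomains, key) }))
    .filter((entry) => !(entry.inHealth && entry.inSeedHealth)); // drift = missing from either
}

// Illustrative registries (key names and entry contents are placeholders).
const SEED_META_EXAMPLE = { 'example:reexport-share': {}, 'legacy:health-only': {} };
const SEED_DOMAINS_EXAMPLE = { 'example:reexport-share': {} };
const drift = findRegistryDrift(['example:reexport-share'], SEED_META_EXAMPLE, SEED_DOMAINS_EXAMPLE);
console.log(drift.length); // → 0 ('legacy:health-only' is out of scope)
```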
Bundle wiring. `seed-bundle-resilience-recovery` Reexport-Share
timeout bumped 60s → 300s (Comtrade + retry can take 2-3 min
worst-case). Ordering preserved: Reexport-Share before Sovereign-
Wealth so the SWF seeder reads a freshly-written key in the same
cron tick.
Deletions. YAML + loader + 7 obsolete loader tests removed; single
source of truth is now Comtrade → Redis.
Prereq. Stacks on PR #3384 (feat/bundle-runner-env-sigterm)
which adds BUNDLE_RUN_STARTED_AT_MS env injection + runSeed
SIGTERM cleanup. This PR's bundle-freshness guard depends on
that env variable.
Tests (19 new, 7 deleted, +12 net):
- Pure math: parseComtradeFlowResponse, computeShareFromFlows,
clampShare, declareRecords + credential-leak source scan (15)
- Integration (Gap #2 regression guards): SWF seeder
`loadReexportShareFromRedis`: fresh/absent/malformed/stale-meta/missing-meta (5)
- Health registry dual-registry drift guard — scoped to this PR's
keys, respecting pre-existing asymmetry (4)
- Bundle-ordering + timeout assertions (2)
Phase 0 cohort validation committed to plan. Full test suite
passes: 6,881 tests.
* fix(resilience): address P1/P2 review findings — adopt shared helpers, pin freshness boundary
Addresses PR #3385 review findings:
#257 (P1) consumer — `seed-sovereign-wealth.mjs` imports the shared
`getBundleRunStartedAtMs` helper from `_seed-utils.mjs` (added in the
prereq commit) instead of its own `getBundleStartMs`. Single source of
truth for the bundle-freshness contract.
#258 (P2) — `seed-recovery-reexport-share.mjs` isMain guard uses the
canonical `pathToFileURL(process.argv[1]).href === import.meta.url`
form instead of basename-suffix matching. Handles symlinks, case-
different paths on macOS HFS+, and Windows path separators without
string munging.
#260 (P2) consumer — Sovereign-Wealth declares `dependsOn:
['Reexport-Share']` in the bundle spec. `_bundle-runner.mjs` (prereq
commit) now enforces topological order on load and throws on
violation — replaces the previous prose-comment ordering contract.
#261 (P2) — added a test to
`tests/seed-sovereign-wealth-reads-redis-reexport-share.test.mts`
pinning the inclusive-boundary semantic:
`fetchedAtMs === bundleStartMs` must be treated as FRESH. Guards
against a future refactor to `<=` that would silently reject peers
writing at the very first millisecond of the bundle run.
Rebased onto updated prereq. Full test suite: 6,886 tests pass (+5 net).
* fix(resilience): freshness gate skipped in standalone mode; meta still required
Review catch: the previous `bundleStartMs = Date.now()` fallback made
standalone/manual `seed-sovereign-wealth.mjs` runs ALWAYS reject any
previously-seeded re-export-share meta as "stale" — even when the
operator ran the Reexport seeder milliseconds beforehand. Defeated
the point of the peer-read path outside the bundle.
With `getBundleRunStartedAtMs()` now returning null outside a bundle
(companion commit on the prereq branch), the consumer only applies
the freshness gate when `bundleStartMs != null`. Standalone runs
accept any `fetchedAt` — the operator is responsible for ordering.
Two guards survive the change:
- Meta MUST exist (absence = peer-outage fail-safe, both modes)
- In-bundle: meta MUST be at or after `BUNDLE_RUN_STARTED_AT_MS`
Two new tests pin both modes:
- standalone: accepts meta written 10 min before this process started
- standalone: still rejects missing meta (peer-outage fail-safe
survives gate bypass)
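The two surviving guards, plus the inclusive boundary pinned by #261, reduce to a small predicate. This is a sketch under assumptions: `isReexportShareUsable` is a hypothetical name, `metaFetchedAtMs` stands for the peer seed-meta's `fetchedAt` (null when the meta key is absent), and `bundleStartMs` is whatever `getBundleRunStartedAtMs()` returns (null outside a bundle).

```javascript
// Hypothetical sketch of the consumer-side gate; when it returns false the
// SWF seeder falls back to gross imports.
function isReexportShareUsable(metaFetchedAtMs, bundleStartMs) {
  // Guard 1 (both modes): missing meta means peer outage, so fail safe.
  if (metaFetchedAtMs == null) return false;
  // Standalone run: no bundle context, so the freshness gate is skipped.
  if (bundleStartMs == null) return true;
  // Guard 2 (in-bundle): inclusive boundary, so a peer writing in the very
  // first millisecond of the run counts as fresh.
  return metaFetchedAtMs >= bundleStartMs;
}

const runStart = 1714000000000;
const tenMinutesMs = 10 * 60 * 1000;
console.log(isReexportShareUsable(runStart - tenMinutesMs, null)); // → true (standalone, older meta)
```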
Rebased onto updated prereq. Full test suite: 6,888 tests (+2 net).
* fix(resilience): filter world-aggregate Comtrade rows + skip final-retry sleep
Greptile review of PR #3385 flagged two P2s in the Comtrade seeder.
Finding #3 (parseComtradeFlowResponse double-count risk):
`cmdCode=TOTAL` without a partner filter currently returns only
world-aggregate rows in practice — but `parseComtradeFlowResponse`
summed every row unconditionally. A future refactor adding per-
partner querying would silently double-count (world-aggregate row +
partner-level rows for the same year), cutting the derived share in
half with no test signal.
Fix: explicit `partnerCode ∈ {'0', 0, null/undefined}` filter. Matches
current empirical behavior (aggregate-only responses) and makes the
construct robust to a future partner-level query.
Finding #4 (wasted backoff on final retry):
429 and 5xx branches slept `backoffMs` before `continue`, but on
`attempt === RETRY_MAX_ATTEMPTS` the loop condition fails immediately
after — the sleep was pure waste. Added early-return (parallel to the
existing pattern in the network-error catch branch) so the final
attempt exits the retry loop at the first non-success response
without extra latency.
Tests:
- 3 new `parseComtradeFlowResponse` variants: world-only filter,
numeric-0 partnerCode shape, rows without partnerCode field
- Existing tests updated: the double-count assertion replaced with
a "per-partner rows must NOT sum into the world-aggregate total"
assertion that pins the new contract
Rebased onto updated prereq. Full test suite: 6,890 tests (+2 net).
345 lines
14 KiB
JavaScript
#!/usr/bin/env node
// seed-recovery-reexport-share
// ============================
//
// Publishes `resilience:recovery:reexport-share:v1` from UN Comtrade,
// computing each country's re-export-share-of-imports as a live ratio
// of `flowCode=RX` over `flowCode=M` aggregate merchandise trade.
//
// Consumed by `scripts/seed-sovereign-wealth.mjs` to convert GROSS
// annual imports into NET annual imports when computing the SWF
// `rawMonths` denominator for the `sovereignFiscalBuffer` dimension.
//
//   netAnnualImports = grossAnnualImports × (1 − reexportShareOfImports)
//
// Design decisions — see plan §Phase 1 at
// `docs/plans/2026-04-24-003-feat-reexport-share-comtrade-seeder-plan.md`:
//
//   - Hub cohort resolved by Phase 0 empirical RX+M co-population probe
//     (see the plan's §"Phase 0 cohort validation results"). As of the
//     2026-04-24 probe: AE + PA. Six other candidates (SG, HK, NL, BE,
//     MY, LT) return HTTP 200 with zero RX rows and are excluded until
//     Comtrade exposes RX for those reporters.
//   - Header auth (`Ocp-Apim-Subscription-Key`) — key never leaks into
//     the URL → logs → Redis payload → clipboard.
//   - `maxRecords=250000` cap with truncation detection: a full-cap
//     response triggers per-country omission so partial data never
//     under-reports the share.
//   - 4-year period window (Y-1..Y-4), matching the HHI seeder PR #3372.
//   - Clamps: share < 0.05 → omit (per-run discipline); share > 0.95 →
//     cap at 0.95. computeNetImports requires share < 1.
//   - Envelope schema v2 (bumped from manifestVersion=1 manifest flattener).
//
// Revision cadence: none — the monthly bundle cron re-seeds from Comtrade.
//
// Duplication policy: the retry-classification loop is duplicated here
// rather than extracted into a `_comtrade.mjs` helper. Per CLAUDE.md,
// duplication is cheaper than a premature abstraction — a second
// Comtrade caller in the future can extract then.

import { pathToFileURL } from 'node:url';

import { CHROME_UA, loadEnvFile, runSeed, sleep } from './_seed-utils.mjs';

loadEnvFile(import.meta.url);

const CANONICAL_KEY = 'resilience:recovery:reexport-share:v1';
// Monthly bundle cron. TTL large enough that one missed tick doesn't
// evict (the SWF seeder's bundle-freshness guard falls back to gross
// imports if seed-meta predates the current bundle run, independent
// of data-key TTL).
const CACHE_TTL_SECONDS = 35 * 24 * 3600;

const COMTRADE_URL = 'https://comtradeapi.un.org/data/v1/get/C/A/HS';
const MAX_RECORDS = 250_000;
const FETCH_TIMEOUT_MS = 45_000;
const RETRY_MAX_ATTEMPTS = 3;
const INTER_CALL_PACING_MS = 750;

// Share bounds. Floor 0.05 drops commercially-immaterial contributions
// (Panama's 1.4% observed in Phase 0). Ceiling 0.95 prevents pathological
// share=1 reporters from zeroing the denominator via computeNetImports.
const MIN_MATERIAL_SHARE = 0.05;
const MAX_SHARE_CAP = 0.95;

// Phase 0 resolved cohort — commit 2026-04-24, candidates AE, SG, HK,
// NL, BE, PA, MY, LT probed sequentially via railway run. Only AE and
// PA returned co-populated RX+M rows; see plan §"Phase 0 cohort
// validation results" for full table and HTTP status per candidate.
const REEXPORT_HUB_COHORT = [
  { iso2: 'AE', reporterCode: '784', name: 'United Arab Emirates' },
  { iso2: 'PA', reporterCode: '591', name: 'Panama' },
];

function buildPeriodYears() {
  // Y-1..Y-4. Same window as the HHI seeder (PR #3372). Excludes the
  // current calendar year (Comtrade lag for annual aggregates).
  const now = new Date().getFullYear();
  return [now - 1, now - 2, now - 3, now - 4];
}

function auditSafeSourceUrl(reporterCode, flowCode, years) {
  // Belt-and-suspenders: even though header auth means the
  // subscription-key never gets appended to the URL, construct the
  // displayed source string WITHOUT any credential query-params. If
  // a future refactor ever adds subscription-key to the URL again,
  // this function strips it before it reaches the Redis envelope.
  const u = new URL(COMTRADE_URL);
  u.searchParams.set('reporterCode', reporterCode);
  u.searchParams.set('flowCode', flowCode);
  u.searchParams.set('cmdCode', 'TOTAL');
  u.searchParams.set('period', years.join(','));
  u.searchParams.delete('subscription-key');
  return u.toString();
}

async function fetchComtradeFlow(apiKey, reporterCode, flowCode, years, { iso2 }) {
  const u = new URL(COMTRADE_URL);
  u.searchParams.set('reporterCode', reporterCode);
  u.searchParams.set('flowCode', flowCode);
  u.searchParams.set('cmdCode', 'TOTAL');
  u.searchParams.set('period', years.join(','));
  u.searchParams.set('maxRecords', String(MAX_RECORDS));
  const urlStr = u.toString();

  for (let attempt = 1; attempt <= RETRY_MAX_ATTEMPTS; attempt += 1) {
    try {
      const resp = await fetch(urlStr, {
        headers: {
          'Ocp-Apim-Subscription-Key': apiKey,
          'User-Agent': CHROME_UA,
          Accept: 'application/json',
        },
        signal: AbortSignal.timeout(FETCH_TIMEOUT_MS),
      });

      if (resp.status === 429) {
        if (attempt === RETRY_MAX_ATTEMPTS) {
          console.warn(`[reexport-share] ${iso2} ${flowCode}: 429 after ${RETRY_MAX_ATTEMPTS} attempts; omitting`);
          return { rows: [], truncated: false, status: 429 };
        }
        const backoffMs = 2000 * attempt;
        console.warn(`[reexport-share] ${iso2} ${flowCode}: 429 rate-limited, backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
        await sleep(backoffMs);
        continue;
      }
      if (resp.status >= 500) {
        if (attempt === RETRY_MAX_ATTEMPTS) {
          console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status} after ${RETRY_MAX_ATTEMPTS} attempts; omitting`);
          return { rows: [], truncated: false, status: resp.status };
        }
        const backoffMs = 5000 * attempt;
        console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status}, backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
        await sleep(backoffMs);
        continue;
      }
      if (!resp.ok) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: HTTP ${resp.status}; omitting`);
        return { rows: [], truncated: false, status: resp.status };
      }

      const json = await resp.json();
      const rows = Array.isArray(json?.data) ? json.data : [];
      if (rows.length >= MAX_RECORDS) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: response at cap (${rows.length}>=${MAX_RECORDS}); possible truncation — omitting country`);
        return { rows: [], truncated: true, status: 200 };
      }
      return { rows, truncated: false, status: 200 };
    } catch (err) {
      if (attempt === RETRY_MAX_ATTEMPTS) {
        console.warn(`[reexport-share] ${iso2} ${flowCode}: exhausted retries (${err?.message || err}); omitting`);
        return { rows: [], truncated: false, status: null, error: err?.message || String(err) };
      }
      const backoffMs = 3000 * attempt;
      console.warn(`[reexport-share] ${iso2} ${flowCode}: fetch error "${err?.message || err}", backoff ${backoffMs}ms (attempt ${attempt}/${RETRY_MAX_ATTEMPTS})`);
      await sleep(backoffMs);
    }
  }
  return { rows: [], truncated: false, status: null };
}

/**
 * Sum primaryValue per year from a Comtrade flow response.
 * USES world-aggregate rows only (partnerCode='0' / 0 / absent) —
 * this construct wants the country-total flow as a single figure, not
 * a partner-level breakdown. The `cmdCode=TOTAL` query without a
 * partner filter defaults to returning only world-aggregate rows in
 * practice, but this filter is defensive: if a future refactor asks
 * Comtrade for partner-level decomposition (e.g. to cross-check),
 * summing partner rows ON TOP of the world-aggregate row would
 * silently double-count and cut the derived share in half.
 *
 * Pure function — exported for tests.
 *
 * @param {Array} rows
 * @returns {Map<number, number>} year → summed primaryValue in USD
 */
export function parseComtradeFlowResponse(rows) {
  const byYear = new Map();
  for (const r of rows) {
    // Accept world-aggregate rows only: string '0', numeric 0, or
    // the field absent entirely (older response shapes). Any specific
    // partnerCode (e.g. '842' for US, '826' for UK) is a per-partner
    // breakdown row and must be excluded to avoid double-counting
    // against the world-aggregate row for the same year.
    const partnerCode = r?.partnerCode;
    const isWorldAggregate = partnerCode == null
      || partnerCode === '0'
      || partnerCode === 0;
    if (!isWorldAggregate) continue;

    const yRaw = r?.period ?? r?.refPeriodId;
    const y = Number(yRaw);
    const v = Number(r?.primaryValue ?? 0);
    if (!Number.isInteger(y) || !Number.isFinite(v) || v <= 0) continue;
    byYear.set(y, (byYear.get(y) ?? 0) + v);
  }
  return byYear;
}

/**
 * Given per-year RX and M sums, pick the latest year where BOTH are
 * populated (>0), and return the share = RX / M plus metadata.
 *
 * Returns null if no co-populated year exists.
 *
 * Pure function — exported for tests.
 *
 * @param {Map<number, number>} rxByYear
 * @param {Map<number, number>} mByYear
 * @returns {{ year: number, share: number, reexportsUsd: number, importsUsd: number } | null}
 */
export function computeShareFromFlows(rxByYear, mByYear) {
  const coPopulated = [];
  for (const y of rxByYear.keys()) {
    if (mByYear.has(y)) coPopulated.push(y);
  }
  if (coPopulated.length === 0) return null;
  coPopulated.sort((a, b) => b - a);
  const year = coPopulated[0];
  const reexportsUsd = rxByYear.get(year);
  const importsUsd = mByYear.get(year);
  if (!(importsUsd > 0)) return null;
  const rawShare = reexportsUsd / importsUsd;
  return { year, share: rawShare, reexportsUsd, importsUsd };
}

/**
 * Clamp a raw share into the material-and-safe range. Returns null for
 * sub-floor shares (caller omits the country); caps at MAX_SHARE_CAP
 * for above-ceiling shares. Pure function — exported for tests.
 *
 * @param {number} rawShare
 * @returns {number | null} clamped share, or null if sub-floor
 */
export function clampShare(rawShare) {
  if (!Number.isFinite(rawShare) || rawShare < 0) return null;
  if (rawShare < MIN_MATERIAL_SHARE) return null;
  if (rawShare > MAX_SHARE_CAP) return MAX_SHARE_CAP;
  return rawShare;
}

async function fetchReexportShare() {
  const apiKey = (process.env.COMTRADE_API_KEYS || '').split(',').filter(Boolean)[0];
  if (!apiKey) {
    throw new Error('[reexport-share] COMTRADE_API_KEYS not set — cannot fetch');
  }

  const years = buildPeriodYears();
  const countries = {};

  for (const { iso2, reporterCode } of REEXPORT_HUB_COHORT) {
    const mResult = await fetchComtradeFlow(apiKey, reporterCode, 'M', years, { iso2 });
    await sleep(INTER_CALL_PACING_MS);
    const rxResult = await fetchComtradeFlow(apiKey, reporterCode, 'RX', years, { iso2 });
    await sleep(INTER_CALL_PACING_MS);

    if (mResult.truncated || rxResult.truncated) {
      console.warn(`[reexport-share] ${iso2}: skipping due to truncation`);
      continue;
    }

    const mByYear = parseComtradeFlowResponse(mResult.rows);
    const rxByYear = parseComtradeFlowResponse(rxResult.rows);
    const picked = computeShareFromFlows(rxByYear, mByYear);
    if (!picked) {
      console.warn(`[reexport-share] ${iso2}: no co-populated RX+M year in window ${years.join(',')}; omitting`);
      continue;
    }

    const clamped = clampShare(picked.share);
    if (clamped == null) {
      console.log(`[reexport-share] ${iso2}: raw share ${(picked.share * 100).toFixed(2)}% below floor (${MIN_MATERIAL_SHARE * 100}%) at Y=${picked.year}; omitting`);
      continue;
    }

    countries[iso2] = {
      reexportShareOfImports: clamped,
      year: picked.year,
      reexportsUsd: picked.reexportsUsd,
      grossImportsUsd: picked.importsUsd,
      source: 'comtrade',
      sources: [
        auditSafeSourceUrl(reporterCode, 'RX', years),
        auditSafeSourceUrl(reporterCode, 'M', years),
      ],
    };
    console.log(`[reexport-share] ${iso2}: share=${(clamped * 100).toFixed(1)}% at Y=${picked.year} (RX $${(picked.reexportsUsd / 1e9).toFixed(1)}B / M $${(picked.importsUsd / 1e9).toFixed(1)}B)`);
  }

  const payload = {
    manifestVersion: 2,
    lastReviewed: new Date().toISOString().slice(0, 10),
    externalReviewStatus: 'REVIEWED',
    countries,
    seededAt: new Date().toISOString(),
  };

  // Hard guarantee: no serialized field may contain the
  // subscription-key query param. If any future refactor leaks it into
  // the sources array or anywhere else in the envelope, fail the run
  // loudly instead of publishing the credential.
  const serialized = JSON.stringify(payload);
  if (/subscription-key=/i.test(serialized)) {
    throw new Error('[reexport-share] serialized payload contains subscription-key — refusing to publish');
  }

  return payload;
}

function validate(data) {
  if (!data || typeof data !== 'object') return false;
  if (data.manifestVersion !== 2) return false;
  if (!data.countries || typeof data.countries !== 'object') return false;
  return true;
}

export function declareRecords(data) {
  return Object.keys(data?.countries ?? {}).length;
}

// Guard top-level runSeed so the module can be imported by tests without
// triggering the full fetch/publish flow. Uses the canonical
// `pathToFileURL` comparison — unambiguous across path forms (symlink,
// case-different on macOS HFS+, Windows backslash vs slash) — rather
// than the basename-suffix matching pattern used by some older seeders.
const isMain = process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href;
if (isMain) {
  runSeed('resilience', 'recovery:reexport-share', CANONICAL_KEY, fetchReexportShare, {
    validateFn: validate,
    ttlSeconds: CACHE_TTL_SECONDS,
    sourceVersion: 'comtrade-rx-m-ratio-v2',
    declareRecords,
    schemaVersion: 2,
    // Empty-countries is ACCEPTABLE if every cohort member omits (Phase 0
    // may prune all; per-country floor may omit all). Downstream SWF
    // seeder handles an empty map as "all gross imports". Not strict.
    zeroIsValid: true,
    maxStaleMin: 10080,
  }).catch((err) => {
    const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
    console.error('FATAL:', (err.message || err) + cause);
    process.exit(1);
  });
}