From 0a4eff0053930340085b62c5b3811b2b2c14daec Mon Sep 17 00:00:00 2001 From: Elie Habib Date: Mon, 20 Apr 2026 15:21:43 +0400 Subject: [PATCH] feat(portwatch): split port-activity into standalone Railway cron + restore per-country shape (#3231) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context: PR #3225 globalised EP3 because the per-country shape was missing the section budget. Post-merge production log (2026-04-20) proved the globalisation itself was worse: 42s/page full-table scans (ArcGIS has no `date` index — confirmed via service metadata probe) AND intermittent "Invalid query parameters" on the global WHERE. Probes of outStatistics as an alternative showed it works for small countries (BRA: 19s, 103 ports) but times out server-side for heavy ones (USA: 313k historic rows, 30s+ server-compute, multiple retries returned HTTP_STATUS 000). Not a reliable path. The only shape ArcGIS reliably handles is per-country WHERE ISO3='X' AND date > Y (uses the ISO3 index). Its problem was fitting 174 countries in the 420s portwatch bundle budget — solve that by giving it its own container. Changes: - scripts/seed-portwatch-port-activity.mjs: restore per-country paginated EP3 with the accumulator shape from PR #3225 folded into the per-country loop (memory stays O(ports-per-country), not O(all-rows)). Keep every stabiliser: AbortSignal.any through fetchWithTimeout, SIGTERM handler with stage/batch/errors flush, per-country Promise.race with AbortController that actually cancels the work, eager p.catch for mid-batch error flush. - Add fetchWithRetryOnInvalidParams — single retry on the specific "Invalid query parameters" error class ArcGIS has returned intermittently in prod. Does not retry other error classes. - Bump LOCK_TTL_MS from 30 to 60 min to match the wider wall-time budget of the standalone cron. - scripts/seed-bundle-portwatch.mjs: remove PW-Port-Activity from the main portwatch bundle. Keeps PW-Disruptions (hourly), PW-Main (6h), PW-Chokepoints-Ref (weekly). - scripts/seed-bundle-portwatch-port-activity.mjs: new 1-section bundle. 540s section timeout, 570s bundle budget. Includes the full Railway service provisioning checklist in the header. - Dockerfile.seed-bundle-portwatch-port-activity: mirrors the resilience-validation pattern — node:22-alpine, full scripts/ tree copy (avoids the add-an-import-forget-to-COPY class that has bit us 3+ times), shared/ for _country-resolver. - tests/portwatch-port-activity-seed.test.mjs: rewrite assertions for the per-country shape. 54 tests pass (was 50, +4 for new assertions on the standalone bundle + Dockerfile + retry wrapper + ISO3 shape). Full test:data: 5883 pass. Typecheck + lint clean. Post-merge Railway provisioning: see header of seed-bundle-portwatch-port-activity.mjs for the 7-step checklist. --- ...erfile.seed-bundle-portwatch-port-activity | 31 ++ .../seed-bundle-portwatch-port-activity.mjs | 51 +++ scripts/seed-bundle-portwatch.mjs | 9 +- scripts/seed-portwatch-port-activity.mjs | 228 ++++++------ tests/portwatch-port-activity-seed.test.mjs | 348 ++++++++---------- 5 files changed, 370 insertions(+), 297 deletions(-) create mode 100644 Dockerfile.seed-bundle-portwatch-port-activity create mode 100644 scripts/seed-bundle-portwatch-port-activity.mjs diff --git a/Dockerfile.seed-bundle-portwatch-port-activity b/Dockerfile.seed-bundle-portwatch-port-activity new file mode 100644 index 000000000..1a683d162 --- /dev/null +++ b/Dockerfile.seed-bundle-portwatch-port-activity @@ -0,0 +1,31 @@ +# ============================================================================= +# Seed Bundle: PortWatch Port-Activity (daily cron, standalone) +# ============================================================================= +# Runs scripts/seed-bundle-portwatch-port-activity.mjs which spawns the single +# PW-Port-Activity section via _bundle-runner.mjs. Split out of the main +# portwatch bundle on 2026-04-20 because ArcGIS Daily_Ports_Data scales poorly +# at the per-country level — see the bundle script header for the full +# rationale. +# +# Runtime deps this image needs: +# - node:22-alpine (ESM, AbortSignal.any, fetch) +# - scripts/ tree (seeder + _seed-utils + _proxy-utils + _country-resolver +# + _bundle-runner + proxy helpers) +# - shared/ (country mappings that _country-resolver depends on) +# - .nvmrc? (not needed; node version baked into the base image) +# ============================================================================= + +FROM node:22-alpine + +WORKDIR /app + +# Ship the full scripts/ tree rather than cherry-picking. The cherry-picked +# approach has broken three+ seed services in the past when a local import +# was added without updating the Dockerfile (PR #3041, #3052, #3196). 2 MB of +# scripts + robustness is a good trade. +COPY scripts/ ./scripts/ +COPY shared/ ./shared/ + +ENV NODE_OPTIONS="--max-old-space-size=1024 --dns-result-order=ipv4first" + +CMD ["node", "scripts/seed-bundle-portwatch-port-activity.mjs"] diff --git a/scripts/seed-bundle-portwatch-port-activity.mjs b/scripts/seed-bundle-portwatch-port-activity.mjs new file mode 100644 index 000000000..6e1081c9f --- /dev/null +++ b/scripts/seed-bundle-portwatch-port-activity.mjs @@ -0,0 +1,51 @@ +#!/usr/bin/env node +// Standalone Railway cron service for supply_chain:portwatch-ports. +// +// Split out of seed-bundle-portwatch.mjs on 2026-04-20 because ArcGIS +// Daily_Ports_Data queries scale poorly at the N-countries level: even +// with per-country ISO3-indexed WHERE clauses + concurrency 12, wall +// time exceeded the bundle's 540s budget. Globalising the fetch (PR +// #3225) traded timeouts for a different failure mode (42s full-table +// scans + intermittent "Invalid query parameters"). Giving this seeder +// its own container decouples its worst-case runtime from the main +// portwatch bundle and lets it run on an interval appropriate to the +// ~10-day upstream dataset lag. +// +// Railway service provisioning checklist (after merge): +// 1. Create new service: portwatch-port-activity-seed +// 2. Builder: DOCKERFILE, dockerfilePath: Dockerfile.seed-bundle-portwatch-port-activity +// 3. Root directory: "" (empty) — avoids NIXPACKS auto-detection (see +// feedback_railway_dockerfile_autodetect_overrides_builder.md) +// 4. Cron schedule: "0 */24 * * *" (daily, UTC) — dataset lag means +// 12h cadence is overkill; 24h keeps us inside the freshness +// expectations downstream +// 5. Env vars (copy from existing seed services): +// UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN, +// PROXY_URL (for 429 fallback) +// 6. Watch paths (in service settings): +// scripts/seed-portwatch-port-activity.mjs, +// scripts/seed-bundle-portwatch-port-activity.mjs, +// scripts/_seed-utils.mjs, +// scripts/_proxy-utils.cjs, +// scripts/_country-resolver.mjs, +// scripts/_bundle-runner.mjs, +// Dockerfile.seed-bundle-portwatch-port-activity +// 7. Monitor first run for STALE_SEED recovery on portwatch-ports. +import { runBundle, HOUR } from './_bundle-runner.mjs'; + +await runBundle('portwatch-port-activity', [ + { + label: 'PW-Port-Activity', + script: 'seed-portwatch-port-activity.mjs', + seedMetaKey: 'supply_chain:portwatch-ports', + canonicalKey: 'supply_chain:portwatch-ports:v1:_countries', + // 12h interval gate — matches the historical cadence. Actual Railway + // cron should trigger at 24h; the interval gate prevents rapid-fire + // re-runs if someone manually retriggers mid-day. + intervalMs: 12 * HOUR, + // 540s section timeout — full budget for the one section. Bundle + // runner still SIGTERMs if the child hangs, and the seeder's + // SIGTERM handler releases the lock + extends TTLs. + timeoutMs: 540_000, + }, +], { maxBundleMs: 570_000 }); diff --git a/scripts/seed-bundle-portwatch.mjs b/scripts/seed-bundle-portwatch.mjs index 138676da0..56ac77cb7 100644 --- a/scripts/seed-bundle-portwatch.mjs +++ b/scripts/seed-bundle-portwatch.mjs @@ -1,9 +1,16 @@ #!/usr/bin/env node import { runBundle, HOUR, WEEK } from './_bundle-runner.mjs'; +// PW-Port-Activity was removed from this bundle on 2026-04-20 (see +// seed-bundle-portwatch-port-activity.mjs + Dockerfile.seed-bundle-portwatch-port-activity). +// Rationale: per-country EP3 fetches against ArcGIS consistently exceeded +// the section budget at scale, and the globalised variant (PR #3225) +// failed intermittently with "Invalid query parameters" plus 42s/page +// full-table scans. Running it in its own Railway cron with a longer +// wall-time budget decouples its worst-case runtime from the rest of the +// bundle. The three sections below are small and well-behaved. await runBundle('portwatch', [ { label: 'PW-Disruptions', script: 'seed-portwatch-disruptions.mjs', seedMetaKey: 'portwatch:disruptions', canonicalKey: 'portwatch:disruptions:active:v1', intervalMs: HOUR, timeoutMs: 120_000 }, { label: 'PW-Main', script: 'seed-portwatch.mjs', seedMetaKey: 'supply_chain:portwatch', canonicalKey: 'supply_chain:portwatch:v1', intervalMs: 6 * HOUR, timeoutMs: 300_000 }, - { label: 'PW-Port-Activity', script: 'seed-portwatch-port-activity.mjs', seedMetaKey: 'supply_chain:portwatch-ports', canonicalKey: 'supply_chain:portwatch-ports:v1:_countries', intervalMs: 12 * HOUR, timeoutMs: 420_000 }, { label: 'PW-Chokepoints-Ref', script: 'seed-portwatch-chokepoints-ref.mjs', seedMetaKey: 'portwatch:chokepoints-ref', canonicalKey: 'portwatch:chokepoints:ref:v1', intervalMs: WEEK, timeoutMs: 120_000 }, ], { maxBundleMs: 540_000 }); diff --git a/scripts/seed-portwatch-port-activity.mjs b/scripts/seed-portwatch-port-activity.mjs index 61c130542..d7705b550 100644 --- a/scripts/seed-portwatch-port-activity.mjs +++ b/scripts/seed-portwatch-port-activity.mjs @@ -20,7 +20,8 @@ export const CANONICAL_KEY = 'supply_chain:portwatch-ports:v1:_countries'; const KEY_PREFIX = 'supply_chain:portwatch-ports:v1:'; const META_KEY = 'seed-meta:supply_chain:portwatch-ports'; const LOCK_DOMAIN = 'supply_chain:portwatch-ports'; -const LOCK_TTL_MS = 30 * 60 * 1000; // 30 min — covers worst-case full run +// 60 min — covers the widest realistic run of this standalone service. +const LOCK_TTL_MS = 60 * 60 * 1000; const TTL = 259_200; // 3 days — 6× the 12h cron interval const MIN_VALID_COUNTRIES = 50; @@ -33,7 +34,14 @@ const PAGE_SIZE = 2000; const FETCH_TIMEOUT = 45_000; const HISTORY_DAYS = 90; const MAX_PORTS_PER_COUNTRY = 50; -const ACTIVITY_LOG_EVERY = 20; + +// Per-country budget. ArcGIS's ISO3 index makes per-country fetches O(rows-in-country), +// which is fine for most countries but heavy ones (USA ~313k historic rows, CHN/IND/RUS +// similar) can push 60-90s when the server is under load. Promise.allSettled would +// otherwise wait for the slowest, stalling the whole batch. +const PER_COUNTRY_TIMEOUT_MS = 90_000; +const CONCURRENCY = 12; +const BATCH_LOG_EVERY = 5; function epochToTimestamp(epochMs) { const d = new Date(epochMs); @@ -42,12 +50,8 @@ function epochToTimestamp(epochMs) { } async function fetchWithTimeout(url, { signal } = {}) { - // Combine the per-call FETCH_TIMEOUT with the upstream per-country signal - // so a per-country abort propagates into the in-flight fetch AND future - // pagination iterations (review feedback P1 on PR #3222). Without this, - // the 90s withPerCountryTimeout timer fires, the batch moves on, but the - // orphaned country keeps paginating with fresh 45s fetch timeouts — - // breaking the CONCURRENCY=12 cap and amplifying ArcGIS throttling. + // Combine the per-call FETCH_TIMEOUT with the upstream caller signal so an + // abort propagates into the in-flight fetch AND future pagination iterations. const combined = signal ? AbortSignal.any([signal, AbortSignal.timeout(FETCH_TIMEOUT)]) : AbortSignal.timeout(FETCH_TIMEOUT); @@ -59,11 +63,6 @@ async function fetchWithTimeout(url, { signal } = {}) { const proxyAuth = resolveProxyForConnect(); if (!proxyAuth) throw new Error(`ArcGIS HTTP 429 (rate limited) for ${url.slice(0, 80)}`); console.warn(` [portwatch] 429 rate-limited — retrying via proxy: ${url.slice(0, 80)}`); - // Pass the caller signal so a per-country abort also cancels the proxy - // fallback path (review feedback on PR #3222). Without this, a timed-out - // country could keep a proxy CONNECT tunnel + request alive for another - // 45s after the batch moved on, re-creating the orphan-work problem - // under the exact throttling scenario this PR addresses. const { buffer } = await httpsProxyFetchRaw(url, proxyAuth, { accept: 'application/json', timeoutMs: FETCH_TIMEOUT, signal }); const proxied = JSON.parse(buffer.toString('utf8')); if (proxied.error) throw new Error(`ArcGIS error (via proxy): ${proxied.error.message}`); @@ -75,14 +74,27 @@ async function fetchWithTimeout(url, { signal } = {}) { return body; } +// ArcGIS's Daily_Ports_Data FeatureServer intermittently returns "Cannot +// perform query. Invalid query parameters." for otherwise-valid queries — +// observed in prod 2026-04-20 for BRA/IDN/NGA on per-country WHERE, and +// also for the global WHERE after the PR #3225 rollout. A single retry with +// a short back-off clears it in practice. No retry loop — one attempt +// bounded. Does not retry any other error class. +async function fetchWithRetryOnInvalidParams(url, { signal } = {}) { + try { + return await fetchWithTimeout(url, { signal }); + } catch (err) { + const msg = err?.message || ''; + if (!/Invalid query parameters/i.test(msg)) throw err; + await new Promise((r) => setTimeout(r, 500)); + if (signal?.aborted) throw signal.reason ?? err; + console.warn(` [port-activity] retrying after "${msg}": ${url.slice(0, 80)}`); + return await fetchWithTimeout(url, { signal }); + } +} + // Fetch ALL ports globally in one paginated pass, grouped by ISO3. -// Replaces 240× per-country queries with a handful of pages. Returns -// Map>. -// -// IMPORTANT: ArcGIS FeatureServer can cap responses below the requested -// resultRecordCount (PortWatch_ports_database caps at 1000 despite -// PAGE_SIZE=2000). Advancing by PAGE_SIZE silently skips the rows between -// the server cap and PAGE_SIZE. Advance by the actual features.length. +// ArcGIS server-cap: advance by actual features.length, never PAGE_SIZE. async function fetchAllPortRefs({ signal } = {}) { const byIso3 = new Map(); let offset = 0; @@ -101,7 +113,7 @@ async function fetchAllPortRefs({ signal } = {}) { outSR: '4326', f: 'json', }); - body = await fetchWithTimeout(`${EP4_BASE}?${params}`, { signal }); + body = await fetchWithRetryOnInvalidParams(`${EP4_BASE}?${params}`, { signal }); const features = body.features ?? []; for (const f of features) { const a = f.attributes; @@ -113,50 +125,34 @@ async function fetchAllPortRefs({ signal } = {}) { ports.set(portId, { lat: Number(a.lat ?? 0), lon: Number(a.lon ?? 0) }); } console.log(` [port-activity] ref page ${page}: +${features.length} ports (${byIso3.size} countries so far)`); - if (features.length === 0) break; // defensive: ETL=true + 0 features would infinite-loop + if (features.length === 0) break; offset += features.length; } while (body.exceededTransferLimit); return byIso3; } -// Stream-aggregate ALL activity rows into per-port running counters. -// Replaces 174× per-country WHERE=ISO3 round-trips (which hit ~90s each at -// concurrency 12, far exceeding the 420s section budget even when none of -// them hung) with a single sequential loop of ~150-200 pages that completes -// comfortably inside the section budget. Also eliminates the `Invalid query -// parameters` errors we saw in prod for BRA/IDN/NGA on the per-country -// filter: the global WHERE has no ISO3 equality, so those failure modes -// disappear. +// Fetch ONE country's activity rows, streaming into per-port accumulators. +// ArcGIS's ISO3 index makes this cheap for most countries (~3-9s typical). +// Heavy countries (USA/CHN/etc.) can be 30-60s because 90 days × their many +// ports = thousands of rows across multiple pages. Hence the per-country +// timeout + single retry. // -// Memory: each page's features are folded into Map> -// and discarded. We never materialise the full 180k+ rows at once; only -// ~2000 accumulators (≈100 bytes each = ~200KB) live across pages. Review -// feedback on PR #3225 flagged the prior shape (Map) as an -// OOM risk on the 1GB Railway container — this addresses it. -// -// Returns Map>. Aborts between pages when -// signal.aborted is set. -async function fetchAndAggregateActivity(since, { signal, progress } = {}) { +// Returns Map — same shape `finalisePortsForCountry` +// consumes. Memory per country is O(unique ports for that country) ≈ <200. +async function fetchCountryAccum(iso3, since, { signal } = {}) { const now = Date.now(); const cutoff30 = now - 30 * 86400000; const cutoff60 = now - 60 * 86400000; const cutoff7 = now - 7 * 86400000; - const accumByIso3 = new Map(); + const portAccumMap = new Map(); let offset = 0; let body; - let page = 0; - const t0 = Date.now(); - let totalRows = 0; - do { if (signal?.aborted) throw signal.reason ?? new Error('aborted'); const params = new URLSearchParams({ - where: `date > ${epochToTimestamp(since)}`, + where: `ISO3='${iso3}' AND date > ${epochToTimestamp(since)}`, outFields: 'portid,portname,ISO3,date,portcalls_tanker,import_tanker,export_tanker', - // ArcGIS returns geometry by default (~100-200KB per page). We only - // need attributes — skip geometry to shave tens of MB off the wire - // across ~150-200 pages on the perf-critical path (PR #3225 review). returnGeometry: 'false', orderByFields: 'portid ASC,date ASC', resultRecordCount: String(PAGE_SIZE), @@ -164,38 +160,28 @@ async function fetchAndAggregateActivity(since, { signal, progress } = {}) { outSR: '4326', f: 'json', }); - body = await fetchWithTimeout(`${EP3_BASE}?${params}`, { signal }); + body = await fetchWithRetryOnInvalidParams(`${EP3_BASE}?${params}`, { signal }); const features = body.features ?? []; for (const f of features) { const a = f.attributes; - if (!a || a.portid == null || !a.ISO3 || a.date == null) continue; - const iso3 = String(a.ISO3); + if (!a || a.portid == null || a.date == null) continue; const portId = String(a.portid); - // ArcGIS changed date field to esriFieldTypeDateOnly — returns ISO - // string "YYYY-MM-DD", not epoch ms. Same parse as the prior per-row - // code, just done inline here so we can fold without keeping the row. + // ArcGIS date is esriFieldTypeDateOnly → "YYYY-MM-DD" string (or epoch ms). const date = typeof a.date === 'number' ? a.date : Date.parse(a.date + 'T12:00:00Z'); const calls = Number(a.portcalls_tanker ?? 0); const imports = Number(a.import_tanker ?? 0); const exports_ = Number(a.export_tanker ?? 0); - let countryMap = accumByIso3.get(iso3); - if (!countryMap) { countryMap = new Map(); accumByIso3.set(iso3, countryMap); } - - let acc = countryMap.get(portId); + let acc = portAccumMap.get(portId); if (!acc) { - // First time we see this port — capture its name. Rows arrive in - // (portid ASC, date ASC) order, so this matches the old behaviour - // where `rows[0].portname` was the earliest row's portname. acc = { portname: String(a.portname || ''), last30_calls: 0, last30_count: 0, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: 0, last7_count: 0, }; - countryMap.set(portId, acc); + portAccumMap.set(portId, acc); } - if (date >= cutoff30) { acc.last30_calls += calls; acc.last30_count += 1; @@ -209,21 +195,10 @@ async function fetchAndAggregateActivity(since, { signal, progress } = {}) { acc.prev30_calls += calls; } } - page++; - totalRows += features.length; - if (progress) { - progress.pages = page; - progress.countries = accumByIso3.size; - } - if (page === 1 || page % ACTIVITY_LOG_EVERY === 0) { - const elapsed = ((Date.now() - t0) / 1000).toFixed(1); - console.log(` [port-activity] activity page ${page}: +${features.length} rows (${accumByIso3.size} countries, ${totalRows} total rows, ${elapsed}s)`); - } if (features.length === 0) break; offset += features.length; } while (body.exceededTransferLimit); - console.log(` [port-activity] Activity folded: ${page} pages, ${totalRows} rows, ${accumByIso3.size} countries (${((Date.now() - t0) / 1000).toFixed(1)}s)`); - return accumByIso3; + return portAccumMap; } export function finalisePortsForCountry(portAccumMap, refMap) { @@ -253,6 +228,25 @@ export function finalisePortsForCountry(portAccumMap, refMap) { .slice(0, MAX_PORTS_PER_COUNTRY); } +// Runs `doWork(signal)` but rejects if the per-country timer fires first, +// aborting the controller so the in-flight fetch (and its pagination loop) +// actually stops instead of orphaning. Keeps the CONCURRENCY cap real. +// Exported with an injectable timeoutMs so runtime tests can exercise the +// abort path at 40ms instead of the production 90s. +export function withPerCountryTimeout(doWork, iso3, timeoutMs = PER_COUNTRY_TIMEOUT_MS) { + const controller = new AbortController(); + let timer; + const guard = new Promise((_, reject) => { + timer = setTimeout(() => { + const err = new Error(`per-country timeout after ${timeoutMs / 1000}s (${iso3})`); + try { controller.abort(err); } catch {} + reject(err); + }, timeoutMs); + }); + const work = doWork(controller.signal); + return Promise.race([work, guard]).finally(() => clearTimeout(timer)); +} + async function redisPipeline(commands) { const { url, token } = getRedisCredentials(); const resp = await fetch(`${url}/pipeline`, { @@ -272,7 +266,7 @@ async function redisPipeline(commands) { // Returns { countries: string[], countryData: Map, fetchedAt: string }. // // `progress` (optional) is mutated in-place so a SIGTERM handler in main() -// can report which stage was running and how far into it we got. +// can report which batch / country we died on. export async function fetchAll(progress, { signal } = {}) { const { iso3ToIso2 } = createCountryResolvers(); const since = Date.now() - HISTORY_DAYS * 86400000; @@ -284,27 +278,54 @@ export async function fetchAll(progress, { signal } = {}) { console.log(` [port-activity] Refs loaded: ${refsByIso3.size} countries with ports (${((Date.now() - t0) / 1000).toFixed(1)}s)`); if (progress) progress.stage = 'activity'; - console.log(` [port-activity] Fetching + aggregating global activity (${HISTORY_DAYS}d history, EP3)...`); - const accumByIso3 = await fetchAndAggregateActivity(since, { signal, progress }); - - if (progress) progress.stage = 'compute'; const eligibleIso3 = [...refsByIso3.keys()].filter(iso3 => iso3ToIso2.has(iso3)); const skipped = refsByIso3.size - eligibleIso3.length; - console.log(` [port-activity] Finalising ports for ${eligibleIso3.length} eligible countries (skipping ${skipped} unmapped iso3)`); + const batches = Math.ceil(eligibleIso3.length / CONCURRENCY); + if (progress) progress.totalBatches = batches; + console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY}, per-country cap ${PER_COUNTRY_TIMEOUT_MS / 1000}s)`); const countryData = new Map(); - let missingActivity = 0; - for (const iso3 of eligibleIso3) { - const accum = accumByIso3.get(iso3); - if (!accum || accum.size === 0) { missingActivity++; continue; } - const ports = finalisePortsForCountry(accum, refsByIso3.get(iso3)); - if (!ports.length) continue; - const iso2 = iso3ToIso2.get(iso3); - countryData.set(iso2, { iso2, ports, fetchedAt: new Date().toISOString() }); + const errors = progress?.errors ?? []; + const activityStart = Date.now(); + + for (let i = 0; i < eligibleIso3.length; i += CONCURRENCY) { + const batch = eligibleIso3.slice(i, i + CONCURRENCY); + const batchIdx = Math.floor(i / CONCURRENCY) + 1; + if (progress) progress.batchIdx = batchIdx; + + const promises = batch.map(iso3 => { + const p = withPerCountryTimeout( + (childSignal) => fetchCountryAccum(iso3, since, { signal: childSignal }), + iso3, + ); + // Eager error flush so a SIGTERM mid-batch captures rejections that + // have already fired, not only those that settled after allSettled. + p.catch(err => errors.push(`${iso3}: ${err?.message || err}`)); + return p; + }); + const settled = await Promise.allSettled(promises); + + for (let j = 0; j < batch.length; j++) { + const iso3 = batch[j]; + const outcome = settled[j]; + if (outcome.status === 'rejected') continue; // already recorded via .catch + const portAccumMap = outcome.value; + if (!portAccumMap || portAccumMap.size === 0) continue; + const ports = finalisePortsForCountry(portAccumMap, refsByIso3.get(iso3)); + if (!ports.length) continue; + const iso2 = iso3ToIso2.get(iso3); + countryData.set(iso2, { iso2, ports, fetchedAt: new Date().toISOString() }); + } + + if (progress) progress.seeded = countryData.size; + if (batchIdx === 1 || batchIdx % BATCH_LOG_EVERY === 0 || batchIdx === batches) { + const elapsed = ((Date.now() - activityStart) / 1000).toFixed(1); + console.log(` [port-activity] batch ${batchIdx}/${batches}: ${countryData.size} countries seeded, ${errors.length} errors (${elapsed}s)`); + } } - if (missingActivity > 0) { - console.log(` [port-activity] ${missingActivity} eligible countries had no activity rows in the global dataset`); + if (errors.length) { + console.warn(` [port-activity] ${errors.length} country errors: ${errors.slice(0, 5).join('; ')}${errors.length > 5 ? ' ...' : ''}`); } if (countryData.size === 0) throw new Error('No country port data returned from ArcGIS'); @@ -334,27 +355,26 @@ async function main() { let prevCountryKeys = []; let prevCount = 0; - // Mutated in-place by fetchAll() so the SIGTERM handler can report which - // stage was running and how far into the global paginator we got. - const progress = { stage: 'starting', pages: 0, countries: 0 }; + // Shared progress object so the SIGTERM handler can report which batch / + // stage we died in and what per-country errors have fired so far. + const progress = { stage: 'starting', batchIdx: 0, totalBatches: 0, seeded: 0, errors: [] }; - // AbortController plumbed through fetchAll → fetchAllActivityRows → - // fetchWithTimeout → _proxy-utils so a SIGTERM kill (or bundle-runner - // grace-window escalation) actually stops any in-flight HTTP work - // instead of leaving orphan requests running into the SIGKILL. + // AbortController threaded through fetchAll → fetchCountryAccum → fetchWithTimeout + // → _proxy-utils so a SIGTERM kill (or bundle-runner grace-window escalation) + // actually stops any in-flight HTTP work. const shutdownController = new AbortController(); - // Bundle-runner SIGKILLs via SIGTERM → SIGKILL on timeout. Release the lock - // and extend existing TTLs synchronously(ish) so the next cron tick isn't - // blocked for up to 30 min and the Redis snapshot doesn't evaporate. let sigHandled = false; const onSigterm = async () => { if (sigHandled) return; sigHandled = true; try { shutdownController.abort(new Error('SIGTERM')); } catch {} console.error( - ` [port-activity] SIGTERM during stage=${progress.stage} (pages=${progress.pages}, countries=${progress.countries})`, + ` [port-activity] SIGTERM at batch ${progress.batchIdx}/${progress.totalBatches} (stage=${progress.stage}) — ${progress.seeded} seeded, ${progress.errors.length} errors`, ); + if (progress.errors.length) { + console.error(` [port-activity] First errors: ${progress.errors.slice(0, 10).join('; ')}`); + } console.error(' [port-activity] Releasing lock + extending TTLs'); try { await extendExistingTtl([CANONICAL_KEY, META_KEY, ...prevCountryKeys], TTL); @@ -366,7 +386,6 @@ async function main() { process.on('SIGINT', onSigterm); try { - // Read previous snapshot first — needed for both degradation guard and error TTL extension. const prevIso2List = await readSeedSnapshot(CANONICAL_KEY).catch(() => null); prevCountryKeys = Array.isArray(prevIso2List) ? prevIso2List.map(iso2 => `${KEY_PREFIX}${iso2}`) : []; prevCount = Array.isArray(prevIso2List) ? prevIso2List.length : 0; @@ -382,9 +401,6 @@ async function main() { return; } - // Degradation guard: refuse to replace a healthy snapshot that is significantly smaller. - // Transient ArcGIS outages cause per-country fetches to fail via Promise.allSettled() without - // throwing — publishin a 50-country result over a 120-country snapshot silently drops 70 countries. if (prevCount > 0 && countryData.size < prevCount * 0.8) { console.error(` DEGRADATION GUARD: ${countryData.size} countries vs ${prevCount} previous — refusing to overwrite (need ≥${Math.ceil(prevCount * 0.8)})`); await extendExistingTtl([CANONICAL_KEY, META_KEY, ...prevCountryKeys], TTL).catch(() => {}); diff --git a/tests/portwatch-port-activity-seed.test.mjs b/tests/portwatch-port-activity-seed.test.mjs index 71b432a75..5bf1f7c3b 100644 --- a/tests/portwatch-port-activity-seed.test.mjs +++ b/tests/portwatch-port-activity-seed.test.mjs @@ -9,8 +9,9 @@ const __dirname = dirname(fileURLToPath(import.meta.url)); const root = resolve(__dirname, '..'); const src = readFileSync(resolve(root, 'scripts/seed-portwatch-port-activity.mjs'), 'utf-8'); -const seedUtilsSrc = readFileSync(resolve(root, 'scripts/_seed-utils.mjs'), 'utf-8'); -const proxyUtilsSrc = readFileSync(resolve(root, 'scripts/_proxy-utils.cjs'), 'utf-8'); +const bundleSrc = readFileSync(resolve(root, 'scripts/seed-bundle-portwatch-port-activity.mjs'), 'utf-8'); +const mainBundleSrc = readFileSync(resolve(root, 'scripts/seed-bundle-portwatch.mjs'), 'utf-8'); +const dockerfileSrc = readFileSync(resolve(root, 'Dockerfile.seed-bundle-portwatch-port-activity'), 'utf-8'); // ── seeder source assertions ────────────────────────────────────────────────── @@ -23,12 +24,16 @@ describe('seed-portwatch-port-activity.mjs exports', () => { assert.match(src, /export\s+function\s+validateFn/); }); - it('CANONICAL_KEY is supply_chain:portwatch-ports:v1:_countries', () => { - assert.match(src, /supply_chain:portwatch-ports:v1:_countries/); + it('exports withPerCountryTimeout', () => { + assert.match(src, /export\s+function\s+withPerCountryTimeout/); }); - it('KEY_PREFIX is supply_chain:portwatch-ports:v1:', () => { - assert.match(src, /supply_chain:portwatch-ports:v1:/); + it('exports finalisePortsForCountry', () => { + assert.match(src, /export\s+function\s+finalisePortsForCountry/); + }); + + it('CANONICAL_KEY is supply_chain:portwatch-ports:v1:_countries', () => { + assert.match(src, /supply_chain:portwatch-ports:v1:_countries/); }); it('Endpoint 3 URL contains Daily_Ports_Data', () => { @@ -39,135 +44,96 @@ describe('seed-portwatch-port-activity.mjs exports', () => { assert.match(src, /PortWatch_ports_database/); }); - it('date filter uses epochToTimestamp', () => { - assert.match(src, /epochToTimestamp/); + it('EP3 per-country WHERE uses ISO3 index + date filter', () => { + // After the PR #3225 globalisation failed in prod, we restored the + // per-country shape because ArcGIS has an ISO3 index but NO date + // index — the per-country filter is what keeps queries fast. + assert.match(src, /where:\s*`ISO3='\$\{iso3\}'\s+AND\s+date\s*>/); + // Global where=date>X shape must NOT be present any more. + assert.doesNotMatch(src, /where:\s*`date\s*>\s*\$\{epochToTimestamp\(since\)\}`/); }); - it('Endpoint 3 pagination loop checks body.exceededTransferLimit', () => { - assert.match(src, /body\.exceededTransferLimit/); - }); - - it('Endpoint 4 query fetches all ports globally with where=1=1', () => { - assert.match(src, /PortWatch_ports_database/); + it('EP4 refs query fetches all ports globally with where=1=1', () => { assert.match(src, /where:\s*'1=1'/); assert.match(src, /outFields:\s*'portid,ISO3,lat,lon'/); }); - it('both paginators set returnGeometry:false to avoid wasted wire bandwidth', () => { - // ArcGIS returns geometry by default (~100-200KB per page). Omitting - // this in the EP3 paginator across ~150-200 pages adds tens of MB to - // the perf-critical path. Review feedback on PR #3225. + it('both paginators set returnGeometry:false', () => { const matches = src.match(/returnGeometry:\s*'false'/g) ?? []; - assert.ok(matches.length >= 2, `expected returnGeometry:'false' in both EP3 and EP4 paginators, found ${matches.length}`); - }); - - it('fetchAllPortRefs accepts + forwards signal for SIGTERM cancellation', () => { - // Review feedback on PR #3225: during the 'refs' stage a SIGTERM must - // cancel in-flight EP4 fetches, not let them run up to FETCH_TIMEOUT - // after the handler fires. The signal must thread through the - // paginator into fetchWithTimeout. - assert.match(src, /async function fetchAllPortRefs\(\{\s*signal\s*\}\s*=\s*\{\}\)/); - assert.match(src, /fetchWithTimeout\(`\$\{EP4_BASE\}\?\$\{params\}`,\s*\{\s*signal\s*\}\)/); - // fetchAll must pass the signal when calling it. - assert.match(src, /fetchAllPortRefs\(\{\s*signal\s*\}\)/); - }); - - it('Endpoint 3 activity query is globalised — no per-country ISO3 filter', () => { - // The per-country `WHERE ISO3='XX' AND date > ...` shape is gone; the - // globalised paginator uses a single date filter and groups by ISO3 in - // memory. This eliminates the 174-per-country round-trip cost that - // blew the 420s section budget even when every country was fast, and - // also removes the `Invalid query parameters` errors that hit - // BRA/IDN/NGA under the per-country shape. - assert.doesNotMatch(src, /where:\s*`ISO3=/); - assert.match(src, /where:\s*`date\s*>\s*\$\{epochToTimestamp\(since\)\}`/); - }); - - it('streams EP3 into per-port accumulators, not a flat rows array', () => { - // Review feedback on PR #3225: materialising the full 90d activity - // dataset as Map holds ~180k feature objects at once - // (~70MB) on a 1GB Railway container. The aggregator now folds each - // page into Map> (~200KB) and discards - // the rows. Assert both the rename and the accumulator shape. - assert.match(src, /async function fetchAndAggregateActivity/); - assert.doesNotMatch(src, /async function fetchAllActivityRows/); - // Accumulator fields (at least the key ones we rely on downstream). - assert.match(src, /last30_calls:\s*0/); - assert.match(src, /last30_count:\s*0/); - assert.match(src, /prev30_calls:\s*0/); - assert.match(src, /last7_calls:\s*0/); - // No more flat rows array collected per country. - assert.doesNotMatch(src, /byIso3\.set\(key,\s*list\)/); - }); - - it('finalisePortsForCountry emits top-N ports from accumulators', () => { - assert.match(src, /function finalisePortsForCountry\(portAccumMap,\s*refMap\)/); - assert.match(src, /MAX_PORTS_PER_COUNTRY/); - // Same anomaly/trend formula as the old per-row code. - assert.match(src, /avg7d\s*<\s*avg30d\s*\*\s*0\.5/); - }); - - it('registers SIGTERM handler for graceful shutdown', () => { - assert.match(src, /process\.on\('SIGTERM'/); - }); - - it('SIGTERM handler aborts shutdownController + logs stage/pages/countries', () => { - // Per-country batching is gone, but the SIGTERM path still must (a) - // abort the in-flight global paginator via the shared controller, and - // (b) emit a forensic line identifying which stage we died in. - assert.match(src, /shutdownController\.abort\(new Error\('SIGTERM'\)\)/); - assert.match(src, /SIGTERM during stage=\$\{progress\.stage\}/); - assert.match(src, /pages=\$\{progress\.pages\},\s*countries=\$\{progress\.countries\}/); - }); - - it('fetchAll accepts progress + { signal } and mutates progress.stage', () => { - assert.match(src, /export async function fetchAll\(progress,\s*\{\s*signal\s*\}\s*=\s*\{\}\)/); - assert.match(src, /progress\.stage\s*=\s*'refs'/); - assert.match(src, /progress\.stage\s*=\s*'activity'/); - assert.match(src, /progress\.stage\s*=\s*'compute'/); - }); - - it('fetchAndAggregateActivity updates progress.pages + progress.countries', () => { - assert.match(src, /progress\.pages\s*=\s*page/); - assert.match(src, /progress\.countries\s*=\s*accumByIso3\.size/); + assert.ok(matches.length >= 2, `expected returnGeometry:'false' in both paginators, found ${matches.length}`); }); it('fetchWithTimeout combines caller signal with FETCH_TIMEOUT via AbortSignal.any', () => { - // Still needed so a shutdown-controller abort propagates into the - // in-flight fetch instead of orphaning it for up to 45s. assert.match(src, /AbortSignal\.any\(\[signal,\s*AbortSignal\.timeout\(FETCH_TIMEOUT\)\]\)/); }); - it('fetchAndAggregateActivity checks signal.aborted between pages', () => { - assert.match(src, /signal\?\.aborted\)\s*throw\s+signal\.reason/); + it('paginators check signal.aborted between pages', () => { + // Both refs + activity paginators must exit fast on abort. + const matches = src.match(/signal\?\.aborted\)\s*throw\s+signal\.reason/g) ?? []; + assert.ok(matches.length >= 2, `expected signal.aborted checks in both paginators, found ${matches.length}`); }); - it('429 proxy fallback threads caller signal into httpsProxyFetchRaw', () => { - assert.match(src, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s); + it('defines fetchWithRetryOnInvalidParams — single retry on transient ArcGIS error', () => { + // Prod log 2026-04-20 showed ArcGIS returning "Cannot perform query. + // Invalid query parameters." for otherwise-valid queries (BRA/IDN/NGA + // on per-country; also the global WHERE). One retry clears it. + assert.match(src, /async function fetchWithRetryOnInvalidParams/); + assert.match(src, /Invalid query parameters/); + // Must NOT retry other error classes. + assert.match(src, /if\s*\(!\/Invalid query parameters\/i\.test\(msg\)\)\s*throw\s+err/); }); - it('httpsProxyFetchRaw accepts and forwards signal', () => { - assert.match(seedUtilsSrc, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s); - assert.match(seedUtilsSrc, /proxyFetch\(url,\s*proxyConfig,\s*\{[^}]*signal[^}]*\}/s); + it('both EP3 + EP4 paginators route through fetchWithRetryOnInvalidParams', () => { + const matches = src.match(/fetchWithRetryOnInvalidParams\(/g) ?? []; + // Called in: fetchAllPortRefs (EP4), fetchCountryAccum (EP3). 2+ usages. + assert.ok(matches.length >= 2, `expected retry wrapper used by both paginators, found ${matches.length}`); }); - it('proxyFetch + proxyConnectTunnel accept signal and bail early if aborted', () => { - assert.match(proxyUtilsSrc, /function proxyFetch\([\s\S]*?\bsignal,?\s*\}\s*=\s*\{\}/); - assert.match(proxyUtilsSrc, /function proxyConnectTunnel\([\s\S]*?\bsignal\s*\}\s*=\s*\{\}/); - assert.match(proxyUtilsSrc, /signal && signal\.aborted/); - assert.match(proxyUtilsSrc, /signal\.addEventListener\('abort'/); + it('CONCURRENCY is 12 and PER_COUNTRY_TIMEOUT_MS is 90s', () => { + assert.match(src, /CONCURRENCY\s*=\s*12/); + assert.match(src, /PER_COUNTRY_TIMEOUT_MS\s*=\s*90_000/); + }); + + it('batch loop wires eager .catch for mid-batch SIGTERM diagnostics', () => { + assert.match(src, /p\.catch\(err\s*=>\s*errors\.push/); + }); + + it('withPerCountryTimeout aborts the controller when timer fires', () => { + // Abort propagation must be real — not just a Promise.race that lets + // the inner work keep running (PR #3222 review P1). + assert.match(src, /controller\.abort\(err\)/); + }); + + it('fetchCountryAccum returns per-port accumulators, not raw rows', () => { + assert.match(src, /async function fetchCountryAccum/); + assert.match(src, /last30_calls:\s*0/); + assert.match(src, /prev30_calls:\s*0/); + assert.match(src, /last7_calls:\s*0/); + }); + + it('registers SIGTERM + SIGINT + aborts shutdownController', () => { + assert.match(src, /process\.on\('SIGTERM'/); + assert.match(src, /process\.on\('SIGINT'/); + assert.match(src, /shutdownController\.abort\(new Error\('SIGTERM'\)\)/); + }); + + it('SIGTERM handler logs batch + stage + seeded + first errors', () => { + assert.match(src, /SIGTERM at batch \$\{progress\.batchIdx\}\/\$\{progress\.totalBatches\}/); + assert.match(src, /progress\.errors\.slice\(0,\s*10\)/); }); it('pagination advances by actual features.length, not PAGE_SIZE', () => { - // ArcGIS PortWatch_ports_database caps responses at 1000 rows even when - // resultRecordCount=2000. Advancing by PAGE_SIZE skips rows 1000-1999. - // Guard: no 'offset += PAGE_SIZE' anywhere in the file, both loops use - // 'offset += features.length'. assert.doesNotMatch(src, /offset\s*\+=\s*PAGE_SIZE/); const matches = src.match(/offset\s*\+=\s*features\.length/g) ?? []; assert.ok(matches.length >= 2, `expected both paginators to advance by features.length, found ${matches.length}`); }); + it('LOCK_TTL_MS is 60 min', () => { + // Bumped from 30 → 60 min when this moved to its own Railway cron with + // a bigger wall-time budget. + assert.match(src, /LOCK_TTL_MS\s*=\s*60\s*\*\s*60\s*\*\s*1000/); + }); + it('anomalySignal computation is present', () => { assert.match(src, /anomalySignal/); }); @@ -196,16 +162,40 @@ describe('ArcGIS 429 proxy fallback', () => { assert.match(src, /resp\.status\s*===\s*429/); }); - it('calls resolveProxyForConnect() on 429', () => { - assert.match(src, /resolveProxyForConnect\(\)/); + it('429 proxy fallback threads caller signal', () => { + assert.match(src, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s); + }); +}); + +// ── standalone bundle + Dockerfile assertions ──────────────────────────────── + +describe('standalone Railway cron split', () => { + it('main portwatch bundle NO LONGER contains PW-Port-Activity', () => { + assert.doesNotMatch(mainBundleSrc, /label:\s*'PW-Port-Activity'/); + assert.doesNotMatch(mainBundleSrc, /seed-portwatch-port-activity\.mjs/); }); - it('calls httpsProxyFetchRaw with proxy auth on 429', () => { - assert.match(src, /httpsProxyFetchRaw\(url,\s*proxyAuth/); + it('new dedicated bundle script exists and references the seeder', () => { + assert.match(bundleSrc, /seed-portwatch-port-activity\.mjs/); + assert.match(bundleSrc, /runBundle\('portwatch-port-activity'/); + assert.match(bundleSrc, /label:\s*'PW-Port-Activity'/); }); - it('throws if 429 and no proxy configured', () => { - assert.match(src, /429.*rate limited/); + it('new bundle gives the section a 540s timeout', () => { + assert.match(bundleSrc, /timeoutMs:\s*540_000/); + }); + + it('Dockerfile copies scripts/ + shared/ (needed at runtime)', () => { + assert.match(dockerfileSrc, /COPY\s+scripts\/\s+\.\/scripts\//); + assert.match(dockerfileSrc, /COPY\s+shared\/\s+\.\/shared\//); + }); + + it('Dockerfile CMD runs the new bundle script', () => { + assert.match(dockerfileSrc, /CMD\s*\["node",\s*"scripts\/seed-bundle-portwatch-port-activity\.mjs"\]/); + }); + + it('Dockerfile sets dns-result-order=ipv4first (matches other seed services)', () => { + assert.match(dockerfileSrc, /dns-result-order=ipv4first/); }); }); @@ -247,12 +237,10 @@ describe('anomalySignal computation', () => { for (let i = 0; i < 30; i++) { rows.push({ date: now - (29 - i) * 86400000, portcalls_tanker: 60 }); } - // last 7 days avg = 2 (spike down) for (let i = 0; i < 7; i++) { rows[rows.length - 7 + i].portcalls_tanker = 2; } - const result = computeAnomalySignal(rows, cutoff30, cutoff7); - assert.equal(result, true, 'should detect anomaly when 7d avg is far below 30d avg'); + assert.equal(computeAnomalySignal(rows, cutoff30, cutoff7), true); }); it('does NOT flag anomaly when 7d avg is close to 30d avg', () => { @@ -260,76 +248,81 @@ describe('anomalySignal computation', () => { for (let i = 0; i < 30; i++) { rows.push({ date: now - (29 - i) * 86400000, portcalls_tanker: 60 }); } - // last 7 days avg = 55 (close to 60) for (let i = 0; i < 7; i++) { rows[rows.length - 7 + i].portcalls_tanker = 55; } - const result = computeAnomalySignal(rows, cutoff30, cutoff7); - assert.equal(result, false, 'should not flag anomaly when 7d is close to 30d avg'); + assert.equal(computeAnomalySignal(rows, cutoff30, cutoff7), false); }); - it('returns false when 30d avg is zero (no baseline)', () => { - const rows = []; - for (let i = 0; i < 30; i++) { - rows.push({ date: now - (29 - i) * 86400000, portcalls_tanker: 0 }); - } - const result = computeAnomalySignal(rows, cutoff30, cutoff7); - assert.equal(result, false, 'should return false when baseline is zero'); + it('returns false when 30d avg is zero', () => { + const rows = Array.from({ length: 30 }, (_, i) => ({ date: now - (29 - i) * 86400000, portcalls_tanker: 0 })); + assert.equal(computeAnomalySignal(rows, cutoff30, cutoff7), false); }); }); describe('top-N port truncation', () => { it('returns top 50 ports from a set of 60', () => { - const ports = Array.from({ length: 60 }, (_, i) => ({ - portId: String(i), - portName: `Port ${i}`, - tankerCalls30d: 60 - i, - })); + const ports = Array.from({ length: 60 }, (_, i) => ({ portId: String(i), portName: `P${i}`, tankerCalls30d: 60 - i })); const result = topN(ports, 50); - assert.equal(result.length, 50, 'should return exactly 50 ports'); - assert.equal(result[0].tankerCalls30d, 60, 'first port should have highest tankerCalls30d'); - assert.equal(result[49].tankerCalls30d, 11, 'last port should be rank 50'); + assert.equal(result.length, 50); + assert.equal(result[0].tankerCalls30d, 60); + assert.equal(result[49].tankerCalls30d, 11); }); it('returns all ports when count is less than N', () => { - const ports = Array.from({ length: 10 }, (_, i) => ({ - portId: String(i), - portName: `Port ${i}`, - tankerCalls30d: 10 - i, - })); - const result = topN(ports, 50); - assert.equal(result.length, 10, 'should return all 10 ports when fewer than 50'); + const ports = Array.from({ length: 10 }, (_, i) => ({ portId: String(i), portName: `P${i}`, tankerCalls30d: 10 - i })); + assert.equal(topN(ports, 50).length, 10); + }); +}); + +// ── runtime tests ──────────────────────────────────────────────────────────── + +describe('withPerCountryTimeout (runtime)', () => { + let withPerCountryTimeout; + before(async () => { + ({ withPerCountryTimeout } = await import('../scripts/seed-portwatch-port-activity.mjs')); }); - it('sorts by tankerCalls30d descending', () => { - const ports = [ - { portId: 'a', portName: 'A', tankerCalls30d: 5 }, - { portId: 'b', portName: 'B', tankerCalls30d: 100 }, - { portId: 'c', portName: 'C', tankerCalls30d: 50 }, - ]; - const result = topN(ports, 50); - assert.equal(result[0].portId, 'b'); - assert.equal(result[1].portId, 'c'); - assert.equal(result[2].portId, 'a'); + it('aborts the per-country signal when the timer fires', async () => { + let observedSignal; + const p = withPerCountryTimeout( + (signal) => { + observedSignal = signal; + return new Promise((_, reject) => { + signal.addEventListener('abort', () => reject(signal.reason), { once: true }); + }); + }, + 'TST', + 40, + ); + await assert.rejects(p, /per-country timeout after 0\.04s \(TST\)/); + assert.equal(observedSignal.aborted, true); + }); + + it('resolves with the work result when work completes before the timer', async () => { + const result = await withPerCountryTimeout((_s) => Promise.resolve({ ok: true }), 'TST', 500); + assert.deepEqual(result, { ok: true }); + }); + + it('surfaces the real error when work rejects first (not timeout message)', async () => { + await assert.rejects( + withPerCountryTimeout((_s) => Promise.reject(new Error('ArcGIS HTTP 500')), 'TST', 1_000), + /ArcGIS HTTP 500/, + ); }); }); describe('finalisePortsForCountry (runtime, semantic equivalence)', () => { - // eslint-disable-next-line import/first let finalisePortsForCountry; before(async () => { ({ finalisePortsForCountry } = await import('../scripts/seed-portwatch-port-activity.mjs')); }); it('emits tankerCalls30d / trendDelta / anomalySignal that match the old per-row formula', () => { - // Accum equivalent of: last30 has 30 rows × 60 calls, prev30 has 30 × 40, - // last7 has 7 rows × 20 calls (subset of last30 — but note that the - // streaming aggregator increments last30 AND last7 on the same row for - // dates ≤ 7d, so last7_calls should be <= last30_calls). const portAccumMap = new Map([ ['42', { portname: 'Test Port', - last30_calls: 60 * 23 + 20 * 7, // 23 rows in 8-30d window + 7 rows in 0-7d window + last30_calls: 60 * 23 + 20 * 7, last30_count: 30, last30_import: 1000, last30_export: 500, @@ -340,47 +333,27 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => { ]); const refMap = new Map([['42', { lat: 10, lon: 20 }]]); const [port] = finalisePortsForCountry(portAccumMap, refMap); - - assert.equal(port.portId, '42'); - assert.equal(port.portName, 'Test Port'); - assert.equal(port.lat, 10); - assert.equal(port.lon, 20); assert.equal(port.tankerCalls30d, 60 * 23 + 20 * 7); assert.equal(port.importTankerDwt30d, 1000); assert.equal(port.exportTankerDwt30d, 500); - // trendDelta = ((last30 - prev30) / prev30) * 100, rounded to 1 decimal const expectedTrend = Math.round(((60 * 23 + 20 * 7 - 40 * 30) / (40 * 30)) * 1000) / 10; assert.equal(port.trendDelta, expectedTrend); - // avg30d = last30_calls / last30_count; avg7d = last7_calls / last7_count - // (60*23 + 20*7) / 30 = (1380+140)/30 = 50.67; last7 avg = 140/7 = 20 - // 20 < 50.67 * 0.5 = 25.33 → anomaly = true assert.equal(port.anomalySignal, true); }); - it('returns trendDelta=0 when prev30_calls is zero (no baseline)', () => { + it('trendDelta=0 when prev30_calls=0', () => { const portAccumMap = new Map([ - ['1', { - portname: 'P', last30_calls: 100, last30_count: 30, - last30_import: 0, last30_export: 0, - prev30_calls: 0, // no prior-period baseline - // last7 matches last30 rate → no anomaly - last7_calls: Math.round((100 / 30) * 7), - last7_count: 7, - }], + ['1', { portname: 'P', last30_calls: 100, last30_count: 30, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: Math.round((100 / 30) * 7), last7_count: 7 }], ]); const [port] = finalisePortsForCountry(portAccumMap, new Map()); assert.equal(port.trendDelta, 0); assert.equal(port.anomalySignal, false); }); - it('sorts by tankerCalls30d desc and truncates to MAX_PORTS_PER_COUNTRY=50', () => { + it('sorts desc + truncates to MAX_PORTS_PER_COUNTRY=50', () => { const portAccumMap = new Map(); for (let i = 0; i < 60; i++) { - portAccumMap.set(String(i), { - portname: `P${i}`, last30_calls: 60 - i, last30_count: 1, - last30_import: 0, last30_export: 0, prev30_calls: 0, - last7_calls: 0, last7_count: 0, - }); + portAccumMap.set(String(i), { portname: `P${i}`, last30_calls: 60 - i, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: 0, last7_count: 0 }); } const out = finalisePortsForCountry(portAccumMap, new Map()); assert.equal(out.length, 50); @@ -388,7 +361,7 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => { assert.equal(out[49].tankerCalls30d, 11); }); - it('falls back to lat/lon=0 when a portId is missing from refMap', () => { + it('falls back to lat/lon=0 when refMap lacks the portId', () => { const portAccumMap = new Map([ ['999', { portname: 'Orphan', last30_calls: 1, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: 0, last7_count: 0 }], ]); @@ -403,9 +376,6 @@ describe('proxyFetch signal propagation (runtime)', () => { const { proxyFetch } = require_('../scripts/_proxy-utils.cjs'); it('rejects synchronously when called with an already-aborted signal', async () => { - // A shutdown-controller abort must short-circuit BEFORE any CONNECT - // tunnel opens; otherwise a killed run's proxy call continues in the - // background past SIGKILL. No network reached in this test. const controller = new AbortController(); controller.abort(new Error('test-cancel')); await assert.rejects( @@ -421,15 +391,13 @@ describe('proxyFetch signal propagation (runtime)', () => { describe('validateFn', () => { it('returns true when countries array has >= 50 entries', () => { const data = { countries: Array.from({ length: 80 }, (_, i) => `C${i}`), fetchedAt: new Date().toISOString() }; - const countries = data.countries; - const valid = data && Array.isArray(countries) && countries.length >= 50; + const valid = data && Array.isArray(data.countries) && data.countries.length >= 50; assert.equal(valid, true); }); it('returns false when countries array has < 50 entries', () => { const data = { countries: ['US', 'SA'], fetchedAt: new Date().toISOString() }; - const countries = data.countries; - const valid = data && Array.isArray(countries) && countries.length >= 50; + const valid = data && Array.isArray(data.countries) && data.countries.length >= 50; assert.equal(valid, false); });