fix(portwatch): per-country timeout + SIGTERM progress flush (#3222)

* fix(portwatch): per-country timeout + SIGTERM progress flush

Diagnosed from Railway log 2026-04-20T04:00-04:07: Port-Activity section hit
the 420s section cap with only batch 1/15 logged. Gap between batch 1 (67.3s)
and SIGTERM was 352s of silence — batch 2 stalled because Promise.allSettled
waits for the slowest country and processCountry had no per-country budget.
One slow country (USA/CHN with many ports × many pages under ArcGIS EP3
throttling) blocked the whole batch and cascaded to the section timeout,
leaving batches 2..15 unattempted.

Two changes, both stabilisers ahead of the proper fix (globalising EP3):

1. Wrap processCountry in Promise.race against a 90s PER_COUNTRY_TIMEOUT_MS.
   Bounds worst-case batch time at ~90s regardless of ArcGIS behaviour.
   Orphan fetches keep running until their own AbortSignal.timeout(45s)
   fires — acceptable since the process exits soon after either way.

2. Share a `progress` object between fetchAll() and the SIGTERM handler so
   the kill path flushes batch index, seeded count, and the first 10 error
   messages. Past timeout kills discarded the errors array entirely,
   making every regression undiagnosable.
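In isolation, change (1) reduces to a plain racing wrapper. A minimal sketch (illustrative names, not the seeder's export; note the loser is deliberately orphaned here, which is the limitation the later P1 review fix addresses):

```javascript
// Race per-country work against a timer. The loser keeps running
// (orphaned), bounded only by its own per-fetch timeouts.
function raceWithTimeout(doWork, label, timeoutMs) {
  let timer;
  const guard = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`per-country timeout after ${timeoutMs / 1000}s (${label})`)),
      timeoutMs,
    );
  });
  return Promise.race([doWork(), guard]).finally(() => clearTimeout(timer));
}
```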

* fix(portwatch): address PR #3222 P1+P2 (propagate abort, eager error flush)

Review feedback on #3222:

P1 — The 90s per-country timeout did not actually stop the timed-out
country's work; Promise.race rejected but processCountry kept paginating
with fresh 45s fetch timeouts per page, violating the CONCURRENCY=12 cap
and amplifying ArcGIS throttling instead of containing it.

Fix: thread an AbortController signal from withPerCountryTimeout through
processCountry → fetchActivityRows → fetchWithTimeout. fetchWithTimeout
combines the caller signal with AbortSignal.timeout(FETCH_TIMEOUT) via
AbortSignal.any so the per-country abort propagates into the in-flight
fetch. fetchActivityRows also checks signal.aborted between pages so a
cancel lands on the next iteration boundary even if the current page
has already resolved. Node 24 runtime supports AbortSignal.any.
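The signal merge is the standard AbortSignal.any pattern; sketched out of context (the FETCH_TIMEOUT constant here stands in for the seeder's 45s per-page budget):

```javascript
// Merge an optional caller signal with a per-call timeout: whichever
// aborts first cancels the fetch using that signal.
const FETCH_TIMEOUT = 45_000; // stand-in for the seeder's per-page budget

function combinedSignal(callerSignal) {
  return callerSignal
    ? AbortSignal.any([callerSignal, AbortSignal.timeout(FETCH_TIMEOUT)])
    : AbortSignal.timeout(FETCH_TIMEOUT);
}
```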

P2 — SIGTERM diagnostics missed failures from the currently-stuck batch
because progress.errors was only populated after Promise.allSettled
returned. A kill during the pending await left progress.errors empty.

Fix: attach p.catch(err => errors.push(...)) to each wrapped promise
before Promise.allSettled. Rejections land in the shared errors array
at the moment they fire, so a SIGTERM mid-batch sees every rejection
that has already occurred (including per-country timeouts that have
already aborted their controllers). The settled loop skips rejected
outcomes to avoid double-counting.
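The wiring reduces to this pattern (sketch; `runBatch`, `tasks`, and the ids are illustrative, not the seeder's API):

```javascript
// Attach .catch BEFORE awaiting Promise.allSettled so each rejection is
// recorded the moment it fires — visible to a SIGTERM handler even while
// the batch is still pending on its slowest member.
async function runBatch(tasks, errors) {
  const promises = tasks.map(({ id, run }) => {
    const p = run();
    p.catch(err => errors.push(`${id}: ${err.message}`)); // eager flush
    return p;
  });
  const settled = await Promise.allSettled(promises);
  // Rejections were already recorded above; only collect fulfilled values.
  return settled.filter(o => o.status === 'fulfilled').map(o => o.value);
}
```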

Also exports withPerCountryTimeout with an injectable timeoutMs so the
new runtime tests can exercise the abort path at 40ms. Runtime tests
verify: (a) timer fires → underlying signal aborted + work rejects with
the per-country message, (b) work-resolves-first returns the value,
(c) work-rejects-first surfaces the real error, (d) eager .catch flush
populates a shared errors array before allSettled resolves.

Tests: 45 pass (was 38, +7 — 4 runtime + 3 source-regex).
Full test:data: 5867 pass. Typecheck + lint clean.

* fix(portwatch): abort also cancels 429 proxy fallback (PR #3222 P1 follow-up)

Second review iteration on #3222: the per-country AbortController fix
from b2f4a2626 stopped at the direct fetch() and did not reach the 429
proxy fallback. httpsProxyFetchRaw only accepted timeoutMs, so a
timed-out country could keep a CONNECT tunnel + request alive for up
to another FETCH_TIMEOUT (45s) after the batch moved on — the exact
throttling scenario the PR is meant to contain. The concurrency cap
was still violated on the slow path.

Threads `signal` all the way through:

- scripts/_proxy-utils.cjs: proxyConnectTunnel + proxyFetch accept an
  optional signal option. Early-reject if `signal.aborted` before
  opening the socket. Otherwise addEventListener('abort') destroys the
  in-flight proxy socket + TLS tunnel and rejects with signal.reason.
  Listener removed in cleanup() on all terminal paths. Refactored both
  functions around resolveOnce/rejectOnce guards so the abort path
  races cleanly with timeout and network errors without double-settle.

- scripts/_seed-utils.mjs: httpsProxyFetchRaw accepts + forwards
  `signal` to proxyFetch.

- scripts/seed-portwatch-port-activity.mjs: fetchWithTimeout's 429
  branch passes its caller signal to httpsProxyFetchRaw.

Backward compatible: signal is optional in every layer, so the many
other callers of proxyFetch / httpsProxyFetchRaw across the repo are
unaffected.
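The resolveOnce/rejectOnce refactor is a plain settle-once guard; a minimal sketch (`makeSettlers` is an illustrative factory, not the module's API):

```javascript
// Settle-once guard: timeout, abort, and socket errors can all fire for
// one request; only the first caller settles the promise and runs cleanup
// (clearTimeout + removeEventListener), so late losers are no-ops.
function makeSettlers(resolve, reject, cleanup) {
  let settled = false;
  return {
    resolveOnce: (v) => { if (settled) return; settled = true; cleanup(); resolve(v); },
    rejectOnce: (e) => { if (settled) return; settled = true; cleanup(); reject(e); },
  };
}
```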

Tests: 49 pass (was 45, +4). New runtime test proves pre-aborted
signals reject proxyFetch synchronously without touching the network.
Source-regex tests assert signal threading at each layer. Full
test:data 5871 pass. Typecheck + lint clean.
Author: Elie Habib
Date: 2026-04-20 09:36:10 +04:00 (committed by GitHub)
Parent: 6e639274f1
Commit: 4f38ee5a19

4 changed files with 321 additions and 44 deletions

scripts/_proxy-utils.cjs

@@ -88,16 +88,37 @@ function resolveProxyStringConnect() {
   return cfg.tls ? `https://${base}` : base;
 }
 
-function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, targetPort = 443 } = {}) {
+function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, targetPort = 443, signal } = {}) {
   return new Promise((resolve, reject) => {
-    const timer = setTimeout(() => {
-      proxySock.destroy();
-      reject(new Error('CONNECT tunnel timeout'));
-    }, timeoutMs);
-    const onError = (e) => { clearTimeout(timer); reject(e); };
+    if (signal && signal.aborted) {
+      return reject(signal.reason || new Error('aborted'));
+    }
     let proxySock;
+    let settled = false;
+    let onAbort = null;
+    const cleanup = () => {
+      clearTimeout(timer);
+      if (signal && onAbort) signal.removeEventListener('abort', onAbort);
+    };
+    const resolveOnce = (val) => { if (settled) return; settled = true; cleanup(); resolve(val); };
+    const rejectOnce = (err) => { if (settled) return; settled = true; cleanup(); reject(err); };
+    const timer = setTimeout(() => {
+      if (proxySock) proxySock.destroy();
+      rejectOnce(new Error('CONNECT tunnel timeout'));
+    }, timeoutMs);
+    if (signal) {
+      onAbort = () => {
+        if (proxySock) proxySock.destroy();
+        rejectOnce(signal.reason || new Error('aborted'));
+      };
+      signal.addEventListener('abort', onAbort, { once: true });
+    }
+    const onError = (e) => rejectOnce(e);
     const connectCb = () => {
       const authHeader = proxyConfig.auth
         ? `\r\nProxy-Authorization: Basic ${Buffer.from(proxyConfig.auth).toString('base64')}`
@@ -113,9 +134,8 @@ function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, t
       proxySock.removeListener('data', onData);
       const statusLine = buf.split('\r\n')[0];
       if (!statusLine.startsWith('HTTP/1.1 200') && !statusLine.startsWith('HTTP/1.0 200')) {
-        clearTimeout(timer);
         proxySock.destroy();
-        return reject(
+        return rejectOnce(
           Object.assign(new Error(`Proxy CONNECT: ${statusLine}`), {
             status: parseInt(statusLine.split(' ')[1]) || 0,
           })
@@ -126,9 +146,8 @@ function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, t
       const tlsSocket = tls.connect(
         { socket: proxySock, servername: targetHostname, ALPNProtocols: ['http/1.1'] },
         () => {
-          clearTimeout(timer);
           proxySock.resume();
-          resolve({
+          resolveOnce({
             socket: tlsSocket,
             destroy: () => { tlsSocket.destroy(); proxySock.destroy(); },
           });
@@ -157,13 +176,33 @@ function proxyFetch(url, proxyConfig, {
   method = 'GET',
   body = null,
   timeoutMs = 20_000,
+  signal,
 } = {}) {
   const targetUrl = new URL(url);
-  return proxyConnectTunnel(targetUrl.hostname, proxyConfig, { timeoutMs }).then(({ socket: tlsSocket, destroy }) => {
+  if (signal && signal.aborted) {
+    return Promise.reject(signal.reason || new Error('aborted'));
+  }
+  return proxyConnectTunnel(targetUrl.hostname, proxyConfig, { timeoutMs, signal }).then(({ socket: tlsSocket, destroy }) => {
     return new Promise((resolve, reject) => {
-      const timer = setTimeout(() => { destroy(); reject(new Error('proxy fetch timeout')); }, timeoutMs);
-      const fail = (e) => { clearTimeout(timer); destroy(); reject(e); };
+      let settled = false;
+      let onAbort = null;
+      const cleanup = () => {
+        clearTimeout(timer);
+        if (signal && onAbort) signal.removeEventListener('abort', onAbort);
+      };
+      // Both terminal paths destroy the TLS tunnel (mirrors the original
+      // behavior where success + failure both released the socket).
+      const resolveOnce = (v) => { if (settled) return; settled = true; cleanup(); destroy(); resolve(v); };
+      const rejectOnce = (e) => { if (settled) return; settled = true; cleanup(); destroy(); reject(e); };
+      const timer = setTimeout(() => rejectOnce(new Error('proxy fetch timeout')), timeoutMs);
+      if (signal) {
+        onAbort = () => rejectOnce(signal.reason || new Error('aborted'));
+        signal.addEventListener('abort', onAbort, { once: true });
+      }
       const reqHeaders = {
         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
@@ -190,18 +229,16 @@ function proxyFetch(url, proxyConfig, {
         const chunks = [];
         stream.on('data', (c) => chunks.push(c));
         stream.on('end', () => {
-          clearTimeout(timer);
-          destroy();
-          resolve({
+          resolveOnce({
             ok: resp.statusCode >= 200 && resp.statusCode < 300,
             status: resp.statusCode,
             buffer: Buffer.concat(chunks),
             contentType: resp.headers['content-type'] || '',
           });
         });
-        stream.on('error', fail);
+        stream.on('error', rejectOnce);
       });
-      req.on('error', fail);
+      req.on('error', rejectOnce);
       if (body != null) req.write(body);
       req.end();
     });

scripts/_seed-utils.mjs

@@ -395,11 +395,11 @@ async function httpsProxyFetchJson(url, proxyAuth) {
   return JSON.parse(buffer.toString('utf8'));
 }
 
-export async function httpsProxyFetchRaw(url, proxyAuth, { accept = '*/*', timeoutMs = 20_000 } = {}) {
+export async function httpsProxyFetchRaw(url, proxyAuth, { accept = '*/*', timeoutMs = 20_000, signal } = {}) {
   const { proxyFetch, parseProxyConfig } = createRequire(import.meta.url)('./_proxy-utils.cjs');
   const proxyConfig = parseProxyConfig(proxyAuth);
   if (!proxyConfig) throw new Error('Invalid proxy auth string');
-  const result = await proxyFetch(url, proxyConfig, { accept, timeoutMs, headers: { 'User-Agent': CHROME_UA } });
+  const result = await proxyFetch(url, proxyConfig, { accept, timeoutMs, signal, headers: { 'User-Agent': CHROME_UA } });
   if (!result.ok) throw Object.assign(new Error(`HTTP ${result.status}`), { status: result.status });
   return { buffer: result.buffer, contentType: result.contentType };
 }

scripts/seed-portwatch-port-activity.mjs

@@ -35,6 +35,12 @@ const HISTORY_DAYS = 90;
 const MAX_PORTS_PER_COUNTRY = 50;
 const CONCURRENCY = 12;
 const BATCH_LOG_EVERY = 5;
+// Per-country budget. Promise.allSettled waits for the slowest member of the
+// batch, so one runaway country (e.g. USA: many ports × many pages when EP3
+// is slow) can stall the whole batch and cascade to the section timeout,
+// leaving batches 2..N unattempted. This caps a single country without
+// aborting the whole section.
+const PER_COUNTRY_TIMEOUT_MS = 90_000;
 
 function epochToTimestamp(epochMs) {
   const d = new Date(epochMs);
@@ -42,16 +48,30 @@ function epochToTimestamp(epochMs) {
   return `timestamp '${d.getUTCFullYear()}-${p(d.getUTCMonth() + 1)}-${p(d.getUTCDate())} ${p(d.getUTCHours())}:${p(d.getUTCMinutes())}:${p(d.getUTCSeconds())}'`;
 }
 
-async function fetchWithTimeout(url) {
+async function fetchWithTimeout(url, { signal } = {}) {
+  // Combine the per-call FETCH_TIMEOUT with the upstream per-country signal
+  // so a per-country abort propagates into the in-flight fetch AND future
+  // pagination iterations (review feedback P1 on PR #3222). Without this,
+  // the 90s withPerCountryTimeout timer fires, the batch moves on, but the
+  // orphaned country keeps paginating with fresh 45s fetch timeouts —
+  // breaking the CONCURRENCY=12 cap and amplifying ArcGIS throttling.
+  const combined = signal
+    ? AbortSignal.any([signal, AbortSignal.timeout(FETCH_TIMEOUT)])
+    : AbortSignal.timeout(FETCH_TIMEOUT);
   const resp = await fetch(url, {
     headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' },
-    signal: AbortSignal.timeout(FETCH_TIMEOUT),
+    signal: combined,
   });
   if (resp.status === 429) {
     const proxyAuth = resolveProxyForConnect();
     if (!proxyAuth) throw new Error(`ArcGIS HTTP 429 (rate limited) for ${url.slice(0, 80)}`);
     console.warn(` [portwatch] 429 rate-limited — retrying via proxy: ${url.slice(0, 80)}`);
-    const { buffer } = await httpsProxyFetchRaw(url, proxyAuth, { accept: 'application/json', timeoutMs: FETCH_TIMEOUT });
+    // Pass the caller signal so a per-country abort also cancels the proxy
+    // fallback path (review feedback on PR #3222). Without this, a timed-out
+    // country could keep a proxy CONNECT tunnel + request alive for another
+    // 45s after the batch moved on, re-creating the orphan-work problem
+    // under the exact throttling scenario this PR addresses.
+    const { buffer } = await httpsProxyFetchRaw(url, proxyAuth, { accept: 'application/json', timeoutMs: FETCH_TIMEOUT, signal });
     const proxied = JSON.parse(buffer.toString('utf8'));
     if (proxied.error) throw new Error(`ArcGIS error (via proxy): ${proxied.error.message}`);
     return proxied;
@@ -105,11 +125,15 @@ async function fetchAllPortRefs() {
   return byIso3;
 }
 
-async function fetchActivityRows(iso3, since) {
+async function fetchActivityRows(iso3, since, { signal } = {}) {
   let offset = 0;
   const allRows = [];
   let body;
   do {
+    // Abort between pages so a cancelled per-country timer stops the
+    // paginator on the next iteration boundary even if the current fetch
+    // has already resolved.
+    if (signal?.aborted) throw signal.reason ?? new Error('aborted');
     const params = new URLSearchParams({
       where: `ISO3='${iso3}' AND date > ${epochToTimestamp(since)}`,
       outFields: 'portid,portname,ISO3,date,portcalls_tanker,import_tanker,export_tanker',
@@ -119,7 +143,7 @@ async function fetchActivityRows(iso3, since, { signal } = {}) {
       outSR: '4326',
       f: 'json',
     });
-    body = await fetchWithTimeout(`${EP3_BASE}?${params}`);
+    body = await fetchWithTimeout(`${EP3_BASE}?${params}`, { signal });
     const features = body.features ?? [];
     if (features.length) allRows.push(...features);
     // Advance by actual returned count, not PAGE_SIZE. ArcGIS can cap below
@@ -207,17 +231,40 @@ async function redisPipeline(commands) {
   return resp.json();
 }
 
-async function processCountry(iso3, iso2, since, refMap) {
-  const rawRows = await fetchActivityRows(iso3, since);
+async function processCountry(iso3, iso2, since, refMap, { signal } = {}) {
+  const rawRows = await fetchActivityRows(iso3, since, { signal });
   if (!rawRows.length) return null;
   const ports = computeCountryPorts(rawRows, refMap);
   if (!ports.length) return null;
   return { iso2, ports, fetchedAt: new Date().toISOString() };
 }
 
+// Runs `doWork(signal)` but rejects if the per-country timer fires first,
+// aborting the controller so the in-flight fetch (and its pagination loop)
+// actually stops instead of orphaning. Keeps the CONCURRENCY=12 cap real:
+// the next batch cannot pile new requests on top of still-running earlier
+// work. Exported with an injectable timeoutMs so runtime tests can exercise
+// the abort path at 50ms instead of the production 90s.
+export function withPerCountryTimeout(doWork, iso3, timeoutMs = PER_COUNTRY_TIMEOUT_MS) {
+  const controller = new AbortController();
+  let timer;
+  const guard = new Promise((_, reject) => {
+    timer = setTimeout(() => {
+      const err = new Error(`per-country timeout after ${timeoutMs / 1000}s (${iso3})`);
+      try { controller.abort(err); } catch {}
+      reject(err);
+    }, timeoutMs);
+  });
+  const work = doWork(controller.signal);
+  return Promise.race([work, guard]).finally(() => clearTimeout(timer));
+}
+
 // fetchAll() — pure data collection, no Redis writes.
 // Returns { countries: string[], countryData: Map<iso2, payload>, fetchedAt: string }.
-export async function fetchAll() {
+//
+// `progress` (optional) is mutated in-place so a SIGTERM handler in main()
+// can read the last batch index, seeded count, and error list at kill time.
+export async function fetchAll(progress) {
   const { iso3ToIso2 } = createCountryResolvers();
   const since = Date.now() - HISTORY_DAYS * 86400000;
@@ -229,33 +276,43 @@ export async function fetchAll() {
   // Only fetch activity for ISO3s that have at least one port AND exist in our iso3→iso2 map.
   const eligibleIso3 = [...refsByIso3.keys()].filter(iso3 => iso3ToIso2.has(iso3));
   const skipped = refsByIso3.size - eligibleIso3.length;
-  console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY})`);
+  console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY}, per-country cap ${PER_COUNTRY_TIMEOUT_MS / 1000}s)`);
 
   const countryData = new Map();
-  const errors = [];
+  const errors = progress?.errors ?? [];
   const batches = Math.ceil(eligibleIso3.length / CONCURRENCY);
   const activityStart = Date.now();
+  if (progress) progress.totalBatches = batches;
 
   for (let i = 0; i < eligibleIso3.length; i += CONCURRENCY) {
     const batch = eligibleIso3.slice(i, i + CONCURRENCY);
     const batchIdx = Math.floor(i / CONCURRENCY) + 1;
-    const settled = await Promise.allSettled(
-      batch.map(iso3 => {
-        const iso2 = iso3ToIso2.get(iso3);
-        return processCountry(iso3, iso2, since, refsByIso3.get(iso3));
-      })
-    );
+    if (progress) progress.batchIdx = batchIdx;
+    const promises = batch.map(iso3 => {
+      const iso2 = iso3ToIso2.get(iso3);
+      const p = withPerCountryTimeout(
+        (signal) => processCountry(iso3, iso2, since, refsByIso3.get(iso3), { signal }),
+        iso3,
+      );
+      // Eager error flush (review feedback P2 on PR #3222). Push into the
+      // shared errors array the moment each promise rejects, so a SIGTERM
+      // that arrives MID-batch (while Promise.allSettled is still pending)
+      // sees the rejections that have already fired. The settled-loop
+      // below skips rejected outcomes to avoid double-counting.
+      p.catch(err => {
+        errors.push(`${iso3}: ${err?.message || err}`);
+      });
+      return p;
+    });
+    const settled = await Promise.allSettled(promises);
 
     for (let j = 0; j < batch.length; j++) {
-      const iso3 = batch[j];
       const outcome = settled[j];
-      if (outcome.status === 'rejected') {
-        errors.push(`${iso3}: ${outcome.reason?.message || outcome.reason}`);
-        continue;
-      }
+      if (outcome.status === 'rejected') continue; // already recorded via .catch above
       if (!outcome.value) continue;
       const { iso2, ports, fetchedAt } = outcome.value;
       countryData.set(iso2, { iso2, ports, fetchedAt });
     }
+    if (progress) progress.seeded = countryData.size;
     if (batchIdx === 1 || batchIdx % BATCH_LOG_EVERY === 0 || batchIdx === batches) {
       const elapsed = ((Date.now() - activityStart) / 1000).toFixed(1);
       console.log(` [port-activity] batch ${batchIdx}/${batches}: ${countryData.size} countries seeded, ${errors.length} errors (${elapsed}s)`);
@@ -293,6 +350,12 @@ async function main() {
   let prevCountryKeys = [];
   let prevCount = 0;
 
+  // Mutated in-place by fetchAll() so the SIGTERM handler can log which batch
+  // we died in and what the per-country errors looked like. Without this, a
+  // timeout kill flushes nothing from the errors array — past regressions
+  // have been undiagnosable for exactly this reason.
+  const progress = { batchIdx: 0, totalBatches: 0, seeded: 0, errors: [] };
+
   // Bundle-runner SIGKILLs via SIGTERM → SIGKILL on timeout. Release the lock
   // and extend existing TTLs synchronously(ish) so the next cron tick isn't
   // blocked for up to 30 min and the Redis snapshot doesn't evaporate.
@@ -300,7 +363,13 @@ async function main() {
   const onSigterm = async () => {
     if (sigHandled) return;
     sigHandled = true;
-    console.error(' [port-activity] SIGTERM received — releasing lock + extending TTLs');
+    console.error(
+      ` [port-activity] SIGTERM at batch ${progress.batchIdx}/${progress.totalBatches} — ${progress.seeded} seeded, ${progress.errors.length} errors`,
+    );
+    if (progress.errors.length) {
+      console.error(` [port-activity] First errors: ${progress.errors.slice(0, 10).join('; ')}`);
+    }
+    console.error(' [port-activity] Releasing lock + extending TTLs');
     try {
       await extendExistingTtl([CANONICAL_KEY, META_KEY, ...prevCountryKeys], TTL);
     } catch {}
@@ -317,7 +386,7 @@ async function main() {
   prevCount = Array.isArray(prevIso2List) ? prevIso2List.length : 0;
 
   console.log(` Fetching port activity data (${HISTORY_DAYS}d history)...`);
-  const { countries, countryData } = await fetchAll();
+  const { countries, countryData } = await fetchAll(progress);
   console.log(` Fetched ${countryData.size} countries`);

(seeder test file)

@@ -3,11 +3,16 @@ import assert from 'node:assert/strict';
 import { readFileSync } from 'node:fs';
 import { dirname, resolve } from 'node:path';
 import { fileURLToPath } from 'node:url';
+import { createRequire } from 'node:module';
+
+import { withPerCountryTimeout } from '../scripts/seed-portwatch-port-activity.mjs';
 
 const __dirname = dirname(fileURLToPath(import.meta.url));
 const root = resolve(__dirname, '..');
 const src = readFileSync(resolve(root, 'scripts/seed-portwatch-port-activity.mjs'), 'utf-8');
+const seedUtilsSrc = readFileSync(resolve(root, 'scripts/_seed-utils.mjs'), 'utf-8');
+const proxyUtilsSrc = readFileSync(resolve(root, 'scripts/_proxy-utils.cjs'), 'utf-8');
 
 // ── seeder source assertions ──────────────────────────────────────────────────
// ── seeder source assertions ────────────────────────────────────────────────── // ── seeder source assertions ──────────────────────────────────────────────────
@@ -62,6 +67,72 @@ describe('seed-portwatch-port-activity.mjs exports', () => {
     assert.match(src, /process\.on\('SIGTERM'/);
   });
 
+  it('defines a per-country timeout to cap Promise.allSettled stalls', () => {
+    // Without this cap, one slow country (USA: many ports × many pages when
+    // ArcGIS is throttled) blocks the whole batch via Promise.allSettled and
+    // cascades to the 420s section timeout, leaving batches 2..N unattempted.
+    assert.match(src, /PER_COUNTRY_TIMEOUT_MS\s*=\s*\d/);
+  });
+
+  it('wraps processCountry with the per-country timeout in the batch loop', () => {
+    // Must pass a factory (signal) => processCountry(...) so the timer can
+    // abort the in-flight fetch (PR #3222 review P1), not just race a
+    // detached promise.
+    assert.match(src, /withPerCountryTimeout\s*\(\s*\n?\s*\(signal\)\s*=>\s*processCountry/);
+  });
+
+  it('fetchWithTimeout combines caller signal with FETCH_TIMEOUT via AbortSignal.any', () => {
+    // Otherwise the per-country abort cannot propagate into the in-flight
+    // fetch; orphan pagination would continue with fresh 45s budgets each
+    // page (PR #3222 review P1).
+    assert.match(src, /AbortSignal\.any\(\[signal,\s*AbortSignal\.timeout\(FETCH_TIMEOUT\)\]\)/);
+  });
+
+  it('fetchActivityRows checks signal.aborted between pages', () => {
+    assert.match(src, /signal\?\.aborted\)\s*throw\s+signal\.reason/);
+  });
+
+  it('429 proxy fallback threads caller signal into httpsProxyFetchRaw', () => {
+    // Review feedback: without this, a timed-out country can leak a
+    // proxy CONNECT tunnel for up to FETCH_TIMEOUT (45s) after the
+    // batch moved on, defeating the concurrency cap under the exact
+    // throttling scenario this PR addresses.
+    assert.match(src, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s);
+  });
+
+  it('httpsProxyFetchRaw accepts and forwards signal', () => {
+    assert.match(seedUtilsSrc, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s);
+    assert.match(seedUtilsSrc, /proxyFetch\(url,\s*proxyConfig,\s*\{[^}]*signal[^}]*\}/s);
+  });
+
+  it('proxyFetch + proxyConnectTunnel accept signal and bail early if aborted', () => {
+    assert.match(proxyUtilsSrc, /function proxyFetch\([\s\S]*?\bsignal,?\s*\}\s*=\s*\{\}/);
+    assert.match(proxyUtilsSrc, /function proxyConnectTunnel\([\s\S]*?\bsignal\s*\}\s*=\s*\{\}/);
+    assert.match(proxyUtilsSrc, /signal && signal\.aborted/);
+    assert.match(proxyUtilsSrc, /signal\.addEventListener\('abort'/);
+  });
+
+  it('eager error flush attaches p.catch before Promise.allSettled', () => {
+    // Review P2: without this, errors collected only AFTER allSettled
+    // returns. A mid-batch SIGTERM would flush zero errors even though
+    // several promises had already rejected.
+    assert.match(src, /p\.catch\(err\s*=>\s*\{[^}]*errors\.push/);
+  });
+
+  it('SIGTERM handler flushes batch progress + first errors', () => {
+    // Past regressions were undiagnosable because the errors array was only
+    // logged after all batches completed — a SIGTERM kill discarded it.
+    assert.match(src, /SIGTERM at batch \$\{progress\.batchIdx\}/);
+    assert.match(src, /progress\.errors\.slice\(0,\s*10\)/);
+  });
+
+  it('fetchAll accepts a progress object and mutates it', () => {
+    assert.match(src, /export async function fetchAll\(progress\)/);
+    assert.match(src, /progress\.totalBatches\s*=\s*batches/);
+    assert.match(src, /progress\.batchIdx\s*=\s*batchIdx/);
+    assert.match(src, /progress\.seeded\s*=\s*countryData\.size/);
+  });
+
   it('pagination advances by actual features.length, not PAGE_SIZE', () => {
     // ArcGIS PortWatch_ports_database caps responses at 1000 rows even when
     // resultRecordCount=2000. Advancing by PAGE_SIZE skips rows 1000-1999.
@@ -218,6 +289,106 @@ describe('top-N port truncation', () => {
   });
 });
 
+describe('withPerCountryTimeout (runtime)', () => {
+  it('aborts the per-country signal when the timer fires', async () => {
+    let observedSignal;
+    const p = withPerCountryTimeout(
+      (signal) => {
+        observedSignal = signal;
+        // Never resolves on its own; can only reject via abort.
+        return new Promise((_, reject) => {
+          signal.addEventListener('abort', () => reject(signal.reason), { once: true });
+        });
+      },
+      'TST',
+      40, // 40ms — keeps the test fast
+    );
+    await assert.rejects(p, /per-country timeout after 0\.04s \(TST\)/);
+    assert.equal(observedSignal.aborted, true, 'underlying work received the abort');
+  });
+
+  it('resolves with the work result when work completes before the timer', async () => {
+    const result = await withPerCountryTimeout(
+      (_signal) => Promise.resolve({ ok: true }),
+      'TST',
+      500,
+    );
+    assert.deepEqual(result, { ok: true });
+  });
+
+  it('does not invoke the timer path when work rejects first', async () => {
+    // Rejecting with a non-timeout error should surface as-is, not as the
+    // per-country timeout message.
+    await assert.rejects(
+      withPerCountryTimeout(
+        (_signal) => Promise.reject(new Error('ArcGIS HTTP 500')),
+        'TST',
+        1_000,
+      ),
+      /ArcGIS HTTP 500/,
+    );
+  });
+});
+
+describe('proxyFetch signal propagation (runtime)', () => {
+  const require_ = createRequire(import.meta.url);
+  const { proxyFetch } = require_('../scripts/_proxy-utils.cjs');
+
+  it('rejects synchronously when called with an already-aborted signal', async () => {
+    // Review feedback: the per-country AbortController must kill the proxy
+    // fallback too. Pre-aborted signals must short-circuit BEFORE any
+    // CONNECT tunnel opens; otherwise a timed-out country's proxy call
+    // continues in the background. No network reached in this test — the
+    // synchronous aborted check is the guard.
+    const controller = new AbortController();
+    controller.abort(new Error('test-cancel'));
+    await assert.rejects(
+      proxyFetch('https://example.invalid/x', { host: 'nope', port: 1, auth: 'a:b', tls: true }, {
+        timeoutMs: 60_000,
+        signal: controller.signal,
+      }),
+      /test-cancel|aborted/,
+    );
+  });
+});
+
+describe('eager error flush (runtime)', () => {
+  it('populates shared errors via p.catch before Promise.allSettled resolves', async () => {
+    // Mirrors the wiring in fetchAll(): attach p.catch to each promise so
+    // rejections land in the errors array at the moment they fire, not
+    // only after allSettled. A SIGTERM that hits during allSettled still
+    // sees the already-pushed errors.
+    const errors = [];
+    let stuckResolve;
+    const rejecting = Promise.reject(new Error('boom A'));
+    rejecting.catch(err => errors.push(`A: ${err.message}`));
+    const stuck = new Promise(resolve => { stuckResolve = resolve; });
+    stuck.catch(err => errors.push(`B: ${err.message}`));
+
+    // Yield microtasks so `rejecting.catch` fires.
+    await Promise.resolve();
+    await Promise.resolve();
+    assert.deepEqual(errors, ['A: boom A'], 'rejected promise pushed BEFORE allSettled is even awaited');
+
+    // Sanity: the hung promise still blocks allSettled, proving the
+    // behavior we rely on: errors from resolved-rejected members are
+    // visible to a SIGTERM handler even while the batch itself is stuck.
+    const allSettledPromise = Promise.allSettled([rejecting, stuck]);
+    let settledEarly = false;
+    allSettledPromise.then(() => { settledEarly = true; });
+    await new Promise(r => setTimeout(r, 10));
+    assert.equal(settledEarly, false, 'allSettled still pending while one member is stuck');
+    assert.equal(errors.length, 1, 'error list observable despite pending batch');
+    stuckResolve();
+    await allSettledPromise;
+  });
+});
+
 describe('validateFn', () => {
   it('returns true when countries array has >= 50 entries', () => {
     const data = { countries: Array.from({ length: 80 }, (_, i) => `C${i}`), fetchedAt: new Date().toISOString() };