diff --git a/scripts/_proxy-utils.cjs b/scripts/_proxy-utils.cjs index d4d08da61..dffb24aea 100644 --- a/scripts/_proxy-utils.cjs +++ b/scripts/_proxy-utils.cjs @@ -88,16 +88,37 @@ function resolveProxyStringConnect() { return cfg.tls ? `https://${base}` : base; } -function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, targetPort = 443 } = {}) { +function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, targetPort = 443, signal } = {}) { return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - proxySock.destroy(); - reject(new Error('CONNECT tunnel timeout')); - }, timeoutMs); - - const onError = (e) => { clearTimeout(timer); reject(e); }; + if (signal && signal.aborted) { + return reject(signal.reason || new Error('aborted')); + } let proxySock; + let settled = false; + let onAbort = null; + const cleanup = () => { + clearTimeout(timer); + if (signal && onAbort) signal.removeEventListener('abort', onAbort); + }; + const resolveOnce = (val) => { if (settled) return; settled = true; cleanup(); resolve(val); }; + const rejectOnce = (err) => { if (settled) return; settled = true; cleanup(); reject(err); }; + + const timer = setTimeout(() => { + if (proxySock) proxySock.destroy(); + rejectOnce(new Error('CONNECT tunnel timeout')); + }, timeoutMs); + + if (signal) { + onAbort = () => { + if (proxySock) proxySock.destroy(); + rejectOnce(signal.reason || new Error('aborted')); + }; + signal.addEventListener('abort', onAbort, { once: true }); + } + + const onError = (e) => rejectOnce(e); + const connectCb = () => { const authHeader = proxyConfig.auth ? `\r\nProxy-Authorization: Basic ${Buffer.from(proxyConfig.auth).toString('base64')}` @@ -113,9 +134,8 @@ function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, t proxySock.removeListener('data', onData); const statusLine = buf.split('\r\n')[0]; if (!statusLine.startsWith('HTTP/1.1 200') && !statusLine.startsWith('HTTP/1.0 200')) { - clearTimeout(timer); proxySock.destroy(); - return reject( + return rejectOnce( Object.assign(new Error(`Proxy CONNECT: ${statusLine}`), { status: parseInt(statusLine.split(' ')[1]) || 0, }) @@ -126,9 +146,8 @@ function proxyConnectTunnel(targetHostname, proxyConfig, { timeoutMs = 20_000, t const tlsSocket = tls.connect( { socket: proxySock, servername: targetHostname, ALPNProtocols: ['http/1.1'] }, () => { - clearTimeout(timer); proxySock.resume(); - resolve({ + resolveOnce({ socket: tlsSocket, destroy: () => { tlsSocket.destroy(); proxySock.destroy(); }, }); @@ -157,13 +176,33 @@ function proxyFetch(url, proxyConfig, { method = 'GET', body = null, timeoutMs = 20_000, + signal, } = {}) { const targetUrl = new URL(url); - return proxyConnectTunnel(targetUrl.hostname, proxyConfig, { timeoutMs }).then(({ socket: tlsSocket, destroy }) => { + if (signal && signal.aborted) { + return Promise.reject(signal.reason || new Error('aborted')); + } + + return proxyConnectTunnel(targetUrl.hostname, proxyConfig, { timeoutMs, signal }).then(({ socket: tlsSocket, destroy }) => { return new Promise((resolve, reject) => { - const timer = setTimeout(() => { destroy(); reject(new Error('proxy fetch timeout')); }, timeoutMs); - const fail = (e) => { clearTimeout(timer); destroy(); reject(e); }; + let settled = false; + let onAbort = null; + const cleanup = () => { + clearTimeout(timer); + if (signal && onAbort) signal.removeEventListener('abort', onAbort); + }; + // Both terminal paths destroy the TLS tunnel (mirrors the original + // behavior where success + failure both released the socket). + const resolveOnce = (v) => { if (settled) return; settled = true; cleanup(); destroy(); resolve(v); }; + const rejectOnce = (e) => { if (settled) return; settled = true; cleanup(); destroy(); reject(e); }; + + const timer = setTimeout(() => rejectOnce(new Error('proxy fetch timeout')), timeoutMs); + + if (signal) { + onAbort = () => rejectOnce(signal.reason || new Error('aborted')); + signal.addEventListener('abort', onAbort, { once: true }); + } const reqHeaders = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', @@ -190,18 +229,16 @@ function proxyFetch(url, proxyConfig, { const chunks = []; stream.on('data', (c) => chunks.push(c)); stream.on('end', () => { - clearTimeout(timer); - destroy(); - resolve({ + resolveOnce({ ok: resp.statusCode >= 200 && resp.statusCode < 300, status: resp.statusCode, buffer: Buffer.concat(chunks), contentType: resp.headers['content-type'] || '', }); }); - stream.on('error', fail); + stream.on('error', rejectOnce); }); - req.on('error', fail); + req.on('error', rejectOnce); if (body != null) req.write(body); req.end(); }); diff --git a/scripts/_seed-utils.mjs b/scripts/_seed-utils.mjs index 630505267..1bdf4ed6f 100644 --- a/scripts/_seed-utils.mjs +++ b/scripts/_seed-utils.mjs @@ -395,11 +395,11 @@ async function httpsProxyFetchJson(url, proxyAuth) { return JSON.parse(buffer.toString('utf8')); } -export async function httpsProxyFetchRaw(url, proxyAuth, { accept = '*/*', timeoutMs = 20_000 } = {}) { +export async function httpsProxyFetchRaw(url, proxyAuth, { accept = '*/*', timeoutMs = 20_000, signal } = {}) { const { proxyFetch, parseProxyConfig } = createRequire(import.meta.url)('./_proxy-utils.cjs'); const proxyConfig = parseProxyConfig(proxyAuth); if (!proxyConfig) throw new Error('Invalid proxy auth string'); - const result = await proxyFetch(url, proxyConfig, { accept, timeoutMs, headers: { 'User-Agent': CHROME_UA } }); + const result = await proxyFetch(url, proxyConfig, { accept, timeoutMs, signal, headers: { 'User-Agent': CHROME_UA } }); if (!result.ok) throw Object.assign(new Error(`HTTP ${result.status}`), { status: result.status }); return { buffer: result.buffer, contentType: result.contentType }; } diff --git a/scripts/seed-portwatch-port-activity.mjs b/scripts/seed-portwatch-port-activity.mjs index a58540bf5..4e97910a6 100644 --- a/scripts/seed-portwatch-port-activity.mjs +++ b/scripts/seed-portwatch-port-activity.mjs @@ -35,6 +35,12 @@ const HISTORY_DAYS = 90; const MAX_PORTS_PER_COUNTRY = 50; const CONCURRENCY = 12; const BATCH_LOG_EVERY = 5; +// Per-country budget. Promise.allSettled waits for the slowest member of the +// batch, so one runaway country (e.g. USA: many ports × many pages when EP3 +// is slow) can stall the whole batch and cascade to the section timeout, +// leaving batches 2..N unattempted. This caps a single country without +// aborting the whole section. +const PER_COUNTRY_TIMEOUT_MS = 90_000; function epochToTimestamp(epochMs) { const d = new Date(epochMs); @@ -42,16 +48,30 @@ function epochToTimestamp(epochMs) { return `timestamp '${d.getUTCFullYear()}-${p(d.getUTCMonth() + 1)}-${p(d.getUTCDate())} ${p(d.getUTCHours())}:${p(d.getUTCMinutes())}:${p(d.getUTCSeconds())}'`; } -async function fetchWithTimeout(url) { +async function fetchWithTimeout(url, { signal } = {}) { + // Combine the per-call FETCH_TIMEOUT with the upstream per-country signal + // so a per-country abort propagates into the in-flight fetch AND future + // pagination iterations (review feedback P1 on PR #3222). Without this, + // the 90s withPerCountryTimeout timer fires, the batch moves on, but the + // orphaned country keeps paginating with fresh 45s fetch timeouts — + // breaking the CONCURRENCY=12 cap and amplifying ArcGIS throttling. + const combined = signal + ? AbortSignal.any([signal, AbortSignal.timeout(FETCH_TIMEOUT)]) + : AbortSignal.timeout(FETCH_TIMEOUT); const resp = await fetch(url, { headers: { 'User-Agent': CHROME_UA, Accept: 'application/json' }, - signal: AbortSignal.timeout(FETCH_TIMEOUT), + signal: combined, }); if (resp.status === 429) { const proxyAuth = resolveProxyForConnect(); if (!proxyAuth) throw new Error(`ArcGIS HTTP 429 (rate limited) for ${url.slice(0, 80)}`); console.warn(` [portwatch] 429 rate-limited — retrying via proxy: ${url.slice(0, 80)}`); - const { buffer } = await httpsProxyFetchRaw(url, proxyAuth, { accept: 'application/json', timeoutMs: FETCH_TIMEOUT }); + // Pass the caller signal so a per-country abort also cancels the proxy + // fallback path (review feedback on PR #3222). Without this, a timed-out + // country could keep a proxy CONNECT tunnel + request alive for another + // 45s after the batch moved on, re-creating the orphan-work problem + // under the exact throttling scenario this PR addresses. + const { buffer } = await httpsProxyFetchRaw(url, proxyAuth, { accept: 'application/json', timeoutMs: FETCH_TIMEOUT, signal }); const proxied = JSON.parse(buffer.toString('utf8')); if (proxied.error) throw new Error(`ArcGIS error (via proxy): ${proxied.error.message}`); return proxied; @@ -105,11 +125,15 @@ async function fetchAllPortRefs() { return byIso3; } -async function fetchActivityRows(iso3, since) { +async function fetchActivityRows(iso3, since, { signal } = {}) { let offset = 0; const allRows = []; let body; do { + // Abort between pages so a cancelled per-country timer stops the + // paginator on the next iteration boundary even if the current fetch + // has already resolved. + if (signal?.aborted) throw signal.reason ?? new Error('aborted'); const params = new URLSearchParams({ where: `ISO3='${iso3}' AND date > ${epochToTimestamp(since)}`, outFields: 'portid,portname,ISO3,date,portcalls_tanker,import_tanker,export_tanker', @@ -119,7 +143,7 @@ async function fetchActivityRows(iso3, since) { outSR: '4326', f: 'json', }); - body = await fetchWithTimeout(`${EP3_BASE}?${params}`); + body = await fetchWithTimeout(`${EP3_BASE}?${params}`, { signal }); const features = body.features ?? []; if (features.length) allRows.push(...features); // Advance by actual returned count, not PAGE_SIZE. ArcGIS can cap below @@ -207,17 +231,40 @@ async function redisPipeline(commands) { return resp.json(); } -async function processCountry(iso3, iso2, since, refMap) { - const rawRows = await fetchActivityRows(iso3, since); +async function processCountry(iso3, iso2, since, refMap, { signal } = {}) { + const rawRows = await fetchActivityRows(iso3, since, { signal }); if (!rawRows.length) return null; const ports = computeCountryPorts(rawRows, refMap); if (!ports.length) return null; return { iso2, ports, fetchedAt: new Date().toISOString() }; } +// Runs `doWork(signal)` but rejects if the per-country timer fires first, +// aborting the controller so the in-flight fetch (and its pagination loop) +// actually stops instead of orphaning. Keeps the CONCURRENCY=12 cap real: +// the next batch cannot pile new requests on top of still-running earlier +// work. Exported with an injectable timeoutMs so runtime tests can exercise +// the abort path at 50ms instead of the production 90s. +export function withPerCountryTimeout(doWork, iso3, timeoutMs = PER_COUNTRY_TIMEOUT_MS) { + const controller = new AbortController(); + let timer; + const guard = new Promise((_, reject) => { + timer = setTimeout(() => { + const err = new Error(`per-country timeout after ${timeoutMs / 1000}s (${iso3})`); + try { controller.abort(err); } catch {} + reject(err); + }, timeoutMs); + }); + const work = doWork(controller.signal); + return Promise.race([work, guard]).finally(() => clearTimeout(timer)); +} + // fetchAll() — pure data collection, no Redis writes. // Returns { countries: string[], countryData: Map, fetchedAt: string }. -export async function fetchAll() { +// +// `progress` (optional) is mutated in-place so a SIGTERM handler in main() +// can read the last batch index, seeded count, and error list at kill time. +export async function fetchAll(progress) { const { iso3ToIso2 } = createCountryResolvers(); const since = Date.now() - HISTORY_DAYS * 86400000; @@ -229,33 +276,43 @@ export async function fetchAll() { // Only fetch activity for ISO3s that have at least one port AND exist in our iso3→iso2 map. const eligibleIso3 = [...refsByIso3.keys()].filter(iso3 => iso3ToIso2.has(iso3)); const skipped = refsByIso3.size - eligibleIso3.length; - console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY})`); + console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY}, per-country cap ${PER_COUNTRY_TIMEOUT_MS / 1000}s)`); const countryData = new Map(); - const errors = []; + const errors = progress?.errors ?? []; const batches = Math.ceil(eligibleIso3.length / CONCURRENCY); const activityStart = Date.now(); + if (progress) progress.totalBatches = batches; for (let i = 0; i < eligibleIso3.length; i += CONCURRENCY) { const batch = eligibleIso3.slice(i, i + CONCURRENCY); const batchIdx = Math.floor(i / CONCURRENCY) + 1; - const settled = await Promise.allSettled( - batch.map(iso3 => { - const iso2 = iso3ToIso2.get(iso3); - return processCountry(iso3, iso2, since, refsByIso3.get(iso3)); - }) - ); + if (progress) progress.batchIdx = batchIdx; + const promises = batch.map(iso3 => { + const iso2 = iso3ToIso2.get(iso3); + const p = withPerCountryTimeout( + (signal) => processCountry(iso3, iso2, since, refsByIso3.get(iso3), { signal }), + iso3, + ); + // Eager error flush (review feedback P2 on PR #3222). Push into the + // shared errors array the moment each promise rejects, so a SIGTERM + // that arrives MID-batch (while Promise.allSettled is still pending) + // sees the rejections that have already fired. The settled-loop + // below skips rejected outcomes to avoid double-counting. + p.catch(err => { + errors.push(`${iso3}: ${err?.message || err}`); + }); + return p; + }); + const settled = await Promise.allSettled(promises); for (let j = 0; j < batch.length; j++) { - const iso3 = batch[j]; const outcome = settled[j]; - if (outcome.status === 'rejected') { - errors.push(`${iso3}: ${outcome.reason?.message || outcome.reason}`); - continue; - } + if (outcome.status === 'rejected') continue; // already recorded via .catch above if (!outcome.value) continue; const { iso2, ports, fetchedAt } = outcome.value; countryData.set(iso2, { iso2, ports, fetchedAt }); } + if (progress) progress.seeded = countryData.size; if (batchIdx === 1 || batchIdx % BATCH_LOG_EVERY === 0 || batchIdx === batches) { const elapsed = ((Date.now() - activityStart) / 1000).toFixed(1); console.log(` [port-activity] batch ${batchIdx}/${batches}: ${countryData.size} countries seeded, ${errors.length} errors (${elapsed}s)`); @@ -293,6 +350,12 @@ async function main() { let prevCountryKeys = []; let prevCount = 0; + // Mutated in-place by fetchAll() so the SIGTERM handler can log which batch + // we died in and what the per-country errors looked like. Without this, a + // timeout kill flushes nothing from the errors array — past regressions + // have been undiagnosable for exactly this reason. + const progress = { batchIdx: 0, totalBatches: 0, seeded: 0, errors: [] }; + // Bundle-runner SIGKILLs via SIGTERM → SIGKILL on timeout. Release the lock // and extend existing TTLs synchronously(ish) so the next cron tick isn't // blocked for up to 30 min and the Redis snapshot doesn't evaporate. @@ -300,7 +363,13 @@ async function main() { const onSigterm = async () => { if (sigHandled) return; sigHandled = true; - console.error(' [port-activity] SIGTERM received — releasing lock + extending TTLs'); + console.error( + ` [port-activity] SIGTERM at batch ${progress.batchIdx}/${progress.totalBatches} — ${progress.seeded} seeded, ${progress.errors.length} errors`, + ); + if (progress.errors.length) { + console.error(` [port-activity] First errors: ${progress.errors.slice(0, 10).join('; ')}`); + } + console.error(' [port-activity] Releasing lock + extending TTLs'); try { await extendExistingTtl([CANONICAL_KEY, META_KEY, ...prevCountryKeys], TTL); } catch {} @@ -317,7 +386,7 @@ async function main() { prevCount = Array.isArray(prevIso2List) ? prevIso2List.length : 0; console.log(` Fetching port activity data (${HISTORY_DAYS}d history)...`); - const { countries, countryData } = await fetchAll(); + const { countries, countryData } = await fetchAll(progress); console.log(` Fetched ${countryData.size} countries`); diff --git a/tests/portwatch-port-activity-seed.test.mjs b/tests/portwatch-port-activity-seed.test.mjs index cfb08bd7e..80c1c140a 100644 --- a/tests/portwatch-port-activity-seed.test.mjs +++ b/tests/portwatch-port-activity-seed.test.mjs @@ -3,11 +3,16 @@ import assert from 'node:assert/strict'; import { readFileSync } from 'node:fs'; import { dirname, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; +import { createRequire } from 'node:module'; + +import { withPerCountryTimeout } from '../scripts/seed-portwatch-port-activity.mjs'; const __dirname = dirname(fileURLToPath(import.meta.url)); const root = resolve(__dirname, '..'); const src = readFileSync(resolve(root, 'scripts/seed-portwatch-port-activity.mjs'), 'utf-8'); +const seedUtilsSrc = readFileSync(resolve(root, 'scripts/_seed-utils.mjs'), 'utf-8'); +const proxyUtilsSrc = readFileSync(resolve(root, 'scripts/_proxy-utils.cjs'), 'utf-8'); // ── seeder source assertions ────────────────────────────────────────────────── @@ -62,6 +67,72 @@ describe('seed-portwatch-port-activity.mjs exports', () => { assert.match(src, /process\.on\('SIGTERM'/); }); + it('defines a per-country timeout to cap Promise.allSettled stalls', () => { + // Without this cap, one slow country (USA: many ports × many pages when + // ArcGIS is throttled) blocks the whole batch via Promise.allSettled and + // cascades to the 420s section timeout, leaving batches 2..N unattempted. + assert.match(src, /PER_COUNTRY_TIMEOUT_MS\s*=\s*\d/); + }); + + it('wraps processCountry with the per-country timeout in the batch loop', () => { + // Must pass a factory (signal) => processCountry(...) so the timer can + // abort the in-flight fetch (PR #3222 review P1), not just race a + // detached promise. + assert.match(src, /withPerCountryTimeout\s*\(\s*\n?\s*\(signal\)\s*=>\s*processCountry/); + }); + + it('fetchWithTimeout combines caller signal with FETCH_TIMEOUT via AbortSignal.any', () => { + // Otherwise the per-country abort cannot propagate into the in-flight + // fetch; orphan pagination would continue with fresh 45s budgets each + // page (PR #3222 review P1). + assert.match(src, /AbortSignal\.any\(\[signal,\s*AbortSignal\.timeout\(FETCH_TIMEOUT\)\]\)/); + }); + + it('fetchActivityRows checks signal.aborted between pages', () => { + assert.match(src, /signal\?\.aborted\)\s*throw\s+signal\.reason/); + }); + + it('429 proxy fallback threads caller signal into httpsProxyFetchRaw', () => { + // Review feedback: without this, a timed-out country can leak a + // proxy CONNECT tunnel for up to FETCH_TIMEOUT (45s) after the + // batch moved on, defeating the concurrency cap under the exact + // throttling scenario this PR addresses. + assert.match(src, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s); + }); + + it('httpsProxyFetchRaw accepts and forwards signal', () => { + assert.match(seedUtilsSrc, /httpsProxyFetchRaw\(url,\s*proxyAuth,\s*\{[^}]*signal\s*\}/s); + assert.match(seedUtilsSrc, /proxyFetch\(url,\s*proxyConfig,\s*\{[^}]*signal[^}]*\}/s); + }); + + it('proxyFetch + proxyConnectTunnel accept signal and bail early if aborted', () => { + assert.match(proxyUtilsSrc, /function proxyFetch\([\s\S]*?\bsignal,?\s*\}\s*=\s*\{\}/); + assert.match(proxyUtilsSrc, /function proxyConnectTunnel\([\s\S]*?\bsignal\s*\}\s*=\s*\{\}/); + assert.match(proxyUtilsSrc, /signal && signal\.aborted/); + assert.match(proxyUtilsSrc, /signal\.addEventListener\('abort'/); + }); + + it('eager error flush attaches p.catch before Promise.allSettled', () => { + // Review P2: without this, errors collected only AFTER allSettled + // returns. A mid-batch SIGTERM would flush zero errors even though + // several promises had already rejected. + assert.match(src, /p\.catch\(err\s*=>\s*\{[^}]*errors\.push/); + }); + + it('SIGTERM handler flushes batch progress + first errors', () => { + // Past regressions were undiagnosable because the errors array was only + // logged after all batches completed — a SIGTERM kill discarded it. + assert.match(src, /SIGTERM at batch \$\{progress\.batchIdx\}/); + assert.match(src, /progress\.errors\.slice\(0,\s*10\)/); + }); + + it('fetchAll accepts a progress object and mutates it', () => { + assert.match(src, /export async function fetchAll\(progress\)/); + assert.match(src, /progress\.totalBatches\s*=\s*batches/); + assert.match(src, /progress\.batchIdx\s*=\s*batchIdx/); + assert.match(src, /progress\.seeded\s*=\s*countryData\.size/); + }); + it('pagination advances by actual features.length, not PAGE_SIZE', () => { // ArcGIS PortWatch_ports_database caps responses at 1000 rows even when // resultRecordCount=2000. Advancing by PAGE_SIZE skips rows 1000-1999. @@ -218,6 +289,106 @@ describe('top-N port truncation', () => { }); }); +describe('withPerCountryTimeout (runtime)', () => { + it('aborts the per-country signal when the timer fires', async () => { + let observedSignal; + const p = withPerCountryTimeout( + (signal) => { + observedSignal = signal; + // Never resolves on its own; can only reject via abort. + return new Promise((_, reject) => { + signal.addEventListener('abort', () => reject(signal.reason), { once: true }); + }); + }, + 'TST', + 40, // 40ms — keeps the test fast + ); + + await assert.rejects(p, /per-country timeout after 0\.04s \(TST\)/); + assert.equal(observedSignal.aborted, true, 'underlying work received the abort'); + }); + + it('resolves with the work result when work completes before the timer', async () => { + const result = await withPerCountryTimeout( + (_signal) => Promise.resolve({ ok: true }), + 'TST', + 500, + ); + assert.deepEqual(result, { ok: true }); + }); + + it('does not invoke the timer path when work rejects first', async () => { + // Rejecting with a non-timeout error should surface as-is, not as the + // per-country timeout message. + await assert.rejects( + withPerCountryTimeout( + (_signal) => Promise.reject(new Error('ArcGIS HTTP 500')), + 'TST', + 1_000, + ), + /ArcGIS HTTP 500/, + ); + }); +}); + +describe('proxyFetch signal propagation (runtime)', () => { + const require_ = createRequire(import.meta.url); + const { proxyFetch } = require_('../scripts/_proxy-utils.cjs'); + + it('rejects synchronously when called with an already-aborted signal', async () => { + // Review feedback: the per-country AbortController must kill the proxy + // fallback too. Pre-aborted signals must short-circuit BEFORE any + // CONNECT tunnel opens; otherwise a timed-out country's proxy call + // continues in the background. No network reached in this test — the + // synchronous aborted check is the guard. + const controller = new AbortController(); + controller.abort(new Error('test-cancel')); + await assert.rejects( + proxyFetch('https://example.invalid/x', { host: 'nope', port: 1, auth: 'a:b', tls: true }, { + timeoutMs: 60_000, + signal: controller.signal, + }), + /test-cancel|aborted/, + ); + }); +}); + +describe('eager error flush (runtime)', () => { + it('populates shared errors via p.catch before Promise.allSettled resolves', async () => { + // Mirrors the wiring in fetchAll(): attach p.catch to each promise so + // rejections land in the errors array at the moment they fire, not + // only after allSettled. A SIGTERM that hits during allSettled still + // sees the already-pushed errors. + const errors = []; + let stuckResolve; + + const rejecting = Promise.reject(new Error('boom A')); + rejecting.catch(err => errors.push(`A: ${err.message}`)); + + const stuck = new Promise(resolve => { stuckResolve = resolve; }); + stuck.catch(err => errors.push(`B: ${err.message}`)); + + // Yield microtasks so `rejecting.catch` fires. + await Promise.resolve(); + await Promise.resolve(); + + assert.deepEqual(errors, ['A: boom A'], 'rejected promise pushed BEFORE allSettled is even awaited'); + + // Sanity: the hung promise still blocks allSettled, proving the + // behavior we rely on: errors from resolved-rejected members are + // visible to a SIGTERM handler even while the batch itself is stuck. + const allSettledPromise = Promise.allSettled([rejecting, stuck]); + let settledEarly = false; + allSettledPromise.then(() => { settledEarly = true; }); + await new Promise(r => setTimeout(r, 10)); + assert.equal(settledEarly, false, 'allSettled still pending while one member is stuck'); + assert.equal(errors.length, 1, 'error list observable despite pending batch'); + + stuckResolve(); + await allSettledPromise; + }); +}); + describe('validateFn', () => { it('returns true when countries array has >= 50 entries', () => { const data = { countries: Array.from({ length: 80 }, (_, i) => `C${i}`), fetchedAt: new Date().toISOString() };