mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
feat(portwatch): H+F — cache by upstream maxDate + parallel window split (#3299)
This commit is contained in:
@@ -32,15 +32,11 @@ const EP4_BASE =
|
||||
|
||||
const PAGE_SIZE = 2000;
|
||||
const FETCH_TIMEOUT = 45_000;
|
||||
// 60 days. Enough to cover both window aggregates used by the UI
|
||||
// (last30 for current metrics + prev30 = days 30-60 for trendDelta),
|
||||
// without the extra 30d of tail data that we never actually look at.
|
||||
// Cutting from 90→60 days drops each per-country query by ~33% in row
|
||||
// count and page count — prod log on 2026-04-21 00:02Z showed 90d
|
||||
// per-country pagination averaging ~75s/batch at concurrency 12, which
|
||||
// mathematically cannot fit 15 batches into the 540s section budget.
|
||||
// 60 days should bring avg batch time down enough for a full publish.
|
||||
const HISTORY_DAYS = 60;
|
||||
// Two aggregation windows, hardcoded in fetchCountryAccum:
|
||||
// last30 = days 0-30 → tankerCalls30d, avg30d, import/export sums
|
||||
// prev30 = days 30-60 → trendDelta baseline
|
||||
// Any change to these window sizes must update BOTH the WHERE clauses
|
||||
// in paginateWindowInto callers AND the cutoff* math in fetchCountryAccum.
|
||||
const MAX_PORTS_PER_COUNTRY = 50;
|
||||
|
||||
// Per-country budget. ArcGIS's ISO3 index makes per-country fetches O(rows-in-country),
|
||||
@@ -50,6 +46,16 @@ const MAX_PORTS_PER_COUNTRY = 50;
|
||||
const PER_COUNTRY_TIMEOUT_MS = 90_000;
|
||||
const CONCURRENCY = 12;
|
||||
const BATCH_LOG_EVERY = 5;
|
||||
// Cache hygiene: force a full refetch if the cached payload is older than 7 days
|
||||
// even when upstream maxDate is unchanged. Protects against window-shift drift
|
||||
// (cached aggregates were computed against a window that's now 7+ days offset
|
||||
// from today's last30/prev30 cutoffs) and serves as a belt-and-braces refresh
|
||||
// if the maxDate check ever silently short-circuits.
|
||||
const MAX_CACHE_AGE_MS = 7 * 86_400_000;
|
||||
// Concurrency for the cheap per-country maxDate preflight. These are tiny
|
||||
// outStatistics queries (returns 1 row), so we can push harder than the
|
||||
// expensive fetch concurrency without tripping ArcGIS 429s in practice.
|
||||
const PREFLIGHT_CONCURRENCY = 24;
|
||||
|
||||
function epochToTimestamp(epochMs) {
|
||||
const d = new Date(epochMs);
|
||||
@@ -139,27 +145,17 @@ async function fetchAllPortRefs({ signal } = {}) {
|
||||
return byIso3;
|
||||
}
|
||||
|
||||
// Fetch ONE country's activity rows, streaming into per-port accumulators.
|
||||
// ArcGIS's ISO3 index makes this cheap for most countries (~3-9s typical).
|
||||
// Heavy countries (USA/CHN/etc.) can still be 30-60s because 60 days × their many
|
||||
// ports = thousands of rows across multiple pages. Hence the per-country
|
||||
// timeout + single retry.
|
||||
//
|
||||
// Returns Map<portId, PortAccum> — same shape `finalisePortsForCountry`
|
||||
// consumes. Memory per country is O(unique ports for that country) ≈ <200.
|
||||
async function fetchCountryAccum(iso3, since, { signal } = {}) {
|
||||
const now = Date.now();
|
||||
const cutoff30 = now - 30 * 86400000;
|
||||
const cutoff60 = now - 60 * 86400000;
|
||||
const cutoff7 = now - 7 * 86400000;
|
||||
|
||||
const portAccumMap = new Map();
|
||||
// Paginate a single ArcGIS EP3 window into per-port accumulators. Called
|
||||
// twice per country — once for each aggregation window (last30, prev30) —
|
||||
// in parallel so heavy countries no longer have to serialise through both
|
||||
// windows inside a single 90s cap.
|
||||
async function paginateWindowInto(portAccumMap, iso3, where, windowKind, { signal } = {}) {
|
||||
let offset = 0;
|
||||
let body;
|
||||
do {
|
||||
if (signal?.aborted) throw signal.reason ?? new Error('aborted');
|
||||
const params = new URLSearchParams({
|
||||
where: `ISO3='${iso3}' AND date > ${epochToTimestamp(since)}`,
|
||||
where,
|
||||
outFields: 'portid,portname,ISO3,date,portcalls_tanker,import_tanker,export_tanker',
|
||||
returnGeometry: 'false',
|
||||
orderByFields: 'portid ASC,date ASC',
|
||||
@@ -174,47 +170,135 @@ async function fetchCountryAccum(iso3, since, { signal } = {}) {
|
||||
const a = f.attributes;
|
||||
if (!a || a.portid == null || a.date == null) continue;
|
||||
const portId = String(a.portid);
|
||||
// ArcGIS date is esriFieldTypeDateOnly → "YYYY-MM-DD" string (or epoch ms).
|
||||
const date = typeof a.date === 'number' ? a.date : Date.parse(a.date + 'T12:00:00Z');
|
||||
const calls = Number(a.portcalls_tanker ?? 0);
|
||||
const imports = Number(a.import_tanker ?? 0);
|
||||
const exports_ = Number(a.export_tanker ?? 0);
|
||||
|
||||
// JS is single-threaded; two concurrent paginateWindowInto calls never
|
||||
// hit the `get`/`set` pair here in interleaved fashion because there's
|
||||
// no `await` between them. So this is safe without a mutex.
|
||||
let acc = portAccumMap.get(portId);
|
||||
if (!acc) {
|
||||
acc = {
|
||||
portname: String(a.portname || ''),
|
||||
last30_calls: 0, last30_count: 0, last30_import: 0, last30_export: 0,
|
||||
prev30_calls: 0,
|
||||
last7_calls: 0, last7_count: 0,
|
||||
};
|
||||
portAccumMap.set(portId, acc);
|
||||
}
|
||||
if (date >= cutoff30) {
|
||||
if (windowKind === 'last30') {
|
||||
acc.last30_calls += calls;
|
||||
acc.last30_count += 1;
|
||||
acc.last30_import += imports;
|
||||
acc.last30_export += exports_;
|
||||
if (date >= cutoff7) {
|
||||
acc.last7_calls += calls;
|
||||
acc.last7_count += 1;
|
||||
}
|
||||
} else if (date >= cutoff60) {
|
||||
} else {
|
||||
// windowKind === 'prev30'
|
||||
acc.prev30_calls += calls;
|
||||
}
|
||||
}
|
||||
if (features.length === 0) break;
|
||||
offset += features.length;
|
||||
} while (body.exceededTransferLimit);
|
||||
}
|
||||
|
||||
// Parse a "YYYY-MM-DD" string (from ArcGIS outStatistics max(date)) into an
// epoch-ms anchor used as the upper bound of the last30 window. Uses the
// END of the day (23:59:59.999 UTC) so rows dated exactly maxDate still
// satisfy `date <= anchor`.
//
// Validation is strict on purpose: the input must be exactly four digits,
// two digits, two digits, AND must name a real calendar day. Handing a
// concatenated string to `Date.parse` leaves out-of-range days (e.g.
// "2026-02-31") to engine-specific leniency, which can roll the date over
// into the following month instead of failing. A silently shifted anchor
// would shift both aggregation windows, so such input is rejected here.
//
// Returns null for any non-string or invalid input; callers fall back to
// `Date.now()` when the anchor is null.
function parseMaxDateToAnchor(maxDateStr) {
  if (typeof maxDateStr !== 'string') return null;

  const match = /^(\d{4})-(\d{2})-(\d{2})$/.exec(maxDateStr);
  if (!match) return null;

  const year = Number(match[1]);
  const month = Number(match[2]);
  const day = Number(match[3]);

  const ts = Date.UTC(year, month - 1, day, 23, 59, 59, 999);
  if (!Number.isFinite(ts)) return null;

  // Round-trip check: Date.UTC normalises overflowing fields (month 13,
  // day 31 in a 30-day month, ...), so any mismatch means the input did not
  // name a real calendar day.
  const roundTrip = new Date(ts);
  const isSameDay = roundTrip.getUTCFullYear() === year
    && roundTrip.getUTCMonth() === month - 1
    && roundTrip.getUTCDate() === day;

  return isSameDay ? ts : null;
}
|
||||
|
||||
// Fetch ONE country's activity rows, streaming into per-port accumulators.
// Splits into TWO parallel windowed queries:
//   - Q1 (last30): WHERE ISO3='X' AND date > cutoff30
//   - Q2 (prev30): WHERE ISO3='X' AND date > cutoff60 AND date <= cutoff30
// Each returns ~half the rows a single 60-day query would. Heavy countries
// (USA/CHN/etc.) drop from ~90s → ~30s because max(Q1,Q2) < Q1+Q2.
//
// The window ANCHOR is upstream max(date), not `Date.now()`. This makes the
// aggregate stable across cron runs whenever upstream hasn't advanced —
// which is essential for the H-path cache (see fetchAll). Without the
// anchor, rolling `now - 30d` windows shift every day even when upstream
// is frozen, so `tankerCalls30d` would drift day-over-day and cache reuse
// would serve stale aggregates. PR #3299 review P1.
//
// `last7` aggregation was removed: ArcGIS's Daily_Ports_Data max date lags
// ~10 days behind real-time, so the last-7-day window was always empty and
// anomalySignal always false. Not a feature regression — it was already dead.
//
// Cancellation: both windows run under one internal AbortController that is
// aborted when (a) the caller's `signal` aborts, or (b) either window
// rejects. `Promise.all` rejects on the first failure but does NOT stop the
// other promise, so without (b) the surviving window would keep paginating
// for a result that is about to be discarded.
//
// Params:
//   iso3           — ISO3 country code interpolated into the WHERE clause.
//   signal         — optional AbortSignal from the caller.
//   anchorEpochMs  — optional epoch-ms upper bound of the last30 window;
//                    defaults to Date.now() when null/undefined.
// Returns Map<portId, PortAccum>. Memory per country is O(unique ports) ≈ <200.
// Rejects with the first window's error (or the abort reason).
async function fetchCountryAccum(iso3, { signal, anchorEpochMs } = {}) {
  const anchor = anchorEpochMs ?? Date.now();
  const cutoff30 = anchor - 30 * 86400000;
  const cutoff60 = anchor - 60 * 86400000;

  const portAccumMap = new Map();

  // Internal controller shared by both windows; mirrors the caller's signal.
  const linked = new AbortController();
  const forwardAbort = () => linked.abort(signal.reason);
  if (signal?.aborted) {
    forwardAbort();
  } else {
    signal?.addEventListener('abort', forwardAbort, { once: true });
  }

  try {
    await Promise.all([
      paginateWindowInto(
        portAccumMap,
        iso3,
        `ISO3='${iso3}' AND date > ${epochToTimestamp(cutoff30)}`,
        'last30',
        { signal: linked.signal },
      ),
      paginateWindowInto(
        portAccumMap,
        iso3,
        `ISO3='${iso3}' AND date > ${epochToTimestamp(cutoff60)} AND date <= ${epochToTimestamp(cutoff30)}`,
        'prev30',
        { signal: linked.signal },
      ),
    ]);
  } catch (err) {
    // One window failed: stop the sibling instead of letting it run orphaned.
    linked.abort(err);
    throw err;
  } finally {
    // Don't leave a listener behind on a long-lived caller signal.
    signal?.removeEventListener('abort', forwardAbort);
  }

  return portAccumMap;
}
|
||||
|
||||
// Cheap preflight: single outStatistics query returning max(date) for one
// country. Used to skip the expensive fetch when upstream data hasn't
// advanced since the last cached run. ~1-2s per call at ArcGIS's current
// steady-state.
//
// Returns an ISO date string "YYYY-MM-DD", or null when the request fails,
// the response carries no max_date, or the value cannot be normalised to a
// well-formed date (we then fall through to the expensive path, which has
// its own retry).
//
// The result is validated because callers persist it as the cache `asof`
// marker and compare it by equality on the next run: a junk value such as
// "NaN-NaN-NaN" (from an invalid epoch) or a truncated non-date string
// would otherwise compare equal to itself across runs and wrongly qualify
// a payload for cache reuse.
//
// Params:
//   iso3   — ISO3 country code interpolated into the WHERE clause.
//   signal — optional AbortSignal forwarded to the HTTP helper.
async function fetchMaxDate(iso3, { signal } = {}) {
  const outStats = JSON.stringify([{
    statisticType: 'max',
    onStatisticField: 'date',
    outStatisticFieldName: 'max_date',
  }]);
  const params = new URLSearchParams({
    where: `ISO3='${iso3}'`,
    outStatistics: outStats,
    f: 'json',
  });
  try {
    const body = await fetchWithRetryOnInvalidParams(`${EP3_BASE}?${params}`, { signal });
    const raw = body?.features?.[0]?.attributes?.max_date;
    if (raw == null) return null;

    // ArcGIS may return max(date) as epoch ms OR ISO string depending on field type
    // (esriFieldTypeDate vs esriFieldTypeDateOnly). Normalize to YYYY-MM-DD.
    let isoDay;
    if (typeof raw === 'number') {
      const d = new Date(raw);
      // NaN / out-of-range epochs produce an Invalid Date; reject them here
      // (toISOString would throw on one).
      if (Number.isNaN(d.getTime())) return null;
      isoDay = d.toISOString().slice(0, 10);
    } else {
      isoDay = String(raw).slice(0, 10);
    }

    // Only hand back something that actually looks like a calendar date.
    return /^\d{4}-\d{2}-\d{2}$/.test(isoDay) ? isoDay : null;
  } catch (err) {
    // Best-effort by design: null routes this country to the full fetch.
    // Stay quiet on abort (shutdown / timeout), but surface real failures.
    if (!signal?.aborted) {
      console.warn(` [port-activity] maxDate preflight failed for ${iso3} (${err?.message || err}) — falling back to full fetch`);
    }
    return null;
  }
}
|
||||
|
||||
export function finalisePortsForCountry(portAccumMap, refMap) {
|
||||
const ports = [];
|
||||
for (const [portId, a] of portAccumMap) {
|
||||
const avg30d = a.last30_count > 0 ? a.last30_calls / a.last30_count : 0;
|
||||
const avg7d = a.last7_count > 0 ? a.last7_calls / a.last7_count : 0;
|
||||
const anomalySignal = avg30d > 0 && avg7d < avg30d * 0.5;
|
||||
// anomalySignal dropped: ArcGIS dataset max date lags 10+ days behind
|
||||
// real-time, so the last-7-day window always returned 0 rows and
|
||||
// anomalySignal was always false. Removed the dead aggregation in the
|
||||
// H+F refactor rather than plumbing a now-always-false field.
|
||||
const trendDelta = a.prev30_calls > 0
|
||||
? Math.round(((a.last30_calls - a.prev30_calls) / a.prev30_calls) * 1000) / 10
|
||||
: 0;
|
||||
@@ -228,7 +312,10 @@ export function finalisePortsForCountry(portAccumMap, refMap) {
|
||||
trendDelta,
|
||||
importTankerDwt30d: a.last30_import,
|
||||
exportTankerDwt30d: a.last30_export,
|
||||
anomalySignal,
|
||||
// Preserve field for downstream consumers but always false now.
|
||||
// TODO: Remove once UI stops reading it; ports.proto already tolerates
|
||||
// the missing field in future responses.
|
||||
anomalySignal: false,
|
||||
});
|
||||
}
|
||||
return ports
|
||||
@@ -270,6 +357,25 @@ async function redisPipeline(commands) {
|
||||
return resp.json();
|
||||
}
|
||||
|
||||
// MGET-style batch read via the Upstash REST /pipeline endpoint. Returns an
// array aligned with `keys` where each element is either the parsed JSON
// payload or null (for missing/unparseable/errored keys). Used to prime the
// per-country cache lookup in one round-trip instead of 174 sequential GETs.
//
// Alignment is guaranteed by construction: the output is built by walking
// `keys`, so it always has exactly `keys.length` entries even if the
// pipeline response comes back short (absent entries become null, i.e. a
// cache miss).
//
// Throws if the pipeline response is not an array at all (e.g. a top-level
// error object) — with a descriptive message rather than an opaque
// "results.map is not a function" — and propagates any rejection from
// `redisPipeline`.
async function redisMgetJson(keys) {
  if (keys.length === 0) return [];

  const commands = keys.map((k) => ['GET', k]);
  const results = await redisPipeline(commands);
  if (!Array.isArray(results)) {
    throw new Error(`redisMget: unexpected pipeline response (expected array of ${keys.length} results, got ${results === null ? 'null' : typeof results})`);
  }

  return keys.map((key, idx) => {
    const entry = results[idx];
    if (entry?.error) return null;
    const raw = entry?.result;
    if (raw == null) return null;
    try {
      return JSON.parse(raw);
    } catch {
      // A corrupt payload is equivalent to a cache miss; the country is re-fetched.
      console.warn(` [port-activity] redisMget: skipping unparseable cached payload for ${key}`);
      return null;
    }
  });
}
|
||||
|
||||
// fetchAll() — pure data collection, no Redis writes.
|
||||
// Returns { countries: string[], countryData: Map<iso2, payload>, fetchedAt: string }.
|
||||
//
|
||||
@@ -277,7 +383,6 @@ async function redisPipeline(commands) {
|
||||
// can report which batch / country we died on.
|
||||
export async function fetchAll(progress, { signal } = {}) {
|
||||
const { iso3ToIso2 } = createCountryResolvers();
|
||||
const since = Date.now() - HISTORY_DAYS * 86400000;
|
||||
|
||||
if (progress) progress.stage = 'refs';
|
||||
console.log(' [port-activity] Fetching global port reference (EP4)...');
|
||||
@@ -285,25 +390,103 @@ export async function fetchAll(progress, { signal } = {}) {
|
||||
const refsByIso3 = await fetchAllPortRefs({ signal });
|
||||
console.log(` [port-activity] Refs loaded: ${refsByIso3.size} countries with ports (${((Date.now() - t0) / 1000).toFixed(1)}s)`);
|
||||
|
||||
if (progress) progress.stage = 'activity';
|
||||
const eligibleIso3 = [...refsByIso3.keys()].filter(iso3 => iso3ToIso2.has(iso3));
|
||||
const skipped = refsByIso3.size - eligibleIso3.length;
|
||||
const batches = Math.ceil(eligibleIso3.length / CONCURRENCY);
|
||||
if (progress) progress.totalBatches = batches;
|
||||
console.log(` [port-activity] Activity queue: ${eligibleIso3.length} countries (skipping ${skipped} unmapped iso3, concurrency ${CONCURRENCY}, per-country cap ${PER_COUNTRY_TIMEOUT_MS / 1000}s)`);
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────
|
||||
// Preflight: load every country's previous payload in one MGET pipeline.
|
||||
// Payloads written by this script since the H+F refactor carry an `asof`
|
||||
// (upstream max(date) at the time of the last successful fetch) and a
|
||||
// `cacheWrittenAt` (ms epoch). We re-use them as-is when both of the
|
||||
// following hold:
|
||||
// 1. upstream max(date) for the country is unchanged since `asof`
|
||||
// 2. `cacheWrittenAt` is within MAX_CACHE_AGE_MS
|
||||
// Either check failing → fall through to the expensive paginated fetch.
|
||||
//
|
||||
// Cold run (no cache / legacy payloads without asof) always falls through.
|
||||
// ─────────────────────────────────────────────────────────────────────────
|
||||
if (progress) progress.stage = 'cache-lookup';
|
||||
const cacheT0 = Date.now();
|
||||
const prevKeys = eligibleIso3.map((iso3) => `${KEY_PREFIX}${iso3ToIso2.get(iso3)}`);
|
||||
// A transient Upstash outage at run-start must NOT abort the seed before
|
||||
// any ArcGIS data is fetched — that's a regression from the previous
|
||||
// behaviour where Redis was only required at the final write. On MGET
|
||||
// failure, degrade to cold-path: treat every country as a cache miss
|
||||
// and re-fetch. The write at run-end will retry its own Redis calls
|
||||
// and fail loudly if Redis is genuinely down then too. PR #3299 review P1.
|
||||
const prevPayloads = await redisMgetJson(prevKeys).catch((err) => {
|
||||
console.warn(` [port-activity] cache MGET failed (${err?.message || err}) — treating all countries as cache miss`);
|
||||
return new Array(prevKeys.length).fill(null);
|
||||
});
|
||||
console.log(` [port-activity] Loaded ${prevPayloads.filter(Boolean).length}/${prevKeys.length} cached payloads (${((Date.now() - cacheT0) / 1000).toFixed(1)}s)`);
|
||||
|
||||
// Preflight: maxDate check for every eligible country in parallel.
|
||||
// Each request is tiny (1 row outStatistics), so we push to PREFLIGHT_CONCURRENCY
|
||||
// which is higher than the expensive-fetch CONCURRENCY.
|
||||
if (progress) progress.stage = 'preflight';
|
||||
const preflightT0 = Date.now();
|
||||
const maxDates = new Array(eligibleIso3.length).fill(null);
|
||||
for (let i = 0; i < eligibleIso3.length; i += PREFLIGHT_CONCURRENCY) {
|
||||
if (signal?.aborted) throw signal.reason ?? new Error('aborted');
|
||||
const slice = eligibleIso3.slice(i, i + PREFLIGHT_CONCURRENCY);
|
||||
const settled = await Promise.allSettled(
|
||||
slice.map((iso3) => fetchMaxDate(iso3, { signal })),
|
||||
);
|
||||
for (let j = 0; j < slice.length; j++) {
|
||||
const r = settled[j];
|
||||
maxDates[i + j] = r.status === 'fulfilled' ? r.value : null;
|
||||
}
|
||||
}
|
||||
console.log(` [port-activity] Preflight maxDate for ${eligibleIso3.length} countries (${((Date.now() - preflightT0) / 1000).toFixed(1)}s)`);
|
||||
|
||||
// Partition: cache hits (reusable) vs misses (need expensive fetch).
|
||||
const countryData = new Map();
|
||||
const needsFetch = [];
|
||||
let cacheHits = 0;
|
||||
const now = Date.now();
|
||||
for (let i = 0; i < eligibleIso3.length; i++) {
|
||||
const iso3 = eligibleIso3[i];
|
||||
const iso2 = iso3ToIso2.get(iso3);
|
||||
const upstreamMaxDate = maxDates[i];
|
||||
const prev = prevPayloads[i];
|
||||
const cacheFresh = prev && typeof prev === 'object'
|
||||
&& prev.asof === upstreamMaxDate
|
||||
&& upstreamMaxDate != null
|
||||
&& typeof prev.cacheWrittenAt === 'number'
|
||||
&& (now - prev.cacheWrittenAt) < MAX_CACHE_AGE_MS;
|
||||
if (cacheFresh) {
|
||||
countryData.set(iso2, prev);
|
||||
cacheHits++;
|
||||
} else {
|
||||
needsFetch.push({ iso3, iso2, upstreamMaxDate });
|
||||
}
|
||||
}
|
||||
console.log(` [port-activity] Cache: ${cacheHits} hits, ${needsFetch.length} misses`);
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────
|
||||
// Expensive path: paginated fetch for cache misses only.
|
||||
// ─────────────────────────────────────────────────────────────────────────
|
||||
if (progress) progress.stage = 'activity';
|
||||
const batches = Math.ceil(needsFetch.length / CONCURRENCY);
|
||||
if (progress) progress.totalBatches = batches;
|
||||
console.log(` [port-activity] Activity queue: ${needsFetch.length} countries (skipped ${cacheHits} via cache, ${skipped} unmapped, concurrency ${CONCURRENCY}, per-country cap ${PER_COUNTRY_TIMEOUT_MS / 1000}s)`);
|
||||
|
||||
const errors = progress?.errors ?? [];
|
||||
const activityStart = Date.now();
|
||||
|
||||
for (let i = 0; i < eligibleIso3.length; i += CONCURRENCY) {
|
||||
const batch = eligibleIso3.slice(i, i + CONCURRENCY);
|
||||
for (let i = 0; i < needsFetch.length; i += CONCURRENCY) {
|
||||
const batch = needsFetch.slice(i, i + CONCURRENCY);
|
||||
const batchIdx = Math.floor(i / CONCURRENCY) + 1;
|
||||
if (progress) progress.batchIdx = batchIdx;
|
||||
|
||||
const promises = batch.map(iso3 => {
|
||||
const promises = batch.map(({ iso3, upstreamMaxDate }) => {
|
||||
// Anchor the rolling windows to upstream max(date) so the aggregate
|
||||
// is stable day-over-day when upstream is frozen (required for cache
|
||||
// reuse to be semantically correct — see PR #3299 review P1).
|
||||
// Falls back to Date.now() when preflight returned null.
|
||||
const anchorEpochMs = parseMaxDateToAnchor(upstreamMaxDate);
|
||||
const p = withPerCountryTimeout(
|
||||
(childSignal) => fetchCountryAccum(iso3, since, { signal: childSignal }),
|
||||
(childSignal) => fetchCountryAccum(iso3, { signal: childSignal, anchorEpochMs }),
|
||||
iso3,
|
||||
);
|
||||
// Eager error flush so a SIGTERM mid-batch captures rejections that
|
||||
@@ -314,21 +497,29 @@ export async function fetchAll(progress, { signal } = {}) {
|
||||
const settled = await Promise.allSettled(promises);
|
||||
|
||||
for (let j = 0; j < batch.length; j++) {
|
||||
const iso3 = batch[j];
|
||||
const { iso3, iso2, upstreamMaxDate } = batch[j];
|
||||
const outcome = settled[j];
|
||||
if (outcome.status === 'rejected') continue; // already recorded via .catch
|
||||
const portAccumMap = outcome.value;
|
||||
if (!portAccumMap || portAccumMap.size === 0) continue;
|
||||
const ports = finalisePortsForCountry(portAccumMap, refsByIso3.get(iso3));
|
||||
if (!ports.length) continue;
|
||||
const iso2 = iso3ToIso2.get(iso3);
|
||||
countryData.set(iso2, { iso2, ports, fetchedAt: new Date().toISOString() });
|
||||
countryData.set(iso2, {
|
||||
iso2,
|
||||
ports,
|
||||
fetchedAt: new Date().toISOString(),
|
||||
// Cache fields. `asof` may be null if preflight failed; that's fine —
|
||||
// next run will always be a miss (null !== any string) so we'll
|
||||
// re-fetch and repopulate.
|
||||
asof: upstreamMaxDate,
|
||||
cacheWrittenAt: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
if (progress) progress.seeded = countryData.size;
|
||||
if (batchIdx === 1 || batchIdx % BATCH_LOG_EVERY === 0 || batchIdx === batches) {
|
||||
const elapsed = ((Date.now() - activityStart) / 1000).toFixed(1);
|
||||
console.log(` [port-activity] batch ${batchIdx}/${batches}: ${countryData.size} countries seeded, ${errors.length} errors (${elapsed}s)`);
|
||||
console.log(` [port-activity] batch ${batchIdx}/${batches}: ${countryData.size} countries published, ${errors.length} errors (${elapsed}s)`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +589,7 @@ async function main() {
|
||||
prevCountryKeys = Array.isArray(prevIso2List) ? prevIso2List.map(iso2 => `${KEY_PREFIX}${iso2}`) : [];
|
||||
prevCount = Array.isArray(prevIso2List) ? prevIso2List.length : 0;
|
||||
|
||||
console.log(` Fetching port activity data (${HISTORY_DAYS}d history)...`);
|
||||
console.log(` Fetching port activity data (60d: last30 + prev30 windows)...`);
|
||||
const { countries, countryData } = await fetchAll(progress, { signal: shutdownController.signal });
|
||||
|
||||
console.log(` Fetched ${countryData.size} countries`);
|
||||
|
||||
@@ -48,8 +48,11 @@ describe('seed-portwatch-port-activity.mjs exports', () => {
|
||||
// After the PR #3225 globalisation failed in prod, we restored the
|
||||
// per-country shape because ArcGIS has an ISO3 index but NO date
|
||||
// index — the per-country filter is what keeps queries fast.
|
||||
assert.match(src, /where:\s*`ISO3='\$\{iso3\}'\s+AND\s+date\s*>/);
|
||||
// Global where=date>X shape must NOT be present any more.
|
||||
// H+F refactor: the WHERE clause is now built inline at the
|
||||
// paginateWindowInto call site (not as a `where:` param in a params
|
||||
// bag) because each window has a different date predicate.
|
||||
assert.match(src, /`ISO3='\$\{iso3\}'\s+AND\s+date\s*>/);
|
||||
// Global where=date>X shape (PR #3225) must NOT be present.
|
||||
assert.doesNotMatch(src, /where:\s*`date\s*>\s*\$\{epochToTimestamp\(since\)\}`/);
|
||||
});
|
||||
|
||||
@@ -108,7 +111,50 @@ describe('seed-portwatch-port-activity.mjs exports', () => {
|
||||
assert.match(src, /async function fetchCountryAccum/);
|
||||
assert.match(src, /last30_calls:\s*0/);
|
||||
assert.match(src, /prev30_calls:\s*0/);
|
||||
assert.match(src, /last7_calls:\s*0/);
|
||||
// last7 aggregation removed — ArcGIS max-date lag made it always empty,
|
||||
// so anomalySignal was always false. See fetchCountryAccum header.
|
||||
assert.doesNotMatch(src, /last7_calls:\s*0/);
|
||||
});
|
||||
|
||||
it('fetchCountryAccum splits windows (last30 + prev30) into parallel queries', () => {
|
||||
// Heavy countries hit the 90s per-country cap under a single 60-day
|
||||
// query. Splitting into two parallel windowed queries (max ~half the
|
||||
// rows each) drops heavy-country time from ~90s → ~30s.
|
||||
assert.match(src, /await Promise\.all\(\[/);
|
||||
assert.match(src, /paginateWindowInto\(/);
|
||||
assert.match(src, /'last30'/);
|
||||
assert.match(src, /'prev30'/);
|
||||
});
|
||||
|
||||
it('fetchMaxDate preflight uses outStatistics for cheap cache invalidation', () => {
|
||||
assert.match(src, /async function fetchMaxDate/);
|
||||
assert.match(src, /statisticType:\s*'max'/);
|
||||
assert.match(src, /onStatisticField:\s*'date'/);
|
||||
});
|
||||
|
||||
it('fetchAll cache path: MGET preflight + maxDate check + reuse payload', () => {
|
||||
// H+F architecture: preflight reads prior payloads and maxDate, reuses
|
||||
// cache when upstream hasn't advanced. Without this, we re-fetched the
|
||||
// full 60 days every day even when ArcGIS hadn't published new rows.
|
||||
assert.match(src, /redisMgetJson/);
|
||||
assert.match(src, /async function redisMgetJson/);
|
||||
assert.match(src, /prev\.asof\s*===\s*upstreamMaxDate/);
|
||||
assert.match(src, /MAX_CACHE_AGE_MS/);
|
||||
});
|
||||
|
||||
it('cached payloads store asof + cacheWrittenAt for next-run invalidation', () => {
|
||||
assert.match(src, /asof:\s*upstreamMaxDate/);
|
||||
assert.match(src, /cacheWrittenAt:\s*Date\.now\(\)/);
|
||||
});
|
||||
|
||||
it('redisMgetJson failure degrades to cold-path (does not abort the seed)', () => {
|
||||
// PR #3299 review P1: a transient Upstash outage at run-start used to
|
||||
// abort the seed before any ArcGIS data was fetched — regression from
|
||||
// the prior behaviour where Redis was only required at write-time.
|
||||
// The MGET call is now wrapped in .catch that returns all-null so
|
||||
// every country falls through to the expensive-fetch path.
|
||||
assert.match(src, /redisMgetJson\(prevKeys\)\.catch\(/);
|
||||
assert.match(src, /new Array\(prevKeys\.length\)\.fill\(null\)/);
|
||||
});
|
||||
|
||||
it('registers SIGTERM + SIGINT + aborts shutdownController', () => {
|
||||
@@ -134,20 +180,40 @@ describe('seed-portwatch-port-activity.mjs exports', () => {
|
||||
assert.match(src, /LOCK_TTL_MS\s*=\s*60\s*\*\s*60\s*\*\s*1000/);
|
||||
});
|
||||
|
||||
it('anomalySignal computation is present', () => {
|
||||
assert.match(src, /anomalySignal/);
|
||||
it('anomalySignal field is still emitted (always false after H+F refactor)', () => {
|
||||
// The field stays in the payload shape for backward compatibility with
|
||||
// UI consumers reading `anomalySignal`. After H+F it is hardcoded to
|
||||
// false because the last7 aggregation that drove it was always empty
|
||||
// (ArcGIS data lag). TODO remove field once UI stops reading it.
|
||||
assert.match(src, /anomalySignal:\s*false/);
|
||||
});
|
||||
|
||||
it('MAX_PORTS_PER_COUNTRY is 50', () => {
|
||||
assert.match(src, /MAX_PORTS_PER_COUNTRY\s*=\s*50/);
|
||||
});
|
||||
|
||||
it('HISTORY_DAYS is 60 (enough for last30 + prev30/trendDelta, no more)', () => {
|
||||
// 90d was the prior default but prod log 2026-04-21 00:02Z showed
|
||||
// per-country pagination at 90d cannot fit 174 countries in the 540s
|
||||
// section budget even in a standalone Railway cron. 60d is the minimum
|
||||
// that still covers the UI's trendDelta window (prev30 = days 30-60).
|
||||
assert.match(src, /const\s+HISTORY_DAYS\s*=\s*60\b/);
|
||||
it('window cutoffs hardcoded to 30d + 60d anchored to upstream maxDate', () => {
|
||||
// HISTORY_DAYS constant was removed in the H+F refactor because the
|
||||
// actual windows are hardcoded in fetchCountryAccum. 60d is the
|
||||
// minimum that still covers trendDelta (prev30 = days 30-60).
|
||||
//
|
||||
// PR #3299 review P1: windows are anchored to upstream max(date),
|
||||
// not Date.now(), so the aggregate is STABLE day-over-day when
|
||||
// upstream is frozen. Without this, rolling `now - 30d` shifts the
|
||||
// window every day and the cache serves stale aggregates.
|
||||
assert.match(src, /anchor - 30 \* 86400000/);
|
||||
assert.match(src, /anchor - 60 \* 86400000/);
|
||||
// And the anchor is derived from the preflight maxDate, not just Date.now:
|
||||
assert.match(src, /function parseMaxDateToAnchor/);
|
||||
assert.match(src, /const anchor = anchorEpochMs \?\? Date\.now\(\)/);
|
||||
});
|
||||
|
||||
it('fetchCountryAccum receives anchorEpochMs at the call site', () => {
|
||||
// The call site must thread the parsed maxDate anchor into
|
||||
// fetchCountryAccum — otherwise the windows default to Date.now()
|
||||
// and cache reuse serves stale data (defeats the H-path entirely).
|
||||
assert.match(src, /parseMaxDateToAnchor\(upstreamMaxDate\)/);
|
||||
assert.match(src, /fetchCountryAccum\(iso3,\s*\{\s*signal:\s*childSignal,\s*anchorEpochMs\s*\}\)/);
|
||||
});
|
||||
|
||||
it('TTL is 259200 (3 days)', () => {
|
||||
@@ -326,7 +392,7 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => {
|
||||
({ finalisePortsForCountry } = await import('../scripts/seed-portwatch-port-activity.mjs'));
|
||||
});
|
||||
|
||||
it('emits tankerCalls30d / trendDelta / anomalySignal that match the old per-row formula', () => {
|
||||
it('emits tankerCalls30d + trendDelta + import/export sums; anomalySignal always false', () => {
|
||||
const portAccumMap = new Map([
|
||||
['42', {
|
||||
portname: 'Test Port',
|
||||
@@ -335,8 +401,6 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => {
|
||||
last30_import: 1000,
|
||||
last30_export: 500,
|
||||
prev30_calls: 40 * 30,
|
||||
last7_calls: 20 * 7,
|
||||
last7_count: 7,
|
||||
}],
|
||||
]);
|
||||
const refMap = new Map([['42', { lat: 10, lon: 20 }]]);
|
||||
@@ -346,12 +410,15 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => {
|
||||
assert.equal(port.exportTankerDwt30d, 500);
|
||||
const expectedTrend = Math.round(((60 * 23 + 20 * 7 - 40 * 30) / (40 * 30)) * 1000) / 10;
|
||||
assert.equal(port.trendDelta, expectedTrend);
|
||||
assert.equal(port.anomalySignal, true);
|
||||
// anomalySignal is hardcoded false post-H+F. See finalisePortsForCountry
|
||||
// header for rationale (last7 aggregation was always empty due to
|
||||
// ArcGIS max-date lag, so the field was always false anyway).
|
||||
assert.equal(port.anomalySignal, false);
|
||||
});
|
||||
|
||||
it('trendDelta=0 when prev30_calls=0', () => {
|
||||
const portAccumMap = new Map([
|
||||
['1', { portname: 'P', last30_calls: 100, last30_count: 30, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: Math.round((100 / 30) * 7), last7_count: 7 }],
|
||||
['1', { portname: 'P', last30_calls: 100, last30_count: 30, last30_import: 0, last30_export: 0, prev30_calls: 0 }],
|
||||
]);
|
||||
const [port] = finalisePortsForCountry(portAccumMap, new Map());
|
||||
assert.equal(port.trendDelta, 0);
|
||||
@@ -361,7 +428,7 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => {
|
||||
it('sorts desc + truncates to MAX_PORTS_PER_COUNTRY=50', () => {
|
||||
const portAccumMap = new Map();
|
||||
for (let i = 0; i < 60; i++) {
|
||||
portAccumMap.set(String(i), { portname: `P${i}`, last30_calls: 60 - i, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: 0, last7_count: 0 });
|
||||
portAccumMap.set(String(i), { portname: `P${i}`, last30_calls: 60 - i, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0 });
|
||||
}
|
||||
const out = finalisePortsForCountry(portAccumMap, new Map());
|
||||
assert.equal(out.length, 50);
|
||||
@@ -371,7 +438,7 @@ describe('finalisePortsForCountry (runtime, semantic equivalence)', () => {
|
||||
|
||||
it('falls back to lat/lon=0 when refMap lacks the portId', () => {
|
||||
const portAccumMap = new Map([
|
||||
['999', { portname: 'Orphan', last30_calls: 1, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0, last7_calls: 0, last7_count: 0 }],
|
||||
['999', { portname: 'Orphan', last30_calls: 1, last30_count: 1, last30_import: 0, last30_export: 0, prev30_calls: 0 }],
|
||||
]);
|
||||
const [port] = finalisePortsForCountry(portAccumMap, new Map());
|
||||
assert.equal(port.lat, 0);
|
||||
|
||||
Reference in New Issue
Block a user