Files
worldmonitor/scripts/_bundle-runner.mjs
Elie Habib 8a988323d2 chore(bundle-runner): emit reliable per-section summary line on parent stdout (#3320)
* chore(bundle-runner): emit reliable per-section summary line on parent stdout

Fixes observability asymmetry in Railway bundle service logs where some
seeders appeared to skip lines like \`Run ID\`, \`Mode\`, or the
structured \`seed_complete\` JSON event. Root cause is Railway's log
ingestion dropping child-stdout lines when multiple seeders emit at
similar timestamps — observed in the PR #3294 launch run where
Pipelines-Gas was missing its \`=== Seed ===\` banner, Pipelines-Oil had
\`Key:\` emitted BEFORE the banner, Storage-Facilities and Energy-
Disruptions were missing Run ID + Mode + seed_complete entirely,
despite identical code paths.

All child processes emit the same lines; Railway just loses some. Fix
is to piggy-back on the observation that bundle-level lines (\`[Bundle:X]
Starting\`, \`[Bundle:X] Finished\`) ARE reliably captured — they come
from the parent process's single stdout stream.

Changes in scripts/_bundle-runner.mjs:
- spawnSeed now captures the child's \`{"event":"seed_complete",...}\` JSON
  line while streaming stdout, parses it, and attaches to the settle
  result.
- Main loop emits one bundle-level summary line per section after child
  exit:
    [Bundle:X] section=NAME status=OK durationMs=1237 records=15 state=OK
  (or \`status=FAILED elapsed=...s reason=...\` for failures).
- Summary line survives Railway's log ingestion even when per-section
  child lines drop, giving monitors a reliable event to key off.

Observability consumers (log-based alerts, seed telemetry scrapers)
should now key off the bundle-level summary rather than per-section
child lines which remain best-effort. The per-section child lines stay
as-is for interactive debugging.

Verification: parse logic sanity-checked against the exact seed_complete
line format. Node syntax check clean. No schema changes.

* fix(bundle-runner): emit FAILED summary line to stderr, not stdout

The prior commit introduced a bundle-level structured summary line per
section. On success that correctly goes to stdout; on FAILED it was
also going to stdout — but that broke tests/bundle-runner.test.mjs test
140 ("timeout emits terminal reason BEFORE SIGTERM/SIGKILL grace").

The test concatenates stdout+stderr and asserts that `SIGKILL` appears
AFTER `Failed after` in the combined string (verifying the kill-decision
log line is emitted BEFORE the 10s SIGTERM→SIGKILL grace window, so
it survives container termination). My new FAILED summary line — which
includes the reason string `timeout after 1s (signal SIGKILL)` —
landed on stdout, which comes first in the concatenation, and its
`SIGKILL` substring matched before the stderr-side `Did not exit on
SIGTERM...SIGKILL` line. Ordering assertion failed.

Fix: route the FAILED summary line through console.error (same stream
as the pre-kill `Failed after ... sending SIGTERM` and the grace-window
`Did not exit...SIGKILL` lines). Chronological ordering in combined
output is preserved; test passes.

OK summary lines stay on stdout — they're observability data, not
error diagnostics, and belong on the normal stream alongside the
bundle Starting/Finished lines.

Local: `node --test tests/bundle-runner.test.mjs` — 4/4 pass including
the previously-failing ordering test.
2026-04-23 08:36:05 +04:00

266 lines
12 KiB
JavaScript

#!/usr/bin/env node
/**
* Bundle orchestrator: spawns multiple seed scripts sequentially via
* child_process.spawn, with line-streamed stdio, SIGTERM→SIGKILL escalation on
* timeout, and freshness-gated skipping. Streaming matters because a hanging
* section would otherwise buffer its logs until exit and look like a silent
* container crash (see PR that replaced execFile).
*
* Usage from a bundle script:
* import { runBundle } from './_bundle-runner.mjs';
* await runBundle('ecb-eu', [ { label, script, seedMetaKey, intervalMs, timeoutMs } ]);
*
* Budget (opt-in): Railway cron services SIGKILL the container at 10min. If
* the sum of timeoutMs for sections that happen to be due exceeds ~9min, we
* risk losing the in-flight section's logs AND marking the job as crashed.
* Callers on Railway cron can pass `{ maxBundleMs }` to enforce a wall-time
* budget — sections whose worst-case timeout wouldn't fit in the remaining
* budget are deferred to the next tick. Default is Infinity (no budget) so
* existing bundles whose individual sections already exceed 9min (e.g.
* 600_000-1 timeouts in imf-extended, energy-sources) are not silently
* broken by adopting the runner.
*/
import { spawn } from 'node:child_process';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { loadEnvFile } from './_seed-utils.mjs';
import { unwrapEnvelope } from './_seed-envelope-source.mjs';
const __dirname = dirname(fileURLToPath(import.meta.url));
export const MIN = 60_000;
export const HOUR = 3_600_000;
export const DAY = 86_400_000;
export const WEEK = 604_800_000;
loadEnvFile(import.meta.url);
const REDIS_URL = process.env.UPSTASH_REDIS_REST_URL;
const REDIS_TOKEN = process.env.UPSTASH_REDIS_REST_TOKEN;
async function readRedisKey(key) {
if (!REDIS_URL || !REDIS_TOKEN) return null;
try {
const resp = await fetch(`${REDIS_URL}/get/${encodeURIComponent(key)}`, {
headers: { Authorization: `Bearer ${REDIS_TOKEN}` },
signal: AbortSignal.timeout(5_000),
});
if (!resp.ok) return null;
const body = await resp.json();
return body.result ? JSON.parse(body.result) : null;
} catch {
return null;
}
}
/**
* Read section freshness for the interval gate.
*
* Returns `{ fetchedAt }` or null. Prefers envelope-form data when the section
* declares `canonicalKey` (PR 2+); falls back to the legacy `seed-meta:<key>`
* read used by every bundle file today. PR 1 keeps legacy as the ONLY live
* path — `unwrapEnvelope` here is behavior-preserving because legacy seed-meta
* values have no `_seed` field and pass through as `data` unchanged. When PR 2
* migrates bundles to `canonicalKey`, this function starts reading envelopes.
*/
async function readSectionFreshness(section) {
// Try the envelope path first when a canonicalKey is declared. If the canonical
// key isn't yet written as an envelope (PR 2 writer migration lagging reader
// migration, or a legacy payload still present), fall through to the legacy
// seed-meta read so the bundle doesn't over-run during the transition.
if (section.canonicalKey) {
const raw = await readRedisKey(section.canonicalKey);
const { _seed } = unwrapEnvelope(raw);
if (_seed?.fetchedAt) return { fetchedAt: _seed.fetchedAt };
}
if (section.seedMetaKey) {
const raw = await readRedisKey(`seed-meta:${section.seedMetaKey}`);
// Legacy seed-meta is `{ fetchedAt, recordCount, sourceVersion }` at top
// level. It has no `_seed` wrapper so unwrapEnvelope returns it as data.
const meta = unwrapEnvelope(raw).data;
if (meta?.fetchedAt) return { fetchedAt: meta.fetchedAt };
}
return null;
}
// Stream child stdio line-by-line so hung sections surface progress instead of
// looking like a silent crash. Escalate SIGTERM → SIGKILL on timeout so child
// processes with in-flight HTTPS sockets can't outlive the deadline.
const KILL_GRACE_MS = 10_000;
function streamLines(stream, onLine) {
let buf = '';
stream.setEncoding('utf8');
stream.on('data', (chunk) => {
buf += chunk;
let idx;
while ((idx = buf.indexOf('\n')) !== -1) {
const line = buf.slice(0, idx);
buf = buf.slice(idx + 1);
if (line) onLine(line);
}
});
stream.on('end', () => { if (buf) onLine(buf); });
// Child-stdio `error` is rare (SIGKILL emits `end`), but Node throws on an
// unhandled `error` event. Log it instead of crashing the runner.
stream.on('error', (err) => onLine(`<stdio error: ${err.message}>`));
}
function spawnSeed(scriptPath, { timeoutMs, label }) {
return new Promise((resolve) => {
const t0 = Date.now();
// Capture the child's structured `seed_complete` event if emitted, so
// the parent can re-emit the key fields on a single bundle-level line.
// Railway log ingestion drops child-stdout lines when many seeders log
// at similar timestamps (observed across Storage-Facilities /
// Energy-Disruptions / Pipelines-Gas in PR #3294 launch run: each
// dropped a different subset of Run ID / Mode / seed_complete lines
// despite identical code paths). Bundle-level lines survive reliably.
let lastSeedComplete = null;
const child = spawn(process.execPath, [scriptPath], {
env: process.env,
stdio: ['ignore', 'pipe', 'pipe'],
});
streamLines(child.stdout, (line) => {
console.log(` [${label}] ${line}`);
const idx = line.indexOf('{"event":"seed_complete"');
if (idx >= 0) {
try {
lastSeedComplete = JSON.parse(line.slice(idx));
} catch { /* malformed JSON — keep previous */ }
}
});
streamLines(child.stderr, (line) => console.warn(` [${label}] ${line}`));
let settled = false;
let timedOut = false;
let killTimer = null;
// Fire the terminal "Failed ... timeout" log the moment we decide to kill,
// BEFORE the SIGTERM→SIGKILL grace window. This guarantees the reason
// reaches the log stream even if the container itself is killed during
// the grace period (Railway's ~10min cap can land inside the grace for
// sections whose timeoutMs is close to 10min).
const softKill = setTimeout(() => {
timedOut = true;
const elapsedAtTimeout = ((Date.now() - t0) / 1000).toFixed(1);
console.error(` [${label}] Failed after ${elapsedAtTimeout}s: timeout after ${Math.round(timeoutMs / 1000)}s — sending SIGTERM`);
child.kill('SIGTERM');
killTimer = setTimeout(() => {
console.warn(` [${label}] Did not exit on SIGTERM within ${KILL_GRACE_MS / 1000}s — sending SIGKILL`);
child.kill('SIGKILL');
}, KILL_GRACE_MS);
}, timeoutMs);
const settle = (value) => {
if (settled) return;
settled = true;
clearTimeout(softKill);
if (killTimer) clearTimeout(killTimer);
resolve(value);
};
child.on('error', (err) => {
const elapsed = ((Date.now() - t0) / 1000).toFixed(1);
console.error(` [${label}] Failed after ${elapsed}s: spawn error: ${err.message}`);
settle({ elapsed, ok: false, reason: `spawn error: ${err.message}`, alreadyLogged: true });
});
child.on('close', (code, signal) => {
const elapsed = ((Date.now() - t0) / 1000).toFixed(1);
if (timedOut) {
// Terminal reason already logged by softKill — just record the outcome.
settle({ elapsed, ok: false, reason: `timeout after ${Math.round(timeoutMs / 1000)}s (signal ${signal || 'SIGTERM'})`, alreadyLogged: true });
} else if (code === 0) {
settle({ elapsed, ok: true, seedComplete: lastSeedComplete });
} else {
settle({ elapsed, ok: false, reason: `exit ${code ?? 'null'}${signal ? ` (signal ${signal})` : ''}` });
}
});
});
}
/**
* @param {string} label - Bundle name for logging
* @param {Array<{
* label: string,
* script: string,
* seedMetaKey?: string, // legacy (pre-contract); reads `seed-meta:<key>`
* canonicalKey?: string, // PR 2+: reads envelope from the canonical data key
* intervalMs: number,
* timeoutMs?: number,
* }>} sections
* @param {{ maxBundleMs?: number }} [opts]
*/
export async function runBundle(label, sections, opts = {}) {
const t0 = Date.now();
const maxBundleMs = opts.maxBundleMs ?? Infinity;
const budgetLabel = Number.isFinite(maxBundleMs) ? `, budget ${Math.round(maxBundleMs / 1000)}s` : '';
console.log(`[Bundle:${label}] Starting (${sections.length} sections${budgetLabel})`);
let ran = 0, skipped = 0, deferred = 0, failed = 0;
for (const section of sections) {
const scriptPath = join(__dirname, section.script);
const timeout = section.timeoutMs || 300_000;
const freshness = await readSectionFreshness(section);
if (freshness?.fetchedAt) {
const elapsed = Date.now() - freshness.fetchedAt;
if (elapsed < section.intervalMs * 0.8) {
const agoMin = Math.round(elapsed / 60_000);
const intervalMin = Math.round(section.intervalMs / 60_000);
console.log(` [${section.label}] Skipped, last seeded ${agoMin}min ago (interval: ${intervalMin}min)`);
skipped++;
continue;
}
}
const elapsedBundle = Date.now() - t0;
// Worst-case runtime is timeoutMs + KILL_GRACE_MS (child may ignore SIGTERM
// and need SIGKILL after grace). Admit only when the full worst-case fits.
const worstCase = timeout + KILL_GRACE_MS;
if (elapsedBundle + worstCase > maxBundleMs) {
const remainingSec = Math.max(0, Math.round((maxBundleMs - elapsedBundle) / 1000));
const needSec = Math.round(worstCase / 1000);
console.log(` [${section.label}] Deferred, needs ${needSec}s (timeout+grace) but only ${remainingSec}s left in bundle budget`);
deferred++;
continue;
}
const result = await spawnSeed(scriptPath, { timeoutMs: timeout, label: section.label });
if (result.ok) {
console.log(` [${section.label}] Done (${result.elapsed}s)`);
// Bundle-level per-section summary — emitted from parent stdout so
// Railway log ingestion captures it reliably even when child lines
// drop. Observability tools should key off this line, not per-section
// Run ID / Mode / seed_complete lines which are best-effort only.
const sc = result.seedComplete;
if (sc && typeof sc === 'object') {
console.log(`[Bundle:${label}] section=${section.label} status=OK durationMs=${sc.durationMs ?? ''} records=${sc.recordCount ?? ''} state=${sc.state || 'OK'}`);
} else {
// Seeder didn't emit seed_complete (legacy non-contract seeders, or
// the child's event line was dropped before parsing).
console.log(`[Bundle:${label}] section=${section.label} status=OK elapsed=${result.elapsed}s`);
}
ran++;
} else {
if (!result.alreadyLogged) {
console.error(` [${section.label}] Failed after ${result.elapsed}s: ${result.reason}`);
}
// Emit the FAILED summary to stderr (same stream as the Failed line
// and SIGKILL escalation log) so chronological ordering in combined
// output is preserved. If we went to stdout here, the line would
// appear before those stderr lines when consumers concatenate
// stdout+stderr, breaking tests (and log readers) that rely on
// signal-escalation ordering.
console.error(`[Bundle:${label}] section=${section.label} status=FAILED elapsed=${result.elapsed}s reason=${(result.reason || 'unknown').replace(/\s+/g, ' ')}`);
failed++;
}
}
const totalSec = ((Date.now() - t0) / 1000).toFixed(1);
console.log(`[Bundle:${label}] Finished in ${totalSec}s, ran:${ran} skipped:${skipped} deferred:${deferred} failed:${failed}`);
process.exit(failed > 0 ? 1 : 0);
}