Files
worldmonitor/tests/seed-utils-sigterm-cleanup.test.mjs
Elie Habib 5f40f8a13a feat(seed): BUNDLE_RUN_STARTED_AT_MS env + runSeed SIGTERM cleanup (#3384)
* feat(seed): BUNDLE_RUN_STARTED_AT_MS env + runSeed SIGTERM cleanup

Prereq for the re-export-share Comtrade seeder (plan 2026-04-24-003),
usable by any cohort seeder whose consumer needs bundle-level freshness.

Two coupled changes:

1. `_bundle-runner.mjs` injects `BUNDLE_RUN_STARTED_AT_MS` into every
   spawned child. All siblings in a single bundle run share one value
   (captured at `runBundle` start, not spawn time). Consumers use this
   to detect stale peer keys — if a peer's seed-meta predates the
   current bundle run, fall back to a hard default rather than read
   a cohort-peer's last-week output.

2. `_seed-utils.mjs::runSeed` registers a `process.once('SIGTERM')`
   handler that releases the acquired lock and extends existing-data
   TTL before exiting 143. `_bundle-runner.mjs` sends SIGTERM on
   section timeout, then SIGKILL after KILL_GRACE_MS (5s). Without
   this handler the `finally` path never runs on SIGKILL, leaving
   the 30-min acquireLock reservation in place until its own TTL
   expires — the next cron tick silently skips the resource.
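The consumer-side freshness guard from change 1 can be sketched as follows (function and field names here are illustrative, not the actual fleet API):

```javascript
// Sketch of the consumer-side bundle-freshness guard (illustrative names).
// A cohort seeder reading a peer's output compares the peer's seed-meta
// write time against the shared bundle-run timestamp: anything written
// before this bundle run started is a previous tick's output.
function isPeerFresh(peerMeta, bundleStartMs) {
  // Outside a bundle run there is no bundle-level staleness to guard
  // against; let the normal TTL-based checks apply instead.
  if (bundleStartMs == null) return true;
  return peerMeta.fetchedAt >= bundleStartMs;
}

function pickPeerValue(peerMeta, bundleStartMs, hardDefault) {
  return isPeerFresh(peerMeta, bundleStartMs) ? peerMeta.value : hardDefault;
}
```

Because the timestamp is captured once at `runBundle` start, every sibling applies the same cutoff regardless of when it was spawned.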

Regression guard memory: `bundle-runner-sigkill-leaks-child-lock` (PR
#3128 root cause).

Tests added:
- bundle-runner env injection (value within run bounds)
- sibling sections share the same timestamp (critical for the
  consumer freshness guard)
- runSeed SIGTERM path: exit 143 + cleanup log
- process.once contract: second SIGTERM does not re-enter handler

* fix(seed): address P1/P2 review findings on SIGTERM + bundle contracts

Addresses PR #3384 review findings (todos 256, 257, 259, 260):

#256 (P1) — SIGTERM handler narrowed to fetch phase only. Was installed
at runSeed entry and armed through every `process.exit` path; could
race `emptyDataIsFailure: true` strict-floor exits (IMF-External,
WB-bulk) and extend seed-meta TTL when the contract forbids it —
silently re-masking 30-day outages. Now the handler is attached
immediately before `withRetry(fetchFn)` and removed in a try/finally
that covers all fetch-phase exit branches.
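A minimal sketch of that narrowed scope, using stand-in names for the real `_seed-utils.mjs` internals:

```javascript
// Sketch of the fetch-phase-only SIGTERM scope (illustrative, not the
// actual _seed-utils.mjs source). The handler exists only while the
// long-running fetch is in flight; every other exit path, including the
// strict-floor emptyDataIsFailure exits, runs with no handler attached.
async function fetchWithSigtermCleanup(fetchFn, cleanup) {
  const onSigterm = () => {
    // Best-effort cleanup, then the conventional 128+15 exit code.
    cleanup().finally(() => process.exit(143));
  };
  process.once('SIGTERM', onSigterm);
  try {
    return await fetchFn();
  } finally {
    // Covers resolve, reject, and throw: after this line a SIGTERM falls
    // through to Node's default handler and cannot extend seed-meta TTL.
    process.removeListener('SIGTERM', onSigterm);
  }
}
```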

#257 (P1) — `BUNDLE_RUN_STARTED_AT_MS` now has a first-class helper.
Exported `getBundleRunStartedAtMs()` from `_seed-utils.mjs` with JSDoc
describing the bundle-freshness contract. Fleet-wide helper so the
next consumer seeder imports instead of rediscovering the idiom.

#259 (P2) — SIGTERM cleanup now runs `Promise.allSettled` over the
disjoint-key ops (`releaseLock` + `extendExistingTtl`). Serialising
them compounded Upstash latency during the exact failure mode (Redis
degraded) this handler exists to mitigate, risking a breach of the 5s
SIGKILL grace.
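In sketch form (op arguments are stand-ins; the real handler also exits 143 afterwards):

```javascript
// Sketch: run the two disjoint-key cleanup ops concurrently so one slow
// or failed Upstash call cannot push the other past the SIGKILL grace.
// allSettled (not all) because a failed release must not skip the TTL
// extension, and vice versa.
async function sigtermCleanup(releaseLock, extendExistingTtl) {
  const results = await Promise.allSettled([releaseLock(), extendExistingTtl()]);
  for (const r of results) {
    if (r.status === 'rejected') console.error('cleanup op failed:', r.reason);
  }
  return results;
}
```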

#260 (P2) — `_bundle-runner.mjs` asserts topological order on
optional `dependsOn` section field. Throws on unknown-label refs and
on deps appearing at a later index. Fleet-wide contract replacing
the previous prose-comment ordering guarantee.
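The assertion might look roughly like this (the `dependsOn` field is from the change; everything else is illustrative):

```javascript
// Sketch of the dependsOn order assertion (illustrative shape). Each
// section may list labels it depends on; every dep must exist and must
// appear at an earlier index than the dependent section.
function assertTopologicalOrder(sections) {
  const indexByLabel = new Map(sections.map((s, i) => [s.label, i]));
  for (const [i, section] of sections.entries()) {
    for (const dep of section.dependsOn ?? []) {
      const depIndex = indexByLabel.get(dep);
      if (depIndex === undefined) {
        throw new Error(`section "${section.label}" depends on unknown label "${dep}"`);
      }
      if (depIndex >= i) {
        throw new Error(`section "${section.label}" depends on "${dep}" which runs later`);
      }
    }
  }
}
```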

Tests added/updated:
- New: SIGTERM handler removed after fetchFn completes (narrowed-scope
  contract — post-fetch SIGTERM must NOT trigger TTL extension)
- New: dependsOn unknown-label + out-of-order + happy-path (3 tests)

Full test suite: 6,866 tests pass (+4 net).

* fix(seed): getBundleRunStartedAtMs returns null outside a bundle run

Review follow-up: the earlier `Math.floor(Date.now()/1000)*1000` fallback
regressed standalone (non-bundle) runs. A consumer seeder invoked
manually just after its peer wrote `fetchedAt = (now - 5s)` would see
`bundleStartMs = Date.now()`, reject the perfectly-fresh peer envelope
as "stale", and fall back to defaults — defeating the point of the
peer-read path outside the bundle.

Returning null when `BUNDLE_RUN_STARTED_AT_MS` is unset/invalid keeps
the freshness gate scoped to its real purpose (across-bundle-tick
staleness) and lets standalone runs skip the gate entirely. Consumers
check `bundleStartMs != null` before applying the comparison; see the
companion `seed-sovereign-wealth.mjs` change on the stacked PR.
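A sketch of the helper's contract as described here (the real implementation lives in `_seed-utils.mjs` and may differ in details; treating non-positive values as invalid is an assumption):

```javascript
// Sketch of the null-outside-bundle semantics for the bundle-run helper.
function getBundleRunStartedAtMs(env = process.env) {
  const raw = env.BUNDLE_RUN_STARTED_AT_MS;
  if (raw === undefined || raw === '') return null;
  const ms = Number(raw);
  // Unset or invalid → null: standalone runs skip the freshness gate
  // entirely instead of rejecting a fresh peer envelope against Date.now().
  return Number.isFinite(ms) && ms > 0 ? ms : null;
}
```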

* test(seed): SIGTERM cleanup test now verifies Redis DEL + EXPIRE calls

Greptile review P2 on PR #3384: the existing test only asserted exit
code + log line, not that the Redis ops were actually issued. The
log claim was ahead of the test.

Fixture now logs every Upstash fetch call's shape (EVAL / pipeline-
EXPIRE / other) to stderr. Test asserts:

- >=1 EVAL op was issued during SIGTERM cleanup (releaseLock Lua
  script on the lock key)
- >=1 pipeline-EXPIRE op was issued (extendExistingTtl on canonical
  + seed-meta keys)
- The EVAL body carries the runSeed-generated runId (proves it's
  THIS run's release, not a phantom op)
- The EXPIRE pipeline touches both the canonicalKey AND the
  seed-meta key (proves the keys[] array was built correctly
  including the extraKeys merge path)

Full test suite: 6,866 tests pass, typecheck clean.
2026-04-25 00:14:04 +04:00

275 lines
12 KiB
JavaScript

// Regression test: runSeed must release its lock and extend existing-data
// TTL when it receives SIGTERM from _bundle-runner.mjs. Without this, the
// 30-min acquireLock reservation leaks to the NEXT cron tick, which then
// silently skips the resource — long-tail outage window described in
// memory `bundle-runner-sigkill-leaks-child-lock` (PR #3128).
//
// Strategy: spawn a real child that monkey-patches global fetch to capture
// every Upstash call, invokes runSeed() with a fetchFn that awaits forever,
// sends SIGTERM, and verifies the child (a) exits 143 (b) prints the
// "SIGTERM received" line (c) emits the DEL (releaseLock) + EXPIRE pipeline
// (extendExistingTtl) calls before exit.
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { spawn } from 'node:child_process';
import { writeFileSync, unlinkSync } from 'node:fs';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
// fileURLToPath handles Windows drive-letter paths that URL.pathname would mangle.
const SCRIPTS_DIR = fileURLToPath(new URL('../scripts/', import.meta.url));
function runFixture(bodyJs) {
  const path = join(SCRIPTS_DIR, `_sigterm-fixture-${Date.now()}.mjs`);
  writeFileSync(path, bodyJs);
  return new Promise((resolve) => {
    const child = spawn(process.execPath, [path], {
      stdio: ['ignore', 'pipe', 'pipe'],
      env: {
        ...process.env,
        UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
        UPSTASH_REDIS_REST_TOKEN: 'fake-token',
      },
    });
    let stdout = '';
    let stderr = '';
    child.stdout.on('data', (c) => { stdout += c; });
    child.stderr.on('data', (c) => { stderr += c; });
    child.on('close', (code, signal) => {
      clearInterval(readyCheck);
      clearTimeout(killTimer);
      try { unlinkSync(path); } catch {}
      resolve({ code, signal, stdout, stderr });
    });
    // Let runSeed register the SIGTERM handler and enter fetchFn before we kill.
    // The fixture logs "READY" once the fetchFn is awaited; we kill then.
    const readyCheck = setInterval(() => {
      if (stdout.includes('READY')) {
        clearInterval(readyCheck);
        child.kill('SIGTERM');
      }
    }, 25);
    // Hard stop so a regression can't hang the suite; cleared on normal close.
    const killTimer = setTimeout(() => {
      clearInterval(readyCheck);
      try { child.kill('SIGKILL'); } catch {}
    }, 10_000);
  });
}
test('runSeed releases lock and extends existing TTL on SIGTERM', async () => {
  // Fixture logs every Upstash HTTP call (shape + body) on its own
  // line so the test can assert that the SIGTERM cleanup actually
  // emitted (a) an EVAL DEL-on-match for the lock key, and (b) an
  // EXPIRE pipeline for the canonical + seed-meta keys. Log goes to
  // stderr so the READY signal on stdout stays uncontended.
  const body = `
import { runSeed } from './_seed-utils.mjs';
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  // Shape signature: EVAL / EXPIRE / pipeline / other — so the test
  // asserts on the exact op without having to deep-inspect.
  let shape = 'other';
  if (Array.isArray(body)) {
    if (Array.isArray(body[0])) {
      shape = body[0][0] === 'EXPIRE' ? 'pipeline-EXPIRE' : 'pipeline-other';
    } else if (body[0] === 'EVAL') {
      shape = 'EVAL';
    } else {
      shape = 'cmd-' + body[0];
    }
  }
  console.error('FETCH_OP shape=' + shape + ' body=' + JSON.stringify(body));
  // Pipeline → per-command results array; everything else (including the
  // lock SET NX) → { result: 'OK' }, i.e. the lock is acquired.
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 1 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
// fetchFn that awaits "forever" — we want SIGTERM to interrupt mid-fetch.
// setInterval keeps the event loop alive (otherwise Node bails with
// "Detected unsettled top-level await" before SIGTERM can be delivered).
const foreverFetch = () => new Promise(() => {
  console.log('READY');
  setInterval(() => {}, 10_000);
});
await runSeed('test-domain', 'sigterm', 'data:test:sigterm:v1', foreverFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const { code, signal, stderr } = await runFixture(body);
  // process.exit(143) should produce code=143; on some platforms Node maps it
  // back to a signal termination, so accept either code or signal.
  assert.ok(code === 143 || signal === 'SIGTERM',
    `expected exit 143 or SIGTERM; got code=${code} signal=${signal}\nstderr:\n${stderr}`);
  assert.match(stderr, /SIGTERM received — releasing lock/,
    `expected SIGTERM cleanup log; stderr:\n${stderr}`);
  // Verify cleanup actually ISSUED the Redis ops, not just logged intent.
  // Extract FETCH_OP lines; separate the acquire-time ops from SIGTERM-time.
  const fetchOps = stderr.split('\n').filter((l) => l.startsWith('FETCH_OP '));
  const evalOps = fetchOps.filter((l) => l.includes('shape=EVAL'));
  const pipelineExpireOps = fetchOps.filter((l) => l.includes('shape=pipeline-EXPIRE'));
  // The SIGTERM handler must emit at least one EVAL (releaseLock Lua
  // script on the lock key) and at least one pipeline EXPIRE (extend
  // existing TTL on canonical + seed-meta keys).
  assert.ok(evalOps.length >= 1,
    `expected >=1 EVAL (releaseLock) call; saw ${evalOps.length}\nstderr:\n${stderr}`);
  assert.ok(pipelineExpireOps.length >= 1,
    `expected >=1 pipeline-EXPIRE (extendExistingTtl) call; saw ${pipelineExpireOps.length}\nstderr:\n${stderr}`);
  // Specific: the EVAL body must reference our runId (body[4]) so we
  // know it's the SIGTERM-time release, not a different lock op.
  // runId format: `${Date.now()}-${Math.random().toString(36).slice(2,8)}`
  // → e.g. "1777061031282-cmgso6", JSON-quoted inside the EVAL body.
  const evalHasRunId = evalOps.some((l) => /"\d{10,}-[a-z0-9]{6}"/.test(l));
  assert.ok(evalHasRunId,
    `expected EVAL body to carry the runSeed-generated runId; stderr:\n${stderr}`);
  // Specific: the EXPIRE pipeline must reference both the canonical
  // key and the seed-meta key (proves keys[] was constructed correctly).
  const expireTouchesCanonical = pipelineExpireOps.some((l) => l.includes('data:test:sigterm:v1'));
  const expireTouchesSeedMeta = pipelineExpireOps.some((l) => l.includes('seed-meta:test-domain:sigterm'));
  assert.ok(expireTouchesCanonical,
    `EXPIRE pipeline must include canonicalKey; stderr:\n${stderr}`);
  assert.ok(expireTouchesSeedMeta,
    `EXPIRE pipeline must include seed-meta key; stderr:\n${stderr}`);
});
test('runSeed SIGTERM handler fires once even if multiple SIGTERMs arrive', async () => {
  // Uses process.once under the hood; verify by emitting SIGTERM twice.
  // A second SIGTERM while the handler is mid-cleanup should not trigger
  // re-entry. If the handler was registered with process.on instead of
  // process.once, the second SIGTERM would re-enter and double-release.
  const body = `
import { runSeed } from './_seed-utils.mjs';
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 0 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
const foreverFetch = () => new Promise(() => { console.log('READY'); setInterval(() => {}, 10_000); });
await runSeed('test-domain', 'sigterm-once', 'data:test:sigterm-once:v1', foreverFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const path = join(SCRIPTS_DIR, `_sigterm-once-fixture-${Date.now()}.mjs`);
  writeFileSync(path, body);
  try {
    const { sigtermLinesSeen, stderr } = await new Promise((resolve) => {
      const child = spawn(process.execPath, [path], {
        stdio: ['ignore', 'pipe', 'pipe'],
        env: {
          ...process.env,
          UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
          UPSTASH_REDIS_REST_TOKEN: 'fake-token',
        },
      });
      let stdout = '';
      let stderr = '';
      child.stdout.on('data', (c) => { stdout += c; });
      child.stderr.on('data', (c) => { stderr += c; });
      const ready = setInterval(() => {
        if (stdout.includes('READY')) {
          clearInterval(ready);
          child.kill('SIGTERM');
          setTimeout(() => { try { child.kill('SIGTERM'); } catch {} }, 50);
        }
      }, 25);
      const killTimer = setTimeout(() => { try { child.kill('SIGKILL'); } catch {} }, 10_000);
      child.on('close', () => {
        clearInterval(ready);
        clearTimeout(killTimer);
        resolve({
          sigtermLinesSeen: (stderr.match(/SIGTERM received/g) || []).length,
          stderr,
        });
      });
    });
    // Assert outside the 'close' handler so a failure rejects the test
    // instead of throwing an uncaught exception inside the event callback.
    assert.equal(sigtermLinesSeen, 1,
      `handler must fire once (process.once); saw ${sigtermLinesSeen} SIGTERM lines\nstderr:\n${stderr}`);
  } finally {
    try { unlinkSync(path); } catch {}
  }
});
test('SIGTERM handler is removed AFTER fetchFn completes (no race with publish-phase exit)', async () => {
  // After the fetch phase returns, runSeed's try/finally removes the
  // SIGTERM listener. Verify by: fetchFn returns synthetic data → runSeed
  // enters publish phase (which will fail on our empty fetch mock that
  // doesn't simulate atomicPublish success) → SIGTERM arriving AFTER
  // fetchFn resolution must fall through to Node's default handler (fast
  // exit with signal=SIGTERM, NO "SIGTERM received" cleanup line).
  //
  // This pins the narrowed-scope contract: the cleanup handler exists
  // only during the long-running fetch, and post-fetch SIGTERMs do
  // NOT trigger runSeed's TTL-extension code path — critical for
  // strict-floor seeders (emptyDataIsFailure: true) that must NOT
  // refresh seed-meta on empty data.
  const body = `
import { runSeed } from './_seed-utils.mjs';
// Redis stub: lock SET NX → OK, everything else → OK.
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 0 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
// fetchFn resolves IMMEDIATELY with data, then we signal READY and
// hang in the publish phase so the test can send SIGTERM. Using
// atomicPublish's Lua-redirect stub response ('OK') will keep the
// seeder alive in verify-retry; setInterval ensures event loop stays up.
const quickFetch = async () => {
  const data = { items: [{ k: 1 }] };
  // Give the SIGTERM handler a microtask to be removed by the
  // try{ ... } finally{ process.off } after withRetry resolves.
  // Then signal ready.
  setInterval(() => {}, 10_000);
  setImmediate(() => console.log('POST_FETCH_READY'));
  return data;
};
await runSeed('test-domain', 'post-fetch', 'data:test:post-fetch:v1', quickFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const path = join(SCRIPTS_DIR, `_sigterm-postfetch-fixture-${Date.now()}.mjs`);
  writeFileSync(path, body);
  try {
    const { stderr } = await new Promise((resolve) => {
      const child = spawn(process.execPath, [path], {
        stdio: ['ignore', 'pipe', 'pipe'],
        env: {
          ...process.env,
          UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
          UPSTASH_REDIS_REST_TOKEN: 'fake-token',
        },
      });
      let stdout = '';
      let stderr = '';
      child.stdout.on('data', (c) => { stdout += c; });
      child.stderr.on('data', (c) => { stderr += c; });
      const ready = setInterval(() => {
        if (stdout.includes('POST_FETCH_READY')) {
          clearInterval(ready);
          // Give one extra event-loop tick so the finally{process.off} runs.
          setTimeout(() => child.kill('SIGTERM'), 50);
        }
      }, 25);
      const killTimer = setTimeout(() => { try { child.kill('SIGKILL'); } catch {} }, 10_000);
      child.on('close', () => {
        clearInterval(ready);
        clearTimeout(killTimer);
        resolve({ stderr });
      });
    });
    // Post-fetch SIGTERM falls through to Node default: no "SIGTERM
    // received" cleanup log from our handler. Exit may be signal=SIGTERM
    // or a code (Node varies), but the decisive signal is the ABSENCE of
    // the cleanup log line.
    const cleanupFired = /SIGTERM received — releasing lock/.test(stderr);
    assert.equal(cleanupFired, false,
      `post-fetch SIGTERM must NOT trigger runSeed cleanup (strict-floor safety). stderr:\n${stderr}`);
  } finally {
    try { unlinkSync(path); } catch {}
  }
});