mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(seed): BUNDLE_RUN_STARTED_AT_MS env + runSeed SIGTERM cleanup
Prereq for the re-export-share Comtrade seeder (plan 2026-04-24-003),
usable by any cohort seeder whose consumer needs bundle-level freshness.
Two coupled changes:
1. `_bundle-runner.mjs` injects `BUNDLE_RUN_STARTED_AT_MS` into every
spawned child. All siblings in a single bundle run share one value
(captured at `runBundle` start, not spawn time). Consumers use this
to detect stale peer keys — if a peer's seed-meta predates the
current bundle run, fall back to a hard default rather than read
a cohort-peer's last-week output.
2. `_seed-utils.mjs::runSeed` registers a `process.once('SIGTERM')`
handler that releases the acquired lock and extends existing-data
TTL before exiting 143. `_bundle-runner.mjs` sends SIGTERM on
section timeout, then SIGKILL after KILL_GRACE_MS (5s). Without
this handler the `finally` path never runs on SIGKILL, leaving
the 30-min acquireLock reservation in place until its own TTL
expires — the next cron tick silently skips the resource.
Regression guard memory: `bundle-runner-sigkill-leaks-child-lock` (PR
#3128 root cause).
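The handler described in (2) can be sketched roughly as follows. This is a minimal illustration, not the repo's actual `_seed-utils.mjs`: `releaseLock` and `extendExistingTtl` stand in for the real Upstash ops, and the injectable `proc` parameter exists only to make the sketch testable.

```javascript
// Hypothetical sketch of the runSeed SIGTERM cleanup: release the lock,
// extend the existing-data TTL, then exit 143 (128 + SIGTERM's signal 15).
function installSigtermCleanup({ releaseLock, extendExistingTtl }, proc = process) {
  proc.once('SIGTERM', async () => {
    console.error('SIGTERM received — releasing lock');
    await releaseLock();        // free the 30-min acquireLock reservation
    await extendExistingTtl();  // keep last-known-good data alive
    proc.exit(143);
  });
}
```

`process.once` (rather than `process.on`) is what makes a second SIGTERM during cleanup a no-op, which the tests below pin down.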
Tests added:
- bundle-runner env injection (value within run bounds)
- sibling sections share the same timestamp (critical for the
consumer freshness guard)
- runSeed SIGTERM path: exit 143 + cleanup log
- process.once contract: second SIGTERM does not re-enter handler
* fix(seed): address P1/P2 review findings on SIGTERM + bundle contracts
Addresses PR #3384 review findings (todos 256, 257, 259, 260):
#256 (P1) — SIGTERM handler narrowed to fetch phase only. Was installed
at runSeed entry and armed through every `process.exit` path; could
race `emptyDataIsFailure: true` strict-floor exits (IMF-External,
WB-bulk) and extend seed-meta TTL when the contract forbids it —
silently re-masking 30-day outages. Now the handler is attached
immediately before `withRetry(fetchFn)` and removed in a try/finally
that covers all fetch-phase exit branches.
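The narrowed lifetime can be illustrated with a small sketch (names hypothetical; `withRetry` elided to a direct call):

```javascript
// Sketch of the #256 fix: the SIGTERM handler is attached immediately
// before the fetch and detached in a finally that covers every exit
// branch (success, throw, retry exhaustion). After this function
// returns, a SIGTERM can no longer reach the TTL-extension path.
async function fetchWithSigtermGuard(fetchFn, onSigterm, proc = process) {
  proc.once('SIGTERM', onSigterm);
  try {
    return await fetchFn();
  } finally {
    proc.removeListener('SIGTERM', onSigterm);
  }
}
```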
#257 (P1) — `BUNDLE_RUN_STARTED_AT_MS` now has a first-class helper.
Exported `getBundleRunStartedAtMs()` from `_seed-utils.mjs` with JSDoc
describing the bundle-freshness contract. Fleet-wide helper so the
next consumer seeder imports instead of rediscovering the idiom.
#259 (P2) — SIGTERM cleanup now runs `Promise.allSettled` on the
disjoint-key ops (`releaseLock` + `extendExistingTtl`). Serialising them
compounded Upstash latency during the exact failure mode (a degraded
Redis) the handler exists for, risking a breach of the 5s SIGKILL grace.
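A minimal sketch of the parallel-cleanup shape (op names from the commit; signatures assumed):

```javascript
// Sketch of the #259 fix: the two cleanup ops touch disjoint keys, so
// they can run concurrently, costing one Upstash round-trip of latency
// instead of two. allSettled (not all) so a failure in one op cannot
// abort the other.
async function sigtermCleanup(releaseLock, extendExistingTtl) {
  const [rel, ext] = await Promise.allSettled([releaseLock(), extendExistingTtl()]);
  return { released: rel.status === 'fulfilled', extended: ext.status === 'fulfilled' };
}
```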
#260 (P2) — `_bundle-runner.mjs` asserts topological order on
optional `dependsOn` section field. Throws on unknown-label refs and
on deps appearing at a later index. Fleet-wide contract replacing
the previous prose-comment ordering guarantee.
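The ordering assertion might look something like this (the `label`/`dependsOn` section shape is inferred from the commit; the real `_bundle-runner.mjs` may differ):

```javascript
// Sketch of the #260 contract: every dependsOn entry must name a known
// section label, and that section must appear at an earlier index.
function assertTopologicalOrder(sections) {
  const indexOf = new Map(sections.map((s, i) => [s.label, i]));
  for (const [i, section] of sections.entries()) {
    for (const dep of section.dependsOn ?? []) {
      if (!indexOf.has(dep)) {
        throw new Error(`section "${section.label}" depends on unknown label "${dep}"`);
      }
      if (indexOf.get(dep) >= i) {
        throw new Error(`section "${section.label}" depends on "${dep}", which runs at a later index`);
      }
    }
  }
}
```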
Tests added/updated:
- New: SIGTERM handler removed after fetchFn completes (narrowed-scope
contract — post-fetch SIGTERM must NOT trigger TTL extension)
- New: dependsOn unknown-label + out-of-order + happy-path (3 tests)
Full test suite: 6,866 tests pass (+4 net).
* fix(seed): getBundleRunStartedAtMs returns null outside a bundle run
Review follow-up: the earlier `Math.floor(Date.now()/1000)*1000` fallback
regressed standalone (non-bundle) runs. A consumer seeder invoked
manually just after its peer wrote `fetchedAt = (now - 5s)` would see
`bundleStartMs = Date.now()`, reject the perfectly-fresh peer envelope
as "stale", and fall back to defaults — defeating the point of the
peer-read path outside the bundle.
Returning null when `BUNDLE_RUN_STARTED_AT_MS` is unset/invalid keeps
the freshness gate scoped to its real purpose (across-bundle-tick
staleness) and lets standalone runs skip the gate entirely. Consumers
check `bundleStartMs != null` before applying the comparison; see the
companion `seed-sovereign-wealth.mjs` change on the stacked PR.
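A sketch of the resulting contract (hypothetical reimplementation, not the repo's `_seed-utils.mjs`):

```javascript
// Returns the bundle-run start time in epoch ms, or null when the
// process is not running inside a bundle (env var unset or invalid).
// Consumers gate the freshness comparison on a non-null return.
function getBundleRunStartedAtMs(env = process.env) {
  const raw = env.BUNDLE_RUN_STARTED_AT_MS;
  if (raw == null || raw === '') return null;
  const ms = Number(raw);
  return Number.isFinite(ms) && ms > 0 ? ms : null;
}

// Consumer-side freshness gate: outside a bundle run, skip the gate
// entirely and trust the peer envelope.
function peerIsFresh(peerFetchedAtMs, env = process.env) {
  const bundleStartMs = getBundleRunStartedAtMs(env);
  if (bundleStartMs == null) return true;
  return peerFetchedAtMs >= bundleStartMs;
}
```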
* test(seed): SIGTERM cleanup test now verifies Redis DEL + EXPIRE calls
Greptile review P2 on PR #3384: the existing test only asserted the exit
code and log line, not that the Redis ops were actually issued. The log
claim ran ahead of what the test verified.
Fixture now logs every Upstash fetch call's shape (EVAL / pipeline-
EXPIRE / other) to stderr. Test asserts:
- >=1 EVAL op was issued during SIGTERM cleanup (releaseLock Lua
script on the lock key)
- >=1 pipeline-EXPIRE op was issued (extendExistingTtl on canonical
+ seed-meta keys)
- The EVAL body carries the runSeed-generated runId (proves it's
THIS run's release, not a phantom op)
- The EXPIRE pipeline touches both the canonicalKey AND the
seed-meta key (proves the keys[] array was built correctly
including the extraKeys merge path)
Full test suite: 6,866 tests pass, typecheck clean.
275 lines
12 KiB
JavaScript
// Regression test: runSeed must release its lock and extend existing-data
// TTL when it receives SIGTERM from _bundle-runner.mjs. Without this, the
// 30-min acquireLock reservation leaks to the NEXT cron tick, which then
// silently skips the resource — long-tail outage window described in
// memory `bundle-runner-sigkill-leaks-child-lock` (PR #3128).
//
// Strategy: spawn a real child that monkey-patches global fetch to capture
// every Upstash call, invokes runSeed() with a fetchFn that awaits forever,
// sends SIGTERM, and verifies the child (a) exits 143 (b) prints the
// "SIGTERM received" line (c) emits the DEL (releaseLock) + EXPIRE pipeline
// (extendExistingTtl) calls before exit.

import { test } from 'node:test';
import assert from 'node:assert/strict';
import { spawn } from 'node:child_process';
import { writeFileSync, unlinkSync } from 'node:fs';
import { join } from 'node:path';

const SCRIPTS_DIR = new URL('../scripts/', import.meta.url).pathname;

function runFixture(bodyJs) {
  const path = join(SCRIPTS_DIR, `_sigterm-fixture-${Date.now()}.mjs`);
  writeFileSync(path, bodyJs);
  return new Promise((resolve) => {
    const child = spawn(process.execPath, [path], {
      stdio: ['ignore', 'pipe', 'pipe'],
      env: {
        ...process.env,
        UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
        UPSTASH_REDIS_REST_TOKEN: 'fake-token',
      },
    });
    let stdout = '';
    let stderr = '';
    child.stdout.on('data', (c) => { stdout += c; });
    child.stderr.on('data', (c) => { stderr += c; });
    child.on('close', (code, signal) => {
      try { unlinkSync(path); } catch {}
      resolve({ code, signal, stdout, stderr });
    });
    // Let runSeed register the SIGTERM handler and enter fetchFn before we kill.
    // The fixture logs "READY" once the fetchFn is awaited; we kill then.
    const readyCheck = setInterval(() => {
      if (stdout.includes('READY')) {
        clearInterval(readyCheck);
        child.kill('SIGTERM');
      }
    }, 25);
    setTimeout(() => {
      clearInterval(readyCheck);
      try { child.kill('SIGKILL'); } catch {}
    }, 10_000);
  });
}

test('runSeed releases lock and extends existing TTL on SIGTERM', async () => {
  // Fixture logs every Upstash HTTP call (shape + body) on its own
  // line so the test can assert that the SIGTERM cleanup actually
  // emitted (a) an EVAL DEL-on-match for the lock key, and (b) an
  // EXPIRE pipeline for the canonical + seed-meta keys. Log goes to
  // stderr so READY-signal on stdout stays uncontended.
  const body = `
import { runSeed } from './_seed-utils.mjs';
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  // Shape signature: EVAL / EXPIRE / pipeline / other — so the test
  // asserts on the exact op without having to deep-inspect.
  let shape = 'other';
  if (Array.isArray(body)) {
    if (Array.isArray(body[0])) {
      shape = body[0][0] === 'EXPIRE' ? 'pipeline-EXPIRE' : 'pipeline-other';
    } else if (body[0] === 'EVAL') {
      shape = 'EVAL';
    } else {
      shape = 'cmd-' + body[0];
    }
  }
  console.error('FETCH_OP shape=' + shape + ' body=' + JSON.stringify(body));
  // Lock SET NX → return result:0 (not already held). Pipeline → array.
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 1 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
// fetchFn that awaits "forever" — we want SIGTERM to interrupt mid-fetch.
// setInterval keeps the event loop alive (otherwise Node bails with
// "Detected unsettled top-level await" before SIGTERM can be delivered).
const foreverFetch = () => new Promise(() => {
  console.log('READY');
  setInterval(() => {}, 10_000);
});
await runSeed('test-domain', 'sigterm', 'data:test:sigterm:v1', foreverFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const { code, signal, stderr } = await runFixture(body);
  // process.exit(143) should produce code=143; on some platforms Node maps it
  // back to a signal termination, so accept either code or signal.
  assert.ok(code === 143 || signal === 'SIGTERM',
    `expected exit 143 or SIGTERM; got code=${code} signal=${signal}\nstderr:\n${stderr}`);
  assert.match(stderr, /SIGTERM received — releasing lock/,
    `expected SIGTERM cleanup log; stderr:\n${stderr}`);

  // Verify cleanup actually ISSUED the Redis ops, not just logged intent.
  // Extract FETCH_OP lines; separate the acquire-time ops from SIGTERM-time.
  const fetchOps = stderr.split('\n').filter((l) => l.startsWith('FETCH_OP '));
  const evalOps = fetchOps.filter((l) => l.includes('shape=EVAL'));
  const pipelineExpireOps = fetchOps.filter((l) => l.includes('shape=pipeline-EXPIRE'));
  // The SIGTERM handler must emit at least one EVAL (releaseLock Lua
  // script on the lock key) and at least one pipeline EXPIRE (extend
  // existing TTL on canonical + seed-meta keys).
  assert.ok(evalOps.length >= 1,
    `expected >=1 EVAL (releaseLock) call; saw ${evalOps.length}\nstderr:\n${stderr}`);
  assert.ok(pipelineExpireOps.length >= 1,
    `expected >=1 pipeline-EXPIRE (extendExistingTtl) call; saw ${pipelineExpireOps.length}\nstderr:\n${stderr}`);
  // Specific: the EVAL body must reference our runId (body[4]) so we
  // know it's the SIGTERM-time release, not a different lock op.
  // runId format: `${Date.now()}-${Math.random().toString(36).slice(2,8)}`
  // → e.g. "1777061031282-cmgso6", JSON-quoted inside the EVAL body.
  const evalHasRunId = evalOps.some((l) => /"\d{10,}-[a-z0-9]{6}"/.test(l));
  assert.ok(evalHasRunId,
    `expected EVAL body to carry the runSeed-generated runId; stderr:\n${stderr}`);
  // Specific: the EXPIRE pipeline must reference both the canonical
  // key and the seed-meta key (proves keys[] was constructed correctly).
  const expireTouchesCanonical = pipelineExpireOps.some((l) => l.includes('data:test:sigterm:v1'));
  const expireTouchesSeedMeta = pipelineExpireOps.some((l) => l.includes('seed-meta:test-domain:sigterm'));
  assert.ok(expireTouchesCanonical,
    `EXPIRE pipeline must include canonicalKey; stderr:\n${stderr}`);
  assert.ok(expireTouchesSeedMeta,
    `EXPIRE pipeline must include seed-meta key; stderr:\n${stderr}`);
});

test('runSeed SIGTERM handler fires once even if multiple SIGTERMs arrive', async () => {
  // Uses process.once under the hood; verify by emitting SIGTERM twice.
  // A second SIGTERM while the handler is mid-cleanup should not trigger
  // re-entry. If the handler was registered with process.on instead of
  // process.once, the second SIGTERM would re-enter and double-release.
  const body = `
import { runSeed } from './_seed-utils.mjs';
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 0 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
const foreverFetch = () => new Promise(() => { console.log('READY'); setInterval(() => {}, 10_000); });
await runSeed('test-domain', 'sigterm-once', 'data:test:sigterm-once:v1', foreverFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const path = join(SCRIPTS_DIR, `_sigterm-once-fixture-${Date.now()}.mjs`);
  writeFileSync(path, body);
  try {
    await new Promise((resolve) => {
      const child = spawn(process.execPath, [path], {
        stdio: ['ignore', 'pipe', 'pipe'],
        env: {
          ...process.env,
          UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
          UPSTASH_REDIS_REST_TOKEN: 'fake-token',
        },
      });
      let stdout = '';
      let stderr = '';
      let sigtermLinesSeen = 0;
      child.stdout.on('data', (c) => { stdout += c; });
      child.stderr.on('data', (c) => {
        stderr += c;
        sigtermLinesSeen = (stderr.match(/SIGTERM received/g) || []).length;
      });
      const ready = setInterval(() => {
        if (stdout.includes('READY')) {
          clearInterval(ready);
          child.kill('SIGTERM');
          setTimeout(() => { try { child.kill('SIGTERM'); } catch {} }, 50);
        }
      }, 25);
      child.on('close', (code) => {
        clearInterval(ready);
        assert.equal(sigtermLinesSeen, 1,
          `handler must fire once (process.once); saw ${sigtermLinesSeen} SIGTERM lines\nstderr:\n${stderr}`);
        resolve();
      });
      setTimeout(() => { try { child.kill('SIGKILL'); } catch {} }, 10_000);
    });
  } finally {
    try { unlinkSync(path); } catch {}
  }
});

test('SIGTERM handler is removed AFTER fetchFn completes (no race with publish-phase exit)', async () => {
  // After the fetch phase returns, runSeed's try/finally removes the
  // SIGTERM listener. Verify by: fetchFn returns synthetic data → runSeed
  // enters publish phase (which will fail on our empty fetch mock that
  // doesn't simulate atomicPublish success) → SIGTERM arriving AFTER
  // fetchFn-resolution must fall through to Node's default handler (fast
  // exit with signal=SIGTERM, NO "SIGTERM received" cleanup line).
  //
  // This pins the narrowed-scope contract: the cleanup handler exists
  // only during the long-running fetch, and post-fetch SIGTERMs do
  // NOT trigger runSeed's TTL-extension code path — critical for
  // strict-floor seeders (emptyDataIsFailure: true) that must NOT
  // refresh seed-meta on empty data.
  const body = `
import { runSeed } from './_seed-utils.mjs';
// Redis stub: lock SET NX → OK, everything else → OK.
globalThis.fetch = async (url, opts = {}) => {
  const body = opts?.body ? (() => { try { return JSON.parse(opts.body); } catch { return opts.body; } })() : null;
  if (Array.isArray(body) && Array.isArray(body[0])) {
    return new Response(JSON.stringify(body.map(() => ({ result: 0 }))), { status: 200 });
  }
  return new Response(JSON.stringify({ result: 'OK' }), { status: 200 });
};
// fetchFn resolves IMMEDIATELY with data, then we signal READY and
// hang in the publish phase so the test can send SIGTERM. Using
// atomicPublish's Lua-redirect stub response ('OK') will keep the
// seeder alive in verify-retry; setInterval ensures event loop stays up.
const quickFetch = async () => {
  const data = { items: [{ k: 1 }] };
  // Give the SIGTERM handler a microtask to be removed by the
  // try{ ... } finally{ process.off } after withRetry resolves.
  // Then signal ready.
  setInterval(() => {}, 10_000);
  setImmediate(() => console.log('POST_FETCH_READY'));
  return data;
};
await runSeed('test-domain', 'post-fetch', 'data:test:post-fetch:v1', quickFetch, {
  ttlSeconds: 900,
  lockTtlMs: 60_000,
});
`;
  const path = join(SCRIPTS_DIR, `_sigterm-postfetch-fixture-${Date.now()}.mjs`);
  writeFileSync(path, body);
  try {
    await new Promise((resolve) => {
      const child = spawn(process.execPath, [path], {
        stdio: ['ignore', 'pipe', 'pipe'],
        env: {
          ...process.env,
          UPSTASH_REDIS_REST_URL: 'https://fake-upstash.example.com',
          UPSTASH_REDIS_REST_TOKEN: 'fake-token',
        },
      });
      let stdout = '';
      let stderr = '';
      child.stdout.on('data', (c) => { stdout += c; });
      child.stderr.on('data', (c) => { stderr += c; });
      const ready = setInterval(() => {
        if (stdout.includes('POST_FETCH_READY')) {
          clearInterval(ready);
          // Give one extra event-loop tick so the finally{process.off} runs.
          setTimeout(() => child.kill('SIGTERM'), 50);
        }
      }, 25);
      child.on('close', (code, signal) => {
        clearInterval(ready);
        // Post-fetch SIGTERM falls through to Node default: no "SIGTERM
        // received" cleanup log from our handler. Exit may be signal=SIGTERM
        // or code (Node varies), but the decisive signal is the ABSENCE of
        // the cleanup log line.
        const cleanupFired = /SIGTERM received — releasing lock/.test(stderr);
        assert.equal(cleanupFired, false,
          `post-fetch SIGTERM must NOT trigger runSeed cleanup (strict-floor safety). stderr:\n${stderr}`);
        resolve();
      });
      setTimeout(() => { try { child.kill('SIGKILL'); } catch {} }, 10_000);
    });
  } finally {
    try { unlinkSync(path); } catch {}
  }
});