mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* fix(seeds): rethrow non-fetch failures in runSeed() Split runSeed() into two phases so only upstream fetch errors get the graceful TTL-extension path. Redis publish, seed-meta, and verification failures now rethrow (exit 1) so monitoring catches them. * fix(seeds): separate fetch from publish errors in standalone scripts Split seed-airport-delays, seed-military-flights, and seed-service-statuses into two phases matching runSeed() pattern: - Phase 1: upstream fetch errors are graceful (extend TTL, exit 0) - Phase 2: Redis publish/verify errors propagate (exit 1) * fix(seeds): make Redis SET throw on failure so publish errors propagate Local redisSet() returned false instead of throwing, silently masking Redis write failures. writeExtraKey() also warned instead of throwing. Both now throw on non-OK responses, ensuring Phase 2 catch fires. * fix(seed): treat empty Redis key after successful RPC as publish failure When cachedFetchJson() silently swallows a Redis write failure, the warm-ping script now throws instead of warning, reaching the outer catch handler (exit 1) so monitoring detects the issue.
352 lines
12 KiB
JavaScript
352 lines
12 KiB
JavaScript
#!/usr/bin/env node
|
|
|
|
import { readFileSync, existsSync } from 'node:fs';
|
|
import { dirname, join } from 'node:path';
|
|
import { fileURLToPath } from 'node:url';
|
|
|
|
// Desktop Chrome User-Agent string, exported for seed scripts (some upstream
// providers reject the default Node.js fetch UA).
const CHROME_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36';

// Hard cap enforced by atomicPublish() before any Redis write.
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024; // 5MB per key

// Directory of this module (ESM equivalent of __dirname); used by
// loadSharedConfig() to locate the shared/ config directory.
const __seed_dirname = dirname(fileURLToPath(import.meta.url));

export { CHROME_UA };
|
|
|
|
/**
 * Load and parse a JSON config file from the repo's shared/ directory.
 * Checks ../shared/ then ./shared/ relative to this module.
 *
 * @param {string} filename - config file name, e.g. 'feeds.json'
 * @returns {any} parsed JSON content of the first match
 * @throws {Error} when the file is not found in either location
 */
export function loadSharedConfig(filename) {
  const bases = [join(__seed_dirname, '..', 'shared'), join(__seed_dirname, 'shared')];
  for (const base of bases) {
    const candidate = join(base, filename);
    if (existsSync(candidate)) return JSON.parse(readFileSync(candidate, 'utf8'));
  }
  // Fix: the message previously printed the literal "$(unknown)" — a broken
  // template ($() instead of ${}) — rather than the requested filename.
  throw new Error(`Cannot find shared/${filename} — checked ../shared/ and ./shared/`);
}
|
|
|
|
/**
 * Populate process.env from the first .env.local found near the caller.
 * Searches ../ and ../../ relative to `metaUrl` (or cwd when omitted), plus a
 * developer-machine fallback under $HOME. Only the first existing file is
 * read, and variables already present in process.env are never overwritten.
 *
 * @param {string} [metaUrl] - import.meta.url of the calling script
 */
export function loadEnvFile(metaUrl) {
  const baseDir = metaUrl ? dirname(fileURLToPath(metaUrl)) : process.cwd();
  const candidates = [
    join(baseDir, '..', '.env.local'),
    join(baseDir, '..', '..', '.env.local'),
  ];
  if (process.env.HOME) {
    candidates.push(join(process.env.HOME, 'Documents/GitHub/worldmonitor', '.env.local'));
  }
  for (const envPath of candidates) {
    if (!existsSync(envPath)) continue;
    for (const rawLine of readFileSync(envPath, 'utf8').split('\n')) {
      const entry = rawLine.trim();
      // Skip blanks, comments, and lines without a KEY=VALUE separator.
      if (!entry || entry.startsWith('#')) continue;
      const sep = entry.indexOf('=');
      if (sep === -1) continue;
      const key = entry.slice(0, sep).trim();
      let value = entry.slice(sep + 1).trim();
      const isQuoted =
        (value.startsWith('"') && value.endsWith('"')) ||
        (value.startsWith("'") && value.endsWith("'"));
      if (isQuoted) value = value.slice(1, -1);
      if (!process.env[key]) process.env[key] = value;
    }
    return; // stop after the first file that exists
  }
}
|
|
|
|
/**
 * Redact a secret for logging: keep the first and last 4 characters.
 * Tokens shorter than 8 chars (or falsy) collapse to '***' entirely.
 * @param {string} token
 * @returns {string} masked representation, safe to print
 */
export function maskToken(token) {
  const minLen = 8;
  if (!token || token.length < minLen) return '***';
  const head = token.slice(0, 4);
  const tail = token.slice(-4);
  return `${head}***${tail}`;
}
|
|
|
|
/**
 * Read Upstash Redis REST credentials from the environment.
 * Terminates the process (exit 1) when either variable is missing — callers
 * can therefore assume the returned fields are always set.
 * @returns {{ url: string, token: string }}
 */
export function getRedisCredentials() {
  const credentials = {
    url: process.env.UPSTASH_REDIS_REST_URL,
    token: process.env.UPSTASH_REDIS_REST_TOKEN,
  };
  if (!credentials.url || !credentials.token) {
    console.error('Missing UPSTASH_REDIS_REST_URL or UPSTASH_REDIS_REST_TOKEN');
    process.exit(1);
  }
  return credentials;
}
|
|
|
|
/**
 * Execute a single Redis command via the Upstash REST endpoint.
 * @param {string} url - Upstash REST base URL
 * @param {string} token - bearer token
 * @param {Array} command - command array, e.g. ['SET', key, value]
 * @returns {Promise<any>} parsed JSON response body
 * @throws {Error} on non-2xx HTTP status (includes up to 200 chars of body)
 */
async function redisCommand(url, token, command) {
  const response = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(15_000),
  });
  if (response.ok) return response.json();
  const detail = await response.text().catch(() => '');
  throw new Error(`Redis command failed: HTTP ${response.status} — ${detail.slice(0, 200)}`);
}
|
|
|
|
/**
 * Read and JSON-parse a key via the Upstash REST GET endpoint.
 * Best-effort: returns null on HTTP failure or when the key is absent.
 * @returns {Promise<any|null>}
 */
async function redisGet(url, token, key) {
  const endpoint = `${url}/get/${encodeURIComponent(key)}`;
  const response = await fetch(endpoint, {
    headers: { Authorization: `Bearer ${token}` },
    signal: AbortSignal.timeout(5_000),
  });
  if (!response.ok) return null;
  const { result } = await response.json();
  return result ? JSON.parse(result) : null;
}
|
|
|
|
/**
 * JSON-serialize `value` and SET it, optionally with an EX TTL.
 * @param {number} [ttlSeconds] - expiry; omitted → key persists
 * @returns {Promise<any>} Upstash response body
 * @throws {Error} on HTTP failure (via redisCommand) or on a command-level
 *   error reported inside a 200 response body
 */
async function redisSet(url, token, key, value, ttlSeconds) {
  const payload = JSON.stringify(value);
  const cmd = ttlSeconds
    ? ['SET', key, payload, 'EX', ttlSeconds]
    : ['SET', key, payload];
  const res = await redisCommand(url, token, cmd);
  // Upstash may report a per-command failure in the response body even when
  // the HTTP layer succeeds; surface it so publish-phase callers fail loudly
  // instead of silently dropping the write.
  if (res?.error) throw new Error(`Redis SET ${key} failed: ${res.error}`);
  return res;
}
|
|
|
|
/**
 * Delete one key. HTTP-level failures propagate from redisCommand.
 * @returns {Promise<any>} Upstash response body
 */
async function redisDel(url, token, key) {
  const deleteCmd = ['DEL', key];
  return redisCommand(url, token, deleteCmd);
}
|
|
|
|
/**
 * Try to take the per-domain seed lock with SET NX PX.
 * @param {string} domain - lock namespace (key becomes seed-lock:<domain>)
 * @param {string} runId - unique owner id stored as the lock value
 * @param {number} ttlMs - lock auto-expiry in milliseconds
 * @returns {Promise<boolean>} true when this run acquired the lock
 */
export async function acquireLock(domain, runId, ttlMs) {
  const { url, token } = getRedisCredentials();
  const lockKey = `seed-lock:${domain}`;
  const setNx = ['SET', lockKey, runId, 'NX', 'PX', ttlMs];
  const response = await redisCommand(url, token, setNx);
  return response?.result === 'OK';
}
|
|
|
|
/**
 * Release the seed lock, but only if this run still owns it.
 * Uses an EVAL compare-and-delete so a lock re-acquired by another run is
 * never freed. Errors are swallowed — the PX TTL is the safety net.
 * @param {string} domain - lock namespace (key is seed-lock:<domain>)
 * @param {string} runId - owner id that must match the stored value
 */
export async function releaseLock(domain, runId) {
  const { url, token } = getRedisCredentials();
  const lockKey = `seed-lock:${domain}`;
  const script = `if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end`;
  const evalCmd = ['EVAL', script, 1, lockKey, runId];
  try {
    await redisCommand(url, token, evalCmd);
  } catch {
    // Best-effort release; lock will expire via TTL
  }
}
|
|
|
|
/**
 * Publish `data` under `canonicalKey`, guarded by size and validation checks.
 * A short-lived staging key is written first — proving Redis is writable
 * before the canonical key is overwritten — then cleaned up best-effort.
 *
 * @param {string} canonicalKey - destination Redis key
 * @param {any} data - payload (JSON-serialized)
 * @param {Function} [validateFn] - returns falsy to skip the write entirely
 * @param {number} [ttlSeconds] - TTL for the canonical key; omitted → no expiry
 * @returns {Promise<{payloadBytes:number, skipped?:boolean, recordCount?:number|null}>}
 * @throws {Error} when the payload exceeds MAX_PAYLOAD_BYTES or a write fails
 */
export async function atomicPublish(canonicalKey, data, validateFn, ttlSeconds) {
  const { url, token } = getRedisCredentials();
  const runId = String(Date.now());
  const stagingKey = `${canonicalKey}:staging:${runId}`;

  const payload = JSON.stringify(data);
  const payloadBytes = Buffer.byteLength(payload, 'utf8');
  if (payloadBytes > MAX_PAYLOAD_BYTES) {
    throw new Error(`Payload too large: ${(payloadBytes / 1024 / 1024).toFixed(1)}MB > 5MB limit`);
  }

  // Validation failure is a skip, not an error — existing cache is preserved.
  if (validateFn && !validateFn(data)) {
    return { payloadBytes: 0, skipped: true };
  }

  // Write to staging key
  await redisSet(url, token, stagingKey, data, 300); // 5 min staging TTL

  // Overwrite canonical key
  const setCmd = ttlSeconds
    ? ['SET', canonicalKey, payload, 'EX', ttlSeconds]
    : ['SET', canonicalKey, payload];
  await redisCommand(url, token, setCmd);

  // Cleanup staging (best-effort; staging TTL reaps leftovers anyway)
  await redisDel(url, token, stagingKey).catch(() => {});

  const recordCount = Array.isArray(data) ? data.length : null;
  return { payloadBytes, recordCount };
}
|
|
|
|
/**
 * Record when a domain/resource was last seeded and how many records it had.
 * Stored under seed-meta:<domain>:<resource> with a 7-day TTL.
 * @returns {Promise<{fetchedAt:number, recordCount:number, sourceVersion:string}>} the written metadata
 */
export async function writeFreshnessMetadata(domain, resource, count, source) {
  const { url, token } = getRedisCredentials();
  const metaKey = `seed-meta:${domain}:${resource}`;
  const meta = {
    fetchedAt: Date.now(),
    recordCount: count,
    sourceVersion: source || '',
  };
  const sevenDays = 86400 * 7;
  await redisSet(url, token, metaKey, meta, sevenDays); // 7 day TTL on metadata
  return meta;
}
|
|
|
|
/**
 * Run `fn` with exponential backoff: up to maxRetries + 1 total attempts,
 * waiting delayMs * 2^attempt between failures.
 * @param {Function} fn - async operation to attempt
 * @param {number} [maxRetries=3] - retries after the initial attempt
 * @param {number} [delayMs=1000] - base backoff delay
 * @returns {Promise<any>} the first successful result
 * @throws the final error when every attempt fails
 */
export async function withRetry(fn, maxRetries = 3, delayMs = 1000) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // Out of retries: surface the last failure to the caller.
      if (attempt >= maxRetries) throw err;
      const wait = delayMs * 2 ** attempt;
      console.warn(` Retry ${attempt + 1}/${maxRetries} in ${wait}ms: ${err.message || err}`);
      await new Promise((resolve) => setTimeout(resolve, wait));
    }
  }
}
|
|
|
|
/**
 * Emit a single-line JSON completion record for log scrapers/monitoring.
 * @param {string} domain - seed domain name
 * @param {number} count - records published
 * @param {number} durationMs - wall-clock duration (rounded to whole ms)
 * @param {object} [extra] - additional fields merged into the record
 */
export function logSeedResult(domain, count, durationMs, extra = {}) {
  const record = {
    event: 'seed_complete',
    domain,
    recordCount: count,
    durationMs: Math.round(durationMs),
    timestamp: new Date().toISOString(),
    ...extra,
  };
  console.log(JSON.stringify(record));
}
|
|
|
|
/**
 * Read back a published key to confirm the write landed.
 * @param {string} key - Redis key to verify
 * @returns {Promise<any|null>} parsed value, or null when absent/unreadable
 */
export async function verifySeedKey(key) {
  const { url, token } = getRedisCredentials();
  return redisGet(url, token, key);
}
|
|
|
|
/**
 * Write an auxiliary key (e.g. bootstrap hydration key) via the Upstash REST
 * endpoint, throwing on failure so publish-phase callers exit non-zero.
 *
 * @param {string} key - destination Redis key
 * @param {any} data - payload (JSON-serialized)
 * @param {number} [ttl] - expiry in seconds; omitted/falsy → key persists
 * @throws {Error} on a non-OK HTTP response
 */
export async function writeExtraKey(key, data, ttl) {
  const { url, token } = getRedisCredentials();
  const payload = JSON.stringify(data);
  // Fix: only append EX when a TTL is provided. Previously an undefined ttl
  // was serialized as `"EX",null`, an invalid command the server rejects —
  // reachable from runSeed when neither ek.ttl nor opts.ttlSeconds is set.
  const command = ttl ? ['SET', key, payload, 'EX', ttl] : ['SET', key, payload];
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) throw new Error(`Extra key ${key}: write failed (HTTP ${resp.status})`);
  console.log(` Extra key ${key}: written`);
}
|
|
|
|
/**
 * Write an extra key (throws on failure) plus its seed-meta freshness record.
 * The meta write is deliberately best-effort: a failure only warns, since the
 * data itself was stored successfully.
 *
 * @param {string} key - destination Redis key
 * @param {any} data - payload
 * @param {number} ttl - TTL for the data key (seconds)
 * @param {number} [recordCount] - stored in metadata (defaults to 0)
 * @param {string} [metaKeyOverride] - explicit meta key; default strips a
 *   trailing :vN suffix and prefixes seed-meta:
 */
export async function writeExtraKeyWithMeta(key, data, ttl, recordCount, metaKeyOverride) {
  await writeExtraKey(key, data, ttl);
  const { url, token } = getRedisCredentials();
  const metaKey = metaKeyOverride || `seed-meta:${key.replace(/:v\d+$/, '')}`;
  const metaPayload = JSON.stringify({ fetchedAt: Date.now(), recordCount: recordCount ?? 0 });
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(['SET', metaKey, metaPayload, 'EX', 86400 * 7]),
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) console.warn(` seed-meta ${metaKey}: write failed`);
}
|
|
|
|
/**
 * Best-effort: bump the TTL on existing keys so stale data survives an
 * upstream fetch failure. Never throws — this runs on the graceful path.
 *
 * @param {string[]} keys - Redis keys to EXPIRE
 * @param {number} [ttlSeconds=600] - new TTL applied to each key
 */
export async function extendExistingTtl(keys, ttlSeconds = 600) {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) {
    console.error(' Cannot extend TTL: missing Redis credentials');
    return;
  }
  try {
    const pipeline = keys.map(k => ['EXPIRE', k, ttlSeconds]);
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(10_000),
    });
    if (resp.ok) {
      console.log(` Extended TTL on ${keys.length} existing key(s) (${ttlSeconds}s)`);
    } else {
      // Fix: a non-OK response was previously silent — log it so operators can
      // see that stale keys were NOT kept alive. Still best-effort (no throw).
      console.error(` TTL extension failed: HTTP ${resp.status}`);
    }
  } catch (e) {
    console.error(` TTL extension failed: ${e.message}`);
  }
}
|
|
|
|
/**
 * Promise-based delay.
 * @param {number} ms - milliseconds to wait
 * @returns {Promise<void>} resolves after the delay
 */
export function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
|
|
|
/**
 * Convert a Yahoo Finance chart API response into a compact quote record.
 * @param {any} data - raw chart response (data.chart.result[0] is used)
 * @param {string} symbol - ticker symbol, echoed into name/display fields
 * @returns {{symbol:string,name:string,display:string,price:number,change:number,sparkline:number[]}|null}
 *   null when the response carries no meta block
 */
export function parseYahooChart(data, symbol) {
  const chartResult = data?.chart?.result?.[0];
  const meta = chartResult?.meta;
  if (!meta) return null;

  const price = meta.regularMarketPrice;
  // Fall back through previous-close variants, then to price itself (→ 0% change).
  const prevClose = meta.chartPreviousClose || meta.previousClose || price;
  const change = prevClose ? ((price - prevClose) / prevClose) * 100 : 0;
  const rawCloses = chartResult.indicators?.quote?.[0]?.close;
  const sparkline = Array.isArray(rawCloses) ? rawCloses.filter((v) => v != null) : [];

  return {
    symbol,
    name: symbol,
    display: symbol,
    price,
    change: +change.toFixed(2), // percent, rounded to 2 decimals
    sparkline,
  };
}
|
|
|
|
/**
 * Orchestrate one full seed run: lock → fetch (Phase 1) → publish/verify
 * (Phase 2) → unlock.
 *
 * Phase 1 (upstream fetch) failures are graceful: existing keys get their TTL
 * extended and the process exits 0 so stale data stays served. Phase 2
 * (Redis publish / meta / extra-key) failures rethrow so the process exits
 * non-zero and monitoring notices.
 *
 * This function never returns normally — every path ends in process.exit()
 * or a thrown error.
 *
 * @param {string} domain - logical domain, used in lock and seed-meta keys
 * @param {string} resource - resource name within the domain
 * @param {string} canonicalKey - Redis key the payload is published under
 * @param {Function} fetchFn - async payload producer, retried via withRetry()
 * @param {object} [opts] - { validateFn, ttlSeconds, lockTtlMs, extraKeys,
 *   recordCount (number or fn(data)), sourceVersion }
 */
export async function runSeed(domain, resource, canonicalKey, fetchFn, opts = {}) {
  const { validateFn, ttlSeconds, lockTtlMs = 120_000, extraKeys } = opts;
  const runId = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  const startMs = Date.now();

  console.log(`=== ${domain}:${resource} Seed ===`);
  console.log(` Run ID: ${runId}`);
  console.log(` Key: ${canonicalKey}`);

  // Acquire lock — only one seed run per domain:resource at a time.
  const locked = await acquireLock(`${domain}:${resource}`, runId, lockTtlMs);
  if (!locked) {
    console.log(' SKIPPED: another seed run in progress');
    process.exit(0); // not a failure: a concurrent run owns the lock
  }

  // Phase 1: Fetch data (graceful on failure — extend TTL on stale data)
  let data;
  try {
    data = await withRetry(fetchFn);
  } catch (err) {
    await releaseLock(`${domain}:${resource}`, runId);
    const durationMs = Date.now() - startMs;
    console.error(` FETCH FAILED: ${err.message || err}`);

    // Keep whatever is already cached alive so consumers see stale-but-present data.
    const ttl = ttlSeconds || 600;
    const keys = [canonicalKey];
    if (extraKeys) keys.push(...extraKeys.map(ek => ek.key));
    await extendExistingTtl(keys, ttl);

    console.log(`\n=== Failed gracefully (${Math.round(durationMs)}ms) ===`);
    process.exit(0); // upstream outage is not this job's failure
  }

  // Phase 2: Publish to Redis (rethrow on failure — data was fetched but not stored)
  try {
    const publishResult = await atomicPublish(canonicalKey, data, validateFn, ttlSeconds);
    if (publishResult.skipped) {
      // validateFn rejected the payload: leave the existing cache untouched.
      const durationMs = Date.now() - startMs;
      console.log(` SKIPPED: validation failed (empty data) — preserving existing cache`);
      console.log(`\n=== Done (${Math.round(durationMs)}ms, no write) ===`);
      await releaseLock(`${domain}:${resource}`, runId);
      process.exit(0);
    }
    const { payloadBytes } = publishResult;
    // Topic-shaped payloads: sum articles (or events) across all topics.
    const topicArticleCount = Array.isArray(data?.topics)
      ? data.topics.reduce((n, t) => n + (t?.articles?.length || t?.events?.length || 0), 0)
      : undefined;
    // Record count precedence: explicit opts.recordCount (value or fn) >
    // top-level array length > first matching known collection field > 0.
    const recordCount = opts.recordCount != null
      ? (typeof opts.recordCount === 'function' ? opts.recordCount(data) : opts.recordCount)
      : Array.isArray(data) ? data.length
      : (topicArticleCount
        ?? data?.events?.length ?? data?.earthquakes?.length ?? data?.outages?.length
        ?? data?.fireDetections?.length ?? data?.anomalies?.length ?? data?.threats?.length
        ?? data?.quotes?.length ?? data?.stablecoins?.length
        ?? data?.cables?.length ?? 0);

    // Write extra keys (e.g., bootstrap hydration keys)
    if (extraKeys) {
      for (const ek of extraKeys) {
        // writeExtraKey throws on failure, so a bad extra-key write reaches
        // the outer catch and exits non-zero.
        await writeExtraKey(ek.key, ek.transform ? ek.transform(data) : data, ek.ttl || ttlSeconds);
      }
    }

    const meta = await writeFreshnessMetadata(domain, resource, recordCount, opts.sourceVersion);

    const durationMs = Date.now() - startMs;
    logSeedResult(domain, recordCount, durationMs, { payloadBytes });

    // Verify the canonical key actually reads back (warn-only on null).
    const verified = await verifySeedKey(canonicalKey);
    if (verified) {
      console.log(` Verified: data present in Redis`);
    } else {
      console.warn(' WARNING: verification read returned null');
    }

    console.log(`\n=== Done (${Math.round(durationMs)}ms) ===`);
    await releaseLock(`${domain}:${resource}`, runId);
    process.exit(0);
  } catch (err) {
    // Release before rethrowing so the next run isn't blocked for lockTtlMs.
    await releaseLock(`${domain}:${resource}`, runId);
    throw err; // Phase 2 failures must surface (exit 1) for monitoring
  }
}
|