Files
worldmonitor/scripts/_seed-utils.mjs
Elie Habib bf27e474c2 fix(seeder): TLS proxy CONNECT — fixes FRED fetch failures (FSI, yield curve, macro) (#2538)
* fix(seeder): use TLS for proxy CONNECT tunnel to fix FRED fetch failures

Decodo gate.decodo.com:10001 requires TLS. Previous code used http.request
(plain TCP) which received SOCKS5 rejection bytes instead of HTTP 200.

Two issues fixed:
1. Replace http.request CONNECT with tls.connect + manual CONNECT handshake.
   Node.js http.request also auto-sets Host to the proxy hostname; Decodo
   rejects this and responds with SOCKS5 bytes (0x05 0xff). Manual CONNECT
   over a raw TLS socket avoids both issues.
2. Handle https:// and plain "user:pass@host:port" proxy URL formats — always
   uses TLS regardless of PROXY_URL prefix.

_proxy-utils.cjs: resolveProxyStringConnect now preserves https:// prefix
from PROXY_URL so callers can detect TLS proxies explicitly.

All 24 FRED series (BAMLH0A0HYM2, FEDFUNDS, DGS10, etc.) confirmed working
locally via gate.decodo.com:10001.

* fix(seeder): respect http:// proxy scheme + buffer full CONNECT response

Two protocol-correctness fixes:

1. http:// proxies used plain TCP before; always-TLS regressed them.
   Now: bare/undeclared format → TLS (Decodo requires it), explicit
   http:// → plain net.connect, explicit https:// → TLS.

2. CONNECT response buffered until \r\n\r\n instead of acting on the
   first data chunk. Fragmented proxy responses (headers split across
   packets) could corrupt the TLS handshake by leaving header bytes
   on the wire when tls.connect() was called too early.

Verified locally: BAMLH0A0HYM2 → { date: 2026-03-26, value: 3.21 }

* chore(seeder): remove unused http import, fix stale JSDoc

- Drop `import * as http from 'node:http'` — no longer used after
  replacing http.request CONNECT with tls.connect + manual handshake
- Update resolveProxyStringConnect() JSDoc: https.request → tls.connect
2026-03-30 11:06:48 +04:00

812 lines
33 KiB
JavaScript

#!/usr/bin/env node
import { readFileSync, existsSync } from 'node:fs';
import { execFileSync } from 'node:child_process';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { createRequire } from 'node:module';
import * as net from 'node:net';
import * as tls from 'node:tls';
import * as https from 'node:https';
import { promisify } from 'node:util';
import { gunzip as _gunzip } from 'node:zlib';
// Promisified zlib.gunzip — used to decode gzip-encoded HTTP response bodies.
const gunzip = promisify(_gunzip);
// Desktop Chrome User-Agent string sent to sources (e.g. Yahoo Finance) that
// reject the default Node.js UA.
const CHROME_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36';
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024; // 5MB per key
// Directory of this module (ES modules have no __dirname).
const __seed_dirname = dirname(fileURLToPath(import.meta.url));
export { CHROME_UA };
// Canonical FX fallback rates — used when Yahoo Finance returns null/zero.
// Single source of truth shared by seed-bigmac, seed-grocery-basket, seed-fx-rates.
// EGP: 0.0192 is the most recently observed live rate (2026-03-21 seed run).
// Map of currency code → approximate USD value of one unit of that currency.
export const SHARED_FX_FALLBACKS = {
  USD: 1.0000, GBP: 1.2700, EUR: 1.0850, JPY: 0.0067, CHF: 1.1300,
  CNY: 0.1380, INR: 0.0120, AUD: 0.6500, CAD: 0.7400, NZD: 0.5900,
  BRL: 0.1900, MXN: 0.0490, ZAR: 0.0540, TRY: 0.0290, KRW: 0.0007,
  SGD: 0.7400, HKD: 0.1280, TWD: 0.0310, THB: 0.0280, IDR: 0.000063,
  NOK: 0.0920, SEK: 0.0930, DKK: 0.1450, PLN: 0.2450, CZK: 0.0430,
  HUF: 0.0028, RON: 0.2200, PHP: 0.0173, VND: 0.000040, MYR: 0.2250,
  PKR: 0.0036, ILS: 0.2750, ARS: 0.00084, COP: 0.000240, CLP: 0.00108,
  UAH: 0.0240, NGN: 0.00062, KES: 0.0077,
  AED: 0.2723, SAR: 0.2666, QAR: 0.2747, KWD: 3.2520,
  BHD: 2.6525, OMR: 2.5974, JOD: 1.4104, EGP: 0.0192, LBP: 0.0000112,
};
/**
 * Load a JSON config file from the repo's shared/ directory.
 * Checks ../shared/ (repo layout) then ./shared/ (container layout).
 * @param {string} filename - file name inside shared/, e.g. 'config.json'
 * @returns {any} parsed JSON content
 * @throws {Error} when the file exists in neither location
 */
export function loadSharedConfig(filename) {
  for (const base of [join(__seed_dirname, '..', 'shared'), join(__seed_dirname, 'shared')]) {
    const p = join(base, filename);
    if (existsSync(p)) return JSON.parse(readFileSync(p, 'utf8'));
  }
  // Fix: error previously interpolated the literal `$(unknown)` — `$(...)` is
  // not template-literal syntax — instead of the requested filename.
  throw new Error(`Cannot find shared/${filename} — checked ../shared/ and ./shared/`);
}
/**
 * Populate process.env from the first .env.local found near the caller.
 * Searches one and two directories above the calling module (or the CWD when
 * no metaUrl is supplied), plus a known local checkout path under $HOME.
 * Existing environment variables are never overwritten.
 * @param {string} [metaUrl] - import.meta.url of the calling script
 */
export function loadEnvFile(metaUrl) {
  const baseDir = metaUrl ? dirname(fileURLToPath(metaUrl)) : process.cwd();
  const candidates = [
    join(baseDir, '..', '.env.local'),
    join(baseDir, '..', '..', '.env.local'),
  ];
  if (process.env.HOME) {
    candidates.push(join(process.env.HOME, 'Documents/GitHub/worldmonitor', '.env.local'));
  }
  const envPath = candidates.find((p) => existsSync(p));
  if (!envPath) return;
  for (const rawLine of readFileSync(envPath, 'utf8').split('\n')) {
    const line = rawLine.trim();
    if (!line || line.startsWith('#')) continue;
    const eq = line.indexOf('=');
    if (eq === -1) continue;
    const key = line.slice(0, eq).trim();
    let value = line.slice(eq + 1).trim();
    const isQuoted =
      (value.startsWith('"') && value.endsWith('"')) ||
      (value.startsWith("'") && value.endsWith("'"));
    if (isQuoted) value = value.slice(1, -1);
    if (!process.env[key]) process.env[key] = value;
  }
}
/**
 * Mask a secret for safe logging: keep the first and last 4 characters.
 * Falsy or short (<8 chars) tokens are fully masked as '***'.
 */
export function maskToken(token) {
  const tooShort = !token || token.length < 8;
  return tooShort ? '***' : `${token.slice(0, 4)}***${token.slice(-4)}`;
}
/**
 * Read Upstash REST credentials from the environment.
 * Exits the process (code 1) when either variable is missing — all seed
 * scripts are useless without Redis access.
 * @returns {{url: string, token: string}}
 */
export function getRedisCredentials() {
  const { UPSTASH_REDIS_REST_URL: url, UPSTASH_REDIS_REST_TOKEN: token } = process.env;
  if (url && token) return { url, token };
  console.error('Missing UPSTASH_REDIS_REST_URL or UPSTASH_REDIS_REST_TOKEN');
  process.exit(1);
}
/**
 * Execute one Redis command against the Upstash REST endpoint.
 * @param {string} url - Upstash REST base URL
 * @param {string} token - bearer token
 * @param {Array} command - command array, e.g. ['SET', key, value]
 * @returns {Promise<any>} parsed Upstash response, e.g. { result: 'OK' }
 * @throws {Error} on non-2xx responses (includes first 200 chars of body)
 */
async function redisCommand(url, token, command) {
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(15_000),
  });
  if (!resp.ok) {
    const text = await resp.text().catch(() => '');
    // Fix: separate the status code from the body snippet — previously the two
    // ran together ("HTTP 400{...}"), making logs hard to read.
    throw new Error(`Redis command failed: HTTP ${resp.status} ${text.slice(0, 200)}`);
  }
  return resp.json();
}
/**
 * Read and JSON-parse a single key via the Upstash REST GET endpoint.
 * Returns null on HTTP errors or when the key is missing/empty.
 */
async function redisGet(url, token, key) {
  const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, {
    headers: { Authorization: `Bearer ${token}` },
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) return null;
  const { result } = await resp.json();
  if (!result) return null;
  return JSON.parse(result);
}
// JSON-serialize a value and SET it, with an optional EX ttl (seconds).
async function redisSet(url, token, key, value, ttlSeconds) {
  const payload = JSON.stringify(value);
  const command = ['SET', key, payload];
  if (ttlSeconds) command.push('EX', ttlSeconds);
  return redisCommand(url, token, command);
}
// Delete a single key; resolves with the raw Upstash response.
async function redisDel(url, token, key) {
  const command = ['DEL', key];
  return redisCommand(url, token, command);
}
// Upstash REST calls surface transient network issues through fetch/undici
// errors rather than stable app-level error codes, so we normalize the common
// timeout/reset/DNS variants here before deciding to skip a seed run.
export function isTransientRedisError(err) {
  const TRANSIENT = /UND_ERR_|Connect Timeout Error|fetch failed|ECONNRESET|ENOTFOUND|ETIMEDOUT|EAI_AGAIN/i;
  const parts = [err?.message, err?.cause?.message, err?.code || err?.cause?.code];
  return TRANSIENT.test(parts.map((p) => String(p || '')).join(' '));
}
// Try to take the per-domain seed lock (SET NX PX). Resolves true when acquired.
export async function acquireLock(domain, runId, ttlMs) {
  const { url, token } = getRedisCredentials();
  const reply = await redisCommand(url, token, ['SET', `seed-lock:${domain}`, runId, 'NX', 'PX', ttlMs]);
  return reply?.result === 'OK';
}
/**
 * acquireLock with retries. A transient Redis outage is reported as a skip
 * ({ skipped: true, reason: 'redis_unavailable' }) rather than crashing the
 * seed run; any other error is rethrown.
 */
export async function acquireLockSafely(domain, runId, ttlMs, opts = {}) {
  const label = opts.label || domain;
  try {
    const locked = await withRetry(
      () => acquireLock(domain, runId, ttlMs),
      opts.maxRetries ?? 2,
      opts.delayMs ?? 1000,
    );
    return { locked, skipped: false, reason: null };
  } catch (err) {
    if (!isTransientRedisError(err)) throw err;
    console.warn(` SKIPPED: Redis unavailable during lock acquisition for ${label}`);
    return { locked: false, skipped: true, reason: 'redis_unavailable' };
  }
}
// Compare-and-delete release: the Lua script removes the lock only when this
// run still owns it. Failures are swallowed — the lock expires via TTL anyway.
export async function releaseLock(domain, runId) {
  const { url, token } = getRedisCredentials();
  const script = `if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end`;
  try {
    await redisCommand(url, token, ['EVAL', script, 1, `seed-lock:${domain}`, runId]);
  } catch {
    // Best-effort release; lock will expire via TTL
  }
}
/**
 * Publish a payload to its canonical Redis key with size and validation guards.
 * Writes a short-lived staging copy first, then overwrites the canonical key,
 * then cleans up the staging copy (best-effort).
 * @returns {Promise<{payloadBytes: number, recordCount?: number|null, skipped?: boolean}>}
 * @throws {Error} when the serialized payload exceeds 5MB
 */
export async function atomicPublish(canonicalKey, data, validateFn, ttlSeconds) {
  const { url, token } = getRedisCredentials();
  const runId = String(Date.now());
  const stagingKey = `${canonicalKey}:staging:${runId}`;
  const payload = JSON.stringify(data);
  const payloadBytes = Buffer.byteLength(payload, 'utf8');
  if (payloadBytes > MAX_PAYLOAD_BYTES) {
    throw new Error(`Payload too large: ${(payloadBytes / 1024 / 1024).toFixed(1)}MB > 5MB limit`);
  }
  if (validateFn && !validateFn(data)) {
    return { payloadBytes: 0, skipped: true };
  }
  // Write to staging key
  await redisSet(url, token, stagingKey, data, 300); // 5 min staging TTL
  // Overwrite canonical key
  const setCommand = ttlSeconds
    ? ['SET', canonicalKey, payload, 'EX', ttlSeconds]
    : ['SET', canonicalKey, payload];
  await redisCommand(url, token, setCommand);
  // Cleanup staging
  await redisDel(url, token, stagingKey).catch(() => {});
  return { payloadBytes, recordCount: Array.isArray(data) ? data.length : null };
}
// Record fetchedAt/recordCount metadata under seed-meta:<domain>:<resource>
// so health checks can distinguish a stopped seeder from an empty run.
export async function writeFreshnessMetadata(domain, resource, count, source) {
  const { url, token } = getRedisCredentials();
  const meta = {
    fetchedAt: Date.now(),
    recordCount: count,
    sourceVersion: source || '',
  };
  await redisSet(url, token, `seed-meta:${domain}:${resource}`, meta, 86400 * 7); // 7 day TTL on metadata
  return meta;
}
/**
 * Run fn with exponential backoff (delayMs, 2x, 4x, ...) between attempts.
 * Performs 1 + maxRetries attempts total; rethrows the final error when all
 * attempts fail.
 */
export async function withRetry(fn, maxRetries = 3, delayMs = 1000) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries) throw err;
      const wait = delayMs * 2 ** attempt;
      const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
      console.warn(` Retry ${attempt + 1}/${maxRetries} in ${wait}ms: ${err.message || err}${cause}`);
      await new Promise((resolve) => setTimeout(resolve, wait));
    }
  }
}
// Emit a single-line structured "seed_complete" event for log scrapers.
export function logSeedResult(domain, count, durationMs, extra = {}) {
  const event = {
    event: 'seed_complete',
    domain,
    recordCount: count,
    durationMs: Math.round(durationMs),
    timestamp: new Date().toISOString(),
    ...extra,
  };
  console.log(JSON.stringify(event));
}
// Read back a key after publish; resolves with the parsed value or null.
export async function verifySeedKey(key) {
  const { url, token } = getRedisCredentials();
  return redisGet(url, token, key);
}
// Write one auxiliary key (SET ... EX ttl) via the Upstash REST endpoint.
// Throws on non-2xx so callers notice partial publishes.
export async function writeExtraKey(key, data, ttl) {
  const { url, token } = getRedisCredentials();
  const command = ['SET', key, JSON.stringify(data), 'EX', ttl];
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) throw new Error(`Extra key ${key}: write failed (HTTP ${resp.status})`);
  console.log(` Extra key ${key}: written`);
}
// Write an auxiliary key plus its seed-meta freshness record. The meta write
// is best-effort: failures are logged, not thrown.
export async function writeExtraKeyWithMeta(key, data, ttl, recordCount, metaKeyOverride) {
  await writeExtraKey(key, data, ttl);
  const { url, token } = getRedisCredentials();
  // Strip a trailing :vN version suffix when deriving the default meta key.
  const metaKey = metaKeyOverride || `seed-meta:${key.replace(/:v\d+$/, '')}`;
  const meta = { fetchedAt: Date.now(), recordCount: recordCount ?? 0 };
  const command = ['SET', metaKey, JSON.stringify(meta), 'EX', 86400 * 7];
  const resp = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) console.warn(` seed-meta ${metaKey}: write failed`);
}
/**
 * Refresh the TTL on existing keys via a single EXPIRE pipeline.
 * EXPIRE is a no-op on missing keys (returns 0), so stale data is never
 * resurrected; missing keys are reported so operators know a manual seed
 * is required. All failures are logged, never thrown.
 */
export async function extendExistingTtl(keys, ttlSeconds = 600) {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) {
    console.error(' Cannot extend TTL: missing Redis credentials');
    return;
  }
  try {
    const pipeline = keys.map((k) => ['EXPIRE', k, ttlSeconds]);
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(10_000),
    });
    if (!resp.ok) return;
    const results = await resp.json();
    let extended = 0;
    let missing = 0;
    for (const r of results) {
      if (r?.result === 1) extended++;
      else if (r?.result === 0) missing++;
    }
    if (extended > 0) console.log(` Extended TTL on ${extended} key(s) (${ttlSeconds}s)`);
    if (missing > 0) console.warn(` WARNING: ${missing} key(s) were expired/missing — EXPIRE was a no-op; manual seed required`);
  } catch (e) {
    console.error(` TTL extension failed: ${e.message}`);
  }
}
// Promise-based delay helper.
export function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// ─── Proxy helpers for sources that block Railway container IPs ───
// TODO: consolidate all fetch+proxy logic into a single proxyFetch(url, options) helper.
// Current state: _seed-utils has fredFetchJson (fixed: direct-first, proxy fallback) and
// curlFetch (curl-only, relay container). seed-disease-outbreaks.mjs and seed-fear-greed.mjs
// each define their own local curlFetch that silently fails with ENOENT in seeder containers
// (curl not in node:22-alpine). Each of those scripts should use a shared fetchWithProxyFallback
// that tries native fetch first and falls back to httpsProxyFetchJson — same pattern as
// fredFetchJson after this fix. Tracked: consolidate into one exported function.
// _proxy-utils.cjs is CommonJS; load it from this ES module via createRequire.
const { resolveProxyString, resolveProxyStringConnect } = createRequire(import.meta.url)('./_proxy-utils.cjs');
// General-purpose proxy string (format defined by _proxy-utils.cjs).
export function resolveProxy() {
  return resolveProxyString();
}
// For HTTP CONNECT tunneling (httpsProxyFetchJson); keeps gate.decodo.com, not us.decodo.com.
export function resolveProxyForConnect() {
  return resolveProxyStringConnect();
}
// curl-based fetch; throws on non-2xx. Returns response body as string.
// NOTE: requires curl binary — only available in Dockerfile.relay (apk add curl).
// Do NOT call from standalone seed scripts; use fredFetchJson or httpsProxyFetchJson instead.
export function curlFetch(url, proxyAuth, headers = {}) {
  const args = ['-sS', '--compressed', '--max-time', '15', '-L'];
  if (proxyAuth) {
    args.push('-x', `http://${proxyAuth}`);
  }
  Object.entries(headers).forEach(([name, value]) => args.push('-H', `${name}: ${value}`));
  // Append the HTTP status on its own trailing line so it can be split off the body.
  args.push('-w', '\n%{http_code}', url);
  const raw = execFileSync('curl', args, { encoding: 'utf8', timeout: 20000, stdio: ['pipe', 'pipe', 'pipe'] });
  const splitAt = raw.lastIndexOf('\n');
  const status = parseInt(raw.slice(splitAt + 1).trim(), 10);
  if (status < 200 || status >= 300) {
    throw Object.assign(new Error(`HTTP ${status}`), { status });
  }
  return raw.slice(0, splitAt);
}
// Pure Node.js HTTPS-through-proxy (CONNECT tunnel).
// proxyAuth format: "user:pass@host:port" (bare/Decodo → TLS) OR
// "https://user:pass@host:port" (explicit TLS) OR
// "http://user:pass@host:port" (explicit plain TCP)
// Bare/undeclared-scheme proxies always use TLS (Decodo gate.decodo.com requires it).
// Explicit http:// proxies use plain TCP to avoid breaking non-TLS setups.
// NOTE(review): the 20s timeout only covers Step 4 (the HTTPS request); the
// CONNECT handshake and TLS setup in Steps 1-3 have no timeout of their own —
// confirm whether a hung proxy can stall the caller indefinitely.
async function httpsProxyFetchJson(url, proxyAuth) {
  const targetUrl = new URL(url);
  // Detect whether the proxy URL specifies http:// explicitly (plain TCP) or not
  // (bare format or https:// → TLS). User instruction: bare → always TLS.
  const explicitHttp = proxyAuth.startsWith('http://') && !proxyAuth.startsWith('https://');
  const useTls = !explicitHttp;
  // Strip scheme prefix, parse user:pass@host:port.
  let proxyAuthStr = proxyAuth;
  if (proxyAuth.startsWith('https://') || proxyAuth.startsWith('http://')) {
    const u = new URL(proxyAuth);
    proxyAuthStr = (u.username ? `${decodeURIComponent(u.username)}:${decodeURIComponent(u.password)}@` : '') + `${u.hostname}:${u.port}`;
  }
  // lastIndexOf tolerates '@'/':' characters inside the credential portion.
  const atIdx = proxyAuthStr.lastIndexOf('@');
  const credentials = atIdx >= 0 ? proxyAuthStr.slice(0, atIdx) : '';
  const hostPort = atIdx >= 0 ? proxyAuthStr.slice(atIdx + 1) : proxyAuthStr;
  const colonIdx = hostPort.lastIndexOf(':');
  const proxyHost = hostPort.slice(0, colonIdx);
  const proxyPort = parseInt(hostPort.slice(colonIdx + 1), 10);
  // Step 1: Open socket to proxy (TLS for https:// or bare, plain TCP for http://).
  const proxySock = await new Promise((resolve, reject) => {
    if (useTls) {
      const s = tls.connect({ host: proxyHost, port: proxyPort, servername: proxyHost, ALPNProtocols: ['http/1.1'] }, () => resolve(s));
      s.on('error', reject);
    } else {
      const s = net.connect({ host: proxyHost, port: proxyPort }, () => resolve(s));
      s.on('error', reject);
    }
  });
  // Step 2: Send HTTP CONNECT manually (avoids Node.js http.request auto-setting
  // Host to the proxy hostname, which Decodo rejects with SOCKS5 bytes).
  const authHeader = credentials ? `\r\nProxy-Authorization: Basic ${Buffer.from(credentials).toString('base64')}` : '';
  proxySock.write(`CONNECT ${targetUrl.hostname}:443 HTTP/1.1\r\nHost: ${targetUrl.hostname}:443${authHeader}\r\n\r\n`);
  // Step 3: Buffer until the full CONNECT response headers arrive (\r\n\r\n).
  // Using a single 'data' event is not safe — headers may arrive fragmented across
  // multiple packets, leaving unread bytes that corrupt the subsequent TLS handshake.
  await new Promise((resolve, reject) => {
    let buf = '';
    const onData = (chunk) => {
      buf += chunk.toString('ascii');
      if (!buf.includes('\r\n\r\n')) return;
      proxySock.removeListener('data', onData);
      const statusLine = buf.split('\r\n')[0];
      if (!statusLine.startsWith('HTTP/1.1 200') && !statusLine.startsWith('HTTP/1.0 200')) {
        proxySock.destroy();
        return reject(Object.assign(new Error(`Proxy CONNECT: ${statusLine}`), { status: parseInt(statusLine.split(' ')[1]) || 0 }));
      }
      // Pause so no tunnel bytes are consumed before tls.connect takes over below.
      proxySock.pause();
      resolve();
    };
    proxySock.on('data', onData);
    proxySock.on('error', reject);
  });
  // Step 4: TLS over the proxy tunnel (TLS-in-TLS) to reach the target server.
  const tlsSock = tls.connect({ socket: proxySock, servername: targetUrl.hostname, ALPNProtocols: ['http/1.1'] });
  await new Promise((resolve, reject) => {
    tlsSock.on('secureConnect', resolve);
    tlsSock.on('error', reject);
  });
  proxySock.resume();
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => { tlsSock.destroy(); reject(new Error('FRED proxy fetch timeout')); }, 20000);
    const fail = (e) => { clearTimeout(timer); tlsSock.destroy(); reject(e); };
    // createConnection hands the pre-established tunnel socket to https.request
    // so it never opens its own connection.
    https.request({
      host: targetUrl.hostname,
      path: targetUrl.pathname + targetUrl.search,
      method: 'GET',
      headers: { Accept: 'application/json', 'Accept-Encoding': 'gzip', 'User-Agent': CHROME_UA },
      createConnection: () => tlsSock,
    }, (resp) => {
      clearTimeout(timer);
      if (resp.statusCode < 200 || resp.statusCode >= 300) {
        resp.resume();
        return reject(Object.assign(new Error(`HTTP ${resp.statusCode}`), { status: resp.statusCode }));
      }
      const chunks = [];
      resp.on('data', c => chunks.push(c));
      resp.on('end', () => {
        const body = Buffer.concat(chunks);
        const isGzip = (resp.headers['content-encoding'] || '').includes('gzip');
        (isGzip ? gunzip(body) : Promise.resolve(body))
          .then(data => { try { resolve(JSON.parse(data.toString('utf8'))); } catch (e) { fail(e); } })
          .catch(fail);
      });
      resp.on('error', fail);
    }).on('error', fail).end();
  });
}
// Fetch JSON from a FRED URL: direct fetch first, proxy CONNECT tunnel as
// fallback when proxyAuth is supplied. Proxy failures are wrapped with a
// "proxy:" prefix and the original error attached as cause.
export async function fredFetchJson(url, proxyAuth) {
  try {
    const resp = await fetch(url, { headers: { Accept: 'application/json' }, signal: AbortSignal.timeout(20_000) });
    if (!resp.ok) throw Object.assign(new Error(`HTTP ${resp.status}`), { status: resp.status });
    return resp.json();
  } catch (directErr) {
    if (!proxyAuth) throw directErr;
    console.warn(` [fredFetch] direct failed (${directErr.message}) — retrying via proxy`);
    try {
      return await httpsProxyFetchJson(url, proxyAuth);
    } catch (proxyErr) {
      throw Object.assign(new Error(`proxy: ${proxyErr.message}`), { cause: proxyErr });
    }
  }
}
// ---------------------------------------------------------------------------
// Learned Routes — persist successful scrape URLs across seed runs
// ---------------------------------------------------------------------------
// Validate a URL's hostname against a list of allowed domains (same list used
// for EXA includeDomains). Prevents stored-SSRF from Redis-persisted URLs.
export function isAllowedRouteHost(url, allowedHosts) {
  let hostname;
  try {
    hostname = new URL(url).hostname.replace(/^www\./, '');
  } catch {
    return false; // unparseable URL — never allowed
  }
  return allowedHosts.some((allowed) => hostname === allowed || hostname.endsWith(`.${allowed}`));
}
// Batch-read all learned routes for a scope via single Upstash pipeline request.
// Returns Map<key → routeData>. Non-fatal: throws on HTTP error (caller catches).
export async function bulkReadLearnedRoutes(scope, keys) {
  if (!keys.length) return new Map();
  const { url, token } = getRedisCredentials();
  const pipeline = keys.map((k) => ['GET', `seed-routes:${scope}:${k}`]);
  const resp = await fetch(`${url}/pipeline`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(pipeline),
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) throw new Error(`bulkReadLearnedRoutes HTTP ${resp.status}`);
  const results = await resp.json();
  const routes = new Map();
  keys.forEach((key, i) => {
    const raw = results[i]?.result;
    if (!raw) return;
    try {
      routes.set(key, JSON.parse(raw));
    } catch {
      console.warn(` [routes] malformed JSON for ${key} — skipping`);
    }
  });
  return routes;
}
// Batch-write route updates and hard-delete evicted routes via single pipeline.
// Keys in updates always win over deletes (SET/DEL conflict resolution).
// DELs are sent before SETs to ensure correct ordering.
export async function bulkWriteLearnedRoutes(scope, updates, deletes = new Set()) {
  const { url, token } = getRedisCredentials();
  const ROUTE_TTL = 14 * 24 * 3600; // 14 days
  const effectiveDeletes = [...deletes].filter((k) => !updates.has(k));
  const pipeline = [
    ...effectiveDeletes.map((k) => ['DEL', `seed-routes:${scope}:${k}`]),
    ...[...updates].map(([k, v]) => ['SET', `seed-routes:${scope}:${k}`, JSON.stringify(v), 'EX', ROUTE_TTL]),
  ];
  if (!pipeline.length) return;
  const resp = await fetch(`${url}/pipeline`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(pipeline),
    signal: AbortSignal.timeout(15_000),
  });
  if (!resp.ok) throw new Error(`bulkWriteLearnedRoutes HTTP ${resp.status}`);
  console.log(` [routes] written: ${updates.size} updated, ${effectiveDeletes.length} deleted`);
}
// Decision tree for a single seed item: try learned route first, fall back to EXA.
// All external I/O is injected so this function can be unit-tested without Redis or HTTP.
//
// Returns: { localPrice, sourceSite, routeUpdate, routeDelete }
// routeUpdate — route object to persist (null = nothing to write)
// routeDelete — true if the Redis key should be hard-deleted
export async function processItemRoute({
  learned, // route object from Redis, or undefined/null on first run
  allowedHosts, // string[] — normalised (no www.), same as EXA includeDomains
  currency, // e.g. 'AED'
  itemId, // e.g. 'sugar' — used only for log messages
  fxRate, // number | null
  itemUsdMax = null, // per-item bulk cap in USD (ITEM_USD_MAX[itemId])
  tryDirectFetch, // async (url, currency, itemId, fxRate) => number | null
  scrapeFirecrawl, // async (url, currency) => { price, source } | null
  fetchViaExa, // async () => { localPrice, sourceSite } | null (caller owns EXA+FC logic)
  sleep: sleepFn, // async ms => void
  firecrawlDelayMs = 0,
}) {
  let localPrice = null;
  let sourceSite = '';
  let routeUpdate = null;
  let routeDelete = false;
  // Fresh copy of the learned route with its success counters advanced.
  const markSuccess = () => ({ ...learned, hits: learned.hits + 1, failsSinceSuccess: 0, lastSuccessAt: Date.now() });
  if (learned) {
    const evictNow = learned.failsSinceSuccess >= 2 || !isAllowedRouteHost(learned.url, allowedHosts);
    if (evictNow) {
      routeDelete = true;
      console.log(` [learned✗] ${itemId}: evicting (${learned.failsSinceSuccess >= 2 ? '2 failures' : 'invalid host'})`);
    } else {
      localPrice = await tryDirectFetch(learned.url, currency, itemId, fxRate);
      if (localPrice !== null) {
        sourceSite = learned.url;
        routeUpdate = markSuccess();
        console.log(` [learned✓] ${itemId}: ${localPrice} ${currency}`);
      } else {
        // Direct fetch failed — try a Firecrawl scrape of the learned URL.
        await sleepFn(firecrawlDelayMs);
        const fc = await scrapeFirecrawl(learned.url, currency);
        // Reject prices that exceed the per-item USD cap (bulk-listing guard).
        const overCap = fc && fxRate && itemUsdMax && (fc.price * fxRate) > itemUsdMax;
        if (fc && !overCap) {
          localPrice = fc.price;
          sourceSite = fc.source;
          routeUpdate = markSuccess();
          console.log(` [learned-FC✓] ${itemId}: ${localPrice} ${currency}`);
        } else {
          const newFails = learned.failsSinceSuccess + 1;
          if (newFails >= 2) {
            routeDelete = true;
            console.log(` [learned✗→EXA] ${itemId}: 2 failures — evicting, retrying via EXA`);
          } else {
            routeUpdate = { ...learned, failsSinceSuccess: newFails };
            console.log(` [learned✗→EXA] ${itemId}: failed (${newFails}/2), retrying via EXA`);
          }
        }
      }
    }
  }
  if (localPrice === null) {
    const exaResult = await fetchViaExa();
    if (exaResult?.localPrice != null) {
      localPrice = exaResult.localPrice;
      sourceSite = exaResult.sourceSite || '';
      // Persist the EXA-discovered URL as a new learned route when allowed.
      if (sourceSite && isAllowedRouteHost(sourceSite, allowedHosts)) {
        routeUpdate = { url: sourceSite, lastSuccessAt: Date.now(), hits: 1, failsSinceSuccess: 0, currency };
        console.log(` [EXA->learned] ${itemId}: saved ${sourceSite.slice(0, 55)}`);
      }
    }
  }
  return { localPrice, sourceSite, routeUpdate, routeDelete };
}
/**
 * Shared FX rates cache — reads from Redis `shared:fx-rates:v1` (4h TTL).
 * Falls back to fetching from Yahoo Finance if the key is missing/expired.
 * All seeds needing currency conversion should call this instead of their own fetchFxRates().
 *
 * @param {Record<string, string>} fxSymbols - map of { CCY: 'CCYUSD=X' }
 * @param {Record<string, number>} fallbacks - hardcoded rates to use if Yahoo fails
 */
export async function getSharedFxRates(fxSymbols, fallbacks) {
  const SHARED_KEY = 'shared:fx-rates:v1';
  const { url, token } = getRedisCredentials();
  // Try reading cached rates first (read-only — only seed-fx-rates.mjs writes this key)
  try {
    const cached = await redisGet(url, token, SHARED_KEY);
    const usable = cached && typeof cached === 'object' && Object.keys(cached).length > 0;
    if (usable) {
      console.log(' FX rates: loaded from shared cache');
      // Fill any missing currencies this seed needs using Yahoo or fallback
      const missing = Object.keys(fxSymbols).filter((ccy) => cached[ccy] == null);
      if (!missing.length) return cached;
      console.log(` FX rates: fetching ${missing.length} missing currencies from Yahoo`);
      const missingSymbols = Object.fromEntries(missing.map((ccy) => [ccy, fxSymbols[ccy]]));
      const extra = await fetchYahooFxRates(missingSymbols, fallbacks);
      return { ...cached, ...extra };
    }
  } catch {
    // Cache read failed — fall through to live fetch
  }
  console.log(' FX rates: cache miss — fetching from Yahoo Finance');
  return fetchYahooFxRates(fxSymbols, fallbacks);
}
/**
 * Fetch CCY→USD spot rates from Yahoo Finance chart endpoints, one request
 * per currency. Any failure (HTTP error, thrown fetch, non-positive price)
 * falls back to the supplied hardcoded rate, or null when none exists.
 */
export async function fetchYahooFxRates(fxSymbols, fallbacks) {
  const rates = {};
  for (const [currency, symbol] of Object.entries(fxSymbols)) {
    if (currency === 'USD') {
      rates['USD'] = 1.0; // USD is the base currency — no request needed
      continue;
    }
    const fallback = fallbacks[currency] ?? null;
    try {
      const chartUrl = `https://query1.finance.yahoo.com/v8/finance/chart/${encodeURIComponent(symbol)}`;
      const resp = await fetch(chartUrl, {
        headers: { 'User-Agent': CHROME_UA },
        signal: AbortSignal.timeout(8_000),
      });
      if (!resp.ok) {
        rates[currency] = fallback;
        continue; // matches prior behavior: HTTP errors skip the politeness delay
      }
      const price = (await resp.json())?.chart?.result?.[0]?.meta?.regularMarketPrice;
      rates[currency] = price != null && price > 0 ? price : fallback;
    } catch {
      rates[currency] = fallback;
    }
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
  console.log(' FX rates fetched:', JSON.stringify(rates));
  return rates;
}
/**
 * Read the current canonical snapshot from Redis before a seed run overwrites it.
 * Used by seed scripts that compute WoW deltas (bigmac, grocery-basket).
 * Returns null on any error — scripts must handle first-run (no prev data).
 */
export async function readSeedSnapshot(canonicalKey) {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  try {
    const resp = await fetch(`${url}/get/${encodeURIComponent(canonicalKey)}`, {
      headers: { Authorization: `Bearer ${token}` },
      signal: AbortSignal.timeout(5_000),
    });
    if (!resp.ok) return null;
    const body = await resp.json();
    if (!body.result) return null;
    return JSON.parse(body.result);
  } catch {
    return null;
  }
}
/**
 * Normalize a Yahoo Finance v8 chart payload into the app's quote shape.
 * Returns null when the payload carries no meta block.
 * @returns {{symbol, name, display, price, change, sparkline} | null}
 */
export function parseYahooChart(data, symbol) {
  const chartResult = data?.chart?.result?.[0];
  if (!chartResult?.meta) return null;
  const { regularMarketPrice: price, chartPreviousClose, previousClose } = chartResult.meta;
  // Prefer chartPreviousClose; fall back to previousClose, then the price itself.
  const prevClose = chartPreviousClose || previousClose || price;
  let change = 0;
  if (prevClose) change = ((price - prevClose) / prevClose) * 100;
  const closes = chartResult.indicators?.quote?.[0]?.close;
  const sparkline = Array.isArray(closes) ? closes.filter((v) => v != null) : [];
  return { symbol, name: symbol, display: symbol, price, change: +change.toFixed(2), sparkline };
}
/**
 * Orchestrate one full seed run: lock → fetch → validate/publish → metadata →
 * verify → unlock. Calls process.exit(0) on every completion path (success,
 * lock skip, graceful fetch failure, empty-data skip); only a publish error
 * escapes as a thrown exception (after releasing the lock).
 *
 * @param {string} domain - seed domain, used in lock/meta key names
 * @param {string} resource - resource name within the domain
 * @param {string} canonicalKey - Redis key the payload is published to
 * @param {Function} fetchFn - async producer of the payload (retried via withRetry)
 * @param {object} [opts] - validateFn, ttlSeconds, lockTtlMs, extraKeys,
 *   afterPublish, publishTransform, recordCount, sourceVersion
 */
export async function runSeed(domain, resource, canonicalKey, fetchFn, opts = {}) {
  const {
    validateFn,
    ttlSeconds,
    lockTtlMs = 120_000,
    extraKeys,
    afterPublish,
    publishTransform,
  } = opts;
  // Unique run id doubles as the lock value for compare-and-delete release.
  const runId = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  const startMs = Date.now();
  console.log(`=== ${domain}:${resource} Seed ===`);
  console.log(` Run ID: ${runId}`);
  console.log(` Key: ${canonicalKey}`);
  // Acquire lock
  const lockResult = await acquireLockSafely(`${domain}:${resource}`, runId, lockTtlMs, {
    label: `${domain}:${resource}`,
  });
  // Redis unavailable during lock acquisition — treated as a clean skip.
  if (lockResult.skipped) {
    process.exit(0);
  }
  if (!lockResult.locked) {
    console.log(' SKIPPED: another seed run in progress');
    process.exit(0);
  }
  // Phase 1: Fetch data (graceful on failure — extend TTL on stale data)
  let data;
  try {
    data = await withRetry(fetchFn);
  } catch (err) {
    await releaseLock(`${domain}:${resource}`, runId);
    const durationMs = Date.now() - startMs;
    const cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
    console.error(` FETCH FAILED: ${err.message || err}${cause}`);
    // Keep existing (stale) data alive rather than letting it expire.
    const ttl = ttlSeconds || 600;
    const keys = [canonicalKey, `seed-meta:${domain}:${resource}`];
    if (extraKeys) keys.push(...extraKeys.map(ek => ek.key));
    await extendExistingTtl(keys, ttl);
    console.log(`\n=== Failed gracefully (${Math.round(durationMs)}ms) ===`);
    process.exit(0);
  }
  // Phase 2: Publish to Redis (rethrow on failure — data was fetched but not stored)
  try {
    const publishData = publishTransform ? publishTransform(data) : data;
    const publishResult = await atomicPublish(canonicalKey, publishData, validateFn, ttlSeconds);
    // Validation rejected the payload — keep old data alive, exit cleanly.
    if (publishResult.skipped) {
      const durationMs = Date.now() - startMs;
      const keys = [canonicalKey, `seed-meta:${domain}:${resource}`];
      if (extraKeys) keys.push(...extraKeys.map(ek => ek.key));
      await extendExistingTtl(keys, ttlSeconds || 600);
      // Always write seed-meta even when data is empty so health checks can
      // distinguish "seeder ran but nothing to publish" from "seeder stopped".
      await writeFreshnessMetadata(domain, resource, 0, opts.sourceVersion);
      console.log(` SKIPPED: validation failed (empty data) — seed-meta refreshed, existing cache TTL extended`);
      console.log(`\n=== Done (${Math.round(durationMs)}ms, no write) ===`);
      await releaseLock(`${domain}:${resource}`, runId);
      process.exit(0);
    }
    const { payloadBytes } = publishResult;
    // Sum nested article/event counts for topic-shaped payloads.
    const topicArticleCount = Array.isArray(data?.topics)
      ? data.topics.reduce((n, t) => n + (t?.articles?.length || t?.events?.length || 0), 0)
      : undefined;
    // Record count: explicit option (value or derive-fn) wins, then array
    // length, then the first matching collection field in the payload.
    const recordCount = opts.recordCount != null
      ? (typeof opts.recordCount === 'function' ? opts.recordCount(data) : opts.recordCount)
      : Array.isArray(data) ? data.length
      : (topicArticleCount
        ?? data?.predictions?.length
        ?? data?.events?.length ?? data?.earthquakes?.length ?? data?.outages?.length
        ?? data?.fireDetections?.length ?? data?.anomalies?.length ?? data?.threats?.length
        ?? data?.quotes?.length ?? data?.stablecoins?.length
        ?? data?.cables?.length ?? 0);
    // Write extra keys (e.g., bootstrap hydration keys)
    if (extraKeys) {
      for (const ek of extraKeys) {
        await writeExtraKey(ek.key, ek.transform ? ek.transform(data) : data, ek.ttl || ttlSeconds);
      }
    }
    if (afterPublish) {
      await afterPublish(data, { canonicalKey, ttlSeconds, recordCount, runId });
    }
    const meta = await writeFreshnessMetadata(domain, resource, recordCount, opts.sourceVersion);
    const durationMs = Date.now() - startMs;
    logSeedResult(domain, recordCount, durationMs, { payloadBytes });
    // Verify (best-effort: write already succeeded, don't fail the job on transient read issues)
    let verified = false;
    for (let attempt = 0; attempt < 2; attempt++) {
      try {
        verified = !!(await verifySeedKey(canonicalKey));
        if (verified) break;
        if (attempt === 0) await new Promise(r => setTimeout(r, 500));
      } catch {
        if (attempt === 0) await new Promise(r => setTimeout(r, 500));
      }
    }
    if (verified) {
      console.log(` Verified: data present in Redis`);
    } else {
      console.warn(` WARNING: verification read returned null for ${canonicalKey} (write succeeded, may be transient)`);
    }
    console.log(`\n=== Done (${Math.round(durationMs)}ms) ===`);
    await releaseLock(`${domain}:${resource}`, runId);
    process.exit(0);
  } catch (err) {
    // Publish failed after a successful fetch — release the lock and surface
    // the error to the caller (non-zero exit via unhandled rejection).
    await releaseLock(`${domain}:${resource}`, runId);
    throw err;
  }
}