Revert "feat: seed orchestrator with auto-seeding, persistence, and managemen…" (#2060)

This reverts commit bb79386d24.
This commit is contained in:
Elie Habib
2026-03-22 19:59:42 +04:00
committed by GitHub
parent bb79386d24
commit 429b1d99dd
16 changed files with 5 additions and 1833 deletions

View File

@@ -208,63 +208,6 @@ RELAY_METRICS_WINDOW_SECONDS=60
CORRIDOR_RISK_API_KEY=
# ------ Weather Intelligence (Docker seeder) ------
# Windy.com API (wind/weather overlays — optional)
# Register at: https://api.windy.com/
WINDY_API_KEY=
# ------ Cryptocurrency Data (Docker seeder) ------
# CoinGecko API (crypto prices — optional, free tier available)
# Register at: https://www.coingecko.com/en/api
COINGECKO_API_KEY=
# ------ Trade & Tariff Data (Docker seeder) ------
# World Trade Organization API (trade policy data — optional)
# Register at: https://apiportal.wto.org/
WTO_API_KEY=
# ------ Proxy Authentication (Docker seeder) ------
# OpenSky proxy auth token (for self-hosted relay proxy)
OPENSKY_PROXY_AUTH=
# Oref (Israel Home Front Command) proxy auth token
OREF_PROXY_AUTH=
# ------ Cyber Threat Intelligence (Docker seeder) ------
# AbuseIPDB (IP reputation data — free tier: 1000 checks/day)
# Register at: https://www.abuseipdb.com/
ABUSEIPDB_API_KEY=
# AlienVault OTX (Open Threat Exchange — free)
# Register at: https://otx.alienvault.com/
OTX_API_KEY=
# URLhaus auth key (malware URL feed — free)
# Register at: https://urlhaus.abuse.ch/
URLHAUS_AUTH_KEY=
# ------ Ollama API Key (Docker seeder) ------
# API key for Ollama endpoints that require authentication
OLLAMA_API_KEY=
# ------ Cloudflare R2 Token (Docker seeder) ------
# R2 API token for seed scripts that write to R2 storage
CLOUDFLARE_R2_TOKEN=
# ------ Public Data Sources (no keys required) ------
# UNHCR (UN Refugee Agency) — public API, no auth (CC BY 4.0)

View File

@@ -47,13 +47,6 @@ COPY --from=builder /app/api ./api
# Static data files used by handlers at runtime
COPY --from=builder /app/data ./data
# Seed scripts + orchestrator + shared utils
COPY --from=builder /app/scripts ./scripts
COPY --from=builder /app/shared ./shared
# Node modules needed by seed scripts (e.g. fast-xml-parser for sanctions)
COPY --from=builder /app/node_modules ./node_modules
# Built frontend static files
COPY --from=builder /app/dist /usr/share/nginx/html

View File

@@ -41,28 +41,6 @@ services:
NASA_FIRMS_API_KEY: "${NASA_FIRMS_API_KEY:-}"
CLOUDFLARE_API_TOKEN: "${CLOUDFLARE_API_TOKEN:-}"
AVIATIONSTACK_API: "${AVIATIONSTACK_API:-}"
# Seeder API keys — all optional, features degrade gracefully without them
WINDY_API_KEY: "${WINDY_API_KEY:-}"
COINGECKO_API_KEY: "${COINGECKO_API_KEY:-}"
ICAO_API_KEY: "${ICAO_API_KEY:-}"
WTO_API_KEY: "${WTO_API_KEY:-}"
WINGBITS_API_KEY: "${WINGBITS_API_KEY:-}"
OPENSKY_CLIENT_ID: "${OPENSKY_CLIENT_ID:-}"
OPENSKY_CLIENT_SECRET: "${OPENSKY_CLIENT_SECRET:-}"
OPENSKY_PROXY_AUTH: "${OPENSKY_PROXY_AUTH:-}"
OREF_PROXY_AUTH: "${OREF_PROXY_AUTH:-}"
UCDP_ACCESS_TOKEN: "${UCDP_ACCESS_TOKEN:-}"
ACLED_EMAIL: "${ACLED_EMAIL:-}"
ACLED_PASSWORD: "${ACLED_PASSWORD:-}"
ABUSEIPDB_API_KEY: "${ABUSEIPDB_API_KEY:-}"
OTX_API_KEY: "${OTX_API_KEY:-}"
URLHAUS_AUTH_KEY: "${URLHAUS_AUTH_KEY:-}"
OLLAMA_API_KEY: "${OLLAMA_API_KEY:-}"
OLLAMA_MODEL: "${OLLAMA_MODEL:-}"
CLOUDFLARE_R2_ACCOUNT_ID: "${CLOUDFLARE_R2_ACCOUNT_ID:-}"
CLOUDFLARE_R2_TOKEN: "${CLOUDFLARE_R2_TOKEN:-}"
# Testing
SEED_TURBO: "${SEED_TURBO:-}"
# Docker secrets (recommended for API keys — keeps them out of docker inspect).
# Create secrets/ dir with one file per key, then uncomment below.
# See SELF_HOSTING.md or docker-compose.override.yml for details.
@@ -94,7 +72,7 @@ services:
redis:
image: docker.io/redis:7-alpine
container_name: worldmonitor-redis
command: redis-server --maxmemory ${REDIS_MAXMEMORY:-512mb} --maxmemory-policy allkeys-lru --save 300 10 --save 900 1
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
volumes:
- redis-data:/data
restart: unless-stopped

View File

@@ -22,15 +22,3 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:seed-orchestrator]
command=node /app/scripts/seed-orchestrator.mjs
directory=/app
autostart=true
autorestart=true
startretries=5
startsecs=5
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

View File

@@ -1,71 +0,0 @@
/**
* Central seed catalog — defines all 42 seeders with their scheduling parameters.
*
* Fields:
* tier — 'hot' | 'warm' | 'cold' | 'frozen'
* intervalMin — refresh interval in minutes (always < ttlSec / 60)
* ttlSec — Redis TTL in seconds (how long data is valid)
* ttlSource — 'source' = TTL from upstream API/script constant, 'inferred' = our estimate
* requiredKeys — env vars required for the seeder to run (excluding UPSTASH_REDIS_*)
* metaKey — seed-meta key pattern (domain:resource) for freshness tracking.
* Audited from runSeed(domain, resource) / writeFreshnessMetadata() calls.
* null = seeder doesn't write seed-meta; orchestrator writes its own.
*/
// Cold-start processing order: most time-sensitive tiers first.
export const TIER_ORDER = ['hot', 'warm', 'cold', 'frozen'];
// Per-tier parallelism used during the tiered cold start.
export const TIER_CONCURRENCY = { hot: 3, warm: 5, cold: 3, frozen: 2 };
// Global cap on concurrently running seeders once steady-state scheduling begins.
export const STEADY_STATE_CONCURRENCY = 5;
// Catalog of all seeders. Keys are seeder names; each maps to the scheduling
// parameters documented in the file header (tier, intervalMin, ttlSec,
// ttlSource, requiredKeys, metaKey).
export const SEED_CATALOG = {
  // ── HOT (5-15 min) ─────────────────────────────────────────────────
  'weather-alerts': { tier: 'hot', intervalMin: 5, ttlSec: 900, ttlSource: 'source', requiredKeys: [], metaKey: 'weather:alerts' },
  'correlation': { tier: 'hot', intervalMin: 5, ttlSec: 1200, ttlSource: 'source', requiredKeys: [], metaKey: 'correlation:cards' },
  'prediction-markets': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'prediction:markets' },
  'commodity-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'market:commodities' },
  'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' },
  'insights': { tier: 'hot', intervalMin: 15, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'news:insights' },
  'military-flights': { tier: 'hot', intervalMin: 5, ttlSec: 600, ttlSource: 'source', requiredKeys: [], metaKey: 'military:flights' },
  'conflict-intel': { tier: 'hot', intervalMin: 10, ttlSec: 900, ttlSource: 'source', requiredKeys: [], metaKey: 'conflict:acled-intel' }, // accepts ACLED_EMAIL+PASSWORD or ACLED_ACCESS_TOKEN
  // ── WARM (30-60 min) ───────────────────────────────────────────────
  'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' },
  'security-advisories': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'intelligence:advisories' },
  'fire-detections': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'inferred', requiredKeys: ['NASA_FIRMS_API_KEY'], metaKey: 'wildfire:fires' },
  'natural-events': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'natural:events' },
  'radiation-watch': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'radiation:observations' },
  'airport-delays': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'aviation:faa' },
  'crypto-quotes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:crypto' },
  'stablecoin-markets': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:stablecoins' },
  'gulf-quotes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:gulf-quotes' },
  'etf-flows': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:etf-flows' },
  'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' },
  'research': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'research:arxiv-hn-trending' },
  'unrest-events': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'unrest:events' },
  'usa-spending': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'economic:spending' },
  'supply-chain-trade': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'supply_chain:shipping' },
  'aviation': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: ['AVIATIONSTACK_API'], metaKey: 'aviation:ops-news' },
  'internet-outages': { tier: 'warm', intervalMin: 15, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['CLOUDFLARE_API_TOKEN'], metaKey: 'infra:outages' },
  'infra': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: null },
  'service-statuses': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: 'infra:service-statuses' },
  'military-maritime-news': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: null },
  'sanctions-pressure': { tier: 'warm', intervalMin: 30, ttlSec: 43200, ttlSource: 'source', requiredKeys: [], metaKey: 'sanctions:pressure' },
  'forecasts': { tier: 'warm', intervalMin: 60, ttlSec: 6300, ttlSource: 'source', requiredKeys: [], metaKey: 'forecast:predictions' },
  // ── COLD (2-6 hours) ───────────────────────────────────────────────
  'cyber-threats': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'cyber:threats' },
  'climate-anomalies': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'climate:anomalies' },
  'thermal-escalation': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'thermal:escalation' },
  'gdelt-intel': { tier: 'cold', intervalMin: 120, ttlSec: 86400, ttlSource: 'source', requiredKeys: [], metaKey: 'intelligence:gdelt-intel' },
  'webcams': { tier: 'cold', intervalMin: 360, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: ['WINDY_API_KEY'], metaKey: 'webcam:cameras:geo' },
  'iran-events': { tier: 'cold', intervalMin: 360, ttlSec: 172800, ttlSource: 'source', requiredKeys: [], metaKey: 'conflict:iran-events' },
  // ── FROZEN (12h-7d) ────────────────────────────────────────────────
  'bis-data': { tier: 'frozen', intervalMin: 600, ttlSec: 43200, ttlSource: 'source', requiredKeys: [], metaKey: 'economic:bis' },
  'displacement-summary': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'source', requiredKeys: [], metaKey: 'displacement:summary' },
  'submarine-cables': { tier: 'frozen', intervalMin: 1440, ttlSec: 604800, ttlSource: 'source', requiredKeys: [], metaKey: 'infrastructure:submarine-cables' },
  'military-bases': { tier: 'frozen', intervalMin: 1440, ttlSec: 604800, ttlSource: 'inferred', requiredKeys: [], metaKey: null },
  'ucdp-events': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: [], metaKey: 'conflict:ucdp-events' },
  'wb-indicators': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: [], metaKey: null },
};

View File

@@ -21,9 +21,7 @@ import { loadEnvFile, CHROME_UA } from './_seed-utils.mjs';
loadEnvFile(import.meta.url);
const API_BASE = process.env.LOCAL_API_MODE === 'docker'
? `http://localhost:${process.env.LOCAL_API_PORT || 46123}`
: 'https://api.worldmonitor.app';
const API_BASE = 'https://api.worldmonitor.app';
const TIMEOUT = 30_000;
async function warmPing(name, path) {

View File

@@ -187,9 +187,7 @@ function categorizeStory(title) {
}
async function warmDigestCache() {
const apiBase = process.env.API_BASE_URL
|| (process.env.LOCAL_API_MODE === 'docker' ? `http://localhost:${process.env.LOCAL_API_PORT || 46123}` : null)
|| 'https://api.worldmonitor.app';
const apiBase = process.env.API_BASE_URL || 'https://api.worldmonitor.app';
try {
const resp = await fetch(`${apiBase}/api/news/v1/list-feed-digest?variant=full&lang=en`, {
headers: { 'User-Agent': CHROME_UA },

View File

@@ -24,9 +24,7 @@ import { loadEnvFile, CHROME_UA } from './_seed-utils.mjs';
loadEnvFile(import.meta.url);
const API_BASE = process.env.LOCAL_API_MODE === 'docker'
? `http://localhost:${process.env.LOCAL_API_PORT || 46123}`
: 'https://api.worldmonitor.app';
const API_BASE = 'https://api.worldmonitor.app';
const TIMEOUT = 30_000;
async function warmPing(name, path, body = {}) {

View File

@@ -1,552 +0,0 @@
#!/usr/bin/env node
/**
* Seed orchestrator — ties together the catalog, runner, freshness checks,
* tiered cold start, and recurring scheduling with graceful shutdown.
*
* Exports `classifySeeders` and `buildStartupSummary` for testing (pure functions).
* Redis and all side-effects are deferred to `main()`.
*/
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { createLogger } from './seed-utils/logger.mjs';
import { parseFreshness, isFresh, buildMeta } from './seed-utils/meta.mjs';
import { forkSeeder } from './seed-utils/runner.mjs';
import {
SEED_CATALOG,
TIER_ORDER,
TIER_CONCURRENCY,
STEADY_STATE_CONCURRENCY,
} from './seed-config.mjs';
import { getRedisCredentials } from './_seed-utils.mjs';
/**
 * Dry-run mock seeder — sleeps 100–500ms, then reports failure ~10% of the
 * time. Used when SEED_TURBO=dry so the scheduler can be exercised without
 * spawning real seed scripts.
 * @param {string} name — seeder name echoed back in the result
 * @returns {Promise<{ name: string, exitCode: number, status: 'ok'|'error', durationMs: number }>}
 */
async function dryRunSeeder(name) {
  const delayMs = 100 + Math.random() * 400;
  await new Promise((resolve) => setTimeout(resolve, delayMs));
  const failed = Math.random() < 0.1;
  const outcome = {
    name,
    exitCode: failed ? 1 : 0,
    status: failed ? 'error' : 'ok',
    durationMs: Math.round(delayMs),
  };
  return outcome;
}
// ────────────────────────────────────────────────────────────────────────────
// Thin Redis helpers — use the same REST API as _seed-utils.mjs's private
// redisGet/redisSet but scoped to orchestrator needs. We reuse
// getRedisCredentials() (deferred to main()) rather than duplicating env reads.
// ────────────────────────────────────────────────────────────────────────────
/**
 * GET a key via the Redis REST API.
 * Resolves to the JSON-parsed value, or null when the request fails or the
 * key is absent.
 */
async function redisGet(url, token, key) {
  const endpoint = `${url}/get/${encodeURIComponent(key)}`;
  const response = await fetch(endpoint, {
    headers: { Authorization: `Bearer ${token}` },
    signal: AbortSignal.timeout(5_000),
  });
  if (!response.ok) {
    return null;
  }
  const body = await response.json();
  if (!body.result) return null;
  return JSON.parse(body.result);
}
/**
 * SET a key via the Redis REST API, optionally with an EX TTL.
 * Throws on non-2xx responses (error body truncated to 200 chars).
 */
async function redisSet(url, token, key, value, ttlSeconds) {
  const serialized = JSON.stringify(value);
  const command = ['SET', key, serialized];
  if (ttlSeconds) {
    command.push('EX', ttlSeconds);
  }
  const response = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(command),
    signal: AbortSignal.timeout(15_000),
  });
  if (!response.ok) {
    const detail = await response.text().catch(() => '');
    throw new Error(`Redis SET failed: HTTP ${response.status}${detail.slice(0, 200)}`);
  }
  return response.json();
}
const __dirname = dirname(fileURLToPath(import.meta.url));
const log = createLogger('orchestrator');
// Delay before the one-shot retry after a seeder's first failure.
// `let` (not const): main() lowers this to 3s when SEED_TURBO is set.
let RETRY_DELAY_MS = 60_000;
// At this many consecutive failures a seeder's interval is doubled.
const MAX_CONSECUTIVE_FAILURES = 5;
// How long graceful shutdown waits for in-flight seeders before giving up.
const SHUTDOWN_TIMEOUT_MS = 15_000;
// Redis readiness probe: ping every 3s, up to 20 attempts (~60s total).
const REDIS_PING_INTERVAL_MS = 3_000;
const REDIS_PING_MAX_ATTEMPTS = 20;
// ────────────────────────────────────────────────────────────────────────────
// Pure, testable functions
// ────────────────────────────────────────────────────────────────────────────
/**
 * Split the catalog into runnable and skipped seeders based on which
 * required env vars are present.
 * @param {Record<string, object>} catalog
 * @param {Record<string, string>} env — process.env or a subset
 * @returns {{ active: Array<{ name: string, tier: string, intervalMin: number, ttlSec: number, metaKey: string|null }>, skipped: Array<{ name: string, reason: string }> }}
 */
export function classifySeeders(catalog, env = process.env) {
  const active = [];
  const skipped = [];
  Object.entries(catalog).forEach(([name, cfg]) => {
    const missingKeys = cfg.requiredKeys.filter((key) => !env[key]);
    if (missingKeys.length > 0) {
      skipped.push({ name, reason: `missing ${missingKeys.join(', ')}` });
      return;
    }
    const { tier, intervalMin, ttlSec, metaKey } = cfg;
    active.push({ name, tier, intervalMin, ttlSec, metaKey });
  });
  return { active, skipped };
}
/**
 * Build a human-readable startup summary: active seeders grouped by tier
 * (in TIER_ORDER), skipped seeders with their reasons, and a fresh/total
 * count. Returned as one multi-line string suitable for a single log call.
 * @param {Array<{ name: string, tier: string }>} active
 * @param {Array<{ name: string, reason: string }>} skipped
 * @param {number} freshCount — number of seeders with fresh data
 * @returns {string}
 */
export function buildStartupSummary(active, skipped, freshCount) {
  const lines = [];
  lines.push('');
  lines.push('=== Seed Orchestrator Startup ===');
  lines.push('');
  lines.push(` ACTIVE (${active.length})`);
  // One line per non-empty tier, hot → frozen.
  for (const tier of TIER_ORDER) {
    const inTier = active.filter((s) => s.tier === tier);
    if (inTier.length > 0) {
      lines.push(` ${tier}: ${inTier.map((s) => s.name).join(', ')}`);
    }
  }
  lines.push('');
  lines.push(` SKIPPED (${skipped.length})`);
  for (const s of skipped) {
    lines.push(` ${s.name}: ${s.reason}`);
  }
  lines.push('');
  lines.push(` ${freshCount}/${active.length} seeders have fresh data`);
  lines.push('');
  return lines.join('\n');
}
/**
 * Compute the effective scheduling interval in milliseconds.
 * Turbo mode divides the base interval by 20 (floored at 1 minute); a seeder
 * at MAX_CONSECUTIVE_FAILURES or more gets its interval doubled (demotion).
 * @param {number} intervalMin — base interval from catalog
 * @param {number} failureCount — consecutive failures for this seeder
 * @param {string|undefined} turboMode — 'real', 'dry', or undefined
 * @returns {number} — interval in milliseconds
 */
export function getEffectiveInterval(intervalMin, failureCount, turboMode) {
  const divisor = turboMode ? 20 : 1;
  let minutes = Math.max(1, Math.round(intervalMin / divisor));
  const demoted = failureCount >= MAX_CONSECUTIVE_FAILURES;
  if (demoted) {
    minutes *= 2;
  }
  return minutes * 60_000;
}
/**
 * Overlap protection: true when a run of `name` is already in flight, so the
 * current cycle should be skipped rather than started concurrently.
 * @param {string} name
 * @param {Set<string>} inFlight
 * @returns {boolean}
 */
export function shouldSkipCycle(name, inFlight) {
  const alreadyRunning = inFlight.has(name);
  return alreadyRunning;
}
/**
 * Turbo-adjusted interval in minutes: ÷20, rounded, floored at 1.
 * Returns the base interval unchanged when turbo mode is off.
 * @param {number} intervalMin
 * @param {string|undefined} turboMode
 * @returns {number}
 */
export function computeTurboInterval(intervalMin, turboMode) {
  const compressed = Math.max(1, Math.round(intervalMin / 20));
  return turboMode ? compressed : intervalMin;
}
// ────────────────────────────────────────────────────────────────────────────
// Orchestrator runtime (only used when executed directly)
// ────────────────────────────────────────────────────────────────────────────
/**
 * Wait for the Redis REST proxy to become reachable.
 * Probes `${url}/ping` up to REDIS_PING_MAX_ATTEMPTS times with a 3s fetch
 * timeout, sleeping REDIS_PING_INTERVAL_MS between attempts.
 * @throws {Error} when Redis never responds OK within the attempt budget.
 */
async function waitForRedis(url, token) {
  for (let i = 0; i < REDIS_PING_MAX_ATTEMPTS; i++) {
    try {
      const resp = await fetch(`${url}/ping`, {
        headers: { Authorization: `Bearer ${token}` },
        signal: AbortSignal.timeout(3_000),
      });
      if (resp.ok) {
        log.info('Redis is reachable');
        return;
      }
    } catch {
      // Network error or timeout — fall through and retry.
    }
    if (i < REDIS_PING_MAX_ATTEMPTS - 1) {
      log.info(`Waiting for Redis... (attempt ${i + 1}/${REDIS_PING_MAX_ATTEMPTS})`);
      await new Promise((r) => setTimeout(r, REDIS_PING_INTERVAL_MS));
    }
  }
  throw new Error(`Redis not reachable after ${REDIS_PING_MAX_ATTEMPTS} attempts`);
}
/**
 * Fetch freshness metadata for all active seeders in parallel.
 * Seeders whose meta is missing or unreadable are simply absent from the
 * returned map, which callers treat as "stale".
 * @returns {Promise<Map<string, object>>} seeder name → parsed seed-meta
 */
async function fetchFreshnessMap(activeSeeders, url, token) {
  const freshness = new Map();
  const lookups = activeSeeders.map(async (seeder) => {
    const metaKey = seeder.metaKey
      ? `seed-meta:${seeder.metaKey}`
      : `seed-meta:orchestrator:${seeder.name}`;
    try {
      const parsed = parseFreshness(await redisGet(url, token, metaKey));
      if (parsed) freshness.set(seeder.name, parsed);
    } catch {
      // Redis error — leave the entry out so the seeder is treated as stale.
    }
  });
  await Promise.all(lookups);
  return freshness;
}
/**
 * Redis key holding freshness meta for a seeder: the seeder's own
 * `seed-meta:{domain}:{resource}` key when it declares one, otherwise an
 * orchestrator-owned fallback key derived from the seeder name.
 */
function resolveMetaKey(seeder) {
  if (seeder.metaKey) {
    return `seed-meta:${seeder.metaKey}`;
  }
  return `seed-meta:orchestrator:${seeder.name}`;
}
/**
 * Execute a single seeder: fork its script as a child process (or mock it in
 * dry mode), then reconcile freshness metadata in Redis.
 *
 * Meta-writing rules after the run (skipped entirely when turboMode === 'dry'):
 *  - success + seeder declares a metaKey: write orchestrator meta only if the
 *    seeder did NOT already refresh its own (detected by comparing fetchedAt
 *    before/after the run);
 *  - success + no metaKey: always write orchestrator meta;
 *  - failure + no metaKey: write error meta so staleness stays observable;
 *  - failure + seeder declares a metaKey: leave the last-good meta untouched.
 */
async function executeSeed(seeder, url, token, turboMode) {
  const scriptPath = join(__dirname, `seed-${seeder.name}.mjs`);
  log.info(`Running ${seeder.name}...`);
  // Read meta before fork so we can detect if the seeder updated it
  let metaBefore = null;
  const metaKey = resolveMetaKey(seeder);
  if (turboMode !== 'dry') {
    try {
      metaBefore = await redisGet(url, token, metaKey);
    } catch {
      // ignore — treated the same as "no prior meta"
    }
  }
  // Dry runs finish fast; real seed scripts get a 2-minute budget.
  const timeoutMs = turboMode === 'dry' ? 30_000 : 120_000;
  const result = turboMode === 'dry'
    ? await dryRunSeeder(seeder.name)
    : await forkSeeder(seeder.name, {
        scriptPath: process.execPath,
        args: [scriptPath],
        timeoutMs,
      });
  log.info(`${seeder.name} finished: ${result.status} (${result.durationMs}ms)`);
  // Meta writing logic (skip in dry mode)
  if (turboMode !== 'dry') {
    if (result.status === 'ok') {
      if (seeder.metaKey) {
        // Check if the seeder already updated its own meta
        try {
          const metaAfter = await redisGet(url, token, metaKey);
          const parsed = parseFreshness(metaAfter);
          const parsedBefore = parseFreshness(metaBefore);
          const alreadyUpdated =
            parsed &&
            parsedBefore &&
            parsed.fetchedAt > parsedBefore.fetchedAt;
          const freshlyWritten = parsed && !parsedBefore;
          if (!alreadyUpdated && !freshlyWritten) {
            // Seeder didn't update meta — write it ourselves
            const meta = buildMeta(result.durationMs, 'ok');
            await redisSet(url, token, metaKey, meta, 86400 * 7); // retain meta for 7 days
          }
        } catch {
          // Best effort — a meta read/write failure must not fail the run
        }
      } else {
        // metaKey is null — always write orchestrator meta
        // NOTE(review): unlike the branch above, a Redis failure here
        // propagates to the caller — confirm that is intended.
        const meta = buildMeta(result.durationMs, 'ok');
        await redisSet(url, token, metaKey, meta, 86400 * 7);
      }
    } else {
      // Write error meta for null-metaKey seeders
      if (!seeder.metaKey) {
        const meta = buildMeta(result.durationMs, 'error', `exit ${result.exitCode ?? result.status}`);
        await redisSet(url, token, metaKey, meta, 86400 * 7);
      }
    }
  }
  return result;
}
/**
 * Run seeders in fixed-size chunks: items within a chunk execute
 * concurrently, chunks execute sequentially. Results keep input order.
 * @param {Array} seeders
 * @param {number} concurrency — chunk size
 * @param {(seeder: any) => Promise<any>} executeFn
 * @returns {Promise<Array>} per-seeder results, in input order
 */
async function runBatch(seeders, concurrency, executeFn) {
  const collected = [];
  let offset = 0;
  while (offset < seeders.length) {
    const chunk = seeders.slice(offset, offset + concurrency);
    const chunkResults = await Promise.all(chunk.map(executeFn));
    collected.push(...chunkResults);
    offset += concurrency;
  }
  return collected;
}
/**
 * Tiered cold start: process tiers in TIER_ORDER (hot → frozen), each with
 * its tier-specific concurrency, completing one tier before the next starts.
 * Seeders whose meta is still fresh (younger than their own interval) are
 * skipped so a restart does not re-run everything.
 */
async function tieredColdStart(activeSeeders, freshnessMap, url, token, turboMode) {
  log.info('Starting tiered cold start...');
  for (const tier of TIER_ORDER) {
    const tierSeeders = activeSeeders.filter((s) => s.tier === tier);
    // Filter out fresh seeders
    const stale = tierSeeders.filter((s) => {
      const meta = freshnessMap.get(s.name);
      return !isFresh(meta, s.intervalMin);
    });
    if (stale.length === 0) {
      log.info(`Tier ${tier}: all ${tierSeeders.length} seeders fresh, skipping`);
      continue;
    }
    const concurrency = TIER_CONCURRENCY[tier];
    log.info(`Tier ${tier}: running ${stale.length} stale seeders (concurrency ${concurrency})`);
    await runBatch(stale, concurrency, (s) => executeSeed(s, url, token, turboMode));
  }
  log.info('Tiered cold start complete');
}
/**
 * Steady-state scheduler: one self-rescheduling setTimeout chain per seeder,
 * with overlap protection, a one-shot retry on first failure, interval
 * doubling after MAX_CONSECUTIVE_FAILURES, and a global concurrency queue
 * capped at STEADY_STATE_CONCURRENCY.
 * @param {Array<object>} activeSeeders
 * @param {string} url — Redis REST base URL
 * @param {string} token — Redis REST auth token
 * @param {object} state — shared mutable scheduler state
 *   ({ timers, inFlight, failureCounts, queue, shuttingDown, turboMode })
 */
function scheduleSeeders(activeSeeders, url, token, state) {
  const { timers, inFlight, failureCounts, queue } = state;
  // Current interval for a seeder, reflecting turbo mode and demotion.
  function getInterval(seeder) {
    const failures = failureCounts.get(seeder.name) || 0;
    return getEffectiveInterval(seeder.intervalMin, failures, state.turboMode);
  }
  // Start queued seeders while there is global concurrency headroom.
  function drainQueue() {
    while (queue.length > 0 && inFlight.size < STEADY_STATE_CONCURRENCY) {
      const next = queue.shift();
      runScheduled(next);
    }
  }
  // Run one seeder now; updates failure counts and may arm a one-shot retry.
  async function runScheduled(seeder) {
    inFlight.add(seeder.name);
    try {
      const result = await executeSeed(seeder, url, token, state.turboMode);
      if (result.status === 'ok') {
        failureCounts.set(seeder.name, 0);
      } else {
        const prev = failureCounts.get(seeder.name) || 0;
        const newCount = prev + 1;
        failureCounts.set(seeder.name, newCount);
        if (newCount < 2) {
          // First failure — retry after 60s
          log.warn(`${seeder.name} failed, scheduling retry in ${RETRY_DELAY_MS / 1000}s`);
          const retryTimer = setTimeout(async () => {
            if (state.shuttingDown) return;
            if (inFlight.has(seeder.name)) return; // overlap protection on retry too
            inFlight.add(seeder.name);
            try {
              const retryResult = await executeSeed(seeder, url, token, state.turboMode);
              if (retryResult.status === 'ok') {
                failureCounts.set(seeder.name, 0);
              } else {
                failureCounts.set(seeder.name, (failureCounts.get(seeder.name) || 0) + 1);
                log.warn(`${seeder.name} retry failed — waiting for next cycle`);
              }
            } finally {
              inFlight.delete(seeder.name);
              drainQueue();
            }
          }, RETRY_DELAY_MS);
          // NOTE(review): `timers` only grows — fired retry timers are never
          // pruned. Harmless over a normal process lifetime, but confirm.
          timers.push(retryTimer);
        } else {
          if (newCount === MAX_CONSECUTIVE_FAILURES) {
            log.warn(`${seeder.name}: ${MAX_CONSECUTIVE_FAILURES} consecutive failures — doubling interval`);
          }
        }
      }
    } finally {
      inFlight.delete(seeder.name);
      drainQueue();
    }
  }
  for (const seeder of activeSeeders) {
    // setTimeout (not setInterval) so each cycle's delay reflects the
    // current, possibly demoted, interval.
    const schedule = () => {
      if (state.shuttingDown) return;
      const interval = getInterval(seeder);
      const timer = setTimeout(() => {
        if (state.shuttingDown) return;
        // Overlap protection
        if (inFlight.has(seeder.name)) {
          log.warn(`${seeder.name} still running, skipping this cycle`);
          schedule(); // re-schedule for next interval
          return;
        }
        // Global concurrency check — queue if at limit
        if (inFlight.size >= STEADY_STATE_CONCURRENCY) {
          log.info(`${seeder.name} queued (concurrency limit)`);
          queue.push(seeder);
          schedule();
          return;
        }
        runScheduled(seeder).then(() => {
          schedule();
        });
      }, interval);
      timers.push(timer);
    };
    schedule();
  }
}
/**
 * Install SIGTERM/SIGINT handlers for graceful shutdown: stop all scheduling,
 * wait (bounded by SHUTDOWN_TIMEOUT_MS) for in-flight seeders to finish,
 * then exit the process with status 0.
 */
function setupShutdown(state) {
  const handler = async (signal) => {
    if (state.shuttingDown) return; // idempotent across repeated signals
    state.shuttingDown = true;
    log.info(`Received ${signal}, shutting down...`);
    // Clear all scheduled timers
    for (const t of state.timers) {
      clearTimeout(t);
    }
    state.timers.length = 0;
    state.queue.length = 0;
    // Wait for in-flight seeders
    if (state.inFlight.size > 0) {
      log.info(`Waiting for ${state.inFlight.size} in-flight seeder(s)...`);
      const deadline = Date.now() + SHUTDOWN_TIMEOUT_MS;
      while (state.inFlight.size > 0 && Date.now() < deadline) {
        await new Promise((r) => setTimeout(r, 500)); // poll every 500ms
      }
      if (state.inFlight.size > 0) {
        log.warn(`${state.inFlight.size} seeder(s) still running after ${SHUTDOWN_TIMEOUT_MS}ms timeout`);
      }
    }
    log.info('Shutdown complete');
    process.exit(0);
  };
  process.on('SIGTERM', () => handler('SIGTERM'));
  process.on('SIGINT', () => handler('SIGINT'));
}
// ────────────────────────────────────────────────────────────────────────────
// Main entry point
// ────────────────────────────────────────────────────────────────────────────
/**
 * Orchestrator entry point: validate SEED_TURBO, wait for Redis, classify
 * seeders by available env vars, run the tiered cold start, then hand over
 * to the steady-state scheduler (which keeps timers armed indefinitely).
 */
async function main() {
  const { url, token } = getRedisCredentials();
  // Turbo mode: 'real' compresses intervals 20x; 'dry' also mocks seeders.
  const turboMode = process.env.SEED_TURBO || undefined;
  if (turboMode && turboMode !== 'real' && turboMode !== 'dry') {
    log.error(`Invalid SEED_TURBO value: ${turboMode} (expected: real, dry)`);
    process.exit(1);
  }
  if (turboMode) {
    log.info(`⚡ TURBO MODE: ${turboMode} (intervals ÷20${turboMode === 'dry' ? ', no real seeders' : ''})`);
    RETRY_DELAY_MS = 3_000;
  }
  // Wait for Redis to be reachable
  await waitForRedis(url, token);
  // Classify seeders
  const { active, skipped } = classifySeeders(SEED_CATALOG);
  // Apply turbo interval compression
  if (turboMode) {
    for (const s of active) {
      s.intervalMin = computeTurboInterval(s.intervalMin, turboMode);
    }
  }
  // Fetch freshness for all active seeders
  const freshnessMap = await fetchFreshnessMap(active, url, token);
  const freshCount = active.filter((s) => {
    const meta = freshnessMap.get(s.name);
    return isFresh(meta, s.intervalMin);
  }).length;
  // Log startup summary
  const summary = buildStartupSummary(active, skipped, freshCount);
  log.info(summary);
  // Tiered cold start
  await tieredColdStart(active, freshnessMap, url, token, turboMode);
  // Set up recurring scheduling
  const state = {
    timers: [],
    inFlight: new Set(),
    failureCounts: new Map(),
    queue: [],
    shuttingDown: false,
    turboMode,
  };
  setupShutdown(state);
  scheduleSeeders(active, url, token, state);
  log.info('Steady-state scheduling active');
}
// Only run main() when executed directly (not imported for testing).
// NOTE(review): compares import.meta.url to a naive `file://` + argv[1]
// concatenation — assumes a plain absolute path with no characters needing
// percent-encoding; confirm against how supervisord invokes this script.
const isDirectExecution =
  process.argv[1] &&
  import.meta.url === `file://${process.argv[1]}`;
if (isDirectExecution) {
  main().catch((err) => {
    log.error(`Fatal: ${err.message}`);
    process.exit(1);
  });
}

View File

@@ -12,10 +12,7 @@ import { loadEnvFile, CHROME_UA, getRedisCredentials, logSeedResult, extendExist
loadEnvFile(import.meta.url);
const API_BASE = process.env.LOCAL_API_MODE === 'docker'
? `http://localhost:${process.env.LOCAL_API_PORT || 46123}`
: 'https://api.worldmonitor.app';
const RPC_URL = `${API_BASE}/api/infrastructure/v1/list-service-statuses`;
const RPC_URL = 'https://api.worldmonitor.app/api/infrastructure/v1/list-service-statuses';
const CANONICAL_KEY = 'infra:service-statuses:v1';
async function warmPing() {

View File

@@ -1,16 +0,0 @@
/**
 * Prefixed console logger for the seed orchestrator.
 * Output goes to console.log (info) / console.error (warn, error) unless a
 * sink with a `write` function is supplied, in which case all levels go
 * through it (useful for tests).
 * @param {string} name — seeder name (e.g. 'earthquakes'); 'orchestrator' gets its own prefix
 * @param {{ write?: (msg: string) => void }} [sink] — override for testing
 */
export function createLogger(name, sink) {
  let prefix;
  if (name === 'orchestrator') {
    prefix = '[orchestrator]';
  } else {
    prefix = `[seed:${name}]`;
  }
  const writeOut = sink?.write ?? ((msg) => console.log(msg));
  const writeErr = sink?.write ?? ((msg) => console.error(msg));
  return {
    info(msg) {
      writeOut(`${prefix} ${msg}`);
    },
    error(msg) {
      writeErr(`${prefix} error: ${msg}`);
    },
    warn(msg) {
      writeErr(`${prefix} warn: ${msg}`);
    },
  };
}

View File

@@ -1,50 +0,0 @@
/**
* Seed metadata helpers for the orchestrator.
*
* Reads/writes `seed-meta:{domain}:{resource}` keys in the same schema
* as `writeFreshnessMetadata` in `_seed-utils.mjs`:
* { fetchedAt, recordCount, sourceVersion, durationMs?, status? }
*/
/**
 * Parse a raw seed-meta value from Redis.
 * Accepts either an already-parsed object or a raw JSON string; anything
 * else (null, '', malformed JSON, missing numeric fetchedAt) yields null.
 * @param {object|string|null} raw
 * @returns {{ fetchedAt: number, recordCount?: number, sourceVersion?: string, status?: string, error?: string } | null}
 */
export function parseFreshness(raw) {
  if (raw == null || raw === '') return null;
  let meta;
  try {
    meta = typeof raw === 'string' ? JSON.parse(raw) : raw;
  } catch {
    return null;
  }
  const valid = meta && typeof meta.fetchedAt === 'number';
  return valid ? meta : null;
}
/**
 * Check if seed-meta indicates data is still fresh: its age must be
 * strictly less than the seeder's refresh interval.
 * @param {{ fetchedAt: number } | null} meta — parsed seed-meta
 * @param {number} intervalMin — refresh interval in minutes
 * @returns {boolean}
 */
export function isFresh(meta, intervalMin) {
  if (!meta) return false;
  const maxAgeMs = intervalMin * 60_000;
  return Date.now() - meta.fetchedAt < maxAgeMs;
}
/**
 * Build a seed-meta object for the orchestrator to write after a child
 * process completes. `error` is only included when provided (truthy).
 * @param {number} durationMs
 * @param {'ok'|'error'} status
 * @param {string} [error]
 * @returns {object}
 */
export function buildMeta(durationMs, status, error) {
  const base = { fetchedAt: Date.now(), durationMs, status };
  return error ? { ...base, error } : base;
}

View File

@@ -1,64 +0,0 @@
import { spawn } from 'node:child_process';
/**
 * Fork a seed script as a child process and track its execution.
 *
 * Never rejects — every outcome (normal exit, timeout, spawn error) resolves
 * to a result object so callers branch on `status` instead of try/catch.
 *
 * @param {string} name — seeder name for logging
 * @param {object} opts
 * @param {string} opts.scriptPath — executable to spawn (e.g. process.execPath)
 * @param {string[]} [opts.args] — arguments (default: none)
 * @param {number} [opts.timeoutMs=120000] — kill after this many ms
 * @param {object} [opts.env] — additional env vars (merged with process.env)
 * @returns {Promise<{ name: string, exitCode: number|null, status: 'ok'|'error'|'timeout', durationMs: number, error?: string }>}
 */
export function forkSeeder(name, opts) {
  const { scriptPath, args = [], timeoutMs = 120_000, env } = opts;
  return new Promise((resolve) => {
    const start = Date.now();
    const child = spawn(scriptPath, args, {
      // Child stdout/stderr stream straight into the orchestrator's own logs.
      stdio: ['ignore', 'inherit', 'inherit'],
      env: { ...process.env, ...env },
    });
    // Guard so exactly one of timeout / close / error settles the promise.
    let settled = false;
    const timer = setTimeout(() => {
      if (!settled) {
        settled = true;
        child.kill('SIGTERM');
        // Give 3s for graceful shutdown, then SIGKILL
        setTimeout(() => {
          try { child.kill('SIGKILL'); } catch {}
        }, 3000);
        resolve({ name, exitCode: null, status: 'timeout', durationMs: Date.now() - start });
      }
    }, timeoutMs);
    child.on('close', (code) => {
      if (!settled) {
        settled = true;
        clearTimeout(timer);
        resolve({
          name,
          exitCode: code,
          status: code === 0 ? 'ok' : 'error',
          durationMs: Date.now() - start,
        });
      }
    });
    child.on('error', (err) => {
      // e.g. ENOENT when scriptPath does not exist
      if (!settled) {
        settled = true;
        clearTimeout(timer);
        resolve({
          name,
          exitCode: null,
          status: 'error',
          durationMs: Date.now() - start,
          error: err.message,
        });
      }
    });
  });
}

View File

@@ -1,157 +0,0 @@
import assert from 'node:assert/strict';
import { describe, it } from 'node:test';
import {
classifySeeders,
buildStartupSummary,
getEffectiveInterval,
shouldSkipCycle,
computeTurboInterval,
} from '../scripts/seed-orchestrator.mjs';
import { isFresh } from '../scripts/seed-utils/meta.mjs';
// Unit tests for the orchestrator's pure scheduling helpers (no I/O).
// ── getEffectiveInterval ─────────────────────────────────────────────────
describe('getEffectiveInterval', () => {
  it('returns normal interval when no turbo and no failures', () => {
    // 30 min * 60_000 = 1_800_000 ms
    assert.equal(getEffectiveInterval(30, 0, undefined), 1_800_000);
  });
  it('compresses interval by 20x in turbo mode', () => {
    // 30 / 20 = 1.5 → rounds to 2 min → 120_000 ms
    assert.equal(getEffectiveInterval(30, 0, 'real'), 120_000);
  });
  it('doubles interval after 5 consecutive failures', () => {
    // 30 min * 2 = 60 min → 3_600_000 ms
    assert.equal(getEffectiveInterval(30, 5, undefined), 3_600_000);
  });
  it('applies both turbo and demotion together', () => {
    // 30 / 20 = 1.5 → rounds to 2, then * 2 = 4 min → 240_000 ms
    assert.equal(getEffectiveInterval(30, 5, 'dry'), 240_000);
  });
  it('never returns less than 1 minute (60_000ms)', () => {
    // 5 / 20 = 0.25 → floors to 1 min → 60_000 ms
    assert.equal(getEffectiveInterval(5, 0, 'real'), 60_000);
  });
  it('4 failures does not trigger demotion', () => {
    // Demotion threshold is 5 consecutive failures.
    assert.equal(getEffectiveInterval(30, 4, undefined), 1_800_000);
  });
});
// ── shouldSkipCycle ──────────────────────────────────────────────────────
describe('shouldSkipCycle', () => {
  it('returns false when seeder not in flight', () => {
    const inFlight = new Set(['other-seeder']);
    assert.equal(shouldSkipCycle('earthquakes', inFlight), false);
  });
  it('returns true when seeder is in flight', () => {
    const inFlight = new Set(['earthquakes']);
    assert.equal(shouldSkipCycle('earthquakes', inFlight), true);
  });
  it('returns false for empty in-flight set', () => {
    assert.equal(shouldSkipCycle('earthquakes', new Set()), false);
  });
});
// ── computeTurboInterval ─────────────────────────────────────────────────
describe('computeTurboInterval', () => {
  it('returns original interval when no turbo', () => {
    assert.equal(computeTurboInterval(30, undefined), 30);
  });
  it('divides by 20 in turbo mode', () => {
    // 120 / 20 = 6
    assert.equal(computeTurboInterval(120, 'real'), 6);
  });
  it('floors to minimum of 1 minute', () => {
    // 5 / 20 = 0.25 → max(1, round(0.25)) = 1
    assert.equal(computeTurboInterval(5, 'dry'), 1);
  });
  it('rounds to nearest integer', () => {
    // 30 / 20 = 1.5 → rounds to 2
    assert.equal(computeTurboInterval(30, 'real'), 2);
  });
});
// ── classifySeeders ──────────────────────────────────────────────────────
describe('classifySeeders', () => {
  // Minimal stand-in for SEED_CATALOG: one keyless seeder, two keyed ones.
  const catalog = {
    'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' },
    'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' },
    'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' },
  };
  it('all seeders active when no keys required', () => {
    const mini = { 'a': { ...catalog['earthquakes'] } };
    const { active, skipped } = classifySeeders(mini, {});
    assert.equal(active.length, 1);
    assert.equal(skipped.length, 0);
  });
  it('skips seeders with missing required keys', () => {
    const { active, skipped } = classifySeeders(catalog, {});
    assert.equal(skipped.length, 2);
    assert.ok(skipped.some(s => s.name === 'market-quotes'));
    assert.ok(skipped.some(s => s.name === 'economy'));
  });
  it('includes seeders when required keys present', () => {
    const { active, skipped } = classifySeeders(catalog, { FINNHUB_API_KEY: 'x', FRED_API_KEY: 'y' });
    assert.equal(active.length, 3);
    assert.equal(skipped.length, 0);
  });
});
// ── buildStartupSummary ──────────────────────────────────────────────────
describe('buildStartupSummary', () => {
  it('includes active count and tier breakdown', () => {
    const active = [
      { name: 'earthquakes', tier: 'warm' },
      { name: 'weather-alerts', tier: 'hot' },
    ];
    const summary = buildStartupSummary(active, [], 0);
    assert.ok(summary.includes('ACTIVE (2)'));
    assert.ok(summary.includes('hot: weather-alerts'));
    assert.ok(summary.includes('warm: earthquakes'));
  });
  it('includes skipped seeders with reasons', () => {
    const skipped = [{ name: 'market-quotes', reason: 'missing FINNHUB_API_KEY' }];
    const summary = buildStartupSummary([], skipped, 0);
    assert.ok(summary.includes('SKIPPED (1)'));
    assert.ok(summary.includes('FINNHUB_API_KEY'));
  });
  it('includes fresh count', () => {
    const summary = buildStartupSummary([{ name: 'a', tier: 'hot' }], [], 1);
    assert.ok(summary.includes('1/1 seeders have fresh data'));
  });
});
// ── freshness with turbo intervals ───────────────────────────────────────
// Cross-checks that isFresh and computeTurboInterval compose correctly.
describe('freshness integration', () => {
  it('isFresh returns false for data older than turbo interval', () => {
    const turboMin = computeTurboInterval(30, 'real'); // 2 min
    const meta = { fetchedAt: Date.now() - 3 * 60_000 }; // 3 min ago
    assert.equal(isFresh(meta, turboMin), false);
  });
  it('isFresh returns true for data within turbo interval', () => {
    const turboMin = computeTurboInterval(30, 'real'); // 2 min
    const meta = { fetchedAt: Date.now() - 30_000 }; // 30 sec ago
    assert.equal(isFresh(meta, turboMin), true);
  });
});

View File

@@ -1,209 +0,0 @@
import assert from 'node:assert/strict';
import { describe, it } from 'node:test';
import { createLogger } from '../scripts/seed-utils/logger.mjs';
import { parseFreshness, isFresh, buildMeta } from '../scripts/seed-utils/meta.mjs';
import { forkSeeder } from '../scripts/seed-utils/runner.mjs';
import { SEED_CATALOG, TIER_ORDER, TIER_CONCURRENCY, STEADY_STATE_CONCURRENCY } from '../scripts/seed-config.mjs';
import { classifySeeders, buildStartupSummary } from '../scripts/seed-orchestrator.mjs';
// Unit tests for the seeder building blocks: logger, seed-meta helpers,
// the child-process runner, the seed catalog, and orchestrator classification.
describe('logger', () => {
  it('prefixes messages with the given name', () => {
    const lines = [];
    const log = createLogger('earthquakes', { write: (msg) => lines.push(msg) });
    log.info('seeded 847 items');
    assert.match(lines[0], /\[seed:earthquakes\] seeded 847 items/);
  });
  it('formats error messages', () => {
    const lines = [];
    const log = createLogger('webcams', { write: (msg) => lines.push(msg) });
    log.error('HTTP 429');
    assert.match(lines[0], /\[seed:webcams\] error: HTTP 429/);
  });
  it('uses orchestrator prefix for orchestrator name', () => {
    // The orchestrator itself gets a plain "[orchestrator]" prefix, not "[seed:…]".
    const lines = [];
    const log = createLogger('orchestrator', { write: (msg) => lines.push(msg) });
    log.info('starting...');
    assert.match(lines[0], /\[orchestrator\] starting\.\.\./);
  });
});
describe('meta', () => {
  describe('parseFreshness', () => {
    it('parses valid seed-meta object (from redisGet which returns parsed JSON)', () => {
      const obj = { fetchedAt: 1000, recordCount: 50, sourceVersion: 'v1' };
      const result = parseFreshness(obj);
      assert.equal(result.fetchedAt, 1000);
      assert.equal(result.recordCount, 50);
    });
    it('parses valid seed-meta string', () => {
      const raw = JSON.stringify({ fetchedAt: 1000, recordCount: 50, sourceVersion: 'v1' });
      const result = parseFreshness(raw);
      assert.equal(result.fetchedAt, 1000);
    });
    it('returns null for missing data', () => {
      assert.equal(parseFreshness(null), null);
      assert.equal(parseFreshness(''), null);
      assert.equal(parseFreshness(undefined), null);
    });
    it('returns null for objects without fetchedAt', () => {
      // fetchedAt is the mandatory field of the freshness schema.
      assert.equal(parseFreshness({ recordCount: 5 }), null);
    });
  });
  describe('isFresh', () => {
    it('returns true when data is within interval', () => {
      const meta = { fetchedAt: Date.now() - 60_000 }; // 1 min ago
      assert.equal(isFresh(meta, 5), true); // 5 min interval
    });
    it('returns false when data is stale', () => {
      const meta = { fetchedAt: Date.now() - 600_000 }; // 10 min ago
      assert.equal(isFresh(meta, 5), false); // 5 min interval
    });
    it('returns false for null meta', () => {
      assert.equal(isFresh(null, 5), false);
    });
  });
  describe('buildMeta', () => {
    it('builds success meta', () => {
      const meta = buildMeta(2340, 'ok');
      assert.equal(meta.status, 'ok');
      assert.equal(meta.durationMs, 2340);
      assert.ok(meta.fetchedAt > 0);
      assert.equal(meta.error, undefined);
    });
    it('builds error meta with message', () => {
      const meta = buildMeta(5200, 'error', 'HTTP 429');
      assert.equal(meta.status, 'error');
      assert.equal(meta.error, 'HTTP 429');
    });
  });
});
describe('runner', () => {
  // process.execPath (the node binary itself) doubles as the "script",
  // so these tests need no fixture files on disk.
  it('runs a script that exits 0 and reports success', async () => {
    const result = await forkSeeder('test-ok', {
      scriptPath: process.execPath,
      args: ['-e', 'process.exit(0)'],
      timeoutMs: 5000,
    });
    assert.equal(result.exitCode, 0);
    assert.equal(result.status, 'ok');
    assert.equal(result.name, 'test-ok');
    assert.ok(result.durationMs >= 0);
  });
  it('runs a script that exits 1 and reports error', async () => {
    const result = await forkSeeder('test-fail', {
      scriptPath: process.execPath,
      args: ['-e', 'process.exit(1)'],
      timeoutMs: 5000,
    });
    assert.equal(result.exitCode, 1);
    assert.equal(result.status, 'error');
  });
  it('kills a script that exceeds timeout', async () => {
    const result = await forkSeeder('test-hang', {
      scriptPath: process.execPath,
      args: ['-e', 'setTimeout(() => {}, 60000)'],
      timeoutMs: 500,
    });
    assert.equal(result.status, 'timeout');
    assert.equal(result.exitCode, null);
  });
});
describe('seed-config', () => {
  it('exports a catalog with entries for all seed scripts', () => {
    assert.equal(Object.keys(SEED_CATALOG).length, 42);
  });
  it('every entry has required fields', () => {
    for (const [name, cfg] of Object.entries(SEED_CATALOG)) {
      assert.ok(['hot', 'warm', 'cold', 'frozen'].includes(cfg.tier), `${name}: invalid tier ${cfg.tier}`);
      assert.ok(typeof cfg.intervalMin === 'number' && cfg.intervalMin > 0, `${name}: invalid intervalMin`);
      assert.ok(typeof cfg.ttlSec === 'number' && cfg.ttlSec > 0, `${name}: invalid ttlSec`);
      assert.ok(['source', 'inferred'].includes(cfg.ttlSource), `${name}: invalid ttlSource`);
      assert.ok(Array.isArray(cfg.requiredKeys), `${name}: requiredKeys must be array`);
      assert.ok(cfg.metaKey === null || typeof cfg.metaKey === 'string', `${name}: metaKey must be string or null`);
    }
  });
  it('intervalMin is always less than ttlSec / 60', () => {
    // Guards against a schedule that refreshes slower than the data expires.
    for (const [name, cfg] of Object.entries(SEED_CATALOG)) {
      assert.ok(cfg.intervalMin < cfg.ttlSec / 60, `${name}: intervalMin ${cfg.intervalMin} >= ttlSec/60 ${cfg.ttlSec / 60}`);
    }
  });
  it('TIER_ORDER defines execution order', () => {
    assert.deepEqual(TIER_ORDER, ['hot', 'warm', 'cold', 'frozen']);
  });
  it('TIER_CONCURRENCY defines concurrency caps', () => {
    assert.equal(TIER_CONCURRENCY.hot, 3);
    assert.equal(TIER_CONCURRENCY.warm, 5);
    assert.equal(TIER_CONCURRENCY.cold, 3);
    assert.equal(TIER_CONCURRENCY.frozen, 2);
  });
  it('STEADY_STATE_CONCURRENCY is 5', () => {
    assert.equal(STEADY_STATE_CONCURRENCY, 5);
  });
  it('every catalog name matches a seed-*.mjs file', async () => {
    const fs = await import('node:fs');
    const path = await import('node:path');
    for (const name of Object.keys(SEED_CATALOG)) {
      const filePath = path.join(process.cwd(), 'scripts', `seed-${name}.mjs`);
      assert.ok(fs.existsSync(filePath), `${name}: missing file seed-${name}.mjs`);
    }
  });
});
describe('orchestrator', () => {
  describe('classifySeeders', () => {
    it('splits seeders into active and skipped based on env vars', () => {
      const catalog = {
        'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' },
        'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' },
        'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' },
      };
      const env = { FRED_API_KEY: 'test' };
      const { active, skipped } = classifySeeders(catalog, env);
      assert.equal(active.length, 2); // earthquakes + economy
      assert.equal(skipped.length, 1); // market-quotes
      assert.ok(skipped[0].name === 'market-quotes');
      assert.ok(skipped[0].reason.includes('FINNHUB_API_KEY'));
      assert.ok(active.some(s => s.name === 'earthquakes'));
      assert.ok(active.some(s => s.name === 'economy'));
    });
  });
  describe('buildStartupSummary', () => {
    it('returns formatted summary string', () => {
      const active = [
        { name: 'earthquakes', tier: 'warm' },
        { name: 'weather-alerts', tier: 'hot' },
      ];
      const skipped = [
        { name: 'market-quotes', reason: 'missing FINNHUB_API_KEY' },
      ];
      const summary = buildStartupSummary(active, skipped, 1);
      assert.ok(summary.includes('ACTIVE (2)'));
      assert.ok(summary.includes('SKIPPED (1)'));
      assert.ok(summary.includes('market-quotes'));
      assert.ok(summary.includes('FINNHUB_API_KEY'));
      assert.ok(summary.includes('1/2 seeders have fresh data'));
    });
  });
});

602
wmsm.sh
View File

@@ -1,602 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# ═══════════════════════════════════════════════════════════════════════════════
# 🌍 World Monitor Seed Manager (wmsm)
# Homelab CLI for managing the seed orchestrator.
# Keep CATALOG in sync with scripts/seed-config.mjs
# ═══════════════════════════════════════════════════════════════════════════════

# Connection defaults — each overridable from the environment.
REDIS_URL="${REDIS_URL:-http://localhost:8079}"
REDIS_TOKEN="${REDIS_TOKEN:-wm-local-token}"
CONTAINER="${WM_CONTAINER:-worldmonitor}"

# Container runtime selection: prefer docker, fall back to podman. When
# neither is installed, keep "docker" so check_deps can fail later with a
# helpful message instead of crashing here.
DOCKER=docker
if ! command -v docker >/dev/null 2>&1 && command -v podman >/dev/null 2>&1; then
  DOCKER=podman
fi
# ── Catalog: name|tier|intervalMin|ttlSec|metaKey ─────────────────────────────
# metaKey "null" means orchestrator writes seed-meta:orchestrator:{name}
CATALOG=(
  # HOT (5-15 min)
  "weather-alerts|hot|5|900|weather:alerts"
  "correlation|hot|5|1200|correlation:cards"
  "prediction-markets|hot|10|1800|prediction:markets"
  "commodity-quotes|hot|10|1800|market:commodities"
  "market-quotes|hot|10|1800|market:quotes"
  "insights|hot|15|1800|news:insights"
  "military-flights|hot|5|600|military:flights"
  "conflict-intel|hot|10|900|conflict:acled-intel"
  # WARM (30-60 min)
  "earthquakes|warm|30|3600|seismology:earthquakes"
  "security-advisories|warm|30|7200|intelligence:advisories"
  "fire-detections|warm|30|7200|wildfire:fires"
  "natural-events|warm|30|3600|natural:events"
  "radiation-watch|warm|30|7200|radiation:observations"
  "airport-delays|warm|30|7200|aviation:faa"
  "crypto-quotes|warm|30|3600|market:crypto"
  "stablecoin-markets|warm|30|3600|market:stablecoins"
  "gulf-quotes|warm|30|3600|market:gulf-quotes"
  "etf-flows|warm|30|3600|market:etf-flows"
  "economy|warm|30|3600|economic:energy-prices"
  "research|warm|30|3600|research:arxiv-hn-trending"
  "unrest-events|warm|30|3600|unrest:events"
  "usa-spending|warm|30|3600|economic:spending"
  "supply-chain-trade|warm|30|3600|supply_chain:shipping"
  "aviation|warm|30|3600|aviation:ops-news"
  "internet-outages|warm|15|1800|infra:outages"
  "infra|warm|30|3600|null"
  "service-statuses|warm|30|3600|infra:service-statuses"
  "military-maritime-news|warm|30|3600|null"
  "sanctions-pressure|warm|30|43200|sanctions:pressure"
  "forecasts|warm|60|6300|forecast:predictions"
  # COLD (2-6 hours)
  "cyber-threats|cold|120|10800|cyber:threats"
  "climate-anomalies|cold|120|10800|climate:anomalies"
  "thermal-escalation|cold|120|10800|thermal:escalation"
  "gdelt-intel|cold|120|86400|intelligence:gdelt-intel"
  "webcams|cold|360|86400|webcam:cameras:geo"
  "iran-events|cold|360|172800|conflict:iran-events"
  # FROZEN (12h-7d)
  "bis-data|frozen|600|43200|economic:bis"
  "displacement-summary|frozen|720|86400|displacement:summary"
  "submarine-cables|frozen|1440|604800|infrastructure:submarine-cables"
  "military-bases|frozen|1440|604800|null"
  "ucdp-events|frozen|720|86400|conflict:ucdp-events"
  "wb-indicators|frozen|720|86400|null"
)
# Tier display metadata: tier|icon|human-readable cadence (used by status/schedule)
TIER_ICONS=( "hot|🔥|5-15 min" "warm|🟡|30-60 min" "cold|🧊|2-6 hours" "frozen|🪨|12h-7d" )
# Max parallel seeders per tier during `refresh --all`: tier|concurrency
TIER_CONCURRENCY=( "hot|3" "warm|5" "cold|3" "frozen|2" )
# ── Helpers ───────────────────────────────────────────────────────────────────
# Print the standard three-line banner: title, double rule, blank line.
header() {
  printf '%s\n' "🌍 World Monitor Seed Manager"
  printf '%s\n' "══════════════════════════════════════════════════════════════"
  printf '\n'
}
# Print the thin separator rule used above summary lines.
footer_line() {
  printf '%s\n' "──────────────────────────────────────────────────────────────"
}
# GET a key through the Redis REST proxy, URL-encoding the key first.
# The key is passed to python3 as argv — the previous version interpolated
# "$1" directly into the -c program inside single quotes, so a key containing
# a quote broke the program (shell-injection shape). Falls back to the raw
# key when python3 is unavailable, matching the old behavior.
redis_get() {
  local key="$1" encoded
  encoded=$(python3 -c 'import sys, urllib.parse; print(urllib.parse.quote(sys.argv[1], safe=""))' "$key" 2>/dev/null) || encoded="$key"
  curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/get/$encoded" 2>/dev/null
}
# SCAN all keys matching a glob pattern through the REST proxy, following
# the cursor until it wraps back to 0. Prints a sorted, de-duplicated list.
# Fixes vs. the original:
#   * pattern is URL-encoded via argv (no interpolation into python -c),
#     and encoded once before the loop instead of on every iteration;
#   * jq failures no longer abort the caller under `set -euo pipefail`;
#   * the final `grep -v '^$'` exits 1 on an empty result, which (with
#     pipefail) made an empty scan look like a function failure — guarded.
redis_scan() {
  local pattern="$1" cursor="0" all_keys="" encoded
  encoded=$(python3 -c 'import sys, urllib.parse; print(urllib.parse.quote(sys.argv[1], safe=""))' "$pattern" 2>/dev/null) || encoded="$pattern"
  while true; do
    local resp
    resp=$(curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/scan/$cursor?match=$encoded&count=200" 2>/dev/null)
    [ -z "$resp" ] && break
    # Malformed response → treat as end of iteration rather than aborting.
    cursor=$(echo "$resp" | jq -r '.result[0]' 2>/dev/null) || cursor="0"
    local keys
    keys=$(echo "$resp" | jq -r '.result[1][]' 2>/dev/null) || keys=""
    [ -n "$keys" ] && all_keys="$all_keys"$'\n'"$keys"
    [ "$cursor" = "0" ] && break
  done
  echo "$all_keys" | grep -v '^$' | sort -u || true
}
# Best-effort DELETE of a single key through the REST proxy.
# Always returns 0: callers loop over many keys under `set -e`, and one
# proxy hiccup must not abort the whole flush (output was already
# suppressed, so the intent was clearly best-effort).
redis_del() {
  local key="$1"
  curl -sf -X POST -H "Authorization: Bearer $REDIS_TOKEN" -H "Content-Type: application/json" \
    -d "[\"DEL\",\"$key\"]" "$REDIS_URL" >/dev/null 2>&1 || true
}
# Render an age in seconds as a coarse human-readable string ("5m ago").
# Negative ages (clock skew) collapse to "just now".
format_age() {
  local secs=$1 unit divisor
  if (( secs < 0 )); then
    echo "just now"
    return 0
  fi
  if (( secs < 60 )); then
    unit="s" divisor=1
  elif (( secs < 3600 )); then
    unit="m" divisor=60
  elif (( secs < 86400 )); then
    unit="h" divisor=3600
  else
    unit="d" divisor=86400
  fi
  echo "$(( secs / divisor ))${unit} ago"
}
# Render a refresh interval (minutes) right-aligned in 4 chars: " 30m", "  2h", "  1d".
format_interval() {
  local min=$1
  if (( min >= 1440 )); then
    printf '%3dd' "$(( min / 1440 ))"
  elif (( min >= 60 )); then
    printf '%3dh' "$(( min / 60 ))"
  else
    printf '%3dm' "$min"
  fi
}
# Render a TTL (seconds) right-aligned in 4 chars, coarsest unit first.
format_ttl() {
  local secs=$1
  if (( secs >= 86400 )); then
    printf '%3dd' "$(( secs / 86400 ))"
  elif (( secs >= 3600 )); then
    printf '%3dh' "$(( secs / 3600 ))"
  else
    printf '%3dm' "$(( secs / 60 ))"
  fi
}
# Map a catalog entry to its seed-meta Redis key. A literal "null" metaKey
# means the orchestrator owns the record: seed-meta:orchestrator:<name>.
get_meta_key() {
  local name="$1" meta_key="$2"
  case "$meta_key" in
    null) echo "seed-meta:orchestrator:$name" ;;
    *)    echo "seed-meta:$meta_key" ;;
  esac
}
# Suggest the catalog seeder whose name best matches a (possibly misspelled)
# user-supplied name. Substring containment in either direction wins
# outright; otherwise the name sharing the most input characters is printed.
suggest_seeder() {
  local input="$1"
  local entry name
  # Pass 1: direct substring containment, first match wins.
  for entry in "${CATALOG[@]}"; do
    name="${entry%%|*}"
    if [[ "$name" == *"$input"* || "$input" == *"$name"* ]]; then
      echo "$name"
      return 0
    fi
  done
  # Pass 2: crude similarity — count how many of the input's characters
  # (duplicates included) appear anywhere in each candidate name.
  local best="" best_score=0 score pos ch
  for entry in "${CATALOG[@]}"; do
    name="${entry%%|*}"
    score=0
    for (( pos = 0; pos < ${#input}; pos++ )); do
      ch="${input:$pos:1}"
      if [[ "$name" == *"$ch"* ]]; then
        score=$(( score + 1 ))
      fi
    done
    if (( score > best_score )); then
      best_score=$score
      best="$name"
    fi
  done
  echo "$best"
}
# ── Dependency check ──────────────────────────────────────────────────────────
# Verify the container runtime, curl and jq are installed; exit 1 listing
# everything that is missing.
check_deps() {
  local -a missing=()
  if ! command -v "$DOCKER" >/dev/null 2>&1; then
    missing+=("docker or podman")
  fi
  if ! command -v curl >/dev/null 2>&1; then
    missing+=("curl")
  fi
  if ! command -v jq >/dev/null 2>&1; then
    missing+=("jq")
  fi
  if (( ${#missing[@]} > 0 )); then
    echo "❌ Missing required tools: ${missing[*]}"
    exit 1
  fi
}
# Ensure the target container exists and reports State.Running=true;
# otherwise print a hint and exit 1.
check_container() {
  local state
  state=$($DOCKER inspect "$CONTAINER" --format '{{.State.Running}}' 2>/dev/null) || state=""
  if [[ "$state" != *true* ]]; then
    echo "❌ Container '$CONTAINER' is not running"
    echo " Start it with: $DOCKER compose up -d"
    exit 1
  fi
}
# Ping the Redis REST proxy; abort with a hint when it is unreachable.
check_redis() {
  if curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/ping" >/dev/null 2>&1; then
    return 0
  fi
  echo "❌ Cannot reach Redis at $REDIS_URL — is the stack running?"
  exit 1
}
# ── Commands ──────────────────────────────────────────────────────────────────
# ❓ Print usage text. The quoted 'HELP' delimiter keeps the here-doc literal
# (no variable or command expansion).
cmd_help() {
  cat <<'HELP'
🌍 World Monitor Seed Manager (wmsm)
Usage: ./wmsm.sh <command> [options]
Commands:
  status                📊 Show freshness of all seeders
  schedule              ⏱️ Show the refresh schedule
  refresh <name>        🔄 Force re-seed a specific seeder
  refresh --all         🔄 Force re-seed everything (tiered)
  flush                 🗑️ Wipe all seed data and re-seed from scratch
  logs [--follow|--all] 📋 Show orchestrator logs
  help                  ❓ Show this help
Environment:
  REDIS_URL     Redis REST proxy URL (default: http://localhost:8079)
  REDIS_TOKEN   Redis REST auth token (default: wm-local-token)
  WM_CONTAINER  Docker container name (default: worldmonitor)
HELP
}
# 📊 Show freshness of every catalog seeder, grouped by tier.
# For each entry the seed-meta record is fetched from Redis and rendered as
# one line: status icon, name, age, record count, last run duration.
# Footer counters (healthy/stale/error/skipped) are tallied along the way.
cmd_status() {
  header
  # Wall-clock "now" in whole seconds. The previous `date +%s%3N` is
  # GNU-only (BSD/macOS date prints a literal "%3N", corrupting the later
  # arithmetic) and the milliseconds were immediately stripped anyway.
  local now_sec
  now_sec=$(date +%s)
  local count_healthy=0 count_stale=0 count_error=0 count_skipped=0
  local current_tier=""
  for entry in "${CATALOG[@]}"; do
    IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry"
    local redis_key
    redis_key=$(get_meta_key "$name" "$meta_key")
    # Print tier header on tier change (CATALOG is ordered by tier)
    if [ "$tier" != "$current_tier" ]; then
      [ -n "$current_tier" ] && echo
      current_tier="$tier"
      local icon="" label=""
      for ti in "${TIER_ICONS[@]}"; do
        IFS='|' read -r t i l <<< "$ti"
        if [ "$t" = "$tier" ]; then icon="$i"; label="$l"; break; fi
      done
      echo "$icon ${tier^^} ($label)"
    fi
    # Fetch seed-meta from Redis
    local raw
    raw=$(redis_get "$redis_key" 2>/dev/null) || raw=""
    local result
    result=$(echo "$raw" | jq -r '.result // empty' 2>/dev/null) || result=""
    if [ -z "$result" ] || [ "$result" = "null" ]; then
      # No meta — seeder never ran (or was skipped by the orchestrator)
      printf " ⬚ %-25s no data\n" "$name"
      (( count_skipped++ )) || true
      continue
    fi
    # Parse meta fields (result is a JSON string, so parse it again)
    local fetched_at record_count duration_ms status_field error_field
    fetched_at=$(echo "$result" | jq -r '.fetchedAt // 0' 2>/dev/null) || fetched_at=0
    record_count=$(echo "$result" | jq -r '.recordCount // "-"' 2>/dev/null) || record_count="-"
    duration_ms=$(echo "$result" | jq -r '.durationMs // 0' 2>/dev/null) || duration_ms=0
    status_field=$(echo "$result" | jq -r '.status // "ok"' 2>/dev/null) || status_field="ok"
    # NOTE(review): error_field is parsed but never displayed — kept in case
    # an output line was lost upstream; verify before removing.
    error_field=$(echo "$result" | jq -r '.error // empty' 2>/dev/null) || error_field=""
    # Calculate age (fetchedAt is in milliseconds)
    local age_sec=0
    if (( fetched_at > 0 )); then
      age_sec=$(( now_sec - fetched_at / 1000 ))
      (( age_sec < 0 )) && age_sec=0
    fi
    local age_str
    age_str=$(format_age "$age_sec")
    # Duration rendered with one decimal, e.g. "2.3s"
    local duration_str
    if (( duration_ms > 0 )); then
      duration_str="$(awk "BEGIN {printf \"%.1f\", $duration_ms / 1000}")s"
    else
      duration_str="—"
    fi
    local items_str
    if [ "$record_count" != "-" ] && [ "$record_count" != "null" ]; then
      items_str="${record_count} items"
    else
      items_str="—"
    fi
    # Status icon: explicit error/timeout beats stale beats healthy
    local icon
    local interval_sec=$(( interval_min * 60 ))
    if [ "$status_field" = "error" ] || [ "$status_field" = "timeout" ]; then
      icon="❌"
      (( count_error++ )) || true
    elif (( age_sec > interval_sec )); then
      icon="⚠️ "
      (( count_stale++ )) || true
    else
      icon="✅"
      (( count_healthy++ )) || true
    fi
    printf " %s %-25s %-12s %-14s %s\n" "$icon" "$name" "$age_str" "$items_str" "$duration_str"
  done
  echo
  footer_line
  echo "$count_healthy healthy ⚠️ $count_stale stale ❌ $count_error error ⏭️ $count_skipped skipped"
}
# ⏱️ Show the refresh schedule: for every catalog entry print its interval,
# TTL, age of the last run, and an estimate of the next run.
cmd_schedule() {
  echo "🌍 World Monitor Seed Manager — Schedule"
  echo "══════════════════════════════════════════════════════════════"
  echo
  # `date +%s` is portable; the previous `date +%s%3N` printed a literal
  # "%3N" on BSD/macOS and only the seconds were used anyway.
  local now_sec
  now_sec=$(date +%s)
  local count_scheduled=0 count_skipped=0
  local current_tier=""
  for entry in "${CATALOG[@]}"; do
    IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry"
    local redis_key
    redis_key=$(get_meta_key "$name" "$meta_key")
    # Print tier header on tier change
    if [ "$tier" != "$current_tier" ]; then
      [ -n "$current_tier" ] && echo
      current_tier="$tier"
      local icon=""
      for ti in "${TIER_ICONS[@]}"; do
        IFS='|' read -r t i l <<< "$ti"
        if [ "$t" = "$tier" ]; then icon="$i"; break; fi
      done
      echo "$icon ${tier^^}"
    fi
    # Fetch seed-meta
    local raw result fetched_at age_sec
    raw=$(redis_get "$redis_key" 2>/dev/null) || raw=""
    result=$(echo "$raw" | jq -r '.result // empty' 2>/dev/null) || result=""
    if [ -z "$result" ] || [ "$result" = "null" ]; then
      printf " %-25s every %s TTL %s ⏭️ no data\n" "$name" "$(format_interval "$interval_min")" "$(format_ttl "$ttl_sec")"
      (( count_skipped++ )) || true
      continue
    fi
    fetched_at=$(echo "$result" | jq -r '.fetchedAt // 0' 2>/dev/null) || fetched_at=0
    if (( fetched_at > 0 )); then
      age_sec=$(( now_sec - fetched_at / 1000 ))
      (( age_sec < 0 )) && age_sec=0
    else
      age_sec=0
    fi
    local age_str
    age_str=$(format_age "$age_sec")
    # Next-run estimate: interval minus current age
    local interval_sec=$(( interval_min * 60 ))
    local remaining=$(( interval_sec - age_sec ))
    local next_str
    if (( remaining <= 0 )); then
      next_str="overdue"
    elif (( remaining < 60 )); then
      next_str="~${remaining}s"
    elif (( remaining < 3600 )); then
      next_str="~$(( remaining / 60 ))m"
    else
      next_str="~$(( remaining / 3600 ))h"
    fi
    printf " %-25s every %s TTL %s last %-12s next %s\n" \
      "$name" "$(format_interval "$interval_min")" "$(format_ttl "$ttl_sec")" "$age_str" "$next_str"
    (( count_scheduled++ )) || true
  done
  echo
  footer_line
  echo "⏱️ $count_scheduled scheduled ⏭️ $count_skipped skipped"
}
# 🔄 Force an immediate re-seed of one seeder (or delegate to --all).
# Validates the name against CATALOG and suggests the closest match on typos.
cmd_refresh() {
  local target="${1:-}"
  if [ -z "$target" ]; then
    echo "❌ Usage: ./wmsm.sh refresh <name> or ./wmsm.sh refresh --all"
    exit 1
  fi
  if [ "$target" = "--all" ]; then
    cmd_refresh_all
    return
  fi
  # Validate seeder name
  local found=false
  for entry in "${CATALOG[@]}"; do
    local name="${entry%%|*}"
    if [ "$name" = "$target" ]; then
      found=true
      break
    fi
  done
  if [ "$found" = false ]; then
    echo "❌ Unknown seeder: $target"
    local suggestion
    suggestion=$(suggest_seeder "$target")
    if [ -n "$suggestion" ]; then
      echo "💡 Did you mean: $suggestion?"
    fi
    exit 1
  fi
  header
  echo "🔄 Refreshing $target..."
  local start_sec
  start_sec=$(date +%s)
  # Run the seeder inside the container so it sees the stack's env/network
  if $DOCKER exec "$CONTAINER" node "scripts/seed-${target}.mjs"; then
    local dur=$(( $(date +%s) - start_sec ))
    echo " ✅ Done in ${dur}s"
  else
    local code=$?  # exit status of the docker exec above
    local dur=$(( $(date +%s) - start_sec ))
    echo " ❌ Failed (exit code $code) in ${dur}s"
    exit 1
  fi
}
# 🔄 Re-seed every seeder, tier by tier, honoring the per-tier concurrency
# cap from TIER_CONCURRENCY. Within a tier, seeders run in batches of
# background jobs and the batch is reaped before the next one starts.
cmd_refresh_all() {
  header
  echo "🔄 Refreshing all seeders (tiered)..."
  echo
  local total_ok=0 total_err=0 total_skip=0
  for tier_order in hot warm cold frozen; do
    # Get concurrency for this tier (default 3 if not listed)
    local concurrency=3
    for tc in "${TIER_CONCURRENCY[@]}"; do
      IFS='|' read -r t c <<< "$tc"
      if [ "$t" = "$tier_order" ]; then concurrency=$c; break; fi
    done
    # Get tier icon
    local icon=""
    for ti in "${TIER_ICONS[@]}"; do
      IFS='|' read -r t i l <<< "$ti"
      if [ "$t" = "$tier_order" ]; then icon="$i"; break; fi
    done
    # Collect seeders for this tier
    local tier_seeders=()
    for entry in "${CATALOG[@]}"; do
      IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry"
      if [ "$tier" = "$tier_order" ]; then
        tier_seeders+=("$name")
      fi
    done
    (( ${#tier_seeders[@]} == 0 )) && continue
    echo "$icon ${tier_order^^} (${#tier_seeders[@]} seeders, $concurrency at a time)"
    # Run in batches of $concurrency background jobs
    local idx=0
    while (( idx < ${#tier_seeders[@]} )); do
      local pids=() names=() starts=()
      local batch_size=0
      while (( batch_size < concurrency && idx < ${#tier_seeders[@]} )); do
        local sname="${tier_seeders[$idx]}"
        local start_sec
        start_sec=$(date +%s)
        $DOCKER exec "$CONTAINER" node "scripts/seed-${sname}.mjs" >/dev/null 2>&1 &
        pids+=($!)
        names+=("$sname")
        starts+=("$start_sec")
        (( idx++ )) || true
        (( batch_size++ )) || true
      done
      # Reap the batch; `wait <pid>` propagates each job's exit status.
      # NOTE(review): success and failure branches print identical lines —
      # a ✅/❌ prefix may have been lost in transit; verify against git.
      for i in "${!pids[@]}"; do
        if wait "${pids[$i]}" 2>/dev/null; then
          local dur=$(( $(date +%s) - ${starts[$i]} ))
          echo "${names[$i]} (${dur}s)"
          (( total_ok++ )) || true
        else
          local dur=$(( $(date +%s) - ${starts[$i]} ))
          echo "${names[$i]} (${dur}s)"
          (( total_err++ )) || true
        fi
      done
    done
    echo
  done
  footer_line
  echo "Done: $total_ok$total_err$total_skip ⏭️"
}
# 🗑️ Wipe all seed metadata and locks from Redis, then restart the container
# so the orchestrator performs a full cold-start re-seed. Requires the user
# to type the literal word "flush" to confirm.
cmd_flush() {
  echo "⚠️ This will delete ALL seed data and metadata from Redis."
  echo " The orchestrator will perform a full cold start re-seed."
  echo
  read -rp " Type 'flush' to confirm: " confirm
  if [ "$confirm" != "flush" ]; then
    echo " Cancelled."
    exit 0
  fi
  echo
  echo "🗑️ Flushing seed data..."
  # Delete seed-meta keys (freshness records)
  local meta_keys
  meta_keys=$(redis_scan "seed-meta:*")
  local meta_count=0
  while IFS= read -r key; do
    [ -z "$key" ] && continue
    redis_del "$key"
    (( meta_count++ )) || true
  done <<< "$meta_keys"
  echo " Deleted $meta_count seed-meta keys"
  # Delete seed-lock keys (in-flight markers)
  local lock_keys
  lock_keys=$(redis_scan "seed-lock:*")
  local lock_count=0
  while IFS= read -r key; do
    [ -z "$key" ] && continue
    redis_del "$key"
    (( lock_count++ )) || true
  done <<< "$lock_keys"
  echo " Deleted $lock_count seed-lock keys"
  echo
  echo "🔄 Restarting orchestrator for cold start..."
  $DOCKER restart "$CONTAINER" >/dev/null 2>&1
  echo " ✅ Container restarting — run ./wmsm.sh logs --follow to watch"
}
# 📋 Show container logs.
#   (default)     past logs filtered to orchestrator/seeder lines
#   --follow|-f   stream filtered logs live
#   --all|-a      full raw logs, unfiltered
cmd_logs() {
  local mode="${1:---filter}"
  local filter='\[orchestrator\]\|\[seed:'
  case "$mode" in
    --follow|-f)
      $DOCKER logs -f "$CONTAINER" 2>&1 | grep --line-buffered "$filter"
      ;;
    --all|-a)
      $DOCKER logs "$CONTAINER" 2>&1
      ;;
    *)
      $DOCKER logs "$CONTAINER" 2>&1 | grep "$filter"
      ;;
  esac
}
# ── Main dispatcher ──────────────────────────────────────────────────────────
# Routes the first CLI argument to its cmd_* handler. Help short-circuits
# before the dependency/container checks so it always works; Redis-backed
# commands additionally verify the proxy is reachable.
main() {
  local cmd="${1:-help}"
  shift 2>/dev/null || true
  case "$cmd" in
    help|--help|-h)
      cmd_help
      exit 0
      ;;
  esac
  check_deps
  check_container
  case "$cmd" in
    status)   check_redis; cmd_status "$@" ;;
    schedule) check_redis; cmd_schedule "$@" ;;
    refresh)  check_redis; cmd_refresh "$@" ;;
    flush)    check_redis; cmd_flush "$@" ;;
    logs)     cmd_logs "$@" ;;
    *)
      echo "❌ Unknown command: $cmd"
      echo " Run ./wmsm.sh help for usage"
      exit 1
      ;;
  esac
}
main "$@"