From 429b1d99dd0048ae39157b7d96dd183fb1607f10 Mon Sep 17 00:00:00 2001 From: Elie Habib Date: Sun, 22 Mar 2026 19:59:42 +0400 Subject: [PATCH] =?UTF-8?q?Revert=20"feat:=20seed=20orchestrator=20with=20?= =?UTF-8?q?auto-seeding,=20persistence,=20and=20managemen=E2=80=A6"=20(#20?= =?UTF-8?q?60)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit bb79386d24493a934bb581407f888572cb13729a. --- .env.example | 57 -- Dockerfile | 7 - docker-compose.yml | 24 +- docker/supervisord.conf | 12 - scripts/seed-config.mjs | 71 --- scripts/seed-infra.mjs | 4 +- scripts/seed-insights.mjs | 4 +- scripts/seed-military-maritime-news.mjs | 4 +- scripts/seed-orchestrator.mjs | 552 ------------------ scripts/seed-service-statuses.mjs | 5 +- scripts/seed-utils/logger.mjs | 16 - scripts/seed-utils/meta.mjs | 50 -- scripts/seed-utils/runner.mjs | 64 --- tests/seed-orchestrator-scheduling.test.mjs | 157 ----- tests/seed-orchestrator.test.mjs | 209 ------- wmsm.sh | 602 -------------------- 16 files changed, 5 insertions(+), 1833 deletions(-) delete mode 100644 scripts/seed-config.mjs delete mode 100644 scripts/seed-orchestrator.mjs delete mode 100644 scripts/seed-utils/logger.mjs delete mode 100644 scripts/seed-utils/meta.mjs delete mode 100644 scripts/seed-utils/runner.mjs delete mode 100644 tests/seed-orchestrator-scheduling.test.mjs delete mode 100644 tests/seed-orchestrator.test.mjs delete mode 100755 wmsm.sh diff --git a/.env.example b/.env.example index 49d80e00a..f353b832c 100644 --- a/.env.example +++ b/.env.example @@ -208,63 +208,6 @@ RELAY_METRICS_WINDOW_SECONDS=60 CORRIDOR_RISK_API_KEY= -# ------ Weather Intelligence (Docker seeder) ------ - -# Windy.com API (wind/weather overlays — optional) -# Register at: https://api.windy.com/ -WINDY_API_KEY= - - -# ------ Cryptocurrency Data (Docker seeder) ------ - -# CoinGecko API (crypto prices — optional, free tier available) -# Register at: https://www.coingecko.com/en/api -COINGECKO_API_KEY= - - -# ------ Trade & Tariff Data (Docker seeder) ------ - -# World Trade Organization API (trade policy data — optional) -# Register at: https://apiportal.wto.org/ -WTO_API_KEY= - - -# ------ Proxy Authentication (Docker seeder) ------ - -# OpenSky proxy auth token (for self-hosted relay proxy) -OPENSKY_PROXY_AUTH= - -# Oref (Israel Home Front Command) proxy auth token -OREF_PROXY_AUTH= - - -# ------ Cyber Threat Intelligence (Docker seeder) ------ - -# AbuseIPDB (IP reputation data — free tier: 1000 checks/day) -# Register at: https://www.abuseipdb.com/ -ABUSEIPDB_API_KEY= - -# AlienVault OTX (Open Threat Exchange — free) -# Register at: https://otx.alienvault.com/ -OTX_API_KEY= - -# URLhaus auth key (malware URL feed — free) -# Register at: https://urlhaus.abuse.ch/ -URLHAUS_AUTH_KEY= - - -# ------ Ollama API Key (Docker seeder) ------ - -# API key for Ollama endpoints that require authentication -OLLAMA_API_KEY= - - -# ------ Cloudflare R2 Token (Docker seeder) ------ - -# R2 API token for seed scripts that write to R2 storage -CLOUDFLARE_R2_TOKEN= - - # ------ Public Data Sources (no keys required) ------ # UNHCR (UN Refugee Agency) — public API, no auth (CC BY 4.0) diff --git a/Dockerfile b/Dockerfile index 77494a560..d72880e58 100644 --- a/Dockerfile +++ b/Dockerfile @@ -47,13 +47,6 @@ COPY --from=builder /app/api ./api # Static data files used by handlers at runtime COPY --from=builder /app/data ./data -# Seed scripts + orchestrator + shared utils -COPY --from=builder /app/scripts ./scripts -COPY --from=builder /app/shared ./shared - -# Node modules needed by seed scripts (e.g. fast-xml-parser for sanctions) -COPY --from=builder /app/node_modules ./node_modules - # Built frontend static files COPY --from=builder /app/dist /usr/share/nginx/html diff --git a/docker-compose.yml b/docker-compose.yml index f17786c36..5b348e17d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,28 +41,6 @@ services: NASA_FIRMS_API_KEY: "${NASA_FIRMS_API_KEY:-}" CLOUDFLARE_API_TOKEN: "${CLOUDFLARE_API_TOKEN:-}" AVIATIONSTACK_API: "${AVIATIONSTACK_API:-}" - # Seeder API keys — all optional, features degrade gracefully without them - WINDY_API_KEY: "${WINDY_API_KEY:-}" - COINGECKO_API_KEY: "${COINGECKO_API_KEY:-}" - ICAO_API_KEY: "${ICAO_API_KEY:-}" - WTO_API_KEY: "${WTO_API_KEY:-}" - WINGBITS_API_KEY: "${WINGBITS_API_KEY:-}" - OPENSKY_CLIENT_ID: "${OPENSKY_CLIENT_ID:-}" - OPENSKY_CLIENT_SECRET: "${OPENSKY_CLIENT_SECRET:-}" - OPENSKY_PROXY_AUTH: "${OPENSKY_PROXY_AUTH:-}" - OREF_PROXY_AUTH: "${OREF_PROXY_AUTH:-}" - UCDP_ACCESS_TOKEN: "${UCDP_ACCESS_TOKEN:-}" - ACLED_EMAIL: "${ACLED_EMAIL:-}" - ACLED_PASSWORD: "${ACLED_PASSWORD:-}" - ABUSEIPDB_API_KEY: "${ABUSEIPDB_API_KEY:-}" - OTX_API_KEY: "${OTX_API_KEY:-}" - URLHAUS_AUTH_KEY: "${URLHAUS_AUTH_KEY:-}" - OLLAMA_API_KEY: "${OLLAMA_API_KEY:-}" - OLLAMA_MODEL: "${OLLAMA_MODEL:-}" - CLOUDFLARE_R2_ACCOUNT_ID: "${CLOUDFLARE_R2_ACCOUNT_ID:-}" - CLOUDFLARE_R2_TOKEN: "${CLOUDFLARE_R2_TOKEN:-}" - # Testing - SEED_TURBO: "${SEED_TURBO:-}" # Docker secrets (recommended for API keys — keeps them out of docker inspect). # Create secrets/ dir with one file per key, then uncomment below. # See SELF_HOSTING.md or docker-compose.override.yml for details. @@ -94,7 +72,7 @@ services: redis: image: docker.io/redis:7-alpine container_name: worldmonitor-redis - command: redis-server --maxmemory ${REDIS_MAXMEMORY:-512mb} --maxmemory-policy allkeys-lru --save 300 10 --save 900 1 + command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru volumes: - redis-data:/data restart: unless-stopped diff --git a/docker/supervisord.conf b/docker/supervisord.conf index 00e3adbeb..456f23cf4 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord.conf @@ -22,15 +22,3 @@ stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 - -[program:seed-orchestrator] -command=node /app/scripts/seed-orchestrator.mjs -directory=/app -autostart=true -autorestart=true -startretries=5 -startsecs=5 -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 diff --git a/scripts/seed-config.mjs b/scripts/seed-config.mjs deleted file mode 100644 index 5df93b6d6..000000000 --- a/scripts/seed-config.mjs +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Central seed catalog — defines all 42 seeders with their scheduling parameters. - * - * Fields: - * tier — 'hot' | 'warm' | 'cold' | 'frozen' - * intervalMin — refresh interval in minutes (always < ttlSec / 60) - * ttlSec — Redis TTL in seconds (how long data is valid) - * ttlSource — 'source' = TTL from upstream API/script constant, 'inferred' = our estimate - * requiredKeys — env vars required for the seeder to run (excluding UPSTASH_REDIS_*) - * metaKey — seed-meta key pattern (domain:resource) for freshness tracking. - * Audited from runSeed(domain, resource) / writeFreshnessMetadata() calls. - * null = seeder doesn't write seed-meta; orchestrator writes its own. - */ - -export const TIER_ORDER = ['hot', 'warm', 'cold', 'frozen']; - -export const TIER_CONCURRENCY = { hot: 3, warm: 5, cold: 3, frozen: 2 }; - -export const STEADY_STATE_CONCURRENCY = 5; - -export const SEED_CATALOG = { - // ── HOT (5-15 min) ───────────────────────────────────────────────── - 'weather-alerts': { tier: 'hot', intervalMin: 5, ttlSec: 900, ttlSource: 'source', requiredKeys: [], metaKey: 'weather:alerts' }, - 'correlation': { tier: 'hot', intervalMin: 5, ttlSec: 1200, ttlSource: 'source', requiredKeys: [], metaKey: 'correlation:cards' }, - 'prediction-markets': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'prediction:markets' }, - 'commodity-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'market:commodities' }, - 'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' }, - 'insights': { tier: 'hot', intervalMin: 15, ttlSec: 1800, ttlSource: 'source', requiredKeys: [], metaKey: 'news:insights' }, - 'military-flights': { tier: 'hot', intervalMin: 5, ttlSec: 600, ttlSource: 'source', requiredKeys: [], metaKey: 'military:flights' }, - 'conflict-intel': { tier: 'hot', intervalMin: 10, ttlSec: 900, ttlSource: 'source', requiredKeys: [], metaKey: 'conflict:acled-intel' }, // accepts ACLED_EMAIL+PASSWORD or ACLED_ACCESS_TOKEN - - // ── WARM (30-60 min) ─────────────────────────────────────────────── - 'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' }, - 'security-advisories': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'intelligence:advisories' }, - 'fire-detections': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'inferred', requiredKeys: ['NASA_FIRMS_API_KEY'], metaKey: 'wildfire:fires' }, - 'natural-events': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'natural:events' }, - 'radiation-watch': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'radiation:observations' }, - 'airport-delays': { tier: 'warm', intervalMin: 30, ttlSec: 7200, ttlSource: 'source', requiredKeys: [], metaKey: 'aviation:faa' }, - 'crypto-quotes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:crypto' }, - 'stablecoin-markets': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:stablecoins' }, - 'gulf-quotes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:gulf-quotes' }, - 'etf-flows': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'market:etf-flows' }, - 'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' }, - 'research': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'research:arxiv-hn-trending' }, - 'unrest-events': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'unrest:events' }, - 'usa-spending': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'economic:spending' }, - 'supply-chain-trade': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'supply_chain:shipping' }, - 'aviation': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: ['AVIATIONSTACK_API'], metaKey: 'aviation:ops-news' }, - 'internet-outages': { tier: 'warm', intervalMin: 15, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['CLOUDFLARE_API_TOKEN'], metaKey: 'infra:outages' }, - 'infra': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: null }, - 'service-statuses': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: 'infra:service-statuses' }, - 'military-maritime-news': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'inferred', requiredKeys: [], metaKey: null }, - 'sanctions-pressure': { tier: 'warm', intervalMin: 30, ttlSec: 43200, ttlSource: 'source', requiredKeys: [], metaKey: 'sanctions:pressure' }, - 'forecasts': { tier: 'warm', intervalMin: 60, ttlSec: 6300, ttlSource: 'source', requiredKeys: [], metaKey: 'forecast:predictions' }, - - // ── COLD (2-6 hours) ─────────────────────────────────────────────── - 'cyber-threats': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'cyber:threats' }, - 'climate-anomalies': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'climate:anomalies' }, - 'thermal-escalation': { tier: 'cold', intervalMin: 120, ttlSec: 10800, ttlSource: 'source', requiredKeys: [], metaKey: 'thermal:escalation' }, - 'gdelt-intel': { tier: 'cold', intervalMin: 120, ttlSec: 86400, ttlSource: 'source', requiredKeys: [], metaKey: 'intelligence:gdelt-intel' }, - 'webcams': { tier: 'cold', intervalMin: 360, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: ['WINDY_API_KEY'], metaKey: 'webcam:cameras:geo' }, - 'iran-events': { tier: 'cold', intervalMin: 360, ttlSec: 172800, ttlSource: 'source', requiredKeys: [], metaKey: 'conflict:iran-events' }, - - // ── FROZEN (12h-7d) ──────────────────────────────────────────────── - 'bis-data': { tier: 'frozen', intervalMin: 600, ttlSec: 43200, ttlSource: 'source', requiredKeys: [], metaKey: 'economic:bis' }, - 'displacement-summary': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'source', requiredKeys: [], metaKey: 'displacement:summary' }, - 'submarine-cables': { tier: 'frozen', intervalMin: 1440, ttlSec: 604800, ttlSource: 'source', requiredKeys: [], metaKey: 'infrastructure:submarine-cables' }, - 'military-bases': { tier: 'frozen', intervalMin: 1440, ttlSec: 604800, ttlSource: 'inferred', requiredKeys: [], metaKey: null }, - 'ucdp-events': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: [], metaKey: 'conflict:ucdp-events' }, - 'wb-indicators': { tier: 'frozen', intervalMin: 720, ttlSec: 86400, ttlSource: 'inferred', requiredKeys: [], metaKey: null }, -}; diff --git a/scripts/seed-infra.mjs b/scripts/seed-infra.mjs index 1056d64c8..94f2b9814 100755 --- a/scripts/seed-infra.mjs +++ b/scripts/seed-infra.mjs @@ -21,9 +21,7 @@ import { loadEnvFile, CHROME_UA } from './_seed-utils.mjs'; loadEnvFile(import.meta.url); -const API_BASE = process.env.LOCAL_API_MODE === 'docker' - ? `http://localhost:${process.env.LOCAL_API_PORT || 46123}` - : 'https://api.worldmonitor.app'; +const API_BASE = 'https://api.worldmonitor.app'; const TIMEOUT = 30_000; async function warmPing(name, path) { diff --git a/scripts/seed-insights.mjs b/scripts/seed-insights.mjs index 93dff398a..bd304ab28 100644 --- a/scripts/seed-insights.mjs +++ b/scripts/seed-insights.mjs @@ -187,9 +187,7 @@ function categorizeStory(title) { } async function warmDigestCache() { - const apiBase = process.env.API_BASE_URL - || (process.env.LOCAL_API_MODE === 'docker' ? `http://localhost:${process.env.LOCAL_API_PORT || 46123}` : null) - || 'https://api.worldmonitor.app'; + const apiBase = process.env.API_BASE_URL || 'https://api.worldmonitor.app'; try { const resp = await fetch(`${apiBase}/api/news/v1/list-feed-digest?variant=full&lang=en`, { headers: { 'User-Agent': CHROME_UA }, diff --git a/scripts/seed-military-maritime-news.mjs b/scripts/seed-military-maritime-news.mjs index 24c9f559f..ea89459aa 100755 --- a/scripts/seed-military-maritime-news.mjs +++ b/scripts/seed-military-maritime-news.mjs @@ -24,9 +24,7 @@ import { loadEnvFile, CHROME_UA } from './_seed-utils.mjs'; loadEnvFile(import.meta.url); -const API_BASE = process.env.LOCAL_API_MODE === 'docker' - ? `http://localhost:${process.env.LOCAL_API_PORT || 46123}` - : 'https://api.worldmonitor.app'; +const API_BASE = 'https://api.worldmonitor.app'; const TIMEOUT = 30_000; async function warmPing(name, path, body = {}) { diff --git a/scripts/seed-orchestrator.mjs b/scripts/seed-orchestrator.mjs deleted file mode 100644 index e6bd449d4..000000000 --- a/scripts/seed-orchestrator.mjs +++ /dev/null @@ -1,552 +0,0 @@ -#!/usr/bin/env node - -/** - * Seed orchestrator — ties together the catalog, runner, freshness checks, - * tiered cold start, and recurring scheduling with graceful shutdown. - * - * Exports `classifySeeders` and `buildStartupSummary` for testing (pure functions). - * Redis and all side-effects are deferred to `main()`. - */ - -import { dirname, join } from 'node:path'; -import { fileURLToPath } from 'node:url'; -import { createLogger } from './seed-utils/logger.mjs'; -import { parseFreshness, isFresh, buildMeta } from './seed-utils/meta.mjs'; -import { forkSeeder } from './seed-utils/runner.mjs'; -import { - SEED_CATALOG, - TIER_ORDER, - TIER_CONCURRENCY, - STEADY_STATE_CONCURRENCY, -} from './seed-config.mjs'; -import { getRedisCredentials } from './_seed-utils.mjs'; - -/** - * Dry-run mock seeder — simulates a seeder with random sleep and 10% failure rate. - * Used when SEED_TURBO=dry. - */ -async function dryRunSeeder(name) { - const sleepMs = 100 + Math.random() * 400; - await new Promise((r) => setTimeout(r, sleepMs)); - const fail = Math.random() < 0.1; - return { - name, - exitCode: fail ? 1 : 0, - status: fail ? 'error' : 'ok', - durationMs: Math.round(sleepMs), - }; -} - -// ──────────────────────────────────────────────────────────────────────────── -// Thin Redis helpers — use the same REST API as _seed-utils.mjs's private -// redisGet/redisSet but scoped to orchestrator needs. We reuse -// getRedisCredentials() (deferred to main()) rather than duplicating env reads. -// ──────────────────────────────────────────────────────────────────────────── - -async function redisGet(url, token, key) { - const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, { - headers: { Authorization: `Bearer ${token}` }, - signal: AbortSignal.timeout(5_000), - }); - if (!resp.ok) return null; - const data = await resp.json(); - return data.result ? JSON.parse(data.result) : null; -} - -async function redisSet(url, token, key, value, ttlSeconds) { - const payload = JSON.stringify(value); - const cmd = ttlSeconds - ? ['SET', key, payload, 'EX', ttlSeconds] - : ['SET', key, payload]; - const resp = await fetch(url, { - method: 'POST', - headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' }, - body: JSON.stringify(cmd), - signal: AbortSignal.timeout(15_000), - }); - if (!resp.ok) { - const text = await resp.text().catch(() => ''); - throw new Error(`Redis SET failed: HTTP ${resp.status} — ${text.slice(0, 200)}`); - } - return resp.json(); -} - -const __dirname = dirname(fileURLToPath(import.meta.url)); -const log = createLogger('orchestrator'); - -let RETRY_DELAY_MS = 60_000; -const MAX_CONSECUTIVE_FAILURES = 5; -const SHUTDOWN_TIMEOUT_MS = 15_000; -const REDIS_PING_INTERVAL_MS = 3_000; -const REDIS_PING_MAX_ATTEMPTS = 20; - -// ──────────────────────────────────────────────────────────────────────────── -// Pure, testable functions -// ──────────────────────────────────────────────────────────────────────────── - -/** - * Classify seeders into active (runnable) and skipped (missing env vars). - * @param {Record} catalog - * @param {Record} env — process.env or a subset - * @returns {{ active: Array<{ name: string, tier: string, intervalMin: number, ttlSec: number, metaKey: string|null }>, skipped: Array<{ name: string, reason: string }> }} - */ -export function classifySeeders(catalog, env = process.env) { - const active = []; - const skipped = []; - - for (const [name, cfg] of Object.entries(catalog)) { - const missing = cfg.requiredKeys.filter((k) => !env[k]); - if (missing.length > 0) { - skipped.push({ name, reason: `missing ${missing.join(', ')}` }); - } else { - active.push({ - name, - tier: cfg.tier, - intervalMin: cfg.intervalMin, - ttlSec: cfg.ttlSec, - metaKey: cfg.metaKey, - }); - } - } - - return { active, skipped }; -} - -/** - * Build a human-readable startup summary. - * @param {Array<{ name: string, tier: string }>} active - * @param {Array<{ name: string, reason: string }>} skipped - * @param {number} freshCount — number of seeders with fresh data - * @returns {string} - */ -export function buildStartupSummary(active, skipped, freshCount) { - const lines = []; - lines.push(''); - lines.push('=== Seed Orchestrator Startup ==='); - lines.push(''); - lines.push(` ACTIVE (${active.length})`); - for (const tier of TIER_ORDER) { - const inTier = active.filter((s) => s.tier === tier); - if (inTier.length > 0) { - lines.push(` ${tier}: ${inTier.map((s) => s.name).join(', ')}`); - } - } - lines.push(''); - lines.push(` SKIPPED (${skipped.length})`); - for (const s of skipped) { - lines.push(` ${s.name}: ${s.reason}`); - } - lines.push(''); - lines.push(` ${freshCount}/${active.length} seeders have fresh data`); - lines.push(''); - return lines.join('\n'); -} - -/** - * Compute effective interval with turbo mode and failure demotion. - * @param {number} intervalMin — base interval from catalog - * @param {number} failureCount — consecutive failures for this seeder - * @param {string|undefined} turboMode — 'real', 'dry', or undefined - * @returns {number} — interval in milliseconds - */ -export function getEffectiveInterval(intervalMin, failureCount, turboMode) { - const div = turboMode ? 20 : 1; - let min = Math.max(1, Math.round(intervalMin / div)); - if (failureCount >= MAX_CONSECUTIVE_FAILURES) min *= 2; - return min * 60_000; -} - -/** - * Check if a seeder should skip this cycle (overlap protection). - * @param {string} name - * @param {Set} inFlight - * @returns {boolean} - */ -export function shouldSkipCycle(name, inFlight) { - return inFlight.has(name); -} - -/** - * Compute turbo-adjusted interval in minutes. - * @param {number} intervalMin - * @param {string|undefined} turboMode - * @returns {number} - */ -export function computeTurboInterval(intervalMin, turboMode) { - if (!turboMode) return intervalMin; - return Math.max(1, Math.round(intervalMin / 20)); -} - -// ──────────────────────────────────────────────────────────────────────────── -// Orchestrator runtime (only used when executed directly) -// ──────────────────────────────────────────────────────────────────────────── - -/** - * Wait for the Redis REST proxy to become reachable. - */ -async function waitForRedis(url, token) { - for (let i = 0; i < REDIS_PING_MAX_ATTEMPTS; i++) { - try { - const resp = await fetch(`${url}/ping`, { - headers: { Authorization: `Bearer ${token}` }, - signal: AbortSignal.timeout(3_000), - }); - if (resp.ok) { - log.info('Redis is reachable'); - return; - } - } catch { - // retry - } - if (i < REDIS_PING_MAX_ATTEMPTS - 1) { - log.info(`Waiting for Redis... (attempt ${i + 1}/${REDIS_PING_MAX_ATTEMPTS})`); - await new Promise((r) => setTimeout(r, REDIS_PING_INTERVAL_MS)); - } - } - throw new Error(`Redis not reachable after ${REDIS_PING_MAX_ATTEMPTS} attempts`); -} - -/** - * Fetch freshness metadata for all active seeders. - */ -async function fetchFreshnessMap(activeSeeders, url, token) { - const map = new Map(); - await Promise.all( - activeSeeders.map(async (s) => { - const key = s.metaKey ? `seed-meta:${s.metaKey}` : `seed-meta:orchestrator:${s.name}`; - try { - const raw = await redisGet(url, token, key); - const meta = parseFreshness(raw); - if (meta) map.set(s.name, meta); - } catch { - // missing/error — treat as stale - } - }), - ); - return map; -} - -/** - * Resolve the meta key to check/write for a given seeder. - */ -function resolveMetaKey(seeder) { - return seeder.metaKey ? `seed-meta:${seeder.metaKey}` : `seed-meta:orchestrator:${seeder.name}`; -} - -/** - * Execute a single seeder: fork the child process, handle meta writing. - */ -async function executeSeed(seeder, url, token, turboMode) { - const scriptPath = join(__dirname, `seed-${seeder.name}.mjs`); - log.info(`Running ${seeder.name}...`); - - // Read meta before fork so we can detect if the seeder updated it - let metaBefore = null; - const metaKey = resolveMetaKey(seeder); - if (turboMode !== 'dry') { - try { - metaBefore = await redisGet(url, token, metaKey); - } catch { - // ignore - } - } - - const timeoutMs = turboMode === 'dry' ? 30_000 : 120_000; - const result = turboMode === 'dry' - ? await dryRunSeeder(seeder.name) - : await forkSeeder(seeder.name, { - scriptPath: process.execPath, - args: [scriptPath], - timeoutMs, - }); - - log.info(`${seeder.name} finished: ${result.status} (${result.durationMs}ms)`); - - // Meta writing logic (skip in dry mode) - if (turboMode !== 'dry') { - if (result.status === 'ok') { - if (seeder.metaKey) { - // Check if the seeder already updated its own meta - try { - const metaAfter = await redisGet(url, token, metaKey); - const parsed = parseFreshness(metaAfter); - const parsedBefore = parseFreshness(metaBefore); - const alreadyUpdated = - parsed && - parsedBefore && - parsed.fetchedAt > parsedBefore.fetchedAt; - const freshlyWritten = parsed && !parsedBefore; - if (!alreadyUpdated && !freshlyWritten) { - // Seeder didn't update meta — write it ourselves - const meta = buildMeta(result.durationMs, 'ok'); - await redisSet(url, token, metaKey, meta, 86400 * 7); - } - } catch { - // Best effort - } - } else { - // metaKey is null — always write orchestrator meta - const meta = buildMeta(result.durationMs, 'ok'); - await redisSet(url, token, metaKey, meta, 86400 * 7); - } - } else { - // Write error meta for null-metaKey seeders - if (!seeder.metaKey) { - const meta = buildMeta(result.durationMs, 'error', `exit ${result.exitCode ?? result.status}`); - await redisSet(url, token, metaKey, meta, 86400 * 7); - } - } - } - - return result; -} - -/** - * Run a batch of seeders with concurrency control. - */ -async function runBatch(seeders, concurrency, executeFn) { - const results = []; - for (let i = 0; i < seeders.length; i += concurrency) { - const batch = seeders.slice(i, i + concurrency); - const batchResults = await Promise.all(batch.map(executeFn)); - results.push(...batchResults); - } - return results; -} - -/** - * Run tiered cold start: process tiers in order with tier-specific concurrency. - * Skip seeders that already have fresh data. - */ -async function tieredColdStart(activeSeeders, freshnessMap, url, token, turboMode) { - log.info('Starting tiered cold start...'); - - for (const tier of TIER_ORDER) { - const tierSeeders = activeSeeders.filter((s) => s.tier === tier); - // Filter out fresh seeders - const stale = tierSeeders.filter((s) => { - const meta = freshnessMap.get(s.name); - return !isFresh(meta, s.intervalMin); - }); - - if (stale.length === 0) { - log.info(`Tier ${tier}: all ${tierSeeders.length} seeders fresh, skipping`); - continue; - } - - const concurrency = TIER_CONCURRENCY[tier]; - log.info(`Tier ${tier}: running ${stale.length} stale seeders (concurrency ${concurrency})`); - - await runBatch(stale, concurrency, (s) => executeSeed(s, url, token, turboMode)); - } - - log.info('Tiered cold start complete'); -} - -/** - * Steady-state scheduler: recurring timers with overlap protection, retry, - * consecutive failure demotion, and global concurrency queue. - */ -function scheduleSeeders(activeSeeders, url, token, state) { - const { timers, inFlight, failureCounts, queue } = state; - - function getInterval(seeder) { - const failures = failureCounts.get(seeder.name) || 0; - return getEffectiveInterval(seeder.intervalMin, failures, state.turboMode); - } - - function drainQueue() { - while (queue.length > 0 && inFlight.size < STEADY_STATE_CONCURRENCY) { - const next = queue.shift(); - runScheduled(next); - } - } - - async function runScheduled(seeder) { - inFlight.add(seeder.name); - try { - const result = await executeSeed(seeder, url, token, state.turboMode); - - if (result.status === 'ok') { - failureCounts.set(seeder.name, 0); - } else { - const prev = failureCounts.get(seeder.name) || 0; - const newCount = prev + 1; - failureCounts.set(seeder.name, newCount); - - if (newCount < 2) { - // First failure — retry after 60s - log.warn(`${seeder.name} failed, scheduling retry in ${RETRY_DELAY_MS / 1000}s`); - const retryTimer = setTimeout(async () => { - if (state.shuttingDown) return; - if (inFlight.has(seeder.name)) return; // overlap protection on retry too - inFlight.add(seeder.name); - try { - const retryResult = await executeSeed(seeder, url, token, state.turboMode); - if (retryResult.status === 'ok') { - failureCounts.set(seeder.name, 0); - } else { - failureCounts.set(seeder.name, (failureCounts.get(seeder.name) || 0) + 1); - log.warn(`${seeder.name} retry failed — waiting for next cycle`); - } - } finally { - inFlight.delete(seeder.name); - drainQueue(); - } - }, RETRY_DELAY_MS); - timers.push(retryTimer); - } else { - if (newCount === MAX_CONSECUTIVE_FAILURES) { - log.warn(`${seeder.name}: ${MAX_CONSECUTIVE_FAILURES} consecutive failures — doubling interval`); - } - } - } - } finally { - inFlight.delete(seeder.name); - drainQueue(); - } - } - - for (const seeder of activeSeeders) { - const schedule = () => { - if (state.shuttingDown) return; - - const interval = getInterval(seeder); - const timer = setTimeout(() => { - if (state.shuttingDown) return; - - // Overlap protection - if (inFlight.has(seeder.name)) { - log.warn(`${seeder.name} still running, skipping this cycle`); - schedule(); // re-schedule for next interval - return; - } - - // Global concurrency check — queue if at limit - if (inFlight.size >= STEADY_STATE_CONCURRENCY) { - log.info(`${seeder.name} queued (concurrency limit)`); - queue.push(seeder); - schedule(); - return; - } - - runScheduled(seeder).then(() => { - schedule(); - }); - }, interval); - - timers.push(timer); - }; - - schedule(); - } -} - -/** - * Graceful shutdown: clear timers, wait for in-flight, exit. - */ -function setupShutdown(state) { - const handler = async (signal) => { - if (state.shuttingDown) return; - state.shuttingDown = true; - log.info(`Received ${signal}, shutting down...`); - - // Clear all scheduled timers - for (const t of state.timers) { - clearTimeout(t); - } - state.timers.length = 0; - state.queue.length = 0; - - // Wait for in-flight seeders - if (state.inFlight.size > 0) { - log.info(`Waiting for ${state.inFlight.size} in-flight seeder(s)...`); - const deadline = Date.now() + SHUTDOWN_TIMEOUT_MS; - while (state.inFlight.size > 0 && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 500)); - } - if (state.inFlight.size > 0) { - log.warn(`${state.inFlight.size} seeder(s) still running after ${SHUTDOWN_TIMEOUT_MS}ms timeout`); - } - } - - log.info('Shutdown complete'); - process.exit(0); - }; - - process.on('SIGTERM', () => handler('SIGTERM')); - process.on('SIGINT', () => handler('SIGINT')); -} - -// ──────────────────────────────────────────────────────────────────────────── -// Main entry point -// ──────────────────────────────────────────────────────────────────────────── - -async function main() { - const { url, token } = getRedisCredentials(); - - // Turbo mode - const turboMode = process.env.SEED_TURBO || undefined; - if (turboMode && turboMode !== 'real' && turboMode !== 'dry') { - log.error(`Invalid SEED_TURBO value: ${turboMode} (expected: real, dry)`); - process.exit(1); - } - if (turboMode) { - log.info(`⚡ TURBO MODE: ${turboMode} (intervals ÷20${turboMode === 'dry' ? ', no real seeders' : ''})`); - RETRY_DELAY_MS = 3_000; - } - - // Wait for Redis to be reachable - await waitForRedis(url, token); - - // Classify seeders - const { active, skipped } = classifySeeders(SEED_CATALOG); - - // Apply turbo interval compression - if (turboMode) { - for (const s of active) { - s.intervalMin = computeTurboInterval(s.intervalMin, turboMode); - } - } - - // Fetch freshness for all active seeders - const freshnessMap = await fetchFreshnessMap(active, url, token); - const freshCount = active.filter((s) => { - const meta = freshnessMap.get(s.name); - return isFresh(meta, s.intervalMin); - }).length; - - // Log startup summary - const summary = buildStartupSummary(active, skipped, freshCount); - log.info(summary); - - // Tiered cold start - await tieredColdStart(active, freshnessMap, url, token, turboMode); - - // Set up recurring scheduling - const state = { - timers: [], - inFlight: new Set(), - failureCounts: new Map(), - queue: [], - shuttingDown: false, - turboMode, - }; - - setupShutdown(state); - scheduleSeeders(active, url, token, state); - - log.info('Steady-state scheduling active'); -} - -// Only run main() when executed directly (not imported for testing) -const isDirectExecution = - process.argv[1] && - import.meta.url === `file://${process.argv[1]}`; - -if (isDirectExecution) { - main().catch((err) => { - log.error(`Fatal: ${err.message}`); - process.exit(1); - }); -} diff --git a/scripts/seed-service-statuses.mjs b/scripts/seed-service-statuses.mjs index b4e39322a..9b622bea1 100644 --- a/scripts/seed-service-statuses.mjs +++ b/scripts/seed-service-statuses.mjs @@ -12,10 +12,7 @@ import { loadEnvFile, CHROME_UA, getRedisCredentials, logSeedResult, extendExist loadEnvFile(import.meta.url); -const API_BASE = process.env.LOCAL_API_MODE === 'docker' - ? `http://localhost:${process.env.LOCAL_API_PORT || 46123}` - : 'https://api.worldmonitor.app'; -const RPC_URL = `${API_BASE}/api/infrastructure/v1/list-service-statuses`; +const RPC_URL = 'https://api.worldmonitor.app/api/infrastructure/v1/list-service-statuses'; const CANONICAL_KEY = 'infra:service-statuses:v1'; async function warmPing() { diff --git a/scripts/seed-utils/logger.mjs b/scripts/seed-utils/logger.mjs deleted file mode 100644 index c795884f1..000000000 --- a/scripts/seed-utils/logger.mjs +++ /dev/null @@ -1,16 +0,0 @@ -/** - * Prefixed console logger for seed orchestrator. - * @param {string} name — seeder name (e.g. 'earthquakes') - * @param {{ write?: (msg: string) => void }} [sink] — override for testing - */ -export function createLogger(name, sink) { - const prefix = name === 'orchestrator' ? '[orchestrator]' : `[seed:${name}]`; - const out = sink?.write ?? ((msg) => console.log(msg)); - const err = sink?.write ?? ((msg) => console.error(msg)); - - return { - info: (msg) => out(`${prefix} ${msg}`), - error: (msg) => err(`${prefix} error: ${msg}`), - warn: (msg) => err(`${prefix} warn: ${msg}`), - }; -} diff --git a/scripts/seed-utils/meta.mjs b/scripts/seed-utils/meta.mjs deleted file mode 100644 index 67f3b03f6..000000000 --- a/scripts/seed-utils/meta.mjs +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Seed metadata helpers for the orchestrator. - * - * Reads/writes `seed-meta:{domain}:{resource}` keys in the same schema - * as `writeFreshnessMetadata` in `_seed-utils.mjs`: - * { fetchedAt, recordCount, sourceVersion, durationMs?, status? } - */ - -/** - * Parse a raw seed-meta value from Redis. - * The value may be a parsed object (from _seed-utils.mjs redisGet which JSON.parses) - * or a raw JSON string. - * @param {object|string|null} raw - * @returns {{ fetchedAt: number, recordCount?: number, sourceVersion?: string, status?: string, error?: string } | null} - */ -export function parseFreshness(raw) { - if (raw == null || raw === '') return null; - try { - const obj = typeof raw === 'string' ? JSON.parse(raw) : raw; - if (!obj || typeof obj.fetchedAt !== 'number') return null; - return obj; - } catch { - return null; - } -} - -/** - * Check if seed-meta indicates data is still fresh. - * @param {{ fetchedAt: number } | null} meta — parsed seed-meta - * @param {number} intervalMin — refresh interval in minutes - * @returns {boolean} - */ -export function isFresh(meta, intervalMin) { - if (!meta) return false; - const ageMs = Date.now() - meta.fetchedAt; - return ageMs < intervalMin * 60_000; -} - -/** - * Build a seed-meta object for the orchestrator to write after a child process completes. - * @param {number} durationMs - * @param {'ok'|'error'} status - * @param {string} [error] - * @returns {object} - */ -export function buildMeta(durationMs, status, error) { - const meta = { fetchedAt: Date.now(), durationMs, status }; - if (error) meta.error = error; - return meta; -} diff --git a/scripts/seed-utils/runner.mjs b/scripts/seed-utils/runner.mjs deleted file mode 100644 index c902a2f19..000000000 --- a/scripts/seed-utils/runner.mjs +++ /dev/null @@ -1,64 +0,0 @@ -import { spawn } from 'node:child_process'; - -/** - * Fork a seed script as a child process and track its execution. - * - * @param {string} name — seeder name for logging - * @param {object} opts - * @param {string} opts.scriptPath — path to the script (or process.execPath for tests) - * @param {string[]} [opts.args] — arguments (default: none) - * @param {number} [opts.timeoutMs=120000] — kill after this many ms - * @param {object} [opts.env] — additional env vars (merged with process.env) - * @returns {Promise<{ name: string, exitCode: number|null, status: 'ok'|'error'|'timeout', durationMs: number }>} - */ -export function forkSeeder(name, opts) { - const { scriptPath, args = [], timeoutMs = 120_000, env } = opts; - - return new Promise((resolve) => { - const start = Date.now(); - const child = spawn(scriptPath, args, { - stdio: ['ignore', 'inherit', 'inherit'], - env: { ...process.env, ...env }, - }); - - let settled = false; - const timer = setTimeout(() => { - if (!settled) { - settled = true; - child.kill('SIGTERM'); - // Give 3s for graceful shutdown, then SIGKILL - setTimeout(() => { - try { child.kill('SIGKILL'); } catch {} - }, 3000); - resolve({ name, exitCode: null, status: 'timeout', durationMs: Date.now() - start }); - } - }, timeoutMs); - - child.on('close', (code) => { - if (!settled) { - settled = true; - clearTimeout(timer); - resolve({ - name, - exitCode: code, - status: code === 0 ? 'ok' : 'error', - durationMs: Date.now() - start, - }); - } - }); - - child.on('error', (err) => { - if (!settled) { - settled = true; - clearTimeout(timer); - resolve({ - name, - exitCode: null, - status: 'error', - durationMs: Date.now() - start, - error: err.message, - }); - } - }); - }); -} diff --git a/tests/seed-orchestrator-scheduling.test.mjs b/tests/seed-orchestrator-scheduling.test.mjs deleted file mode 100644 index 7def58510..000000000 --- a/tests/seed-orchestrator-scheduling.test.mjs +++ /dev/null @@ -1,157 +0,0 @@ -import assert from 'node:assert/strict'; -import { describe, it } from 'node:test'; -import { - classifySeeders, - buildStartupSummary, - getEffectiveInterval, - shouldSkipCycle, - computeTurboInterval, -} from '../scripts/seed-orchestrator.mjs'; -import { isFresh } from '../scripts/seed-utils/meta.mjs'; - -// ── getEffectiveInterval ───────────────────────────────────────────────── - -describe('getEffectiveInterval', () => { - it('returns normal interval when no turbo and no failures', () => { - // 30 min * 60_000 = 1_800_000 ms - assert.equal(getEffectiveInterval(30, 0, undefined), 1_800_000); - }); - - it('compresses interval by 20x in turbo mode', () => { - // 30 / 20 = 1.5 → rounds to 2 min → 120_000 ms - assert.equal(getEffectiveInterval(30, 0, 'real'), 120_000); - }); - - it('doubles interval after 5 consecutive failures', () => { - // 30 min * 2 = 60 min → 3_600_000 ms - assert.equal(getEffectiveInterval(30, 5, undefined), 3_600_000); - }); - - it('applies both turbo and demotion together', () => { - // 30 / 20 = 1.5 → rounds to 2, then * 2 = 4 min → 240_000 ms - assert.equal(getEffectiveInterval(30, 5, 'dry'), 240_000); - }); - - it('never returns less than 1 minute (60_000ms)', () => { - // 5 / 20 = 0.25 → floors to 1 min → 60_000 ms - assert.equal(getEffectiveInterval(5, 0, 'real'), 60_000); - }); - - it('4 failures does not trigger demotion', () => { - assert.equal(getEffectiveInterval(30, 4, undefined), 1_800_000); - }); -}); - -// ── shouldSkipCycle ────────────────────────────────────────────────────── - -describe('shouldSkipCycle', () => { - it('returns false when seeder not in flight', () => { - const inFlight = new Set(['other-seeder']); - assert.equal(shouldSkipCycle('earthquakes', inFlight), false); - }); - - it('returns true when seeder is in flight', () => { - const inFlight = new Set(['earthquakes']); - assert.equal(shouldSkipCycle('earthquakes', inFlight), true); - }); - - it('returns false for empty in-flight set', () => { - assert.equal(shouldSkipCycle('earthquakes', new Set()), false); - }); -}); - -// ── computeTurboInterval ───────────────────────────────────────────────── - -describe('computeTurboInterval', () => { - it('returns original interval when no turbo', () => { - assert.equal(computeTurboInterval(30, undefined), 30); - }); - - it('divides by 20 in turbo mode', () => { - // 120 / 20 = 6 - assert.equal(computeTurboInterval(120, 'real'), 6); - }); - - it('floors to minimum of 1 minute', () => { - // 5 / 20 = 0.25 → max(1, round(0.25)) = 1 - assert.equal(computeTurboInterval(5, 'dry'), 1); - }); - - it('rounds to nearest integer', () => { - // 30 / 20 = 1.5 → rounds to 2 - assert.equal(computeTurboInterval(30, 'real'), 2); - }); -}); - -// ── classifySeeders ────────────────────────────────────────────────────── - -describe('classifySeeders', () => { - const catalog = { - 'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' }, - 'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' }, - 'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' }, - }; - - it('all seeders active when no keys required', () => { - const mini = { 'a': { ...catalog['earthquakes'] } }; - const { active, skipped } = classifySeeders(mini, {}); - assert.equal(active.length, 1); - assert.equal(skipped.length, 0); - }); - - it('skips seeders with missing required keys', () => { - const { active, skipped } = classifySeeders(catalog, {}); - assert.equal(skipped.length, 2); - assert.ok(skipped.some(s => s.name === 'market-quotes')); - assert.ok(skipped.some(s => s.name === 'economy')); - }); - - it('includes seeders when required keys present', () => { - const { active, skipped } = classifySeeders(catalog, { FINNHUB_API_KEY: 'x', FRED_API_KEY: 'y' }); - assert.equal(active.length, 3); - assert.equal(skipped.length, 0); - }); -}); - -// ── buildStartupSummary ────────────────────────────────────────────────── - -describe('buildStartupSummary', () => { - it('includes active count and tier breakdown', () => { - const active = [ - { name: 'earthquakes', tier: 'warm' }, - { name: 'weather-alerts', tier: 'hot' }, - ]; - const summary = buildStartupSummary(active, [], 0); - assert.ok(summary.includes('ACTIVE (2)')); - assert.ok(summary.includes('hot: weather-alerts')); - assert.ok(summary.includes('warm: earthquakes')); - }); - - it('includes skipped seeders with reasons', () => { - const skipped = [{ name: 'market-quotes', reason: 'missing FINNHUB_API_KEY' }]; - const summary = buildStartupSummary([], skipped, 0); - assert.ok(summary.includes('SKIPPED (1)')); - assert.ok(summary.includes('FINNHUB_API_KEY')); - }); - - it('includes fresh count', () => { - const summary = buildStartupSummary([{ name: 'a', tier: 'hot' }], [], 1); - assert.ok(summary.includes('1/1 seeders have fresh data')); - }); -}); - -// ── freshness with turbo intervals ─────────────────────────────────────── - -describe('freshness integration', () => { - it('isFresh returns false for data older than turbo interval', () => { - const turboMin = computeTurboInterval(30, 'real'); // 2 min - const meta = { fetchedAt: Date.now() - 3 * 60_000 }; // 3 min ago - assert.equal(isFresh(meta, turboMin), false); - }); - - it('isFresh returns true for data within turbo interval', () => { - const turboMin = computeTurboInterval(30, 'real'); // 2 min - const meta = { fetchedAt: Date.now() - 30_000 }; // 30 sec ago - assert.equal(isFresh(meta, turboMin), true); - }); -}); diff --git a/tests/seed-orchestrator.test.mjs b/tests/seed-orchestrator.test.mjs deleted file mode 100644 index d99bb7aea..000000000 --- a/tests/seed-orchestrator.test.mjs +++ /dev/null @@ -1,209 +0,0 @@ -import assert from 'node:assert/strict'; -import { describe, it } from 'node:test'; -import { createLogger } from '../scripts/seed-utils/logger.mjs'; -import { parseFreshness, isFresh, buildMeta } from '../scripts/seed-utils/meta.mjs'; -import { forkSeeder } from '../scripts/seed-utils/runner.mjs'; -import { SEED_CATALOG, TIER_ORDER, TIER_CONCURRENCY, STEADY_STATE_CONCURRENCY } from '../scripts/seed-config.mjs'; -import { classifySeeders, buildStartupSummary } from '../scripts/seed-orchestrator.mjs'; - -describe('logger', () => { - it('prefixes messages with the given name', () => { - const lines = []; - const log = createLogger('earthquakes', { write: (msg) => lines.push(msg) }); - log.info('seeded 847 items'); - assert.match(lines[0], /\[seed:earthquakes\] seeded 847 items/); - }); - - it('formats error messages', () => { - const lines = []; - const log = createLogger('webcams', { write: (msg) => lines.push(msg) }); - log.error('HTTP 429'); - assert.match(lines[0], /\[seed:webcams\] error: HTTP 429/); - }); - - it('uses orchestrator prefix for orchestrator name', () => { - const lines = []; - const log = createLogger('orchestrator', { write: (msg) => lines.push(msg) }); - log.info('starting...'); - assert.match(lines[0], /\[orchestrator\] starting\.\.\./); - }); -}); - -describe('meta', () => { - describe('parseFreshness', () => { - it('parses valid seed-meta object (from redisGet which returns parsed JSON)', () => { - const obj = { fetchedAt: 1000, recordCount: 50, sourceVersion: 'v1' }; - const result = parseFreshness(obj); - assert.equal(result.fetchedAt, 1000); - assert.equal(result.recordCount, 50); - }); - - it('parses valid seed-meta string', () => { - const raw = JSON.stringify({ fetchedAt: 1000, recordCount: 50, sourceVersion: 'v1' }); - const result = parseFreshness(raw); - assert.equal(result.fetchedAt, 1000); - }); - - it('returns null for missing data', () => { - assert.equal(parseFreshness(null), null); - assert.equal(parseFreshness(''), null); - assert.equal(parseFreshness(undefined), null); - }); - - it('returns null for objects without fetchedAt', () => { - assert.equal(parseFreshness({ recordCount: 5 }), null); - }); - }); - - describe('isFresh', () => { - it('returns true when data is within interval', () => { - const meta = { fetchedAt: Date.now() - 60_000 }; // 1 min ago - assert.equal(isFresh(meta, 5), true); // 5 min interval - }); - - it('returns false when data is stale', () => { - const meta = { fetchedAt: Date.now() - 600_000 }; // 10 min ago - assert.equal(isFresh(meta, 5), false); // 5 min interval - }); - - it('returns false for null meta', () => { - assert.equal(isFresh(null, 5), false); - }); - }); - - describe('buildMeta', () => { - it('builds success meta', () => { - const meta = buildMeta(2340, 'ok'); - assert.equal(meta.status, 'ok'); - assert.equal(meta.durationMs, 2340); - assert.ok(meta.fetchedAt > 0); - assert.equal(meta.error, undefined); - }); - - it('builds error meta with message', () => { - const meta = buildMeta(5200, 'error', 'HTTP 429'); - assert.equal(meta.status, 'error'); - assert.equal(meta.error, 'HTTP 429'); - }); - }); -}); - -describe('runner', () => { - it('runs a script that exits 0 and reports success', async () => { - const result = await forkSeeder('test-ok', { - scriptPath: process.execPath, - args: ['-e', 'process.exit(0)'], - timeoutMs: 5000, - }); - assert.equal(result.exitCode, 0); - assert.equal(result.status, 'ok'); - assert.equal(result.name, 'test-ok'); - assert.ok(result.durationMs >= 0); - }); - - it('runs a script that exits 1 and reports error', async () => { - const result = await forkSeeder('test-fail', { - scriptPath: process.execPath, - args: ['-e', 'process.exit(1)'], - timeoutMs: 5000, - }); - assert.equal(result.exitCode, 1); - assert.equal(result.status, 'error'); - }); - - it('kills a script that exceeds timeout', async () => { - const result = await forkSeeder('test-hang', { - scriptPath: process.execPath, - args: ['-e', 'setTimeout(() => {}, 60000)'], - timeoutMs: 500, - }); - assert.equal(result.status, 'timeout'); - assert.equal(result.exitCode, null); - }); -}); - -describe('seed-config', () => { - it('exports a catalog with entries for all seed scripts', () => { - assert.equal(Object.keys(SEED_CATALOG).length, 42); - }); - - it('every entry has required fields', () => { - for (const [name, cfg] of Object.entries(SEED_CATALOG)) { - assert.ok(['hot', 'warm', 'cold', 'frozen'].includes(cfg.tier), `${name}: invalid tier ${cfg.tier}`); - assert.ok(typeof cfg.intervalMin === 'number' && cfg.intervalMin > 0, `${name}: invalid intervalMin`); - assert.ok(typeof cfg.ttlSec === 'number' && cfg.ttlSec > 0, `${name}: invalid ttlSec`); - assert.ok(['source', 'inferred'].includes(cfg.ttlSource), `${name}: invalid ttlSource`); - assert.ok(Array.isArray(cfg.requiredKeys), `${name}: requiredKeys must be array`); - assert.ok(cfg.metaKey === null || typeof cfg.metaKey === 'string', `${name}: metaKey must be string or null`); - } - }); - - it('intervalMin is always less than ttlSec / 60', () => { - for (const [name, cfg] of Object.entries(SEED_CATALOG)) { - assert.ok(cfg.intervalMin < cfg.ttlSec / 60, `${name}: intervalMin ${cfg.intervalMin} >= ttlSec/60 ${cfg.ttlSec / 60}`); - } - }); - - it('TIER_ORDER defines execution order', () => { - assert.deepEqual(TIER_ORDER, ['hot', 'warm', 'cold', 'frozen']); - }); - - it('TIER_CONCURRENCY defines concurrency caps', () => { - assert.equal(TIER_CONCURRENCY.hot, 3); - assert.equal(TIER_CONCURRENCY.warm, 5); - assert.equal(TIER_CONCURRENCY.cold, 3); - assert.equal(TIER_CONCURRENCY.frozen, 2); - }); - - it('STEADY_STATE_CONCURRENCY is 5', () => { - assert.equal(STEADY_STATE_CONCURRENCY, 5); - }); - - it('every catalog name matches a seed-*.mjs file', async () => { - const fs = await import('node:fs'); - const path = await import('node:path'); - for (const name of Object.keys(SEED_CATALOG)) { - const filePath = path.join(process.cwd(), 'scripts', `seed-${name}.mjs`); - assert.ok(fs.existsSync(filePath), `${name}: missing file seed-${name}.mjs`); - } - }); -}); - -describe('orchestrator', () => { - describe('classifySeeders', () => { - it('splits seeders into active and skipped based on env vars', () => { - const catalog = { - 'earthquakes': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: [], metaKey: 'seismology:earthquakes' }, - 'market-quotes': { tier: 'hot', intervalMin: 10, ttlSec: 1800, ttlSource: 'source', requiredKeys: ['FINNHUB_API_KEY'], metaKey: 'market:quotes' }, - 'economy': { tier: 'warm', intervalMin: 30, ttlSec: 3600, ttlSource: 'source', requiredKeys: ['FRED_API_KEY'], metaKey: 'economic:energy-prices' }, - }; - const env = { FRED_API_KEY: 'test' }; - const { active, skipped } = classifySeeders(catalog, env); - - assert.equal(active.length, 2); // earthquakes + economy - assert.equal(skipped.length, 1); // market-quotes - assert.ok(skipped[0].name === 'market-quotes'); - assert.ok(skipped[0].reason.includes('FINNHUB_API_KEY')); - assert.ok(active.some(s => s.name === 'earthquakes')); - assert.ok(active.some(s => s.name === 'economy')); - }); - }); - - describe('buildStartupSummary', () => { - it('returns formatted summary string', () => { - const active = [ - { name: 'earthquakes', tier: 'warm' }, - { name: 'weather-alerts', tier: 'hot' }, - ]; - const skipped = [ - { name: 'market-quotes', reason: 'missing FINNHUB_API_KEY' }, - ]; - const summary = buildStartupSummary(active, skipped, 1); - assert.ok(summary.includes('ACTIVE (2)')); - assert.ok(summary.includes('SKIPPED (1)')); - assert.ok(summary.includes('market-quotes')); - assert.ok(summary.includes('FINNHUB_API_KEY')); - assert.ok(summary.includes('1/2 seeders have fresh data')); - }); - }); -}); diff --git a/wmsm.sh b/wmsm.sh deleted file mode 100755 index d0938eeab..000000000 --- a/wmsm.sh +++ /dev/null @@ -1,602 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# ═══════════════════════════════════════════════════════════════════════════════ -# 🌍 World Monitor Seed Manager (wmsm) -# Homelab CLI for managing the seed orchestrator. -# Keep CATALOG in sync with scripts/seed-config.mjs -# ═══════════════════════════════════════════════════════════════════════════════ - -REDIS_URL="${REDIS_URL:-http://localhost:8079}" -REDIS_TOKEN="${REDIS_TOKEN:-wm-local-token}" -CONTAINER="${WM_CONTAINER:-worldmonitor}" - -# Auto-detect container runtime: docker or podman -if command -v docker >/dev/null 2>&1; then - DOCKER=docker -elif command -v podman >/dev/null 2>&1; then - DOCKER=podman -else - DOCKER=docker # will fail at check_deps with helpful message -fi - -# ── Catalog: name|tier|intervalMin|ttlSec|metaKey ───────────────────────────── -# metaKey "null" means orchestrator writes seed-meta:orchestrator:{name} -CATALOG=( - # HOT (5-15 min) - "weather-alerts|hot|5|900|weather:alerts" - "correlation|hot|5|1200|correlation:cards" - "prediction-markets|hot|10|1800|prediction:markets" - "commodity-quotes|hot|10|1800|market:commodities" - "market-quotes|hot|10|1800|market:quotes" - "insights|hot|15|1800|news:insights" - "military-flights|hot|5|600|military:flights" - "conflict-intel|hot|10|900|conflict:acled-intel" - # WARM (30-60 min) - "earthquakes|warm|30|3600|seismology:earthquakes" - "security-advisories|warm|30|7200|intelligence:advisories" - "fire-detections|warm|30|7200|wildfire:fires" - "natural-events|warm|30|3600|natural:events" - "radiation-watch|warm|30|7200|radiation:observations" - "airport-delays|warm|30|7200|aviation:faa" - "crypto-quotes|warm|30|3600|market:crypto" - "stablecoin-markets|warm|30|3600|market:stablecoins" - "gulf-quotes|warm|30|3600|market:gulf-quotes" - "etf-flows|warm|30|3600|market:etf-flows" - "economy|warm|30|3600|economic:energy-prices" - "research|warm|30|3600|research:arxiv-hn-trending" - "unrest-events|warm|30|3600|unrest:events" - "usa-spending|warm|30|3600|economic:spending" - "supply-chain-trade|warm|30|3600|supply_chain:shipping" - "aviation|warm|30|3600|aviation:ops-news" - "internet-outages|warm|15|1800|infra:outages" - "infra|warm|30|3600|null" - "service-statuses|warm|30|3600|infra:service-statuses" - "military-maritime-news|warm|30|3600|null" - "sanctions-pressure|warm|30|43200|sanctions:pressure" - "forecasts|warm|60|6300|forecast:predictions" - # COLD (2-6 hours) - "cyber-threats|cold|120|10800|cyber:threats" - "climate-anomalies|cold|120|10800|climate:anomalies" - "thermal-escalation|cold|120|10800|thermal:escalation" - "gdelt-intel|cold|120|86400|intelligence:gdelt-intel" - "webcams|cold|360|86400|webcam:cameras:geo" - "iran-events|cold|360|172800|conflict:iran-events" - # FROZEN (12h-7d) - "bis-data|frozen|600|43200|economic:bis" - "displacement-summary|frozen|720|86400|displacement:summary" - "submarine-cables|frozen|1440|604800|infrastructure:submarine-cables" - "military-bases|frozen|1440|604800|null" - "ucdp-events|frozen|720|86400|conflict:ucdp-events" - "wb-indicators|frozen|720|86400|null" -) - -TIER_ICONS=( "hot|🔥|5-15 min" "warm|🟡|30-60 min" "cold|🧊|2-6 hours" "frozen|🪨|12h-7d" ) -TIER_CONCURRENCY=( "hot|3" "warm|5" "cold|3" "frozen|2" ) - -# ── Helpers ─────────────────────────────────────────────────────────────────── - -header() { - echo "🌍 World Monitor Seed Manager" - echo "══════════════════════════════════════════════════════════════" - echo -} - -footer_line() { - echo "──────────────────────────────────────────────────────────────" -} - -redis_get() { - curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/get/$(python3 -c "import urllib.parse; print(urllib.parse.quote('$1', safe=''))" 2>/dev/null || echo "$1")" 2>/dev/null -} - -redis_scan() { - local pattern="$1" cursor="0" all_keys="" - while true; do - local resp - resp=$(curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/scan/$cursor?match=$(python3 -c "import urllib.parse; print(urllib.parse.quote('$pattern', safe=''))" 2>/dev/null || echo "$pattern")&count=200" 2>/dev/null) - [ -z "$resp" ] && break - cursor=$(echo "$resp" | jq -r '.result[0]') - local keys - keys=$(echo "$resp" | jq -r '.result[1][]' 2>/dev/null) - [ -n "$keys" ] && all_keys="$all_keys"$'\n'"$keys" - [ "$cursor" = "0" ] && break - done - echo "$all_keys" | grep -v '^$' | sort -u -} - -redis_del() { - local key="$1" - curl -sf -X POST -H "Authorization: Bearer $REDIS_TOKEN" -H "Content-Type: application/json" \ - -d "[\"DEL\",\"$key\"]" "$REDIS_URL" >/dev/null 2>&1 -} - -# Format seconds as human-readable age -format_age() { - local secs=$1 - if (( secs < 0 )); then echo "just now" - elif (( secs < 60 )); then echo "${secs}s ago" - elif (( secs < 3600 )); then echo "$(( secs / 60 ))m ago" - elif (( secs < 86400 )); then echo "$(( secs / 3600 ))h ago" - else echo "$(( secs / 86400 ))d ago" - fi -} - -# Format minutes as human-readable interval -format_interval() { - local min=$1 - if (( min < 60 )); then printf "%3dm" "$min" - elif (( min < 1440 )); then printf "%3dh" "$(( min / 60 ))" - else printf "%3dd" "$(( min / 1440 ))" - fi -} - -# Format seconds as human-readable TTL -format_ttl() { - local secs=$1 - if (( secs < 3600 )); then printf "%3dm" "$(( secs / 60 ))" - elif (( secs < 86400 )); then printf "%3dh" "$(( secs / 3600 ))" - else printf "%3dd" "$(( secs / 86400 ))" - fi -} - -# Get the seed-meta Redis key for a catalog entry -get_meta_key() { - local name="$1" meta_key="$2" - if [ "$meta_key" = "null" ]; then - echo "seed-meta:orchestrator:$name" - else - echo "seed-meta:$meta_key" - fi -} - -# Find closest seeder name match for typo correction -suggest_seeder() { - local input="$1" best="" best_score=0 - for entry in "${CATALOG[@]}"; do - local name="${entry%%|*}" - if [[ "$name" == *"$input"* ]] || [[ "$input" == *"$name"* ]]; then - echo "$name" - return - fi - done - # Fallback: longest common substring - for entry in "${CATALOG[@]}"; do - local name="${entry%%|*}" - local score=0 - for (( i=0; i<${#input}; i++ )); do - if [[ "$name" == *"${input:$i:1}"* ]]; then - (( score++ )) - fi - done - if (( score > best_score )); then - best_score=$score - best="$name" - fi - done - echo "$best" -} - -# ── Dependency check ────────────────────────────────────────────────────────── - -check_deps() { - local missing=() - command -v "$DOCKER" >/dev/null 2>&1 || missing+=("docker or podman") - command -v curl >/dev/null 2>&1 || missing+=("curl") - command -v jq >/dev/null 2>&1 || missing+=("jq") - if (( ${#missing[@]} > 0 )); then - echo "❌ Missing required tools: ${missing[*]}" - exit 1 - fi -} - -check_container() { - if ! $DOCKER inspect "$CONTAINER" --format '{{.State.Running}}' 2>/dev/null | grep -q true; then - echo "❌ Container '$CONTAINER' is not running" - echo " Start it with: $DOCKER compose up -d" - exit 1 - fi -} - -check_redis() { - if ! curl -sf -H "Authorization: Bearer $REDIS_TOKEN" "$REDIS_URL/ping" >/dev/null 2>&1; then - echo "❌ Cannot reach Redis at $REDIS_URL — is the stack running?" - exit 1 - fi -} - -# ── Commands ────────────────────────────────────────────────────────────────── - -cmd_help() { - cat <<'HELP' -🌍 World Monitor Seed Manager (wmsm) - -Usage: ./wmsm.sh [options] - -Commands: - status 📊 Show freshness of all seeders - schedule ⏱️ Show the refresh schedule - refresh 🔄 Force re-seed a specific seeder - refresh --all 🔄 Force re-seed everything (tiered) - flush 🗑️ Wipe all seed data and re-seed from scratch - logs [--follow|--all] 📋 Show orchestrator logs - help ❓ Show this help - -Environment: - REDIS_URL Redis REST proxy URL (default: http://localhost:8079) - REDIS_TOKEN Redis REST auth token (default: wm-local-token) - WM_CONTAINER Docker container name (default: worldmonitor) -HELP -} - -cmd_status() { - header - local now_ms - now_ms=$(date +%s%3N 2>/dev/null || echo "$(date +%s)000") - local count_healthy=0 count_stale=0 count_error=0 count_skipped=0 - local current_tier="" - - for entry in "${CATALOG[@]}"; do - IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry" - local redis_key - redis_key=$(get_meta_key "$name" "$meta_key") - - # Print tier header on tier change - if [ "$tier" != "$current_tier" ]; then - [ -n "$current_tier" ] && echo - current_tier="$tier" - local icon="" label="" - for ti in "${TIER_ICONS[@]}"; do - IFS='|' read -r t i l <<< "$ti" - if [ "$t" = "$tier" ]; then icon="$i"; label="$l"; break; fi - done - echo "$icon ${tier^^} ($label)" - fi - - # Fetch seed-meta from Redis - local raw - raw=$(redis_get "$redis_key" 2>/dev/null) || raw="" - local result - result=$(echo "$raw" | jq -r '.result // empty' 2>/dev/null) || result="" - - if [ -z "$result" ] || [ "$result" = "null" ]; then - # No meta — skipped - printf " ⬚ %-25s no data\n" "$name" - (( count_skipped++ )) || true - continue - fi - - # Parse meta fields (result is a JSON string, so parse it again) - local fetched_at record_count duration_ms status_field error_field - fetched_at=$(echo "$result" | jq -r '.fetchedAt // 0' 2>/dev/null) || fetched_at=0 - record_count=$(echo "$result" | jq -r '.recordCount // "-"' 2>/dev/null) || record_count="-" - duration_ms=$(echo "$result" | jq -r '.durationMs // 0' 2>/dev/null) || duration_ms=0 - status_field=$(echo "$result" | jq -r '.status // "ok"' 2>/dev/null) || status_field="ok" - error_field=$(echo "$result" | jq -r '.error // empty' 2>/dev/null) || error_field="" - - # Calculate age - local age_sec=0 - if (( fetched_at > 0 )); then - age_sec=$(( (${now_ms%???} - fetched_at / 1000) )) - (( age_sec < 0 )) && age_sec=0 - fi - - local age_str - age_str=$(format_age "$age_sec") - local duration_str - if (( duration_ms > 0 )); then - duration_str="$(awk "BEGIN {printf \"%.1f\", $duration_ms / 1000}")s" - else - duration_str="—" - fi - - local items_str - if [ "$record_count" != "-" ] && [ "$record_count" != "null" ]; then - items_str="${record_count} items" - else - items_str="—" - fi - - # Determine status icon - local icon - local interval_sec=$(( interval_min * 60 )) - if [ "$status_field" = "error" ] || [ "$status_field" = "timeout" ]; then - icon="❌" - (( count_error++ )) || true - elif (( age_sec > interval_sec )); then - icon="⚠️ " - (( count_stale++ )) || true - else - icon="✅" - (( count_healthy++ )) || true - fi - - printf " %s %-25s %-12s %-14s %s\n" "$icon" "$name" "$age_str" "$items_str" "$duration_str" - done - - echo - footer_line - echo "✅ $count_healthy healthy ⚠️ $count_stale stale ❌ $count_error error ⏭️ $count_skipped skipped" -} - -cmd_schedule() { - echo "🌍 World Monitor Seed Manager — Schedule" - echo "══════════════════════════════════════════════════════════════" - echo - - local now_ms - now_ms=$(date +%s%3N 2>/dev/null || echo "$(date +%s)000") - local now_sec=${now_ms%???} - local count_scheduled=0 count_skipped=0 - local current_tier="" - - for entry in "${CATALOG[@]}"; do - IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry" - local redis_key - redis_key=$(get_meta_key "$name" "$meta_key") - - # Print tier header on tier change - if [ "$tier" != "$current_tier" ]; then - [ -n "$current_tier" ] && echo - current_tier="$tier" - local icon="" - for ti in "${TIER_ICONS[@]}"; do - IFS='|' read -r t i l <<< "$ti" - if [ "$t" = "$tier" ]; then icon="$i"; break; fi - done - echo "$icon ${tier^^}" - fi - - # Fetch seed-meta - local raw result fetched_at age_sec - raw=$(redis_get "$redis_key" 2>/dev/null) || raw="" - result=$(echo "$raw" | jq -r '.result // empty' 2>/dev/null) || result="" - - if [ -z "$result" ] || [ "$result" = "null" ]; then - printf " %-25s every %s TTL %s ⏭️ no data\n" "$name" "$(format_interval "$interval_min")" "$(format_ttl "$ttl_sec")" - (( count_skipped++ )) || true - continue - fi - - fetched_at=$(echo "$result" | jq -r '.fetchedAt // 0' 2>/dev/null) || fetched_at=0 - if (( fetched_at > 0 )); then - age_sec=$(( now_sec - fetched_at / 1000 )) - (( age_sec < 0 )) && age_sec=0 - else - age_sec=0 - fi - - local age_str - age_str=$(format_age "$age_sec") - - # Calculate next run estimate - local interval_sec=$(( interval_min * 60 )) - local remaining=$(( interval_sec - age_sec )) - local next_str - if (( remaining <= 0 )); then - next_str="overdue" - elif (( remaining < 60 )); then - next_str="~${remaining}s" - elif (( remaining < 3600 )); then - next_str="~$(( remaining / 60 ))m" - else - next_str="~$(( remaining / 3600 ))h" - fi - - printf " %-25s every %s TTL %s last %-12s next %s\n" \ - "$name" "$(format_interval "$interval_min")" "$(format_ttl "$ttl_sec")" "$age_str" "$next_str" - (( count_scheduled++ )) || true - done - - echo - footer_line - echo "⏱️ $count_scheduled scheduled ⏭️ $count_skipped skipped" -} - -cmd_refresh() { - local target="${1:-}" - - if [ -z "$target" ]; then - echo "❌ Usage: ./wmsm.sh refresh or ./wmsm.sh refresh --all" - exit 1 - fi - - if [ "$target" = "--all" ]; then - cmd_refresh_all - return - fi - - # Validate seeder name - local found=false - for entry in "${CATALOG[@]}"; do - local name="${entry%%|*}" - if [ "$name" = "$target" ]; then - found=true - break - fi - done - - if [ "$found" = false ]; then - echo "❌ Unknown seeder: $target" - local suggestion - suggestion=$(suggest_seeder "$target") - if [ -n "$suggestion" ]; then - echo "💡 Did you mean: $suggestion?" - fi - exit 1 - fi - - header - echo "🔄 Refreshing $target..." - local start_sec - start_sec=$(date +%s) - - if $DOCKER exec "$CONTAINER" node "scripts/seed-${target}.mjs"; then - local dur=$(( $(date +%s) - start_sec )) - echo " ✅ Done in ${dur}s" - else - local code=$? - local dur=$(( $(date +%s) - start_sec )) - echo " ❌ Failed (exit code $code) in ${dur}s" - exit 1 - fi -} - -cmd_refresh_all() { - header - echo "🔄 Refreshing all seeders (tiered)..." - echo - - local total_ok=0 total_err=0 total_skip=0 - - for tier_order in hot warm cold frozen; do - # Get concurrency for this tier - local concurrency=3 - for tc in "${TIER_CONCURRENCY[@]}"; do - IFS='|' read -r t c <<< "$tc" - if [ "$t" = "$tier_order" ]; then concurrency=$c; break; fi - done - - # Get tier icon - local icon="" - for ti in "${TIER_ICONS[@]}"; do - IFS='|' read -r t i l <<< "$ti" - if [ "$t" = "$tier_order" ]; then icon="$i"; break; fi - done - - # Collect seeders for this tier - local tier_seeders=() - for entry in "${CATALOG[@]}"; do - IFS='|' read -r name tier interval_min ttl_sec meta_key <<< "$entry" - if [ "$tier" = "$tier_order" ]; then - tier_seeders+=("$name") - fi - done - - (( ${#tier_seeders[@]} == 0 )) && continue - - echo "$icon ${tier_order^^} (${#tier_seeders[@]} seeders, $concurrency at a time)" - - # Run in batches - local idx=0 - while (( idx < ${#tier_seeders[@]} )); do - local pids=() names=() starts=() - local batch_size=0 - while (( batch_size < concurrency && idx < ${#tier_seeders[@]} )); do - local sname="${tier_seeders[$idx]}" - local start_sec - start_sec=$(date +%s) - $DOCKER exec "$CONTAINER" node "scripts/seed-${sname}.mjs" >/dev/null 2>&1 & - pids+=($!) - names+=("$sname") - starts+=("$start_sec") - (( idx++ )) || true - (( batch_size++ )) || true - done - - # Wait for batch - for i in "${!pids[@]}"; do - if wait "${pids[$i]}" 2>/dev/null; then - local dur=$(( $(date +%s) - ${starts[$i]} )) - echo " ✅ ${names[$i]} (${dur}s)" - (( total_ok++ )) || true - else - local dur=$(( $(date +%s) - ${starts[$i]} )) - echo " ❌ ${names[$i]} (${dur}s)" - (( total_err++ )) || true - fi - done - done - echo - done - - footer_line - echo "Done: $total_ok ✅ $total_err ❌ $total_skip ⏭️" -} - -cmd_flush() { - echo "⚠️ This will delete ALL seed data and metadata from Redis." - echo " The orchestrator will perform a full cold start re-seed." - echo - read -rp " Type 'flush' to confirm: " confirm - if [ "$confirm" != "flush" ]; then - echo " Cancelled." - exit 0 - fi - - echo - echo "🗑️ Flushing seed data..." - - # Delete seed-meta keys - local meta_keys - meta_keys=$(redis_scan "seed-meta:*") - local meta_count=0 - while IFS= read -r key; do - [ -z "$key" ] && continue - redis_del "$key" - (( meta_count++ )) || true - done <<< "$meta_keys" - echo " Deleted $meta_count seed-meta keys" - - # Delete seed-lock keys - local lock_keys - lock_keys=$(redis_scan "seed-lock:*") - local lock_count=0 - while IFS= read -r key; do - [ -z "$key" ] && continue - redis_del "$key" - (( lock_count++ )) || true - done <<< "$lock_keys" - echo " Deleted $lock_count seed-lock keys" - - echo - echo "🔄 Restarting orchestrator for cold start..." - $DOCKER restart "$CONTAINER" >/dev/null 2>&1 - echo " ✅ Container restarting — run ./wmsm.sh logs --follow to watch" -} - -cmd_logs() { - local mode="${1:---filter}" - - case "$mode" in - --follow|-f) - $DOCKER logs -f "$CONTAINER" 2>&1 | grep --line-buffered '\[orchestrator\]\|\[seed:' - ;; - --all|-a) - $DOCKER logs "$CONTAINER" 2>&1 - ;; - *) - $DOCKER logs "$CONTAINER" 2>&1 | grep '\[orchestrator\]\|\[seed:' - ;; - esac -} - -# ── Main dispatcher ────────────────────────────────────────────────────────── - -main() { - local cmd="${1:-help}" - shift 2>/dev/null || true - - if [ "$cmd" = "help" ] || [ "$cmd" = "--help" ] || [ "$cmd" = "-h" ]; then - cmd_help - exit 0 - fi - - check_deps - check_container - - case "$cmd" in - status) check_redis; cmd_status "$@" ;; - schedule) check_redis; cmd_schedule "$@" ;; - refresh) check_redis; cmd_refresh "$@" ;; - flush) check_redis; cmd_flush "$@" ;; - logs) cmd_logs "$@" ;; - *) - echo "❌ Unknown command: $cmd" - echo " Run ./wmsm.sh help for usage" - exit 1 - ;; - esac -} - -main "$@"