From 12ea629a639891be8d9de041f6c9ea4e60d14d12 Mon Sep 17 00:00:00 2001 From: Elie Habib Date: Fri, 10 Apr 2026 14:44:14 +0400 Subject: [PATCH] =?UTF-8?q?feat(supply-chain):=20Sprint=20C=20=E2=80=94=20?= =?UTF-8?q?scenario=20engine=20(templates,=20job=20API,=20Railway=20worker?= =?UTF-8?q?,=20map=20activation)=20(#2890)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(supply-chain): Sprint C β€” scenario engine templates, job API, Railway worker, map activation Adds the async scenario engine for supply chain disruption modelling: - src/config/scenario-templates.ts: 6 pre-built ScenarioTemplate definitions (Taiwan Strait closure, Suez+BaB simultaneous, Panama drought, Hormuz blockade, Russia Baltic grain suspension, US electronics tariff shock) with costShockMultiplier and optional HS2 sector scoping. Exports ScenarioVisualState + ScenarioResult types (no UI imports, avoids MapContainer <-> DeckGLMap circular dep). - api/scenario/v1/run.ts: PRO-gated edge function β€” validates scenarioId against template registry and iso2 format, enqueues job to Redis scenario-queue:pending via RPUSH. Returns {jobId, status:'pending'} HTTP 202. - api/scenario/v1/status.ts: Edge function β€” validates jobId via regex to prevent Redis key injection, reads scenario-result:{jobId}. Returns {status:'pending'} when unprocessed, or full worker result when done. - scripts/scenario-worker.mjs: Always-on Railway worker using BLMOVE LEFT RIGHT for atomic FIFO dequeue+claim. Idempotency check before compute. Writes result with 24h TTL; writes {status:'failed'} on error; always cleans processing list in finally. - DeckGLMap.ts: scenarioState field + setScenarioState(). createTradeRoutesLayer() overrides arc color to orange for segments whose route waypoints intersect scenario disruptedChokepointIds. Null state restores normal colors. - MapContainer.ts: activateScenario(id, result) and deactivateScenario() broadcast ScenarioVisualState to DeckGLMap. Globe/SVG deferred to Sprint D (best-effort). πŸ€– Generated with Claude Sonnet 4.6 via Claude Code (https://claude.com/claude-code) + Compound Engineering v2.49.0 Co-Authored-By: Claude Sonnet 4.6 (200K context) * fix(supply-chain): move scenario-templates to server/ to satisfy arch boundary api/ edge functions may not import from src/ app code. Move the authoritative scenario-templates.ts to server/worldmonitor/supply-chain/v1/ and replace src/config/scenario-templates.ts with a type-only re-export for src/ consumers. * fix(supply-chain): guard scenario-worker runWorker() behind isMain check Without the isMain guard, the pre-push hook picks up scenario-worker.mjs as a seed test candidate (non-matching lines pass through sed unchanged) and starts the long-running worker process, causing push failures. * fix(pre-push): filter non-matching lines from seed test selector The sed transform passes non-matching lines (e.g. scenario-worker.mjs) through unchanged. Adding grep "^tests/" ensures only successfully transformed test paths are passed to the test runner. * fix(supply-chain): address PR #2890 review findings β€” worker data shapes + status PRO gate Three bugs found in PR #2890 code review: 1. [High] scenario-worker.mjs read wrong cache shape for exposure data. supply-chain:exposure:{iso2}:{hs2}:v1 caches GetCountryChokepointIndexResponse ({ iso2, hs2, exposures: [{chokepointId, exposureScore}], ... }), not a chokepointId-keyed object. Worker now iterates data.exposures[], filters by template.affectedChokepointIds, and ranks by exposureScore (importValue does not exist on ChokepointExposureEntry). adjustedImpact = exposureScore x (disruptionPct/100) x costShockMultiplier. 2. [Medium] api/scenario/v1/status.ts was not PRO-gated, allowing anyone with a valid jobId to retrieve full premium scenario results. Added isCallerPremium() check; returns HTTP 403 for non-PRO callers, matching run.ts behavior. 3. [Low] Worker parsed chokepoint status cache as Array but actual shape is { chokepoints: [], fetchedAt, upstreamUnavailable }. Fixed to access cpData.chokepoints array. * fix(scenario): per-country impactPct + O(1) route lookup in arc layer - impactPct now reflects each country's relative share of the worst-hit country (0-100) instead of the flat template.disruptionPct for all - Pre-build routeIdβ†’waypoints Map in createTradeRoutesLayer() so getColor() is O(1) per segment instead of O(n) per frame * fix(scenario): rate limit, pipeline GETs, error sanitization, processing state, orphan drain - Add per-user rate limit (10 jobs/min) + queue depth cap to run.ts - Replace 594 sequential Redis GETs with single Upstash pipeline call - Sanitize worker err.message to 'computation_error' in failed results - Remove dead validateApiKey() calls (isCallerPremium covers this) - Write processing state before computeScenario() starts - Add SIGTERM handler + startup orphan drain to worker loop - Validate dequeued job payload fields before use as Redis key fragments - Fix maxImpact divide-by-zero with Math.max(..., 1) - Hoist routeWaypoints Map to module level in DeckGLMap - Add GET /api/scenario/v1/templates discovery endpoint - Fix template sync comment to reference correct authoritative file * docs(plan): mark Sprint C complete, record deferrals to Sprint D - Sprint status table added: Sprints 0-2 merged, C ready to merge (#2890), A/B/D not started - Sprint C checklist: 4 ACs checked off, panel UI + tariff-shock visual deferred - Sprint D section updated to carry over Sprint C visual deferrals - PR #2890 added to Related PRs --------- Co-authored-by: Claude Sonnet 4.6 (200K context) --- .husky/pre-push | 2 +- api/scenario/v1/run.ts | 156 +++ api/scenario/v1/status.ts | 77 ++ api/scenario/v1/templates.ts | 27 + ...-supply-chain-routing-intelligence-plan.md | 980 ++++++++++++++++++ scripts/scenario-worker.mjs | 437 ++++++++ .../supply-chain/v1/scenario-templates.ts | 141 +++ src/components/DeckGLMap.ts | 42 +- src/components/MapContainer.ts | 30 + src/config/scenario-templates.ts | 12 + 10 files changed, 1899 insertions(+), 5 deletions(-) create mode 100644 api/scenario/v1/run.ts create mode 100644 api/scenario/v1/status.ts create mode 100644 api/scenario/v1/templates.ts create mode 100644 docs/plans/2026-04-09-001-feat-worldwide-supply-chain-routing-intelligence-plan.md create mode 100644 scripts/scenario-worker.mjs create mode 100644 server/worldmonitor/supply-chain/v1/scenario-templates.ts create mode 100644 src/config/scenario-templates.ts diff --git a/.husky/pre-push b/.husky/pre-push index a8426aa2e..fc9ec886e 100755 --- a/.husky/pre-push +++ b/.husky/pre-push @@ -106,7 +106,7 @@ else fi if [ "$RUN_SEED" = true ]; then echo "Running seed tests (scripts/ changed)..." - SEED_TESTS=$(echo "$CHANGED_FILES" | grep "^scripts/" | sed 's|scripts/seed-\(.*\)\.mjs|tests/\1-seed.test.mjs|' | while read -r t; do [ -f "$t" ] && echo "$t"; done) + SEED_TESTS=$(echo "$CHANGED_FILES" | grep "^scripts/" | sed 's|scripts/seed-\(.*\)\.mjs|tests/\1-seed.test.mjs|' | grep "^tests/" | while read -r t; do [ -f "$t" ] && echo "$t"; done) if [ -n "$SEED_TESTS" ]; then timeout 120 npx tsx --test $SEED_TESTS || exit 1 else diff --git a/api/scenario/v1/run.ts b/api/scenario/v1/run.ts new file mode 100644 index 000000000..776c209bf --- /dev/null +++ b/api/scenario/v1/run.ts @@ -0,0 +1,156 @@ +export const config = { runtime: 'edge' }; + +import { isCallerPremium } from '../../../server/_shared/premium-check'; +import { getScenarioTemplate } from '../../../server/worldmonitor/supply-chain/v1/scenario-templates'; + +const JOB_ID_CHARSET = 'abcdefghijklmnopqrstuvwxyz0123456789'; + +function generateJobId(): string { + const ts = Date.now(); + let suffix = ''; + const array = new Uint8Array(8); + crypto.getRandomValues(array); + for (const byte of array) suffix += JOB_ID_CHARSET[byte % JOB_ID_CHARSET.length]; + return `scenario:${ts}:${suffix}`; +} + +function getClientIp(req: Request): string { + return ( + req.headers.get('cf-connecting-ip') || + req.headers.get('x-real-ip') || + req.headers.get('x-forwarded-for')?.split(',')[0]?.trim() || + '0.0.0.0' + ); +} + +export default async function handler(req: Request): Promise { + if (req.method !== 'POST') { + return new Response('', { status: 405 }); + } + + const isPro = await isCallerPremium(req); + if (!isPro) { + return new Response(JSON.stringify({ error: 'PRO subscription required' }), { + status: 403, + headers: { 'Content-Type': 'application/json' }, + }); + } + + const url = process.env.UPSTASH_REDIS_REST_URL; + const token = process.env.UPSTASH_REDIS_REST_TOKEN; + if (!url || !token) { + return new Response(JSON.stringify({ error: 'Service temporarily unavailable' }), { + status: 503, + headers: { 'Content-Type': 'application/json' }, + }); + } + + // Per-user rate limit: 10 scenario jobs per user per minute (sliding window via INCR+EXPIRE). + const identifier = getClientIp(req); + const minute = Math.floor(Date.now() / 60_000); + const rateLimitKey = `rate:scenario:${identifier}:${minute}`; + + const rlResp = await fetch(`${url}/pipeline`, { + method: 'POST', + headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' }, + body: JSON.stringify([ + ['INCR', rateLimitKey], + ['EXPIRE', rateLimitKey, 60], + ['LLEN', 'scenario-queue:pending'], + ]), + signal: AbortSignal.timeout(5_000), + }).catch(() => null); + + if (rlResp?.ok) { + const rlResults = (await rlResp.json()) as Array<{ result: number }>; + const count = rlResults[0]?.result ?? 0; + const queueDepth = rlResults[2]?.result ?? 0; + + if (count > 10) { + return new Response(JSON.stringify({ error: 'Rate limit exceeded: 10 scenario jobs per minute' }), { + status: 429, + headers: { + 'Content-Type': 'application/json', + 'Retry-After': '60', + }, + }); + } + + if (queueDepth > 100) { + return new Response(JSON.stringify({ error: 'Scenario queue is at capacity, please try again later' }), { + status: 429, + headers: { + 'Content-Type': 'application/json', + 'Retry-After': '30', + }, + }); + } + } + + let body: Record; + try { + body = await req.json(); + } catch { + return new Response(JSON.stringify({ error: 'Invalid JSON body' }), { + status: 400, + headers: { 'Content-Type': 'application/json' }, + }); + } + + const { scenarioId, iso2 } = body as { scenarioId?: string; iso2?: string }; + + if (!scenarioId || typeof scenarioId !== 'string') { + return new Response(JSON.stringify({ error: 'scenarioId is required' }), { + status: 400, + headers: { 'Content-Type': 'application/json' }, + }); + } + + if (!getScenarioTemplate(scenarioId)) { + return new Response(JSON.stringify({ error: `Unknown scenario: ${scenarioId}` }), { + status: 400, + headers: { 'Content-Type': 'application/json' }, + }); + } + + if (iso2 !== undefined && iso2 !== null && (typeof iso2 !== 'string' || !/^[A-Z]{2}$/.test(iso2))) { + return new Response(JSON.stringify({ error: 'iso2 must be a 2-letter uppercase country code' }), { + status: 400, + headers: { 'Content-Type': 'application/json' }, + }); + } + + const jobId = generateJobId(); + const payload = JSON.stringify({ + jobId, + scenarioId, + iso2: iso2 ?? null, + enqueuedAt: Date.now(), + }); + + const redisResp = await fetch(`${url}/rpush/scenario-queue%3Apending`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify([payload]), + signal: AbortSignal.timeout(5_000), + }); + + if (!redisResp.ok) { + console.error('[scenario/run] Redis enqueue failed:', redisResp.status); + return new Response(JSON.stringify({ error: 'Failed to enqueue scenario job' }), { + status: 502, + headers: { 'Content-Type': 'application/json' }, + }); + } + + return new Response( + JSON.stringify({ jobId, status: 'pending', statusUrl: `/api/scenario/v1/status?jobId=${jobId}` }), + { + status: 202, + headers: { 'Content-Type': 'application/json' }, + }, + ); +} diff --git a/api/scenario/v1/status.ts b/api/scenario/v1/status.ts new file mode 100644 index 000000000..6493d61bf --- /dev/null +++ b/api/scenario/v1/status.ts @@ -0,0 +1,77 @@ +export const config = { runtime: 'edge' }; + +import { isCallerPremium } from '../../../server/_shared/premium-check'; + +/** Matches jobIds produced by run.ts: "scenario:{timestamp}:{8-char-suffix}" */ +const JOB_ID_RE = /^scenario:\d{13}:[a-z0-9]{8}$/; + +export default async function handler(req: Request): Promise { + if (req.method !== 'GET') { + return new Response('', { status: 405 }); + } + + const isPro = await isCallerPremium(req); + if (!isPro) { + return new Response(JSON.stringify({ error: 'PRO subscription required' }), { + status: 403, + headers: { 'Content-Type': 'application/json' }, + }); + } + + const { searchParams } = new URL(req.url); + const jobId = searchParams.get('jobId'); + + if (!jobId || !JOB_ID_RE.test(jobId)) { + return new Response( + JSON.stringify({ error: 'Invalid or missing jobId' }), + { status: 400, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const url = process.env.UPSTASH_REDIS_REST_URL; + const token = process.env.UPSTASH_REDIS_REST_TOKEN; + if (!url || !token) { + return new Response( + JSON.stringify({ error: 'Service temporarily unavailable' }), + { status: 503, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const resultKey = `scenario-result:${jobId}`; + const redisResp = await fetch(`${url}/get/${encodeURIComponent(resultKey)}`, { + headers: { Authorization: `Bearer ${token}` }, + signal: AbortSignal.timeout(5_000), + }); + + if (!redisResp.ok) { + console.error('[scenario/status] Redis get failed:', redisResp.status); + return new Response( + JSON.stringify({ error: 'Failed to fetch job status' }), + { status: 502, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const data = (await redisResp.json()) as { result?: string | null }; + + if (!data.result) { + return new Response( + JSON.stringify({ jobId, status: 'pending' }), + { status: 200, headers: { 'Content-Type': 'application/json' } }, + ); + } + + let parsed: unknown; + try { + parsed = JSON.parse(data.result); + } catch { + return new Response( + JSON.stringify({ error: 'Corrupted job result' }), + { status: 500, headers: { 'Content-Type': 'application/json' } }, + ); + } + + return new Response( + JSON.stringify(parsed), + { status: 200, headers: { 'Content-Type': 'application/json' } }, + ); +} diff --git a/api/scenario/v1/templates.ts b/api/scenario/v1/templates.ts new file mode 100644 index 000000000..7aed2069e --- /dev/null +++ b/api/scenario/v1/templates.ts @@ -0,0 +1,27 @@ +export const config = { runtime: 'edge' }; + +import { SCENARIO_TEMPLATES } from '../../../server/worldmonitor/supply-chain/v1/scenario-templates'; + +export default async function handler(req: Request): Promise { + if (req.method !== 'GET') { + return new Response('', { status: 405 }); + } + + const templates = SCENARIO_TEMPLATES.map(t => ({ + id: t.id, + name: t.name, + affectedChokepointIds: t.affectedChokepointIds, + disruptionPct: t.disruptionPct, + durationDays: t.durationDays, + affectedHs2: t.affectedHs2, + costShockMultiplier: t.costShockMultiplier, + })); + + return new Response(JSON.stringify({ templates }), { + status: 200, + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'public, max-age=3600', + }, + }); +} diff --git a/docs/plans/2026-04-09-001-feat-worldwide-supply-chain-routing-intelligence-plan.md b/docs/plans/2026-04-09-001-feat-worldwide-supply-chain-routing-intelligence-plan.md new file mode 100644 index 000000000..b5ad4c19c --- /dev/null +++ b/docs/plans/2026-04-09-001-feat-worldwide-supply-chain-routing-intelligence-plan.md @@ -0,0 +1,980 @@ +--- +title: "feat: Worldwide Supply Chain Routing Intelligence β€” UI + Scenario Engine" +type: feat +status: active +date: 2026-04-09 +origin: docs/brainstorms/2026-04-09-worldwide-shipping-intelligence-requirements.md +--- + +## Sprint Status (updated 2026-04-10) + +| Sprint | Scope | PR | Status | +|---|---|---|---| +| 0–2 | Backend: bypass corridors, exposure seeder, chokepoint index | β€” | βœ… Merged | +| A | Supply Chain Panel UI: bypass cards, sector exposure, war risk badges | β€” | ⏳ Not started | +| B | Map Arc Intelligence: disruption-score arc coloring + arc click popup | β€” | ⏳ Not started | +| C | Scenario Engine: templates, job API, Railway worker, map activation | #2890 | πŸ” Review β€” ready to merge | +| D | Sector Dependency RPC + Vendor API + Sprint C visual deferrals | β€” | ⏳ Not started | + +### Sprint C β€” What shipped (PR #2890) +- `api/scenario/v1/run.ts` β€” PRO-gated edge function, RPUSH to `scenario-queue:pending` +- `api/scenario/v1/status.ts` β€” polling endpoint, `pending | processing | done | failed` +- `api/scenario/v1/templates.ts` β€” public discovery endpoint (no PRO gate) +- `scripts/scenario-worker.mjs` β€” always-on Railway worker, BLMOVE atomic FIFO dequeue, pipeline Redis reads, SIGTERM handler, startup orphan drain +- `server/worldmonitor/supply-chain/v1/scenario-templates.ts` β€” authoritative template registry +- `src/config/scenario-templates.ts` β€” type-only shim for src/ consumers +- `src/components/MapContainer.ts` β€” `activateScenario()` / `deactivateScenario()` +- `src/components/DeckGLMap.ts` β€” `setScenarioState()`, arc orange recolor for disrupted routes + +### Sprint C β€” Deferred to Sprint D +- **Globe + SVG renderer scenario state** β€” `activateScenario()` only dispatches to DeckGL; globe and SVG overlays need country-highlight choropleth layer +- **Tariff-shock visual** (`us-tariff-escalation-electronics`) β€” `affectedChokepointIds: []` means no arc recoloring; correct visualization is a country-heat overlay; `affectedIso2s` is already in `ScenarioVisualState` for Sprint D to consume +- **Panel UI** (trigger button, scenario summary card, dismiss) β€” Sprint A/D will add the UI surface that calls `run.ts` and renders results +- **Scenario tests** β€” unit + integration tests for endpoints, worker, and map activation path + +# feat: Worldwide Supply Chain Routing Intelligence β€” UI + Scenario Engine + +## Overview + +WorldMonitor's supply chain backend (Sprints 0–2) is complete: bypass corridors config, chokepoint exposure seeder, bypass RPC, cost-shock RPC, and chokepoint index RPC are all live and PRO-gated. What remains is the UI layer that surfaces this data in the panel and map, plus the async scenario engine. + +This plan covers four implementation sprints in priority order: +- **Sprint A** β€” Supply Chain Panel UI: bypass cards + sector exposure + war risk badges +- **Sprint B** β€” Map Arc Intelligence: disruption-score arc coloring + arc click β†’ breakdown +- **Sprint C** β€” Scenario Engine: templates config + async job API + Railway worker + map activation +- **Sprint D** β€” Sector Dependency RPC + Vendor API + +Reference: `gcc-optimal-shipping-routes.vercel.app` was the product reference app (fully analyzed 2026-04-09). See `docs/internal/worldmonitor-global-shipping-intelligence-roadmap.md` for the full 5-sprint roadmap. + +--- + +## Problem Statement + +The backend intelligence (bypass corridors, cost shock, chokepoint exposure) exists in Redis but nothing surfaces it to users. The supply chain panel shows chokepoints with disruption scores but no bypass options, no sector exposure breakdown, and no war risk tier badge. The map arcs are static blue β€” no disruption coloring. The scenario engine doesn't exist. + +Users cannot answer "if Hormuz closes, what are my options?" from the WorldMonitor UI today. + +--- + +## Scope Boundaries (from origin doc) + +**In scope (v1):** +- HS2-sector granularity (not HS6 β€” that's v2) +- 6 Comtrade reporters: US, China, Russia, Iran, India, Taiwan +- 13 chokepoints +- ~40 bypass corridors (already in `bypass-corridors.ts`) +- Energy shock model for HS27 only; other sectors return `null` with explanation + +**Out of scope (v2):** +- Full HS6 global coverage (195 Γ— 5000+ products) +- AI Strategic Advisor sidebar +- HS6 product selector with 300+ items +- LOCODE port support in vendor API + +--- + +## Technical Approach + +### Architecture + +``` +Supply Chain Panel (SupplyChainPanel.ts) + └─ Chokepoint card (expanded) + β”œβ”€ PRO: Bypass Options section [NEW Sprint A] + β”œβ”€ PRO: War Risk Tier badge [NEW Sprint A] + └─ PRO: HS2 Ring Chart [NEW Sprint A β€” also in MapPopup] + +Country Deep Dive Panel (CountryDeepDivePanel.ts) + └─ Sector Exposure Card [NEW Sprint A] + β”œβ”€ Top 3 chokepoints by exposure % + β”œβ”€ $ at risk per chokepoint + └─ PRO gate (applyProGate) + +DeckGLMap.ts + └─ Trade Routes Arc Layer [EXTEND Sprint B] + β”œβ”€ Color bound to disruption score (green/yellow/red) + └─ Arc click β†’ popup breakdown (PRO-gated) + +MapContainer.ts + └─ activateScenario(scenarioId) [NEW Sprint C] + β”œβ”€ Dispatch state to all 3 renderers + β”œβ”€ DeckGL: pulsing chokepoints + arc shift + └─ SVG + Globe: visual state update + +api/scenario/v1/run.ts [NEW Sprint C] +api/scenario/v1/status.ts [NEW Sprint C] +scripts/scenario-worker.mjs [NEW Sprint C] +src/config/scenario-templates.ts [NEW Sprint C] + +server/.../get-sector-dependency.ts [NEW Sprint D] +api/v2/shipping/route-intelligence.ts [NEW Sprint D] +api/shipping-webhook.ts [NEW Sprint D] +``` + +### Key Patterns to Follow (from research) + +1. **Section cards in CountryDeepDivePanel**: Use `sectionCard(title, helpText?)` β†’ `[card, body]` tuple. Append to `bodyGrid`. PRO gate: `applyProGate()` + `subscribeAuthState()` + `trackGateHit('feature')`. + +2. **TransitChart post-render mount**: Use `MutationObserver` observing `this.content` with `{ childList: true, subtree: true }`. 220ms `setTimeout` fallback. See `SupplyChainPanel.ts:134-158` for exact pattern. + +3. **Arc layer coloring**: Extend existing `createTradeRoutesLayer()` in `DeckGLMap.ts:4959`. `colorFor(status)` already maps `'disrupted'/'high_risk'/'active'` to RGBA tuples. Bind to chokepoint disruption score: score > 70 β†’ `'disrupted'`, 30–70 β†’ `'high_risk'`, < 30 β†’ `'active'`. + +4. **MapContainer state dispatch**: Store callback refs like `cachedOnStateChanged`. Use `setLayers()` to broadcast across all 3 renderers. New `activateScenario()` follows the same pattern. + +5. **PRO gate**: `isCallerPremium(ctx.request)` already in all server RPCs. Client-side: `hasPremiumAccess(getAuthState())` for immediate check, `subscribeAuthState(state => ...)` for reactive. + +6. **Async jobs** (no prior pattern in repo): Redis queue via `RPUSH scenario-queue:pending` on enqueue, `BLMOVE scenario-queue:pending scenario-queue:processing LEFT RIGHT` on dequeue. + +7. **country-port-clusters.json**: Referenced by `seed-hs2-chokepoint-exposure.mjs:54` but not yet created. Must exist before the seeder works correctly in prod. + +--- + +## Implementation Units + +### Sprint A β€” Supply Chain Panel UI + +#### A1: country-port-clusters.json + +**Goal:** Create the static config file `scripts/shared/country-port-clusters.json` that maps each iso2 to `{ nearestRouteIds, coastSide }`. Referenced by `seed-hs2-chokepoint-exposure.mjs` but not yet present. + +**Files:** +- `scripts/shared/country-port-clusters.json` β€” new file, ~195 country entries + +**Approach:** +- Map each country's ISO2 to the nearest named trade route IDs from `src/config/trade-routes.ts` and a coast side (`atlantic | pacific | indian | med | multi | landlocked`). +- Cover all 195 UN member states + major territories. +- Example entry: + ```json + { + "US": { "nearestRouteIds": ["transpacific", "transatlantic"], "coastSide": "multi" }, + "JP": { "nearestRouteIds": ["far-east-europe", "transpacific"], "coastSide": "pacific" }, + "SA": { "nearestRouteIds": ["indian-ocean-gulf", "red-sea"], "coastSide": "indian" }, + "DE": { "nearestRouteIds": ["transatlantic", "northern-europe"], "coastSide": "atlantic" } + } + ``` +- For landlocked countries: `{ "nearestRouteIds": [], "coastSide": "landlocked" }`. + +**Patterns to follow:** +- `src/config/chokepoint-registry.ts` β€” static config shape +- `seed-hs2-chokepoint-exposure.mjs:54` β€” import usage site + +**Verification:** +- All 195 countries present in the JSON with valid `nearestRouteIds` (array, may be empty for landlocked) +- `coastSide` is one of: `atlantic | pacific | indian | med | multi | landlocked` +- No duplicate entries +- `node -e "const d=require('./scripts/shared/country-port-clusters.json'); console.log(Object.keys(d).length)"` prints β‰₯ 195 + +--- + +#### A2: War Risk Tier Badge in SupplyChainPanel Chokepoint Cards + +**Goal:** Each expanded chokepoint card in `SupplyChainPanel.ts` shows a war risk tier badge derived from `cp.warRiskTier`. This is **free** β€” uses existing data in `GetChokepointStatusResponse`. + +**Files:** +- `src/components/SupplyChainPanel.ts` β€” add badge rendering in `renderChokepoints()` +- `src/styles/supply-chain-panel.css` β€” add `.sc-war-risk-badge` styles + +**Approach:** +- In `renderChokepoints()`, after the disruption score line, add: + ```ts + const tier = cp.warRiskTier ?? 'WAR_RISK_TIER_NORMAL'; + const tierLabel: Record = { + WAR_RISK_TIER_WAR_ZONE: 'War Zone', WAR_RISK_TIER_CRITICAL: 'Critical', + WAR_RISK_TIER_HIGH: 'High', WAR_RISK_TIER_ELEVATED: 'Elevated', + WAR_RISK_TIER_NORMAL: 'Normal', + }; + const tierClass: Record = { + WAR_RISK_TIER_WAR_ZONE: 'war', WAR_RISK_TIER_CRITICAL: 'critical', + WAR_RISK_TIER_HIGH: 'high', WAR_RISK_TIER_ELEVATED: 'elevated', + WAR_RISK_TIER_NORMAL: 'normal', + }; + ``` +- Badge: `${tierLabel[tier]}` +- CSS: red for `war/critical`, orange for `high/elevated`, grey for `normal`. +- Free β€” no `isCallerPremium` check needed; `warRiskTier` is already in the public chokepoint response. + +**Patterns to follow:** +- Existing disruption score badge in `SupplyChainPanel.ts` +- CSS from `src/styles/chokepoint-card.css` + +**Verification:** +- Bab el-Mandeb card shows "War Zone" badge +- Normal chokepoints show "Normal" badge in muted grey +- Badge visible to free and PRO users alike + +--- + +#### A3: Bypass Options Section in SupplyChainPanel Chokepoint Card + +**Goal:** When a chokepoint card is expanded in `SupplyChainPanel.ts`, show top 3 bypass options. PRO-gated. + +**Files:** +- `src/components/SupplyChainPanel.ts` β€” add bypass section to expanded chokepoint card +- `src/services/supply-chain/index.ts` β€” `fetchBypassOptions` already exists +- `src/styles/supply-chain-panel.css` β€” add `.sc-bypass-*` styles + +**Approach:** +- In `renderChokepoints()`, for the expanded card, add a bypass section below the transit chart: + ```ts + const bypassSection = document.createElement('div'); + bypassSection.className = 'sc-bypass-section'; + ``` +- Call `fetchBypassOptions(cp.id, 'container', 100)` when card expands (same trigger as TransitChart mount). +- Render a 3-row table: `Name | +Days | +$/ton | Risk` +- PRO gate: if `!hasPremiumAccess(getAuthState())`, render a locked placeholder: + ```html +
+ πŸ”’ + Bypass corridors available with PRO + +
+ ``` +- Subscribe auth state: `subscribeAuthState(state => applyProGate(hasPremiumAccess(state)))`. +- `trackGateHit('bypass-corridors')` on initial non-PRO impression. +- Loading state: show skeleton while fetching. +- Error state: "Bypass data unavailable" (don't crash). + +**Patterns to follow:** +- `SupplyChainPanel.ts:134-158` β€” MutationObserver + 220ms fallback for TransitChart; same trigger for bypass fetch +- `src/app/event-handlers.ts:1027-1032` β€” `applyProGate` + `subscribeAuthState` pattern +- `fetchBypassOptions` signature at `src/services/supply-chain/index.ts:122-133` + +**Verification:** +- PRO user expanding Suez card sees β‰₯ 1 bypass option ("Cape of Good Hope") +- Hormuz card shows β‰₯ 3 options +- Free user sees locked placeholder with upgrade CTA +- `trackGateHit('bypass-corridors')` fires on free user card expand (verify via analytics debug log) + +--- + +#### A4: Sector Exposure Card in CountryDeepDivePanel + +**Goal:** Add a "Trade Exposure" section card to `CountryDeepDivePanel.ts` showing the country's top 3 chokepoints by HS2 exposure %. PRO-gated. + +**Files:** +- `src/components/CountryDeepDivePanel.ts` β€” new `updateTradeExposure(data)` method + section card +- `src/app/country-intel.ts` β€” new `getCountryChokepointIndex` call site +- `src/styles/cdp.css` β€” add `.cdp-trade-exposure-*` styles + +**Approach:** +- In `CountryDeepDivePanel.ts`, add private field `private tradeExposureBody: HTMLElement | null = null`. +- In `buildLayout()`, create section card: + ```ts + const [tradeCard, tradeBody] = this.sectionCard( + 'Trade Exposure', + 'Chokepoints most critical to this country\'s imports by sector', + 'trade-exposure' + ); + this.tradeExposureBody = tradeBody; + bodyGrid.append(tradeCard); + ``` +- Public method `updateTradeExposure(data: GetCountryChokepointIndexResponse | null)`: + - If not PRO or `data == null` or `data.exposures.length === 0`: `this.tradeExposureBody?.parentElement?.remove()`. + - Otherwise, render 3-row exposure table: + ```html + + + + + + +
{chokepointName}{exposureScore.toFixed(1)}%
+ ``` + - Show `vulnerabilityIndex` as an overall score: `
Vulnerability: {Math.round(data.vulnerabilityIndex)}/100
`. + - For HS27 (energy): also show cost shock data via `fetchCountryCostShock(iso2, primaryChokepointId)` β€” `coverageDays` + `supplyDeficitPct`. + - Footer: ``. +- Reset: `this.tradeExposureBody = null` in `resetPanelContent()`. +- PRO gate: render locked placeholder for free users; `trackGateHit('shipping-exposure')`. + +**Call site in `country-intel.ts`:** +- After country resolves, call `fetchCountryChokepointIndex(code, '27')`. +- Stale guard: `if (this.getCode() !== code) return`. +- On success: `this.panel?.updateTradeExposure?.(result)`. +- On error: `this.panel?.updateTradeExposure?.(null)`. + +**Patterns to follow:** +- `CountryDeepDivePanel.ts:1286-1327` β€” `bodyGrid.append(cards)` + private body field +- `updateMaritimeActivity` method β€” exact same pattern +- `src/app/country-intel.ts` β€” existing `getCountryPortActivity` call site as template + +**Verification:** +- For US: section card shows top 3 chokepoints with exposure bars (US is seeded reporter) +- For DE: section card removes itself (DE not in v1 seeded reporters) +- For non-PRO user: locked placeholder shown, `trackGateHit('shipping-exposure')` fires +- `this.tradeExposureBody = null` in `resetPanelContent()` prevents stale renders + +--- + +#### A5: HS2 Ring Chart in MapPopup Chokepoint Detail + +**Goal:** In the chokepoint popup (`MapPopup.ts:renderWaterwayPopup()`), add an HS2 sector ring chart showing top sectors by exposure %. PRO-gated. Follows existing TransitChart post-render mount pattern. + +**Files:** +- `src/components/MapPopup.ts` β€” extend `renderWaterwayPopup()` + add `HS2RingChart` mount +- `src/utils/hs2-ring-chart.ts` β€” new mini canvas chart (similar to `transit-chart.ts`) +- `src/styles/map-popup.css` β€” add `.popup-hs2-ring-*` styles + +**Approach:** +- In `renderWaterwayPopup()`, after the transit chart element, add: + ```html + + + ``` +- Post-render: in the `setTimeout` that mounts TransitChart, also mount HS2RingChart: + ```ts + const ringEl = this.popup.querySelector(`[data-hs2-ring="${waterway.chokepointId}"]`); + if (ringEl && isPro) { + const country = getCurrentSelectedCountry(); // from app state + if (country) { + fetchCountryChokepointIndex(country, '27').then(data => { + if (data.exposures.length) new HS2RingChart().mount(ringEl, data.exposures); + }); + } + } + ``` +- `HS2RingChart` (`src/utils/hs2-ring-chart.ts`): canvas-based donut chart. Input: `ChokepointExposureEntry[]`. Renders top 5 sectors as arc slices with `exposureScore` proportions. Labels outside with HS2 chapter names from `hs2-sectors.ts`. +- PRO gate: if not PRO, render a 2-line teaser (``). +- `trackGateHit('chokepoint-sector-ring')` for free users. + +**Patterns to follow:** +- `MapPopup.ts:267-281` β€” TransitChart mount + PRO gate pattern +- `src/utils/transit-chart.ts` β€” `mount(el, data)` interface +- `src/config/hs2-sectors.ts` β€” HS2 label lookup + +**Verification:** +- PRO user clicking Suez popup sees donut chart with top 5 HS2 sectors +- For countries without Comtrade data, chart renders empty state "Sector data unavailable for this country" +- Free user sees 2-line teaser, `trackGateHit('chokepoint-sector-ring')` fires + +--- + +### Sprint B β€” Map Arc Intelligence + +#### B1: Disruption-Score Arc Coloring + +**Goal:** Trade route arcs in `DeckGLMap.ts` are colored by the chokepoint disruption score of routes they transit. + +**Files:** +- `src/components/DeckGLMap.ts` β€” extend `createTradeRoutesLayer()` (line 4959) +- `src/services/supply-chain/index.ts` β€” read from chokepoint status cache + +**Approach:** +- Each `TradeRouteSegment` already has a `status` field. The existing `colorFor(status)` maps `'disrupted'/'high_risk'/'active'` to RGBA tuples. +- Add a new step: when chokepoint status data updates (called from `setChokepointData()`), update each segment's `status`: + ```ts + private refreshTradeRouteStatus(chokepoints: ChokepointInfo[]): void { + const scoreMap = new Map(chokepoints.map(cp => [cp.id, cp.disruptionScore ?? 0])); + this.tradeRouteSegments = this.tradeRouteSegments.map(seg => ({ + ...seg, + status: seg.waypointChokepointIds + .map(id => scoreMap.get(id) ?? 0) + .reduce((max, s) => Math.max(max, s), 0) > 70 ? 'disrupted' + : seg.waypointChokepointIds + .map(id => scoreMap.get(id) ?? 0) + .reduce((max, s) => Math.max(max, s), 0) > 30 ? 'high_risk' : 'active', + })); + this.rerender(); // trigger DeckGL redraw + } + ``` +- This is PRO-gated visually: add a check β€” if not PRO, all segments render as `'active'` (uncolored) regardless. +- PRO users see disruption-reactive arc colors; `trackGateHit('trade-arc-intel')` when free user inspects a colored arc. +- Call `refreshTradeRouteStatus()` inside `setChokepointData()` whenever chokepoint data refreshes. + +**Patterns to follow:** +- `DeckGLMap.ts:4959-4979` β€” `createTradeRoutesLayer()` exact structure +- `colorFor(status)` pattern already exists β€” just feed it the right `status` string + +**Verification:** +- With Bab el-Mandeb `disruptionScore > 70`, arcs transiting that chokepoint turn red +- Free user sees all arcs in the default blue/active color +- No arc layer rebuild β€” status update triggers `rerender()` only + +--- + +#### B2: Arc Click β†’ Sector Exposure Popup + +**Goal:** PRO users clicking a trade route arc see a mini popup with sector exposure breakdown for the primary chokepoint on that route. + +**Files:** +- `src/components/DeckGLMap.ts` β€” set `pickable: true` on `createTradeRoutesLayer()`; add `onHover`/`onClick` handlers +- `src/components/MapPopup.ts` β€” new `showRouteBreakdown(segment, chokepointData)` method + +**Approach:** +- Set `pickable: true` on the arc layer. +- `onClick` handler: + ```ts + onClick: ({ object }) => { + if (!object) return; + const isPro = hasPremiumAccess(getAuthState()); + if (!isPro) { trackGateHit('trade-arc-intel'); return; } + this.callbacks.onRouteArcClick?.(object); // new callback + } + ``` +- `MapContainer.ts` wires `onRouteArcClick` to `MapPopup.showRouteBreakdown(segment, chokepointData)`. +- `showRouteBreakdown` renders a mini popup: route name, primary chokepoint, disruption score, war risk tier, top 2 HS2 sectors (from last cached `getCountryChokepointIndex` for the selected country). +- Popup closes on outside click (same as existing popup dismiss logic). + +**Patterns to follow:** +- Existing `pickable` arc layers (displacement flows arc layer in `DeckGLMap.ts:4919-4935`) +- `MapPopup` existing `show/hide` and positioning methods + +**Verification:** +- PRO user clicking a red arc over Hormuz sees popup: "Persian Gulf – Hormuz Strait, Disruption: 85, War Zone, Sectors: Energy 60%, Electronics 18%" +- Free user clicking arc β€” no popup, `trackGateHit('trade-arc-intel')` fires +- Popup dismissed on background click + +--- + +### Sprint C β€” Scenario Engine + +#### C1: Scenario Templates Config + +**Goal:** Create `src/config/scenario-templates.ts` with 6 pre-built scenario definitions. + +**Files:** +- `src/config/scenario-templates.ts` β€” new file + +**Approach:** +```ts +// src/config/scenario-templates.ts +export interface ScenarioTemplate { + id: string; + name: string; + description: string; + type: 'conflict' | 'weather' | 'sanctions' | 'tariff_shock' | 'infrastructure' | 'pandemic'; + affectedChokepointIds: string[]; // from chokepoint-registry.ts + disruptionPct: number; // 0-100 + durationDays: number; + affectedHs2?: string[]; // null = all sectors +} + +export const SCENARIO_TEMPLATES: ScenarioTemplate[] = [ + { + id: 'taiwan-strait-full-closure', + name: 'Taiwan Strait Full Closure', + description: 'Complete closure of Taiwan Strait for 30 days β€” electronics and machinery supply chains', + type: 'conflict', + affectedChokepointIds: ['taiwan_strait'], + disruptionPct: 100, + durationDays: 30, + affectedHs2: ['84', '85', '87'], + }, + // ... 5 more +]; +``` + +**Pre-built templates (6):** +1. Taiwan Strait full closure (conflict, 100%, 30d, HS 84/85/87) +2. Suez + Bab-el-Mandeb simultaneous disruption (conflict, 80%, 60d, all sectors) +3. Panama drought β€” 50% capacity (weather, 50%, 90d, all sectors) +4. Hormuz tanker blockade (conflict, 100%, 14d, HS 27 energy) +5. Russia Baltic grain suspension (sanctions, 100%, 180d, HS 10/12 food) +6. US tariff escalation on electronics (tariff_shock, 0% chokepoint but 30% cost shock, 365d, HS 85) + +**Patterns to follow:** `src/config/bypass-corridors.ts` β€” same static typed array pattern + +**Verification:** TypeScript compiles. `SCENARIO_TEMPLATES.length === 6`. Each template references valid `chokepointIds` from `chokepoint-registry.ts`. + +--- + +#### C2: Scenario Job API (Vercel Edge Functions) + +**Goal:** `POST /api/scenario/v1/run` + `GET /api/scenario/v1/status` for async scenario job dispatch. + +**Files:** +- `api/scenario/v1/run.ts` β€” edge function: validate + PRO gate + enqueue + return jobId +- `api/scenario/v1/status.ts` β€” edge function: poll job result from Redis +- `server/worldmonitor/supply-chain/v1/scenario-compute.ts` β€” pure compute function (no I/O) + +**Approach β€” `run.ts`:** +```ts +// api/scenario/v1/run.ts +import { validateApiKey } from '../_api-key'; +import { isCallerPremium } from '../../server/_shared/premium-check'; +import { getRedisCredentials } from '../../server/_shared/redis'; + +export const config = { runtime: 'edge' }; + +export default async function handler(req: Request) { + if (req.method !== 'POST') return new Response('', { status: 405 }); + await validateApiKey(req, { forceKey: false }); // browser auth OK + const isPro = await isCallerPremium(req); + if (!isPro) return new Response(JSON.stringify({ error: 'PRO required' }), { status: 403 }); + + const body = await req.json(); + const { scenarioId, iso2 } = body; // optional iso2 to scope impact + + const jobId = `scenario:${Date.now()}:${Math.random().toString(36).slice(2, 8)}`; + const payload = JSON.stringify({ jobId, scenarioId, iso2: iso2 ?? null, enqueuedAt: Date.now() }); + + const { url, token } = getRedisCredentials(); + await fetch(`${url}/rpush/scenario-queue:pending`, { + method: 'POST', + headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' }, + body: JSON.stringify([payload]), + }); + + return new Response(JSON.stringify({ jobId, status: 'pending' }), { status: 202 }); +} +``` + +**Approach β€” `status.ts`:** +```ts +export default async function handler(req: Request) { + const { searchParams } = new URL(req.url); + const jobId = searchParams.get('jobId'); + if (!jobId || !/^scenario:[0-9]+:[a-z0-9]+$/.test(jobId)) { + return new Response(JSON.stringify({ error: 'invalid jobId' }), { status: 400 }); + } + const result = await getCachedJson(`scenario-result:${jobId}`).catch(() => null); + if (!result) return new Response(JSON.stringify({ status: 'pending' }), { status: 200 }); + return new Response(JSON.stringify(result), { status: 200 }); +} +``` + +**Security:** `jobId` regex-validated to prevent Redis key injection. `forceKey: false` uses browser auth. `validateApiKey(req, { forceKey: true })` would be needed for server-to-server use. + +**Patterns to follow:** +- `api/supply-chain/v1/[rpc].ts` β€” edge function export pattern +- `api/_api-key.js:49` β€” `validateApiKey` with `forceKey` option + +**Verification:** +- `POST /api/scenario/v1/run` with PRO JWT returns `{ jobId, status: 'pending' }`, HTTP 202 +- `POST /api/scenario/v1/run` without PRO returns HTTP 403 +- `GET /api/scenario/v1/status?jobId=invalid` returns HTTP 400 +- `GET /api/scenario/v1/status?jobId={valid but unknown}` returns `{ status: 'pending' }` + +--- + +#### C3: Scenario Worker (Railway) + +**Goal:** Railway worker `scripts/scenario-worker.mjs` that atomically dequeues jobs, runs the scenario compute, writes results to Redis. + +**Files:** +- `scripts/scenario-worker.mjs` β€” new Railway worker + +**Approach:** +```js +// scripts/scenario-worker.mjs +import { getRedisCredentials, loadEnvFile } from './_seed-utils.mjs'; +loadEnvFile(import.meta.url); + +const QUEUE_KEY = 'scenario-queue:pending'; +const PROCESSING_KEY = 'scenario-queue:processing'; +const RESULT_TTL = 86400; // 24h + +async function redisCommand(cmd, args) { + const { url, token } = getRedisCredentials(); + const resp = await fetch(`${url}/${cmd}`, { + method: 'POST', + headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' }, + body: JSON.stringify(args), + signal: AbortSignal.timeout(15_000), + }); + const body = await resp.json(); + return body.result; +} + +async function runWorker() { + console.log('[scenario-worker] listening...'); + while (true) { + // Atomic FIFO dequeue+claim (Redis 6.2+) + const raw = await redisCommand('blmove', [QUEUE_KEY, PROCESSING_KEY, 'LEFT', 'RIGHT', 30]); + if (!raw) continue; // timeout, loop back + + let job; + try { job = JSON.parse(raw); } catch { continue; } + + const { jobId, scenarioId, iso2 } = job; + console.log(`[scenario-worker] processing ${jobId} (${scenarioId})`); + + // Check idempotency + const existing = await redisCommand('get', [`scenario-result:${jobId}`]); + if (existing) { + await redisCommand('lrem', [PROCESSING_KEY, 1, raw]); + continue; + } + + try { + const result = await computeScenario(scenarioId, iso2); + await redisCommand('setex', [`scenario-result:${jobId}`, RESULT_TTL, JSON.stringify({ status: 'done', result, completedAt: Date.now() })]); + } catch (err) { + await redisCommand('setex', [`scenario-result:${jobId}`, RESULT_TTL, JSON.stringify({ status: 'failed', error: err.message })]); + } finally { + await redisCommand('lrem', [PROCESSING_KEY, 1, raw]); + } + } +} + +runWorker().catch(err => { console.error(err); process.exit(1); }); +``` + +**`computeScenario(scenarioId, iso2)`:** +- Loads scenario template from a lightweight copy of `SCENARIO_TEMPLATES` (no TS imports) +- Reads chokepoint status from Redis: `supply_chain:chokepoints:v4` +- For each affected country (all if `iso2 === null`, or just the specified country): + - Reads `supply-chain:exposure:{iso2}:{hs2}:v1` from Redis + - Computes disruption impact: `exposureScore Γ— disruptionPct / 100` + - Ranks by `importValue Γ— adjustedExposure` +- Returns top-20 countries by impact + per-chokepoint bypass options + +**Railway service setup (per `railway-seed-setup` skill):** +- `startCommand`: `node scenario-worker.mjs` +- `rootDirectory`: `scripts` +- `vCPUs: 1`, `memoryGB: 1` +- No cron schedule (always-on worker, not cron) + +**BLMOVE note:** Upstash supports `LMOVE`/`BLMOVE` (Redis 6.2 commands). If unavailable, fallback: Lua script `RPOPLPUSH` equivalent. Test in staging first. + +**Verification:** +- Worker logs "processing {jobId}" and writes `scenario-result:{jobId}` within 30s +- Idempotency: running same jobId twice only writes result once +- On `computeScenario` throw: result has `{ status: 'failed', error }` (no orphaned processing entry) +- Processing list is always cleaned up in `finally` block + +--- + +#### C4: MapContainer.activateScenario() + Visual States + +**Goal:** `MapContainer.activateScenario(scenarioId, result)` broadcasts scenario state to all 3 renderers, triggering visual changes. + +**Files:** +- `src/components/MapContainer.ts` β€” new `activateScenario()` + `deactivateScenario()` methods +- `src/components/DeckGLMap.ts` β€” `setScenarioState(state: ScenarioVisualState | null)` method +- `src/components/SupplyChainPanel.ts` β€” scenario summary card pinned to top of panel + +**Approach β€” `MapContainer.ts`:** +```ts +interface ScenarioVisualState { + disruptedChokepointIds: string[]; + affectedIso2s: string[]; // countries with impact > threshold + impactLevel: 'low' | 'med' | 'high'; // per country +} + +public activateScenario(scenarioId: string, result: ScenarioResult): void { + const isPro = hasPremiumAccess(getAuthState()); + if (!isPro) { trackGateHit('scenario'); return; } + + const state: ScenarioVisualState = { + disruptedChokepointIds: result.affectedChokepointIds, + affectedIso2s: result.topImpactCountries.map(c => c.iso2), + impactLevel: 'high', + }; + this.activeRenderer?.setScenarioState?.(state); // DeckGL + this.svgMap?.setScenarioState?.(state); // SVG + this.globeMap?.setScenarioState?.(state); // Globe (optional, best-effort) + this.panel?.showScenarioSummary?.(scenarioId, result); // panel card +} + +public deactivateScenario(): void { + this.activeRenderer?.setScenarioState?.(null); + this.svgMap?.setScenarioState?.(null); + this.globeMap?.setScenarioState?.(null); + this.panel?.hideScenarioSummary?.(); +} +``` + +**DeckGLMap visual state (`setScenarioState`):** +- For `disruptedChokepointIds`: add pulsing CSS class to chokepoint markers (via existing `setView` mechanism that triggers marker DOM updates, or via DeckGL `ScatterplotLayer` with pulsing `radiusScale` animation) +- For trade route arcs: `status = 'disrupted'` for routes transiting affected chokepoints (orange/red palette) +- For country choropleth fill: countries in `affectedIso2s` get a semi-transparent red tint overlay (new `ScatterplotLayer` or modify existing fill layer) + +**SVG Map visual state:** simpler β€” country fills change to red tint for affected ISO2s. + +**Scenario summary card in `SupplyChainPanel`:** +- Pinned card at top: "⚠️ Taiwan Strait Scenario Β· Top 5 Affected: DE (#1, 42%), FR (#2, 38%)..." +- Dismiss button calls `MapContainer.deactivateScenario()`. + +**Patterns to follow:** +- `MapContainer.ts:756-785` β€” `setOnLayerChange()` broadcasts to all renderers +- `MapContainer.ts:401-405` β€” `setLayers()` broadcast pattern +- State dispatch uses `this.activeRenderer` for the currently visible renderer + +**Verification:** +- `activateScenario('taiwan-strait-full-closure', result)` β†’ Bashi/Miyako arcs turn orange, Taiwan Strait marker pulses +- Panel shows pinned scenario summary card +- `deactivateScenario()` restores all visual state to normal +- Free user calling `activateScenario` β†’ no visual change, `trackGateHit('scenario')` fires + +--- + +### Sprint D β€” Sector Dependency RPC + Vendor API + Sprint C Visual Deferrals + +**Carries over from Sprint C:** +- Scenario panel UI: trigger button, scenario summary card with top-impact country list, dismiss +- Globe renderer: scenario state dispatch in `activateScenario()` β†’ GlobeMap country highlight +- SVG renderer: same dispatch β†’ choropleth overlay +- Tariff-shock visual: country-heat layer using `affectedIso2s` from `ScenarioVisualState` +- Free user PRO gate on scenario activation: `trackGateHit('scenario-engine')` + upgrade CTA +- Scenario integration tests: endpoints, worker mock, map activation path + +#### D1: get-sector-dependency RPC + +**Goal:** New RPC `GetSectorDependency` returns dependency flags (SINGLE_SOURCE_CRITICAL, etc.) for a country + HS2 sector. + +**Files:** +- `server/worldmonitor/supply-chain/v1/get-sector-dependency.ts` β€” new handler +- `proto/worldmonitor/supply_chain/v1/get_sector_dependency.proto` β€” new proto +- `proto/worldmonitor/supply_chain/v1/service.proto` β€” register new RPC +- `server/worldmonitor/supply-chain/v1/handler.ts` β€” register handler +- `api/supply-chain/v1/[rpc].ts` β€” routed automatically +- `src/generated/...` β€” run `make generate` after proto changes +- `server/_shared/cache-keys.ts` β€” add `SECTOR_DEPENDENCY_KEY` +- `api/bootstrap.js` β€” NOT added (request-varying, excluded from bootstrap) + +**Proto:** +```protobuf +message GetSectorDependencyRequest { + string iso2 = 1; + string hs2 = 2; +} + +message GetSectorDependencyResponse { + string iso2 = 1; + string hs2 = 2; + string hs2_label = 3; + repeated DependencyFlag flags = 4; + string primary_exporter_iso2 = 5; + double primary_exporter_share = 6; // 0-1 + string primary_chokepoint_id = 7; + double primary_chokepoint_exposure = 8; // 0-100 + bool has_viable_bypass = 9; + string fetched_at = 10; +} + +enum DependencyFlag { + DEPENDENCY_FLAG_UNSPECIFIED = 0; + DEPENDENCY_FLAG_SINGLE_SOURCE_CRITICAL = 1; // >80% from 1 exporter + DEPENDENCY_FLAG_SINGLE_CORRIDOR_CRITICAL = 2; // >80% via 1 chokepoint, no bypass + DEPENDENCY_FLAG_COMPOUND_RISK = 3; // both of the above + DEPENDENCY_FLAG_DIVERSIFIABLE = 4; // bypass exists + multiple exporters +} +``` + +**Server logic:** +1. `isCallerPremium` guard +2. Read `supply-chain:exposure:{iso2}:{hs2}:v1` from Redis (seeded by `seed-hs2-chokepoint-exposure.mjs`) +3. Read top exporter from Comtrade data (`comtrade:flows:{numericCode}:2709` pattern) +4. Read bypass options for primary chokepoint via `BYPASS_CORRIDORS_BY_CHOKEPOINT` +5. Compute flags: `primaryExporterShare > 0.8` β†’ `SINGLE_SOURCE_CRITICAL`, `primaryChokepointExposure > 80 && !hasViableBypass` β†’ `SINGLE_CORRIDOR_CRITICAL`, both β†’ `COMPOUND_RISK`, has bypass + exporters β†’ `DIVERSIFIABLE` + +**Cache:** `supply-chain:sector-dep:{iso2}:{hs2}:v1` with TTL 86400 (24h). + +**Verification:** +- Japan HS2=85 (electronics): flags `SINGLE_CORRIDOR_CRITICAL` (Taiwan Strait) +- US HS2=27 (energy): flags `DIVERSIFIABLE` (IEA stocks + multiple suppliers) +- 4-file checklist: `cache-keys.ts` βœ“, handler registration βœ“, health.js βœ“ (not bootstrap) +- `make generate` runs cleanly + +--- + +#### D2: Vendor REST API (Route Intelligence) + +**Goal:** `GET /api/v2/shipping/route-intelligence` β€” authenticated endpoint returning route + disruption + bypass for a given country pair. + +**Files:** +- `api/v2/shipping/route-intelligence.ts` β€” new edge function +- `api/v2/shipping/webhooks.ts` β€” new HMAC webhook registration endpoint + +**Route Intelligence API:** +``` +GET /api/v2/shipping/route-intelligence + X-WorldMonitor-Key: + ?fromIso2=US&toIso2=JP&cargoType=container&hs2=85 +``` + +```ts +export const config = { runtime: 'edge' }; +export default async function handler(req: Request) { + await validateApiKey(req, { forceKey: true }); // vendor MUST send key + // ... build response from chokepoint status + bypass options +} +``` + +Response shape: +```json +{ + "fromIso2": "US", + "toIso2": "JP", + "primaryRouteId": "transpacific", + "chokepointExposures": [{ "chokepointId": "taiwan_strait", "exposurePct": 60 }], + "bypassOptions": [...], + "warRiskTier": "WAR_RISK_TIER_ELEVATED", + "disruptionScore": 45, + "fetchedAt": "2026-04-09T..." +} +``` + +**Webhook registration (`api/v2/shipping/webhooks.ts`):** +- `POST`: register `{ callbackUrl, chokepointIds[], alertThreshold }` β†’ returns `{ subscriberId, secret }` +- `GET /{subscriberId}`: status check +- `POST /{subscriberId}/rotate-secret`: secret rotation (old valid 10min) +- `POST /{subscriberId}/reactivate`: re-enable after suspension + +**SSRF prevention (mandatory, from roadmap):** +- Resolve `callbackUrl` hostname before each webhook delivery +- Reject private IPs: `127.x`, `10.x`, `192.168.x`, `172.16.0.0/12`, `169.254.x.x` +- Reject metadata: `169.254.169.254`, `fd00:ec2::254` +- No redirects to blocked targets + +**HMAC signature:** `X-WM-Signature: sha256=` + +**Verification:** +- `GET /api/v2/shipping/route-intelligence` without key returns HTTP 401 +- `GET /api/v2/shipping/route-intelligence?fromIso2=US&toIso2=JP&hs2=85` with valid key returns non-empty response +- Webhook registration with `callbackUrl: http://169.254.169.254/` is rejected with 400 + +--- + +## System-Wide Impact + +### Interaction Graph + +``` +user expands chokepoint card (SupplyChainPanel) + β†’ fetchBypassOptions(chokepointId) β†’ /api/supply-chain/v1/get-bypass-options β†’ Redis + β†’ isCallerPremium(request) β†’ Convex auth check + β†’ BYPASS_CORRIDORS_BY_CHOKEPOINT config β†’ getCachedJson('supply_chain:chokepoints:v4') + +user opens country panel (CountryDeepDivePanel) + β†’ country-intel.ts fetchCountryChokepointIndex(iso2, '27') + β†’ /api/supply-chain/v1/get-country-chokepoint-index β†’ Redis seed key + β†’ if result: updateTradeExposure(result) β†’ DOM render + β†’ if HS27 + PRO: fetchCountryCostShock(iso2, primaryCp) β†’ /api/supply-chain/v1/get-country-cost-shock + +user clicks arc (DeckGLMap, PRO) + β†’ MapContainer.onRouteArcClick(segment) + β†’ MapPopup.showRouteBreakdown(segment, chokepointData) + β†’ reads last cached fetchCountryChokepointIndex (in-memory, no new fetch) + +scenario run (PRO user) + β†’ POST /api/scenario/v1/run β†’ RPUSH scenario-queue:pending + β†’ scenario-worker.mjs (Railway) β†’ BLMOVE β†’ computeScenario() + β†’ SETEX scenario-result:{jobId} + β†’ client polls GET /api/scenario/v1/status?jobId=X (every 2s, max 30s) + β†’ on done: MapContainer.activateScenario(id, result) β†’ all 3 renderers update +``` + +### Error Propagation + +- `fetchBypassOptions` fails β†’ bypass section shows "Bypass data unavailable" (no crash, no empty white space) +- `fetchCountryChokepointIndex` fails β†’ `updateTradeExposure(null)` removes card from DOM +- Scenario worker crash β†’ `finally` block removes job from processing queue; `scenario-result:{jobId}` never written; poll returns `{ status: 'pending' }` forever (add stale detection in status endpoint: if `enqueuedAt > 10min ago` β†’ `{ status: 'failed', error: 'timeout' }`) +- Redis unavailable β†’ all RPC handlers return graceful empty responses (existing pattern) + +### State Lifecycle Risks + +- Bypass fetch fires on card expand; result is not cached in component state β€” if user collapses+reopens, another fetch fires. Add a per-chokepoint in-memory cache (`Map`) in `SupplyChainPanel`. +- `tradeExposureBody = null` in `resetPanelContent()` is critical β€” without it, updating a closed panel will crash. +- Scenario result TTL is 24h. Stale scenarios are fine (user can re-run). No cleanup needed. + +### API Surface Parity + +- `fetchBypassOptions` already exists in `src/services/supply-chain/index.ts` β€” UI just needs to call it +- `fetchCountryChokepointIndex` already exists β€” same +- New: `fetchSectorDependency` (Sprint D) must be added to `src/services/supply-chain/index.ts` +- Vendor API (`/api/v2/shipping/*`) is separate surface β€” no frontend consumption + +--- + +## Acceptance Criteria + +### Sprint A + +- [ ] Chokepoint card (expanded) shows war risk tier badge (free, no PRO gate) +- [ ] Chokepoint card (expanded, PRO) shows top 3 bypass options with added days + $/ton +- [ ] Free user expanding chokepoint card sees bypass gate + upgrade CTA; `trackGateHit('bypass-corridors')` fires +- [ ] CountryDeepDivePanel for US shows "Trade Exposure" card with β‰₯ 1 chokepoint + exposure bar +- [ ] CountryDeepDivePanel for DE: Trade Exposure card removes itself (not seeded in v1) +- [ ] MapPopup Suez β†’ HS2 ring chart visible for PRO user +- [ ] `resetPanelContent()` sets `tradeExposureBody = null` + +### Sprint B + +- [ ] DeckGLMap arcs for routes through Bab el-Mandeb are red when `disruptionScore > 70` +- [ ] Arc colors update within 2s of chokepoint data refresh (no page reload) +- [ ] Free users see uncolored (default blue) arcs +- [ ] PRO user clicking arc over disrupted chokepoint β†’ mini popup shown +- [ ] `trackGateHit('trade-arc-intel')` fires when free user clicks arc + +### Sprint C + +- [x] `POST /api/scenario/v1/run` (PRO) β†’ HTTP 202 with `jobId` _(PR #2890)_ +- [x] Worker processes job within 30s _(pipeline: ~300ms for targeted scenarios)_ +- [x] `GET /api/scenario/v1/status?jobId=X` returns `{ status: 'done', result }` after completion _(PR #2890)_ +- [x] `MapContainer.activateScenario()` triggers visual changes on DeckGL renderer _(arc orange recolor for physical chokepoint scenarios)_ +- [ ] Panel shows scenario summary card with dismiss button β€” **deferred to Sprint A** +- [ ] Free user activating scenario β†’ no visual change, gate fires β€” **deferred to Sprint A** +- [ ] Tariff-shock + globe/SVG choropleth visual β€” **deferred to Sprint D** + +### Sprint D + +- [ ] `GetSectorDependency` for Japan HS85 returns `SINGLE_CORRIDOR_CRITICAL` +- [ ] `GET /api/v2/shipping/route-intelligence` without key β†’ HTTP 401 +- [ ] Webhook `callbackUrl: 169.254.169.254` rejected with HTTP 400 + +--- + +## Quality Gates + +- [ ] `npm run typecheck` + `npm run typecheck:api` pass with zero errors +- [ ] `npm run test:data` passes for any new RPC +- [ ] `npm run lint` (Biome) passes +- [ ] No `console.error` in browser for normal operation +- [ ] All new PRO gates have corresponding `trackGateHit` call +- [ ] `scripts/shared/country-port-clusters.json` validated: 195+ entries, valid `coastSide` enum + +--- + +## Dependencies + +- `scripts/shared/country-port-clusters.json` (A1) must exist before `seed-hs2-chokepoint-exposure.mjs` can run correctly in production (currently uses fallback empty array) +- Sprint B arc coloring depends on `tradeRouteSegments` having `waypointChokepointIds` populated β€” verify this field exists in the segment type +- Sprint C `BLMOVE` depends on Upstash Redis supporting Redis 6.2+ commands β€” test before deploying worker +- Sprint D proto changes require `make generate` to regenerate TypeScript types before any handler code compiles + +--- + +## Post-Deploy Monitoring & Validation + +- **Log queries**: `[scenario-worker]` prefix in Railway logs; `trackGateHit` events in analytics +- **Redis**: check `LLEN scenario-queue:pending` and `LLEN scenario-queue:processing` β€” both should stay near 0 in steady state +- **Health**: `api/health.js` already monitors `seed-meta:supply_chain:chokepoint-exposure`; add `seed-meta:supply_chain:sector-dep` for D1 +- **Validation window**: Deploy Sprint A first (pure UI, no new RPCs) β†’ monitor for JS errors β†’ deploy Sprints B/C/D sequentially with 48h observation each +- **Failure signal**: Scenario processing queue grows without shrinking β†’ worker crashed; restart Railway worker service + +--- + +## Sources & References + +### Origin + +- **Origin document**: `docs/brainstorms/2026-04-09-worldwide-shipping-intelligence-requirements.md` β€” Key decisions carried forward: (1) HS2 granularity for v1, not HS6; (2) All new analytics PRO-only; (3) Async Railway worker for scenario engine + +### Full Roadmap + +- `docs/internal/worldmonitor-global-shipping-intelligence-roadmap.md` β€” complete 5-sprint roadmap with all technical specs + +### Existing Implementations (Backend β€” All Done) + +- `server/worldmonitor/supply-chain/v1/get-bypass-options.ts` β€” PRO-gated bypass scoring +- `server/worldmonitor/supply-chain/v1/get-country-cost-shock.ts` β€” energy shock RPC +- `server/worldmonitor/supply-chain/v1/get-country-chokepoint-index.ts` β€” exposure index RPC +- `scripts/seed-hs2-chokepoint-exposure.mjs` β€” Redis seeder (needs `country-port-clusters.json`) +- `src/config/bypass-corridors.ts` β€” 40 corridors for 13 chokepoints +- `src/config/chokepoint-registry.ts` β€” canonical 13-ID registry + +### UI Pattern References + +- `src/components/SupplyChainPanel.ts:134-158` β€” MutationObserver + TransitChart mount pattern +- `src/components/CountryDeepDivePanel.ts:1550-1562` β€” `sectionCard()` helper +- `src/components/MapPopup.ts:267-281` β€” PRO-gated TransitChart mount +- `src/components/DeckGLMap.ts:4959-4979` β€” `createTradeRoutesLayer()` arc coloring +- `src/app/event-handlers.ts:1027-1032` β€” `applyProGate` + `subscribeAuthState` pattern +- `src/services/supply-chain/index.ts:111-151` β€” existing RPC client calls + +### Related PRs + +- PR #2805 β€” PortWatch maritime activity (PR C β€” all done) +- PR #2841 β€” chokepoint popup TransitChart post-render mount pattern +- PR #2890 β€” Sprint C: scenario engine (templates, job API, Railway worker, DeckGL activation) β€” **ready to merge** diff --git a/scripts/scenario-worker.mjs b/scripts/scenario-worker.mjs new file mode 100644 index 000000000..d2aec2c93 --- /dev/null +++ b/scripts/scenario-worker.mjs @@ -0,0 +1,437 @@ +#!/usr/bin/env node +// @ts-check +/** + * Scenario Engine Worker β€” always-on Railway service + * + * Atomically dequeues scenario jobs from Redis using BLMOVE (Redis 6.2 / Upstash), + * runs computeScenario(), and writes results back to Redis with a 24-hour TTL. + * + * Railway config: + * rootDirectory: scripts + * startCommand: node scenario-worker.mjs + * vCPUs: 1 / memoryGB: 1 + * cronSchedule: (always-on long-running process) + */ + +import { getRedisCredentials, loadEnvFile } from './_seed-utils.mjs'; + +loadEnvFile(import.meta.url); + +const QUEUE_KEY = 'scenario-queue:pending'; +const PROCESSING_KEY = 'scenario-queue:processing'; +const RESULT_TTL_SECONDS = 86_400; // 24 h +const BLMOVE_TIMEOUT_SECONDS = 30; // block for up to 30s waiting for a job + +/** @typedef {{ jobId: string; scenarioId: string; iso2: string | null; enqueuedAt: number }} ScenarioJob */ + +/** + * Inline copy of SCENARIO_TEMPLATES (no TypeScript import). + * Keep in sync with server/worldmonitor/supply-chain/v1/scenario-templates.ts. + * Worker only needs: id, affectedChokepointIds, disruptionPct, durationDays, affectedHs2, costShockMultiplier. + * + * @type {Array<{ id: string; affectedChokepointIds: string[]; disruptionPct: number; durationDays: number; affectedHs2: string[] | null; costShockMultiplier: number }>} + */ +const SCENARIO_TEMPLATES = [ + { + id: 'taiwan-strait-full-closure', + affectedChokepointIds: ['taiwan_strait'], + disruptionPct: 100, + durationDays: 30, + affectedHs2: ['84', '85', '87'], + costShockMultiplier: 1.45, + }, + { + id: 'suez-bab-simultaneous', + affectedChokepointIds: ['suez', 'bab_el_mandeb'], + disruptionPct: 80, + durationDays: 60, + affectedHs2: null, + costShockMultiplier: 1.35, + }, + { + id: 'panama-drought-50pct', + affectedChokepointIds: ['panama'], + disruptionPct: 50, + durationDays: 90, + affectedHs2: null, + costShockMultiplier: 1.22, + }, + { + id: 'hormuz-tanker-blockade', + affectedChokepointIds: ['hormuz_strait'], + disruptionPct: 100, + durationDays: 14, + affectedHs2: ['27', '29'], + costShockMultiplier: 2.10, + }, + { + id: 'russia-baltic-grain-suspension', + affectedChokepointIds: ['bosphorus', 'dover_strait'], + disruptionPct: 100, + durationDays: 180, + affectedHs2: ['10', '12'], + costShockMultiplier: 1.55, + }, + { + id: 'us-tariff-escalation-electronics', + affectedChokepointIds: [], + disruptionPct: 0, + durationDays: 365, + affectedHs2: ['85'], + costShockMultiplier: 1.50, + }, +]; + +// ──────────────────────────────────────────────────────────────────────────── +// Redis helpers (Upstash REST API) +// ──────────────────────────────────────────────────────────────────────────── + +/** @returns {{ url: string; token: string }} */ +function getCredentials() { + return getRedisCredentials(); +} + +/** + * Execute a raw Redis command via Upstash REST API. + * @param {string} cmd e.g. "blmove" + * @param {unknown[]} args + */ +async function redisCmd(cmd, args) { + const { url, token } = getCredentials(); + const resp = await fetch(`${url}/${cmd}`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(args), + signal: AbortSignal.timeout(40_000), // > BLMOVE_TIMEOUT_SECONDS + }); + if (!resp.ok) { + const text = await resp.text().catch(() => ''); + throw new Error(`Redis ${cmd} HTTP ${resp.status}: ${text.slice(0, 200)}`); + } + const body = /** @type {{ result: unknown }} */ (await resp.json()); + return body.result; +} + +/** + * GET a key β€” returns parsed JSON or null. + * @param {string} key + */ +async function redisGet(key) { + const { url, token } = getCredentials(); + const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, { + headers: { Authorization: `Bearer ${token}` }, + signal: AbortSignal.timeout(10_000), + }); + if (!resp.ok) return null; + const body = /** @type {{ result?: string }} */ (await resp.json()); + return body.result ? JSON.parse(body.result) : null; +} + +/** + * SET a key with TTL (SETEX equivalent). + * @param {string} key + * @param {number} ttl seconds + * @param {string} value serialised JSON string + */ +async function redisSetex(key, ttl, value) { + await redisCmd('setex', [key, ttl, value]); +} + +/** + * Remove the first occurrence of `value` from list `key`. + * @param {string} key + * @param {string} value + */ +async function redisLrem(key, value) { + await redisCmd('lrem', [key, 1, value]); +} + +/** + * Batch-GET multiple keys via a single Upstash pipeline request. + * Returns an array of parsed JSON values (null for missing/unparseable keys). + * @param {string[]} keys + * @returns {Promise>} + */ +async function redisPipelineGet(keys) { + if (keys.length === 0) return []; + const { url, token } = getCredentials(); + const pipeline = keys.map(k => ['GET', k]); + const resp = await fetch(`${url}/pipeline`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(pipeline), + signal: AbortSignal.timeout(30_000), + }); + if (!resp.ok) { + const text = await resp.text().catch(() => ''); + throw new Error(`Redis pipeline HTTP ${resp.status}: ${text.slice(0, 200)}`); + } + const results = /** @type {Array<{ result: string | null }>} */ (await resp.json()); + return results.map(r => { + if (!r?.result) return null; + try { return JSON.parse(r.result); } + catch { return null; } + }); +} + +// ──────────────────────────────────────────────────────────────────────────── +// Scenario computation +// ──────────────────────────────────────────────────────────────────────────── + +/** + * Compute the impact of a scenario across countries and HS2 sectors. + * + * Algorithm: + * 1. Resolve the scenario template. + * 2. Read live chokepoint statuses from Redis (supply_chain:chokepoints:v4). + * 3. For each affected chokepoint, scan cached exposure keys for the + * specified iso2 (or all seeded reporters if iso2 is null). + * 4. Compute adjusted impact: exposureScore Γ— (disruptionPct / 100) Γ— costShockMultiplier. + * 5. Return top-20 countries by impact + per-chokepoint metadata. + * + * @param {string} scenarioId + * @param {string | null} iso2 scope to one country, or null = all reporters + * @returns {Promise} + */ +async function computeScenario(scenarioId, iso2) { + const template = SCENARIO_TEMPLATES.find(t => t.id === scenarioId); + if (!template) throw new Error(`Unknown scenario: ${scenarioId}`); + + // Read live chokepoint data for context (best-effort). + // Cache shape: { chokepoints: ChokepointInfo[], fetchedAt, upstreamUnavailable } + const cpData = await redisGet('supply_chain:chokepoints:v4').catch(() => null); + + /** @type {Map} chokepointId β†’ current disruptionScore */ + const currentScores = new Map(); + const cpArray = Array.isArray(cpData?.chokepoints) ? cpData.chokepoints : []; + for (const cp of cpArray) { + if (cp?.id && typeof cp.disruptionScore === 'number') { + currentScores.set(cp.id, cp.disruptionScore); + } + } + + // The reporters seeded in v1 (US, China, Russia, Iran, India, Taiwan) + const SEEDED_REPORTERS = ['US', 'CN', 'RU', 'IR', 'IN', 'TW']; + const reportersToCheck = iso2 ? [iso2] : SEEDED_REPORTERS; + + /** @type {Array<{ iso2: string; hs2: string; exposureScore: number; adjustedImpact: number; chokepointId: string }>} */ + const impacts = []; + + // Tariff-shock scenarios have no physical chokepoint closure (affectedChokepointIds: []). + // They affect all countries that trade the targeted HS2 sectors regardless of route. + const isTariffShock = template.affectedChokepointIds.length === 0; + + // Hoist hs2Chapters outside the reporter loop β€” depends only on template, not reporter. + const hs2Chapters = template.affectedHs2 ?? Array.from({ length: 99 }, (_, i) => String(i + 1).padStart(2, '0')); + + // Build all keys upfront for a single pipeline GET (avoids NΓ—M sequential requests). + /** @type {string[]} */ + const allKeys = []; + for (const reporter of reportersToCheck) { + for (const hs2 of hs2Chapters) { + allKeys.push(`supply-chain:exposure:${reporter}:${hs2}:v1`); + } + } + + // Single pipeline call replaces the nested sequential redisGet() calls. + const pipelineResults = await redisPipelineGet(allKeys); + + // Process results with the same tariff-shock vs regular logic. + let idx = 0; + for (const reporter of reportersToCheck) { + for (const hs2 of hs2Chapters) { + const data = /** @type {{ iso2?: string; hs2?: string; exposures?: Array<{ chokepointId: string; exposureScore: number }>; vulnerabilityIndex?: number } | null} */ (pipelineResults[idx++]); + if (!data || !Array.isArray(data.exposures)) continue; + + if (isTariffShock) { + // Tariff shock: all reporters trading this HS2 sector are impacted. + // Use vulnerabilityIndex as a proxy for overall trade exposure. + const vulnScore = typeof data.vulnerabilityIndex === 'number' ? data.vulnerabilityIndex : 0; + if (vulnScore > 0) { + const adjustedImpact = vulnScore * template.costShockMultiplier; + impacts.push({ iso2: reporter, hs2, exposureScore: vulnScore, adjustedImpact, chokepointId: 'tariff' }); + } + continue; + } + + for (const entry of data.exposures) { + if (!entry?.chokepointId || typeof entry.exposureScore !== 'number') continue; + // Only count chokepoints that this scenario actually disrupts + if (!template.affectedChokepointIds.includes(entry.chokepointId)) continue; + + const exposureScore = entry.exposureScore; + // adjustedImpact: exposureScore Γ— disruptionPct% Γ— costShockMultiplier + // (No importValue in cache β€” relative ranking by score is sufficient for v1) + const adjustedImpact = exposureScore * (template.disruptionPct / 100) * template.costShockMultiplier; + + if (exposureScore > 0) { + impacts.push({ iso2: reporter, hs2, exposureScore, adjustedImpact, chokepointId: entry.chokepointId }); + } + } + } + } + + // Aggregate by country + /** @type {Map} iso2 β†’ total adjusted impact */ + const byCountry = new Map(); + for (const item of impacts) { + byCountry.set(item.iso2, (byCountry.get(item.iso2) ?? 0) + item.adjustedImpact); + } + + const sorted = [...byCountry.entries()].sort((a, b) => b[1] - a[1]).slice(0, 20); + const maxImpact = Math.max(sorted[0]?.[1] ?? 0, 1); + const topImpactCountries = sorted.map(([countryIso2, totalImpact]) => ({ + iso2: countryIso2, + totalImpact, + // Relative share of the worst-hit country, capped at 100 + impactPct: Math.min(Math.round((totalImpact / maxImpact) * 100), 100), + })); + + return { + scenarioId, + template: { + name: template.affectedChokepointIds.join('+') || 'tariff_shock', + disruptionPct: template.disruptionPct, + durationDays: template.durationDays, + costShockMultiplier: template.costShockMultiplier, + }, + affectedChokepointIds: template.affectedChokepointIds, + currentDisruptionScores: Object.fromEntries( + template.affectedChokepointIds.map(id => [id, currentScores.get(id) ?? null]), + ), + topImpactCountries, + affectedHs2: template.affectedHs2, + scopedIso2: iso2, + computedAt: Date.now(), + }; +} + +// ──────────────────────────────────────────────────────────────────────────── +// Orphan drain + SIGTERM handling +// ──────────────────────────────────────────────────────────────────────────── + +let shuttingDown = false; + +process.on('SIGTERM', () => { + shuttingDown = true; +}); + +/** + * At startup, requeue any jobs left in the processing list from a previous crash. + */ +async function requeueOrphanedJobs() { + let moved; + let count = 0; + do { + moved = await redisCmd('lmove', [PROCESSING_KEY, QUEUE_KEY, 'RIGHT', 'LEFT']).catch(() => null); + if (moved) count++; + } while (moved); + if (count > 0) console.log(`[scenario-worker] requeued ${count} orphaned jobs`); +} + +// ──────────────────────────────────────────────────────────────────────────── +// Job payload validation +// ──────────────────────────────────────────────────────────────────────────── + +const JOB_ID_RE = /^scenario:\d{13}:[a-z0-9]{8}$/; + +// ──────────────────────────────────────────────────────────────────────────── +// Main worker loop +// ──────────────────────────────────────────────────────────────────────────── + +async function runWorker() { + console.log('[scenario-worker] starting β€” listening on scenario-queue:pending'); + + await requeueOrphanedJobs(); + + while (!shuttingDown) { + let raw; + try { + // Atomic FIFO dequeue+claim: moves item from pending β†’ processing + // BLMOVE source destination LEFT RIGHT timeout (Upstash / Redis 6.2+) + raw = await redisCmd('blmove', [QUEUE_KEY, PROCESSING_KEY, 'LEFT', 'RIGHT', BLMOVE_TIMEOUT_SECONDS]); + } catch (err) { + console.error('[scenario-worker] BLMOVE error:', err.message); + // Brief pause before retrying to avoid hot-loop on connectivity issues + await new Promise(r => setTimeout(r, 5_000)); + continue; + } + + if (!raw) continue; // timeout β€” no job in queue, loop back + + /** @type {ScenarioJob | null} */ + let job = null; + try { + job = JSON.parse(String(raw)); + } catch { + console.error('[scenario-worker] Unparseable job payload, discarding:', String(raw).slice(0, 100)); + await redisLrem(PROCESSING_KEY, String(raw)).catch(() => null); + continue; + } + + const { jobId, scenarioId, iso2 } = job; + + // Validate payload fields before using any as Redis key fragments. + if ( + typeof jobId !== 'string' || !JOB_ID_RE.test(jobId) || + typeof scenarioId !== 'string' || + (iso2 !== null && (typeof iso2 !== 'string' || !/^[A-Z]{2}$/.test(iso2))) + ) { + console.error('[scenario-worker] Job failed field validation, discarding:', String(raw).slice(0, 100)); + await redisLrem(PROCESSING_KEY, String(raw)).catch(() => null); + continue; + } + + console.log(`[scenario-worker] processing ${jobId} (${scenarioId}, iso2=${iso2 ?? 'all'})`); + + // Idempotency: skip if result already written + const resultKey = `scenario-result:${jobId}`; + const existing = await redisGet(resultKey).catch(() => null); + if (existing) { + console.log(`[scenario-worker] ${jobId} already processed, skipping`); + await redisLrem(PROCESSING_KEY, String(raw)).catch(() => null); + continue; + } + + // Write processing state immediately so status.ts can reflect in-flight work. + await redisSetex(resultKey, RESULT_TTL_SECONDS, + JSON.stringify({ status: 'processing', startedAt: Date.now() }), + ).catch(() => null); + + try { + const result = await computeScenario(scenarioId, iso2); + await redisSetex( + resultKey, + RESULT_TTL_SECONDS, + JSON.stringify({ status: 'done', result, completedAt: Date.now() }), + ); + console.log(`[scenario-worker] ${jobId} done β€” ${result.topImpactCountries.length} countries impacted`); + } catch (err) { + console.error(`[scenario-worker] ${jobId} failed:`, err.message); + await redisSetex( + resultKey, + RESULT_TTL_SECONDS, + JSON.stringify({ status: 'failed', error: 'computation_error', failedAt: Date.now() }), + ).catch(() => null); + } finally { + // Always remove from processing list so the queue doesn't stall + await redisLrem(PROCESSING_KEY, String(raw)).catch(() => null); + } + } + + console.log('[scenario-worker] shutdown complete (SIGTERM received)'); +} + +const isMain = process.argv[1] && import.meta.url.endsWith(process.argv[1].replace(/^file:\/\//, '')); +if (isMain) { + runWorker().catch(err => { + console.error('[scenario-worker] fatal:', err); + process.exit(1); + }); +} diff --git a/server/worldmonitor/supply-chain/v1/scenario-templates.ts b/server/worldmonitor/supply-chain/v1/scenario-templates.ts new file mode 100644 index 000000000..51e7e5cf7 --- /dev/null +++ b/server/worldmonitor/supply-chain/v1/scenario-templates.ts @@ -0,0 +1,141 @@ +/** + * Pre-built scenario templates for the supply chain scenario engine. + * + * Each template defines a geopolitical or environmental disruption event + * that can be run through the async scenario worker to compute impact + * across countries and HS2 sectors. + * + * All chokepoint IDs must reference valid entries in chokepoint-registry.ts. + */ + +export type ScenarioType = + | 'conflict' + | 'weather' + | 'sanctions' + | 'tariff_shock' + | 'infrastructure' + | 'pandemic'; + +export interface ScenarioTemplate { + id: string; + name: string; + description: string; + type: ScenarioType; + /** IDs from chokepoint-registry.ts */ + affectedChokepointIds: string[]; + /** 0–100 percent of chokepoint capacity blocked */ + disruptionPct: number; + /** Estimated duration of disruption in days */ + durationDays: number; + /** + * HS2 chapter codes affected (e.g. '27' = energy, '85' = electronics). + * null means ALL sectors are affected. + */ + affectedHs2: string[] | null; + /** + * Additional cost multiplier applied on top of bypass corridor costs. + * 1.0 = no additional cost; 1.3 = +30% freight cost. + */ + costShockMultiplier: number; +} + +export const SCENARIO_TEMPLATES: readonly ScenarioTemplate[] = [ + { + id: 'taiwan-strait-full-closure', + name: 'Taiwan Strait Full Closure', + description: + 'Complete closure of the Taiwan Strait for 30 days β€” critical impact on electronics, machinery, and vehicle supply chains routed through East Asia.', + type: 'conflict', + affectedChokepointIds: ['taiwan_strait'], + disruptionPct: 100, + durationDays: 30, + affectedHs2: ['84', '85', '87'], // machinery, electronics, vehicles + costShockMultiplier: 1.45, + }, + { + id: 'suez-bab-simultaneous', + name: 'Suez + Bab el-Mandeb Simultaneous Disruption', + description: + 'Simultaneous 80% blockage of the Suez Canal and Bab el-Mandeb Strait for 60 days β€” full Red Sea corridor closure affecting all sectors on Asia-Europe routes.', + type: 'conflict', + affectedChokepointIds: ['suez', 'bab_el_mandeb'], + disruptionPct: 80, + durationDays: 60, + affectedHs2: null, // all sectors + costShockMultiplier: 1.35, + }, + { + id: 'panama-drought-50pct', + name: 'Panama Canal Drought β€” 50% Capacity', + description: + 'Severe drought reduces Panama Canal capacity to 50% for 90 days β€” vessels diverted via Cape Horn or Suez, adding 12–18 transit days on transpacific routes.', + type: 'weather', + affectedChokepointIds: ['panama'], + disruptionPct: 50, + durationDays: 90, + affectedHs2: null, // all sectors + costShockMultiplier: 1.22, + }, + { + id: 'hormuz-tanker-blockade', + name: 'Hormuz Strait Tanker Blockade', + description: + 'Full closure of the Strait of Hormuz for 14 days β€” complete severance of Persian Gulf energy exports affecting oil, LNG, and petrochemical supply chains.', + type: 'conflict', + affectedChokepointIds: ['hormuz_strait'], + disruptionPct: 100, + durationDays: 14, + affectedHs2: ['27', '29'], // energy + petrochemicals + costShockMultiplier: 2.10, + }, + { + id: 'russia-baltic-grain-suspension', + name: 'Russia Baltic Grain Export Suspension', + description: + 'Full suspension of Russian grain exports via Baltic ports for 180 days due to expanded sanctions β€” impacts global wheat and corn supply chains.', + type: 'sanctions', + affectedChokepointIds: ['bosphorus', 'dover_strait'], + disruptionPct: 100, + durationDays: 180, + affectedHs2: ['10', '12'], // cereals + oilseeds + costShockMultiplier: 1.55, + }, + { + id: 'us-tariff-escalation-electronics', + name: 'US Tariff Escalation β€” Electronics', + description: + 'US imposes 50% tariff on electronics imports (HS 85) for 365 days β€” no chokepoint closure but severe cost shock on transpacific container routes carrying consumer electronics.', + type: 'tariff_shock', + affectedChokepointIds: [], // tariff shock, not physical closure + disruptionPct: 0, + durationDays: 365, + affectedHs2: ['85'], // electronics + costShockMultiplier: 1.50, + }, +] as const; + +/** Lookup by scenario ID β€” returns undefined if not found */ +export function getScenarioTemplate(id: string): ScenarioTemplate | undefined { + return SCENARIO_TEMPLATES.find(t => t.id === id); +} + +// ─── Runtime types shared between MapContainer and DeckGLMap ───────────────── +// Defined here (no UI imports) to avoid circular dependency. + +/** Visual state broadcast to all map renderers when a scenario is active. */ +export interface ScenarioVisualState { + scenarioId: string; + /** Chokepoint IDs that are fully/partially disrupted in this scenario. */ + disruptedChokepointIds: string[]; + /** ISO2 codes of countries with a non-trivial computed impact score. */ + affectedIso2s: string[]; +} + +/** + * Subset of the scenario worker result consumed by the map layer. + * Full result shape lives in the scenario worker (scenario-worker.mjs). + */ +export interface ScenarioResult { + affectedChokepointIds: string[]; + topImpactCountries: Array<{ iso2: string; totalImpact: number; impactPct: number }>; +} diff --git a/src/components/DeckGLMap.ts b/src/components/DeckGLMap.ts index f7d67bd8d..042f74d55 100644 --- a/src/components/DeckGLMap.ts +++ b/src/components/DeckGLMap.ts @@ -96,6 +96,7 @@ import { } from '@/config'; import type { GulfInvestment } from '@/types'; import { resolveTradeRouteSegments, TRADE_ROUTES as TRADE_ROUTES_LIST, type TradeRouteSegment } from '@/config/trade-routes'; +import type { ScenarioVisualState } from '@/config/scenario-templates'; import { getLayersForVariant, resolveLayerLabel, bindLayerSearch, type MapVariant } from '@/config/map-layer-definitions'; import { getAuthState, subscribeAuthState } from '@/services/auth-state'; import { hasPremiumAccess } from '@/services/panel-gating'; @@ -335,6 +336,11 @@ function ensureClosedRing(ring: [number, number][]): [number, number][] { return [...ring, first]; } +/** Module-level Map from routeId β†’ waypoint IDs. Built once, reused across all layer renders. */ +const ROUTE_WAYPOINTS_MAP = new Map( + TRADE_ROUTES_LIST.map(r => [r.id, r.waypoints]), +); + export class DeckGLMap { private static readonly MAX_CLUSTER_LEAVES = 200; @@ -392,6 +398,7 @@ export class DeckGLMap { private radiationObservations: RadiationObservation[] = []; private diseaseOutbreaks: DiseaseOutbreakItem[] = []; private tradeRouteSegments: TradeRouteSegment[] = resolveTradeRouteSegments(); + private scenarioState: ScenarioVisualState | null = null; private positiveEvents: PositiveGeoEvent[] = []; private kindnessPoints: KindnessPoint[] = []; private imageryScenes: ImageryScene[] = []; @@ -4960,16 +4967,33 @@ export class DeckGLMap { const active: [number, number, number, number] = getCurrentTheme() === 'light' ? [30, 100, 180, 200] : [100, 200, 255, 160]; const disrupted: [number, number, number, number] = getCurrentTheme() === 'light' ? [200, 40, 40, 220] : [255, 80, 80, 200]; const highRisk: [number, number, number, number] = getCurrentTheme() === 'light' ? [200, 140, 20, 200] : [255, 180, 50, 180]; + const scenario: [number, number, number, number] = getCurrentTheme() === 'light' ? [220, 100, 20, 230] : [255, 140, 50, 210]; const colorFor = (status: string): [number, number, number, number] => status === 'disrupted' ? disrupted : status === 'high_risk' ? highRisk : active; + // When a scenario is active, override colors for routes that transit disrupted chokepoints. + // ROUTE_WAYPOINTS_MAP is module-level so getColor() is O(1) per segment instead of O(n) per frame. + const scenarioDisrupted = this.scenarioState + ? new Set(this.scenarioState.disruptedChokepointIds) + : null; + + const getColor = (d: TradeRouteSegment): [number, number, number, number] => { + if (scenarioDisrupted && scenarioDisrupted.size > 0) { + const waypoints = ROUTE_WAYPOINTS_MAP.get(d.routeId); + if (waypoints && waypoints.some(wp => scenarioDisrupted.has(wp))) { + return scenario; + } + } + return colorFor(d.status); + }; + return new ArcLayer({ id: 'trade-routes-layer', data: this.tradeRouteSegments, getSourcePosition: (d) => d.sourcePosition, getTargetPosition: (d) => d.targetPosition, - getSourceColor: (d) => colorFor(d.status), - getTargetColor: (d) => colorFor(d.status), + getSourceColor: getColor, + getTargetColor: getColor, getWidth: (d) => d.category === 'energy' ? 3 : 2, widthMinPixels: 1, widthMaxPixels: 6, @@ -4981,8 +5005,8 @@ export class DeckGLMap { private createTradeChokepointsLayer(): ScatterplotLayer { const routeWaypointIds = new Set(); for (const seg of this.tradeRouteSegments) { - const route = TRADE_ROUTES_LIST.find(r => r.id === seg.routeId); - if (route) for (const wp of route.waypoints) routeWaypointIds.add(wp); + const waypoints = ROUTE_WAYPOINTS_MAP.get(seg.routeId); + if (waypoints) for (const wp of waypoints) routeWaypointIds.add(wp); } const chokepoints = STRATEGIC_WATERWAYS.filter(w => routeWaypointIds.has(w.id)); const isLight = getCurrentTheme() === 'light'; @@ -5373,6 +5397,16 @@ export class DeckGLMap { this.popup.setChokepointData(data); } + /** + * Activate or deactivate a scenario visual overlay. + * When active, trade route arcs transiting disrupted chokepoints shift to + * an orange scenario color. Pass null to restore normal colors. + */ + public setScenarioState(state: ScenarioVisualState | null): void { + this.scenarioState = state; + this.render(); + } + public setHappinessScores(data: HappinessData): void { this.happinessScores = data.scores; this.happinessYear = data.year; diff --git a/src/components/MapContainer.ts b/src/components/MapContainer.ts index 59d103d2a..8596604af 100644 --- a/src/components/MapContainer.ts +++ b/src/components/MapContainer.ts @@ -48,6 +48,9 @@ import type { WebcamEntry, WebcamCluster } from '@/generated/client/worldmonitor import type { TrafficAnomaly as ProtoTrafficAnomaly, DdosLocationHit } from '@/generated/client/worldmonitor/infrastructure/v1/service_client'; import type { DiseaseOutbreakItem } from '@/services/disease-outbreaks'; import type { GetChokepointStatusResponse } from '@/services/supply-chain'; +import type { ScenarioVisualState, ScenarioResult } from '@/config/scenario-templates'; + +export type { ScenarioVisualState, ScenarioResult }; export type TimeRange = '1h' | '6h' | '24h' | '48h' | '7d' | 'all'; export type MapView = 'global' | 'america' | 'mena' | 'eu' | 'asia' | 'latam' | 'africa' | 'oceania'; @@ -965,6 +968,33 @@ export class MapContainer { } } + // ─── Scenario Engine ───────────────────────────────────────────────────────── + + /** + * Activate a scenario across all active renderers. + * PRO-gated β€” free users trigger `trackGateHit('scenario')` only. + * + * @param scenarioId Template ID from scenario-templates.ts + * @param result Computed result from the scenario worker + */ + public activateScenario(scenarioId: string, result: ScenarioResult): void { + const state: ScenarioVisualState = { + scenarioId, + disruptedChokepointIds: result.affectedChokepointIds, + affectedIso2s: result.topImpactCountries.map((c: { iso2: string }) => c.iso2), + }; + // DeckGL is the primary renderer for scenario visuals. + // Globe and SVG support is deferred to Sprint D. + this.deckGLMap?.setScenarioState(state); + } + + /** + * Deactivate the current scenario and restore normal visual state. + */ + public deactivateScenario(): void { + this.deckGLMap?.setScenarioState(null); + } + // Utility methods public isDeckGLMode(): boolean { return this.useDeckGL; diff --git a/src/config/scenario-templates.ts b/src/config/scenario-templates.ts new file mode 100644 index 000000000..48fc74ad4 --- /dev/null +++ b/src/config/scenario-templates.ts @@ -0,0 +1,12 @@ +/** + * Re-exports scenario template types for use within src/. + * The authoritative source is server/worldmonitor/supply-chain/v1/scenario-templates.ts + * (kept there so API edge functions can import it without crossing the src/ boundary). + */ + +export type { + ScenarioType, + ScenarioTemplate, + ScenarioVisualState, + ScenarioResult, +} from '../../server/worldmonitor/supply-chain/v1/scenario-templates';