Files
worldmonitor/api/bootstrap.js
Elie Habib 7711e9de03 feat(consumer-prices): add basket price monitoring domain (#1901)
* feat(consumer-prices): add basket price monitoring domain

Adds end-to-end consumer price tracking to enable inflation monitoring
across key markets, starting with UAE essentials basket.

- consumer-prices-core/: companion scraping service with pluggable
  acquisition providers (Playwright, Exa, Firecrawl, Parallel P0),
  config-driven retailer YAML, Postgres schema, Redis snapshots
- proto/worldmonitor/consumer_prices/v1/: 6-RPC service definition
- api/consumer-prices/v1/[rpc].ts: Vercel edge route
- server/worldmonitor/consumer-prices/v1/: Redis-backed RPC handlers
- src/services/consumer-prices/: circuit breakers + bootstrap hydration
- src/components/ConsumerPricesPanel.ts: 5-tab panel (overview /
  categories / movers / spread / health)
- scripts/seed-consumer-prices.mjs: Railway cron seed script
- Wire into bootstrap, health, panels, gateway, cache-keys, locale

* fix(consumer-prices): resolve all code review findings

P0: populate topCategories — categoryResult was fetched but never used.
Added buildTopCategories() helper with grouped CTE query that extracts
current_index and week-over-week pct per category.

P1 (4 fixes):
- aggregate: replace N+1 getBaselinePrice loop with single batch query
  getBaselinePrices(ids[], date) via ANY($1) — eliminates 119 DB roundtrips
  per basket run
- aggregate/computeValueIndex: was dividing all category floors by the same
  arbitrary first baseline; now uses per-item floor price with per-item
  baseline (same methodology as fixed index but with cheapest price)
- basket-series endpoint now seeded: added buildBasketSeriesSnapshot() to
  worldmonitor.ts, /basket-series route in companion API, publish.ts writes
  7d/30d/90d series per basket, seed script fetches and writes all three ranges
- scrape: call teardownAll() after each retailer run to close Playwright
  browser; without this the Chromium process leaked on Railway

P2 (4 fixes):
- db/client: remove rejectUnauthorized: false — was bypassing TLS cert
  validation on all non-localhost connections
- publish: seed-meta now writes { fetchedAt, recordCount } matching the format
  expected by _seed-utils.mjs writeExtraKeyWithMeta (was writing { fetchedAt, key })
- products: remove unused getMatchedProductsForBasket — exact duplicate of
  getBasketRows in aggregate.ts; never imported by anything

Snapshot type overhaul:
- Flatten WMOverviewSnapshot to match proto GetConsumerPriceOverviewResponse
  (was nested under overview:{}; handlers read flat)
- All asOf fields changed from number to string (int64 → string per proto JSON)
- freshnessMin/parseSuccessRate null -> 0 defaults
- lastRunAt changed from epoch number to ISO string
- Mover items now include currentPrice and currencyCode
- emptyOverview/Movers/Spread/Freshness in seed script use String(Date.now())

* feat(consumer-prices): wire Exa search engine as acquisition backend for UAE retailers

Ports the proven Exa+summary price extraction from PR #1904 (seed-grocery-basket.mjs)
into consumer-prices-core as ExaSearchAdapter, replacing unvalidated Playwright CSS
scraping for all three UAE retailers (Carrefour, Lulu, Noon).

- New ExaSearchAdapter: discovers targets from basket YAML config (one per item),
  calls Exa API with contents.summary to get AI-extracted prices, uses matchPrice()
  regex (ISO codes + symbol fallback + CURRENCY_MIN guards) to extract AED amounts
- New db/queries/matches.ts: upsertProductMatch() + getBasketItemId() for auto-linking
  scraped Exa results to basket items without a separate matching step
- scrape.ts: selects ExaSearchAdapter when config.adapter === 'exa-search'; after
  insertObservation(), auto-creates canonical product and product_match (status: 'auto')
  so aggregate.ts can compute indices immediately without manual review
- All three UAE retailer YAMLs switched to adapter: exa-search and enabled: true;
  CSS extraction blocks removed (not used by search adapter)
- config/types.ts: adds 'exa-search' to adapter enum

* fix(consumer-prices): use EXA_API_KEYS (with fallback to EXA_API_KEY) matching PR #1904 pattern

* fix(consumer-prices): wire ConsumerPricesPanel in layout + fix movers limit:0 bug

Addresses Codex P1 findings on PR #1901:
- panel-layout.ts: import and createPanel('consumer-prices') so the panel
  actually renders in finance/commodity variants where it is enabled in config
- consumer-prices/index.ts: limit was hardcoded 0 causing slice(0,0) to always
  return empty risers/fallers after bootstrap is consumed; fixed to 10

* fix(consumer-prices): add categories snapshot to close P2 gap

consumer-prices:categories:ae:* was in BOOTSTRAP_KEYS but had no producer,
so the Categories tab always showed upstreamUnavailable.

- buildCategoriesSnapshot() in worldmonitor.ts — wraps buildTopCategories()
  and returns WMCategoriesSnapshot matching ListConsumerPriceCategoriesResponse
- /categories route in consumer-prices-core API
- publish.ts writes consumer-prices:categories:{market}:{range} for 7d/30d/90d
- seed-consumer-prices.mjs fetches all three ranges from consumer-prices-core
  and writes them to Redis alongside the other snapshots

P1 issues (snapshot structure mismatch + limit:0 movers) were already fixed
in earlier commits on this branch.

* fix(types): add variants? to PANEL_CATEGORY_MAP type
2026-03-20 17:08:22 +04:00

203 lines
8.1 KiB
JavaScript

import { getCorsHeaders, getPublicCorsHeaders, isDisallowedOrigin } from './_cors.js';
import { validateApiKey } from './_api-key.js';
import { jsonResponse } from './_json-response.js';
export const config = { runtime: 'edge' };
const BOOTSTRAP_CACHE_KEYS = {
earthquakes: 'seismology:earthquakes:v1',
outages: 'infra:outages:v1',
serviceStatuses: 'infra:service-statuses:v1',
marketQuotes: 'market:stocks-bootstrap:v1',
commodityQuotes: 'market:commodities-bootstrap:v1',
sectors: 'market:sectors:v1',
etfFlows: 'market:etf-flows:v1',
macroSignals: 'economic:macro-signals:v1',
bisPolicy: 'economic:bis:policy:v1',
bisExchange: 'economic:bis:eer:v1',
bisCredit: 'economic:bis:credit:v1',
shippingRates: 'supply_chain:shipping:v2',
chokepoints: 'supply_chain:chokepoints:v4',
chokepointTransits: 'supply_chain:chokepoint_transits:v1',
minerals: 'supply_chain:minerals:v2',
giving: 'giving:summary:v1',
climateAnomalies: 'climate:anomalies:v1',
radiationWatch: 'radiation:observations:v1',
thermalEscalation: 'thermal:escalation:v1',
wildfires: 'wildfire:fires:v1',
cyberThreats: 'cyber:threats-bootstrap:v2',
techReadiness: 'economic:worldbank-techreadiness:v1',
progressData: 'economic:worldbank-progress:v1',
renewableEnergy: 'economic:worldbank-renewable:v1',
positiveGeoEvents: 'positive_events:geo-bootstrap:v1',
theaterPosture: 'theater_posture:sebuf:stale:v1',
riskScores: 'risk:scores:sebuf:stale:v1',
naturalEvents: 'natural:events:v1',
flightDelays: 'aviation:delays-bootstrap:v1',
insights: 'news:insights:v1',
predictions: 'prediction:markets-bootstrap:v1',
cryptoQuotes: 'market:crypto:v1',
cryptoSectors: 'market:crypto-sectors:v1',
defiTokens: 'market:defi-tokens:v1',
aiTokens: 'market:ai-tokens:v1',
otherTokens: 'market:other-tokens:v1',
gulfQuotes: 'market:gulf-quotes:v1',
stablecoinMarkets: 'market:stablecoins:v1',
unrestEvents: 'unrest:events:v1',
iranEvents: 'conflict:iran-events:v1',
ucdpEvents: 'conflict:ucdp-events:v1',
temporalAnomalies: 'temporal:anomalies:v1',
weatherAlerts: 'weather:alerts:v1',
spending: 'economic:spending:v1',
techEvents: 'research:tech-events-bootstrap:v1',
gdeltIntel: 'intelligence:gdelt-intel:v1',
correlationCards: 'correlation:cards-bootstrap:v1',
forecasts: 'forecast:predictions:v2',
securityAdvisories: 'intelligence:advisories-bootstrap:v1',
customsRevenue: 'trade:customs-revenue:v1',
sanctionsPressure: 'sanctions:pressure:v1',
consumerPricesOverview: 'consumer-prices:overview:ae',
consumerPricesCategories: 'consumer-prices:categories:ae:30d',
consumerPricesMovers: 'consumer-prices:movers:ae:30d',
consumerPricesSpread: 'consumer-prices:retailer-spread:ae:essentials-ae',
groceryBasket: 'economic:grocery-basket:v1',
bigmac: 'economic:bigmac:v1',
nationalDebt: 'economic:national-debt:v1',
};
const SLOW_KEYS = new Set([
'bisPolicy', 'bisExchange', 'bisCredit', 'minerals', 'giving',
'sectors', 'etfFlows', 'wildfires', 'climateAnomalies',
'radiationWatch', 'thermalEscalation',
'cyberThreats', 'techReadiness', 'progressData', 'renewableEnergy',
'naturalEvents',
'cryptoQuotes', 'cryptoSectors', 'defiTokens', 'aiTokens', 'otherTokens',
'gulfQuotes', 'stablecoinMarkets', 'unrestEvents', 'ucdpEvents',
'techEvents',
'securityAdvisories',
'customsRevenue',
'sanctionsPressure',
'consumerPricesOverview', 'consumerPricesCategories', 'consumerPricesMovers', 'consumerPricesSpread',
'groceryBasket',
'bigmac',
'nationalDebt',
]);
const FAST_KEYS = new Set([
'earthquakes', 'outages', 'serviceStatuses', 'macroSignals', 'chokepoints', 'chokepointTransits',
'marketQuotes', 'commodityQuotes', 'positiveGeoEvents', 'riskScores', 'flightDelays','insights', 'predictions',
'iranEvents', 'temporalAnomalies', 'weatherAlerts', 'spending', 'theaterPosture', 'gdeltIntel',
'correlationCards', 'forecasts', 'shippingRates',
]);
// No public/s-maxage: CF (in front of api.worldmonitor.app) ignores Vary: Origin and would
// pin ACAO: worldmonitor.app on cached responses, breaking CORS for preview deployments.
// Vercel CDN caching is handled by TIER_CDN_CACHE via CDN-Cache-Control below.
const TIER_CACHE = {
slow: 'max-age=300, stale-while-revalidate=600, stale-if-error=3600',
fast: 'max-age=60, stale-while-revalidate=120, stale-if-error=900',
};
const TIER_CDN_CACHE = {
slow: 'public, s-maxage=7200, stale-while-revalidate=1800, stale-if-error=7200',
fast: 'public, s-maxage=600, stale-while-revalidate=120, stale-if-error=900',
};
const NEG_SENTINEL = '__WM_NEG__';
async function getCachedJsonBatch(keys) {
const result = new Map();
if (keys.length === 0) return result;
const url = process.env.UPSTASH_REDIS_REST_URL;
const token = process.env.UPSTASH_REDIS_REST_TOKEN;
if (!url || !token) return result;
// Always read unprefixed keys — bootstrap is a read-only consumer of
// production cache data. Preview/branch deploys don't run handlers that
// populate prefixed keys, so prefixing would always miss.
const pipeline = keys.map((k) => ['GET', k]);
const resp = await fetch(`${url}/pipeline`, {
method: 'POST',
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
body: JSON.stringify(pipeline),
signal: AbortSignal.timeout(3000),
});
if (!resp.ok) return result;
const data = await resp.json();
for (let i = 0; i < keys.length; i++) {
const raw = data[i]?.result;
if (raw) {
try {
const parsed = JSON.parse(raw);
if (parsed !== NEG_SENTINEL) result.set(keys[i], parsed);
} catch { /* skip malformed */ }
}
}
return result;
}
export default async function handler(req) {
if (isDisallowedOrigin(req))
return new Response('Forbidden', { status: 403 });
const cors = getCorsHeaders(req);
if (req.method === 'OPTIONS')
return new Response(null, { status: 204, headers: cors });
const apiKeyResult = validateApiKey(req);
if (apiKeyResult.required && !apiKeyResult.valid)
return jsonResponse({ error: apiKeyResult.error }, 401, cors);
const url = new URL(req.url);
const tier = url.searchParams.get('tier');
let registry;
if (tier === 'slow' || tier === 'fast') {
const tierSet = tier === 'slow' ? SLOW_KEYS : FAST_KEYS;
registry = Object.fromEntries(Object.entries(BOOTSTRAP_CACHE_KEYS).filter(([k]) => tierSet.has(k)));
} else {
const requested = url.searchParams.get('keys')?.split(',').filter(Boolean).sort();
registry = requested
? Object.fromEntries(Object.entries(BOOTSTRAP_CACHE_KEYS).filter(([k]) => requested.includes(k)))
: BOOTSTRAP_CACHE_KEYS;
}
const keys = Object.values(registry);
const names = Object.keys(registry);
let cached;
try {
cached = await getCachedJsonBatch(keys);
} catch {
return jsonResponse({ data: {}, missing: names }, 200, { ...cors, 'Cache-Control': 'no-cache' });
}
const data = {};
const missing = [];
for (let i = 0; i < names.length; i++) {
const val = cached.get(keys[i]);
if (val !== undefined) {
// Strip seed-internal metadata not intended for API clients
if (names[i] === 'forecasts' && val != null && 'enrichmentMeta' in val) {
const { enrichmentMeta: _stripped, ...rest } = val;
data[names[i]] = rest;
} else {
data[names[i]] = val;
}
} else {
missing.push(names[i]);
}
}
const cacheControl = (tier && TIER_CACHE[tier]) || 'public, s-maxage=600, stale-while-revalidate=120, stale-if-error=900';
// Bootstrap data is fully public (world events, market prices, seismic data).
// Use ACAO: * so CF caches one entry valid for all origins, including Vercel
// preview deployments. Per-origin ACAO with Vary: Origin causes CF to pin the
// first origin's ACAO on the cached response, breaking CORS for other origins.
return jsonResponse({ data, missing }, 200, {
...getPublicCorsHeaders(),
'Cache-Control': cacheControl,
'CDN-Cache-Control': (tier && TIER_CDN_CACHE[tier]) || TIER_CDN_CACHE.fast,
});
}