mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
feat(consumer-prices): wire Exa engine into DB pipeline for time-series tracking (#1932)
- Add migration 002: seed canonical_products, baskets, basket_items for essentials-ae - Add migration 003: partial unique index to fix NULL uniqueness gap in canonical_products - Add ExaSearchAdapter to scrape.ts; auto-creates canonical product→basket matches - Fix getBasketItemId to lookup by canonical_name via JOIN (not category) to avoid dairy collision (3 items share same category) - Fix getOrCreateRetailer race condition with ON CONFLICT upsert - Add per-category index writes in aggregate.ts; guard division-by-zero - Set all publish.ts TTLs to 93600s (26h) to survive cron scheduling drift - Set health.js maxStaleMin to 2880 (daily cron × 2) for correct staleness detection - Remove redundant seed-consumer-prices.mjs (publish.ts writes to Redis directly) - Add src/cli/validate.ts DB health check script - Fix z.record() to z.record(z.string(), z.unknown()) for Zod compat
This commit is contained in:
@@ -148,11 +148,11 @@ const SEED_META = {
|
||||
thermalEscalation: { key: 'seed-meta:thermal:escalation', maxStaleMin: 240 },
|
||||
nationalDebt: { key: 'seed-meta:economic:national-debt', maxStaleMin: 10080 }, // 7 days — monthly seed
|
||||
tariffTrendsUs: { key: 'seed-meta:trade:tariffs:v1:840:all:10', maxStaleMin: 900 },
|
||||
consumerPricesOverview: { key: 'seed-meta:consumer-prices:overview:ae', maxStaleMin: 240 },
|
||||
consumerPricesCategories: { key: 'seed-meta:consumer-prices:categories:ae', maxStaleMin: 240 },
|
||||
consumerPricesMovers: { key: 'seed-meta:consumer-prices:movers:ae:30d', maxStaleMin: 240 },
|
||||
consumerPricesSpread: { key: 'seed-meta:consumer-prices:spread:ae', maxStaleMin: 720 },
|
||||
consumerPricesFreshness: { key: 'seed-meta:consumer-prices:freshness:ae', maxStaleMin: 30 },
|
||||
consumerPricesOverview: { key: 'seed-meta:consumer-prices:overview:ae', maxStaleMin: 2880 }, // daily cron × 2
|
||||
consumerPricesCategories: { key: 'seed-meta:consumer-prices:categories:ae', maxStaleMin: 2880 },
|
||||
consumerPricesMovers: { key: 'seed-meta:consumer-prices:movers:ae:30d', maxStaleMin: 2880 },
|
||||
consumerPricesSpread: { key: 'seed-meta:consumer-prices:spread:ae', maxStaleMin: 2880 },
|
||||
consumerPricesFreshness: { key: 'seed-meta:consumer-prices:freshness:ae', maxStaleMin: 2880 },
|
||||
};
|
||||
|
||||
// Standalone keys that are populated on-demand by RPC handlers (not seeds).
|
||||
|
||||
@@ -8,7 +8,8 @@ REDIS_URL=redis://localhost:6379
|
||||
# Firecrawl — https://firecrawl.dev
|
||||
FIRECRAWL_API_KEY=
|
||||
|
||||
# Exa — https://exa.ai
|
||||
# Exa — https://exa.ai (supports comma/newline-separated key rotation)
|
||||
EXA_API_KEYS=
|
||||
EXA_API_KEY=
|
||||
|
||||
# Parallel P0
|
||||
|
||||
@@ -33,6 +33,7 @@ RUN npm ci --omit=dev
|
||||
COPY tsconfig.json ./
|
||||
COPY src ./src
|
||||
COPY configs ./configs
|
||||
COPY migrations ./migrations
|
||||
RUN npm run build
|
||||
|
||||
# Runtime
|
||||
|
||||
60
consumer-prices-core/migrations/002_seed_reference_data.sql
Normal file
60
consumer-prices-core/migrations/002_seed_reference_data.sql
Normal file
@@ -0,0 +1,60 @@
|
||||
-- Consumer Prices Core: Seed reference data
|
||||
-- Inserts canonical products, the essentials-ae basket, and its 12 basket items.
|
||||
-- Must match configs/baskets/essentials_ae.yaml exactly (same canonical names).
|
||||
-- Run automatically by: tsx src/db/migrate.ts
|
||||
|
||||
-- ─── Canonical Products ───────────────────────────────────────────────────────
|
||||
|
||||
INSERT INTO canonical_products (canonical_name, category) VALUES
|
||||
('Eggs Fresh 12 Pack', 'eggs'),
|
||||
('Full Fat Fresh Milk 1L', 'dairy'),
|
||||
('White Sliced Bread 600g', 'bread'),
|
||||
('Basmati Rice 1kg', 'rice'),
|
||||
('Sunflower Oil 1L', 'cooking_oil'),
|
||||
('Whole Chicken Fresh 1kg', 'chicken'),
|
||||
('Tomatoes Fresh 1kg', 'tomatoes'),
|
||||
('Onions 1kg', 'onions'),
|
||||
('Drinking Water 1.5L', 'water'),
|
||||
('White Sugar 1kg', 'sugar'),
|
||||
('Processed Cheese Slices 200g','dairy'),
|
||||
('Plain Yogurt 500g', 'dairy');
|
||||
|
||||
-- ─── Basket ───────────────────────────────────────────────────────────────────
|
||||
|
||||
INSERT INTO baskets (slug, name, market_code, methodology, base_date, description)
|
||||
VALUES (
|
||||
'essentials-ae',
|
||||
'Essentials Basket UAE',
|
||||
'ae',
|
||||
'fixed',
|
||||
'2025-01-01',
|
||||
'Core household essentials tracked weekly across UAE retailers. Weighted to reflect a typical household of 4 in the UAE.'
|
||||
);
|
||||
|
||||
-- ─── Basket Items (joined from canonical_products) ────────────────────────────
|
||||
-- Each basket item links to its canonical product via canonical_product_id.
|
||||
-- The weight, category, and canonical_name must mirror essentials_ae.yaml.
|
||||
|
||||
INSERT INTO basket_items (basket_id, category, canonical_product_id, weight)
|
||||
SELECT
|
||||
b.id,
|
||||
cp.category,
|
||||
cp.id,
|
||||
v.weight
|
||||
FROM baskets b
|
||||
CROSS JOIN LATERAL (VALUES
|
||||
('Eggs Fresh 12 Pack', 0.12),
|
||||
('Full Fat Fresh Milk 1L', 0.10),
|
||||
('White Sliced Bread 600g', 0.08),
|
||||
('Basmati Rice 1kg', 0.10),
|
||||
('Sunflower Oil 1L', 0.08),
|
||||
('Whole Chicken Fresh 1kg', 0.12),
|
||||
('Tomatoes Fresh 1kg', 0.08),
|
||||
('Onions 1kg', 0.06),
|
||||
('Drinking Water 1.5L', 0.08),
|
||||
('White Sugar 1kg', 0.06),
|
||||
('Processed Cheese Slices 200g', 0.06),
|
||||
('Plain Yogurt 500g', 0.06)
|
||||
) AS v(canonical_name, weight)
|
||||
JOIN canonical_products cp ON cp.canonical_name = v.canonical_name
|
||||
WHERE b.slug = 'essentials-ae';
|
||||
@@ -0,0 +1,15 @@
|
||||
-- Consumer Prices Core: Fix canonical_products upsert with NULL variant fields.
|
||||
--
|
||||
-- PostgreSQL treats NULL != NULL in unique constraints, so the existing
|
||||
-- UNIQUE (canonical_name, brand_norm, category, variant_norm, size_value, size_unit)
|
||||
-- constraint never fires when brand_norm/variant_norm/size_value/size_unit are all NULL.
|
||||
-- This caused upsertCanonicalProduct() to INSERT duplicates on every scrape run.
|
||||
--
|
||||
-- This partial index covers the common case: no brand, no variant, no size specified.
|
||||
|
||||
CREATE UNIQUE INDEX canonical_products_name_category_null_idx
|
||||
ON canonical_products (canonical_name, category)
|
||||
WHERE brand_norm IS NULL
|
||||
AND variant_norm IS NULL
|
||||
AND size_value IS NULL
|
||||
AND size_unit IS NULL;
|
||||
75
consumer-prices-core/src/cli/validate.ts
Normal file
75
consumer-prices-core/src/cli/validate.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
/**
|
||||
* Validates DB state: baskets, basket_items, retailers, and recent observations.
|
||||
* Run: tsx src/cli/validate.ts
|
||||
* Exit 0 = healthy, Exit 1 = issues found.
|
||||
*/
|
||||
import 'dotenv/config';
|
||||
import { getPool } from '../db/client.js';
|
||||
|
||||
const pool = getPool();
|
||||
let issues = 0;
|
||||
|
||||
async function check(label: string, sql: string, params: unknown[], expectMin: number) {
|
||||
const result = await pool.query<{ count: string }>(sql, params as never[]);
|
||||
const count = parseInt(result.rows[0]?.count ?? '0', 10);
|
||||
const ok = count >= expectMin;
|
||||
console.log(` [${ok ? 'OK' : 'FAIL'}] ${label}: ${count} (expected ≥ ${expectMin})`);
|
||||
if (!ok) issues++;
|
||||
}
|
||||
|
||||
async function run() {
|
||||
console.log('[validate] Checking DB state...');
|
||||
|
||||
await check('Baskets', `SELECT COUNT(*) FROM baskets`, [], 1);
|
||||
await check('Basket items', `SELECT COUNT(*) FROM basket_items WHERE active = true`, [], 12);
|
||||
await check(
|
||||
'Canonical products',
|
||||
`SELECT COUNT(*) FROM canonical_products WHERE active = true`,
|
||||
[],
|
||||
12,
|
||||
);
|
||||
await check('Retailers', `SELECT COUNT(*) FROM retailers WHERE active = true`, [], 1);
|
||||
await check(
|
||||
'Price observations (any)',
|
||||
`SELECT COUNT(*) FROM price_observations`,
|
||||
[],
|
||||
0,
|
||||
);
|
||||
await check(
|
||||
'Product matches (auto)',
|
||||
`SELECT COUNT(*) FROM product_matches WHERE match_status = 'auto'`,
|
||||
[],
|
||||
0,
|
||||
);
|
||||
await check(
|
||||
'Computed indices',
|
||||
`SELECT COUNT(*) FROM computed_indices`,
|
||||
[],
|
||||
0,
|
||||
);
|
||||
|
||||
const freshResult = await pool.query<{ slug: string; last_run_at: Date | null }>(
|
||||
`SELECT r.slug, dsh.last_successful_run_at AS last_run_at
|
||||
FROM retailers r
|
||||
LEFT JOIN data_source_health dsh ON dsh.retailer_id = r.id
|
||||
WHERE r.active = true`,
|
||||
);
|
||||
for (const row of freshResult.rows) {
|
||||
const age = row.last_run_at
|
||||
? Math.round((Date.now() - row.last_run_at.getTime()) / 1000 / 60) + 'min ago'
|
||||
: 'never scraped';
|
||||
console.log(` [INFO] ${row.slug}: last successful scrape ${age}`);
|
||||
}
|
||||
|
||||
await pool.end();
|
||||
if (issues > 0) {
|
||||
console.error(`[validate] ${issues} check(s) failed.`);
|
||||
process.exit(1);
|
||||
}
|
||||
console.log('[validate] All checks passed.');
|
||||
}
|
||||
|
||||
run().catch((err) => {
|
||||
console.error('[validate]', err);
|
||||
process.exit(1);
|
||||
});
|
||||
@@ -75,7 +75,7 @@ export const RetailerConfigSchema = z.object({
|
||||
currencySymbols: z.array(z.string()).default([]),
|
||||
})
|
||||
.optional(),
|
||||
}),
|
||||
}).optional(),
|
||||
enabled: z.boolean().default(true),
|
||||
}),
|
||||
});
|
||||
@@ -91,7 +91,7 @@ export const BasketItemSchema = z.object({
|
||||
substitutionGroup: z.string().optional(),
|
||||
minBaseQty: z.number().optional(),
|
||||
maxBaseQty: z.number().optional(),
|
||||
qualificationRules: z.record(z.unknown()).optional(),
|
||||
qualificationRules: z.record(z.string(), z.unknown()).optional(),
|
||||
});
|
||||
|
||||
export const BasketConfigSchema = z.object({
|
||||
|
||||
@@ -26,13 +26,14 @@ export async function upsertProductMatch(input: {
|
||||
);
|
||||
}
|
||||
|
||||
export async function getBasketItemId(basketSlug: string, category: string): Promise<string | null> {
|
||||
export async function getBasketItemId(basketSlug: string, canonicalName: string): Promise<string | null> {
|
||||
const result = await query<{ id: string }>(
|
||||
`SELECT bi.id FROM basket_items bi
|
||||
JOIN baskets b ON b.id = bi.basket_id
|
||||
WHERE b.slug = $1 AND bi.category = $2 AND bi.active = true
|
||||
JOIN canonical_products cp ON cp.id = bi.canonical_product_id
|
||||
WHERE b.slug = $1 AND cp.canonical_name = $2 AND bi.active = true
|
||||
LIMIT 1`,
|
||||
[basketSlug, category],
|
||||
[basketSlug, canonicalName],
|
||||
);
|
||||
return result.rows[0]?.id ?? null;
|
||||
}
|
||||
|
||||
@@ -193,6 +193,23 @@ export async function aggregateBasket(basketSlug: string, marketCode: string) {
|
||||
await writeComputedIndex(basketId, null, null, 'value_index', valueIndex);
|
||||
await writeComputedIndex(basketId, null, null, 'coverage_pct', coveragePct);
|
||||
|
||||
// Per-category indices for buildTopCategories snapshot
|
||||
const byCategory = new Map<string, BasketRow[]>();
|
||||
for (const r of rows) {
|
||||
if (!byCategory.has(r.category)) byCategory.set(r.category, []);
|
||||
byCategory.get(r.category)!.push(r);
|
||||
}
|
||||
|
||||
for (const [category, catRows] of byCategory) {
|
||||
const catEssentials = computeFixedIndex(catRows, baselines);
|
||||
const catCoverage =
|
||||
(new Set(catRows.map((r) => r.basketItemId)).size /
|
||||
Math.max(1, basketConfig.items.filter((i) => i.category === category).length)) *
|
||||
100;
|
||||
await writeComputedIndex(basketId, null, category, 'essentials_index', catEssentials);
|
||||
await writeComputedIndex(basketId, null, category, 'coverage_pct', catCoverage);
|
||||
}
|
||||
|
||||
logger.info(`${basketSlug}:${marketCode} essentials=${essentialsIndex.toFixed(2)} value=${valueIndex.toFixed(2)} coverage=${coveragePct.toFixed(1)}%`);
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,9 @@ async function writeSnapshot(
|
||||
logger.info(` wrote ${key} (${json.length} bytes, ttl=${ttlSeconds}s)`);
|
||||
}
|
||||
|
||||
// 26h TTL — longer than the 24h cron cadence to survive scheduling drift
|
||||
const TTL = 93600;
|
||||
|
||||
export async function publishAll() {
|
||||
const redisUrl = process.env.REDIS_URL;
|
||||
if (!redisUrl) throw new Error('REDIS_URL is not set');
|
||||
@@ -63,7 +66,7 @@ export async function publishAll() {
|
||||
|
||||
try {
|
||||
const overview = await buildOverviewSnapshot(marketCode);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'overview', marketCode]), overview, 1800);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'overview', marketCode]), overview, TTL);
|
||||
} catch (err) {
|
||||
logger.error(`overview:${marketCode} failed: ${err}`);
|
||||
}
|
||||
@@ -71,7 +74,7 @@ export async function publishAll() {
|
||||
for (const days of [7, 30]) {
|
||||
try {
|
||||
const movers = await buildMoversSnapshot(marketCode, days);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'movers', marketCode, `${days}d`]), movers, 1800);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'movers', marketCode, `${days}d`]), movers, TTL);
|
||||
} catch (err) {
|
||||
logger.error(`movers:${marketCode}:${days}d failed: ${err}`);
|
||||
}
|
||||
@@ -79,7 +82,7 @@ export async function publishAll() {
|
||||
|
||||
try {
|
||||
const freshness = await buildFreshnessSnapshot(marketCode);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'freshness', marketCode]), freshness, 600);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'freshness', marketCode]), freshness, TTL);
|
||||
} catch (err) {
|
||||
logger.error(`freshness:${marketCode} failed: ${err}`);
|
||||
}
|
||||
@@ -87,7 +90,7 @@ export async function publishAll() {
|
||||
for (const range of ['7d', '30d', '90d']) {
|
||||
try {
|
||||
const categories = await buildCategoriesSnapshot(marketCode, range);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'categories', marketCode, range]), categories, 1800);
|
||||
await writeSnapshot(redis, makeKey(['consumer-prices', 'categories', marketCode, range]), categories, TTL);
|
||||
} catch (err) {
|
||||
logger.error(`categories:${marketCode}:${range} failed: ${err}`);
|
||||
}
|
||||
@@ -100,7 +103,7 @@ export async function publishAll() {
|
||||
redis,
|
||||
makeKey(['consumer-prices', 'retailer-spread', marketCode, basket.slug]),
|
||||
spread,
|
||||
1800,
|
||||
TTL,
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(`spread:${marketCode}:${basket.slug} failed: ${err}`);
|
||||
@@ -113,7 +116,7 @@ export async function publishAll() {
|
||||
redis,
|
||||
makeKey(['consumer-prices', 'basket-series', marketCode, basket.slug, range]),
|
||||
series,
|
||||
3600,
|
||||
TTL,
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(`basket-series:${marketCode}:${basket.slug}:${range} failed: ${err}`);
|
||||
|
||||
@@ -25,12 +25,13 @@ async function sleep(ms: number) {
|
||||
}
|
||||
|
||||
async function getOrCreateRetailer(slug: string, config: ReturnType<typeof loadRetailerConfig>) {
|
||||
const existing = await query<{ id: string }>(`SELECT id FROM retailers WHERE slug = $1`, [slug]);
|
||||
if (existing.rows.length > 0) return existing.rows[0].id;
|
||||
|
||||
const result = await query<{ id: string }>(
|
||||
`INSERT INTO retailers (slug, name, market_code, country_code, currency_code, adapter_key, base_url)
|
||||
VALUES ($1,$2,$3,$3,$4,$5,$6) RETURNING id`,
|
||||
VALUES ($1,$2,$3,$3,$4,$5,$6)
|
||||
ON CONFLICT (slug) DO UPDATE SET
|
||||
name = EXCLUDED.name, adapter_key = EXCLUDED.adapter_key,
|
||||
base_url = EXCLUDED.base_url, updated_at = NOW()
|
||||
RETURNING id`,
|
||||
[slug, config.name, config.marketCode, config.currencyCode, config.adapter, config.baseUrl],
|
||||
);
|
||||
return result.rows[0].id;
|
||||
@@ -130,7 +131,7 @@ export async function scrapeRetailer(slug: string) {
|
||||
if (
|
||||
config.adapter === 'exa-search' &&
|
||||
product.rawPayload.basketSlug &&
|
||||
product.rawPayload.itemCategory
|
||||
product.rawPayload.canonicalName
|
||||
) {
|
||||
try {
|
||||
const canonicalId = await upsertCanonicalProduct({
|
||||
@@ -139,7 +140,7 @@ export async function scrapeRetailer(slug: string) {
|
||||
});
|
||||
const basketItemId = await getBasketItemId(
|
||||
product.rawPayload.basketSlug as string,
|
||||
product.rawPayload.itemCategory as string,
|
||||
product.rawPayload.canonicalName as string,
|
||||
);
|
||||
if (basketItemId) {
|
||||
await upsertProductMatch({
|
||||
@@ -168,6 +169,21 @@ export async function scrapeRetailer(slug: string) {
|
||||
const status = errorsCount === 0 ? 'completed' : pagesSucceeded > 0 ? 'partial' : 'failed';
|
||||
await updateScrapeRun(runId, status, pagesAttempted, pagesSucceeded, errorsCount);
|
||||
logger.info(`Run ${runId} finished: ${status} (${pagesSucceeded}/${pagesAttempted} pages)`);
|
||||
|
||||
const parseSuccessRate = pagesAttempted > 0 ? (pagesSucceeded / pagesAttempted) * 100 : 0;
|
||||
const isSuccess = status === 'completed' || status === 'partial';
|
||||
await query(
|
||||
`INSERT INTO data_source_health
|
||||
(retailer_id, last_successful_run_at, last_run_status, parse_success_rate, updated_at)
|
||||
VALUES ($1, $2, $3, $4, NOW())
|
||||
ON CONFLICT (retailer_id) DO UPDATE SET
|
||||
last_successful_run_at = COALESCE($2, data_source_health.last_successful_run_at),
|
||||
last_run_status = EXCLUDED.last_run_status,
|
||||
parse_success_rate = EXCLUDED.parse_success_rate,
|
||||
updated_at = NOW()`,
|
||||
[retailerId, isSuccess ? new Date() : null, status, Math.round(parseSuccessRate * 100) / 100],
|
||||
);
|
||||
|
||||
await teardownAll();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,201 +0,0 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
/**
|
||||
* Seed script: fetches compact snapshot payloads from consumer-prices-core
|
||||
* and writes them to Upstash Redis for WorldMonitor bootstrap hydration.
|
||||
*
|
||||
* Run manually: node scripts/seed-consumer-prices.mjs
|
||||
* Deployed as: Railway cron service (same pattern as ais-relay loops)
|
||||
*
|
||||
* Memory: runSeed() calls process.exit(0) — use extraKeys for all keys.
|
||||
*/
|
||||
|
||||
import { loadEnvFile, CHROME_UA, writeExtraKeyWithMeta } from './_seed-utils.mjs';
|
||||
|
||||
loadEnvFile(import.meta.url);
|
||||
|
||||
const BASE_URL = process.env.CONSUMER_PRICES_CORE_BASE_URL;
|
||||
const API_KEY = process.env.CONSUMER_PRICES_CORE_API_KEY;
|
||||
const MARKET = process.env.CONSUMER_PRICES_DEFAULT_MARKET || 'ae';
|
||||
const BASKET = 'essentials-ae';
|
||||
|
||||
if (!BASE_URL) {
|
||||
console.warn('[consumer-prices] CONSUMER_PRICES_CORE_BASE_URL not set — writing empty placeholders');
|
||||
}
|
||||
|
||||
async function fetchSnapshot(path) {
|
||||
if (!BASE_URL) return null;
|
||||
const url = `${BASE_URL.replace(/\/$/, '')}${path}`;
|
||||
try {
|
||||
const resp = await fetch(url, {
|
||||
headers: {
|
||||
'User-Agent': CHROME_UA,
|
||||
...(API_KEY ? { 'x-api-key': API_KEY } : {}),
|
||||
},
|
||||
signal: AbortSignal.timeout(20_000),
|
||||
});
|
||||
if (!resp.ok) {
|
||||
console.warn(` [consumer-prices] ${path} HTTP ${resp.status}`);
|
||||
return null;
|
||||
}
|
||||
return resp.json();
|
||||
} catch (err) {
|
||||
console.warn(` [consumer-prices] ${path} error: ${err.message}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function emptyOverview(market) {
|
||||
return {
|
||||
marketCode: market,
|
||||
asOf: String(Date.now()),
|
||||
currencyCode: 'AED',
|
||||
essentialsIndex: 0,
|
||||
valueBasketIndex: 0,
|
||||
wowPct: 0,
|
||||
momPct: 0,
|
||||
retailerSpreadPct: 0,
|
||||
coveragePct: 0,
|
||||
freshnessLagMin: 0,
|
||||
topCategories: [],
|
||||
upstreamUnavailable: true,
|
||||
};
|
||||
}
|
||||
|
||||
function emptyMovers(market, range) {
|
||||
return { marketCode: market, asOf: String(Date.now()), range, risers: [], fallers: [], upstreamUnavailable: true };
|
||||
}
|
||||
|
||||
function emptySpread(market, basket) {
|
||||
return { marketCode: market, asOf: String(Date.now()), basketSlug: basket, currencyCode: 'AED', retailers: [], spreadPct: 0, upstreamUnavailable: true };
|
||||
}
|
||||
|
||||
function emptyFreshness(market) {
|
||||
return { marketCode: market, asOf: String(Date.now()), retailers: [], overallFreshnessMin: 0, stalledCount: 0, upstreamUnavailable: true };
|
||||
}
|
||||
|
||||
function emptyBasketSeries(market, basket, range) {
|
||||
return { marketCode: market, basketSlug: basket, asOf: String(Date.now()), currencyCode: 'AED', range, essentialsSeries: [], valueSeries: [], upstreamUnavailable: true };
|
||||
}
|
||||
|
||||
function emptyCategories(market, range) {
|
||||
return { marketCode: market, asOf: String(Date.now()), range, categories: [], upstreamUnavailable: true };
|
||||
}
|
||||
|
||||
async function run() {
|
||||
console.log(`[consumer-prices] seeding market=${MARKET} basket=${BASKET}`);
|
||||
|
||||
const TTL_OVERVIEW = 1800; // 30 min
|
||||
const TTL_MOVERS = 1800; // 30 min
|
||||
const TTL_SPREAD = 3600; // 60 min
|
||||
const TTL_FRESHNESS = 600; // 10 min
|
||||
const TTL_SERIES = 3600; // 60 min
|
||||
const TTL_CATEGORIES = 1800; // 30 min
|
||||
|
||||
// Fetch all snapshots in parallel
|
||||
const [overview, movers30d, movers7d, spread, freshness, series30d, series7d, series90d,
|
||||
categories30d, categories7d, categories90d] = await Promise.all([
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/overview?market=${MARKET}`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/movers?market=${MARKET}&days=30`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/movers?market=${MARKET}&days=7`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/retailer-spread?market=${MARKET}&basket=${BASKET}`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/freshness?market=${MARKET}`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/basket-series?market=${MARKET}&basket=${BASKET}&range=30d`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/basket-series?market=${MARKET}&basket=${BASKET}&range=7d`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/basket-series?market=${MARKET}&basket=${BASKET}&range=90d`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/categories?market=${MARKET}&range=30d`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/categories?market=${MARKET}&range=7d`),
|
||||
fetchSnapshot(`/wm/consumer-prices/v1/categories?market=${MARKET}&range=90d`),
|
||||
]);
|
||||
|
||||
const writes = [
|
||||
{
|
||||
key: `consumer-prices:overview:${MARKET}`,
|
||||
data: overview ?? emptyOverview(MARKET),
|
||||
ttl: TTL_OVERVIEW,
|
||||
metaKey: `seed-meta:consumer-prices:overview:${MARKET}`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:movers:${MARKET}:30d`,
|
||||
data: movers30d ?? emptyMovers(MARKET, '30d'),
|
||||
ttl: TTL_MOVERS,
|
||||
metaKey: `seed-meta:consumer-prices:movers:${MARKET}:30d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:movers:${MARKET}:7d`,
|
||||
data: movers7d ?? emptyMovers(MARKET, '7d'),
|
||||
ttl: TTL_MOVERS,
|
||||
metaKey: `seed-meta:consumer-prices:movers:${MARKET}:7d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:retailer-spread:${MARKET}:${BASKET}`,
|
||||
data: spread ?? emptySpread(MARKET, BASKET),
|
||||
ttl: TTL_SPREAD,
|
||||
metaKey: `seed-meta:consumer-prices:spread:${MARKET}`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:freshness:${MARKET}`,
|
||||
data: freshness ?? emptyFreshness(MARKET),
|
||||
ttl: TTL_FRESHNESS,
|
||||
metaKey: `seed-meta:consumer-prices:freshness:${MARKET}`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:basket-series:${MARKET}:${BASKET}:30d`,
|
||||
data: series30d ?? emptyBasketSeries(MARKET, BASKET, '30d'),
|
||||
ttl: TTL_SERIES,
|
||||
metaKey: `seed-meta:consumer-prices:basket-series:${MARKET}:${BASKET}:30d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:basket-series:${MARKET}:${BASKET}:7d`,
|
||||
data: series7d ?? emptyBasketSeries(MARKET, BASKET, '7d'),
|
||||
ttl: TTL_SERIES,
|
||||
metaKey: `seed-meta:consumer-prices:basket-series:${MARKET}:${BASKET}:7d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:basket-series:${MARKET}:${BASKET}:90d`,
|
||||
data: series90d ?? emptyBasketSeries(MARKET, BASKET, '90d'),
|
||||
ttl: TTL_SERIES,
|
||||
metaKey: `seed-meta:consumer-prices:basket-series:${MARKET}:${BASKET}:90d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:categories:${MARKET}:30d`,
|
||||
data: categories30d ?? emptyCategories(MARKET, '30d'),
|
||||
ttl: TTL_CATEGORIES,
|
||||
metaKey: `seed-meta:consumer-prices:categories:${MARKET}:30d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:categories:${MARKET}:7d`,
|
||||
data: categories7d ?? emptyCategories(MARKET, '7d'),
|
||||
ttl: TTL_CATEGORIES,
|
||||
metaKey: `seed-meta:consumer-prices:categories:${MARKET}:7d`,
|
||||
},
|
||||
{
|
||||
key: `consumer-prices:categories:${MARKET}:90d`,
|
||||
data: categories90d ?? emptyCategories(MARKET, '90d'),
|
||||
ttl: TTL_CATEGORIES,
|
||||
metaKey: `seed-meta:consumer-prices:categories:${MARKET}:90d`,
|
||||
},
|
||||
];
|
||||
|
||||
let failed = 0;
|
||||
for (const { key, data, ttl, metaKey } of writes) {
|
||||
try {
|
||||
const recordCount = Array.isArray(data.retailers ?? data.categories ?? data.risers)
|
||||
? (data.retailers ?? data.categories ?? data.risers ?? []).length
|
||||
: 1;
|
||||
await writeExtraKeyWithMeta(key, data, ttl, recordCount, metaKey);
|
||||
console.log(` [consumer-prices] wrote ${key} (${recordCount} records)`);
|
||||
} catch (err) {
|
||||
console.error(` [consumer-prices] failed ${key}: ${err.message}`);
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[consumer-prices] done. ${writes.length - failed}/${writes.length} keys written.`);
|
||||
process.exit(failed > 0 ? 1 : 0);
|
||||
}
|
||||
|
||||
run().catch((err) => {
|
||||
console.error('[consumer-prices] seed failed:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user