fix(consumer-prices): pipeline hardening, basket spread fix, panel bugs, sw-update test sync (#2040)

* fix(consumer-prices): harden scrape/aggregate/publish pipeline

- scrape: treat 0-product parse as error (increments errorsCount, skips
  pagesSucceeded) so noon_grocery_ae missing eggs_12/tomatoes_1kg marks
  the run partial instead of completed
- publish: fix freshData gate (freshnessMin >= 0) so a scrape finishing
  at exactly 0 min lag still advances seed-meta
- aggregate: wrap per-basket aggregation in try/catch so one failing
  basket does not skip remaining baskets; re-throw if any failed
- seed-consumer-prices.mjs: require --force flag to prevent accidentally
  stomping publish.ts 26h TTLs with short 10-60min fallback TTLs

* fix(consumer-prices): correct basket comparison with intersection + dedup

Both aggregate.ts and the retailer spread snapshot were summing ALL
matched SKUs per retailer without deduplication, making Carrefour
appear most expensive simply because it had more matched products
(31 "items" vs Noon's 20 for a 12-item basket).

Fixes:
- aggregate.ts retailer_spread_pct: deduplicate per (retailer, basketItem)
  taking cheapest price, then only compare on items all retailers carry
- worldmonitor.ts buildRetailerSpreadSnapshot: same dedup + intersection
  logic in SQL — one best_price per (retailer, basket_item), common_items
  CTE filters to items every active retailer covers
- exa-search.ts parseListing: log whether Exa returned 0 results or
  results with no extractable price, to distinguish the two failure modes

* fix(consumer-prices-panel): correct parse rate display, category names, and freshness colors

- parseSuccessRate is stored as 0-100 but UI was doing *100 again (shows 10000%)
- Category name builder converts snake_case to Title Case (Cooking_oil → Cooking Oil)
- Add missing cp-fresh--ok/warn/stale/unknown CSS classes (freshness labels had no color)
- Add border-radius to stat cards and range buttons; add font-family to range buttons
- Add padding + bottom border to cp-range-bar for visual separation

* fix(consumer-prices): gate overview spread_pct query to last 2 days

buildOverviewSnapshot queried retailer_spread_pct with no recency
filter, so ORDER BY metric_date DESC LIMIT 1 would serve an
arbitrarily old row when today's aggregate run omitted a write
(no retailer intersection). Add INTERVAL '2 days' cutoff — covers
24h cron cadence plus scheduling drift. Falls through to 0 (→ UI
shows '—') when no recent value exists.
This commit is contained in:
Elie Habib
2026-03-22 11:46:40 +04:00
committed by GitHub
parent cfd11a8402
commit 6d66c06f07
8 changed files with 111 additions and 27 deletions

View File

@@ -170,11 +170,20 @@ export class ExaSearchAdapter implements RetailerAdapter {
const payload = JSON.parse(result.html) as SearchPayload;
const currency = ctx.config.currencyCode;
if (payload.exaResults.length === 0) {
ctx.logger.warn(` [exa] ${payload.canonicalName}: 0 results from Exa (no indexed pages on this domain for query)`);
return [];
}
for (const r of payload.exaResults) {
const price =
matchPrice(r.summary ?? '', currency) ??
matchPrice(r.title ?? '', currency);
if (price === null) {
ctx.logger.warn(` [exa] ${payload.canonicalName}: no price in result — title="${r.title?.slice(0, 60)}" summary="${(r.summary ?? '').slice(0, 80)}"`);
}
if (price !== null) {
return [
{

View File

@@ -195,14 +195,29 @@ export async function aggregateBasket(basketSlug: string, marketCode: string) {
await writeComputedIndex(basketId, null, null, 'coverage_pct', coveragePct);
// Retailer spread: (most expensive basket - cheapest basket) / cheapest × 100
const retailerTotals = new Map<string, number>();
// Only compare retailers on the INTERSECTION of basket items they all carry.
// Also deduplicate: per (retailer, basketItem) take cheapest price to avoid
// inflating totals when multiple SKUs are matched to the same basket item.
const byRetailerItem = new Map<string, Map<string, number>>();
for (const r of rows) {
retailerTotals.set(r.retailerSlug, (retailerTotals.get(r.retailerSlug) ?? 0) + r.price);
if (!byRetailerItem.has(r.retailerSlug)) byRetailerItem.set(r.retailerSlug, new Map());
const itemMap = byRetailerItem.get(r.retailerSlug)!;
const existing = itemMap.get(r.basketItemId);
if (existing === undefined || r.price < existing) itemMap.set(r.basketItemId, r.price);
}
if (retailerTotals.size >= 2) {
const totals = [...retailerTotals.values()];
const spreadPct = ((Math.max(...totals) - Math.min(...totals)) / Math.min(...totals)) * 100;
await writeComputedIndex(basketId, null, null, 'retailer_spread_pct', Math.round(spreadPct * 10) / 10);
const retailerSlugs = [...byRetailerItem.keys()];
if (retailerSlugs.length >= 2) {
// Find basket items covered by every retailer
const commonItemIds = [...byRetailerItem.get(retailerSlugs[0])!.keys()].filter((itemId) =>
retailerSlugs.every((slug) => byRetailerItem.get(slug)!.has(itemId)),
);
if (commonItemIds.length > 0) {
const retailerTotals = retailerSlugs.map((slug) =>
commonItemIds.reduce((sum, id) => sum + byRetailerItem.get(slug)!.get(id)!, 0),
);
const spreadPct = ((Math.max(...retailerTotals) - Math.min(...retailerTotals)) / Math.min(...retailerTotals)) * 100;
await writeComputedIndex(basketId, null, null, 'retailer_spread_pct', Math.round(spreadPct * 10) / 10);
}
}
// Per-category indices for buildTopCategories snapshot
@@ -227,9 +242,16 @@ export async function aggregateBasket(basketSlug: string, marketCode: string) {
export async function aggregateAll() {
const configs = loadAllBasketConfigs();
let failed = 0;
for (const c of configs) {
await aggregateBasket(c.slug, c.marketCode);
try {
await aggregateBasket(c.slug, c.marketCode);
} catch (err) {
logger.warn(`aggregateBasket ${c.slug}:${c.marketCode} failed: ${err}`);
failed++;
}
}
if (failed > 0) throw new Error(`${failed}/${configs.length} basket(s) failed`);
}
if (import.meta.url === `file://${process.argv[1]}`) {

View File

@@ -93,7 +93,7 @@ export async function publishAll() {
const advanceSeedMeta =
freshnessSnapshot != null &&
freshnessSnapshot.retailers.some(
(r) => r.freshnessMin > 0 && r.freshnessMin < FRESH_DATA_THRESHOLD_MIN,
(r) => r.freshnessMin >= 0 && r.freshnessMin < FRESH_DATA_THRESHOLD_MIN,
);
logger.info(`Publishing snapshots for market: ${marketCode} (freshData=${advanceSeedMeta})`);

View File

@@ -94,6 +94,11 @@ export async function scrapeRetailer(slug: string) {
const fetchResult = await adapter.fetchTarget(ctx, target);
const products = await adapter.parseListing(ctx, fetchResult);
if (products.length === 0) {
logger.warn(` [${target.id}] parsed 0 products — counting as error`);
errorsCount++;
continue;
}
logger.info(` [${target.id}] parsed ${products.length} products`);
for (const product of products) {

View File

@@ -161,7 +161,7 @@ async function buildTopCategories(basketId: string): Promise<WMCategorySnapshot[
.replace(/^-|-$/g, '');
return {
slug,
name: r.category.charAt(0).toUpperCase() + r.category.slice(1),
name: r.category.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()),
wowPct,
momPct: 0, // TODO: requires 30-day baseline per category
currentIndex: Math.round(cur * 10) / 10,
@@ -230,6 +230,7 @@ export async function buildOverviewSnapshot(marketCode: string): Promise<WMOverv
`SELECT metric_value AS spread_pct FROM computed_indices ci
JOIN baskets b ON b.id = ci.basket_id
WHERE b.market_code = $1 AND ci.metric_key = 'retailer_spread_pct'
AND ci.metric_date >= CURRENT_DATE - INTERVAL '2 days'
ORDER BY ci.metric_date DESC LIMIT 1`,
[marketCode],
),
@@ -362,22 +363,45 @@ export async function buildRetailerSpreadSnapshot(
currency_code: string;
freshness_min: string | null;
}>(
`SELECT r.slug AS retailer_slug, r.name AS retailer_name, r.currency_code,
SUM(po.price) AS basket_total, COUNT(*) AS item_count,
EXTRACT(EPOCH FROM (NOW() - MAX(po.observed_at))) / 60 AS freshness_min
FROM baskets b
JOIN basket_items bi ON bi.basket_id = b.id AND bi.active = true
JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved')
JOIN retailer_products rp ON rp.id = pm.retailer_product_id AND rp.active = true
JOIN retailers r ON r.id = rp.retailer_id AND r.market_code = $2 AND r.active = true
JOIN LATERAL (
SELECT price, observed_at
FROM price_observations
WHERE retailer_product_id = rp.id AND in_stock = true
ORDER BY observed_at DESC LIMIT 1
) po ON true
WHERE b.slug = $1
GROUP BY r.slug, r.name, r.currency_code
`WITH retailer_item_best AS (
-- For each (retailer, basket_item), pick the cheapest in-stock latest price.
-- This deduplicates multiple matched SKUs per basket item per retailer.
SELECT r.id AS retailer_id, r.slug AS retailer_slug, r.name AS retailer_name,
r.currency_code, bi.id AS basket_item_id,
MIN(po.price) AS best_price,
MAX(po.observed_at) AS last_observed_at
FROM baskets b
JOIN basket_items bi ON bi.basket_id = b.id AND bi.active = true
JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved')
JOIN retailer_products rp ON rp.id = pm.retailer_product_id AND rp.active = true
JOIN retailers r ON r.id = rp.retailer_id AND r.market_code = $2 AND r.active = true
JOIN LATERAL (
SELECT price, observed_at
FROM price_observations
WHERE retailer_product_id = rp.id AND in_stock = true
ORDER BY observed_at DESC LIMIT 1
) po ON true
WHERE b.slug = $1
GROUP BY r.id, r.slug, r.name, r.currency_code, bi.id
),
retailer_ids AS (
SELECT DISTINCT retailer_id FROM retailer_item_best
),
-- Only include basket items that every active retailer covers.
-- Comparing totals across different item counts is invalid.
common_items AS (
SELECT basket_item_id
FROM retailer_item_best
GROUP BY basket_item_id
HAVING COUNT(DISTINCT retailer_id) = (SELECT COUNT(*) FROM retailer_ids)
)
SELECT rib.retailer_slug, rib.retailer_name, rib.currency_code,
SUM(rib.best_price) AS basket_total,
COUNT(*) AS item_count,
EXTRACT(EPOCH FROM (NOW() - MAX(rib.last_observed_at))) / 60 AS freshness_min
FROM retailer_item_best rib
JOIN common_items ci ON ci.basket_item_id = rib.basket_item_id
GROUP BY rib.retailer_slug, rib.retailer_name, rib.currency_code
ORDER BY basket_total ASC`,
[basketSlug, marketCode],
);