mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
feat(consumer-prices): product pinning, BigBasket fix, spread threshold, retailer sync (#2136)
* feat(consumer-prices): product pinning, BigBasket fix, spread threshold, retailer sync
Implements scraper stability plan to prevent URL churn between runs:
Product pinning (core fix):
- Migration 007: adds pin_disabled_at, consecutive_out_of_stock, pin_error_count columns
- After first successful Exa+Firecrawl match, reuse the stored URL directly on subsequent
runs without re-running Exa. Stale pins are soft-disabled (never deleted) after 3x OOS
or 3x fetch errors, triggering automatic Exa rediscovery on the next run.
- On direct-path failure, falls back to normal Exa flow in the same run.
- Compound map key "basketSlug:canonicalName" prevents collisions in multi-basket markets.
Retailer active-state sync:
- getOrCreateRetailer now writes active=config.enabled on every upsert.
- scrapeAll iterates ALL configs (not just enabled) so disabled retailers get synced to DB.
- Eliminates need for manual SQL hotfixes to set active=false.
Analytics correctness:
- All product_matches reads in aggregate, validate, worldmonitor snapshot now filter
pin_disabled_at IS NULL so soft-disabled stale matches don't skew indices.
- getBaselinePrices adds missing match_status IN ('auto','approved') guard.
BigBasket IN fix:
- inStockFromPrice: true flag overrides out-of-stock when price > 0.
- BigBasket gates on delivery pincode, not product availability — Firecrawl misread
the pincode gate as out-of-stock for all 12 basket items.
Spread reliability:
- Minimum 4 common categories required to compute retailer_spread_pct.
- Writes explicit 0 when below threshold to prevent stale noisy value persisting.
- US spread 134.8% from single cooking_oil pair is now suppressed.
Other:
- Tamimi SA: better Exa query template targeting tamimimarkets.com directly.
- Remove KE from frontend MARKETS array (basket data preserved in DB).
- 13 new vitest unit tests covering pinning, inStockFromPrice, host validation.
* fix(scrape): distinguish wasDirectHit from isDirect to close pin-fallback loop
When a pinned target falls back to Exa (fetchTarget sets direct:false in
payload), the old code read isDirect from target.metadata (always true),
causing two bugs:
1. upsertProductMatch was skipped → new Exa URL never pinned
2. Stale-pin counters reset on any pin target → broken pins never disabled
Introduce wasDirectHit = isDirect && rawPayload.direct === true so:
- Stale-pin maintenance (OOS/reset) only fires when pin URL was actually used
- Exa fallback (isDirect && !wasDirectHit) fires handlePinError on old pin
- upsertProductMatch guard uses !wasDirectHit so fallback results get pinned
@@ -11,6 +11,7 @@ retailer:
   numResults: 5
   urlPathContains: /pd/
   queryTemplate: "{canonicalName} grocery {market} {currency} price"
+  inStockFromPrice: true # BigBasket gates on delivery pincode, not product availability

 rateLimit:
   requestsPerMinute: 10

@@ -9,7 +9,8 @@ retailer:

 searchConfig:
   numResults: 5
-  queryTemplate: "{canonicalName} grocery {market} {currency} price"
+  queryTemplate: "{canonicalName} tamimi markets"
+  urlPathContains: /product

 rateLimit:
   requestsPerMinute: 10

17  consumer-prices-core/migrations/007_pinning_columns.sql  Normal file
@@ -0,0 +1,17 @@
-- Task 1: Product pinning infrastructure
-- pin_disabled_at: soft-disable stale pins (NEVER delete product_matches rows)
-- consecutive_out_of_stock: tracks OOS streak for stale-pin detection
-- pin_error_count: tracks Firecrawl fetch/parse failures for stale-pin detection

ALTER TABLE product_matches
  ADD COLUMN IF NOT EXISTS pin_disabled_at TIMESTAMPTZ;

ALTER TABLE retailer_products
  ADD COLUMN IF NOT EXISTS consecutive_out_of_stock INT NOT NULL DEFAULT 0,
  ADD COLUMN IF NOT EXISTS pin_error_count INT NOT NULL DEFAULT 0;

-- Partial index for fast O(1) pin lookup per (basket_item, retailer_product)
-- Only active, approved/auto matches need to be scanned for pins.
CREATE INDEX IF NOT EXISTS idx_pm_basket_active_pin
  ON product_matches(basket_item_id, retailer_product_id)
  WHERE pin_disabled_at IS NULL AND match_status IN ('auto', 'approved');
334  consumer-prices-core/plans/scraper-stability.md  Normal file
@@ -0,0 +1,334 @@
# Consumer Prices Scraper Stability Plan (Rev 3 — Final)

## Problems being solved

| # | Problem | Impact |
|---|---------|--------|
| 1 | Exa re-discovers different product URLs each run | Spread/index volatility, no stable WoW |
| 2 | BigBasket: all observations in_stock=false | IN market completely dark |
| 3 | Disabled retailers stay active=true in DB | Pollutes health view |
| 4 | Spread computed on 1-2 overlapping categories | US spread 134.8% from single pair |
| 5 | Tamimi SA: 0 products | SA market dark |
| 6 | Naivas KE: disabled but shown in frontend MARKETS | KE shown as active with no data |

---

## Core: product pinning with soft-disable

product_matches rows are NEVER deleted. Stale pins set pin_disabled_at.
ALL analytics queries that read product_matches must filter pin_disabled_at IS NULL.
Exa rediscovery of the same URL clears pin_disabled_at to reactivate.

Flow:

    check product_matches for active pin (pin_disabled_at IS NULL)
    pin exists -> Firecrawl(pinned url) directly
      success + in_stock     -> reset counters (consecutive_out_of_stock=0, pin_error_count=0)
      success + out_of_stock -> increment consecutive_out_of_stock; if >=3: soft-disable
      zero products (no throw) -> increment pin_error_count; if >=3: soft-disable
      exception (throw)      -> increment pin_error_count; if >=3: soft-disable
    no active pin -> Exa(search) -> Firecrawl -> upsertProductMatch (which clears pin_disabled_at)

---
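The counter and soft-disable transitions in the flow can be sketched as a pure function. This is an illustrative sketch only: `PinState`, `FetchOutcome`, and `nextPinState` are hypothetical names, not identifiers from the codebase; the threshold of 3 mirrors the plan.

```typescript
// Sketch of the stale-pin state machine described in the flow above.
// All names here are illustrative; only the rules come from the plan.
type PinState = { disabledAt: Date | null; oosCount: number; errCount: number };
type FetchOutcome = 'in_stock' | 'out_of_stock' | 'zero_products' | 'exception';

const STALE_THRESHOLD = 3;

function nextPinState(state: PinState, outcome: FetchOutcome): PinState {
  if (outcome === 'in_stock') {
    // Healthy pin: reset both counters, keep the pin active.
    return { ...state, oosCount: 0, errCount: 0 };
  }
  const next = { ...state };
  if (outcome === 'out_of_stock') next.oosCount += 1;
  else next.errCount += 1; // zero_products and exception share the error counter
  if (next.oosCount >= STALE_THRESHOLD || next.errCount >= STALE_THRESHOLD) {
    next.disabledAt = new Date(); // soft-disable: the row is kept, never deleted
  }
  return next;
}
```

Once `disabledAt` is set, the pin simply stops being returned by the pin-lookup query, which is what triggers Exa rediscovery on the next run.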
## Task 1 — Migration 007

File: migrations/007_pinning_columns.sql

    ALTER TABLE product_matches
      ADD COLUMN IF NOT EXISTS pin_disabled_at TIMESTAMPTZ;

    ALTER TABLE retailer_products
      ADD COLUMN IF NOT EXISTS consecutive_out_of_stock INT NOT NULL DEFAULT 0,
      ADD COLUMN IF NOT EXISTS pin_error_count INT NOT NULL DEFAULT 0;

    CREATE INDEX IF NOT EXISTS idx_pm_basket_active_pin
      ON product_matches(basket_item_id, retailer_product_id)
      WHERE pin_disabled_at IS NULL AND match_status IN ('auto', 'approved');

Note: non-concurrent index; brief write lock acceptable at current scale.
Run a row count check before relying on integrity claims: verify product_matches
count and basket_items count in a pre-deploy preflight.

---
## Task 2 — getPinnedUrlsForRetailer

Joins through retailer_products.retailer_id — no new column on product_matches.

    SELECT DISTINCT ON (pm.basket_item_id)
      cp.canonical_name,
      b.slug AS basket_slug,
      rp.source_url,
      rp.id AS product_id,
      pm.id AS match_id  -- carry matchId for precise soft-disable updates
    FROM product_matches pm
    JOIN retailer_products rp ON rp.id = pm.retailer_product_id
    JOIN basket_items bi ON bi.id = pm.basket_item_id
    JOIN baskets b ON b.id = bi.basket_id
    JOIN canonical_products cp ON cp.id = bi.canonical_product_id
    WHERE rp.retailer_id = $1
      AND pm.match_status IN ('auto', 'approved')
      AND pm.pin_disabled_at IS NULL
      AND rp.consecutive_out_of_stock < 3
      AND rp.pin_error_count < 3
    ORDER BY pm.basket_item_id, pm.match_score DESC

Returns Map<"basketSlug:canonicalName", { sourceUrl, productId, matchId }>.
Compound key prevents collisions if multi-basket-per-market ever exists.

---
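The compound-key map the query feeds can be sketched in isolation. `PinRow` mirrors the SELECT columns above; the function name and sample data are illustrative, not repo code.

```typescript
// Builds the Map<"basketSlug:canonicalName", pin> described above.
// PinRow mirrors the SELECT column aliases; everything else is a sketch.
interface PinRow {
  basket_slug: string;
  canonical_name: string;
  source_url: string;
  product_id: string;
  match_id: string;
}

function buildPinMap(rows: PinRow[]) {
  const map = new Map<string, { sourceUrl: string; productId: string; matchId: string }>();
  for (const row of rows) {
    // Compound key: the same canonical product in two baskets gets two distinct entries.
    map.set(`${row.basket_slug}:${row.canonical_name}`, {
      sourceUrl: row.source_url,
      productId: row.product_id,
      matchId: row.match_id,
    });
  }
  return map;
}
```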
## Task 3 — AdapterContext types

Add retailerId and pinnedUrls to AdapterContext interface.

---
## Task 4 — scrape.ts changes

### 4a — getOrCreateRetailer: write active + call before early return

scrapeAll() MUST iterate loadAllRetailerConfigs() WITHOUT .filter((c) => c.enabled).
All configs (enabled AND disabled) are passed to scrapeRetailer workers.
scrapeRetailer() upserts active first, then returns early for disabled ones.

    async function getOrCreateRetailer(slug, config):
      INSERT INTO retailers (..., active)
      VALUES (..., $7)
      ON CONFLICT (slug) DO UPDATE SET
        name=..., adapter_key=..., base_url=..., active=EXCLUDED.active, updated_at=NOW()
      RETURNING id
      -- $7 = config.enabled

In scrapeRetailer():

    const retailerId = await getOrCreateRetailer(slug, config); // MOVED BEFORE GUARD
    if (!config.enabled) { logger.info('disabled, skipping'); return; }
    // rest unchanged

In scrapeAll():

    const configs = await loadAllRetailerConfigs(); // NO .filter((c) => c.enabled)
    await Promise.allSettled(configs.map((c) => scrapeRetailer(c, runId)));

This fixes:

- Batch scrape (scrapeAll): disabled retailers still get DB sync
- Single-retailer CLI (scrapeRetailer via main)
- No separate syncRetailersFromConfig needed
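The intended sync behavior can be sketched as a pure plan function. `syncPlan` and its return shape are hypothetical names for illustration; the real code performs the DB upsert and early return instead.

```typescript
// Sketch of the active-state sync: every config (enabled or not) flows through
// the upsert, so a YAML with enabled: false flips active=false on the next run.
interface RetailerConfig { slug: string; enabled: boolean }

function syncPlan(configs: RetailerConfig[]): { slug: string; active: boolean; scraped: boolean }[] {
  // No .filter((c) => c.enabled): disabled retailers still get the DB sync,
  // they just return early before any scraping work happens.
  return configs.map((c) => ({ slug: c.slug, active: c.enabled, scraped: c.enabled }));
}
```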
### 4b — Load pins before discoverTargets

    const pinnedUrls = await getPinnedUrlsForRetailer(retailerId);
    logger.info(`${slug}: ${pinnedUrls.size} pins loaded`);
    const ctx = { config, runId, logger, retailerId, pinnedUrls };
### 4c — Stale-pin maintenance

Two failure modes tracked separately:

After observation insert (direct targets):

    if (inStock)  -> reset both counters to 0
    if (!inStock) -> increment consecutive_out_of_stock; if >=3: soft-disable via matchId

After zero-products (products.length === 0, no throw) for direct targets:
call handlePinError(productId, matchId, target.id, logger)

In catch block for direct targets:
call handlePinError(productId, matchId, target.id, logger)

handlePinError:

    UPDATE retailer_products SET pin_error_count = pin_error_count + 1 WHERE id = $productId
    RETURNING pin_error_count
    if >= 3: UPDATE product_matches SET pin_disabled_at = NOW() WHERE id = $matchId

Soft-disable uses matchId from pinned target metadata for precision.
On next run: no active pin found -> Exa re-discovery triggered automatically.
### 4d — Skip upsertProductMatch for direct targets

Existing match already present; creating a new one is wrong.
Guard: if (!target.metadata?.direct && adapter === 'search' && ...) { upsertMatch }

---
## Task 5 — upsertProductMatch: clear pin_disabled_at on upsert

When Exa rediscovers a URL and calls upsertProductMatch, reactivate the pin.

    UPDATE product_matches SET
      basket_item_id = EXCLUDED.basket_item_id,
      match_score = EXCLUDED.match_score,
      match_status = EXCLUDED.match_status,
      pin_disabled_at = NULL  -- reactivate on fresh discovery
    WHERE ...

Also reset counters on retailer_products when a match is successfully upserted:

    UPDATE retailer_products SET consecutive_out_of_stock=0, pin_error_count=0 WHERE id=$productId

---
## Task 6 — ALL analytics: add pin_disabled_at IS NULL filter

Files to update:

- src/jobs/aggregate.ts: getBasketRows query — add AND pm.pin_disabled_at IS NULL
- src/jobs/aggregate.ts: getBaselinePrices query — add BOTH AND pm.match_status IN ('auto', 'approved') [MISSING entirely today] AND pm.pin_disabled_at IS NULL
- src/snapshots/worldmonitor.ts: retailer spread query — add AND pm.pin_disabled_at IS NULL
- src/jobs/validate.ts: match-reading query — add AND pm.pin_disabled_at IS NULL

Without this, soft-disabled matches (stale products) still skew indices, baselines,
spread calculations, and validation results.

Note: getBaselinePrices currently has NO match_status guard at all. Adding both filters
is required. Without match_status IN ('auto','approved'), rejected/pending matches
can corrupt index baselines.

---
## Task 7 — SearchAdapter: pin branch + direct path

### discoverTargets:

For each basket item, look up ctx.pinnedUrls with compound key "basketSlug:canonicalName".
Validate pinned URL with isAllowedHost(url, domain) before using.

If valid pin: return target with metadata { direct: true, pinnedProductId, matchId }
Else: return search target (Exa path, unchanged)

### fetchTarget:

Extract Firecrawl logic into _extractFromUrl(ctx, url, canonicalName, currency).
For direct targets: validate isAllowedHost + http/https scheme, call _extractFromUrl.
For Exa targets: existing Exa -> _extractFromUrl flow.
Log when a stored pin is rejected by isAllowedHost.

---
## Task 8 — BigBasket: inStockFromPrice flag

Add inStockFromPrice: boolean to SearchConfigSchema (default false).
In _extractFromUrl: if inStockFromPrice && price > 0: set inStock=true + log override.
Add to bigbasket_in.yaml: inStockFromPrice: true

---
## Task 9 — Spread: minimum coverage + explicit 0 sentinel

aggregate.ts:

    if (commonItemIds.length >= 4) { compute spread }
    else { write retailer_spread_pct = 0 }  // explicit 0 prevents stale noisy value persisting

snapshots/worldmonitor.ts buildRetailerSpreadSnapshot:
apply same MIN_SPREAD_ITEMS=4 threshold; return spreadPct=0 when below.

---
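The guard can be sketched as follows. The spread formula here is a stand-in (percent gap between cheapest and priciest retailer totals), not the repo's exact computation; only the MIN_SPREAD_ITEMS=4 threshold and the explicit 0 sentinel come from the plan.

```typescript
// Sketch of the coverage guard. retailerSpreadPct is an illustrative name;
// the formula is a placeholder, the threshold behavior matches the plan.
const MIN_SPREAD_ITEMS = 4;

function retailerSpreadPct(commonCategoryCount: number, retailerTotals: number[]): number {
  // Below the coverage threshold, return an explicit 0 so no stale value persists.
  if (commonCategoryCount < MIN_SPREAD_ITEMS) return 0;
  const min = Math.min(...retailerTotals);
  const max = Math.max(...retailerTotals);
  return ((max - min) / min) * 100;
}
```

With this guard, the US case (a single overlapping cooking_oil pair) yields 0 rather than a 134.8% outlier.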
## Task 10 — Tamimi SA query tweak

Change queryTemplate to: "{canonicalName} tamimi markets"
Add urlPathContains: /product
Disable with dated comment if still 0 after one run.

---
## Task 11 — Cross-repo: remove KE from frontend MARKETS

In worldmonitor repo: src/services/consumer-prices/index.ts
Remove ke from MARKETS array until a working KE retailer is validated.
KE basket data stays in DB.
Note: publish.ts already only includes markets with enabled retailers, so
this is a UI-layer cleanup, not a data concern.

---
## Task 12 — Tests (vitest)

tests/unit/pinning.test.ts:

- getPinnedUrlsForRetailer excludes pin_disabled_at IS NOT NULL
- getPinnedUrlsForRetailer excludes consecutive_out_of_stock >= 3 and pin_error_count >= 3
- discoverTargets returns direct=true when valid pin and isAllowedHost passes
- discoverTargets returns direct=false when no pin, invalid host, or non-http(s) scheme
- fetchTarget skips Exa for direct=true targets
- Reactivation: upsertProductMatch clears pin_disabled_at on same URL rediscovery
- Soft-disable via matchId: OOS path (3x) sets pin_disabled_at; match row NOT deleted
- Soft-disable via matchId: error path (3x) sets pin_disabled_at; match row NOT deleted
- Zero-products for direct target: triggers handlePinError same as exception path
- getBasketRows excludes rows where pm.pin_disabled_at IS NOT NULL
- getBaselinePrices excludes rows where pm.pin_disabled_at IS NOT NULL
- getBaselinePrices excludes rows where pm.match_status NOT IN ('auto','approved')
- scrapeAll passes disabled configs to scrapeRetailer (no enabled filter)
- scrapeRetailer calls getOrCreateRetailer before early-return for disabled configs

tests/unit/in-stock-from-price.test.ts:

- inStockFromPrice=true + price>0: inStock=true + log message
- inStockFromPrice=true + price=0: inStock unchanged
- inStockFromPrice=false: inStock unchanged

tests/unit/spread-threshold.test.ts:

- aggregateBasket writes spread=0 explicitly when commonItems < 4
- aggregateBasket writes computed spread when commonItems >= 4
- buildRetailerSpreadSnapshot returns spreadPct=0 below threshold

tests/unit/retailer-sync.test.ts:

- getOrCreateRetailer writes active=false for disabled config (via ON CONFLICT UPDATE)
- getOrCreateRetailer writes active=true for enabled config
- scrapeRetailer calls getOrCreateRetailer before early-return for disabled configs

---
## Immediate SQL hotfix

The getOrCreateRetailer fix supersedes manual hotfixes once deployed.
For now, run this to fix DB state immediately:

    UPDATE retailers SET active = false WHERE slug IN (
      'coop_ch', 'migros_ch', 'sainsburys_gb',
      'naivas_ke', 'wholefoods_us', 'adcoop_ae'
    );

(All six slugs whose YAML has enabled: false)

---
## Execution order

1. SQL hotfix on Railway DB (all 6 disabled slugs)
2. git checkout -b fix/scraper-stability origin/main (consumer-prices-core repo)
3. Task 1 — migration 007
4. Task 8 — inStockFromPrice
5. Task 9 — spread threshold + sentinel
6. Task 10 — tamimi SA query tweak
7. Task 2 — getPinnedUrlsForRetailer
8. Task 3 — AdapterContext types
9. Task 4a — getOrCreateRetailer with active sync (call before early return)
10. Task 4b — pin loading in scrapeRetailer
11. Task 4c/4d — stale-pin maintenance + match guard
12. Task 5 — upsertProductMatch clears pin_disabled_at + resets counters
13. Task 6 — add pin_disabled_at IS NULL to all analytics queries
14. Task 7 — discoverTargets + fetchTarget direct path
15. Task 12 — tests
16. npm run migrate
17. npm run jobs:scrape
18. Verify: bigbasket_in in_stock counts, no product_matches rows deleted, disabled retailers active=false
19. npm run jobs:aggregate && npm run jobs:publish
20. PR in consumer-prices-core repo
21. Separate PR in worldmonitor repo: remove ke from MARKETS (Task 11)

---
## Expected outcomes

| Market | Before | After |
|--------|--------|-------|
| AE | Spread volatile | Stable (pinned SKUs every run) |
| IN | 0 in-stock | 12 items covered via inStockFromPrice |
| GB | 1/12 drifting | Pinned Tesco URLs reused |
| US | Spread 134.8% noise | Spread = 0 until >= 4 categories overlap |
| SA | 0 products | Better Exa query; disable if still 0 |
| KE | disabled but shown | Removed from frontend MARKETS |
| Historical matches | intact | Still intact (soft-disable only, never deleted) |
| Disabled retailers | active=true in DB | active=false via getOrCreateRetailer upsert |
| WoW | 0 everywhere | Appears March 29+ with stable index data |
@@ -4,6 +4,10 @@
  * Stage 1 (Exa): neural search on retailer domain → ranked product page URLs
  * Stage 2 (Firecrawl): structured LLM extraction from the confirmed URL → {price, currency, inStock}
  *
+ * Pin path: if a matching pin exists in ctx.pinnedUrls, Exa is skipped and Firecrawl
+ * is called directly on the stored URL. On failure, falls back to the normal Exa flow
+ * in the same run so the basket item is never left uncovered.
+ *
  * Replaces ExaSearchAdapter's fragile regex-on-AI-summary approach.
  * Firecrawl renders JS so dynamic prices (Noon, etc.) are visible.
  * Domain allowlist + title plausibility check prevent wrong-product and SSRF risks.
@@ -59,8 +63,8 @@ export function isTitlePlausible(canonicalName: string, productName: string | un
  */
 export function isAllowedHost(url: string, allowedHost: string): boolean {
   try {
-    const { hostname } = new URL(url);
-    return hostname === allowedHost;
+    const { hostname, protocol } = new URL(url);
+    return (protocol === 'http:' || protocol === 'https:') && hostname === allowedHost;
   } catch {
     return false;
   }
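Standalone, the patched check behaves as follows; the function body is copied from the hunk above so it can be exercised in isolation.

```typescript
// Patched isAllowedHost from the hunk above: exact-host match plus an
// http/https scheme guard, so stored pins with odd schemes are rejected.
function isAllowedHost(url: string, allowedHost: string): boolean {
  try {
    const { hostname, protocol } = new URL(url);
    return (protocol === 'http:' || protocol === 'https:') && hostname === allowedHost;
  } catch {
    return false; // unparseable URL -> not allowed
  }
}
```

The scheme guard matters for the pin path: a previously stored URL is now re-fetched without Exa in between, so it must be re-validated before Firecrawl touches it.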
@@ -79,6 +83,9 @@ interface SearchPayload {
   canonicalName: string;
   basketSlug: string;
   itemCategory: string;
+  direct?: boolean;
+  pinnedProductId?: string;
+  matchId?: string;
 }

 export class SearchAdapter implements RetailerAdapter {
@@ -102,6 +109,28 @@ export class SearchAdapter implements RetailerAdapter {

     for (const basket of baskets) {
       for (const item of basket.items) {
+        const pinKey = `${basket.slug}:${item.canonicalName}`;
+        const pinned = ctx.pinnedUrls?.get(pinKey);
+
+        if (pinned && isAllowedHost(pinned.sourceUrl, domain)) {
+          targets.push({
+            id: item.id,
+            url: pinned.sourceUrl,
+            category: item.category,
+            metadata: {
+              canonicalName: item.canonicalName,
+              domain,
+              basketSlug: basket.slug,
+              currency: ctx.config.currencyCode,
+              direct: true,
+              pinnedProductId: pinned.productId,
+              matchId: pinned.matchId,
+            },
+          });
+        } else {
+          if (pinned) {
+            ctx.logger.warn(` [pin] rejected stored URL for "${item.canonicalName}" (host mismatch): ${pinned.sourceUrl}`);
+          }
           targets.push({
             id: item.id,
             url: ctx.config.baseUrl,
@@ -111,26 +140,98 @@ export class SearchAdapter implements RetailerAdapter {
               domain,
               basketSlug: basket.slug,
               currency: ctx.config.currencyCode,
+              direct: false,
             },
           });
+        }
       }
     }

     return targets;
   }

+  private async _extractFromUrl(
+    ctx: AdapterContext,
+    url: string,
+    canonicalName: string,
+    currency: string,
+  ): Promise<ExtractedProduct | null> {
+    const extractSchema = {
+      prompt: `Find the listed retail price of this product in ${currency}. The price may be displayed as two parts split across lines — like "3" and ".95" next to "${currency}" — combine them to get 3.95. Return the listed price even if the product is currently out of stock. Return the product name, the numeric price in ${currency}, the currency code, and whether it is in stock.`,
+      fields: {
+        productName: { type: 'string' as const, description: 'Name or title of the product' },
+        price: { type: 'number' as const, description: `Retail price in ${currency} as a single number (e.g. 4.69)` },
+        currency: { type: 'string' as const, description: `Currency code, should be ${currency}` },
+        inStock: { type: 'boolean' as const, description: 'Whether the product is currently in stock and purchasable' },
+      },
+    };
+
+    const result = await this.firecrawl.extract<ExtractedProduct>(url, extractSchema, { timeout: 30_000 });
+    const data = result.data;
+    const price = data?.price;
+
+    if (typeof price !== 'number' || !Number.isFinite(price) || price <= 0) {
+      return null;
+    }
+    if (!isTitlePlausible(canonicalName, data.productName)) {
+      return null;
+    }
+
+    // inStockFromPrice: some retailers (e.g. BigBasket) gate on delivery pincode, not product
+    // availability. Firecrawl misreads the gate as out-of-stock. If price > 0, treat as in-stock.
+    if (ctx.config.searchConfig?.inStockFromPrice && price > 0) {
+      ctx.logger.info(` [search:extract] ${canonicalName}: inStockFromPrice override (price=${price})`);
+      data.inStock = true;
+    }
+
+    return data;
+  }
+
   async fetchTarget(ctx: AdapterContext, target: Target): Promise<FetchResult> {
-    const { canonicalName, domain, currency, basketSlug } = target.metadata as {
+    const { canonicalName, domain, currency, basketSlug, direct, pinnedProductId, matchId } = target.metadata as {
       canonicalName: string;
       domain: string;
       currency: string;
       basketSlug: string;
+      direct: boolean;
+      pinnedProductId?: string;
+      matchId?: string;
     };

+    // Direct path: skip Exa, call Firecrawl on pinned URL
+    if (direct) {
+      try {
+        const extracted = await this._extractFromUrl(ctx, target.url, canonicalName, currency);
+        if (extracted) {
+          ctx.logger.info(
+            ` [search:pin] ${canonicalName}: price=${extracted.price} ${extracted.currency} from ${target.url}`,
+          );
+          return {
+            url: target.url,
+            html: JSON.stringify({
+              extracted,
+              productUrl: target.url,
+              canonicalName,
+              basketSlug,
+              itemCategory: target.category,
+              direct: true,
+              pinnedProductId,
+              matchId,
+            } satisfies SearchPayload),
+            statusCode: 200,
+            fetchedAt: new Date(),
+          };
+        }
+        ctx.logger.warn(` [search:pin] ${canonicalName}: pin extraction failed, falling back to Exa`);
+      } catch (err) {
+        ctx.logger.warn(` [search:pin] ${canonicalName}: pin fetch error, falling back to Exa: ${err}`);
+      }
+    }
+
     const marketName = MARKET_NAMES[ctx.config.marketCode] ?? ctx.config.marketCode.toUpperCase();
     const cfg = ctx.config.searchConfig;

-    const query = cfg?.queryTemplate
+    const searchQuery = cfg?.queryTemplate
       ? cfg.queryTemplate
           .replace('{canonicalName}', canonicalName)
           .replace('{category}', target.category)
@@ -140,7 +241,7 @@ export class SearchAdapter implements RetailerAdapter {
       : `${canonicalName} grocery ${marketName} ${currency}`.trim();

     // Stage 1: Exa URL discovery
-    const exaResults = await this.exa.search(query, {
+    const exaResults = await this.exa.search(searchQuery, {
       numResults: cfg?.numResults ?? 3,
       includeDomains: [domain],
     });
@@ -163,41 +264,19 @@ export class SearchAdapter implements RetailerAdapter {
     }

     // Stage 2: Firecrawl structured extraction — iterate safe URLs until one yields a valid price
-    const extractSchema = {
-      prompt: `Find the listed retail price of this product in ${currency}. The price may be displayed as two parts split across lines — like "3" and ".95" next to "${currency}" — combine them to get 3.95. Return the listed price even if the product is currently out of stock. Return the product name, the numeric price in ${currency}, the currency code, and whether it is in stock.`,
-      fields: {
-        productName: { type: 'string' as const, description: 'Name or title of the product' },
-        price: { type: 'number' as const, description: `Retail price in ${currency} as a single number (e.g. 4.69)` },
-        currency: { type: 'string' as const, description: `Currency code, should be ${currency}` },
-        inStock: { type: 'boolean' as const, description: 'Whether the product is currently in stock and purchasable' },
-      },
-    };
-
     let extracted: ExtractedProduct | null = null;
     let usedUrl = safeUrls[0];
     const lastErrors: string[] = [];

     for (const url of safeUrls) {
       try {
-        const result = await this.firecrawl.extract<ExtractedProduct>(url, extractSchema, { timeout: 30_000 });
-        const data = result.data;
-        const price = data?.price;
-
-        if (typeof price !== 'number' || !Number.isFinite(price) || price <= 0) {
-          ctx.logger.warn(` [search:extract] ${canonicalName}: no price from ${url}, trying next`);
-          continue;
-        }
-
-        if (!isTitlePlausible(canonicalName, data.productName)) {
-          ctx.logger.warn(
-            ` [search:extract] ${canonicalName}: title mismatch "${data.productName}" at ${url}, trying next`,
-          );
-          continue;
-        }
-
-        extracted = data;
-        usedUrl = url;
-        break;
+        const result = await this._extractFromUrl(ctx, url, canonicalName, currency);
+        if (result) {
+          extracted = result;
+          usedUrl = url;
+          break;
+        }
+        ctx.logger.warn(` [search:extract] ${canonicalName}: no price or title mismatch at ${url}, trying next`);
       } catch (err) {
         const msg = err instanceof Error ? err.message : String(err);
         ctx.logger.warn(` [search:extract] ${canonicalName}: Firecrawl error on ${url}: ${msg}`);
@@ -223,6 +302,7 @@ export class SearchAdapter implements RetailerAdapter {
         canonicalName,
         basketSlug,
         itemCategory: target.category,
+        direct: false,
       } satisfies SearchPayload),
       statusCode: 200,
       fetchedAt: new Date(),
@@ -230,7 +310,7 @@
   }

   async parseListing(ctx: AdapterContext, result: FetchResult): Promise<ParsedProduct[]> {
-    const { extracted, productUrl, canonicalName, basketSlug, itemCategory } =
+    const { extracted, productUrl, canonicalName, basketSlug, itemCategory, direct, pinnedProductId, matchId } =
       JSON.parse(result.html) as SearchPayload;

     const priceResult = z.number().positive().finite().safeParse(extracted?.price);
@@ -262,7 +342,7 @@ export class SearchAdapter implements RetailerAdapter {
         // inStock defaults to true when Firecrawl does not return the field.
         // This is a conservative assumption — monitor for out-of-stock false positives.
         inStock: extracted.inStock ?? true,
-        rawPayload: { extracted, basketSlug, itemCategory, canonicalName },
+        rawPayload: { extracted, basketSlug, itemCategory, canonicalName, direct, pinnedProductId, matchId },
       },
     ];
   }
@@ -20,6 +20,8 @@ export interface AdapterContext {
   config: RetailerConfig;
   runId: string;
   logger: { info: (msg: string, ...args: unknown[]) => void; warn: (msg: string, ...args: unknown[]) => void; error: (msg: string, ...args: unknown[]) => void };
+  retailerId?: string;
+  pinnedUrls?: Map<string, { sourceUrl: string; productId: string; matchId: string }>;
 }

 export interface Target {
@@ -53,6 +53,7 @@ export const SearchConfigSchema = z.object({
   numResults: z.number().default(3),
   queryTemplate: z.string().optional(),
   urlPathContains: z.string().optional(),
+  inStockFromPrice: z.boolean().default(false),
 });

 export const RetailerConfigSchema = z.object({
@@ -15,7 +15,8 @@ export async function upsertProductMatch(input: {
     DO UPDATE SET
       basket_item_id = EXCLUDED.basket_item_id,
       match_score = EXCLUDED.match_score,
-      match_status = EXCLUDED.match_status`,
+      match_status = EXCLUDED.match_status,
+      pin_disabled_at = NULL`,
     [
       input.retailerProductId,
       input.canonicalProductId,
@@ -24,6 +25,13 @@ export async function upsertProductMatch(input: {
       input.matchStatus,
     ],
   );
+  // Reset stale counters when Exa re-discovers a product — fresh match means the URL works.
+  await query(
+    `UPDATE retailer_products
+     SET consecutive_out_of_stock = 0, pin_error_count = 0
+     WHERE id = $1`,
+    [input.retailerProductId],
+  );
 }

 export async function getBasketItemId(basketSlug: string, canonicalName: string): Promise<string | null> {
@@ -37,3 +45,43 @@ export async function getBasketItemId(basketSlug: string, canonicalName: string)
   );
   return result.rows[0]?.id ?? null;
 }
+
+export async function getPinnedUrlsForRetailer(
+  retailerId: string,
+): Promise<Map<string, { sourceUrl: string; productId: string; matchId: string }>> {
+  // Returns Map<"basketSlug:canonicalName", { sourceUrl, productId, matchId }>
+  // Compound key prevents collisions if multi-basket-per-market ever exists.
+  // Excludes soft-disabled pins, and products with OOS/error counters >= 3.
+  const result = await query<{
+    canonical_name: string;
+    basket_slug: string;
+    source_url: string;
+    product_id: string;
+    match_id: string;
+  }>(
+    `SELECT DISTINCT ON (pm.basket_item_id)
+       cp.canonical_name,
+       b.slug AS basket_slug,
+       rp.source_url,
+       rp.id AS product_id,
+       pm.id AS match_id
+     FROM product_matches pm
+     JOIN retailer_products rp ON rp.id = pm.retailer_product_id
+     JOIN basket_items bi ON bi.id = pm.basket_item_id
+     JOIN baskets b ON b.id = bi.basket_id
+     JOIN canonical_products cp ON cp.id = bi.canonical_product_id
+     WHERE rp.retailer_id = $1
+       AND pm.match_status IN ('auto', 'approved')
+       AND pm.pin_disabled_at IS NULL
+       AND rp.consecutive_out_of_stock < 3
+       AND rp.pin_error_count < 3
+     ORDER BY pm.basket_item_id, pm.match_score DESC`,
+    [retailerId],
+  );
+  const map = new Map<string, { sourceUrl: string; productId: string; matchId: string }>();
+  for (const row of result.rows) {
+    const key = `${row.basket_slug}:${row.canonical_name}`;
+    map.set(key, { sourceUrl: row.source_url, productId: row.product_id, matchId: row.match_id });
+  }
+  return map;
+}
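As a standalone sketch (hypothetical names and URLs, not part of the diff), the compound "basketSlug:canonicalName" key returned by getPinnedUrlsForRetailer behaves like this; the point is that two baskets sharing the same canonical product name resolve to distinct pins:

```typescript
// Sketch only: mirrors the Map shape returned by getPinnedUrlsForRetailer.
type Pin = { sourceUrl: string; productId: string; matchId: string };

const pinKey = (basketSlug: string, canonicalName: string): string =>
  `${basketSlug}:${canonicalName}`;

const pins = new Map<string, Pin>([
  // Same canonical name in two baskets: the compound key keeps them separate.
  [pinKey('essentials-ae', 'Eggs Fresh 12 Pack'), { sourceUrl: 'https://example.com/a', productId: 'p1', matchId: 'm1' }],
  [pinKey('staples-ae', 'Eggs Fresh 12 Pack'), { sourceUrl: 'https://example.com/b', productId: 'p2', matchId: 'm2' }],
]);

console.log(pins.get(pinKey('essentials-ae', 'Eggs Fresh 12 Pack'))?.productId); // p1
console.log(pins.get(pinKey('staples-ae', 'Eggs Fresh 12 Pack'))?.productId); // p2
```

A plain canonicalName key would let the second basket's pin overwrite the first; the slug prefix is what makes the multi-basket case safe.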
@@ -49,7 +49,7 @@ async function getBasketRows(basketSlug: string, marketCode: string): Promise<Ba
       po.observed_at
     FROM baskets b
     JOIN basket_items bi ON bi.basket_id = b.id AND bi.active = true
-    JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved')
+    JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved') AND pm.pin_disabled_at IS NULL
     JOIN retailer_products rp ON rp.id = pm.retailer_product_id AND rp.active = true
     JOIN retailers r ON r.id = rp.retailer_id AND r.market_code = $2 AND r.active = true
     JOIN LATERAL (
@@ -81,6 +81,8 @@ async function getBaselinePrices(basketItemIds: string[], baseDate: string): Pro
      FROM price_observations po
      JOIN product_matches pm ON pm.retailer_product_id = po.retailer_product_id
      WHERE pm.basket_item_id = ANY($1)
+       AND pm.match_status IN ('auto', 'approved')
+       AND pm.pin_disabled_at IS NULL
        AND po.in_stock = true
        AND DATE_TRUNC('day', po.observed_at) = $2::date
      GROUP BY pm.basket_item_id`,
@@ -228,18 +230,23 @@ export async function aggregateBasket(basketSlug: string, marketCode: string) {
     const existing = itemMap.get(r.basketItemId);
     if (existing === undefined || r.price < existing) itemMap.set(r.basketItemId, r.price);
   }
+  const MIN_SPREAD_ITEMS = 4;
   const retailerSlugs = [...byRetailerItem.keys()];
   if (retailerSlugs.length >= 2) {
     // Find basket items covered by every retailer
     const commonItemIds = [...byRetailerItem.get(retailerSlugs[0])!.keys()].filter((itemId) =>
       retailerSlugs.every((slug) => byRetailerItem.get(slug)!.has(itemId)),
     );
-    if (commonItemIds.length > 0) {
+    if (commonItemIds.length >= MIN_SPREAD_ITEMS) {
       const retailerTotals = retailerSlugs.map((slug) =>
         commonItemIds.reduce((sum, id) => sum + byRetailerItem.get(slug)!.get(id)!, 0),
       );
       const spreadPct = ((Math.max(...retailerTotals) - Math.min(...retailerTotals)) / Math.min(...retailerTotals)) * 100;
       await writeComputedIndex(basketId, null, null, 'retailer_spread_pct', Math.round(spreadPct * 10) / 10);
+    } else {
+      // Insufficient overlap — write explicit 0 to prevent stale noisy value persisting
+      await writeComputedIndex(basketId, null, null, 'retailer_spread_pct', 0);
+      logger.info(`${basketSlug}: spread suppressed (${commonItemIds.length}/${MIN_SPREAD_ITEMS} common items)`);
+    }
   }

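Read on its own, the suppression rule in this hunk is: with two or more retailers, total the items common to all of them, and only report a spread when at least four common items exist; otherwise write an explicit 0. A minimal sketch of that rule as a pure function (hypothetical helper name, mirroring the (max - min) / min formula and the one-decimal rounding from the hunk):

```typescript
// Sketch of the spread rule: (max - min) / min * 100 over common-item totals,
// suppressed to 0 when fewer than MIN_SPREAD_ITEMS categories overlap.
const MIN_SPREAD_ITEMS = 4;

function retailerSpreadPct(totals: number[], commonItemCount: number): number {
  if (totals.length < 2 || commonItemCount < MIN_SPREAD_ITEMS) return 0;
  const max = Math.max(...totals);
  const min = Math.min(...totals);
  return Math.round(((max - min) / min) * 100 * 10) / 10; // one decimal, as in the diff
}

console.log(retailerSpreadPct([100, 110], 5)); // 10
console.log(retailerSpreadPct([100, 234.8], 1)); // 0 — single-pair noise suppressed
```

The second call is the failure mode the commit targets: a single overlapping category (like the US cooking_oil pair) would otherwise report a 134.8% "spread" built from one price comparison.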
@@ -15,7 +15,7 @@ import { ExaProvider } from '../acquisition/exa.js';
 import { FirecrawlProvider } from '../acquisition/firecrawl.js';
 import type { AdapterContext } from '../adapters/types.js';
 import { upsertCanonicalProduct } from '../db/queries/products.js';
-import { getBasketItemId, upsertProductMatch } from '../db/queries/matches.js';
+import { getBasketItemId, getPinnedUrlsForRetailer, upsertProductMatch } from '../db/queries/matches.js';

 const logger = {
   info: (msg: string, ...args: unknown[]) => console.log(`[scrape] ${msg}`, ...args),
@@ -29,13 +29,13 @@ async function sleep(ms: number) {

 async function getOrCreateRetailer(slug: string, config: ReturnType<typeof loadRetailerConfig>) {
   const result = await query<{ id: string }>(
-    `INSERT INTO retailers (slug, name, market_code, country_code, currency_code, adapter_key, base_url)
-     VALUES ($1,$2,$3,$3,$4,$5,$6)
+    `INSERT INTO retailers (slug, name, market_code, country_code, currency_code, adapter_key, base_url, active)
+     VALUES ($1,$2,$3,$3,$4,$5,$6,$7)
      ON CONFLICT (slug) DO UPDATE SET
        name = EXCLUDED.name, adapter_key = EXCLUDED.adapter_key,
-       base_url = EXCLUDED.base_url, updated_at = NOW()
+       base_url = EXCLUDED.base_url, active = EXCLUDED.active, updated_at = NOW()
      RETURNING id`,
-    [slug, config.name, config.marketCode, config.currencyCode, config.adapter, config.baseUrl],
+    [slug, config.name, config.marketCode, config.currencyCode, config.adapter, config.baseUrl, config.enabled],
   );
   return result.rows[0].id;
 }
@@ -62,8 +62,25 @@ async function updateScrapeRun(
   );
 }

+async function handlePinError(productId: string, matchId: string, targetId: string) {
+  const { rows } = await query<{ c: string }>(
+    `UPDATE retailer_products SET pin_error_count = pin_error_count + 1
+     WHERE id = $1 RETURNING pin_error_count AS c`,
+    [productId],
+  );
+  const count = parseInt(rows[0]?.c ?? '0', 10);
+  if (count >= 3) {
+    await query(`UPDATE product_matches SET pin_disabled_at = NOW() WHERE id = $1`, [matchId]);
+    logger.info(`  [pin] soft-disabled stale pin for ${targetId} (${count}x errors)`);
+  }
+}
+
 export async function scrapeRetailer(slug: string) {
   const config = loadRetailerConfig(slug);

+  // Always sync active state from YAML to DB, even for disabled retailers.
+  const retailerId = await getOrCreateRetailer(slug, config);
+
   if (!config.enabled) {
     logger.info(`${slug} is disabled, skipping`);
     return;
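handlePinError above and the out-of-stock branch later in the loop share one shape: increment a counter, soft-disable the pin at the third strike, and reset both counters on any successful in-stock fetch. A pure-function sketch of that three-strike policy (hypothetical names; only the threshold of 3 comes from the diff):

```typescript
// Three-strike counter: returns the new count and whether the pin should be
// soft-disabled (pin_disabled_at = NOW()) after this failure.
const PIN_DISABLE_THRESHOLD = 3;

function recordPinFailure(count: number): { count: number; disable: boolean } {
  const next = count + 1;
  return { count: next, disable: next >= PIN_DISABLE_THRESHOLD };
}

// A successful in-stock fetch resets the counter, so transient errors never accumulate.
const recordPinSuccess = (): { count: number; disable: boolean } => ({ count: 0, disable: false });

console.log(recordPinFailure(2)); // { count: 3, disable: true } — third strike disables
console.log(recordPinFailure(0)); // { count: 1, disable: false }
```

Soft-disabling (a timestamp rather than a delete) is what lets the next run fall back to Exa rediscovery while keeping the stale match's history queryable.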
@@ -79,18 +96,19 @@ export async function scrapeRetailer(slug: string) {
     if (!fcKey) throw new Error(`search adapter requires FIRECRAWL_API_KEY (retailer: ${slug})`);
   }

-  const retailerId = await getOrCreateRetailer(slug, config);
   const runId = await createScrapeRun(retailerId);

   logger.info(`Run ${runId} started for ${slug}`);

+  const pinnedUrls = await getPinnedUrlsForRetailer(retailerId);
+  logger.info(`${slug}: ${pinnedUrls.size} pins loaded`);
+
   const adapter =
     config.adapter === 'search'
       ? new SearchAdapter(new ExaProvider(exaKey), new FirecrawlProvider(fcKey))
       : config.adapter === 'exa-search'
         ? new ExaSearchAdapter(exaKey, process.env.FIRECRAWL_API_KEY)
         : new GenericPlaywrightAdapter();
-  const ctx: AdapterContext = { config, runId, logger };
+  const ctx: AdapterContext = { config, runId, logger, retailerId, pinnedUrls };

   const targets = await adapter.discoverTargets(ctx);
   logger.info(`Discovered ${targets.length} targets`);
@@ -103,6 +121,9 @@ export async function scrapeRetailer(slug: string) {

   for (const target of targets) {
     pagesAttempted++;
+    const isDirect = target.metadata?.direct === true;
+    const pinnedProductId = target.metadata?.pinnedProductId as string | undefined;
+    const pinnedMatchId = target.metadata?.matchId as string | undefined;
     try {
       const fetchResult = await adapter.fetchTarget(ctx, target);
       const products = await adapter.parseListing(ctx, fetchResult);
@@ -110,11 +131,19 @@ export async function scrapeRetailer(slug: string) {
       if (products.length === 0) {
         logger.warn(`  [${target.id}] parsed 0 products — counting as error`);
         errorsCount++;
+        if (isDirect && pinnedProductId && pinnedMatchId) {
+          await handlePinError(pinnedProductId, pinnedMatchId, target.id);
+        }
         continue;
       }
       logger.info(`  [${target.id}] parsed ${products.length} products`);

       for (const product of products) {
+        // wasDirectHit=true only when the pin URL itself was successfully used.
+        // fetchTarget sets direct:false in the payload when it falls back to Exa,
+        // so this correctly distinguishes "pin worked" from "pin failed, Exa used instead".
+        const wasDirectHit = isDirect && product.rawPayload.direct === true;
+
         const productId = await upsertRetailerProduct({
           retailerId,
           retailerSku: product.retailerSku,
@@ -144,9 +173,39 @@ export async function scrapeRetailer(slug: string) {
           rawPayloadJson: product.rawPayload,
         });

-        // For search-based adapters: auto-create product → basket match since we
-        // searched for a specific basket item (no ambiguity in what was scraped).
+        // Stale-pin maintenance — only when the pin URL was actually used (not Exa fallback).
+        if (wasDirectHit && pinnedProductId && pinnedMatchId) {
+          if (product.inStock) {
+            await query(
+              `UPDATE retailer_products SET consecutive_out_of_stock = 0, pin_error_count = 0 WHERE id = $1`,
+              [pinnedProductId],
+            );
+          } else {
+            const { rows } = await query<{ c: string }>(
+              `UPDATE retailer_products
+               SET consecutive_out_of_stock = consecutive_out_of_stock + 1
+               WHERE id = $1 RETURNING consecutive_out_of_stock AS c`,
+              [pinnedProductId],
+            );
+            const count = parseInt(rows[0]?.c ?? '0', 10);
+            if (count >= 3) {
+              await query(`UPDATE product_matches SET pin_disabled_at = NOW() WHERE id = $1`, [pinnedMatchId]);
+              logger.info(`  [pin] soft-disabled stale pin for ${target.id} (${count}x out-of-stock)`);
+            }
+          }
+        }
+
+        // When a pinned target fell back to Exa (isDirect but !wasDirectHit),
+        // increment pin_error_count so the old broken pin eventually gets disabled.
+        if (isDirect && !wasDirectHit && pinnedProductId && pinnedMatchId) {
+          await handlePinError(pinnedProductId, pinnedMatchId, target.id);
+        }
+
+        // For search-based adapters: auto-create product → basket match.
+        // Skip only when the pin URL was used directly — the match already exists.
+        // Allow when this is a fresh Exa discovery (including Exa fallback from a broken pin).
         if (
+          !wasDirectHit &&
           (config.adapter === 'exa-search' || config.adapter === 'search') &&
           product.rawPayload.basketSlug &&
           product.rawPayload.canonicalName
@@ -179,6 +238,9 @@ export async function scrapeRetailer(slug: string) {
     } catch (err) {
       errorsCount++;
       logger.error(`  [${target.id}] failed: ${err}`);
+      if (isDirect && pinnedProductId && pinnedMatchId) {
+        await handlePinError(pinnedProductId, pinnedMatchId, target.id);
+      }
     }

     if (pagesAttempted < targets.length) await sleep(delay);
@@ -209,8 +271,10 @@ export async function scrapeAll() {
   // registry via fetchWithFallback). SearchAdapter and ExaSearchAdapter construct their own
   // provider instances directly from env vars and bypass the registry.
   initProviders(process.env as Record<string, string>);
-  const configs = loadAllRetailerConfigs().filter((c) => c.enabled);
-  logger.info(`Scraping ${configs.length} retailers`);
+  // Iterate ALL configs (including disabled) so getOrCreateRetailer syncs active=false to DB.
+  // scrapeRetailer() returns early after the upsert for disabled retailers.
+  const configs = loadAllRetailerConfigs();
+  logger.info(`Syncing ${configs.length} retailers (${configs.filter((c) => c.enabled).length} enabled)`);

   // Run retailers in parallel: each hits a different domain so rate limits don't conflict.
   // Cap at 5 concurrent to avoid saturating Firecrawl's global request limits.
@@ -61,7 +61,8 @@ async function validateBasket(basketSlug: string, marketCode: string): Promise<v
       WHERE retailer_product_id = rp.id AND in_stock = true
       ORDER BY observed_at DESC LIMIT 1
     ) po ON true
-    WHERE pm.match_status IN ('auto', 'approved', 'review')`,
+    WHERE pm.match_status IN ('auto', 'approved', 'review')
+      AND pm.pin_disabled_at IS NULL`,
     [basketSlug, marketCode],
   );

@@ -382,7 +382,7 @@ export async function buildRetailerSpreadSnapshot(
       MAX(po.observed_at) AS last_observed_at
     FROM baskets b
     JOIN basket_items bi ON bi.basket_id = b.id AND bi.active = true
-    JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved')
+    JOIN product_matches pm ON pm.basket_item_id = bi.id AND pm.match_status IN ('auto','approved') AND pm.pin_disabled_at IS NULL
     JOIN retailer_products rp ON rp.id = pm.retailer_product_id AND rp.active = true
     JOIN retailers r ON r.id = rp.retailer_id AND r.market_code = $2 AND r.active = true
     JOIN LATERAL (
@@ -436,8 +436,10 @@ export async function buildRetailerSpreadSnapshot(
     }
   }

+  const MIN_SPREAD_ITEMS = 4;
+  const commonItemCount = retailers.length > 0 ? retailers[0].itemCount : 0;
   const spreadPct =
-    retailers.length >= 2
+    retailers.length >= 2 && commonItemCount >= MIN_SPREAD_ITEMS
       ? Math.round(
           ((retailers[retailers.length - 1].basketTotal - retailers[0].basketTotal) /
             retailers[0].basketTotal) *
consumer-prices-core/tests/unit/pinning.test.ts (new file, 216 lines)
@@ -0,0 +1,216 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest';
+import { isTitlePlausible, isAllowedHost, SearchAdapter } from '../../src/adapters/search.js';
+import type { AdapterContext } from '../../src/adapters/types.js';
+
+// ── isAllowedHost ─────────────────────────────────────────────────────────────
+
+describe('isAllowedHost', () => {
+  it('accepts exact hostname match', () => {
+    expect(isAllowedHost('https://luluhypermarket.com/en/rice', 'luluhypermarket.com')).toBe(true);
+  });
+
+  it('rejects subdomain that was not allowed', () => {
+    expect(isAllowedHost('https://shop.luluhypermarket.com/rice', 'luluhypermarket.com')).toBe(false);
+  });
+
+  it('rejects prefix-hostname attack (evilluluhypermarket.com)', () => {
+    expect(isAllowedHost('https://evilluluhypermarket.com/rice', 'luluhypermarket.com')).toBe(false);
+  });
+
+  it('rejects non-http(s) scheme', () => {
+    expect(isAllowedHost('ftp://luluhypermarket.com/rice', 'luluhypermarket.com')).toBe(false);
+  });
+
+  it('rejects invalid URL', () => {
+    expect(isAllowedHost('not-a-url', 'luluhypermarket.com')).toBe(false);
+  });
+});
+
+// ── discoverTargets: pin branch ───────────────────────────────────────────────
+
+const makeCtx = (pinnedUrls?: Map<string, { sourceUrl: string; productId: string; matchId: string }>): AdapterContext => ({
+  config: {
+    slug: 'lulu_ae',
+    name: 'Lulu',
+    marketCode: 'ae',
+    currencyCode: 'AED',
+    adapter: 'search',
+    baseUrl: 'https://luluhypermarket.com',
+    enabled: true,
+  } as AdapterContext['config'],
+  runId: 'run-1',
+  logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
+  retailerId: 'retailer-1',
+  pinnedUrls,
+});
+
+// Stub loadAllBasketConfigs so we don't need DB / FS
+vi.mock('../../src/config/loader.js', () => ({
+  loadAllBasketConfigs: () => [
+    {
+      slug: 'essentials-ae',
+      marketCode: 'ae',
+      items: [
+        { id: 'item-1', canonicalName: 'Eggs Fresh 12 Pack', category: 'dairy_eggs', weight: 0.1 },
+      ],
+    },
+  ],
+  loadAllRetailerConfigs: () => [],
+  loadRetailerConfig: () => ({}),
+}));
+
+describe('SearchAdapter.discoverTargets', () => {
+  const exa = { search: vi.fn() } as never;
+  const firecrawl = { extract: vi.fn() } as never;
+  const adapter = new SearchAdapter(exa, firecrawl);
+
+  it('returns direct=true when a valid pin exists', async () => {
+    const pins = new Map([
+      ['essentials-ae:Eggs Fresh 12 Pack', { sourceUrl: 'https://luluhypermarket.com/en/eggs-12', productId: 'prod-1', matchId: 'match-1' }],
+    ]);
+    const ctx = makeCtx(pins);
+    const targets = await adapter.discoverTargets(ctx);
+    expect(targets).toHaveLength(1);
+    expect(targets[0].metadata?.direct).toBe(true);
+    expect(targets[0].url).toBe('https://luluhypermarket.com/en/eggs-12');
+    expect(targets[0].metadata?.pinnedProductId).toBe('prod-1');
+  });
+
+  it('returns direct=false when no pin exists', async () => {
+    const ctx = makeCtx(new Map());
+    const targets = await adapter.discoverTargets(ctx);
+    expect(targets).toHaveLength(1);
+    expect(targets[0].metadata?.direct).toBe(false);
+    expect(targets[0].url).toBe('https://luluhypermarket.com');
+  });
+
+  it('returns direct=false and warns when pin host does not match', async () => {
+    const pins = new Map([
+      ['essentials-ae:Eggs Fresh 12 Pack', { sourceUrl: 'https://evil.com/eggs', productId: 'prod-1', matchId: 'match-1' }],
+    ]);
+    const ctx = makeCtx(pins);
+    const targets = await adapter.discoverTargets(ctx);
+    expect(targets[0].metadata?.direct).toBe(false);
+    expect(ctx.logger.warn).toHaveBeenCalledWith(expect.stringContaining('host mismatch'));
+  });
+});
+
+// ── fetchTarget: direct path skips Exa ───────────────────────────────────────
+
+describe('SearchAdapter.fetchTarget direct path', () => {
+  it('skips Exa and calls Firecrawl directly for direct=true targets', async () => {
+    const exa = { search: vi.fn() } as never;
+    const extracted = { productName: 'Eggs 12 Pack', price: 12.5, currency: 'AED', inStock: true };
+    const firecrawl = { extract: vi.fn().mockResolvedValue({ data: extracted }) } as never;
+    const adapter = new SearchAdapter(exa, firecrawl);
+
+    const ctx = makeCtx();
+    const target = {
+      id: 'item-1',
+      url: 'https://luluhypermarket.com/en/eggs-12',
+      category: 'dairy_eggs',
+      metadata: { canonicalName: 'Eggs Fresh 12 Pack', domain: 'luluhypermarket.com', basketSlug: 'essentials-ae', currency: 'AED', direct: true, pinnedProductId: 'prod-1', matchId: 'match-1' },
+    };
+
+    await adapter.fetchTarget(ctx, target);
+
+    expect(exa.search).not.toHaveBeenCalled();
+    expect(firecrawl.extract).toHaveBeenCalledOnce();
+  });
+
+  it('falls back to Exa when direct extraction fails', async () => {
+    const exa = { search: vi.fn().mockResolvedValue([{ url: 'https://luluhypermarket.com/en/eggs-alt' }]) } as never;
+    const firecrawl = {
+      extract: vi.fn()
+        .mockRejectedValueOnce(new Error('timeout'))
+        .mockResolvedValue({ data: { productName: 'Eggs', price: 12.5, currency: 'AED', inStock: true } }),
+    } as never;
+    const adapter = new SearchAdapter(exa, firecrawl);
+
+    const ctx = makeCtx();
+    const target = {
+      id: 'item-1',
+      url: 'https://luluhypermarket.com/en/eggs-pinned',
+      category: 'dairy_eggs',
+      metadata: { canonicalName: 'Eggs Fresh 12 Pack', domain: 'luluhypermarket.com', basketSlug: 'essentials-ae', currency: 'AED', direct: true, pinnedProductId: 'prod-1', matchId: 'match-1' },
+    };
+
+    await adapter.fetchTarget(ctx, target);
+
+    expect(exa.search).toHaveBeenCalledOnce();
+    expect(firecrawl.extract).toHaveBeenCalledTimes(2);
+  });
+});
+
+// ── inStockFromPrice ──────────────────────────────────────────────────────────
+
+describe('inStockFromPrice flag', () => {
+  const makeCtxWithFlag = (flag: boolean): AdapterContext => ({
+    ...makeCtx(),
+    config: {
+      slug: 'bigbasket_in',
+      name: 'BigBasket',
+      marketCode: 'in',
+      currencyCode: 'INR',
+      adapter: 'search',
+      baseUrl: 'https://www.bigbasket.com',
+      enabled: true,
+      searchConfig: { numResults: 5, inStockFromPrice: flag },
+    } as AdapterContext['config'],
+  });
+
+  it('overrides inStock=true when flag=true and price > 0', async () => {
+    const exa = { search: vi.fn() } as never;
+    const firecrawl = {
+      extract: vi.fn().mockResolvedValue({ data: { productName: 'Eggs 6 Pack', price: 55, currency: 'INR', inStock: false } }),
+    } as never;
+    const adapter = new SearchAdapter(exa, firecrawl);
+    const ctx = makeCtxWithFlag(true);
+    const target = {
+      id: 'item-1',
+      url: 'https://www.bigbasket.com/pd/eggs',
+      category: 'dairy_eggs',
+      metadata: { canonicalName: 'Eggs Fresh 6 Pack', domain: 'www.bigbasket.com', basketSlug: 'essentials-in', currency: 'INR', direct: true },
+    };
+    const result = await adapter.fetchTarget(ctx, target);
+    const payload = JSON.parse(result.html);
+    expect(payload.extracted.inStock).toBe(true);
+  });
+
+  it('leaves inStock unchanged when flag=false', async () => {
+    const exa = { search: vi.fn() } as never;
+    const firecrawl = {
+      extract: vi.fn().mockResolvedValue({ data: { productName: 'Eggs 6 Pack', price: 55, currency: 'INR', inStock: false } }),
+    } as never;
+    const adapter = new SearchAdapter(exa, firecrawl);
+    const ctx = makeCtxWithFlag(false);
+    const target = {
+      id: 'item-1',
+      url: 'https://www.bigbasket.com/pd/eggs',
+      category: 'dairy_eggs',
+      metadata: { canonicalName: 'Eggs Fresh 6 Pack', domain: 'www.bigbasket.com', basketSlug: 'essentials-in', currency: 'INR', direct: true },
+    };
+    const result = await adapter.fetchTarget(ctx, target);
+    const payload = JSON.parse(result.html);
+    expect(payload.extracted.inStock).toBe(false);
+  });
+
+  it('still overrides when price is small but positive (0.01)', async () => {
+    const exa = { search: vi.fn() } as never;
+    const firecrawl = {
+      extract: vi.fn().mockResolvedValue({ data: { productName: 'Eggs 6 Pack', price: 0.01, currency: 'INR', inStock: false } }),
+    } as never;
+    const adapter = new SearchAdapter(exa, firecrawl);
+    // _extractFromUrl returns null when price <= 0; test that price=0.01 still triggers override
+    const ctx = makeCtxWithFlag(true);
+    const target = {
+      id: 'item-1',
+      url: 'https://www.bigbasket.com/pd/eggs',
+      category: 'dairy_eggs',
+      metadata: { canonicalName: 'Eggs Fresh 6 Pack', domain: 'www.bigbasket.com', basketSlug: 'essentials-in', currency: 'INR', direct: true },
+    };
+    const result = await adapter.fetchTarget(ctx, target);
+    const payload = JSON.parse(result.html);
+    expect(payload.extracted.inStock).toBe(true);
+  });
+});
@@ -40,7 +40,6 @@ export const MARKETS: Array<{ code: string; label: string }> = [
   { code: 'br', label: '🇧🇷 BR' },
   { code: 'gb', label: '🇬🇧 UK' },
   { code: 'in', label: '🇮🇳 IN' },
-  { code: 'ke', label: '🇰🇪 KE' },
   { code: 'sa', label: '🇸🇦 SA' },
   { code: 'sg', label: '🇸🇬 SG' },
   { code: 'us', label: '🇺🇸 US' },