fix(fuel-prices): resilient seeder — proxy, retry, stale-carry-forward, strict gate (#3082)

* fix(fuel-prices): resilient seeder — proxy, retry, stale-carry-forward, strict gate

Addresses 2026-04-07 run where 4 of 7 sources failed (NZ 403, BR/MX fetch failed)
and the seeder silently published 30 countries with Brazil/Mexico/NZ vanishing
from the UI.

- Startup proxy diagnostic so PROXY_URL misconfigs are immediately visible.
- New fetchWithProxyPreferred (proxy-first, direct fallback) + withFuelRetry
  (3 attempts, backoff) wrapping NZ/BR/MX upstream calls.
- Swap MX from dead datos.gob.mx to CRE publicacionexterna XML (13k stations).
- Stale-carry-forward failed sources from :prev snapshot (stale: true) instead
  of dropping countries; fresh-only ranking; skip WoW for stale entries.
- Gate :prev rotation on all-sources-succeeded so partial runs don't poison
  next week's WoW.
- Strict validateFn: >=25 countries AND US+GB+MY fresh. Prior gate was >=1.
- emptyDataIsFailure: true so a validation failure doesn't refresh seed-meta.
- Wrap imperative body in main() + isMain guard; export parseCREStationPrices
  and validateFuel; 9 new unit tests.

* fix(fuel-prices): remove stale-carry-forward, harden validator (PR review)

Reviewer flagged two P1s on the prior commit:

1. stale-carry-forward inserted stale: true rows into the published payload,
   but the proto schema and panel have no staleness render path. Users would
   see week-old BR/MX/NZ prices as current. Resilience turned into a
   freshness bug.
2. Validator counted stale-carried entries toward the floor. US/GB/MY fresh
   + 22 stale still passed, refreshing seed-meta.fetchedAt and leaving health
   operationally healthy indefinitely. Hid the outage.

Fix: remove stale-carry-forward entirely. Tighten validator to require
countries.length >= 30, US+GB+MY present, and failedSources.length === 0.
Partial-failure runs now rejected → 10-day cache TTL serves last healthy
snapshot → health STALE_SEED after maxStaleMin. Correct, visible signal.

Drops dead code: SOURCE_COUNTRY_CODES, staleCarried/freshCountries, stale
WoW skip. Tests updated for the failedSources gate.
This commit is contained in:
Elie Habib
2026-04-14 09:16:23 +04:00
committed by GitHub
parent 45c98284da
commit 7e7ca70faf
2 changed files with 271 additions and 52 deletions

View File

@@ -7,23 +7,74 @@ loadEnvFile(import.meta.url);
// Proxy auth string consumed by the proxy-aware fetch helpers in this file, which exist
// for government APIs that block datacenter IPs. Falsy when no proxy is available.
const _proxyAuth = resolveProxyForConnect();

// Startup diagnostic — makes a silent proxy misconfiguration visible in the logs right away.
// Only the host part is printed: the text after the last '@', up to the first ':'.
if (!_proxyAuth) {
  console.warn(` [PROXY] NOT configured — PROXY_URL empty; datacenter-blocked sources (NZ/BR/MX) will fail`);
} else {
  const [hostHint] = _proxyAuth.split('@').pop().split(':');
  console.log(` [PROXY] configured via PROXY_URL (host=${hostHint})`);
}
/** Resolve after `ms` milliseconds. */
async function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); }

/**
 * Retry wrapper for upstream calls.
 *
 * Calls `fn` up to `tries` times. After a failed attempt that is not the last one it
 * logs a warning and waits `baseDelayMs * attemptNumber` before trying again, so the
 * defaults (3 attempts, 1500ms base) sleep 1.5s and then 3s. There is no sleep after
 * the final attempt.
 *
 * @param {string} label - Short tag used in the warning log line (e.g. 'MX').
 * @param {() => Promise<*>} fn - Operation to attempt; invoked with no arguments.
 * @param {{ tries?: number, baseDelayMs?: number }} [options]
 *   `tries`: maximum attempts (values below 1 or non-numeric are treated as 1).
 *   `baseDelayMs`: linear backoff unit in milliseconds.
 * @returns {Promise<*>} Whatever the first successful `fn()` call resolves to.
 * @throws The rejection reason of the last failed attempt, unchanged.
 */
async function withFuelRetry(label, fn, { tries = 3, baseDelayMs = 1500 } = {}) {
  // Always make at least one attempt so a bad `tries` value can't end in `throw undefined`.
  const attempts = Math.max(1, Math.floor(Number(tries) || 1));
  let lastErr;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (attempt < attempts) {
        const delay = baseDelayMs * attempt;
        // Rejection reasons are not guaranteed to be Error objects (they can even be
        // null/undefined); the log line must never throw and mask the real failure.
        console.warn(` [${label}] attempt ${attempt}/${attempts} failed (${err?.message || err}) — retry in ${delay}ms`);
        await sleep(delay);
      }
    }
  }
  throw lastErr;
}
/**
 * Fetch `url` directly (no proxy) with the shared browser User-Agent.
 *
 * @param {string} url - Resource to request.
 * @param {{ timeoutMs: number, accept: string }} options - Abort timeout in ms and Accept header value.
 * @returns {Promise<Response>} The response, only when its status is OK.
 * @throws {Error} `HTTP <status>` for a non-OK response; fetch/abort errors propagate as-is.
 */
async function fetchDirect(url, { timeoutMs, accept }) {
  const headers = { 'User-Agent': CHROME_UA, Accept: accept };
  const signal = AbortSignal.timeout(timeoutMs);
  const response = await globalThis.fetch(url, { headers, signal });
  if (response.ok) return response;
  throw new Error(`HTTP ${response.status}`);
}
/**
 * Fetch `url` through the configured proxy and wrap the raw result in a `Response`.
 *
 * @param {string} url - Resource to request.
 * @param {{ timeoutMs: number, accept: string }} options - Forwarded to `httpsProxyFetchRaw`.
 * @returns {Promise<Response>} Response built from the proxied body; Content-Type falls
 *   back to 'text/plain' when the proxy result carries none.
 * @throws {Error} 'proxy not configured' when no proxy auth is available; errors from
 *   `httpsProxyFetchRaw` propagate as-is.
 */
async function fetchViaProxy(url, { timeoutMs, accept }) {
  if (_proxyAuth) {
    const { buffer: body, contentType: mime } = await httpsProxyFetchRaw(url, _proxyAuth, { accept, timeoutMs });
    const headers = { 'Content-Type': mime || 'text/plain' };
    return new Response(body, { headers });
  }
  throw new Error('proxy not configured');
}
/**
 * Direct-first fetch: try a direct request, fall back to the proxy.
 * Use for sources that usually work without a proxy.
 *
 * Both legs go through the shared `fetchDirect` / `fetchViaProxy` helpers so the
 * request headers, timeout handling and Response wrapping live in one place.
 *
 * @param {string} url - Resource to request.
 * @param {{ timeoutMs?: number, accept?: string }} [options]
 *   `timeoutMs` defaults to 20000; `accept` defaults to 'text/csv,text/plain,*&#47;*'.
 * @returns {Promise<Response>} The direct response when it is OK, otherwise the proxied one.
 * @throws The direct error when no proxy is configured; otherwise the proxy error.
 */
async function fetchWithProxyFallback(url, { timeoutMs = 20_000, accept = 'text/csv,text/plain,*/*' } = {}) {
  try {
    return await fetchDirect(url, { timeoutMs, accept });
  } catch (directErr) {
    // Nothing to fall back to — surface the original direct failure.
    if (!_proxyAuth) throw directErr;
    console.warn(` direct failed (${directErr.message}) — retrying via proxy`);
    return await fetchViaProxy(url, { timeoutMs, accept });
  }
}
/**
 * Proxy-first fetch: go through the proxy when one is configured and fall back to a
 * direct request if the proxy attempt fails (or no proxy exists).
 *
 * Intended for sources known to block datacenter IPs (NZ MBIE via Cloudflare, gov.br
 * TLS failures from Railway, MX CRE with intermittent IPv4 routing) — it saves a
 * failed direct call on every run.
 *
 * @param {string} url - Resource to request.
 * @param {{ timeoutMs?: number, accept?: string }} [options]
 *   `timeoutMs` defaults to 20000; `accept` defaults to 'text/csv,text/plain,*&#47;*'.
 * @returns {Promise<Response>} The proxied response, or the direct one on fallback.
 * @throws Whatever the direct request throws when it is the last resort.
 */
async function fetchWithProxyPreferred(url, { timeoutMs = 20_000, accept = 'text/csv,text/plain,*/*' } = {}) {
  const requestOptions = { timeoutMs, accept };
  if (!_proxyAuth) {
    return await fetchDirect(url, requestOptions);
  }
  try {
    return await fetchViaProxy(url, requestOptions);
  } catch (proxyErr) {
    console.warn(` proxy failed (${proxyErr.message}) — falling back to direct`);
    return await fetchDirect(url, requestOptions);
  }
}
// Key the fuel-price snapshot is published under via runSeed; the previous-week
// snapshot used for week-over-week comparison lives at `${CANONICAL_KEY}:prev`.
const CANONICAL_KEY = 'economic:fuel-prices:v1';
// TTL in seconds (passed to runSeed as ttlSeconds); the :prev key is written with twice this TTL.
const CACHE_TTL = 864000; // 10 days — weekly seed with 3-day cron-drift buffer
// NOTE(review): validateFuel enforces its own hard-coded floor of 30 countries and does
// not read this constant — confirm whether MIN_COUNTRIES is still used elsewhere in the file.
const MIN_COUNTRIES = 5;
@@ -160,36 +211,35 @@ async function fetchSpain() {
}
}
// MX: datos.gob.mx/v2 went unresponsive in 2026 — IPv4 connect hangs forever
// even from residential IPs. Switched to CRE's publicacionexterna XML feed,
// which publishes daily station-level prices (regular/premium/diesel in MXN/L).
async function fetchMexico() {
const url = 'https://publicacionexterna.azurewebsites.net/publicaciones/prices';
try {
const url = 'https://api.datos.gob.mx/v2/precio.gasolina.publico?pageSize=1000';
console.log(` [MX] API: ${url}`);
const resp = await fetchWithProxyFallback(url, { accept: 'application/json' });
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
const data = await resp.json();
const results = data?.results;
if (!Array.isArray(results) || results.length === 0) return [];
const dates = results.map(r => r.fecha_aplicacion).filter(Boolean);
if (dates.length === 0) return [];
const maxDate = dates.sort().reverse()[0];
const latest = results.filter(r => r.fecha_aplicacion === maxDate);
const regularPrices = latest.map(r => parseFloat(r.precio_gasolina_regular)).filter(v => !isNaN(v) && v > 0);
const dieselPrices = latest.map(r => parseFloat(r.precio_diesel)).filter(v => !isNaN(v) && v > 0);
const avgRegular = regularPrices.length > 0
? +(regularPrices.reduce((a, b) => a + b, 0) / regularPrices.length).toFixed(4)
: null;
const avgDiesel = dieselPrices.length > 0
? +(dieselPrices.reduce((a, b) => a + b, 0) / dieselPrices.length).toFixed(4)
: null;
console.log(` [MX] Regular=${avgRegular} MXN/L, Diesel=${avgDiesel} MXN/L (${latest.length} entries, date=${maxDate})`);
console.log(` [MX] CRE XML: ${url}`);
const resp = await withFuelRetry('MX', () =>
fetchWithProxyPreferred(url, { accept: 'application/xml,text/xml,*/*', timeoutMs: 30000 }),
);
const xml = await resp.text();
const re = (type) => new RegExp(`<gas_price\\s+type="${type}">([\\d.]+)</gas_price>`, 'g');
const collect = (type) => [...xml.matchAll(re(type))].map(m => parseFloat(m[1]))
.filter(v => Number.isFinite(v) && v > 5 && v < 100); // MXN/L sanity (5 < v < 100)
const regular = collect('regular');
const diesel = collect('diesel');
if (!regular.length && !diesel.length) {
console.warn(` [MX] CRE returned ${xml.length} bytes but no usable <gas_price> rows`);
return [];
}
const avg = (a) => a.length ? +(a.reduce((s, v) => s + v, 0) / a.length).toFixed(4) : null;
const avgRegular = avg(regular);
const avgDiesel = avg(diesel);
const observedAt = new Date().toISOString().slice(0, 10);
console.log(` [MX] Regular=${avgRegular} MXN/L (${regular.length}), Diesel=${avgDiesel} MXN/L (${diesel.length})`);
return [{
code: 'MX', name: 'Mexico', currency: 'MXN', flag: '🇲🇽',
gasoline: avgRegular != null ? { localPrice: avgRegular, grade: 'Regular', source: 'datos.gob.mx', observedAt: maxDate } : null,
diesel: avgDiesel != null ? { localPrice: avgDiesel, grade: 'Diesel', source: 'datos.gob.mx', observedAt: maxDate } : null,
gasoline: avgRegular != null ? { localPrice: avgRegular, grade: 'Regular', source: 'cre.gob.mx', observedAt } : null,
diesel: avgDiesel != null ? { localPrice: avgDiesel, grade: 'Diesel', source: 'cre.gob.mx', observedAt } : null,
}];
} catch (err) {
console.warn(` [MX] fetchMexico error: ${err.message}`);
@@ -418,15 +468,17 @@ async function fetchBrazil() {
try {
console.log(` [BR] gas CSV: ${GAS_URL}`);
console.log(` [BR] dsl CSV: ${DSL_URL}`);
// Use allSettled so a 429 on the diesel CSV doesn't discard gasoline data
// Use allSettled so a 429 on the diesel CSV doesn't discard gasoline data.
// gov.br returns generic undici "fetch failed" from Railway IPs — proxy-preferred + retry
// is the only path that consistently works from datacenter networks.
const [gasResult, dslResult] = await Promise.allSettled([
fetchWithProxyFallback(GAS_URL, { timeoutMs: 30000 })
.then(r => r.ok ? r.text() : Promise.reject(new Error(`Gas HTTP ${r.status}`))),
fetchWithProxyFallback(DSL_URL, { timeoutMs: 30000 })
.then(r => r.ok ? r.text() : Promise.reject(new Error(`Dsl HTTP ${r.status}`))),
withFuelRetry('BR-gas', () => fetchWithProxyPreferred(GAS_URL, { timeoutMs: 30000 }))
.then(r => r.text()),
withFuelRetry('BR-dsl', () => fetchWithProxyPreferred(DSL_URL, { timeoutMs: 30000 }))
.then(r => r.text()),
]);
if (gasResult.status === 'rejected') console.warn(` [BR] gas CSV failed: ${gasResult.reason.message}`);
if (dslResult.status === 'rejected') console.warn(` [BR] dsl CSV failed: ${dslResult.reason.message}`);
if (gasResult.status === 'rejected') console.warn(` [BR] gas CSV failed after retries: ${gasResult.reason?.message || gasResult.reason}`);
if (dslResult.status === 'rejected') console.warn(` [BR] dsl CSV failed after retries: ${dslResult.reason?.message || dslResult.reason}`);
const gas = gasResult.status === 'fulfilled' ? nationalMean(gasResult.value, 'GASOLINA', 'valor de venda') : null;
const dsl = dslResult.status === 'fulfilled' ? nationalMean(dslResult.value, 'DIESEL', 'valor de venda') : null;
@@ -450,8 +502,8 @@ async function fetchNewZealand() {
const url = 'https://www.mbie.govt.nz/assets/Data-Files/Energy/Weekly-fuel-price-monitoring/weekly-table.csv';
try {
console.log(` [NZ] CSV: ${url}`);
const resp = await fetchWithProxyFallback(url);
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
// MBIE's CDN 403s Railway datacenter IPs (Cloudflare IP reputation). Proxy-preferred + retry.
const resp = await withFuelRetry('NZ', () => fetchWithProxyPreferred(url, { timeoutMs: 30000 }));
const text = await resp.text();
const lines = text.split('\n').map(l => l.trim()).filter(Boolean);
if (lines.length < 2) return [];
@@ -544,6 +596,40 @@ async function fetchUK_DESNZ() {
}
}
// Pure helpers exported for unit testing. Must stay above the isMain guard
// so `import` from tests doesn't trigger the imperative seed run below.
/**
 * Extract per-station MXN/L prices from the CRE XML feed.
 *
 * Scans for `<gas_price type="regular">` and `<gas_price type="diesel">` rows (other
 * types such as "premium" are ignored) and keeps only values strictly between 5 and
 * 100 MXN/L, which drops placeholder/test rows.
 *
 * NOTE(review): fetchMexico, as visible in this file, inlines the same regex and range
 * filter instead of calling this helper — confirm the two stay in sync.
 *
 * @param {string} xml - Raw XML text of the feed.
 * @returns {{ regular: number[], diesel: number[] }} Prices in document order; both
 *   lists are empty when `xml` is not a string or contains no usable rows.
 */
export function parseCREStationPrices(xml) {
  // A missing or non-text payload has no rows to scan; report "nothing found" rather
  // than throwing a TypeError from matchAll.
  if (typeof xml !== 'string') return { regular: [], diesel: [] };

  const collect = (type) => {
    const rowPattern = new RegExp(`<gas_price\\s+type="${type}">([\\d.]+)</gas_price>`, 'g');
    return Array.from(xml.matchAll(rowPattern), (match) => Number.parseFloat(match[1]))
      // `[\d.]+` can capture junk such as "." (NaN); the range check is the MXN/L sanity gate.
      .filter((price) => Number.isFinite(price) && price > 5 && price < 100);
  };

  return { regular: collect('regular'), diesel: collect('diesel') };
}
/**
 * Publish gate for the fuel-price snapshot. Exported so tests can lock in the contract.
 *
 * Every entry in `countries` is FRESH from the current run. Stale-carry-forward was
 * removed after review: presenting last week's data as current was a freshness bug,
 * because the proto/UI have no badge for staleness. A degraded run that cannot pass
 * this gate fails to publish; the 10-day cache TTL keeps serving the last healthy
 * snapshot and health flips to STALE_SEED after its maxStaleMin window.
 *
 * Contract:
 *  - at least 30 countries (EU-CSV alone is 27 + at least 3 of US/GB/MY/BR/MX/NZ);
 *  - US, GB and MY all present (each uniquely covers a non-EU region);
 *  - no failed sources — a partial failure must not publish as healthy. A payload
 *    without a `failedSources` array is treated as having none.
 *
 * @param {{ countries?: Array<{ code: string }>, failedSources?: string[] } | null | undefined} d
 * @returns {boolean} true only when all three conditions hold.
 */
export function validateFuel(d) {
  const entries = d?.countries ?? [];
  const seenCodes = new Set(entries.map((entry) => entry.code));
  const countryCount = d?.countries?.length ?? 0;

  if (!(countryCount >= 30)) return false;

  for (const required of ['US', 'GB', 'MY']) {
    if (!seenCodes.has(required)) return false;
  }

  const failed = d?.failedSources;
  if (!Array.isArray(failed)) return true;
  return failed.length === 0;
}
async function main() {
const prevSnapshot = await readSeedSnapshot(`${CANONICAL_KEY}:prev`);
const fxSymbols = {};
@@ -566,6 +652,7 @@ const fetchResults = await Promise.allSettled([
const sourceNames = ['Malaysia', 'Mexico', 'US-EIA', 'EU-CSV', 'Brazil', 'New Zealand', 'UK-DESNZ'];
let successfulSources = 0;
const failedSources = [];
const countryMap = new Map();
@@ -598,19 +685,36 @@ function mergeCountry(entry, fxRates) {
for (let i = 0; i < fetchResults.length; i++) {
const result = fetchResults[i];
const name = sourceNames[i];
if (result.status === 'fulfilled' && result.value.length > 0) {
successfulSources++;
for (const entry of result.value) {
mergeCountry(entry, fxRates);
}
console.log(` [SOURCE] ${sourceNames[i]}: ${result.value.length} countries`);
} else if (result.status === 'rejected') {
console.warn(` [SOURCE] ${sourceNames[i]}: rejected — ${result.reason}`);
console.log(` [SOURCE] ${name}: ${result.value.length} countries`);
} else {
console.warn(` [SOURCE] ${sourceNames[i]}: 0 countries`);
failedSources.push(name);
if (result.status === 'rejected') {
console.warn(` [SOURCE] ${name}: rejected — ${result.reason?.message || result.reason}`);
} else {
console.warn(` [SOURCE] ${name}: 0 countries`);
}
}
}
// Stale-carry-forward was removed after review: it inserted week-old data
// into the published payload with a `stale:true` field that no proto schema
// or panel knew how to render, so BR/MX/NZ carried-forward entries would
// display as ordinary current prices. That's a freshness bug, not resilience.
//
// Instead: on partial failure, the strict validator (≥30 countries + US/GB/MY
// + no failed sources) rejects the publish. The 10-day cache TTL keeps the
// last healthy snapshot serving the panel, and health flips to STALE_SEED
// once maxStaleMin is exceeded — a correct, visible failure signal.
if (failedSources.length > 0) {
console.warn(` [DEGRADED] ${failedSources.length} source(s) failed this run — publish will be rejected by validator, previous snapshot will continue serving until cache TTL`);
}
const countries = Array.from(countryMap.values());
// Coverage warnings — log but always publish what we have
@@ -661,7 +765,7 @@ if (wowAvailable) {
}
}
// Compute cheapest/most-expensive
// All entries are fresh this run (carry-forward removed).
const withGasoline = countries.filter(c => c.gasoline?.usdPrice > 0);
const withDiesel = countries.filter(c => c.diesel?.usdPrice > 0);
@@ -678,7 +782,9 @@ const mostExpensiveDiesel = withDiesel.length
? withDiesel.reduce((a, b) => a.diesel.usdPrice > b.diesel.usdPrice ? a : b).code
: '';
console.log(`\n Summary: ${countries.length} countries, ${successfulSources} sources`);
const allSourcesFresh = failedSources.length === 0;
console.log(`\n Summary: ${countries.length} countries, ${successfulSources}/${sourceNames.length} sources`);
if (!allSourcesFresh) console.warn(` [FRESHNESS] Failed sources this run: ${failedSources.join(', ')} — publish will be rejected, prev snapshot keeps serving`);
console.log(` Cheapest gasoline: ${cheapestGasoline}, Cheapest diesel: ${cheapestDiesel}`);
console.log(` Most expensive gasoline: ${mostExpensiveGasoline}, Most expensive diesel: ${mostExpensiveDiesel}`);
@@ -692,16 +798,35 @@ const data = {
wowAvailable,
prevFetchedAt: wowAvailable ? (prevSnapshot.fetchedAt ?? '') : '',
sourceCount: successfulSources,
totalSources: sourceNames.length,
failedSources,
countryCount: countries.length,
allSourcesFresh,
};
// Only rotate :prev when EVERY source succeeded this run. A partial rotation
// poisons next week's WoW for every country the failed source owned (would
// compare fresh-this-week to stale-carried-last-week = ~0% change forever).
const rotatePrev = allSourcesFresh;
if (!rotatePrev) console.warn(` [:prev] Skipping rotation — WoW integrity preserved for next run`);
await runSeed('economic', 'fuel-prices', CANONICAL_KEY, async () => data, {
ttlSeconds: CACHE_TTL,
validateFn: (d) => d?.countries?.length >= 1,
validateFn: validateFuel,
emptyDataIsFailure: true,
recordCount: (d) => d?.countries?.length || 0,
extraKeys: wowAvailable ? [{
extraKeys: (wowAvailable && rotatePrev) ? [{
key: `${CANONICAL_KEY}:prev`,
transform: () => data,
ttl: CACHE_TTL * 2,
}] : [],
});
}
// isMain guard: run the seeder only when this file is the script being executed, so
// importing its exports (e.g. from tests) does not kick off a seed run.
if (process.argv[1]?.endsWith('seed-fuel-prices.mjs')) {
  main().catch((err) => {
    // Append the underlying cause (when one is attached) to the fatal log line.
    const { cause } = err;
    const causeSuffix = cause ? ` (cause: ${cause.message || cause.code || cause})` : '';
    console.error('FATAL:', (err.message || err) + causeSuffix);
    process.exit(1);
  });
}

View File

@@ -0,0 +1,94 @@
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { parseCREStationPrices, validateFuel } from '../scripts/seed-fuel-prices.mjs';
// Happy path: two stations. The "premium" row must not show up in either list, since
// the parser only collects the "regular" and "diesel" types.
test('parseCREStationPrices extracts regular + diesel per-station prices from CRE XML', () => {
const xml = `<?xml version="1.0" encoding="utf-8"?>
<places>
<place place_id="1">
<gas_price type="regular">22.95</gas_price>
<gas_price type="premium">26.91</gas_price>
</place>
<place place_id="2">
<gas_price type="regular">24.7</gas_price>
<gas_price type="diesel">29.5</gas_price>
</place>
</places>`;
const { regular, diesel } = parseCREStationPrices(xml);
// Values are expected in document order, one entry per matching <gas_price> row.
assert.deepEqual(regular, [22.95, 24.7]);
assert.deepEqual(diesel, [29.5]);
});
// Sanity-range filter: only prices strictly between 5 and 100 MXN/L survive.
test('parseCREStationPrices filters out-of-range prices', () => {
// 0.01 and 1000.0 are clearly bad (placeholder/test rows); 15 and 50 are valid MXN/L.
const xml = `<places>
<place><gas_price type="regular">0.01</gas_price></place>
<place><gas_price type="regular">15</gas_price></place>
<place><gas_price type="regular">1000.0</gas_price></place>
<place><gas_price type="regular">50</gas_price></place>
</places>`;
const { regular } = parseCREStationPrices(xml);
assert.deepEqual(regular, [15, 50]);
});
// A feed with no <gas_price> rows yields two empty arrays rather than null/undefined.
test('parseCREStationPrices handles empty XML', () => {
const { regular, diesel } = parseCREStationPrices('<places></places>');
assert.deepEqual(regular, []);
assert.deepEqual(diesel, []);
});
// Fixture helpers for the validateFuel contract tests.
// The six non-EU codes come first, followed by synthetic EU placeholders (EU0, EU1, ...).
const NON_EU_CODES = ['US', 'GB', 'MY', 'BR', 'MX', 'NZ'];
const toEntries = (codes) => codes.map((code) => ({ code }));
const euPlaceholders = (count) => Array.from({ length: count }, (_, i) => ({ code: `EU${i}` }));

// 6 non-EU + 27 EU placeholders = 33 countries, comfortably above the >=30 floor.
const HEALTHY_COUNTRIES = [...toEntries(NON_EU_CODES), ...euPlaceholders(27)];

test('validateFuel accepts healthy snapshot (all sources fresh, 33 countries, US+GB+MY present)', () => {
  const verdict = validateFuel({ countries: HEALTHY_COUNTRIES, failedSources: [] });
  assert.equal(verdict, true);
});

test('validateFuel rejects when ANY source failed (no silent degraded publishes)', () => {
  const verdict = validateFuel({ countries: HEALTHY_COUNTRIES, failedSources: ['Brazil'] });
  assert.equal(
    verdict,
    false,
    'even a single failed source must block publish; cache TTL serves last healthy snapshot',
  );
});

test('validateFuel rejects when country count < 30', () => {
  // 3 critical countries + 25 EU placeholders = 28, just under the floor.
  const countries = [...toEntries(['US', 'GB', 'MY']), ...euPlaceholders(25)];
  assert.equal(validateFuel({ countries, failedSources: [] }), false, '28 countries should fail >=30');
});

// Dropping any one of the critical countries leaves 32 entries (still >=30), so each
// of these failures is attributable to the missing code alone.
for (const missing of ['US', 'GB', 'MY']) {
  test(`validateFuel rejects when critical source ${missing} is missing`, () => {
    const countries = HEALTHY_COUNTRIES.filter(({ code }) => code !== missing);
    assert.equal(validateFuel({ countries, failedSources: [] }), false, `missing ${missing} fails gate`);
  });
}

test('validateFuel rejects null/undefined/empty', () => {
  for (const payload of [null, undefined, {}, { countries: [] }]) {
    assert.equal(validateFuel(payload), false);
  }
});