Files
worldmonitor/scripts/seed-health-air-quality.mjs
Fayez Bast 9d94ad36aa feat(climate+health):add shared air quality seed and mirrored health (#2634)
* feat(climate+health):add shared air quality seed and mirrored health/climate RPCs

* feat(climate+health):add shared air quality seed and mirrored health/climate RPCs

* fix(air-quality): address review findings — TTL, seed-health, FAST_KEYS, shared meta

- Raise CACHE_TTL from 3600 to 10800 (3× the 1h cron cadence; gold standard)
- Add health:air-quality to api/seed-health.js SEED_DOMAINS so monitoring dashboard tracks freshness
- Remove climateAirQuality and healthAirQuality from FAST_KEYS (large station payloads; load in slow batch)
- Point climateAirQuality SEED_META to same meta key as healthAirQuality (same seeder run, one source of truth)

* fix(bootstrap): move air quality keys to SLOW tier — large station payloads avoid critical-path batch

* fix(air-quality): fix malformed OpenAQ URL and remove from bootstrap until panel exists

- Drop deprecated first URL attempt (parameters=pm25, order_by=lastUpdated, sort=desc);
  use correct v3 params (parameters_id=2, sort_order=desc) directly — eliminates
  guaranteed 4xx retry cycle per page on 20-page crawl
- Remove climateAirQuality and healthAirQuality from BOOTSTRAP_CACHE_KEYS, SLOW_KEYS,
  and BOOTSTRAP_TIERS — no panel consumes these yet; adding thousands of station records
  to every startup bootstrap is pure payload bloat
- Remove normalizeAirQualityPayload helpers from bootstrap.js (no longer called)
- Update service wrappers to fetch via RPC directly; re-add bootstrap hydration
  when a panel actually needs it

* fix(air-quality): raise lock TTL to 3600s to cover 20-page crawl worst case

2 OpenAQ calls × 20 pages × (30s timeout × 3 attempts) = 3600s max runtime.
Previous 600s TTL allowed concurrent cron runs on any degraded upstream.

---------

Co-authored-by: Elie Habib <elie.habib@gmail.com>
2026-04-03 10:27:37 +04:00

599 lines
20 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env node
import { pathToFileURL } from 'node:url';
import {
acquireLockSafely,
CHROME_UA,
extendExistingTtl,
getRedisCredentials,
loadEnvFile,
logSeedResult,
releaseLock,
verifySeedKey,
withRetry,
} from './_seed-utils.mjs';
// Load local environment configuration relative to this script before anything reads process.env
// (OPENAQ_API_KEY and WAQI_API_KEY are read later). NOTE(review): presumably this reads a .env-style
// file; the exact lookup rules live in _seed-utils.mjs.
loadEnvFile(import.meta.url);
// Redis keys: the same payload is mirrored under a health key and a climate key.
export const HEALTH_AIR_QUALITY_KEY = 'health:air-quality:v1';
export const CLIMATE_AIR_QUALITY_KEY = 'climate:air-quality:v1';
export const CACHE_TTL = 10800; // 3h — 3× the 1h cron cadence (gold standard: TTL ≥ 3× interval)
// Readings older than this (2h) are treated as stale and dropped.
export const AIR_QUALITY_WINDOW_MS = 2 * 60 * 60 * 1000;
export const OPENAQ_META_KEY = 'seed-meta:health:air-quality';
export const CLIMATE_META_KEY = 'seed-meta:climate:air-quality';
export const OPENAQ_SOURCE_VERSION = 'openaq-v3-pm25-waqi-optional-v2';
const OPENAQ_LOCATIONS_URL = 'https://api.openaq.org/v3/locations';
// OpenAQ parameter id 2 is PM2.5 (the locations crawl filters on the same id via `parameters_id`).
const OPENAQ_PM25_LATEST_URL = 'https://api.openaq.org/v3/parameters/2/latest';
const OPENAQ_PAGE_LIMIT = 1000;
const OPENAQ_MAX_PAGES = 20;
// Lock TTL for one seed run. The locations and latest crawls run concurrently (Promise.all in
// fetchAirQualityPayload), so a single fetch attempt is bounded by ONE crawl:
// 20 pages × 30s fetch timeout × per-page attempts (≈3 if withRetry(…, 2, …) means two retries) ≈ 1800s,
// which fits inside this 3600s TTL.
// NOTE(review): fetchAirQualityPayloadWithRetry repeats the whole fetch up to 3 times, so a fully
// degraded upstream can outlive this lock; confirm that possible overlap is acceptable.
const AIR_QUALITY_LOCK_TTL_MS = 3_600_000;
// US EPA PM2.5 → AQI breakpoint table used by computeUsAqiFromPm25() for piecewise-linear interpolation.
// All seven EPA bands are kept here. classifyRiskLevel() then collapses EPA's
// sensitive/unhealthy/very-unhealthy bands (AQI 101–300) into a single "unhealthy" level, because the
// product only exposes four buckets.
// NOTE(review): these concentrations match EPA's pre-2024 table; the 2024 revision lowered the
// "good" ceiling to 9.0 µg/m³ — confirm which table the product should follow.
// Frozen (rows included) so this shared lookup table cannot be mutated at runtime.
const EPA_PM25_BREAKPOINTS = Object.freeze([
  { cLow: 0.0, cHigh: 12.0, iLow: 0, iHigh: 50 },
  { cLow: 12.1, cHigh: 35.4, iLow: 51, iHigh: 100 },
  { cLow: 35.5, cHigh: 55.4, iLow: 101, iHigh: 150 },
  { cLow: 55.5, cHigh: 150.4, iLow: 151, iHigh: 200 },
  { cLow: 150.5, cHigh: 250.4, iLow: 201, iHigh: 300 },
  { cLow: 250.5, cHigh: 350.4, iLow: 301, iHigh: 400 },
  { cLow: 350.5, cHigh: 500.4, iLow: 401, iHigh: 500 },
].map((band) => Object.freeze(band)));
// Bounding boxes ("lat1,lng1,lat2,lng2") queried one by one against WAQI's /map/bounds endpoint.
// Rows: southern hemisphere, northern mid-latitudes, and the high-latitude band 55°N–90°N.
// The high-latitude row was previously missing, so WAQI stations in e.g. Moscow, Copenhagen,
// Edinburgh, Stockholm, Helsinki, Oslo and Anchorage were never requested. Stations on a shared
// edge may be returned by two tiles; mergeAirQualityStations() keeps only the first per identity.
const WAQI_WORLD_TILES = Object.freeze([
  '-55,-180,0,-60',
  '-55,-60,0,60',
  '-55,60,0,180',
  '0,-180,55,-60',
  '0,-60,55,60',
  '0,60,55,180',
  '55,-180,90,-60',
  '55,-60,90,60',
  '55,60,90,180',
]);
/**
 * Error for problems that retrying cannot fix (e.g. a missing API key).
 * Carries a stable `code` and `retryable: false` so retry loops and the entry point can
 * recognise it without relying on `instanceof` alone.
 */
class SeedConfigurationError extends Error {
  name = 'SeedConfigurationError';
  code = 'SEED_CONFIGURATION_ERROR';
  retryable = false;

  // Only the message is forwarded to Error; any extra arguments are intentionally ignored.
  constructor(message) {
    super(message);
  }
}
/**
 * Coerce a number or numeric string to a finite number.
 *
 * Only real numbers and non-blank strings are accepted. Previously `Number()` was applied to
 * everything, so `null`, `''`, `'  '` and `false` became 0 (and `true` became 1). A missing coordinate
 * or reading then turned into a station at (0, 0) or an AQI-0 "good" value.
 *
 * @param {unknown} value
 * @returns {number|null} the finite number, or null when `value` is absent or not numeric.
 */
function toFiniteNumber(value) {
  let numeric;
  if (typeof value === 'number') {
    numeric = value;
  } else if (typeof value === 'string' && value.trim() !== '') {
    numeric = Number(value);
  } else {
    return null;
  }
  return Number.isFinite(numeric) ? numeric : null;
}
/** Return `value` trimmed when it is a primitive string; any other input yields ''. */
function trimString(value) {
  if (typeof value !== 'string') return '';
  return value.trim();
}
/**
 * Normalize a country code to upper-case ISO-3166 alpha-2 form.
 * @returns {string} the two-letter code, or '' when the input is not exactly two letters.
 */
function normalizeCountryCode(value) {
  const upper = trimString(value).toUpperCase();
  if (!/^[A-Z]{2}$/.test(upper)) return '';
  return upper;
}
/**
 * Convert a timestamp (epoch-ms number, Date, or parseable date string) to epoch milliseconds.
 *
 * Every path now funnels through one finiteness check. Previously an invalid `Date` leaked `NaN`
 * out of this function, which callers testing `!= null` treated as a valid timestamp.
 *
 * @param {unknown} value
 * @returns {number|null} finite epoch milliseconds, or null when the value is missing/unparseable.
 */
function toEpochMs(value) {
  let epochMs = null;
  if (typeof value === 'number') {
    epochMs = value;
  } else if (value instanceof Date) {
    epochMs = value.getTime();
  } else if (typeof value === 'string' && value.trim()) {
    epochMs = Date.parse(value);
  }
  return Number.isFinite(epochMs) ? epochMs : null;
}
/** Round `value` half-up (via Math.round) to `decimals` decimal places. */
function roundTo(value, decimals = 1) {
  const scale = 10 ** decimals;
  const scaled = Math.round(value * scale);
  return scaled / scale;
}
/** Truncate (floor, not round) a PM2.5 concentration to one decimal place before the AQI lookup. */
function truncatePm25(value) {
  const tenths = Math.floor(value * 10);
  return tenths / 10;
}
/**
 * Convert a PM2.5 concentration to a US AQI value (0–500) using EPA_PM25_BREAKPOINTS.
 * The concentration is truncated to one decimal and capped at 500.4 before interpolation.
 * @param {unknown} pm25 numeric or numeric-string concentration
 * @returns {number} integer AQI; 0 for missing or negative input.
 */
export function computeUsAqiFromPm25(pm25) {
  const numeric = toFiniteNumber(pm25);
  if (numeric == null || numeric < 0) return 0;
  const concentration = Math.min(truncatePm25(numeric), 500.4);

  // First band whose upper concentration bound contains the value; the last band is the fallback.
  let band = EPA_PM25_BREAKPOINTS.at(-1);
  for (const candidate of EPA_PM25_BREAKPOINTS) {
    if (concentration <= candidate.cHigh) {
      band = candidate;
      break;
    }
  }

  const { cLow, cHigh, iLow, iHigh } = band;
  const slope = (iHigh - iLow) / (cHigh - cLow);
  const index = Math.round((slope * (concentration - cLow)) + iLow);
  return Math.min(500, Math.max(0, index));
}
/**
 * Map an AQI value to one of the product's four risk buckets.
 * EPA's sensitive/unhealthy/very-unhealthy bands (101–300) collapse into 'unhealthy'.
 * Non-numeric input is treated as 0; values are rounded and clamped to 0–500 first.
 * @returns {'good'|'moderate'|'unhealthy'|'hazardous'}
 */
export function classifyRiskLevel(aqi) {
  const clamped = Math.min(500, Math.max(0, Math.round(toFiniteNumber(aqi) ?? 0)));
  if (clamped > 300) return 'hazardous';
  if (clamped > 100) return 'unhealthy';
  if (clamped > 50) return 'moderate';
  return 'good';
}
/**
 * True when `measuredAt` (epoch ms) lies within the AIR_QUALITY_WINDOW_MS look-back from `nowMs`.
 * Timestamps up to 5 minutes in the future are also accepted, presumably to tolerate clock skew.
 */
function isFreshMeasurement(measuredAt, nowMs = Date.now()) {
  if (!Number.isFinite(measuredAt)) return false;
  const oldestAccepted = nowMs - AIR_QUALITY_WINDOW_MS;
  const newestAccepted = nowMs + 5 * 60 * 1000;
  return measuredAt >= oldestAccepted && measuredAt <= newestAccepted;
}
/**
 * Choose a display name for an OpenAQ location. Preference order: locality, city, name,
 * then the country code. Falls back to 'Unknown' so the result is never empty.
 */
function pickLocationName(location) {
  const candidates = [
    trimString(location?.locality),
    trimString(location?.city),
    trimString(location?.name),
    normalizeCountryCode(location?.country?.code),
  ];
  return candidates.find((candidate) => candidate !== '') ?? 'Unknown';
}
/**
 * Resolve lat/lng from `primary`, falling back to `fallback` per axis.
 * Accepts either `latitude`/`longitude` or `lat`/`lng` keys.
 * @returns {{lat:number,lng:number}|null} coordinates rounded to 4 decimals, or null if either axis is missing.
 */
function pickCoordinates(primary, fallback) {
  const readAxis = (source, longKey, shortKey) => toFiniteNumber(source?.[longKey] ?? source?.[shortKey]);
  const lat = readAxis(primary, 'latitude', 'lat') ?? readAxis(fallback, 'latitude', 'lat');
  const lng = readAxis(primary, 'longitude', 'lng') ?? readAxis(fallback, 'longitude', 'lng');
  if (lat == null || lng == null) return null;
  return { lat: roundTo(lat, 4), lng: roundTo(lng, 4) };
}
/**
 * Index OpenAQ location records by numeric id.
 * Each entry holds a display city, the normalized country code and coordinates (possibly null).
 * Locations without a usable id are skipped.
 * @returns {Map<number, {city:string,countryCode:string,coordinates:{lat:number,lng:number}|null}>}
 */
export function buildOpenAqLocationIndex(locations = []) {
  const byId = new Map();
  for (const location of locations) {
    const id = toFiniteNumber(location?.id);
    if (id == null) continue;
    const summary = {
      city: pickLocationName(location),
      countryCode: normalizeCountryCode(location?.country?.code),
      coordinates: pickCoordinates(location?.coordinates),
    };
    byId.set(id, summary);
  }
  return byId;
}
/**
 * Combine a `latest` result with its indexed location (or the result's inline `location`)
 * into { locationId, city, countryCode, coordinates }. Coordinates on the result itself win.
 * @returns {object|null} null when no city or coordinates can be resolved.
 */
function buildLocationMetadata(result, locationIndex) {
  const rawId = result?.locationsId ?? result?.locationId ?? result?.location?.id;
  const locationId = toFiniteNumber(rawId);
  const indexed = locationId == null ? null : locationIndex.get(locationId);
  const inline = result?.location ?? null;

  const city = indexed?.city || pickLocationName(inline);
  const countryCode = indexed?.countryCode || normalizeCountryCode(inline?.country?.code);
  const coordinates = pickCoordinates(result?.coordinates, indexed?.coordinates ?? inline?.coordinates);

  if (!city || !coordinates) return null;
  return { locationId: locationId ?? null, city, countryCode, coordinates };
}
/**
 * Normalize one OpenAQ `latest` result into { dedupeKey, station }.
 * Returns null for negative or missing values, stale timestamps, or unresolvable locations.
 */
function toOpenAqStationCandidate(result, locationIndex, nowMs) {
  const pm25 = toFiniteNumber(result?.value);
  if (pm25 == null || pm25 < 0) return null;

  const timestamp = result?.datetime?.utc ?? result?.datetime?.local ?? result?.date?.utc ?? result?.date?.local;
  const measuredAt = toEpochMs(timestamp);
  if (!isFreshMeasurement(measuredAt, nowMs)) return null;

  const metadata = buildLocationMetadata(result, locationIndex);
  if (!metadata) return null;

  const { locationId, city, countryCode, coordinates: { lat, lng } } = metadata;
  const roundedPm25 = roundTo(pm25, 1);
  const aqi = computeUsAqiFromPm25(roundedPm25);
  return {
    // Prefer the OpenAQ location id; otherwise fall back to name + coordinates.
    dedupeKey: locationId ?? `${city}:${lat}:${lng}`,
    station: {
      city,
      countryCode,
      lat,
      lng,
      pm25: roundedPm25,
      aqi,
      riskLevel: classifyRiskLevel(aqi),
      pollutant: trimString(result?.parameter?.name) || trimString(result?.parameter) || 'pm25',
      measuredAt,
      source: 'OpenAQ',
    },
  };
}

/** Per-location tie-break: the newer reading wins; equal timestamps keep the higher PM2.5. */
function outranksOpenAqReading(candidate, incumbent) {
  if (candidate.measuredAt !== incumbent.measuredAt) return candidate.measuredAt > incumbent.measuredAt;
  return candidate.pm25 > incumbent.pm25;
}

/**
 * Build de-duplicated OpenAQ stations (one per location) from `latest` measurements.
 * Stations are enriched with the locations index and sorted by AQI desc, then recency desc.
 */
export function buildOpenAqStations(locations = [], latestMeasurements = [], nowMs = Date.now()) {
  const locationIndex = buildOpenAqLocationIndex(locations);
  const bestByLocation = new Map();
  for (const result of latestMeasurements) {
    const candidate = toOpenAqStationCandidate(result, locationIndex, nowMs);
    if (candidate === null) continue;
    const { dedupeKey, station } = candidate;
    const incumbent = bestByLocation.get(dedupeKey);
    if (!incumbent || outranksOpenAqReading(station, incumbent)) {
      bestByLocation.set(dedupeKey, station);
    }
  }
  return Array.from(bestByLocation.values())
    .sort((left, right) => right.aqi - left.aqi || right.measuredAt - left.measuredAt);
}
/**
 * Read a trailing two-letter upper-case token from a WAQI station name as its country code.
 * NOTE(review): a name ending in a US state abbreviation (e.g. "..., CA") would be read as a
 * country code — confirm against WAQI's naming conventions.
 * @returns {string} the code, or '' when the name has no trailing two-letter token.
 */
function extractCountryCodeFromName(name) {
  const trailing = /\b([A-Z]{2})\b$/.exec(trimString(name));
  if (!trailing) return '';
  return normalizeCountryCode(trailing[1]);
}
/**
 * Normalize raw WAQI map entries into station records, skipping incomplete or stale entries.
 * An entry is kept only when pm25, lat, lon, aqi, a station name and a fresh timestamp are all present.
 * NOTE(review): WAQI's /map/bounds entries appear to carry only lat/lon/uid/aqi/station (no `iaqi`,
 * `pm25` or `dominentpol`). If so, every entry is dropped by the pm25 requirement — verify against a
 * live response.
 */
export function buildWaqiStations(entries = [], nowMs = Date.now()) {
  const stations = [];
  for (const entry of entries) {
    const pm25 = toFiniteNumber(entry?.iaqi?.pm25?.v ?? entry?.pm25);
    const lat = toFiniteNumber(entry?.lat);
    const lng = toFiniteNumber(entry?.lon);
    const aqi = toFiniteNumber(entry?.aqi);
    const stationName = trimString(entry?.station?.name);
    const measuredAt = toEpochMs(entry?.station?.time);

    const hasNumbers = [pm25, lat, lng, aqi].every((field) => field != null);
    if (!hasNumbers || stationName === '' || !isFreshMeasurement(measuredAt, nowMs)) continue;

    // WAQI names look like "Place, Region, Country"; the first segment is used as the city.
    const city = stationName.split(',')[0]?.trim() || stationName;
    stations.push({
      city,
      countryCode: extractCountryCodeFromName(stationName),
      lat: roundTo(lat, 4),
      lng: roundTo(lng, 4),
      pm25: roundTo(pm25, 1),
      aqi: Math.min(500, Math.max(0, Math.round(aqi))),
      riskLevel: classifyRiskLevel(aqi),
      pollutant: trimString(entry?.dominentpol) || 'pm25',
      measuredAt,
      source: 'WAQI',
    });
  }
  return stations;
}
/**
 * True when `station` already has the normalized shape: a non-blank city, numeric lat/lng/aqi,
 * and a parseable measuredAt.
 */
function isNormalizedAirQualityStation(station) {
  if (!trimString(station?.city)) return false;
  const numericFields = [station?.lat, station?.lng, station?.aqi];
  if (numericFields.some((field) => toFiniteNumber(field) == null)) return false;
  return toEpochMs(station?.measuredAt) != null;
}
/**
 * Collect supplemental (WAQI) stations from either input form.
 * `waqiStations` holds pre-normalized records. `waqiEntries` is a backward-compatible alias that
 * may hold normalized records or raw WAQI API entries; raw entries go through buildWaqiStations().
 */
function normalizeSupplementalStations({ waqiStations = [], waqiEntries = [], nowMs = Date.now() }) {
  const prebuilt = Array.isArray(waqiStations) ? waqiStations.filter(isNormalizedAirQualityStation) : [];
  const hasLegacyEntries = Array.isArray(waqiEntries) && waqiEntries.length > 0;
  if (!hasLegacyEntries) return prebuilt;

  // If any entry already looks normalized, treat the list as normalized; otherwise parse it as raw WAQI data.
  const entriesAreNormalized = waqiEntries.some(isNormalizedAirQualityStation);
  const legacy = entriesAreNormalized
    ? waqiEntries.filter(isNormalizedAirQualityStation)
    : buildWaqiStations(waqiEntries, nowMs);
  return [...prebuilt, ...legacy];
}
/**
 * Dedupe key for cross-source merging.
 * Format: lower-cased city | lower-cased country | lat | lng, with coordinates rounded to 2 decimals.
 */
function stationIdentity(station) {
  const city = trimString(station.city).toLowerCase();
  const country = normalizeCountryCode(station.countryCode).toLowerCase();
  const lat = roundTo(station.lat, 2);
  const lng = roundTo(station.lng, 2);
  return `${city}|${country}|${lat}|${lng}`;
}
/**
 * Merge two station lists by stationIdentity().
 * Primary stations always win (later primaries overwrite earlier ones with the same identity).
 * Secondary stations only fill identities not yet present. Malformed records are dropped.
 * @returns {object[]} sorted by AQI desc, then recency desc.
 */
export function mergeAirQualityStations(primaryStations = [], secondaryStations = []) {
  const byIdentity = new Map();
  const admit = (station, overwrite) => {
    if (!isNormalizedAirQualityStation(station)) return;
    const key = stationIdentity(station);
    if (overwrite || !byIdentity.has(key)) byIdentity.set(key, station);
  };
  for (const station of primaryStations) admit(station, true);
  for (const station of secondaryStations) admit(station, false);
  return Array.from(byIdentity.values())
    .sort((left, right) => right.aqi - left.aqi || right.measuredAt - left.measuredAt);
}
/** Convert an internal camelCase station record into the snake_case shape written to Redis. */
function toOutputStation({ city, countryCode, lat, lng, pm25, aqi, riskLevel, pollutant, measuredAt, source }) {
  return {
    city,
    country_code: countryCode,
    lat,
    lng,
    pm25,
    aqi,
    risk_level: riskLevel,
    pollutant,
    measured_at: measuredAt,
    source,
  };
}
/**
 * Build request headers for the OpenAQ v3 API.
 * @param {string} [apiKey=process.env.OPENAQ_API_KEY]
 * @throws {SeedConfigurationError} when the key is missing or blank (v3 requires X-API-Key).
 */
export function buildOpenAqHeaders(apiKey = process.env.OPENAQ_API_KEY) {
  const key = trimString(apiKey);
  if (key === '') {
    throw new SeedConfigurationError('Missing OPENAQ_API_KEY — OpenAQ v3 requests now require X-API-Key');
  }
  const headers = { Accept: 'application/json', 'User-Agent': CHROME_UA };
  headers['X-API-Key'] = key;
  return headers;
}
/** True when `error` is a non-retryable configuration failure, recognised by class or by `code`. */
function isConfigurationError(error) {
  if (error instanceof SeedConfigurationError) return true;
  return error?.code === 'SEED_CONFIGURATION_ERROR';
}
/**
 * GET `url` and parse it as JSON, with a 30s timeout.
 * Caller headers override the JSON Accept / browser User-Agent defaults.
 * @throws {Error} `${label}: HTTP <status> <first 200 chars of body>` on a non-2xx response.
 */
async function fetchJson(url, label, headers = {}) {
  const requestHeaders = { Accept: 'application/json', 'User-Agent': CHROME_UA, ...headers };
  const response = await fetch(url, { headers: requestHeaders, signal: AbortSignal.timeout(30_000) });
  if (response.ok) return response.json();
  const body = await response.text().catch(() => '');
  throw new Error(`${label}: HTTP ${response.status} ${body.slice(0, 200)}`.trim());
}
/**
 * Return `baseUrl` with `params` set as query parameters.
 * Null, undefined and empty-string values are skipped; keys already present are replaced.
 */
function buildUrl(baseUrl, params) {
  const url = new URL(baseUrl);
  Object.entries(params)
    .filter(([, value]) => value != null && value !== '')
    .forEach(([key, value]) => url.searchParams.set(key, String(value)));
  return url.toString();
}
/**
 * Fetch one page of PM2.5 (parameters_id=2) OpenAQ locations.
 * Headers are built before retrying, so a missing API key fails immediately.
 * NOTE(review): assumes withRetry(fn, retries, baseDelayMs) semantics from _seed-utils.mjs.
 */
async function fetchOpenAqLocationsPage(page) {
  const headers = buildOpenAqHeaders();
  const query = { limit: OPENAQ_PAGE_LIMIT, page, parameters_id: 2, sort_order: 'desc' };
  const url = buildUrl(OPENAQ_LOCATIONS_URL, query);
  const requestPage = () => fetchJson(url, `OpenAQ locations page ${page}`, headers);
  return await withRetry(requestPage, 2, 1_000);
}
/**
 * Fetch one page of the latest PM2.5 measurements from OpenAQ.
 * Headers are built before retrying, so a missing API key fails immediately.
 */
async function fetchOpenAqLatestPage(page) {
  const headers = buildOpenAqHeaders();
  const query = { limit: OPENAQ_PAGE_LIMIT, page };
  const url = buildUrl(OPENAQ_PM25_LATEST_URL, query);
  const requestPage = () => fetchJson(url, `OpenAQ latest page ${page}`, headers);
  return withRetry(requestPage, 2, 1_000);
}
/**
 * Walk a paginated OpenAQ v3 endpoint, page by page, up to OPENAQ_MAX_PAGES, concatenating `results`.
 *
 * Stops early on a short page or once `meta.found` results have been collected. When the page cap is
 * reached without either stop condition, the crawl may be truncated. That is now logged instead of
 * silently publishing a partial dataset.
 *
 * @param {(page: number) => Promise<object>} fetchPage fetches and parses one page (1-based)
 * @param {string} label used in log and error messages
 * @returns {Promise<object[]>} all collected results
 * @throws {Error} when no results at all were returned (errors from `fetchPage` propagate unchanged).
 */
async function fetchPagedResults(fetchPage, label) {
  const results = [];
  let expectedFound = 0;
  let reachedEnd = false;
  for (let page = 1; page <= OPENAQ_MAX_PAGES; page++) {
    const payload = await fetchPage(page);
    const pageResults = Array.isArray(payload?.results) ? payload.results : [];
    results.push(...pageResults);
    // meta.found may be a non-numeric string (e.g. ">1000"); toFiniteNumber then yields null and it is ignored.
    const found = toFiniteNumber(payload?.meta?.found);
    const effectiveLimit = toFiniteNumber(payload?.meta?.limit) ?? OPENAQ_PAGE_LIMIT;
    if (found != null && found > 0) expectedFound = found;
    const shortPage = pageResults.length < effectiveLimit;
    const collectedAll = expectedFound > 0 && results.length >= expectedFound;
    if (shortPage || collectedAll) {
      reachedEnd = true;
      break;
    }
  }
  if (!reachedEnd) {
    console.warn(` [AIR] ${label}: stopped at the ${OPENAQ_MAX_PAGES}-page cap with ${results.length} results; remaining pages (if any) were not fetched`);
  }
  if (results.length === 0) {
    throw new Error(`${label}: no results returned`);
  }
  return results;
}
/**
 * Optionally fetch WAQI stations tile by tile as a supplement to OpenAQ.
 * Returns [] when WAQI_API_KEY is unset. A failed tile is logged and skipped, so one bad tile
 * never fails the run.
 */
async function fetchWaqiStations(nowMs) {
  const token = trimString(process.env.WAQI_API_KEY);
  if (token === '') {
    console.log(' [AIR] WAQI_API_KEY missing; skipping WAQI supplement');
    return [];
  }
  const rawEntries = [];
  for (const tile of WAQI_WORLD_TILES) {
    // The token travels in the query string; only the tile label is used in error messages.
    const tileUrl = buildUrl('https://api.waqi.info/map/bounds/', { latlng: tile, token });
    try {
      const body = await withRetry(() => fetchJson(tileUrl, `WAQI ${tile}`), 1, 1_000);
      const isUsable = body?.status === 'ok' && Array.isArray(body.data);
      if (isUsable) rawEntries.push(...body.data);
    } catch (error) {
      console.warn(` [AIR] WAQI tile ${tile} failed: ${error?.message ?? error}`);
    }
  }
  return buildWaqiStations(rawEntries, nowMs);
}
/**
 * Pure payload builder: normalize OpenAQ data, merge in supplemental WAQI stations (OpenAQ wins on
 * identity clashes), and convert everything to the output shape.
 * @returns {{stations: object[], fetchedAt: number}}
 */
export function buildAirQualityPayload({
  locations = [],
  latestMeasurements = [],
  waqiStations = [],
  waqiEntries = [],
  nowMs = Date.now(),
} = {}) {
  const merged = mergeAirQualityStations(
    buildOpenAqStations(locations, latestMeasurements, nowMs),
    normalizeSupplementalStations({ waqiStations, waqiEntries, nowMs }),
  );
  return { stations: merged.map(toOutputStation), fetchedAt: nowMs };
}
/**
 * Fetch OpenAQ locations, OpenAQ latest PM2.5 readings and (optionally) WAQI stations in parallel,
 * then build the payload. A WAQI failure degrades to no supplement; OpenAQ failures propagate.
 * NOTE(review): Promise.all rejects on the first failing crawl but does not cancel the other one,
 * which keeps running in the background.
 * @throws {Error} when no fresh station survives normalization.
 */
export async function fetchAirQualityPayload(nowMs = Date.now()) {
  const withoutWaqi = (error) => {
    console.warn(` [AIR] WAQI supplement failed: ${error?.message ?? error}`);
    return [];
  };
  const [locations, latestMeasurements, waqiStations] = await Promise.all([
    fetchPagedResults(fetchOpenAqLocationsPage, 'OpenAQ locations'),
    fetchPagedResults(fetchOpenAqLatestPage, 'OpenAQ latest'),
    fetchWaqiStations(nowMs).catch(withoutWaqi),
  ]);
  const payload = buildAirQualityPayload({ locations, latestMeasurements, waqiStations, nowMs });
  if (payload.stations.length === 0) {
    throw new Error('No fresh PM2.5 stations found in the last 2 hours');
  }
  return payload;
}
/** True when `payload.stations` is a non-empty array. */
export function validateAirQualityPayload(payload) {
  const stations = payload?.stations;
  if (!Array.isArray(stations)) return false;
  return stations.length > 0;
}
/**
 * Build the Redis pipeline commands that mirror one payload into the health and climate keys.
 * Data keys expire after `ttlSeconds`. Both meta keys get the same meta record with a 7-day expiry.
 * @returns {string[][]} four SET … EX commands: two data keys, then two meta keys.
 */
export function buildMirrorWriteCommands(payload, ttlSeconds, fetchedAt = Date.now(), sourceVersion = OPENAQ_SOURCE_VERSION) {
  const serializedPayload = JSON.stringify(payload);
  const recordCount = payload?.stations?.length ?? 0;
  const metaTtlSeconds = String(86400 * 7);
  // The two meta records are identical, so serialize once and reuse the string.
  const serializedMeta = JSON.stringify({ fetchedAt, recordCount, sourceVersion });
  const dataTtlSeconds = String(ttlSeconds);
  return [
    ...[HEALTH_AIR_QUALITY_KEY, CLIMATE_AIR_QUALITY_KEY].map((key) => ['SET', key, serializedPayload, 'EX', dataTtlSeconds]),
    ...[OPENAQ_META_KEY, CLIMATE_META_KEY].map((key) => ['SET', key, serializedMeta, 'EX', metaTtlSeconds]),
  ];
}
/**
 * POST a batch of commands to the Redis REST `/pipeline` endpoint, with a 15s timeout.
 *
 * Upstash-style pipelines answer HTTP 200 for the whole batch and report each command's failure as
 * an `{ error }` entry in the result array. Those entries were previously ignored, so a failed SET
 * was logged as a successful seed. Any such entry now raises.
 *
 * @param {Array<Array<string>>} commands Redis commands, e.g. ['SET', key, value, 'EX', '10800']
 * @returns {Promise<Array<object>>} per-command results as returned by the endpoint
 * @throws {Error} on a non-2xx response or when any command in the batch reports an error.
 */
async function redisPipeline(commands) {
  const { url, token } = getRedisCredentials();
  const response = await fetch(`${url}/pipeline`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
    body: JSON.stringify(commands),
    signal: AbortSignal.timeout(15_000),
  });
  if (!response.ok) {
    const body = await response.text().catch(() => '');
    // A space separates the status code from the body excerpt (previously they were run together).
    throw new Error(`Redis pipeline failed: HTTP ${response.status} ${body.slice(0, 200)}`.trim());
  }
  const results = await response.json();
  const failures = Array.isArray(results) ? results.filter((entry) => entry?.error) : [];
  if (failures.length > 0) {
    const detail = failures.map((entry) => String(entry.error)).join('; ').slice(0, 200);
    throw new Error(`Redis pipeline failed: ${failures.length}/${results.length} command(s) errored: ${detail}`);
  }
  return results;
}
/**
 * Write the payload and its meta records to all mirror keys in one pipeline.
 * @returns {Promise<{fetchedAt:number, payloadBytes:number, recordCount:number}>} stats for logging
 */
async function publishMirroredPayload(payload) {
  const fetchedAt = Date.now();
  await redisPipeline(buildMirrorWriteCommands(payload, CACHE_TTL, fetchedAt, OPENAQ_SOURCE_VERSION));
  const payloadBytes = Buffer.byteLength(JSON.stringify(payload), 'utf8');
  return { fetchedAt, payloadBytes, recordCount: payload?.stations?.length ?? 0 };
}
/**
 * Read back both mirrored data keys via verifySeedKey (run concurrently).
 * @returns {Promise<boolean>} true only when both reads return a truthy value.
 */
async function verifyMirroredKeys() {
  const readBacks = await Promise.all(
    [HEALTH_AIR_QUALITY_KEY, CLIMATE_AIR_QUALITY_KEY].map((key) => verifySeedKey(key)),
  );
  return readBacks.every(Boolean);
}
/**
 * Run fetchAirQualityPayload with up to `maxRetries` retries and exponential backoff
 * (delayMs, 2·delayMs, …). Configuration errors are not retried.
 * @throws the last error once attempts are exhausted.
 */
async function fetchAirQualityPayloadWithRetry(maxRetries = 2, delayMs = 1_000) {
  let lastError;
  let attempt = 0;
  while (attempt <= maxRetries) {
    try {
      return await fetchAirQualityPayload();
    } catch (error) {
      lastError = error;
      const giveUp = isConfigurationError(error) || attempt >= maxRetries;
      if (giveUp) break;
      const backoffMs = delayMs * 2 ** attempt;
      const cause = error?.cause ? ` (cause: ${error.cause.message || error.cause.code || error.cause})` : '';
      console.warn(` Retry ${attempt + 1}/${maxRetries} in ${backoffMs}ms: ${error?.message ?? error}${cause}`);
      await new Promise((resolve) => {
        setTimeout(resolve, backoffMs);
      });
    }
    attempt += 1;
  }
  throw lastError;
}
/**
 * Seed entry point: take the run lock, fetch and validate the payload, mirror it into Redis, verify
 * the writes, then release the lock.
 * On fetch or validation failure the lock is released and extendExistingTtl is applied to the existing
 * keys (presumably keeping the last good data alive). The process then exits 0, except on
 * configuration errors, which exit 1. Publish errors release the lock and are rethrown.
 */
async function main() {
  const domain = 'health';
  const resource = 'air-quality';
  const lockName = `${domain}:${resource}`;
  const startMs = Date.now();
  const runId = `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
  const elapsedMs = () => Date.now() - startMs;
  const preserveStaleCache = () => extendExistingTtl([
    HEALTH_AIR_QUALITY_KEY,
    CLIMATE_AIR_QUALITY_KEY,
    OPENAQ_META_KEY,
    CLIMATE_META_KEY,
  ], CACHE_TTL).catch(() => {});

  console.log(`=== ${lockName} Seed ===`);
  console.log(` Run ID: ${runId}`);
  console.log(` Keys: ${HEALTH_AIR_QUALITY_KEY}, ${CLIMATE_AIR_QUALITY_KEY}`);

  // Each OpenAQ crawl can walk up to 20 pages sequentially with per-request timeouts, so the lock TTL
  // is kept well above the typical runtime to avoid overlapping cron runs.
  const lockResult = await acquireLockSafely(lockName, runId, AIR_QUALITY_LOCK_TTL_MS, { label: lockName });
  if (lockResult.skipped) process.exit(0);
  if (!lockResult.locked) {
    console.log(' SKIPPED: another seed run in progress');
    process.exit(0);
  }

  let payload;
  try {
    payload = await fetchAirQualityPayloadWithRetry();
  } catch (error) {
    await releaseLock(lockName, runId);
    const durationMs = elapsedMs();
    const cause = error?.cause ? ` (cause: ${error.cause.message || error.cause.code || error.cause})` : '';
    console.error(` FETCH FAILED: ${error?.message ?? error}${cause}`);
    await preserveStaleCache();
    const isFatal = isConfigurationError(error);
    console.log(isFatal
      ? `\n=== Fatal configuration error (${Math.round(durationMs)}ms) ===`
      : `\n=== Failed gracefully (${Math.round(durationMs)}ms) ===`);
    process.exit(isFatal ? 1 : 0);
  }

  if (!validateAirQualityPayload(payload)) {
    await releaseLock(lockName, runId);
    await preserveStaleCache();
    console.log(' SKIPPED: validation failed (empty data)');
    process.exit(0);
  }

  try {
    const { recordCount, payloadBytes } = await publishMirroredPayload(payload);
    const durationMs = elapsedMs();
    logSeedResult(domain, recordCount, durationMs, { payloadBytes, mirroredKeys: 2 });

    const verified = await verifyMirroredKeys().catch(() => false);
    if (verified) {
      console.log(' Verified: both Redis keys present');
    } else {
      console.warn(` WARNING: verification read returned null for one or more mirror keys (${HEALTH_AIR_QUALITY_KEY}, ${CLIMATE_AIR_QUALITY_KEY})`);
    }

    console.log(`\n=== Done (${Math.round(durationMs)}ms) ===`);
    await releaseLock(lockName, runId);
    process.exit(0);
  } catch (error) {
    await releaseLock(lockName, runId);
    throw error;
  }
}
// Run main() only when this file is executed directly, not when it is imported (e.g. by tests).
// import.meta.url is a percent-encoded file URL, so a plain suffix match against argv[1] fails for
// paths containing spaces or non-ASCII characters and for Windows separators. In those cases main()
// silently never ran. Comparing against pathToFileURL(argv[1]) handles them; the original suffix
// check is kept as a fallback so nothing that matched before stops matching.
const invokedScriptPath = process.argv[1];
const isMain = Boolean(invokedScriptPath) && (
  import.meta.url === pathToFileURL(invokedScriptPath).href
  || import.meta.url.endsWith(invokedScriptPath.replace(/^file:\/\//, ''))
);
if (isMain) {
  main().catch((error) => {
    const cause = error?.cause ? ` (cause: ${error.cause.message || error.cause.code || error.cause})` : '';
    console.error('FATAL:', `${error?.message ?? error}${cause}`);
    process.exit(1);
  });
}