mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* fix(unrest): proxy-only fetch + 3-attempt retry for GDELT Production logs showed PR #3362's 45s proxy timeout solved one failure mode (CONNECT-tunnel timeouts) but ~80% of ticks now fail in 3-14 seconds with either "Proxy CONNECT: HTTP/1.1 522 Server Error" (Cloudflare can't reach GDELT origin) or "Client network socket disconnected before secure TLS connection" (Decodo RSTs the handshake). These are fast-fails, not timeouts — no amount of timeout bumping helps. Two changes: 1. Drop the direct fetch entirely. Every direct attempt in 14h of logs errored with UND_ERR_CONNECT_TIMEOUT or ECONNRESET — 0% success since PR #3256 added the proxy fallback. The direct call costs ~8-30s per tick for nothing. 2. Wrap the proxy call in a 3-attempt retry with 1.5-3s jitter. Single-attempt per-tick success rate measured at ~18%; with 3 attempts that lifts to ~75%+ under the same Decodo↔Cloudflare flake rate, comfortably keeping seedAge under the 120m STALE_SEED threshold. Deeper structural fix (out of scope here): wire ACLED credentials on the Railway unrest service so GDELT isn't the single upstream. * test(unrest): cover GDELT proxy retry path + no-proxy hard-fail Address PR #3395 reviewer concerns: (1) "no automated coverage for the new retry path or the no-proxy path" Add scripts/seed-unrest-events.mjs DI seams (_proxyFetcher, _sleep, _jitter, _maxAttempts, _resolveProxyForConnect) and a 6-test suite at tests/seed-unrest-gdelt-fetch.test.mjs covering: 1. Single-attempt success — no retries fire. 2. 2 transient failures + 3rd-attempt success — recovers, returns JSON. 3. All attempts fail — throws LAST error, exact attempt count. 4. Malformed proxy body — SyntaxError short-circuits retry (deterministic parse failures shouldn't burn attempts). 5. Missing CONNECT proxy creds — fetchGdeltEvents throws clear "PROXY_URL env var is not set" pointer for ops, asserts NO proxy fetcher invocation (no wasted network). 6. 
End-to-end with retry — fetchGdeltEvents with one transient 522 recovers and aggregates events normally. Gate runSeed() entry-point with `import.meta.url === file://argv[1]` so tests can `import` the module without triggering a real seed run. (2) "review assumes Railway has Decodo creds; without them, fails immediately" Yes — that's intentional. Direct fetch had 0% success in production for weeks (every Railway tick errored UND_ERR_CONNECT_TIMEOUT or ECONNRESET) since PR #3256 added the proxy fallback. Reintroducing it as "soft" fallback would just add ~30s of latency + log noise per tick. What's improved here: the no-proxy error message now names the missing env var (PROXY_URL) so an operator who hits this in Railway logs has a direct pointer instead of a generic "GDELT requires proxy" string.
336 lines
12 KiB
JavaScript
336 lines
12 KiB
JavaScript
#!/usr/bin/env node
|
|
|
|
import { pathToFileURL } from 'node:url';

import { loadEnvFile, CHROME_UA, runSeed, httpsProxyFetchRaw, resolveProxyForConnect } from './_seed-utils.mjs';
import { getAcledToken } from './shared/acled-oauth.mjs';
|
|
|
|
loadEnvFile(import.meta.url);
|
|
|
|
// Upstream endpoints queried by this seeder.
const GDELT_GKG_URL = 'https://api.gdeltproject.org/api/v1/gkg_geojson';
const ACLED_API_URL = 'https://acleddata.com/api/acled/read';

// Key handed to runSeed for the merged ACLED + GDELT payload.
const CANONICAL_KEY = 'unrest:events:v1';

// TTL in seconds (passed to runSeed as ttlSeconds).
// 4.5h — 6x the 45 min cron interval (was 1.3x).
const CACHE_TTL = 4.5 * 60 * 60;
|
|
|
|
// ---------- ACLED Event Type Mapping (from _shared.ts) ----------
|
|
|
|
/**
 * Map an ACLED event type / sub-event type pair to an unrest event-type enum.
 *
 * The two labels are joined and matched case-insensitively against keyword
 * groups; the first group that matches wins, so the order below matters
 * (e.g. "demonstration" is tested before "protest").
 *
 * @param {string} eventType - ACLED `event_type` value.
 * @param {string} subEventType - ACLED `sub_event_type` value.
 * @returns {string} One of the `UNREST_EVENT_TYPE_*` identifiers.
 */
function mapAcledEventType(eventType, subEventType) {
  const haystack = `${eventType} ${subEventType}`.toLowerCase();

  const rules = [
    [['riot', 'mob violence'], 'UNREST_EVENT_TYPE_RIOT'],
    [['strike'], 'UNREST_EVENT_TYPE_STRIKE'],
    [['demonstration'], 'UNREST_EVENT_TYPE_DEMONSTRATION'],
    [['protest'], 'UNREST_EVENT_TYPE_PROTEST'],
  ];

  for (const [keywords, unrestType] of rules) {
    if (keywords.some((keyword) => haystack.includes(keyword))) {
      return unrestType;
    }
  }

  // Nothing matched: fall back to the generic bucket.
  return 'UNREST_EVENT_TYPE_CIVIL_UNREST';
}
|
|
|
|
// ---------- Severity Classification (from _shared.ts) ----------
|
|
|
|
/**
 * Derive a severity level for an ACLED event.
 *
 * HIGH when there is at least one fatality or the event type mentions "riot";
 * MEDIUM when it mentions "protest"; LOW otherwise. Matching is
 * case-insensitive.
 *
 * @param {number} fatalities - Reported fatality count.
 * @param {string} eventType - ACLED `event_type` value.
 * @returns {string} One of the `SEVERITY_LEVEL_*` identifiers.
 */
function classifySeverity(fatalities, eventType) {
  // Fatalities decide on their own; the label is only inspected afterwards.
  if (fatalities > 0) return 'SEVERITY_LEVEL_HIGH';

  const label = eventType.toLowerCase();
  if (label.includes('riot')) return 'SEVERITY_LEVEL_HIGH';

  return label.includes('protest') ? 'SEVERITY_LEVEL_MEDIUM' : 'SEVERITY_LEVEL_LOW';
}
|
|
|
|
/**
 * Derive a severity level for an aggregated GDELT location.
 *
 * HIGH when the mention count exceeds 100 or the name contains "riot" or
 * "clash" (case-insensitive); LOW when the count is below 25; MEDIUM for
 * everything in between.
 *
 * @param {number} count - Number of mentions aggregated for the location.
 * @param {string} name - Location name as reported by GDELT.
 * @returns {string} One of the `SEVERITY_LEVEL_*` identifiers.
 */
function classifyGdeltSeverity(count, name) {
  const label = name.toLowerCase();
  const soundsViolent = ['riot', 'clash'].some((word) => label.includes(word));

  if (count > 100 || soundsViolent) {
    return 'SEVERITY_LEVEL_HIGH';
  }
  return count < 25 ? 'SEVERITY_LEVEL_LOW' : 'SEVERITY_LEVEL_MEDIUM';
}
|
|
|
|
/**
 * Pick an unrest event type for a GDELT location from keywords in its name.
 *
 * Keywords are checked case-insensitively in the order riot, strike,
 * demonstration; when none is present the result defaults to PROTEST.
 *
 * @param {string} name - Location name as reported by GDELT.
 * @returns {string} One of the `UNREST_EVENT_TYPE_*` identifiers.
 */
function classifyGdeltEventType(name) {
  const label = name.toLowerCase();

  const keywordTable = [
    ['riot', 'UNREST_EVENT_TYPE_RIOT'],
    ['strike', 'UNREST_EVENT_TYPE_STRIKE'],
    ['demonstration', 'UNREST_EVENT_TYPE_DEMONSTRATION'],
  ];

  const match = keywordTable.find(([keyword]) => label.includes(keyword));
  return match ? match[1] : 'UNREST_EVENT_TYPE_PROTEST';
}
|
|
|
|
// ---------- Deduplication (from _shared.ts) ----------
|
|
|
|
/**
 * Collapse events that fall in the same 0.1° lat/lon cell on the same UTC day.
 *
 * Merge rules for a colliding pair (existing = first seen, event = newcomer):
 *   - ACLED newcomer vs non-ACLED existing: the ACLED event replaces the
 *     existing one and inherits its sources.
 *   - ACLED existing: the existing event is kept and absorbs the newcomer's
 *     sources.
 *   - neither is ACLED: sources are merged into the existing event, and once it
 *     has two or more distinct sources its confidence is raised to HIGH.
 *
 * Events whose `occurredAt` is not a valid timestamp are grouped under an
 * "unknown-date" bucket instead of throwing (Date#toISOString raises a
 * RangeError on an invalid date, which previously aborted the whole merge).
 * A missing `sources` array is treated as empty.
 *
 * Note: the surviving event objects are mutated in place (`sources`,
 * `confidence`).
 *
 * @param {Array<object>} events - Events carrying `location`, `occurredAt`,
 *   `sources` and `sourceType`.
 * @returns {Array<object>} One event per cell/day, in first-seen order.
 */
function deduplicateEvents(events) {
  const ACLED = 'UNREST_SOURCE_TYPE_ACLED';
  const mergeSources = (primary, secondary) => [...new Set([...(primary ?? []), ...(secondary ?? [])])];

  const unique = new Map();
  for (const event of events) {
    const lat = event.location?.latitude ?? 0;
    const lon = event.location?.longitude ?? 0;
    const latKey = Math.round(lat * 10) / 10;
    const lonKey = Math.round(lon * 10) / 10;

    // Guard the date before formatting: toISOString() throws on Invalid Date.
    const when = new Date(event.occurredAt);
    const dateKey = Number.isNaN(when.getTime()) ? 'unknown-date' : when.toISOString().split('T')[0];
    const key = `${latKey}:${lonKey}:${dateKey}`;

    const existing = unique.get(key);
    if (!existing) {
      unique.set(key, event);
    } else if (event.sourceType === ACLED && existing.sourceType !== ACLED) {
      // ACLED outranks other feeds: swap it in but keep the corroborating sources.
      event.sources = mergeSources(event.sources, existing.sources);
      unique.set(key, event);
    } else if (existing.sourceType === ACLED) {
      existing.sources = mergeSources(existing.sources, event.sources);
    } else {
      existing.sources = mergeSources(existing.sources, event.sources);
      // Two independent non-ACLED sources agreeing is treated as corroboration.
      if (existing.sources.length >= 2) existing.confidence = 'CONFIDENCE_LEVEL_HIGH';
    }
  }
  return Array.from(unique.values());
}
|
|
|
|
// ---------- Sort (from _shared.ts) ----------
|
|
|
|
/**
 * Sort events by severity (HIGH, MEDIUM, LOW, then unspecified/unknown) and,
 * within one severity, newest first.
 *
 * The array is sorted in place and the same array is returned.
 *
 * Events whose `occurredAt` does not convert to a finite number are ordered
 * last within their severity. Subtracting such timestamps yields NaN, which
 * makes the comparator inconsistent and the resulting order
 * implementation-defined, so the timestamps are compared explicitly.
 *
 * @param {Array<object>} events - Events with `severity` and `occurredAt`.
 * @returns {Array<object>} The same array, sorted.
 */
function sortBySeverityAndRecency(events) {
  const severityOrder = {
    SEVERITY_LEVEL_HIGH: 0,
    SEVERITY_LEVEL_MEDIUM: 1,
    SEVERITY_LEVEL_LOW: 2,
    SEVERITY_LEVEL_UNSPECIFIED: 3,
  };
  const rankOf = (event) => severityOrder[event.severity] ?? 3;
  // Undated events are treated as older than any dated event.
  const timeOf = (event) => {
    const ms = Number(event.occurredAt);
    return Number.isFinite(ms) ? ms : -Infinity;
  };

  return events.sort((a, b) => {
    const sevDiff = rankOf(a) - rankOf(b);
    if (sevDiff !== 0) return sevDiff;

    const timeA = timeOf(a);
    const timeB = timeOf(b);
    if (timeA === timeB) return 0;
    return timeB > timeA ? 1 : -1;
  });
}
|
|
|
|
// ---------- ACLED Fetch ----------
|
|
|
|
/**
 * Fetch the last 30 days of ACLED "Protests" rows and map them to unrest events.
 *
 * Returns an empty list (after logging) when no ACLED token is available.
 * Rows are dropped when their coordinates are missing or out of range, or when
 * `event_date` does not parse to a valid timestamp: an event with a NaN
 * `occurredAt` cannot be bucketed by day or ordered by recency downstream.
 *
 * @returns {Promise<Array<object>>} Normalised events tagged
 *   `UNREST_SOURCE_TYPE_ACLED`.
 * @throws {Error} On a non-2xx response or when the body carries
 *   `message` / `error`.
 */
async function fetchAcledProtests() {
  const token = await getAcledToken({ userAgent: CHROME_UA });
  if (!token) {
    console.log(' ACLED: no credentials configured, skipping');
    return [];
  }

  const now = Date.now();
  const startDate = new Date(now - 30 * 24 * 60 * 60 * 1000).toISOString().split('T')[0];
  const endDate = new Date(now).toISOString().split('T')[0];

  const params = new URLSearchParams({
    event_type: 'Protests',
    event_date: `${startDate}|${endDate}`,
    event_date_where: 'BETWEEN',
    limit: '500',
    _format: 'json',
  });

  const resp = await fetch(`${ACLED_API_URL}?${params}`, {
    headers: {
      Accept: 'application/json',
      Authorization: `Bearer ${token}`,
      'User-Agent': CHROME_UA,
    },
    signal: AbortSignal.timeout(15_000),
  });

  if (!resp.ok) throw new Error(`ACLED API error: ${resp.status}`);
  const data = await resp.json();
  if (data?.message || data?.error) throw new Error(data.message || data.error || 'ACLED API error');

  const rawEvents = Array.isArray(data?.data) ? data.data : [];
  console.log(` ACLED: ${rawEvents.length} raw events`);

  const events = [];
  let undated = 0;
  for (const row of rawEvents) {
    if (!row || typeof row !== 'object') continue;

    // `??` rather than `||` so a numeric 0 coordinate is not mistaken for "missing".
    const latitude = Number.parseFloat(row.latitude ?? '');
    const longitude = Number.parseFloat(row.longitude ?? '');
    const hasValidCoords =
      Number.isFinite(latitude) && Number.isFinite(longitude) &&
      latitude >= -90 && latitude <= 90 && longitude >= -180 && longitude <= 180;
    if (!hasValidCoords) continue;

    const occurredAt = new Date(row.event_date || '').getTime();
    if (!Number.isFinite(occurredAt)) {
      undated++;
      continue;
    }

    const notes = typeof row.notes === 'string' ? row.notes : '';
    const fatalities = Number.parseInt(row.fatalities ?? '', 10) || 0;

    events.push({
      id: `acled-${row.event_id_cnty}`,
      title: notes.slice(0, 200) || `${row.sub_event_type} in ${row.location}`,
      summary: notes.substring(0, 500),
      eventType: mapAcledEventType(row.event_type || '', row.sub_event_type || ''),
      city: row.location || '',
      country: row.country || '',
      region: row.admin1 || '',
      location: { latitude, longitude },
      occurredAt,
      severity: classifySeverity(fatalities, row.event_type || ''),
      fatalities,
      sources: [row.source].filter(Boolean),
      sourceType: 'UNREST_SOURCE_TYPE_ACLED',
      tags: typeof row.tags === 'string' ? row.tags.split(';').map((t) => t.trim()).filter(Boolean) : [],
      actors: [row.actor1, row.actor2].filter(Boolean),
      confidence: 'CONFIDENCE_LEVEL_HIGH',
    });
  }

  if (undated > 0) console.log(` ACLED: skipped ${undated} events with unparseable event_date`);
  return events;
}
|
|
|
|
// ---------- GDELT Fetch ----------
|
|
|
|
/**
 * Render an error as a single log-friendly string, appending the most specific
 * detail available from `err.cause`.
 *
 * The cause detail is the first truthy value among `cause.code`,
 * `cause.errno`, `cause.message`, or the cause itself when it is a string.
 *
 * @param {*} err - Error (or any thrown value) to describe.
 * @returns {string} `'unknown'` for a falsy input; `"<message> (cause: <detail>)"`
 *   when a cause detail exists; otherwise the message, or `String(err)` when
 *   there is no message.
 */
function describeErr(err) {
  if (!err) return 'unknown';

  const { cause } = err;
  let detail = cause?.code || cause?.errno || cause?.message;
  if (!detail) {
    detail = typeof cause === 'string' ? cause : null;
  }

  if (detail) {
    return `${err.message} (cause: ${detail})`;
  }
  return err.message || String(err);
}
|
|
|
|
/**
 * Fetch a GDELT URL through the CONNECT proxy and parse the body as JSON,
 * retrying transient failures with a jittered pause.
 *
 * Direct fetch from Railway has 0% success — every attempt errors with
 * UND_ERR_CONNECT_TIMEOUT or ECONNRESET — so the path is always proxy-only.
 * Decodo→Cloudflare→GDELT occasionally returns 522 or RSTs the TLS handshake
 * (roughly 80% of single attempts fail in production); retry-with-jitter
 * recovers most of it without touching the cron interval.
 *
 * Test seams (all optional, read from `opts`):
 *   _proxyFetcher — replaces httpsProxyFetchRaw (default production wiring).
 *   _sleep        — replaces the inter-attempt delay.
 *   _maxAttempts  — replaces the default 3 (lets tests bound iterations).
 *                   Floored to an integer and clamped to at least 1.
 *   _jitter       — replaces the Math.random()-based delay (deterministic in tests).
 *
 * @param {string} url - Fully built GDELT request URL.
 * @param {*} proxyAuth - Proxy credentials, passed to the fetcher unchanged.
 * @param {object} [opts] - Test seams described above.
 * @returns {Promise<*>} The parsed JSON body.
 * @throws {SyntaxError} Immediately, when a fetched body is not valid JSON.
 * @throws {*} The error from the last attempt once the attempt budget is spent.
 */
export async function fetchGdeltViaProxy(url, proxyAuth, opts = {}) {
  const {
    _proxyFetcher = httpsProxyFetchRaw,
    _sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
    _maxAttempts = 3,
    _jitter = () => 1500 + Math.random() * 1500,
  } = opts ?? {};

  // A zero, negative or NaN budget used to skip the loop entirely and end in
  // `throw undefined`; a fractional one logged "retrying" and slept before
  // giving up. Normalise to a whole number of attempts, minimum one.
  const attempts = Math.max(1, Math.floor(Number(_maxAttempts)) || 1);

  let lastErr;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      const { buffer } = await _proxyFetcher(url, proxyAuth, {
        accept: 'application/json',
        timeoutMs: 45_000,
      });
      return JSON.parse(buffer.toString('utf8'));
    } catch (err) {
      lastErr = err;
      // JSON.parse on a successfully fetched body is deterministic — retrying
      // can't recover. Bail immediately so we don't burn the remaining
      // attempts on a malformed-but-cached upstream response.
      if (err instanceof SyntaxError) throw err;
      if (attempt < attempts) {
        console.warn(` [GDELT] proxy attempt ${attempt}/${attempts} failed (${describeErr(err)}); retrying`);
        await _sleep(_jitter());
      }
    }
  }
  throw lastErr;
}
|
|
|
|
/**
 * Query the GDELT v1 GKG GeoJSON endpoint through the CONNECT proxy and
 * aggregate the returned mentions into one unrest event per location.
 *
 * Mentions are bucketed by coordinates rounded to one decimal place; buckets
 * with fewer than 5 mentions are discarded. Event type and severity are
 * derived from the bucket's place name and mention count, and `occurredAt` is
 * the time of this run (a single timestamp shared by every event and its id).
 *
 * @param {object} [opts] - Test seams. `_resolveProxyForConnect` replaces the
 *   proxy-credential lookup; every other key is forwarded to
 *   fetchGdeltViaProxy.
 * @returns {Promise<Array<object>>} Aggregated events tagged
 *   `UNREST_SOURCE_TYPE_GDELT`.
 * @throws {Error} When no proxy is configured, or when the proxied fetch fails
 *   (the underlying error is attached as `cause`).
 */
export async function fetchGdeltEvents(opts = {}) {
  const { _resolveProxyForConnect = resolveProxyForConnect, ..._proxyOpts } = opts ?? {};
  const params = new URLSearchParams({
    query: 'protest OR riot OR demonstration OR strike',
    maxrows: '2500',
  });
  const url = `${GDELT_GKG_URL}?${params}`;

  const proxyAuth = _resolveProxyForConnect();
  if (!proxyAuth) {
    // Direct fetch hasn't worked from Railway since PR #3256; this seeder
    // hard-requires a CONNECT proxy. Surface the env var ops needs to set.
    throw new Error('GDELT requires CONNECT proxy: PROXY_URL env var is not set on this Railway service');
  }

  let data;
  try {
    data = await fetchGdeltViaProxy(url, proxyAuth, _proxyOpts);
  } catch (proxyErr) {
    // Report the configured attempt budget instead of a hard-coded 3.
    const attemptBudget = _proxyOpts._maxAttempts ?? 3;
    throw new Error(`GDELT proxy failed (${attemptBudget} attempts): ${describeErr(proxyErr)}`, { cause: proxyErr });
  }

  // Anything other than an array here would make the loop below throw.
  const features = Array.isArray(data?.features) ? data.features : [];

  // Aggregate by location (v1 GKG returns individual mentions, not aggregated counts)
  const locationMap = new Map();
  for (const feature of features) {
    const name = feature?.properties?.name;
    // Names are split and lower-cased later, so only non-empty strings qualify.
    if (typeof name !== 'string' || !name) continue;

    const coords = feature.geometry?.coordinates;
    if (!Array.isArray(coords) || coords.length < 2) continue;

    // GeoJSON order: longitude first, then latitude.
    const [lon, lat] = coords;
    if (!Number.isFinite(lat) || !Number.isFinite(lon) || lat < -90 || lat > 90 || lon < -180 || lon > 180) continue;

    const key = `${lat.toFixed(1)}:${lon.toFixed(1)}`;
    const existing = locationMap.get(key);
    if (existing) {
      existing.count++;
    } else {
      // The first mention in a bucket supplies its name and exact coordinates.
      locationMap.set(key, { name, lat, lon, count: 1 });
    }
  }

  const now = Date.now();
  const events = [];
  for (const loc of locationMap.values()) {
    if (loc.count < 5) continue;

    const nameParts = loc.name.split(',');
    const country = nameParts.at(-1).trim() || loc.name;
    events.push({
      id: `gdelt-${loc.lat.toFixed(2)}-${loc.lon.toFixed(2)}-${now}`,
      title: `${loc.name} (${loc.count} reports)`,
      summary: '',
      eventType: classifyGdeltEventType(loc.name),
      city: nameParts[0].trim(),
      country,
      region: '',
      location: { latitude: loc.lat, longitude: loc.lon },
      occurredAt: now,
      severity: classifyGdeltSeverity(loc.count, loc.name),
      fatalities: 0,
      sources: ['GDELT'],
      sourceType: 'UNREST_SOURCE_TYPE_GDELT',
      tags: [],
      actors: [],
      confidence: loc.count > 20 ? 'CONFIDENCE_LEVEL_HIGH' : 'CONFIDENCE_LEVEL_MEDIUM',
    });
  }

  console.log(` GDELT: ${features.length} mentions → ${events.length} aggregated events`);
  return events;
}
|
|
|
|
// ---------- Main Fetch ----------
|
|
|
|
/**
 * Run the ACLED and GDELT fetchers concurrently, then merge, de-duplicate and
 * sort their events into the payload shape handed to runSeed.
 *
 * A failing source is logged and contributes no events; it does not reject
 * the whole fetch.
 *
 * @returns {Promise<{events: Array<object>, clusters: Array, pagination: undefined}>}
 */
async function fetchUnrestEvents() {
  const [acledOutcome, gdeltOutcome] = await Promise.allSettled([fetchAcledProtests(), fetchGdeltEvents()]);

  // Unwrap a settled result: the value on success, an empty list (plus a log line) on failure.
  const eventsOrEmpty = (outcome, label) => {
    if (outcome.status === 'fulfilled') return outcome.value;
    console.log(` ${label} failed: ${describeErr(outcome.reason)}`);
    return [];
  };

  const acledEvents = eventsOrEmpty(acledOutcome, 'ACLED');
  const gdeltEvents = eventsOrEmpty(gdeltOutcome, 'GDELT');

  const combined = [...acledEvents, ...gdeltEvents];
  const sorted = sortBySeverityAndRecency(deduplicateEvents(combined));

  console.log(` Merged: ${acledEvents.length} ACLED + ${gdeltEvents.length} GDELT = ${sorted.length} deduplicated`);

  return { events: sorted, clusters: [], pagination: undefined };
}
|
|
|
|
/**
 * Payload check passed to runSeed as `validateFn`: accept only a payload whose
 * `events` is a non-empty array.
 *
 * @param {*} data - Candidate payload.
 * @returns {boolean} True when `data.events` is an array with at least one entry.
 */
function validate(data) {
  const events = data?.events;
  if (!Array.isArray(events)) return false;
  return events.length > 0;
}
|
|
|
|
/**
 * Report how many records a payload contains.
 *
 * @param {*} data - Payload produced by the fetcher.
 * @returns {number} Length of `data.events` when it is an array, otherwise 0.
 */
export function declareRecords(data) {
  const events = data?.events;
  if (!Array.isArray(events)) return 0;
  return events.length;
}
|
|
|
|
// Gate the runSeed entry-point so this module is importable from tests
// without triggering a real seed run. process.argv[1] is set when this file
// is invoked as a script (`node scripts/seed-unrest-events.mjs`); under
// `node --test`, argv[1] is the test runner, not this file.
//
// The script path is converted with pathToFileURL rather than prefixed with
// "file://": import.meta.url is a percent-encoded file URL, so a raw string
// prefix never matches for paths containing spaces or other encoded
// characters, nor for Windows drive paths — and the seed would silently not run.
const entryPath = process.argv[1];
const isMain = Boolean(entryPath) && import.meta.url === pathToFileURL(entryPath).href;

if (isMain) {
  runSeed('unrest', 'events', CANONICAL_KEY, fetchUnrestEvents, {
    validateFn: validate,
    ttlSeconds: CACHE_TTL,
    sourceVersion: 'acled+gdelt',

    declareRecords,
    schemaVersion: 1,
    maxStaleMin: 120,
  }).catch((err) => {
    // Tolerate a null/undefined rejection so the handler itself cannot throw
    // before the process exits non-zero.
    const cause = err?.cause;
    const causeSuffix = cause ? ` (cause: ${cause.message || cause.code || cause})` : '';
    console.error('FATAL:', `${err?.message || err}${causeSuffix}`);
    process.exit(1);
  });
}
|