mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(intelligence): GDELT tone/vol timeline per topic (#2044) * fix(gdelt-timeline): add isMain guard to seed script, fix gateway cache tier - Wrap runSeed() call in isMain guard (process.argv[1].endsWith check) to prevent CI failures when the seed module is imported rather than executed directly — pre-push hook does not catch this - Change gateway cache tier from 'medium' (20min CDN) to 'daily' (1h browser/s-maxage=86400 CDN) to align with the 1h TIMELINE_TTL on the per-topic tone/vol Redis keys * fix(gdelt-timeline): TTL 1h→12h, medium cache tier, real fetchedAt, exit 0 - seed-gdelt-intel.mjs: TIMELINE_TTL 3600→43200 (12h = 2× 6h cron) so tone/vol keys survive between cron runs instead of expiring after 1h - seed-gdelt-intel.mjs: afterPublish wraps tone/vol as {data, fetchedAt} so the real write timestamp is stored alongside the arrays - get-gdelt-topic-timeline.ts: unwrap new envelope shape; fetchedAt now reflects actual data write time instead of request time - gateway.ts: daily→medium cache tier (CDN s-maxage=1200 matches 6h cadence) - seed-gdelt-intel.mjs: process.exit(1)→0 to match seeder suite convention * fix(gdelt-timeline): add GdeltTimelinePoint type cast in unwrap helper
206 lines
8.2 KiB
JavaScript
206 lines
8.2 KiB
JavaScript
#!/usr/bin/env node
|
||
|
||
import { loadEnvFile, CHROME_UA, runSeed, sleep, verifySeedKey, writeExtraKey } from './_seed-utils.mjs';
|
||
|
||
loadEnvFile(import.meta.url);
|
||
|
||
const CANONICAL_KEY = 'intelligence:gdelt-intel:v1';
|
||
const CACHE_TTL = 86400; // 24h — intentionally much longer than the 2h cron so verifySeedKey always has a prior snapshot to merge from when GDELT 429s all topics
|
||
const TIMELINE_TTL = 43200; // 12h = 2× cron interval; tone/vol must survive until next 6h run
|
||
const GDELT_DOC_API = 'https://api.gdeltproject.org/api/v2/doc/doc';
|
||
const INTER_TOPIC_DELAY_MS = 20_000; // 20s between topics on success
|
||
const POST_EXHAUST_DELAY_MS = 120_000; // 2min extra cooldown after a topic exhausts all retries
|
||
|
||
const INTEL_TOPICS = [
|
||
{ id: 'military', query: '(military exercise OR troop deployment OR airstrike OR "naval exercise") sourcelang:eng' },
|
||
{ id: 'cyber', query: '(cyberattack OR ransomware OR hacking OR "data breach" OR APT) sourcelang:eng' },
|
||
{ id: 'nuclear', query: '(nuclear OR uranium enrichment OR IAEA OR "nuclear weapon" OR plutonium) sourcelang:eng' },
|
||
{ id: 'sanctions', query: '(sanctions OR embargo OR "trade war" OR tariff OR "economic pressure") sourcelang:eng' },
|
||
{ id: 'intelligence', query: '(espionage OR spy OR "intelligence agency" OR covert OR surveillance) sourcelang:eng' },
|
||
{ id: 'maritime', query: '(naval blockade OR piracy OR "strait of hormuz" OR "south china sea" OR warship) sourcelang:eng' },
|
||
];
|
||
|
||
function isValidUrl(str) {
|
||
try {
|
||
const u = new URL(str);
|
||
return u.protocol === 'http:' || u.protocol === 'https:';
|
||
} catch { return false; }
|
||
}
|
||
|
||
function normalizeArticle(raw) {
|
||
const url = raw.url || '';
|
||
if (!isValidUrl(url)) return null;
|
||
return {
|
||
title: String(raw.title || '').slice(0, 500),
|
||
url,
|
||
source: String(raw.domain || raw.source?.domain || '').slice(0, 200),
|
||
date: String(raw.seendate || ''),
|
||
image: isValidUrl(raw.socialimage || '') ? raw.socialimage : '',
|
||
language: String(raw.language || ''),
|
||
tone: typeof raw.tone === 'number' ? raw.tone : 0,
|
||
};
|
||
}
|
||
|
||
async function fetchTopicArticles(topic) {
|
||
const url = new URL(GDELT_DOC_API);
|
||
url.searchParams.set('query', topic.query);
|
||
url.searchParams.set('mode', 'artlist');
|
||
url.searchParams.set('maxrecords', '10');
|
||
url.searchParams.set('format', 'json');
|
||
url.searchParams.set('sort', 'date');
|
||
url.searchParams.set('timespan', '24h');
|
||
|
||
const resp = await fetch(url.toString(), {
|
||
headers: { 'User-Agent': CHROME_UA },
|
||
signal: AbortSignal.timeout(15_000),
|
||
});
|
||
|
||
if (!resp.ok) throw new Error(`GDELT ${topic.id}: HTTP ${resp.status}`);
|
||
|
||
const data = await resp.json();
|
||
const articles = (data.articles || [])
|
||
.map(normalizeArticle)
|
||
.filter(Boolean);
|
||
|
||
return {
|
||
id: topic.id,
|
||
articles,
|
||
fetchedAt: new Date().toISOString(),
|
||
};
|
||
}
|
||
|
||
function normalizeTimeline(data, mode) {
|
||
const raw = data?.timeline ?? data?.data ?? [];
|
||
return raw.map((pt) => ({
|
||
date: String(pt.date || pt.datetime || ''),
|
||
value: typeof pt.value === 'number' ? pt.value : (typeof pt[mode] === 'number' ? pt[mode] : 0),
|
||
})).filter((pt) => pt.date);
|
||
}
|
||
|
||
async function fetchTopicTimeline(topic, mode) {
|
||
const url = new URL(GDELT_DOC_API);
|
||
url.searchParams.set('query', topic.query);
|
||
url.searchParams.set('mode', mode);
|
||
url.searchParams.set('format', 'json');
|
||
url.searchParams.set('timespan', '14d');
|
||
|
||
try {
|
||
const resp = await fetch(url.toString(), {
|
||
headers: { 'User-Agent': CHROME_UA },
|
||
signal: AbortSignal.timeout(15_000),
|
||
});
|
||
if (!resp.ok) return [];
|
||
const data = await resp.json();
|
||
return normalizeTimeline(data, mode === 'TimelineTone' ? 'tone' : 'value');
|
||
} catch {
|
||
return [];
|
||
}
|
||
}
|
||
|
||
async function fetchWithRetry(topic, maxRetries = 3) {
|
||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||
try {
|
||
return await fetchTopicArticles(topic);
|
||
} catch (err) {
|
||
const is429 = err.message?.includes('429');
|
||
if (!is429 || attempt === maxRetries) {
|
||
console.warn(` ${topic.id}: giving up after ${attempt + 1} attempts (${err.message})`);
|
||
// exhausted:true only when 429 was the reason — post-exhaust cooldown is only relevant for rate-limit windows
|
||
return { id: topic.id, articles: [], fetchedAt: new Date().toISOString(), exhausted: is429 };
|
||
}
|
||
// Exponential backoff: 60s, 120s, 240s — GDELT rate limit windows exceed 50s
|
||
const backoff = 60_000 * Math.pow(2, attempt);
|
||
console.log(` 429 rate-limited, waiting ${backoff / 1000}s... (attempt ${attempt + 1}/${maxRetries + 1})`);
|
||
await sleep(backoff);
|
||
}
|
||
}
|
||
}
|
||
|
||
async function fetchAllTopics() {
|
||
const topics = [];
|
||
for (let i = 0; i < INTEL_TOPICS.length; i++) {
|
||
if (i > 0) await sleep(INTER_TOPIC_DELAY_MS);
|
||
console.log(` Fetching ${INTEL_TOPICS[i].id}...`);
|
||
const result = await fetchWithRetry(INTEL_TOPICS[i]);
|
||
console.log(` ${result.articles.length} articles`);
|
||
// Fetch tone/vol timelines in parallel — best-effort, 429s silently return []
|
||
const [tone, vol] = await Promise.all([
|
||
fetchTopicTimeline(INTEL_TOPICS[i], 'TimelineTone'),
|
||
fetchTopicTimeline(INTEL_TOPICS[i], 'TimelineVol'),
|
||
]);
|
||
result._tone = tone;
|
||
result._vol = vol;
|
||
console.log(` timeline: ${tone.length} tone pts, ${vol.length} vol pts`);
|
||
topics.push(result);
|
||
// After a topic exhausts all retries, give GDELT a longer cooldown before hitting
|
||
// it again with the next topic — the rate limit window for popular queries exceeds 50s
|
||
if (result.exhausted && i < INTEL_TOPICS.length - 1) {
|
||
console.log(` Rate-limit cooldown: waiting ${POST_EXHAUST_DELAY_MS / 1000}s before next topic...`);
|
||
await sleep(POST_EXHAUST_DELAY_MS);
|
||
}
|
||
}
|
||
|
||
// For topics that returned 0 articles (rate-limited), preserve the previous
|
||
// snapshot's articles rather than publishing empty results over good cached data.
|
||
const emptyTopics = topics.filter((t) => t.articles.length === 0);
|
||
if (emptyTopics.length > 0) {
|
||
const previous = await verifySeedKey(CANONICAL_KEY).catch(() => null);
|
||
if (previous && Array.isArray(previous.topics)) {
|
||
const prevMap = new Map(previous.topics.map((t) => [t.id, t]));
|
||
for (const topic of topics) {
|
||
if (topic.articles.length === 0 && prevMap.has(topic.id)) {
|
||
const prev = prevMap.get(topic.id);
|
||
if (prev.articles?.length > 0) {
|
||
console.log(` ${topic.id}: rate-limited — using ${prev.articles.length} cached articles from previous snapshot`);
|
||
topic.articles = prev.articles;
|
||
topic.fetchedAt = prev.fetchedAt;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return { topics, fetchedAt: new Date().toISOString() };
|
||
}
|
||
|
||
function validate(data) {
|
||
if (!Array.isArray(data?.topics) || data.topics.length === 0) return false;
|
||
const populated = data.topics.filter((t) => Array.isArray(t.articles) && t.articles.length > 0);
|
||
return populated.length >= 3; // at least 3 of 6 topics must have articles; partial 429s handled by per-topic merge above
|
||
}
|
||
|
||
// Strip private fields (_tone, _vol, exhausted) before writing to the canonical Redis key.
|
||
function publishTransform(data) {
|
||
return {
|
||
...data,
|
||
topics: (data.topics ?? []).map(({ _tone: _t, _vol: _v, exhausted: _e, ...rest }) => rest),
|
||
};
|
||
}
|
||
|
||
// Write per-topic tone/vol timeline keys (1h TTL) — separate from the 24h canonical key.
|
||
async function afterPublish(data, _meta) {
|
||
for (const topic of data.topics ?? []) {
|
||
const fetchedAt = topic.fetchedAt ?? data.fetchedAt;
|
||
if (Array.isArray(topic._tone) && topic._tone.length > 0) {
|
||
await writeExtraKey(`gdelt:intel:tone:${topic.id}`, { data: topic._tone, fetchedAt }, TIMELINE_TTL);
|
||
}
|
||
if (Array.isArray(topic._vol) && topic._vol.length > 0) {
|
||
await writeExtraKey(`gdelt:intel:vol:${topic.id}`, { data: topic._vol, fetchedAt }, TIMELINE_TTL);
|
||
}
|
||
}
|
||
}
|
||
|
||
if (process.argv[1]?.endsWith('seed-gdelt-intel.mjs')) {
|
||
runSeed('intelligence', 'gdelt-intel', CANONICAL_KEY, fetchAllTopics, {
|
||
validateFn: validate,
|
||
ttlSeconds: CACHE_TTL,
|
||
sourceVersion: 'gdelt-doc-v2',
|
||
publishTransform,
|
||
afterPublish,
|
||
}).catch((err) => {
|
||
const _cause = err.cause ? ` (cause: ${err.cause.message || err.cause.code || err.cause})` : '';
|
||
console.error('FATAL:', (err.message || err) + _cause);
|
||
process.exit(0);
|
||
});
|
||
}
|