Files
worldmonitor/scripts/seed-digest-notifications.mjs
Elie Habib b490dfe2bc fix(digest): strict > on secondary Jaccard to close boundary-exact oil-price case
Reviewer's third P1: shorter 'Oil prices rise on Iran nuclear talks optimism' variant lands EXACTLY on Jaccard=0.25 (3 shared in 12-union) and scraped through the secondary merge via inclusive >= comparison.

Fix: strict > instead of >= on SECONDARY_MERGE_MIN_JACCARD at both call sites. Keeps the named constant at 0.25 (any bump would break legitimate close-miss Hormuz merges at J=0.267).

Trade-off: the Hormuz bridge-headline 'tanker reports attack as Iran closes strait of Hormuz; French soldier killed in Lebanon' also lands at J=0.25 against the 4-story Hormuz cluster, so it now stays its own cluster. That's 6→3 reduction vs the original 6→1 goal, but the cluster algorithm structurally cannot distinguish this bridge case from the oil-price reaction via bag-of-words alone — admitting one means admitting the other. Chose to block the false positive. Hormuz test relaxed from ≥5/6 to ≥4/6 with detailed comment explaining both outliers.

Tests: 24/24 -> 25/25. Added explicit reviewer reproducer (short oil-price variant at J=0.25). Full test:data 5773/5773. Typecheck + lint clean.
2026-04-19 09:55:02 +04:00

1544 lines
65 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env node
/**
* Digest notification cron — Railway scheduled job, runs every 30 minutes.
*
* For each enabled alert rule with digestMode != "realtime":
* 1. Checks isDue() against digest:last-sent:v1:${userId}:${variant}
* 2. ZRANGEBYSCORE digest:accumulator:v1:${variant} to get stories in window
* 3. Batch HGETALL story:track:v1:${hash} for metadata
* 4. Derives phase, filters fading/non-matching severity, sorts by currentScore
* 5. SMEMBERS story:sources:v1:${hash} for source attribution
* 6. Formats and dispatches to each configured channel
* 7. Updates digest:last-sent:v1:${userId}:${variant}
*/
import { createRequire } from 'node:module';
import { createHash } from 'node:crypto';
import dns from 'node:dns/promises';
import {
escapeHtml,
escapeTelegramHtml,
escapeSlackMrkdwn,
markdownToEmailHtml,
markdownToTelegramHtml,
markdownToSlackMrkdwn,
markdownToDiscord,
} from './_digest-markdown.mjs';
const require = createRequire(import.meta.url);
const { decrypt } = require('./lib/crypto.cjs');
const { callLLM } = require('./lib/llm-chain.cjs');
const { fetchUserPreferences, extractUserContext, formatUserProfile } = require('./lib/user-context.cjs');
const { Resend } = require('resend');
import { readRawJsonFromUpstash, redisPipeline } from '../api/_upstash-json.js';
import {
composeBriefFromDigestStories,
extractInsights,
groupEligibleRulesByUser,
shouldExitNonZero as shouldExitOnBriefFailures,
} from './lib/brief-compose.mjs';
import { enrichBriefEnvelopeWithLLM } from './lib/brief-llm.mjs';
import { assertBriefEnvelope } from '../server/_shared/brief-render.js';
import { signBriefUrl, BriefUrlError } from './lib/brief-url-sign.mjs';
// ── Config ────────────────────────────────────────────────────────────────────
const UPSTASH_URL = process.env.UPSTASH_REDIS_REST_URL ?? '';
const UPSTASH_TOKEN = process.env.UPSTASH_REDIS_REST_TOKEN ?? '';
const CONVEX_SITE_URL =
process.env.CONVEX_SITE_URL ??
(process.env.CONVEX_URL ?? '').replace('.convex.cloud', '.convex.site');
const RELAY_SECRET = process.env.RELAY_SHARED_SECRET ?? '';
const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN ?? '';
const RESEND_API_KEY = process.env.RESEND_API_KEY ?? '';
const RESEND_FROM = process.env.RESEND_FROM_EMAIL ?? 'WorldMonitor <alerts@worldmonitor.app>';
if (process.env.DIGEST_CRON_ENABLED === '0') {
console.log('[digest] DIGEST_CRON_ENABLED=0 — skipping run');
process.exit(0);
}
if (!UPSTASH_URL || !UPSTASH_TOKEN) {
console.error('[digest] UPSTASH_REDIS_REST_URL / UPSTASH_REDIS_REST_TOKEN not set');
process.exit(1);
}
if (!CONVEX_SITE_URL || !RELAY_SECRET) {
console.error('[digest] CONVEX_SITE_URL / RELAY_SHARED_SECRET not set');
process.exit(1);
}
const resend = RESEND_API_KEY ? new Resend(RESEND_API_KEY) : null;
const DIGEST_MAX_ITEMS = 30;
const DIGEST_LOOKBACK_MS = 24 * 60 * 60 * 1000; // 24h default lookback on first send
const DIGEST_CRITICAL_LIMIT = Infinity;
const DIGEST_HIGH_LIMIT = 15;
const DIGEST_MEDIUM_LIMIT = 10;
const AI_SUMMARY_CACHE_TTL = 3600; // 1h
const AI_DIGEST_ENABLED = process.env.AI_DIGEST_ENABLED !== '0';
const ENTITLEMENT_CACHE_TTL = 900; // 15 min
// ── Brief composer (consolidation of the retired seed-brief-composer) ──────
const BRIEF_URL_SIGNING_SECRET = process.env.BRIEF_URL_SIGNING_SECRET ?? '';
const WORLDMONITOR_PUBLIC_BASE_URL =
process.env.WORLDMONITOR_PUBLIC_BASE_URL ?? 'https://worldmonitor.app';
const BRIEF_TTL_SECONDS = 7 * 24 * 60 * 60; // 7 days
// The brief is a once-per-day editorial snapshot. 24h is the natural
// window regardless of a user's email cadence (daily / twice_daily /
// weekly) — weekly subscribers still expect a fresh brief each day
// in the dashboard panel. Matches DIGEST_LOOKBACK_MS so first-send
// users see identical story pools in brief and email.
const BRIEF_STORY_WINDOW_MS = 24 * 60 * 60 * 1000;
const INSIGHTS_KEY = 'news:insights:v1';
// Operator kill switch — used to intentionally silence brief compose
// without surfacing a Railway red flag. Distinguished from "secret
// missing in a production rollout" which IS worth flagging.
const BRIEF_COMPOSE_DISABLED_BY_OPERATOR = process.env.BRIEF_COMPOSE_ENABLED === '0';
const BRIEF_COMPOSE_ENABLED =
!BRIEF_COMPOSE_DISABLED_BY_OPERATOR && BRIEF_URL_SIGNING_SECRET !== '';
const BRIEF_SIGNING_SECRET_MISSING =
!BRIEF_COMPOSE_DISABLED_BY_OPERATOR && BRIEF_URL_SIGNING_SECRET === '';
// Phase 3b LLM enrichment. Kept separate from AI_DIGEST_ENABLED so
// the email-digest AI summary and the brief editorial prose can be
// toggled independently (e.g. kill the brief LLM without silencing
// the email's AI summary during a provider outage).
const BRIEF_LLM_ENABLED = process.env.BRIEF_LLM_ENABLED !== '0';
// Dependencies injected into brief-llm.mjs. Defined near the top so
// the upstashRest helper below is in scope when this closure runs
// inside composeAndStoreBriefForUser().
const briefLlmDeps = {
callLLM,
async cacheGet(key) {
const raw = await upstashRest('GET', key);
if (typeof raw !== 'string' || raw.length === 0) return null;
try { return JSON.parse(raw); } catch { return null; }
},
async cacheSet(key, value, ttlSec) {
await upstashRest('SETEX', key, String(ttlSec), JSON.stringify(value));
},
};
// ── Redis helpers ──────────────────────────────────────────────────────────────
async function upstashRest(...args) {
const res = await fetch(`${UPSTASH_URL}/${args.map(encodeURIComponent).join('/')}`, {
method: 'POST',
headers: {
Authorization: `Bearer ${UPSTASH_TOKEN}`,
'User-Agent': 'worldmonitor-digest/1.0',
},
signal: AbortSignal.timeout(10000),
});
if (!res.ok) {
console.warn(`[digest] Upstash error ${res.status} for command ${args[0]}`);
return null;
}
const json = await res.json();
return json.result;
}
async function upstashPipeline(commands) {
if (commands.length === 0) return [];
const res = await fetch(`${UPSTASH_URL}/pipeline`, {
method: 'POST',
headers: {
Authorization: `Bearer ${UPSTASH_TOKEN}`,
'Content-Type': 'application/json',
'User-Agent': 'worldmonitor-digest/1.0',
},
body: JSON.stringify(commands),
signal: AbortSignal.timeout(15000),
});
if (!res.ok) {
console.warn(`[digest] pipeline error ${res.status}`);
return [];
}
return res.json();
}
// ── Schedule helpers ──────────────────────────────────────────────────────────
function toLocalHour(nowMs, timezone) {
try {
const fmt = new Intl.DateTimeFormat('en-US', {
timeZone: timezone,
hour: 'numeric',
hour12: false,
});
const parts = fmt.formatToParts(new Date(nowMs));
const hourPart = parts.find((p) => p.type === 'hour');
return hourPart ? parseInt(hourPart.value, 10) : -1;
} catch {
return -1;
}
}
function isDue(rule, lastSentAt) {
const nowMs = Date.now();
const tz = rule.digestTimezone ?? 'UTC';
const primaryHour = rule.digestHour ?? 8;
const localHour = toLocalHour(nowMs, tz);
const hourMatches = rule.digestMode === 'twice_daily'
? localHour === primaryHour || localHour === (primaryHour + 12) % 24
: localHour === primaryHour;
if (!hourMatches) return false;
if (lastSentAt === null) return true;
const minIntervalMs =
rule.digestMode === 'daily' ? 23 * 3600000
: rule.digestMode === 'twice_daily' ? 11 * 3600000
: rule.digestMode === 'weekly' ? 6.5 * 24 * 3600000
: 0;
return (nowMs - lastSentAt) >= minIntervalMs;
}
// ── Story helpers ─────────────────────────────────────────────────────────────
function flatArrayToObject(flat) {
const obj = {};
for (let i = 0; i + 1 < flat.length; i += 2) {
obj[flat[i]] = flat[i + 1];
}
return obj;
}
function derivePhase(track) {
const mentionCount = parseInt(track.mentionCount ?? '1', 10);
const firstSeen = parseInt(track.firstSeen ?? '0', 10);
const lastSeen = parseInt(track.lastSeen ?? String(Date.now()), 10);
const now = Date.now();
const ageH = (now - firstSeen) / 3600000;
const silenceH = (now - lastSeen) / 3600000;
if (silenceH > 24) return 'fading';
if (mentionCount >= 3 && ageH >= 12) return 'sustained';
if (mentionCount >= 2) return 'developing';
if (ageH < 2) return 'breaking';
return 'unknown';
}
function matchesSensitivity(ruleSensitivity, severity) {
if (ruleSensitivity === 'all') return true;
if (ruleSensitivity === 'high') return severity === 'high' || severity === 'critical';
return severity === 'critical';
}
// ── Fuzzy deduplication ──────────────────────────────────────────────────────
const STOP_WORDS = new Set([
'the','a','an','in','on','at','to','for','of','is','are','was','were',
'has','have','had','be','been','by','from','with','as','it','its',
'says','say','said','according','reports','report','officials','official',
'us','new','will','can','could','would','may','also','who','that','this',
'after','about','over','more','up','out','into','than','some','other',
// News framing / liveblog boilerplate — these appear in a huge
// fraction of wire headlines and were causing the Jaccard signal
// to drown in framing noise rather than converge on event words.
// Added after the 2026-04-19 Hormuz-cluster incident (6 stories
// about the same Strait closure ended up as 6 separate brief
// pages because shared headline vocabulary was dominated by
// "middle east crisis live" framing rather than the actual event
// nouns).
'live','crisis','update','updates','breaking','today','yesterday','latest',
'middle','east','west','north','south','news','briefing','watch',
'amid','and','if','but','or','so','when','while','still','now','then',
]);
function stripSourceSuffix(title) {
return title
.replace(/\s*[-–—]\s*[\w\s.]+\.(?:com|org|net|co\.uk)\s*$/i, '')
.replace(/\s*[-–—]\s*(?:Reuters|AP News|BBC|CNN|Al Jazeera|France 24|DW News|PBS NewsHour|CBS News|NBC|ABC|Associated Press|The Guardian|NOS Nieuws|Tagesschau|CNBC|The National)\s*$/i, '');
}
function extractTitleWords(title) {
return new Set(
stripSourceSuffix(title)
.toLowerCase()
.replace(/[^\p{L}\p{N}\s]/gu, '')
.split(/\s+/)
.filter(w => w.length > 2 && !STOP_WORDS.has(w)),
);
}
function jaccardSimilarity(setA, setB) {
if (setA.size === 0 || setB.size === 0) return 0;
let intersection = 0;
for (const w of setA) if (setB.has(w)) intersection++;
return intersection / (setA.size + setB.size - intersection);
}
// Jaccard threshold for the PRIMARY merge signal. 0.35 catches wire
// duplicates that phrase the same event slightly differently
// ("closed" vs "closes", "strait of hormuz" vs "strait hormuz") but
// rejects unrelated stories sharing a single generic word.
//
// NOTE for future tuners: in practice this threshold is ONLY reached
// by singleton-to-singleton comparisons and very-close pairs joining
// a small cluster. Once a cluster absorbs 3+ stories its word UNION
// expands to 1530+ terms; a new 7-word candidate sharing 2 words
// gets Jaccard ≈ 2/(7+25-2) = 0.067 — nowhere near 0.35 regardless of
// how related it is. Multi-story cluster merges therefore flow
// through the SECONDARY rule below. That's intentional: the
// secondary rule checks distinctive overlap against cluster.CORE
// (intersection of all items), which narrows sharply as the cluster
// grows and is the right signal for "does this new story belong to
// the topic this cluster has converged on?". If you're tempted to
// raise JACCARD_MERGE_THRESHOLD because "it seems loose", remember
// it barely affects established clusters — tune the secondary
// floor (SECONDARY_MERGE_MIN_JACCARD) and
// CLUSTER_JOIN_MIN_DISTINCTIVE_SHARED instead.
const JACCARD_MERGE_THRESHOLD = 0.35;
// Secondary merge signal: catches close-Jaccard-miss candidates that
// nonetheless share strong distinctive-entity overlap with a
// cluster's core (intersection of all items). Threshold values below
// are the triple gate applied in deduplicateStories:
// - Jaccard ≥ SECONDARY_MERGE_MIN_JACCARD (defined further down)
// - countDistinctiveShared(story, cluster.core) ≥ MIN_DISTINCTIVE_SHARED
// - countShared(story, cluster.words) ≥ MIN_SHARED_WORDS
//
// "Distinctive" is a length ≥ DISTINCTIVE_LEN proxy for named
// entities. It's imperfect — generic event vocabulary like
// "attack"/"missile"/"talks" also clears the bar — which is why the
// Jaccard floor is required as a second independent signal. Two
// unrelated events sharing only "attack"+"missile" would hit the
// distinctive count but their surrounding vocabulary diverges enough
// to push Jaccard below the floor.
const CLUSTER_JOIN_MIN_SHARED_WORDS = 2;
const CLUSTER_JOIN_MIN_DISTINCTIVE_SHARED = 2;
const CLUSTER_JOIN_DISTINCTIVE_LEN = 5;
// Jaccard floor for the SECONDARY merge rule (Jaccard in this range
// still merges if distinctive + total shared meet thresholds). Below
// this, no amount of entity overlap is enough — the surrounding
// vocabulary has diverged too far for us to claim the stories cover
// the same event. Catches the classic false positives where two
// different events share two named entities but nothing else:
// - Two Lebanon incidents both mentioning "French" + "Lebanon"
// - Two Russia events both mentioning "Russia" + "attack"
// - Iran-talks vs oil-price-reaction both mentioning "Iran" +
// "nuclear" + "talks"
// All of those have Jaccard between 0.14 and 0.24 — below 0.25
// they won't merge regardless of distinctive-word overlap.
//
// IMPORTANT: the secondary rule compares with STRICT inequality
// (jaccard > SECONDARY_MERGE_MIN_JACCARD), not ≥. A short
// downstream-reaction headline like "Oil prices rise on Iran
// nuclear talks optimism" vs an Iran-talks cluster can land
// EXACTLY on 0.25 (3 shared words in a 12-word union), and with
// ≥ it would scrape through merge despite being a different
// event. Strict > excludes that boundary case without moving
// the constant — which would risk breaking legitimate close-
// miss Hormuz merges (P10 joining at J=0.267, P02↔P08 at 0.308).
const SECONDARY_MERGE_MIN_JACCARD = 0.25;
/**
* Count words that appear in BOTH sets where the word is distinctive
* (length ≥ CLUSTER_JOIN_DISTINCTIVE_LEN). Distinctive length is a
* proxy for "named entity / place / event name" rather than a generic
* short content word.
*
* @param {Set<string>} a
* @param {Set<string>} b
*/
function countDistinctiveShared(a, b) {
let count = 0;
for (const w of a) {
if (w.length >= CLUSTER_JOIN_DISTINCTIVE_LEN && b.has(w)) count++;
}
return count;
}
/** Count words that appear in both sets, any length. */
function countShared(a, b) {
let count = 0;
for (const w of a) if (b.has(w)) count++;
return count;
}
/** Intersection of two sets as a new Set. */
function intersectSets(a, b) {
const out = new Set();
for (const w of a) if (b.has(w)) out.add(w);
return out;
}
/**
* Each cluster tracks TWO sets of words:
*
* - `words` = UNION of all items' vocabulary. Used for Jaccard
* because Jaccard's denominator (union) naturally penalises
* cluster pollution — as bridge / mixed headlines get absorbed,
* Jaccard to new unrelated candidates stays low.
*
* - `core` = INTERSECTION of all items' vocabulary (words present
* in every item of the cluster). Used for distinctive-content
* checks. Robust against bridge-headline pollution: a mixed
* headline like "… Hormuz … French soldier killed in Lebanon"
* injects french+lebanon into `words` (union) but they never land
* in `core` because the other items in the Hormuz cluster don't
* mention them. Post-pass and secondary merge therefore use core
* and avoid the transitive false-positive the reviewer flagged on
* PR #3195 (Hormuz cluster absorbing a separate Lebanon cluster
* via bridge-injected vocabulary).
*
* For a 1-item cluster, `core === words` (intersection of a single
* set is the set itself), so the asymmetry only matters once a
* cluster has grown.
*/
function deduplicateStories(stories) {
const clusters = [];
for (const story of stories) {
const words = extractTitleWords(story.title);
let merged = false;
for (const cluster of clusters) {
// Unified merge rule. Jaccard is the honest signal —
// naturally penalises divergence via the union denominator.
// Distinctive-shared is NOT sufficient on its own because
// "distinctive" (length ≥5) is a weak proxy for named
// entities: "attack"/"nuclear"/"missile"/"talks"/"rally" all
// clear the length bar and are generic event vocabulary.
// Proper nouns like "Kyiv"/"Oman" don't even correlate with
// length. So every merge — singleton or established cluster
// — goes through the same gate:
//
// PRIMARY: Jaccard ≥ 0.35 (covers near-duplicates)
// SECONDARY: Jaccard ≥ 0.25 AND distinctive-shared ≥ 2
// AND total-shared ≥ 2 (covers close misses with strong
// entity overlap, e.g. "Iran says closed Strait of
// Hormuz" variants that use mixed vocabulary)
//
// Distinctive-shared is measured against cluster.CORE (the
// intersection of all cluster items' words). Bridge-headline
// pollution in cluster.words (union) can't leak into core
// because core only contains words every item has. Total-
// shared is against union, just to confirm some real overlap
// exists beyond the distinctive entities alone.
const jaccard = jaccardSimilarity(words, cluster.words);
let shouldMerge = jaccard >= JACCARD_MERGE_THRESHOLD;
if (!shouldMerge && jaccard > SECONDARY_MERGE_MIN_JACCARD) {
const distinct = countDistinctiveShared(words, cluster.core);
const total = countShared(words, cluster.words);
shouldMerge =
distinct >= CLUSTER_JOIN_MIN_DISTINCTIVE_SHARED &&
total >= CLUSTER_JOIN_MIN_SHARED_WORDS;
}
if (shouldMerge) {
cluster.items.push(story);
for (const w of words) cluster.words.add(w); // union grows
cluster.core = intersectSets(cluster.core, words); // core narrows
merged = true;
break;
}
}
if (!merged) {
// New cluster: seed core = words. Both are the same set object
// is fine — only reads happen in the inner loop. We copy via
// new Set so subsequent mutations stay independent.
clusters.push({
words: new Set(words),
core: new Set(words),
items: [story],
});
}
}
// Post-pass: catch processing-order misses (two clusters that
// cover the same event but neither seeded in a way that let the
// other join in the first pass). Uses CORE for the distinctive
// check — critical for avoiding bridge-pollution false positives.
// A mixed headline inside the Hormuz cluster adds french+lebanon
// to that cluster's UNION but never to its CORE (since sibling
// Hormuz stories don't mention Lebanon), so a separate Lebanon
// cluster no longer matches here.
for (let i = 0; i < clusters.length; i++) {
for (let j = i + 1; j < clusters.length; ) {
const a = clusters[i];
const b = clusters[j];
// Post-pass applies the SAME unified rule as the initial pass,
// just between two existing clusters: Jaccard on unions, with
// distinctive/total measured against CORE intersections (so
// bridge-injected vocabulary in either cluster's union cannot
// drive a false-positive merge).
const jaccardUnion = jaccardSimilarity(a.words, b.words);
const distinctiveCore = countDistinctiveShared(a.core, b.core);
const totalCore = countShared(a.core, b.core);
const postMerge =
jaccardUnion >= JACCARD_MERGE_THRESHOLD ||
(jaccardUnion > SECONDARY_MERGE_MIN_JACCARD &&
distinctiveCore >= CLUSTER_JOIN_MIN_DISTINCTIVE_SHARED &&
totalCore >= CLUSTER_JOIN_MIN_SHARED_WORDS);
if (postMerge) {
for (const item of b.items) a.items.push(item);
for (const w of b.words) a.words.add(w);
a.core = intersectSets(a.core, b.core);
clusters.splice(j, 1);
} else {
j++;
}
}
}
return clusters.map(({ items }) => {
items.sort((a, b) => b.currentScore - a.currentScore || b.mentionCount - a.mentionCount);
const best = { ...items[0] };
if (items.length > 1) {
best.mentionCount = items.reduce((sum, s) => sum + s.mentionCount, 0);
}
best.mergedHashes = items.map(s => s.hash);
return best;
});
}
// ── Digest content ────────────────────────────────────────────────────────────
async function buildDigest(rule, windowStartMs) {
const variant = rule.variant ?? 'full';
const lang = rule.lang ?? 'en';
const accKey = `digest:accumulator:v1:${variant}:${lang}`;
const hashes = await upstashRest(
'ZRANGEBYSCORE', accKey, String(windowStartMs), String(Date.now()),
);
if (!Array.isArray(hashes) || hashes.length === 0) return null;
const trackResults = await upstashPipeline(
hashes.map((h) => ['HGETALL', `story:track:v1:${h}`]),
);
const stories = [];
for (let i = 0; i < hashes.length; i++) {
const raw = trackResults[i]?.result;
if (!Array.isArray(raw) || raw.length === 0) continue;
const track = flatArrayToObject(raw);
if (!track.title || !track.severity) continue;
const phase = derivePhase(track);
if (phase === 'fading') continue;
if (!matchesSensitivity(rule.sensitivity ?? 'high', track.severity)) continue;
stories.push({
hash: hashes[i],
title: track.title,
link: track.link ?? '',
severity: track.severity,
currentScore: parseInt(track.currentScore ?? '0', 10),
mentionCount: parseInt(track.mentionCount ?? '1', 10),
phase,
sources: [],
});
}
if (stories.length === 0) return null;
stories.sort((a, b) => b.currentScore - a.currentScore);
const deduped = deduplicateStories(stories);
const top = deduped.slice(0, DIGEST_MAX_ITEMS);
const allSourceCmds = [];
const cmdIndex = [];
for (let i = 0; i < top.length; i++) {
const hashes = top[i].mergedHashes ?? [top[i].hash];
for (const h of hashes) {
allSourceCmds.push(['SMEMBERS', `story:sources:v1:${h}`]);
cmdIndex.push(i);
}
}
const sourceResults = await upstashPipeline(allSourceCmds);
for (let i = 0; i < top.length; i++) top[i].sources = [];
for (let j = 0; j < sourceResults.length; j++) {
const arr = sourceResults[j]?.result ?? [];
for (const src of arr) {
if (!top[cmdIndex[j]].sources.includes(src)) top[cmdIndex[j]].sources.push(src);
}
}
return top;
}
function formatDigest(stories, nowMs) {
if (!stories || stories.length === 0) return null;
const dateStr = new Intl.DateTimeFormat('en-US', {
month: 'long', day: 'numeric', year: 'numeric',
}).format(new Date(nowMs));
const lines = [`WorldMonitor Daily Digest — ${dateStr}`, ''];
const buckets = { critical: [], high: [], medium: [] };
for (const s of stories) {
const b = buckets[s.severity] ?? buckets.high;
b.push(s);
}
const SEVERITY_LIMITS = { critical: DIGEST_CRITICAL_LIMIT, high: DIGEST_HIGH_LIMIT, medium: DIGEST_MEDIUM_LIMIT };
for (const [level, items] of Object.entries(buckets)) {
if (items.length === 0) continue;
const limit = SEVERITY_LIMITS[level] ?? DIGEST_MEDIUM_LIMIT;
lines.push(`${level.toUpperCase()} (${items.length} event${items.length !== 1 ? 's' : ''})`);
for (const item of items.slice(0, limit)) {
const src = item.sources.length > 0
? ` [${item.sources.slice(0, 3).join(', ')}${item.sources.length > 3 ? ` +${item.sources.length - 3}` : ''}]`
: '';
lines.push(` \u2022 ${stripSourceSuffix(item.title)}${src}`);
}
if (items.length > limit) lines.push(` ... and ${items.length - limit} more`);
lines.push('');
}
lines.push('View full dashboard \u2192 worldmonitor.app');
return lines.join('\n');
}
function formatDigestHtml(stories, nowMs) {
if (!stories || stories.length === 0) return null;
const dateStr = new Intl.DateTimeFormat('en-US', {
month: 'long', day: 'numeric', year: 'numeric',
}).format(new Date(nowMs));
const buckets = { critical: [], high: [], medium: [] };
for (const s of stories) {
const b = buckets[s.severity] ?? buckets.high;
b.push(s);
}
const totalCount = stories.length;
const criticalCount = buckets.critical.length;
const highCount = buckets.high.length;
const SEVERITY_BORDER = { critical: '#ef4444', high: '#f97316', medium: '#eab308' };
const PHASE_COLOR = { breaking: '#ef4444', developing: '#f97316', sustained: '#60a5fa', fading: '#555' };
function storyCard(s) {
const borderColor = SEVERITY_BORDER[s.severity] ?? '#4ade80';
const phaseColor = PHASE_COLOR[s.phase] ?? '#888';
const phaseCap = s.phase ? s.phase.charAt(0).toUpperCase() + s.phase.slice(1) : '';
const srcText = s.sources.length > 0
? s.sources.slice(0, 3).join(', ') + (s.sources.length > 3 ? ` +${s.sources.length - 3}` : '')
: '';
const cleanTitle = stripSourceSuffix(s.title);
const titleEl = s.link
? `<a href="${escapeHtml(s.link)}" style="color: #e0e0e0; text-decoration: none; font-size: 14px; font-weight: 600; line-height: 1.4;">${escapeHtml(cleanTitle)}</a>`
: `<span style="color: #e0e0e0; font-size: 14px; font-weight: 600; line-height: 1.4;">${escapeHtml(cleanTitle)}</span>`;
const meta = [
phaseCap ? `<span style="font-size: 10px; color: ${phaseColor}; text-transform: uppercase; letter-spacing: 1px; font-weight: 700;">${phaseCap}</span>` : '',
srcText ? `<span style="font-size: 11px; color: #555;">${escapeHtml(srcText)}</span>` : '',
].filter(Boolean).join('<span style="color: #333; margin: 0 6px;">&bull;</span>');
return `<div style="background: #111; border: 1px solid #1a1a1a; border-left: 3px solid ${borderColor}; padding: 12px 16px; margin-bottom: 8px;">${titleEl}${meta ? `<div style="margin-top: 6px;">${meta}</div>` : ''}</div>`;
}
const SEVERITY_LIMITS = { critical: DIGEST_CRITICAL_LIMIT, high: DIGEST_HIGH_LIMIT, medium: DIGEST_MEDIUM_LIMIT };
function sectionHtml(severity, items) {
if (items.length === 0) return '';
const limit = SEVERITY_LIMITS[severity] ?? DIGEST_MEDIUM_LIMIT;
const SEVERITY_LABEL = { critical: '&#128308; Critical', high: '&#128992; High', medium: '&#128993; Medium' };
const label = SEVERITY_LABEL[severity] ?? severity.toUpperCase();
const cards = items.slice(0, limit).map(storyCard).join('');
const overflow = items.length > limit
? `<p style="font-size: 12px; color: #555; margin: 4px 0 16px; padding-left: 4px;">... and ${items.length - limit} more</p>`
: '';
return `<div style="margin-bottom: 24px;"><div style="font-size: 11px; font-weight: 700; color: #888; text-transform: uppercase; letter-spacing: 2px; margin-bottom: 10px;">${label} (${items.length})</div>${cards}${overflow}</div>`;
}
const sectionsHtml = ['critical', 'high', 'medium']
.map((sev) => sectionHtml(sev, buckets[sev]))
.join('');
return `<div style="font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; background: #111; color: #e0e0e0;">
<div style="max-width: 680px; margin: 0 auto;">
<div style="background: #4ade80; height: 3px;"></div>
<div style="background: #0d0d0d; padding: 32px 36px 0;">
<table cellpadding="0" cellspacing="0" border="0" width="100%" style="margin-bottom: 28px;">
<tr>
<td style="vertical-align: middle;">
<table cellpadding="0" cellspacing="0" border="0">
<tr>
<td style="width: 36px; height: 36px; vertical-align: middle;">
<img src="https://www.worldmonitor.app/favico/android-chrome-192x192.png" width="36" height="36" alt="WorldMonitor" style="border-radius: 50%; display: block;" />
</td>
<td style="padding-left: 10px;">
<div style="font-size: 15px; font-weight: 800; color: #fff; letter-spacing: -0.3px;">WORLD MONITOR</div>
</td>
</tr>
</table>
</td>
<td style="text-align: right; vertical-align: middle;">
<span style="font-size: 11px; color: #555; text-transform: uppercase; letter-spacing: 1px;">${dateStr}</span>
</td>
</tr>
</table>
<div data-ai-summary-slot></div>
<div data-brief-cta-slot></div>
<table cellpadding="0" cellspacing="0" border="0" width="100%" style="margin-bottom: 24px;">
<tr>
<td style="text-align: center; padding: 14px 8px; width: 33%; background: #161616; border: 1px solid #222;">
<div style="font-size: 24px; font-weight: 800; color: #4ade80;">${totalCount}</div>
<div style="font-size: 9px; color: #666; text-transform: uppercase; letter-spacing: 1.5px; margin-top: 2px;">Events</div>
</td>
<td style="width: 1px;"></td>
<td style="text-align: center; padding: 14px 8px; width: 33%; background: #161616; border: 1px solid #222;">
<div style="font-size: 24px; font-weight: 800; color: #ef4444;">${criticalCount}</div>
<div style="font-size: 9px; color: #666; text-transform: uppercase; letter-spacing: 1.5px; margin-top: 2px;">Critical</div>
</td>
<td style="width: 1px;"></td>
<td style="text-align: center; padding: 14px 8px; width: 33%; background: #161616; border: 1px solid #222;">
<div style="font-size: 24px; font-weight: 800; color: #f97316;">${highCount}</div>
<div style="font-size: 9px; color: #666; text-transform: uppercase; letter-spacing: 1.5px; margin-top: 2px;">High</div>
</td>
</tr>
</table>
${sectionsHtml}
<div style="text-align: center; padding: 12px 0 36px;">
<a href="https://worldmonitor.app" style="display: inline-block; background: #4ade80; color: #0a0a0a; padding: 12px 32px; text-decoration: none; font-weight: 700; font-size: 12px; text-transform: uppercase; letter-spacing: 1.5px; border-radius: 3px;">Open Dashboard</a>
</div>
</div>
<div style="background: #0a0a0a; border-top: 1px solid #1a1a1a; padding: 20px 36px; text-align: center;">
<div style="margin-bottom: 12px;">
<a href="https://x.com/worldmonitorapp" style="color: #555; text-decoration: none; font-size: 11px; margin: 0 10px;">X / Twitter</a>
<a href="https://github.com/koala73/worldmonitor" style="color: #555; text-decoration: none; font-size: 11px; margin: 0 10px;">GitHub</a>
<a href="https://discord.gg/re63kWKxaz" style="color: #555; text-decoration: none; font-size: 11px; margin: 0 10px;">Discord</a>
</div>
<p style="font-size: 10px; color: #444; margin: 0; line-height: 1.5;">
<a href="https://worldmonitor.app" style="color: #4ade80; text-decoration: none;">worldmonitor.app</a>
</p>
</div>
</div>
</div>`;
}
// ── AI summary generation ────────────────────────────────────────────────────
function hashShort(str) {
return createHash('sha256').update(str).digest('hex').slice(0, 16);
}
async function generateAISummary(stories, rule) {
if (!AI_DIGEST_ENABLED) return null;
if (!stories || stories.length === 0) return null;
// rule.aiDigestEnabled (from alertRules) is the user's explicit opt-in for
// AI summaries. userPreferences is a SEPARATE table (SPA app settings blob:
// watchlist, airports, panels). A user can have alertRules without having
// ever saved userPreferences — or under a different variant. Missing prefs
// must NOT silently disable the feature the user just enabled; degrade to
// a non-personalized summary instead.
// error: true = transient fetch failure (network, non-OK HTTP, env missing)
// error: false = the (userId, variant) row genuinely does not exist
// Both cases degrade to a non-personalized summary, but log them distinctly
// so transient fetch failures are visible in observability.
const { data: prefs, error: prefsFetchError } = await fetchUserPreferences(rule.userId, rule.variant ?? 'full');
if (!prefs) {
console.log(
prefsFetchError
? `[digest] Prefs fetch failed for ${rule.userId} — generating non-personalized AI summary`
: `[digest] No stored preferences for ${rule.userId} — generating non-personalized AI summary`,
);
}
const ctx = extractUserContext(prefs);
const profile = formatUserProfile(ctx, rule.variant ?? 'full');
const variant = rule.variant ?? 'full';
const tz = rule.digestTimezone ?? 'UTC';
const localHour = toLocalHour(Date.now(), tz);
if (localHour === -1) console.warn(`[digest] Bad timezone "${tz}" for ${rule.userId} — defaulting to evening greeting`);
const greeting = localHour >= 5 && localHour < 12 ? 'Good morning'
: localHour >= 12 && localHour < 17 ? 'Good afternoon'
: 'Good evening';
const storiesHash = hashShort(stories.map(s =>
`${s.titleHash ?? s.title}:${s.severity ?? ''}:${s.phase ?? ''}:${(s.sources ?? []).slice(0, 3).join(',')}`
).sort().join('|'));
const ctxHash = hashShort(JSON.stringify(ctx));
const cacheKey = `digest:ai-summary:v1:${variant}:${greeting}:${storiesHash}:${ctxHash}`;
try {
const cached = await upstashRest('GET', cacheKey);
if (cached) {
console.log(`[digest] AI summary cache hit for ${rule.userId}`);
return cached;
}
} catch { /* miss */ }
const dateStr = new Date().toISOString().split('T')[0];
const storyList = stories.slice(0, 20).map((s, i) => {
const phase = s.phase ? ` [${s.phase}]` : '';
const src = s.sources?.length > 0 ? ` (${s.sources.slice(0, 2).join(', ')})` : '';
return `${i + 1}. [${(s.severity ?? 'high').toUpperCase()}]${phase} ${s.title}${src}`;
}).join('\n');
const systemPrompt = `You are WorldMonitor's intelligence analyst. Today is ${dateStr} UTC.
Write a personalized daily brief for a user focused on ${rule.variant ?? 'full'} intelligence.
The user's local time greeting is "${greeting}" — use this exact greeting to open the brief.
User profile:
${profile}
Rules:
- Open with "${greeting}." followed by the brief
- Lead with the single most impactful development for this user
- Connect events to watched assets/regions where relevant
- 3-5 bullet points, 1-2 sentences each
- Flag anything directly affecting watched assets
- Separate facts from assessment
- End with "Signals to watch:" (1-2 items)
- Under 250 words`;
const summary = await callLLM(systemPrompt, storyList, { maxTokens: 600, temperature: 0.3, timeoutMs: 15_000, skipProviders: ['groq'] });
if (!summary) {
console.warn(`[digest] AI summary generation failed for ${rule.userId}`);
return null;
}
try {
await upstashRest('SET', cacheKey, summary, 'EX', String(AI_SUMMARY_CACHE_TTL));
} catch { /* best-effort cache write */ }
console.log(`[digest] AI summary generated for ${rule.userId} (${summary.length} chars)`);
return summary;
}
// ── Channel deactivation ──────────────────────────────────────────────────────
async function deactivateChannel(userId, channelType) {
try {
const res = await fetch(`${CONVEX_SITE_URL}/relay/deactivate`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${RELAY_SECRET}`,
'User-Agent': 'worldmonitor-digest/1.0',
},
body: JSON.stringify({ userId, channelType }),
signal: AbortSignal.timeout(10000),
});
if (!res.ok) {
console.warn(`[digest] Deactivate failed ${userId}/${channelType}: ${res.status}`);
}
} catch (err) {
console.warn(`[digest] Deactivate request failed for ${userId}/${channelType}:`, err.message);
}
}
function isPrivateIP(ip) {
return /^(10\.|172\.(1[6-9]|2\d|3[01])\.|192\.168\.|127\.|::1|fc|fd)/.test(ip);
}
// ── Send functions ────────────────────────────────────────────────────────────
const TELEGRAM_MAX_LEN = 4096;
function sanitizeTelegramHtml(html) {
let out = html.replace(/<[^>]*$/, '');
for (const tag of ['b', 'i', 'u', 's', 'code', 'pre']) {
const opens = (out.match(new RegExp(`<${tag}>`, 'g')) || []).length;
const closes = (out.match(new RegExp(`</${tag}>`, 'g')) || []).length;
for (let i = closes; i < opens; i++) out += `</${tag}>`;
}
return out;
}
function truncateTelegramHtml(html, limit = TELEGRAM_MAX_LEN) {
if (html.length <= limit) {
const sanitized = sanitizeTelegramHtml(html);
return sanitized.length <= limit ? sanitized : truncateTelegramHtml(sanitized, limit);
}
const truncated = html.slice(0, limit - 30);
const lastNewline = truncated.lastIndexOf('\n');
const cutPoint = lastNewline > limit * 0.6 ? lastNewline : truncated.length;
return sanitizeTelegramHtml(truncated.slice(0, cutPoint) + '\n\n[truncated]');
}
/**
* Phase 8: derive the 3 carousel image URLs from a signed magazine
* URL. The HMAC token binds (userId, issueDate), not the path — so
* the same token verifies against /api/brief/{u}/{d}?t=T AND against
* /api/brief/carousel/{u}/{d}/{0|1|2}?t=T.
*
* Returns null when the magazine URL doesn't match the expected shape
* — caller falls back to text-only delivery.
*/
function carouselUrlsFrom(magazineUrl) {
try {
const u = new URL(magazineUrl);
const m = u.pathname.match(/^\/api\/brief\/([^/]+)\/(\d{4}-\d{2}-\d{2})\/?$/);
if (!m) return null;
const [, userId, issueDate] = m;
const token = u.searchParams.get('t');
if (!token) return null;
return [0, 1, 2].map(
(p) => `${u.origin}/api/brief/carousel/${userId}/${issueDate}/${p}?t=${token}`,
);
} catch {
return null;
}
}
/**
* Send the 3-image brief carousel to a Telegram chat via sendMediaGroup.
* Telegram fetches each URL server-side, so our carousel edge function
* has to be publicly reachable (it is — HMAC is the only credential).
*
* Caption goes on the FIRST image only (Telegram renders one shared
* caption beneath the album). The caller still calls sendTelegram()
* afterward for the long-form text — carousel is the header, text is
* the body.
*/
async function sendTelegramBriefCarousel(userId, chatId, caption, magazineUrl) {
if (!TELEGRAM_BOT_TOKEN) return false;
const urls = carouselUrlsFrom(magazineUrl);
if (!urls) return false;
const media = urls.map((url, i) => ({
type: 'photo',
media: url,
...(i === 0 ? { caption, parse_mode: 'HTML' } : {}),
}));
try {
const res = await fetch(
`https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMediaGroup`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'worldmonitor-digest/1.0' },
body: JSON.stringify({ chat_id: chatId, media }),
signal: AbortSignal.timeout(20000),
},
);
if (!res.ok) {
const body = await res.text().catch(() => '');
console.warn(`[digest] Telegram carousel ${res.status} for ${userId}: ${body.slice(0, 300)}`);
return false;
}
return true;
} catch (err) {
console.warn(`[digest] Telegram carousel error for ${userId}: ${err.code || err.message}`);
return false;
}
}
async function sendTelegram(userId, chatId, text) {
if (!TELEGRAM_BOT_TOKEN) {
console.warn('[digest] Telegram: TELEGRAM_BOT_TOKEN not set, skipping');
return false;
}
const safeText = truncateTelegramHtml(text);
try {
const res = await fetch(
`https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'worldmonitor-digest/1.0' },
body: JSON.stringify({
chat_id: chatId,
text: safeText,
parse_mode: 'HTML',
disable_web_page_preview: true,
}),
signal: AbortSignal.timeout(10000),
},
);
if (res.status === 403) {
console.warn(`[digest] Telegram 403 for ${userId}, deactivating`);
await deactivateChannel(userId, 'telegram');
return false;
} else if (!res.ok) {
const body = await res.text().catch(() => '');
console.warn(`[digest] Telegram send failed ${res.status} for ${userId}: ${body.slice(0, 300)}`);
return false;
}
console.log(`[digest] Telegram delivered to ${userId}`);
return true;
} catch (err) {
console.warn(`[digest] Telegram send error for ${userId}: ${err.code || err.message}`);
return false;
}
}
const SLACK_RE = /^https:\/\/hooks\.slack\.com\/services\/[A-Z0-9]+\/[A-Z0-9]+\/[a-zA-Z0-9]+$/;
const DISCORD_RE = /^https:\/\/discord\.com\/api(?:\/v\d+)?\/webhooks\/\d+\/[\w-]+\/?$/;
async function sendSlack(userId, webhookEnvelope, text) {
let webhookUrl;
try { webhookUrl = decrypt(webhookEnvelope); } catch (err) {
console.warn(`[digest] Slack decrypt failed for ${userId}:`, err.message); return false;
}
if (!SLACK_RE.test(webhookUrl)) { console.warn(`[digest] Slack URL invalid for ${userId}`); return false; }
try {
const hostname = new URL(webhookUrl).hostname;
const addrs = await dns.resolve4(hostname).catch(() => []);
if (addrs.some(isPrivateIP)) { console.warn(`[digest] Slack SSRF blocked for ${userId}`); return false; }
} catch { return false; }
try {
const res = await fetch(webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'worldmonitor-digest/1.0' },
body: JSON.stringify({ text, unfurl_links: false }),
signal: AbortSignal.timeout(10000),
});
if (res.status === 404 || res.status === 410) {
console.warn(`[digest] Slack webhook gone for ${userId}, deactivating`);
await deactivateChannel(userId, 'slack');
return false;
} else if (!res.ok) {
console.warn(`[digest] Slack send failed ${res.status} for ${userId}`);
return false;
}
console.log(`[digest] Slack delivered to ${userId}`);
return true;
} catch (err) {
console.warn(`[digest] Slack send error for ${userId}: ${err.code || err.message}`);
return false;
}
}
async function sendDiscord(userId, webhookEnvelope, text) {
let webhookUrl;
try { webhookUrl = decrypt(webhookEnvelope); } catch (err) {
console.warn(`[digest] Discord decrypt failed for ${userId}:`, err.message); return false;
}
if (!DISCORD_RE.test(webhookUrl)) { console.warn(`[digest] Discord URL invalid for ${userId}`); return false; }
try {
const hostname = new URL(webhookUrl).hostname;
const addrs = await dns.resolve4(hostname).catch(() => []);
if (addrs.some(isPrivateIP)) { console.warn(`[digest] Discord SSRF blocked for ${userId}`); return false; }
} catch { return false; }
const content = text.length > 2000 ? text.slice(0, 1999) + '\u2026' : text;
try {
const res = await fetch(webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'worldmonitor-digest/1.0' },
body: JSON.stringify({ content }),
signal: AbortSignal.timeout(10000),
});
if (res.status === 404 || res.status === 410) {
console.warn(`[digest] Discord webhook gone for ${userId}, deactivating`);
await deactivateChannel(userId, 'discord');
return false;
} else if (!res.ok) {
console.warn(`[digest] Discord send failed ${res.status} for ${userId}`);
return false;
}
console.log(`[digest] Discord delivered to ${userId}`);
return true;
} catch (err) {
console.warn(`[digest] Discord send error for ${userId}: ${err.code || err.message}`);
return false;
}
}
async function sendEmail(email, subject, text, html) {
if (!resend) { console.warn('[digest] Email: RESEND_API_KEY not set — skipping'); return false; }
try {
const payload = { from: RESEND_FROM, to: email, subject, text };
if (html) payload.html = html;
await resend.emails.send(payload);
console.log(`[digest] Email delivered to ${email}`);
return true;
} catch (err) {
console.warn('[digest] Resend failed:', err.message);
return false;
}
}
async function sendWebhook(userId, webhookEnvelope, stories, aiSummary) {
let url;
try { url = decrypt(webhookEnvelope); } catch (err) {
console.warn(`[digest] Webhook decrypt failed for ${userId}:`, err.message);
return false;
}
let parsed;
try { parsed = new URL(url); } catch {
console.warn(`[digest] Webhook invalid URL for ${userId}`);
await deactivateChannel(userId, 'webhook');
return false;
}
if (parsed.protocol !== 'https:') {
console.warn(`[digest] Webhook rejected non-HTTPS for ${userId}`);
return false;
}
try {
const addrs = await dns.resolve4(parsed.hostname);
if (addrs.some(isPrivateIP)) { console.warn(`[digest] Webhook SSRF blocked for ${userId}`); return false; }
} catch {
console.warn(`[digest] Webhook DNS resolve failed for ${userId}`);
return false;
}
const payload = JSON.stringify({
version: '1',
eventType: 'digest',
stories: stories.map(s => ({ title: s.title, severity: s.severity, phase: s.phase, sources: s.sources })),
summary: aiSummary ?? null,
storyCount: stories.length,
});
try {
const resp = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'User-Agent': 'worldmonitor-digest/1.0' },
body: payload,
signal: AbortSignal.timeout(10000),
});
if (resp.status === 404 || resp.status === 410 || resp.status === 403) {
console.warn(`[digest] Webhook ${resp.status} for ${userId} — deactivating`);
await deactivateChannel(userId, 'webhook');
return false;
}
if (!resp.ok) { console.warn(`[digest] Webhook ${resp.status} for ${userId}`); return false; }
console.log(`[digest] Webhook delivered for ${userId}`);
return true;
} catch (err) {
console.warn(`[digest] Webhook error for ${userId}:`, err.message);
return false;
}
}
// ── Entitlement check ────────────────────────────────────────────────────────
async function isUserPro(userId) {
const cacheKey = `relay:entitlement:${userId}`;
try {
const cached = await upstashRest('GET', cacheKey);
if (cached !== null) return Number(cached) >= 1;
} catch { /* miss */ }
try {
const res = await fetch(`${CONVEX_SITE_URL}/relay/entitlement`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${RELAY_SECRET}`, 'User-Agent': 'worldmonitor-digest/1.0' },
body: JSON.stringify({ userId }),
signal: AbortSignal.timeout(5000),
});
if (!res.ok) return true; // fail-open
const { tier } = await res.json();
await upstashRest('SET', cacheKey, String(tier ?? 0), 'EX', String(ENTITLEMENT_CACHE_TTL));
return (tier ?? 0) >= 1;
} catch {
return true; // fail-open
}
}
// ── Per-channel body composition ─────────────────────────────────────────────
const DIVIDER = '─'.repeat(40);
/**
* Compose the per-channel message bodies for a single digest rule.
* Keeps the per-channel formatting logic out of main() so its cognitive
* complexity stays within the lint budget.
*/
function buildChannelBodies(storyListPlain, aiSummary, magazineUrl) {
// The URL is already HMAC-signed and shape-validated at sign time
// (userId regex + YYYY-MM-DD), but we still escape it per-target
// as defence-in-depth — same discipline injectBriefCta uses for
// the email button. Each target has different metacharacter rules.
const telegramSafeUrl = magazineUrl
? String(magazineUrl)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
: '';
const slackSafeUrl = magazineUrl
? String(magazineUrl).replace(/[<>|]/g, '')
: '';
const briefFooterPlain = magazineUrl
? `\n\n${DIVIDER}\n\n📖 Open your WorldMonitor Brief magazine:\n${magazineUrl}`
: '';
const briefFooterTelegram = magazineUrl
? `\n\n${DIVIDER}\n\n📖 <a href="${telegramSafeUrl}">Open your WorldMonitor Brief magazine</a>`
: '';
const briefFooterSlack = magazineUrl
? `\n\n${DIVIDER}\n\n📖 <${slackSafeUrl}|Open your WorldMonitor Brief magazine>`
: '';
const briefFooterDiscord = magazineUrl
? `\n\n${DIVIDER}\n\n📖 [Open your WorldMonitor Brief magazine](${magazineUrl})`
: '';
if (!aiSummary) {
return {
text: `${storyListPlain}${briefFooterPlain}`,
telegramText: `${escapeTelegramHtml(storyListPlain)}${briefFooterTelegram}`,
slackText: `${escapeSlackMrkdwn(storyListPlain)}${briefFooterSlack}`,
discordText: `${storyListPlain}${briefFooterDiscord}`,
};
}
return {
text: `EXECUTIVE SUMMARY\n\n${aiSummary}\n\n${DIVIDER}\n\n${storyListPlain}${briefFooterPlain}`,
telegramText: `<b>EXECUTIVE SUMMARY</b>\n\n${markdownToTelegramHtml(aiSummary)}\n\n${DIVIDER}\n\n${escapeTelegramHtml(storyListPlain)}${briefFooterTelegram}`,
slackText: `*EXECUTIVE SUMMARY*\n\n${markdownToSlackMrkdwn(aiSummary)}\n\n${DIVIDER}\n\n${escapeSlackMrkdwn(storyListPlain)}${briefFooterSlack}`,
discordText: `**EXECUTIVE SUMMARY**\n\n${markdownToDiscord(aiSummary)}\n\n${DIVIDER}\n\n${storyListPlain}${briefFooterDiscord}`,
};
}
/**
* Inject the formatted AI summary into the HTML email template's slot,
* or strip the slot placeholder when there is no summary.
*/
function injectEmailSummary(html, aiSummary) {
if (!html) return html;
if (!aiSummary) return html.replace('<div data-ai-summary-slot></div>', '');
const formattedSummary = markdownToEmailHtml(aiSummary);
const summaryHtml = `<div style="background:#161616;border:1px solid #222;border-left:3px solid #4ade80;padding:18px 22px;margin:0 0 24px 0;">
<div style="font-size:10px;font-weight:700;text-transform:uppercase;letter-spacing:1.5px;color:#4ade80;margin-bottom:10px;">Executive Summary</div>
<div style="font-size:13px;line-height:1.7;color:#ccc;">${formattedSummary}</div>
</div>`;
return html.replace('<div data-ai-summary-slot></div>', summaryHtml);
}
/**
* Inject the "Open your brief" CTA into the email HTML. Placed near
* the top of the body so recipients see the magazine link before the
* story list. Uses inline styles only (Gmail / Outlook friendly).
* When no magazineUrl is present (composer skipped / signing
* failed), the slot is stripped so the email stays clean.
*/
function injectBriefCta(html, magazineUrl) {
if (!html) return html;
if (!magazineUrl) return html.replace('<div data-brief-cta-slot></div>', '');
const escapedUrl = String(magazineUrl)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
const ctaHtml = `<div style="margin:0 0 24px 0;">
<a href="${escapedUrl}" style="display:inline-block;background:#f2ede4;color:#0a0a0a;text-decoration:none;font-weight:700;font-size:14px;letter-spacing:0.08em;padding:14px 22px;border-radius:4px;">Open your WorldMonitor Brief →</a>
<div style="margin-top:10px;font-size:11px;color:#888;line-height:1.5;">Your personalised editorial magazine. Opens in the browser — scroll or swipe through today's threads.</div>
</div>`;
return html.replace('<div data-brief-cta-slot></div>', ctaHtml);
}
// ── Brief composition (runs once per cron tick, before digest loop) ─────────
/**
* Write brief:{userId}:{issueDate} for every eligible user and
* return { briefByUser, counters } for the digest loop + main's
* end-of-run exit gate. One brief per user regardless of how many
* variants they have enabled.
*
* Returns empty counters when brief composition is disabled,
* insights are unavailable, or the signing secret is missing. Never
* throws — the digest send path must remain independent of the
* brief path, so main() handles exit-codes at the very end AFTER
* the digest has been dispatched.
*
* @param {unknown[]} rules
* @param {number} nowMs
* @returns {Promise<{ briefByUser: Map<string, object>; composeSuccess: number; composeFailed: number }>}
*/
async function composeBriefsForRun(rules, nowMs) {
const briefByUser = new Map();
// Missing secret without explicit operator-disable = misconfigured
// rollout. Count it as a compose failure so the end-of-run exit
// gate trips and Railway flags the run red. Digest send still
// proceeds (compose failures must never block notification
// delivery to users).
if (BRIEF_SIGNING_SECRET_MISSING) {
console.error(
'[digest] brief: BRIEF_URL_SIGNING_SECRET not configured. Set BRIEF_COMPOSE_ENABLED=0 to silence intentionally.',
);
return { briefByUser, composeSuccess: 0, composeFailed: 1 };
}
if (!BRIEF_COMPOSE_ENABLED) return { briefByUser, composeSuccess: 0, composeFailed: 0 };
// The brief's story list now comes from the same digest accumulator
// the email reads (buildDigest). news:insights:v1 is still consulted
// for the global "clusters / multi-source" stat-page numbers, but no
// longer for the story list itself. A failed or empty insights fetch
// is NOT fatal — we fall back to zeroed numbers and still ship the
// brief, because the stories are what matter. (A mismatched brief
// was far worse than a brief with dashes on the stats page.)
let insightsNumbers = { clusters: 0, multiSource: 0 };
try {
const insightsRaw = await readRawJsonFromUpstash(INSIGHTS_KEY);
if (insightsRaw) insightsNumbers = extractInsights(insightsRaw).numbers;
} catch (err) {
console.warn('[digest] brief: insights read failed, using zeroed stats:', err.message);
}
// Memoize buildDigest by (variant, lang, windowStart). Many users
// share a variant/lang, so this saves ZRANGE + HGETALL round-trips
// across the per-user loop. Scoped to this cron run — no cross-run
// memoization needed (Redis is authoritative).
const windowStart = nowMs - BRIEF_STORY_WINDOW_MS;
const digestCache = new Map();
async function digestFor(candidate) {
const key = `${candidate.variant ?? 'full'}:${candidate.lang ?? 'en'}:${windowStart}`;
if (digestCache.has(key)) return digestCache.get(key);
const stories = await buildDigest(candidate, windowStart);
digestCache.set(key, stories ?? []);
return stories ?? [];
}
const eligibleByUser = groupEligibleRulesByUser(rules);
let composeSuccess = 0;
let composeFailed = 0;
for (const [userId, candidates] of eligibleByUser) {
try {
const hit = await composeAndStoreBriefForUser(userId, candidates, insightsNumbers, digestFor, nowMs);
if (hit) {
briefByUser.set(userId, hit);
composeSuccess++;
}
} catch (err) {
composeFailed++;
if (err instanceof BriefUrlError) {
console.warn(`[digest] brief: sign failed for ${userId} (${err.code}): ${err.message}`);
} else {
console.warn(`[digest] brief: compose failed for ${userId}:`, err.message);
}
}
}
console.log(
`[digest] brief: compose_success=${composeSuccess} compose_failed=${composeFailed} total_users=${eligibleByUser.size}`,
);
return { briefByUser, composeSuccess, composeFailed };
}
/**
* Per-user: walk candidates, for each pull the per-variant digest
* story pool (same pool buildDigest feeds to the email), and compose
* the brief envelope from the first candidate that yields non-empty
* stories. SETEX the envelope, sign the magazine URL. Returns the
* entry the caller should stash in briefByUser, or null when no
* candidate had stories.
*/
async function composeAndStoreBriefForUser(userId, candidates, insightsNumbers, digestFor, nowMs) {
let envelope = null;
let chosenVariant = null;
let chosenCandidate = null;
for (const candidate of candidates) {
const digestStories = await digestFor(candidate);
if (!digestStories || digestStories.length === 0) continue;
const composed = composeBriefFromDigestStories(
candidate,
digestStories,
insightsNumbers,
{ nowMs },
);
if (composed) {
envelope = composed;
chosenVariant = candidate.variant;
chosenCandidate = candidate;
break;
}
}
if (!envelope) return null;
// Phase 3b — LLM enrichment. Substitutes the stubbed whyMatters /
// lead / threads / signals fields with Gemini 2.5 Flash output.
// Pure passthrough on any failure: the baseline envelope has
// already passed validation and is safe to ship as-is. Do NOT
// abort composition if the LLM is down; the stub is better than
// no brief.
if (BRIEF_LLM_ENABLED && chosenCandidate) {
const baseline = envelope;
try {
const enriched = await enrichBriefEnvelopeWithLLM(envelope, chosenCandidate, briefLlmDeps);
// Defence in depth: re-validate the enriched envelope against
// the renderer's strict contract before we SETEX it. If
// enrichment produced a structurally broken shape (bad cache
// row, code bug, upstream type drift) we'd otherwise SETEX it
// and /api/brief would 404 the user's brief at read time. Fall
// back to the unenriched baseline — which is already known to
// pass assertBriefEnvelope() because composeBriefFromDigestStories
// asserted on construction.
try {
assertBriefEnvelope(enriched);
envelope = enriched;
} catch (assertErr) {
console.warn(`[digest] brief: enriched envelope failed assertion for ${userId} — shipping stubbed:`, assertErr?.message);
envelope = baseline;
}
} catch (err) {
console.warn(`[digest] brief: LLM enrichment threw for ${userId} — shipping stubbed envelope:`, err?.message);
envelope = baseline;
}
}
const issueDate = envelope.data.date;
const key = `brief:${userId}:${issueDate}`;
const pipelineResult = await redisPipeline([
['SETEX', key, String(BRIEF_TTL_SECONDS), JSON.stringify(envelope)],
]);
if (!pipelineResult || !Array.isArray(pipelineResult) || pipelineResult.length === 0) {
throw new Error('null pipeline response from Upstash');
}
const cell = pipelineResult[0];
if (cell && typeof cell === 'object' && 'error' in cell) {
throw new Error(`Upstash SETEX error: ${cell.error}`);
}
const magazineUrl = await signBriefUrl({
userId,
issueDate,
baseUrl: WORLDMONITOR_PUBLIC_BASE_URL,
secret: BRIEF_URL_SIGNING_SECRET,
});
return { envelope, magazineUrl, chosenVariant };
}
// ── Main ──────────────────────────────────────────────────────────────────────
async function main() {
const nowMs = Date.now();
console.log('[digest] Cron run start:', new Date(nowMs).toISOString());
let rules;
try {
const res = await fetch(`${CONVEX_SITE_URL}/relay/digest-rules`, {
method: 'GET',
headers: {
Authorization: `Bearer ${RELAY_SECRET}`,
'User-Agent': 'worldmonitor-digest/1.0',
},
signal: AbortSignal.timeout(10000),
});
if (!res.ok) {
console.error('[digest] Failed to fetch rules:', res.status);
return;
}
rules = await res.json();
} catch (err) {
console.error('[digest] Fetch rules failed:', err.message);
return;
}
if (!Array.isArray(rules) || rules.length === 0) {
console.log('[digest] No digest rules found — nothing to do');
return;
}
// Compose per-user brief envelopes once per run (extracted so main's
// complexity score stays in the biome budget). Failures MUST NOT
// block digest sends — we carry counters forward and apply the
// exit-non-zero gate AFTER the digest dispatch so Railway still
// surfaces compose-layer breakage without skipping user-visible
// digest delivery.
const { briefByUser, composeSuccess, composeFailed } = await composeBriefsForRun(rules, nowMs);
let sentCount = 0;
for (const rule of rules) {
if (!rule.userId || !rule.variant) continue;
const lastSentKey = `digest:last-sent:v1:${rule.userId}:${rule.variant}`;
let lastSentAt = null;
try {
const raw = await upstashRest('GET', lastSentKey);
if (raw) {
const parsed = JSON.parse(raw);
lastSentAt = typeof parsed.sentAt === 'number' ? parsed.sentAt : null;
}
} catch { /* first send */ }
if (!isDue(rule, lastSentAt)) continue;
const pro = await isUserPro(rule.userId);
if (!pro) {
console.log(`[digest] Skipping ${rule.userId} — not PRO`);
continue;
}
const windowStart = lastSentAt ?? (nowMs - DIGEST_LOOKBACK_MS);
const stories = await buildDigest(rule, windowStart);
if (!stories) {
console.log(`[digest] No stories in window for ${rule.userId} (${rule.variant})`);
continue;
}
let channels = [];
try {
const chRes = await fetch(`${CONVEX_SITE_URL}/relay/channels`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${RELAY_SECRET}`,
'User-Agent': 'worldmonitor-digest/1.0',
},
body: JSON.stringify({ userId: rule.userId }),
signal: AbortSignal.timeout(10000),
});
if (chRes.ok) channels = await chRes.json();
} catch (err) {
console.warn(`[digest] Channel fetch failed for ${rule.userId}:`, err.message);
}
const ruleChannelSet = new Set(rule.channels ?? []);
const deliverableChannels = channels.filter(ch => ruleChannelSet.has(ch.channelType) && ch.verified);
if (deliverableChannels.length === 0) {
console.log(`[digest] No deliverable channels for ${rule.userId} — skipping`);
continue;
}
let aiSummary = null;
if (AI_DIGEST_ENABLED && rule.aiDigestEnabled !== false) {
aiSummary = await generateAISummary(stories, rule);
}
const storyListPlain = formatDigest(stories, nowMs);
if (!storyListPlain) continue;
const htmlRaw = formatDigestHtml(stories, nowMs);
const brief = briefByUser.get(rule.userId);
const magazineUrl = brief?.magazineUrl ?? null;
const { text, telegramText, slackText, discordText } = buildChannelBodies(
storyListPlain,
aiSummary,
magazineUrl,
);
const htmlWithSummary = injectEmailSummary(htmlRaw, aiSummary);
const html = injectBriefCta(htmlWithSummary, magazineUrl);
const shortDate = new Intl.DateTimeFormat('en-US', { month: 'short', day: 'numeric' }).format(new Date(nowMs));
const subject = aiSummary ? `WorldMonitor Intelligence Brief — ${shortDate}` : `WorldMonitor Digest — ${shortDate}`;
let anyDelivered = false;
for (const ch of deliverableChannels) {
let ok = false;
if (ch.channelType === 'telegram' && ch.chatId) {
// Phase 8: send the 3-image carousel first (best-effort), then
// the full text. Caption on the carousel is a short teaser —
// the long-form story list goes in the text message below so
// it remains forwardable / quotable on its own.
if (magazineUrl) {
const caption = `<b>WorldMonitor Brief — ${shortDate}</b>\n${stories.length} ${stories.length === 1 ? 'thread' : 'threads'} on the desk today.`;
await sendTelegramBriefCarousel(rule.userId, ch.chatId, caption, magazineUrl);
}
ok = await sendTelegram(rule.userId, ch.chatId, telegramText);
} else if (ch.channelType === 'slack' && ch.webhookEnvelope) {
ok = await sendSlack(rule.userId, ch.webhookEnvelope, slackText);
} else if (ch.channelType === 'discord' && ch.webhookEnvelope) {
ok = await sendDiscord(rule.userId, ch.webhookEnvelope, discordText);
} else if (ch.channelType === 'email' && ch.email) {
ok = await sendEmail(ch.email, subject, text, html);
} else if (ch.channelType === 'webhook' && ch.webhookEnvelope) {
ok = await sendWebhook(rule.userId, ch.webhookEnvelope, stories, aiSummary);
}
if (ok) anyDelivered = true;
}
if (anyDelivered) {
await upstashRest(
'SET', lastSentKey, JSON.stringify({ sentAt: nowMs }), 'EX', '691200', // 8 days
);
sentCount++;
console.log(
`[digest] Sent ${stories.length} stories to ${rule.userId} (${rule.variant}, ${rule.digestMode})`,
);
}
}
console.log(`[digest] Cron run complete: ${sentCount} digest(s) sent`);
// Brief-compose failure gate. Runs at the very end so a compose-
// layer outage (Upstash blip, insights key stale, signing secret
// missing) never blocks digest delivery to users — but Railway
// still flips the run red so ops see the signal. Denominator is
// attempted writes (shouldExitNonZero enforces this).
if (shouldExitOnBriefFailures({ success: composeSuccess, failed: composeFailed })) {
console.warn(
`[digest] brief: exiting non-zero — compose_failed=${composeFailed} compose_success=${composeSuccess} crossed the threshold`,
);
process.exit(1);
}
}
main().catch((err) => {
console.error('[digest] Fatal:', err);
process.exit(1);
});