mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
* feat(gateway): thread Vercel Edge ctx through createDomainGateway (#3381)

  PR-0 of the Axiom usage-telemetry stack. Pure infra change: no telemetry
  emission yet, only the signature plumbing required for ctx.waitUntil to
  exist on the hot path.
  - createDomainGateway returns (req, ctx) instead of (req)
  - rewriteToSebuf propagates ctx to its target gateway
  - 5 alias callsites updated to pass ctx through
  - ~30 [rpc].ts callsites unchanged (export default createDomainGateway(...))

  Pattern reference: api/notification-channels.ts:166.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(usage): pure UsageIdentity resolver + Axiom emit primitives (#3381)

  server/_shared/usage-identity.ts
  - buildUsageIdentity: pure function, consumes already-resolved gateway state.
  - Static ENTERPRISE_KEY_TO_CUSTOMER map (explicit, reviewable in code).
  - Does not re-verify JWTs or re-validate API keys.

  server/_shared/usage.ts
  - buildRequestEvent / buildUpstreamEvent: allowlisted-primitive builders
    only. Never accept Request/Response — additive field leaks become
    structurally impossible.
  - emitUsageEvents → ctx.waitUntil(sendToAxiom). Direct fetch, 1.5s timeout,
    no retry, gated by USAGE_TELEMETRY=1 and AXIOM_API_TOKEN.
  - Sliding-window circuit breaker (5% over 5min, min 20 samples). Trips with
    one structured console.error; subsequent drops are 1%-sampled console.warn.
  - Header derivers reuse Vercel/CF headers for request_id, region, country,
    reqBytes; ua_hash null unless USAGE_UA_PEPPER is set (no stable
    fingerprinting).
  - Dev-only x-usage-telemetry response header for 2-second debugging.

  server/_shared/auth-session.ts
  - New resolveClerkSession returning { userId, orgId } in one JWT verify so
    customer_id can be Clerk org id without a second pass.
    resolveSessionUserId kept as back-compat wrapper.

  No emission wiring yet — that lands in the next commit (gateway request
  event + 403 + 429).
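The sliding-window breaker described above (trip at more than 5% failures over 5 minutes, only once at least 20 samples exist) can be sketched standalone. The class name and method shape below are illustrative, not the module's actual code; the injectable clock is just to keep the sketch testable.

```typescript
type Sample = { at: number; ok: boolean };

class SlidingWindowBreaker {
  private samples: Sample[] = [];
  constructor(
    private readonly windowMs = 5 * 60_000,
    private readonly minSamples = 20,
    private readonly maxFailureRate = 0.05,
    private readonly now: () => number = Date.now,
  ) {}

  record(ok: boolean): void {
    const t = this.now();
    this.samples.push({ at: t, ok });
    // Evict samples that have fallen out of the sliding window.
    const cutoff = t - this.windowMs;
    while (this.samples.length > 0 && this.samples[0]!.at < cutoff) this.samples.shift();
  }

  tripped(): boolean {
    // Never trip on thin data: below minSamples the rate is meaningless.
    if (this.samples.length < this.minSamples) return false;
    const failures = this.samples.filter((s) => !s.ok).length;
    return failures / this.samples.length > this.maxFailureRate;
  }
}
```

The min-sample floor is what keeps a single early failure (1/1 = 100%) from tripping the breaker at cold start.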
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(gateway): emit Axiom request events on every return path (#3381)

  Wires the request-event side of the Axiom usage-telemetry stack. Behind
  USAGE_TELEMETRY=1 — no-op when the env var is unset.

  Emit points (each builds identity from accumulated gateway state):
  - origin_403 disallowed origin → reason=origin_403
  - API access subscription required (403)
  - legacy bearer 401 / 403 / 401-without-bearer
  - entitlement check fail-through
  - endpoint rate-limit 429 → reason=rate_limit_429
  - global rate-limit 429 → reason=rate_limit_429
  - 405 method not allowed
  - 404 not found
  - 304 etag match (resolved cache tier)
  - 200 GET with body (resolved cache tier, real res_bytes)
  - streaming / non-GET-200 final return (res_bytes best-effort)

  Identity inputs (UsageIdentityInput):
  - sessionUserId / clerkOrgId from new resolveClerkSession (one JWT verify)
  - isUserApiKey + userApiKeyCustomerRef from validateUserApiKey result
  - enterpriseApiKey when keyCheck.valid + non-wm_ wmKey present
  - widgetKey from x-widget-key header (best-effort)
  - tier captured opportunistically from existing getEntitlements calls

  Header derivers reuse Vercel/CF metadata (x-vercel-id, x-vercel-ip-country,
  cf-ipcountry, content-length, sentry-trace) — no new geo lookup, no new
  crypto on the hot path. ua_hash null unless USAGE_UA_PEPPER is set.

  Dev-only x-usage-telemetry response header (ok | degraded | off) attached
  on the response paths for 2-second debugging in non-production.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(usage): upstream events via implicit request scope (#3381)

  Closes the upstream-attribution side of the Axiom usage-telemetry stack
  without requiring leaf-handler changes (per koala's review).

  server/_shared/usage.ts
  - AsyncLocalStorage-backed UsageScope: gateway sets it once per request,
    fetch helpers read from it lazily.
    Defensive import — if the runtime rejects node:async_hooks, scope helpers
    degrade to no-ops and the request event is unaffected.
  - runWithUsageScope(scope, fn) / getUsageScope() exports.

  server/gateway.ts
  - Wraps matchedHandler in runWithUsageScope({ ctx, requestId, customerId,
    route, tier }) so deep fetchers can attribute upstream calls without
    threading state through every handler signature.

  server/_shared/redis.ts
  - cachedFetchJsonWithMeta accepts opts.usage = { provider, operation? }.
    Only the provider label is required to opt in — request_id / customer_id /
    route / tier flow implicitly from UsageScope.
  - Emits on the fresh path only (cache hits don't emit; the inbound request
    event already records cache_status).
  - cache_status correctly distinguishes 'miss' vs 'neg-sentinel' by
    construction, matching NEG_SENTINEL handling.
  - Telemetry never throws — failures are swallowed in the lazy-import catch,
    sink itself short-circuits on USAGE_TELEMETRY=0.

  server/_shared/fetch-json.ts
  - New optional { provider, operation } in FetchJsonOptions. Same
    opt-in-by-provider model as cachedFetchJsonWithMeta. Auto-derives host
    from URL. Reads body via .text() so response_bytes is recorded
    (best-effort; chunked responses still report 0).

  Net result: any handler that uses fetchJson or cachedFetchJsonWithMeta gets
  full per-customer upstream attribution by adding two fields to the options
  bag. No signature changes anywhere else.
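The commit names the exact scope API (runWithUsageScope / getUsageScope backed by AsyncLocalStorage); a minimal sketch follows. One deliberate simplification: the real module uses a defensive dynamic import of node:async_hooks that degrades to no-ops when the runtime rejects it, while this sketch imports statically for brevity. The UsageScope field list mirrors the commit description.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

interface UsageScope {
  requestId: string;
  customerId: string | null;
  route: string;
  tier: number;
}

const storage = new AsyncLocalStorage<UsageScope>();

// Gateway sets the scope once per request...
export function runWithUsageScope<T>(scope: UsageScope, fn: () => T): T {
  return storage.run(scope, fn);
}

// ...and fetch helpers read it lazily, with no state threaded through
// handler signatures. Outside any request this returns undefined.
export function getUsageScope(): UsageScope | undefined {
  return storage.getStore();
}
```

Because AsyncLocalStorage follows the async continuation, any fetcher awaited inside the wrapped handler sees the same scope without an explicit parameter.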
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(gateway): address round-1 codex feedback on usage telemetry

  - ctx is now optional on the createDomainGateway handler signature so
    direct callers (tests, non-Vercel paths) no longer crash on emit
  - legacy premium bearer-token routes (resilience, shipping-v2) propagate
    session.userId into the usage accumulator so successful requests are
    attributed instead of emitting as anon
  - after checkEntitlement allows a tier-gated route, re-read entitlements
    (Redis-cached + in-flight coalesced) to populate usage.tier so
    analyze-stock & co. emit the correct tier rather than 0
  - domain extraction now skips a leading vN segment, so /api/v2/shipping/*
    records domain="shipping" instead of "v2"

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(usage): assert telemetry payload + identity resolver + operator guide

  - tests/usage-telemetry-emission.test.mts stubs globalThis.fetch to capture
    the Axiom ingest POST body and asserts the four review-flagged fields
    end-to-end through the gateway: domain on /api/v2/<svc>/* (was "v2"),
    customer_id on legacy premium bearer success (was null/anon), tier on
    entitlement-gated success via the Convex fallback path (was 0), plus a
    ctx-optional regression guard
  - server/__tests__/usage-identity.test.ts unit-tests the pure
    buildUsageIdentity() resolver across every auth_kind branch, tier
    coercion, and the secret-handling invariant (raw enterprise key never
    lands in any output field)
  - docs/architecture/usage-telemetry.md is the operator + dev guide: field
    reference, architecture, configuration, failure modes, local workflow,
    eight Axiom APL recipes, and runbooks for adding fields / new gateway
    return paths

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(usage): make recorder.settled robust to nested waitUntil

  Promise.all(pending) snapshotted the array at call time, missing the inner
  ctx.waitUntil(sendToAxiom(...)) that emitUsageEvents pushes after the outer
  drain begins. Tests passed only because the fetch spy resolved in an
  earlier microtask tick. Replace with a quiescence loop so the helper
  survives any future async in the emit path.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: trigger preview

* fix(usage): address koala #3403 review — collapse nested waitUntil,
  widget-key validation, neg-sentinel status, auth_* reasons

  P1
  - Collapse nested ctx.waitUntil at all 3 emit sites (gateway.ts
    emitRequest, fetch-json.ts, redis.ts emitUpstreamFromHook). Export
    sendToAxiom and call it directly inside the outer waitUntil so Edge
    runtimes don't drop the delivery promise after the response phase.
  - Validate X-Widget-Key against WIDGET_AGENT_KEY before populating
    usage.widgetKey so unauthenticated callers can't spoof per-customer
    attribution.

  P2
  - Emit on OPTIONS preflight (new 'preflight' RequestReason).
  - Gate cachedFetchJsonWithMeta upstreamStatus=200 on result != null so the
    neg-sentinel branch no longer reports as a successful upstream call.
  - Extend RequestReason with auth_401/auth_403/tier_403 and replace
    reason:'ok' on every auth/tier-rejection emit path.
  - Replace 32-bit FNV-1a with a two-round XOR-folded 64-bit variant in
    hashKeySync (collision space matters once widget-key adoption grows).

  Verification
  - tests/usage-telemetry-emission.test.mts — 6/6
  - tests/premium-stock-gateway.test.mts +
    tests/gateway-cdn-origin-policy.test.mts — 15/15
  - npx vitest run server/__tests__/usage-identity.test.ts — 13/13
  - npx tsc --noEmit clean

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: trigger preview rebuild for AXIOM_API_TOKEN

* chore(usage): note Axiom region in ingest URL comment

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* debug(usage): unconditional logs in sendToAxiom for preview troubleshooting

  Temporary — to be reverted once Axiom delivery is confirmed working in
  preview.
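The recorder.settled fix above can be sketched as a quiescence loop: instead of Promise.all over a snapshot of the pending array, keep draining until a full pass adds no new promises. `Recorder` here is a hypothetical test double for the Edge ctx, not the repo's actual helper.

```typescript
class Recorder {
  pending: Promise<unknown>[] = [];

  waitUntil = (p: Promise<unknown>): void => {
    this.pending.push(p);
  };

  async settled(): Promise<void> {
    let drained = 0;
    while (drained < this.pending.length) {
      const batch = this.pending.slice(drained);
      drained = this.pending.length;
      await Promise.all(batch);
      // A drained promise may itself have called waitUntil (the nested
      // ctx.waitUntil case); loop again until the array stops growing.
    }
  }
}
```

A plain `Promise.all(this.pending)` would miss any promise pushed after the snapshot, which is exactly the bug the commit describes.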
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(usage): add 'live' cache tier + revert preview debug logs

  - Sync UsageCacheTier with the local CacheTier in gateway.ts (main added
    'live' in PR #3402 — synthetic merge with main was failing typecheck:api).
  - Revert temporary unconditional debug logs in sendToAxiom now that Axiom
    delivery is verified end-to-end on preview (event landed with all fields
    populated, including the new auth_401 reason from the koala #3403 fix).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

  ---------

  Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
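The hashKeySync change called out in the review-fix commit (32-bit FNV-1a replaced by an XOR-folded 64-bit variant) can be sketched as below. The exact meaning of "two-round" folding in the real helper is an assumption; this sketch computes standard 64-bit FNV-1a and folds the state to 32 bits with one XOR, and it truncates char codes to a byte for simplicity.

```typescript
// 64-bit FNV-1a constants (offset basis and prime).
const FNV64_OFFSET = 0xcbf29ce484222325n;
const FNV64_PRIME = 0x100000001b3n;
const MASK64 = (1n << 64n) - 1n;

function fnv1a64XorFolded(input: string): string {
  let h = FNV64_OFFSET;
  for (let i = 0; i < input.length; i++) {
    // Simplification: low byte of each char code only.
    h ^= BigInt(input.charCodeAt(i) & 0xff);
    h = (h * FNV64_PRIME) & MASK64;
  }
  // XOR-fold the 64-bit state down to 32 bits of output.
  const folded = (h >> 32n) ^ (h & 0xffffffffn);
  return folded.toString(16).padStart(8, '0');
}
```

Running the full mix at 64 bits before folding is what buys the larger effective collision space versus hashing at 32 bits throughout.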
504 lines
19 KiB
TypeScript
import { unwrapEnvelope } from './seed-envelope';
import { buildUpstreamEvent, getUsageScope, sendToAxiom } from './usage';

const REDIS_OP_TIMEOUT_MS = 1_500;
const REDIS_PIPELINE_TIMEOUT_MS = 5_000;

function errMsg(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}
/**
 * Environment-based key prefix to avoid collisions when multiple deployments
 * share the same Upstash Redis instance (M-6 fix).
 */
function getKeyPrefix(): string {
  const env = process.env.VERCEL_ENV; // 'production' | 'preview' | 'development'
  if (!env || env === 'production') return '';
  const sha = process.env.VERCEL_GIT_COMMIT_SHA?.slice(0, 8) || 'dev';
  return `${env}:${sha}:`;
}

let cachedPrefix: string | undefined;
function prefixKey(key: string): string {
  if (cachedPrefix === undefined) cachedPrefix = getKeyPrefix();
  if (!cachedPrefix) return key;
  return `${cachedPrefix}${key}`;
}

// Test-only: invalidate the memoized key prefix so a test that mutates
// process.env.VERCEL_ENV / VERCEL_GIT_COMMIT_SHA sees the new value on the
// next read. No production caller should ever invoke this.
export function __resetKeyPrefixCacheForTests(): void {
  cachedPrefix = undefined;
}
/**
 * Like getCachedJson but throws on Redis/network failures instead of returning null.
 * Always uses the raw (unprefixed) key — callers that write via seed scripts (which bypass
 * the prefix system) must use this to read the same key they wrote.
 */
export async function getRawJson(key: string): Promise<unknown | null> {
  if (process.env.LOCAL_API_MODE === 'tauri-sidecar') {
    const { sidecarCacheGet } = await import('./sidecar-cache');
    return sidecarCacheGet(key);
  }
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) throw new Error('Redis credentials not configured');
  const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, {
    headers: { Authorization: `Bearer ${token}` },
    signal: AbortSignal.timeout(REDIS_OP_TIMEOUT_MS),
  });
  if (!resp.ok) throw new Error(`Redis HTTP ${resp.status}`);
  const data = (await resp.json()) as { result?: string };
  if (!data.result) return null;
  // Envelope-aware: contract-mode canonical keys are stored as {_seed, data}.
  // unwrapEnvelope is a no-op on legacy (non-envelope) shapes.
  return unwrapEnvelope(JSON.parse(data.result)).data;
}
/**
 * Read a key's value as a raw Upstash string — no JSON.parse, no envelope unwrap.
 * Use when a seeder stores a bare scalar (e.g., a snapshot_id pointer) via
 * `['SET', key, bareString]` without JSON.stringify. getCachedJson() on these
 * keys silently returns null because JSON.parse throws on unquoted strings,
 * and the try/catch swallows the error.
 *
 * Always uses the raw (unprefixed) key — matches the seed-script write path
 * (seeders don't know about the Vercel env-prefix scheme).
 */
export async function getCachedRawString(key: string): Promise<string | null> {
  if (process.env.LOCAL_API_MODE === 'tauri-sidecar') {
    const { sidecarCacheGet } = await import('./sidecar-cache');
    const v = sidecarCacheGet(key);
    return typeof v === 'string' ? v : null;
  }
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  try {
    const resp = await fetch(`${url}/get/${encodeURIComponent(key)}`, {
      headers: { Authorization: `Bearer ${token}` },
      signal: AbortSignal.timeout(REDIS_OP_TIMEOUT_MS),
    });
    if (!resp.ok) return null;
    const data = (await resp.json()) as { result?: string | null };
    return typeof data.result === 'string' && data.result.length > 0 ? data.result : null;
  } catch (err) {
    // AbortSignal.timeout() throws DOMException name='TimeoutError' (on V8
    // runtimes incl. Vercel Edge); manual controller.abort() throws 'AbortError'.
    // Match both so the [REDIS-TIMEOUT] structured log actually fires.
    const isTimeout = err instanceof Error && (err.name === 'TimeoutError' || err.name === 'AbortError');
    if (isTimeout) console.error(`[REDIS-TIMEOUT] getCachedRawString key=${key} timeoutMs=${REDIS_OP_TIMEOUT_MS}`);
    else console.warn('[redis] getCachedRawString failed:', errMsg(err));
    return null;
  }
}
export async function getCachedJson(key: string, raw = false): Promise<unknown | null> {
  if (process.env.LOCAL_API_MODE === 'tauri-sidecar') {
    const { sidecarCacheGet } = await import('./sidecar-cache');
    return sidecarCacheGet(key);
  }

  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  try {
    const finalKey = raw ? key : prefixKey(key);
    const resp = await fetch(`${url}/get/${encodeURIComponent(finalKey)}`, {
      headers: { Authorization: `Bearer ${token}` },
      signal: AbortSignal.timeout(REDIS_OP_TIMEOUT_MS),
    });
    if (!resp.ok) return null;
    const data = (await resp.json()) as { result?: string };
    if (!data.result) return null;
    // Envelope-aware by default — RPC consumers get the bare payload regardless
    // of whether the writer has migrated to contract mode. Legacy shapes pass
    // through unchanged (unwrapEnvelope returns {_seed: null, data: raw}).
    return unwrapEnvelope(JSON.parse(data.result)).data;
  } catch (err) {
    // Structured timeout log goes to Sentry via Vercel integration. Large-
    // payload timeouts used to silently return null and let downstream callers
    // cache zero-state — see docs/plans/chokepoint-rpc-payload-split.md for
    // the incident that added this tag.
    //
    // AbortSignal.timeout() throws DOMException name='TimeoutError' (on V8
    // runtimes incl. Vercel Edge); manual controller.abort() throws
    // 'AbortError'. Checking only 'AbortError' meant the [REDIS-TIMEOUT] log
    // never fired — every timeout fell through to the generic console.warn.
    const isTimeout = err instanceof Error && (err.name === 'TimeoutError' || err.name === 'AbortError');
    if (isTimeout) {
      console.error(`[REDIS-TIMEOUT] getCachedJson key=${key} timeoutMs=${REDIS_OP_TIMEOUT_MS}`);
    } else {
      console.warn('[redis] getCachedJson failed:', errMsg(err));
    }
    return null;
  }
}
export async function setCachedJson(key: string, value: unknown, ttlSeconds: number, raw = false): Promise<void> {
  if (process.env.LOCAL_API_MODE === 'tauri-sidecar') {
    const { sidecarCacheSet } = await import('./sidecar-cache');
    sidecarCacheSet(key, value, ttlSeconds);
    return;
  }

  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return;
  try {
    const finalKey = raw ? key : prefixKey(key);
    // Atomic SET with EX — single call avoids race between SET and EXPIRE (C-3 fix)
    await fetch(`${url}/set/${encodeURIComponent(finalKey)}/${encodeURIComponent(JSON.stringify(value))}/EX/${ttlSeconds}`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}` },
      signal: AbortSignal.timeout(REDIS_OP_TIMEOUT_MS),
    });
  } catch (err) {
    console.warn('[redis] setCachedJson failed:', errMsg(err));
  }
}
const NEG_SENTINEL = '__WM_NEG__';

/**
 * Batch GET using Upstash pipeline API — single HTTP round-trip for N keys.
 * Returns a Map of key → parsed JSON value (missing/failed/sentinel keys omitted).
 */
export async function getCachedJsonBatch(keys: string[]): Promise<Map<string, unknown>> {
  const result = new Map<string, unknown>();
  if (keys.length === 0) return result;

  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return result;

  try {
    const pipeline = keys.map((k) => ['GET', prefixKey(k)]);
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(REDIS_PIPELINE_TIMEOUT_MS),
    });
    if (!resp.ok) return result;

    const data = (await resp.json()) as Array<{ result?: string }>;
    for (let i = 0; i < keys.length; i++) {
      const raw = data[i]?.result;
      if (raw) {
        try {
          const parsed = JSON.parse(raw);
          if (parsed === NEG_SENTINEL) continue;
          // Envelope-aware: unwrap contract-mode canonical keys; legacy values
          // pass through.
          result.set(keys[i]!, unwrapEnvelope(parsed).data);
        } catch { /* skip malformed */ }
      }
    }
  } catch (err) {
    console.warn('[redis] getCachedJsonBatch failed:', errMsg(err));
  }
  return result;
}
export type RedisPipelineCommand = Array<string | number>;

function normalizePipelineCommand(command: RedisPipelineCommand, raw: boolean): RedisPipelineCommand {
  if (raw || command.length < 2) return [...command];
  const [verb, key, ...rest] = command;
  if (typeof verb !== 'string' || typeof key !== 'string') return [...command];
  return [verb, prefixKey(key), ...rest];
}

export async function runRedisPipeline(
  commands: RedisPipelineCommand[],
  raw = false,
): Promise<Array<{ result?: unknown }>> {
  if (process.env.LOCAL_API_MODE === 'tauri-sidecar') return [];
  if (commands.length === 0) return [];

  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return [];

  try {
    const response = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(commands.map((command) => normalizePipelineCommand(command, raw))),
      signal: AbortSignal.timeout(REDIS_PIPELINE_TIMEOUT_MS),
    });
    if (!response.ok) {
      console.warn(`[redis] runRedisPipeline HTTP ${response.status}`);
      return [];
    }
    return await response.json() as Array<{ result?: unknown }>;
  } catch (err) {
    console.warn('[redis] runRedisPipeline failed:', errMsg(err));
    return [];
  }
}
/**
 * In-flight request coalescing map.
 * When multiple concurrent requests hit the same cache key during a miss,
 * only the first triggers the upstream fetch — others await the same promise.
 * This eliminates duplicate upstream API calls within a single Edge Function invocation.
 */
const inflight = new Map<string, Promise<unknown>>();

/**
 * Check cache, then fetch with coalescing on miss.
 * Concurrent callers for the same key share a single upstream fetch + Redis write.
 * When fetcher returns null, a sentinel is cached for negativeTtlSeconds to prevent request storms.
 */
export async function cachedFetchJson<T extends object>(
  key: string,
  ttlSeconds: number,
  fetcher: () => Promise<T | null>,
  negativeTtlSeconds = 120,
): Promise<T | null> {
  const cached = await getCachedJson(key);
  if (cached === NEG_SENTINEL) return null;
  if (cached !== null) return cached as T;

  const existing = inflight.get(key);
  if (existing) return existing as Promise<T | null>;

  const promise = fetcher()
    .then(async (result) => {
      if (result != null) {
        await setCachedJson(key, result, ttlSeconds);
      } else {
        await setCachedJson(key, NEG_SENTINEL, negativeTtlSeconds);
      }
      return result;
    })
    .catch((err: unknown) => {
      console.warn(`[redis] cachedFetchJson fetcher failed for "${key}":`, errMsg(err));
      throw err;
    })
    .finally(() => {
      inflight.delete(key);
    });

  inflight.set(key, promise);
  return promise;
}
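The coalescing contract above can be demonstrated in isolation: concurrent misses on one key share a single upstream call. This stripped-down sketch drops the Redis layer and the negative-sentinel branch; the map-set-before-return and the `.finally` cleanup are the load-bearing parts.

```typescript
const inflightDemo = new Map<string, Promise<number>>();
let fetchCount = 0;

async function coalescedFetch(key: string): Promise<number> {
  // Follower path: join the leader's in-flight promise.
  const existing = inflightDemo.get(key);
  if (existing) return existing;

  // Leader path: run the (simulated) upstream fetch once.
  const p = (async () => {
    fetchCount++;
    await new Promise((r) => setTimeout(r, 10)); // simulate upstream latency
    return 42;
  })().finally(() => inflightDemo.delete(key));

  // Registered synchronously, before any await, so a second caller in the
  // same tick already sees the entry.
  inflightDemo.set(key, p);
  return p;
}
```

Note the map entry is removed in `.finally`, so the window of coalescing is exactly the lifetime of the fetch, matching the helper above.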
/**
 * Per-call usage-telemetry hook for upstream event emission (issue #3381).
 *
 * The only required field is `provider` — its presence is what tells the
 * helper "emit an upstream event for this call." Everything else is filled
 * in by the gateway-set UsageScope (request_id, customer_id, route, tier,
 * ctx) via AsyncLocalStorage. Pass overrides explicitly if you need to.
 *
 * Use this when calling fetchJson / cachedFetchJsonWithMeta from a code
 * path that runs inside a gateway-handled request. For helpers used
 * outside any request (cron, scripts), no scope exists and emission is
 * skipped silently.
 */
export interface UsageHook {
  provider: string;
  operation?: string;
  host?: string;
  // Overrides — leave unset to inherit from gateway-set UsageScope.
  ctx?: { waitUntil: (p: Promise<unknown>) => void };
  requestId?: string;
  customerId?: string | null;
  route?: string;
  tier?: number;
}
/**
 * Like cachedFetchJson but reports the data source.
 * Use when callers need to distinguish cache hits from fresh fetches
 * (e.g. to set provider/cached metadata on responses).
 *
 * Returns { data, source } where source is:
 *   'cache' — served from Redis
 *   'fresh' — fetcher ran (leader) or joined an in-flight fetch (follower)
 *
 * If `opts.usage` is supplied, an upstream event is emitted on the fresh
 * path (issue #3381). Pass-through for callers that don't care about
 * telemetry — backwards-compatible.
 */
export async function cachedFetchJsonWithMeta<T extends object>(
  key: string,
  ttlSeconds: number,
  fetcher: () => Promise<T | null>,
  negativeTtlSeconds = 120,
  opts?: { usage?: UsageHook },
): Promise<{ data: T | null; source: 'cache' | 'fresh' }> {
  const cached = await getCachedJson(key);
  if (cached === NEG_SENTINEL) return { data: null, source: 'cache' };
  if (cached !== null) return { data: cached as T, source: 'cache' };

  const existing = inflight.get(key);
  if (existing) {
    const data = (await existing) as T | null;
    return { data, source: 'fresh' };
  }

  const fetchT0 = Date.now();
  let upstreamStatus = 0;
  let cacheStatus: 'miss' | 'neg-sentinel' = 'miss';

  const promise = fetcher()
    .then(async (result) => {
      // Only count an upstream call as a 200 when it actually returned data.
      // A null result triggers the neg-sentinel branch below — these are
      // empty/failed upstream calls and must NOT show up as `status=200` in
      // dashboards (would poison the cache-hit-ratio recipe and per-provider
      // error rates). Use status=0 for the empty branch; cache_status carries
      // the structural detail.
      if (result != null) {
        upstreamStatus = 200;
        await setCachedJson(key, result, ttlSeconds);
      } else {
        upstreamStatus = 0;
        cacheStatus = 'neg-sentinel';
        await setCachedJson(key, NEG_SENTINEL, negativeTtlSeconds);
      }
      return result;
    })
    .catch((err: unknown) => {
      upstreamStatus = 0;
      console.warn(`[redis] cachedFetchJsonWithMeta fetcher failed for "${key}":`, errMsg(err));
      throw err;
    })
    .finally(() => {
      inflight.delete(key);
    });

  inflight.set(key, promise);
  let data: T | null;
  try {
    data = await promise;
  } finally {
    emitUpstreamFromHook(opts?.usage, upstreamStatus, Date.now() - fetchT0, cacheStatus);
  }
  return { data, source: 'fresh' };
}
function emitUpstreamFromHook(
  usage: UsageHook | undefined,
  status: number,
  durationMs: number,
  cacheStatus: 'miss' | 'fresh' | 'stale-while-revalidate' | 'neg-sentinel',
): void {
  // Emit only when caller labels the provider — avoids "unknown" pollution.
  if (!usage?.provider) return;
  // Single waitUntil() registered synchronously here — no nested
  // ctx.waitUntil() inside Axiom delivery. Static import keeps the call
  // synchronous so the runtime registers it during the request phase.
  const scope = getUsageScope();
  const ctx = usage.ctx ?? scope?.ctx;
  if (!ctx) return;
  const event = buildUpstreamEvent({
    requestId: usage.requestId ?? scope?.requestId ?? '',
    customerId: usage.customerId ?? scope?.customerId ?? null,
    route: usage.route ?? scope?.route ?? '',
    tier: usage.tier ?? scope?.tier ?? 0,
    provider: usage.provider,
    operation: usage.operation ?? 'fetch',
    host: usage.host ?? '',
    status,
    durationMs,
    requestBytes: 0,
    responseBytes: 0,
    cacheStatus,
  });
  try {
    ctx.waitUntil(sendToAxiom([event]));
  } catch {
    /* telemetry must never throw */
  }
}
export async function geoSearchByBox(
  key: string, lon: number, lat: number,
  widthKm: number, heightKm: number, count: number, raw = false,
): Promise<string[]> {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return [];
  try {
    const finalKey = raw ? key : prefixKey(key);
    const pipeline = [
      ['GEOSEARCH', finalKey, 'FROMLONLAT', String(lon), String(lat),
        'BYBOX', String(widthKm), String(heightKm), 'km', 'ASC', 'COUNT', String(count)],
    ];
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(REDIS_PIPELINE_TIMEOUT_MS),
    });
    if (!resp.ok) return [];
    const data = (await resp.json()) as Array<{ result?: string[] }>;
    return data[0]?.result ?? [];
  } catch (err) {
    console.warn('[redis] geoSearchByBox failed:', errMsg(err));
    return [];
  }
}
export async function getHashFieldsBatch(
  key: string, fields: string[], raw = false,
): Promise<Map<string, string>> {
  const result = new Map<string, string>();
  if (fields.length === 0) return result;
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return result;
  try {
    const finalKey = raw ? key : prefixKey(key);
    const pipeline = [['HMGET', finalKey, ...fields]];
    const resp = await fetch(`${url}/pipeline`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(pipeline),
      signal: AbortSignal.timeout(REDIS_PIPELINE_TIMEOUT_MS),
    });
    if (!resp.ok) return result;
    const data = (await resp.json()) as Array<{ result?: (string | null)[] }>;
    const values = data[0]?.result;
    if (values) {
      for (let i = 0; i < fields.length; i++) {
        if (values[i]) result.set(fields[i]!, values[i]!);
      }
    }
  } catch (err) {
    console.warn('[redis] getHashFieldsBatch failed:', errMsg(err));
  }
  return result;
}
/**
 * Deletes a single Redis key via Upstash REST API.
 *
 * @param key - The key to delete
 * @param raw - When true, skips the environment prefix (use for global keys like entitlements)
 */
export async function deleteRedisKey(key: string, raw = false): Promise<void> {
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return;

  try {
    const finalKey = raw ? key : prefixKey(key);
    await fetch(`${url}/del/${encodeURIComponent(finalKey)}`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}` },
      signal: AbortSignal.timeout(REDIS_OP_TIMEOUT_MS),
    });
  } catch (err) {
    console.warn('[redis] deleteRedisKey failed:', errMsg(err));
  }
}