diff --git a/api/scenario/v1/run.ts b/api/scenario/v1/run.ts index f3890f15a..c3473db53 100644 --- a/api/scenario/v1/run.ts +++ b/api/scenario/v1/run.ts @@ -4,5 +4,5 @@ import gateway from './[rpc]'; import { rewriteToSebuf } from '../../../server/alias-rewrite'; // Alias for documented v1 URL. See server/alias-rewrite.ts. -export default (req: Request) => - rewriteToSebuf(req, '/api/scenario/v1/run-scenario', gateway); +export default (req: Request, ctx: { waitUntil: (p: Promise) => void }) => + rewriteToSebuf(req, '/api/scenario/v1/run-scenario', gateway, ctx); diff --git a/api/scenario/v1/status.ts b/api/scenario/v1/status.ts index bcaac7420..08095161a 100644 --- a/api/scenario/v1/status.ts +++ b/api/scenario/v1/status.ts @@ -4,5 +4,5 @@ import gateway from './[rpc]'; import { rewriteToSebuf } from '../../../server/alias-rewrite'; // Alias for documented v1 URL. See server/alias-rewrite.ts. -export default (req: Request) => - rewriteToSebuf(req, '/api/scenario/v1/get-scenario-status', gateway); +export default (req: Request, ctx: { waitUntil: (p: Promise) => void }) => + rewriteToSebuf(req, '/api/scenario/v1/get-scenario-status', gateway, ctx); diff --git a/api/scenario/v1/templates.ts b/api/scenario/v1/templates.ts index 68af275a0..a8c23e892 100644 --- a/api/scenario/v1/templates.ts +++ b/api/scenario/v1/templates.ts @@ -4,5 +4,5 @@ import gateway from './[rpc]'; import { rewriteToSebuf } from '../../../server/alias-rewrite'; // Alias for documented v1 URL. See server/alias-rewrite.ts. -export default (req: Request) => - rewriteToSebuf(req, '/api/scenario/v1/list-scenario-templates', gateway); +export default (req: Request, ctx: { waitUntil: (p: Promise) => void }) => + rewriteToSebuf(req, '/api/scenario/v1/list-scenario-templates', gateway, ctx); diff --git a/api/supply-chain/v1/country-products.ts b/api/supply-chain/v1/country-products.ts index a9e57e79e..5ebe81b39 100644 --- a/api/supply-chain/v1/country-products.ts +++ b/api/supply-chain/v1/country-products.ts @@ -4,5 +4,5 @@ import gateway from './[rpc]'; import { rewriteToSebuf } from '../../../server/alias-rewrite'; // Alias for documented v1 URL. See server/alias-rewrite.ts. -export default (req: Request) => - rewriteToSebuf(req, '/api/supply-chain/v1/get-country-products', gateway); +export default (req: Request, ctx: { waitUntil: (p: Promise) => void }) => + rewriteToSebuf(req, '/api/supply-chain/v1/get-country-products', gateway, ctx); diff --git a/api/supply-chain/v1/multi-sector-cost-shock.ts b/api/supply-chain/v1/multi-sector-cost-shock.ts index badf0878f..baafdcebc 100644 --- a/api/supply-chain/v1/multi-sector-cost-shock.ts +++ b/api/supply-chain/v1/multi-sector-cost-shock.ts @@ -4,5 +4,5 @@ import gateway from './[rpc]'; import { rewriteToSebuf } from '../../../server/alias-rewrite'; // Alias for documented v1 URL. See server/alias-rewrite.ts. -export default (req: Request) => - rewriteToSebuf(req, '/api/supply-chain/v1/get-multi-sector-cost-shock', gateway); +export default (req: Request, ctx: { waitUntil: (p: Promise) => void }) => + rewriteToSebuf(req, '/api/supply-chain/v1/get-multi-sector-cost-shock', gateway, ctx); diff --git a/docs/architecture/usage-telemetry.md b/docs/architecture/usage-telemetry.md new file mode 100644 index 000000000..a08824545 --- /dev/null +++ b/docs/architecture/usage-telemetry.md @@ -0,0 +1,351 @@ +# Usage telemetry (Axiom) + +Operator + developer guide to the gateway's per-request usage telemetry pipeline. +Implements the requirements in `docs/brainstorms/2026-04-24-axiom-api-observability-requirements.md`. + +--- + +## What it is + +Every inbound API request that hits `createDomainGateway()` emits one structured +event to Axiom describing **who** called **what**, **how it was authenticated**, +**what it cost**, and **how it was served**. Deep fetch helpers +(`fetchJson`, `cachedFetchJsonWithMeta`) emit a second event type per upstream +call so customer × provider attribution is reconstructible. + +It is **observability only** — never on the request-critical path. The whole +sink runs inside `ctx.waitUntil(...)` with a 1.5s timeout, no retries, and a +circuit breaker that trips on 5% failure ratio over a 5-minute window. + +## What you get out of it + +Two event types in dataset `wm_api_usage`: + +### `request` (one per inbound request) + +| Field | Example | Notes | +|--------------------|-------------------------------------------|----------------------------------------------| +| `event_type` | `"request"` | | +| `request_id` | `"req_xxx"` | from `x-request-id` or generated | +| `route` | `/api/market/v1/analyze-stock` | | +| `domain` | `"market"` | strips leading `vN` for `/api/v2//…` | +| `method`, `status` | `"GET"`, `200` | | +| `duration_ms` | `412` | wall-clock at the gateway | +| `req_bytes`, `res_bytes` | | response counted only on 200/304 GET | +| `customer_id` | Clerk user ID, org ID, enterprise slug, or widget key | `null` only for anon | +| `principal_id` | user ID or **hash** of API/widget key | never the raw secret | +| `auth_kind` | `clerk_jwt` \| `user_api_key` \| `enterprise_api_key` \| `widget_key` \| `anon` | | +| `tier` | `0` free / `1` pro / `2` api / `3` enterprise | `0` if unknown | +| `cache_tier` | `fast` \| `medium` \| `slow` \| `slow-browser` \| `static` \| `daily` \| `no-store` | only on 200/304 | +| `country`, `execution_region` | `"US"`, `"iad1"` | Vercel-provided | +| `execution_plane` | `"vercel-edge"` | | +| `origin_kind` | `api-key` \| `oauth` \| `browser-same-origin` \| `browser-cross-origin` \| `null` | derived from headers by `deriveOriginKind()` — `mcp` and `internal-cron` exist in the `OriginKind` type for upstream/future use but are not currently emitted on the request path | +| `ua_hash` | SHA-256 of the UA | hashed so PII doesn't land in Axiom | +| `sentry_trace_id` | `"abc123…"` | join key into Sentry | +| `reason` | `ok` \| `origin_403` \| `rate_limit_429` \| `preflight` \| `auth_401` \| `auth_403` \| `tier_403` | `auth_*` distinguishes auth-rejection paths from genuine successes when filtering on `status` alone is ambiguous | + +### `upstream` (one per outbound fetch from a request handler) + +| Field | Example | +|----------------------|--------------------------| +| `request_id` | links back to the parent | +| `provider`, `host` | `"yahoo-finance"`, `"query1.finance.yahoo.com"` | +| `operation` | logical op name set by the helper | +| `status`, `duration_ms`, `request_bytes`, `response_bytes` | | +| `cache_status` | `miss` \| `fresh` \| `stale-while-revalidate` \| `neg-sentinel` | +| `customer_id`, `route`, `tier` | inherited from the inbound request via AsyncLocalStorage | + +## What it answers + +A non-exhaustive list — copy-paste APL queries are in the **Analysis** section below. + +- Per-customer request volume, p50/p95 latency, error rate +- Per-route premium-vs-free traffic mix +- CDN cache-tier distribution per route (calibrate `RPC_CACHE_TIER`) +- Top-of-funnel for noisy abusers (`auth_kind=anon` × `country` × `route`) +- Upstream provider cost per customer (`upstream` join `request` on `request_id`) +- Bearer-vs-API-key vs anon ratio per premium route +- Region heatmaps (`execution_region` × `route`) + +--- + +## Architecture + +``` + ┌─────────────────────────────────────────────────────┐ + │ Vercel Edge handler │ + │ │ + request ──► │ createDomainGateway() │ + │ auth resolution → usage:UsageIdentityInput │ + │ runWithUsageScope({ ctx, customerId, route, … }) │ + │ └─ user handler ── fetchJson / cachedFetch... ─┼─► upstream + │ (reads scope, emits │ API + │ upstream event) │ + │ emitRequest(...) at every return point ──────────┼────► Axiom + │ └─ ctx.waitUntil(emitUsageEvents(...)) │ wm_api_usage + └─────────────────────────────────────────────────────┘ +``` + +Code map: + +| Concern | File | +|----------------------------------------|--------------------------------------------| +| Gateway emit points + identity accumulator | `server/gateway.ts` | +| Identity resolver (pure) | `server/_shared/usage-identity.ts` | +| Event shapes, builders, Axiom sink, breaker, ALS scope | `server/_shared/usage.ts` | +| Upstream-event emission from fetch helpers | `server/_shared/cached-fetch.ts`, `server/_shared/fetch-json.ts` | + +Key invariants: + +1. **Builders accept allowlisted primitives only** — they never accept + `Request`, `Response`, or untyped objects, so future field additions can't + leak by structural impossibility. +2. **`emitRequest()` fires at every gateway return path** — origin block, + OPTIONS, 401/403/404/405, rate-limit 429, ETag 304, success 200, error 500. + Adding a new return path requires adding the emit, or telemetry coverage + silently regresses. +3. **`principal_id` is a hash for secret-bearing auth** (API key, widget key) + so raw secrets never land in Axiom. +4. **Telemetry failure must not affect API availability or latency** — sink is + fire-and-forget with timeout + breaker; any error path drops the event with + a 1%-sampled `console.warn`. + +--- + +## Configuration + +Two env vars control the pipeline. Both are independent of every other system. + +| Var | Required for | Behavior when missing | +|--------------------|--------------|-------------------------------------------| +| `USAGE_TELEMETRY` | Emission | Set to `1` to enable. Anything else → emission is a no-op (zero network calls, zero allocations of the event payload). | +| `AXIOM_API_TOKEN` | Delivery | Events build but `sendToAxiom` short-circuits to a 1%-sampled `[usage-telemetry] drop { reason: 'no-token' }` warning. | + +Vercel project setup: + +1. Axiom → create dataset **`wm_api_usage`** (the constant in + `server/_shared/usage.ts:18`; rename if you want a different name). +2. Axiom → Settings → API Tokens → create an **Ingest** token scoped to that + dataset. Copy the `xaat-…` value. +3. Vercel → Project → Settings → Environment Variables, add for the desired + environments (Production / Preview): + ``` + USAGE_TELEMETRY=1 + AXIOM_API_TOKEN=xaat-... + ``` +4. Redeploy. Axiom infers schema from the first events — no upfront schema + work needed. + +### Failure modes (deploy-with-Axiom-down is safe) + +| Scenario | Behavior | +|---------------------------------------|------------------------------------------------------| +| `USAGE_TELEMETRY` unset | emit is a no-op, identity object is still built but discarded | +| `USAGE_TELEMETRY=1`, no token | event built, `fetch` skipped, sampled warn | +| Axiom returns non-2xx | `recordSample(false)`, sampled warn | +| Axiom timeout (>1.5s) | `AbortController` aborts, sampled warn | +| ≥5% failure ratio over 5min (≥20 samples) | breaker trips → all sends short-circuit until ratio recovers | +| Direct gateway caller passes no `ctx` | emit is a no-op (the `ctx?.waitUntil` guard) | + +### Kill switch + +There is no in-code feature flag separate from the env vars. To disable in +production: set `USAGE_TELEMETRY=0` (or unset it) and redeploy. Existing +in-flight requests drain on the next isolate cycle. + +--- + +## Local development & testing + +### Smoke test without Axiom + +Just run the dev server with neither env var set. Hit any route. The path is +fully exercised — only the Axiom POST is skipped. + +```sh +vercel dev +curl http://localhost:3000/api/seismology/v1/list-earthquakes +``` + +In any non-`production` build, the response carries an `x-usage-telemetry` +header. Use it as a wiring check: + +```sh +curl -sI http://localhost:3000/api/seismology/v1/list-earthquakes | grep -i x-usage +# x-usage-telemetry: off # USAGE_TELEMETRY unset +# x-usage-telemetry: ok # enabled, breaker closed +# x-usage-telemetry: degraded # breaker tripped — Axiom is failing +``` + +### End-to-end with a real Axiom dataset + +```sh +USAGE_TELEMETRY=1 AXIOM_API_TOKEN=xaat-... vercel dev +curl http://localhost:3000/api/market/v1/list-market-quotes?symbols=AAPL +``` + +Then in Axiom: + +```kusto +['wm_api_usage'] +| where _time > ago(2m) +| project _time, route, status, customer_id, auth_kind, tier, duration_ms +``` + +### Automated tests + +Three suites cover the pipeline: + +1. **Identity unit tests** — `server/__tests__/usage-identity.test.ts` cover the + pure `buildUsageIdentity()` resolver across every `auth_kind` branch. +2. **Gateway emit assertions** — `tests/usage-telemetry-emission.test.mts` + stubs `globalThis.fetch` to capture the Axiom POST body and asserts the + `domain`, `customer_id`, `auth_kind`, and `tier` fields end-to-end through + the gateway. +3. **Auth-path regression tests** — `tests/premium-stock-gateway.test.mts` and + `tests/gateway-cdn-origin-policy.test.mts` exercise the gateway without a + `ctx` argument, locking in the "telemetry must not break direct callers" + invariant. + +Run them: + +```sh +npx tsx --test tests/usage-telemetry-emission.test.mts \ + tests/premium-stock-gateway.test.mts \ + tests/gateway-cdn-origin-policy.test.mts +npx vitest run server/__tests__/usage-identity.test.ts +``` + +--- + +## Analysis recipes (Axiom APL) + +All queries assume dataset `wm_api_usage`. Adjust time windows as needed. + +### Per-customer request volume + error rate + +```kusto +['wm_api_usage'] +| where event_type == "request" and _time > ago(24h) +| summarize requests = count(), + errors_5xx = countif(status >= 500), + errors_4xx = countif(status >= 400 and status < 500), + p95_ms = percentile(duration_ms, 95) + by customer_id +| order by requests desc +``` + +### p50 / p95 latency per route + +```kusto +['wm_api_usage'] +| where event_type == "request" and _time > ago(1h) +| summarize p50 = percentile(duration_ms, 50), + p95 = percentile(duration_ms, 95), + n = count() + by route +| where n > 50 +| order by p95 desc +``` + +### Premium vs free traffic mix per route + +```kusto +['wm_api_usage'] +| where event_type == "request" and _time > ago(24h) +| extend tier_bucket = case(tier >= 2, "api+ent", tier == 1, "pro", "free/anon") +| summarize n = count() by route, tier_bucket +| evaluate pivot(tier_bucket, sum(n)) +| order by route asc +``` + +### CDN cache-tier mix per route — calibrates `RPC_CACHE_TIER` + +```kusto +['wm_api_usage'] +| where event_type == "request" and status == 200 and method == "GET" and _time > ago(24h) +| summarize n = count() by route, cache_tier +| evaluate pivot(cache_tier, sum(n)) +| order by route asc +``` + +A route dominated by `slow-browser` that *should* be CDN-cached is a hint to +add an entry to `RPC_CACHE_TIER` in `server/gateway.ts`. + +### Anonymous abuse hotspots + +```kusto +['wm_api_usage'] +| where event_type == "request" and auth_kind == "anon" and _time > ago(1h) +| summarize n = count() by route, country +| where n > 100 +| order by n desc +``` + +### Upstream cost per customer (provider attribution) + +```kusto +['wm_api_usage'] +| where event_type == "upstream" and _time > ago(24h) +| summarize calls = count(), + response_bytes_mb = sum(response_bytes) / 1024.0 / 1024.0, + p95_ms = percentile(duration_ms, 95) + by customer_id, provider +| order by calls desc +``` + +### Cache hit ratio per provider (correctness signal) + +```kusto +['wm_api_usage'] +| where event_type == "upstream" and _time > ago(24h) +| summarize n = count() by provider, cache_status +| evaluate pivot(cache_status, sum(n)) +| extend hit_ratio = (fresh + coalesce(['stale-while-revalidate'], 0)) * 1.0 / (fresh + miss + coalesce(['stale-while-revalidate'], 0)) +| order by hit_ratio asc +``` + +### Sentry × Axiom join + +When Sentry surfaces an exception, copy its trace ID and: + +```kusto +['wm_api_usage'] +| where sentry_trace_id == "" +``` + +…to see the exact request envelope (route, customer, latency, cache outcome). + +### Telemetry health watch + +```kusto +['wm_api_usage'] +| where _time > ago(1h) +| summarize events_per_min = count() by bin(_time, 1m) +| order by _time asc +``` + +A drop to zero with no corresponding traffic drop = breaker tripped or +Vercel/Axiom integration broken — pair it with the `[usage-telemetry] drop` +warns in Vercel logs to find the cause. + +--- + +## Adding new telemetry fields + +1. Add the field to `RequestEvent` (or `UpstreamEvent`) in + `server/_shared/usage.ts`. +2. Extend the corresponding builder (`buildRequestEvent` / + `buildUpstreamEvent`) — only allowlisted primitives, no untyped objects. +3. If the value comes from gateway state, set it on the `usage` accumulator + in `gateway.ts`. Otherwise plumb it through the builder call sites. +4. Axiom auto-discovers the new column on the next ingest. No schema migration. +5. Update this doc's field table. + +## Adding a new gateway return path + +If you add a new `return new Response(...)` inside `createDomainGateway()`, +**you must call `emitRequest(status, reason, cacheTier, resBytes?)` immediately +before it.** Telemetry coverage is enforced by code review, not lint. The +`reason` field uses the existing `RequestReason` union — extend it if the +return represents a new failure class. diff --git a/server/__tests__/usage-identity.test.ts b/server/__tests__/usage-identity.test.ts new file mode 100644 index 000000000..d78535b5c --- /dev/null +++ b/server/__tests__/usage-identity.test.ts @@ -0,0 +1,143 @@ +/** + * Pure-resolver tests for buildUsageIdentity(). + * + * The resolver maps gateway-internal auth state to the four telemetry identity + * fields (auth_kind, principal_id, customer_id, tier). It is intentionally + * pure — no JWT verification, no key hashing of secrets, no I/O — so the + * branch matrix is trivially testable here. + */ + +import { describe, expect, test } from 'vitest'; + +import { buildUsageIdentity, type UsageIdentityInput } from '../_shared/usage-identity'; + +function baseInput(overrides: Partial = {}): UsageIdentityInput { + return { + sessionUserId: null, + isUserApiKey: false, + enterpriseApiKey: null, + widgetKey: null, + clerkOrgId: null, + userApiKeyCustomerRef: null, + tier: null, + ...overrides, + }; +} + +describe('buildUsageIdentity — auth_kind branches', () => { + test('user_api_key takes precedence over every other signal', () => { + const ident = buildUsageIdentity(baseInput({ + isUserApiKey: true, + sessionUserId: 'user_123', + userApiKeyCustomerRef: 'customer_abc', + enterpriseApiKey: 'should-be-ignored', + widgetKey: 'should-be-ignored', + tier: 2, + })); + expect(ident.auth_kind).toBe('user_api_key'); + expect(ident.principal_id).toBe('user_123'); + expect(ident.customer_id).toBe('customer_abc'); + expect(ident.tier).toBe(2); + }); + + test('user_api_key falls back to sessionUserId for customer_id when no explicit ref', () => { + const ident = buildUsageIdentity(baseInput({ + isUserApiKey: true, + sessionUserId: 'user_123', + tier: 1, + })); + expect(ident.customer_id).toBe('user_123'); + }); + + test('clerk_jwt: customer_id prefers org over user when org is present', () => { + const ident = buildUsageIdentity(baseInput({ + sessionUserId: 'user_123', + clerkOrgId: 'org_acme', + tier: 1, + })); + expect(ident.auth_kind).toBe('clerk_jwt'); + expect(ident.principal_id).toBe('user_123'); + expect(ident.customer_id).toBe('org_acme'); + expect(ident.tier).toBe(1); + }); + + test('clerk_jwt: customer_id falls back to user when no org', () => { + const ident = buildUsageIdentity(baseInput({ + sessionUserId: 'user_123', + })); + expect(ident.customer_id).toBe('user_123'); + expect(ident.tier).toBe(0); + }); + + test('enterprise_api_key: principal_id is hashed, not raw', () => { + const ident = buildUsageIdentity(baseInput({ + enterpriseApiKey: 'wm_super_secret_key', + tier: 3, + })); + expect(ident.auth_kind).toBe('enterprise_api_key'); + expect(ident.principal_id).not.toBe('wm_super_secret_key'); + expect(ident.principal_id).toMatch(/^[0-9a-z]+$/); + // Customer is the unmapped sentinel until a real entry is added to ENTERPRISE_KEY_TO_CUSTOMER + expect(ident.customer_id).toBe('enterprise-unmapped'); + expect(ident.tier).toBe(3); + }); + + test('widget_key: customer_id is the widget key itself, principal_id is hashed', () => { + const ident = buildUsageIdentity(baseInput({ + widgetKey: 'widget_pub_xyz', + })); + expect(ident.auth_kind).toBe('widget_key'); + expect(ident.customer_id).toBe('widget_pub_xyz'); + expect(ident.principal_id).not.toBe('widget_pub_xyz'); + expect(ident.principal_id).toMatch(/^[0-9a-z]+$/); + expect(ident.tier).toBe(0); + }); + + test('anon: every field null, tier always zero', () => { + const ident = buildUsageIdentity(baseInput()); + expect(ident.auth_kind).toBe('anon'); + expect(ident.principal_id).toBeNull(); + expect(ident.customer_id).toBeNull(); + expect(ident.tier).toBe(0); + }); + + test('anon: tier coerces to 0 even if input.tier was set (defensive)', () => { + // No identity signal but a leftover tier value should not show up as a mystery free row. + const ident = buildUsageIdentity(baseInput({ tier: 99 })); + expect(ident.tier).toBe(0); + }); +}); + +describe('buildUsageIdentity — tier handling', () => { + test('null tier coerces to 0 for non-anon kinds', () => { + const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: null })); + expect(ident.tier).toBe(0); + }); + + test('zero tier is preserved (not promoted)', () => { + const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: 0 })); + expect(ident.tier).toBe(0); + }); + + test('integer tiers pass through unchanged', () => { + for (const t of [0, 1, 2, 3]) { + const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: t })); + expect(ident.tier).toBe(t); + } + }); +}); + +describe('buildUsageIdentity — secret handling', () => { + test('enterprise key never appears verbatim in any output field', () => { + const secret = 'wm_ent_LEAKY_VALUE_DO_NOT_LOG'; + const ident = buildUsageIdentity(baseInput({ enterpriseApiKey: secret })); + expect(JSON.stringify(ident)).not.toContain(secret); + }); + + test('widget key appears as customer_id (intentional — widget keys are public)', () => { + // Widget keys are embeds installed on third-party sites; treating them as + // customer attribution is the contract documented in usage-identity.ts:73-79. + const ident = buildUsageIdentity(baseInput({ widgetKey: 'widget_public_xyz' })); + expect(ident.customer_id).toBe('widget_public_xyz'); + }); +}); diff --git a/server/_shared/auth-session.ts b/server/_shared/auth-session.ts index e5a457443..081e31844 100644 --- a/server/_shared/auth-session.ts +++ b/server/_shared/auth-session.ts @@ -15,13 +15,18 @@ import { jwtVerify } from 'jose'; import { getClerkJwtVerifyOptions, getJWKS } from '../auth-session'; +export interface ClerkSession { + userId: string; + orgId: string | null; +} + /** * Extracts and verifies a bearer token from the request. - * Returns the userId (sub claim) on success, null on any failure. + * Returns { userId, orgId } on success, null on any failure. * * Fail-open: errors are logged but never thrown. */ -export async function resolveSessionUserId(request: Request): Promise { +export async function resolveClerkSession(request: Request): Promise { try { const authHeader = request.headers.get('Authorization'); if (!authHeader?.startsWith('Bearer ')) return null; @@ -38,7 +43,19 @@ export async function resolveSessionUserId(request: Request): Promise).org as + | Record + | undefined; + const orgId = + (typeof orgClaim?.id === 'string' ? orgClaim.id : null) ?? + (typeof (payload as Record).org_id === 'string' + ? ((payload as Record).org_id as string) + : null); + + return { userId, orgId }; } catch (err) { console.warn( '[auth-session] JWT verification failed:', @@ -47,3 +64,11 @@ export async function resolveSessionUserId(request: Request): Promise { + const session = await resolveClerkSession(request); + return session?.userId ?? null; +} diff --git a/server/_shared/fetch-json.ts b/server/_shared/fetch-json.ts index 2dcf3db5f..7b525b505 100644 --- a/server/_shared/fetch-json.ts +++ b/server/_shared/fetch-json.ts @@ -1,14 +1,28 @@ import { CHROME_UA } from './constants'; +import type { UsageHook } from './redis'; +import { buildUpstreamEvent, getUsageScope, sendToAxiom } from './usage'; interface FetchJsonOptions { timeoutMs?: number; headers?: Record; + /** + * Provider attribution for usage telemetry. When set, an upstream event + * is emitted for this call. Leaves request_id / customer_id / route / tier + * to flow implicitly from the gateway-set UsageScope (issue #3381). + */ + provider?: string; + operation?: string; + /** Escape hatch for callers outside a request scope. Rarely needed. */ + usage?: UsageHook; } export async function fetchJson( url: string, options: FetchJsonOptions = {}, ): Promise { + const t0 = Date.now(); + let status = 0; + let responseBytes = 0; try { const response = await fetch(url, { headers: { @@ -18,9 +32,57 @@ export async function fetchJson( }, signal: AbortSignal.timeout(options.timeoutMs ?? 8_000), }); + status = response.status; if (!response.ok) return null; - return await response.json() as T; + const text = await response.text(); + responseBytes = text.length; + return JSON.parse(text) as T; } catch { return null; + } finally { + // Emit only when the caller has labeled the provider — avoids polluting + // the dataset with "unknown" rows from internal/utility fetches. + const provider = options.usage?.provider ?? options.provider; + const operation = options.usage?.operation ?? options.operation ?? 'fetch'; + if (provider) { + const durationMs = Date.now() - t0; + const explicit = options.usage; + const host = explicit?.host ?? safeHost(url); + // Single waitUntil() registered synchronously here — no nested + // ctx.waitUntil() inside the Axiom delivery (Edge runtimes may drop + // the outer registration after the response phase ends). Static + // import keeps the emit path on the hot path. + const scope = getUsageScope(); + const ctx = explicit?.ctx ?? scope?.ctx; + if (ctx) { + const event = buildUpstreamEvent({ + requestId: explicit?.requestId ?? scope?.requestId ?? '', + customerId: explicit?.customerId ?? scope?.customerId ?? null, + route: explicit?.route ?? scope?.route ?? '', + tier: explicit?.tier ?? scope?.tier ?? 0, + provider, + operation, + host, + status, + durationMs, + requestBytes: 0, + responseBytes, + cacheStatus: 'miss', + }); + try { + ctx.waitUntil(sendToAxiom([event])); + } catch { + /* telemetry must never throw */ + } + } + } + } +} + +function safeHost(url: string): string { + try { + return new URL(url).host; + } catch { + return ''; } } diff --git a/server/_shared/redis.ts b/server/_shared/redis.ts index 7a29985ee..19030d096 100644 --- a/server/_shared/redis.ts +++ b/server/_shared/redis.ts @@ -1,4 +1,5 @@ import { unwrapEnvelope } from './seed-envelope'; +import { buildUpstreamEvent, getUsageScope, sendToAxiom } from './usage'; const REDIS_OP_TIMEOUT_MS = 1_500; const REDIS_PIPELINE_TIMEOUT_MS = 5_000; @@ -288,6 +289,31 @@ export async function cachedFetchJson( return promise; } +/** + * Per-call usage-telemetry hook for upstream event emission (issue #3381). + * + * The only required field is `provider` — its presence is what tells the + * helper "emit an upstream event for this call." Everything else is filled + * in by the gateway-set UsageScope (request_id, customer_id, route, tier, + * ctx) via AsyncLocalStorage. Pass overrides explicitly if you need to. + * + * Use this when calling fetchJson / cachedFetchJsonWithMeta from a code + * path that runs inside a gateway-handled request. For helpers used + * outside any request (cron, scripts), no scope exists and emission is + * skipped silently. + */ +export interface UsageHook { + provider: string; + operation?: string; + host?: string; + // Overrides — leave unset to inherit from gateway-set UsageScope. + ctx?: { waitUntil: (p: Promise) => void }; + requestId?: string; + customerId?: string | null; + route?: string; + tier?: number; +} + /** * Like cachedFetchJson but reports the data source. * Use when callers need to distinguish cache hits from fresh fetches @@ -296,12 +322,17 @@ export async function cachedFetchJson( * Returns { data, source } where source is: * 'cache' — served from Redis * 'fresh' — fetcher ran (leader) or joined an in-flight fetch (follower) + * + * If `opts.usage` is supplied, an upstream event is emitted on the fresh + * path (issue #3381). Pass-through for callers that don't care about + * telemetry — backwards-compatible. */ export async function cachedFetchJsonWithMeta( key: string, ttlSeconds: number, fetcher: () => Promise, negativeTtlSeconds = 120, + opts?: { usage?: UsageHook }, ): Promise<{ data: T | null; source: 'cache' | 'fresh' }> { const cached = await getCachedJson(key); if (cached === NEG_SENTINEL) return { data: null, source: 'cache' }; @@ -313,16 +344,30 @@ export async function cachedFetchJsonWithMeta( return { data, source: 'fresh' }; } + const fetchT0 = Date.now(); + let upstreamStatus = 0; + let cacheStatus: 'miss' | 'neg-sentinel' = 'miss'; + const promise = fetcher() .then(async (result) => { + // Only count an upstream call as a 200 when it actually returned data. + // A null result triggers the neg-sentinel branch below — these are + // empty/failed upstream calls and must NOT show up as `status=200` in + // dashboards (would poison the cache-hit-ratio recipe and per-provider + // error rates). Use status=0 for the empty branch; cache_status carries + // the structural detail. if (result != null) { + upstreamStatus = 200; await setCachedJson(key, result, ttlSeconds); } else { + upstreamStatus = 0; + cacheStatus = 'neg-sentinel'; await setCachedJson(key, NEG_SENTINEL, negativeTtlSeconds); } return result; }) .catch((err: unknown) => { + upstreamStatus = 0; console.warn(`[redis] cachedFetchJsonWithMeta fetcher failed for "${key}":`, errMsg(err)); throw err; }) @@ -331,10 +376,50 @@ export async function cachedFetchJsonWithMeta( }); inflight.set(key, promise); - const data = await promise; + let data: T | null; + try { + data = await promise; + } finally { + emitUpstreamFromHook(opts?.usage, upstreamStatus, Date.now() - fetchT0, cacheStatus); + } return { data, source: 'fresh' }; } +function emitUpstreamFromHook( + usage: UsageHook | undefined, + status: number, + durationMs: number, + cacheStatus: 'miss' | 'fresh' | 'stale-while-revalidate' | 'neg-sentinel', +): void { + // Emit only when caller labels the provider — avoids "unknown" pollution. + if (!usage?.provider) return; + // Single waitUntil() registered synchronously here — no nested + // ctx.waitUntil() inside Axiom delivery. Static import keeps the call + // synchronous so the runtime registers it during the request phase. + const scope = getUsageScope(); + const ctx = usage.ctx ?? scope?.ctx; + if (!ctx) return; + const event = buildUpstreamEvent({ + requestId: usage.requestId ?? scope?.requestId ?? '', + customerId: usage.customerId ?? scope?.customerId ?? null, + route: usage.route ?? scope?.route ?? '', + tier: usage.tier ?? scope?.tier ?? 0, + provider: usage.provider, + operation: usage.operation ?? 'fetch', + host: usage.host ?? '', + status, + durationMs, + requestBytes: 0, + responseBytes: 0, + cacheStatus, + }); + try { + ctx.waitUntil(sendToAxiom([event])); + } catch { + /* telemetry must never throw */ + } +} + export async function geoSearchByBox( key: string, lon: number, lat: number, widthKm: number, heightKm: number, count: number, raw = false, diff --git a/server/_shared/usage-identity.ts b/server/_shared/usage-identity.ts new file mode 100644 index 000000000..aa14b86b1 --- /dev/null +++ b/server/_shared/usage-identity.ts @@ -0,0 +1,108 @@ +/** + * Pure resolver: maps gateway-internal auth state to a UsageIdentity event field set. + * + * MUST NOT re-verify JWTs, re-hash keys, or re-validate API keys. The gateway has + * already done that work — this function consumes the resolved values. + * + * Tier is the user's current entitlement tier (0 = free / unknown). For non-tier-gated + * endpoints the gateway never resolves it, so we accept null/undefined and report 0. + */ + +export type AuthKind = + | 'clerk_jwt' + | 'user_api_key' + | 'enterprise_api_key' + | 'widget_key' + | 'anon'; + +export interface UsageIdentity { + auth_kind: AuthKind; + principal_id: string | null; + customer_id: string | null; + tier: number; +} + +export interface UsageIdentityInput { + sessionUserId: string | null; + isUserApiKey: boolean; + enterpriseApiKey: string | null; + widgetKey: string | null; + clerkOrgId: string | null; + userApiKeyCustomerRef: string | null; + tier: number | null; +} + +// Static enterprise-key → customer map. Explicit so attribution is reviewable in code, +// not floating in env vars. Add entries here as enterprise customers are onboarded. +// The hash (not the raw key) is used as principal_id so logs never leak the secret. +const ENTERPRISE_KEY_TO_CUSTOMER: Record = { + // 'wm_ent_xxxx': 'acme-corp', +}; + +export function buildUsageIdentity(input: UsageIdentityInput): UsageIdentity { + const tier = input.tier ?? 0; + + if (input.isUserApiKey) { + return { + auth_kind: 'user_api_key', + principal_id: input.sessionUserId, + customer_id: input.userApiKeyCustomerRef ?? input.sessionUserId, + tier, + }; + } + + if (input.sessionUserId) { + return { + auth_kind: 'clerk_jwt', + principal_id: input.sessionUserId, + customer_id: input.clerkOrgId ?? input.sessionUserId, + tier, + }; + } + + if (input.enterpriseApiKey) { + const customer = ENTERPRISE_KEY_TO_CUSTOMER[input.enterpriseApiKey] ?? 'enterprise-unmapped'; + return { + auth_kind: 'enterprise_api_key', + principal_id: hashKeySync(input.enterpriseApiKey), + customer_id: customer, + tier, + }; + } + + if (input.widgetKey) { + return { + auth_kind: 'widget_key', + principal_id: hashKeySync(input.widgetKey), + customer_id: input.widgetKey, + tier, + }; + } + + return { + auth_kind: 'anon', + principal_id: null, + customer_id: null, + tier: 0, + }; +} + +// 64-bit FNV-1a (two-round, XOR-folded) — non-cryptographic, only used to +// avoid logging raw key material. Edge crypto.subtle.digest is async; we +// want a sync helper for the hot path. Two rounds with different seeds give +// ~64 bits of state, dropping birthday-collision risk well below the +// widget-key population horizon (32-bit collides ~65k keys). +function hashKeySync(key: string): string { + let h1 = 2166136261; + let h2 = 0x811c9dc5 ^ 0xa3b2c1d4; + for (let i = 0; i < key.length; i++) { + const c = key.charCodeAt(i); + h1 ^= c; + h1 = Math.imul(h1, 16777619); + h2 ^= c + 0x9e3779b1; + h2 = Math.imul(h2, 16777619); + } + const lo = (h1 >>> 0).toString(36); + const hi = (h2 >>> 0).toString(36); + return `${hi}${lo}`; +} diff --git a/server/_shared/usage.ts b/server/_shared/usage.ts new file mode 100644 index 000000000..c06590eb0 --- /dev/null +++ b/server/_shared/usage.ts @@ -0,0 +1,421 @@ +/** + * Axiom-based API usage observability — emit-side primitives. + * + * - Builders accept allowlisted primitives only. Never accept Request, Response, + * or untyped objects: future field additions then leak by structural impossibility. + * - emitUsageEvents fires via ctx.waitUntil so the Edge isolate cannot tear down + * the unflushed POST. Direct fetch, 1.5s timeout, no retry. + * - Circuit breaker (5% failure / 5min sliding window) trips when delivery is broken. + * - Tripping logs once via console.error; drops thereafter are 1%-sampled console.warn. + * - Telemetry failure must not affect API availability or latency. + * + * Scoped to USAGE attribution. Sentry-edge already covers exceptions — do NOT + * emit error tracebacks here. Cross-link via sentry_trace_id field instead. + */ + +import type { AuthKind } from './usage-identity'; + +const AXIOM_DATASET = 'wm_api_usage'; +// US region endpoint. EU workspaces would use api.eu.axiom.co. +const AXIOM_INGEST_URL = `https://api.axiom.co/v1/datasets/${AXIOM_DATASET}/ingest`; +const TELEMETRY_TIMEOUT_MS = 1_500; + +const CB_WINDOW_MS = 5 * 60 * 1_000; +const CB_TRIP_FAILURE_RATIO = 0.05; +const CB_MIN_SAMPLES = 20; +const SAMPLED_DROP_LOG_RATE = 0.01; + +function isUsageEnabled(): boolean { + return process.env.USAGE_TELEMETRY === '1'; +} + +function isDevHeaderEnabled(): boolean { + return process.env.NODE_ENV !== 'production'; +} + +// ---------- Event shapes ---------- + +export type CacheTier = + | 'fast' + | 'medium' + | 'slow' + | 'slow-browser' + | 'static' + | 'daily' + | 'no-store' + | 'live'; + +export type CacheStatus = 'miss' | 'fresh' | 'stale-while-revalidate' | 'neg-sentinel'; + +export type ExecutionPlane = 'vercel-edge' | 'vercel-node' | 'railway-relay'; + +export type OriginKind = + | 'browser-same-origin' + | 'browser-cross-origin' + | 'api-key' + | 'oauth' + | 'mcp' + | 'internal-cron'; + +export type RequestReason = + | 'ok' + | 'origin_403' + | 'rate_limit_429' + | 'preflight' + | 'auth_401' + | 'auth_403' + | 'tier_403'; + +export interface RequestEvent { + _time: string; + event_type: 'request'; + request_id: string; + domain: string; + route: string; + method: string; + status: number; + duration_ms: number; + req_bytes: number; + res_bytes: number; + customer_id: string | null; + principal_id: string | null; + auth_kind: AuthKind; + tier: number; + country: string | null; + execution_region: string | null; + execution_plane: ExecutionPlane; + origin_kind: OriginKind | null; + cache_tier: CacheTier | null; + ua_hash: string | null; + sentry_trace_id: string | null; + reason: RequestReason; +} + +export interface UpstreamEvent { + _time: string; + event_type: 'upstream'; + request_id: string; + customer_id: string | null; + route: string; + tier: number; + provider: string; + operation: string; + host: string; + status: number; + duration_ms: number; + request_bytes: number; + response_bytes: number; + cache_status: CacheStatus; +} + +export type UsageEvent = RequestEvent | UpstreamEvent; + +// ---------- Builders (allowlisted primitives only) ---------- + +export function buildRequestEvent(p: { + requestId: string; + domain: string; + route: string; + method: string; + status: number; + durationMs: number; + reqBytes: number; + resBytes: number; + customerId: string | null; + principalId: string | null; + authKind: AuthKind; + tier: number; + country: string | null; + executionRegion: string | null; + executionPlane: ExecutionPlane; + originKind: OriginKind | null; + cacheTier: CacheTier | null; + uaHash: string | null; + sentryTraceId: string | null; + reason: RequestReason; +}): RequestEvent { + return { + _time: new Date().toISOString(), + event_type: 'request', + request_id: p.requestId, + domain: p.domain, + route: p.route, + method: p.method, + status: p.status, + duration_ms: p.durationMs, + req_bytes: p.reqBytes, + res_bytes: p.resBytes, + customer_id: p.customerId, + principal_id: p.principalId, + auth_kind: p.authKind, + tier: p.tier, + country: p.country, + execution_region: p.executionRegion, + execution_plane: p.executionPlane, + origin_kind: p.originKind, + cache_tier: p.cacheTier, + ua_hash: p.uaHash, + sentry_trace_id: p.sentryTraceId, + reason: p.reason, + }; +} + +export function buildUpstreamEvent(p: { + requestId: string; + customerId: string | null; + route: string; + tier: number; + provider: string; + operation: string; + host: string; + status: number; + durationMs: number; + requestBytes: number; + responseBytes: number; + cacheStatus: CacheStatus; +}): UpstreamEvent { + return { + _time: new Date().toISOString(), + event_type: 'upstream', + request_id: p.requestId, + customer_id: p.customerId, + route: p.route, + tier: p.tier, + provider: p.provider, + operation: p.operation, + host: p.host, + status: p.status, + duration_ms: p.durationMs, + request_bytes: p.requestBytes, + response_bytes: p.responseBytes, + cache_status: p.cacheStatus, + }; +} + +// ---------- Header-derived helpers (ok to take Request — these only read primitives) ---------- + +export function deriveRequestId(req: Request): string { + return req.headers.get('x-vercel-id') ?? ''; +} + +export function deriveExecutionRegion(req: Request): string | null { + const id = req.headers.get('x-vercel-id'); + if (!id) return null; + const sep = id.indexOf('::'); + return sep > 0 ? id.slice(0, sep) : null; +} + +export function deriveCountry(req: Request): string | null { + return ( + req.headers.get('x-vercel-ip-country') ?? + req.headers.get('cf-ipcountry') ?? + null + ); +} + +export function deriveReqBytes(req: Request): number { + const len = req.headers.get('content-length'); + if (!len) return 0; + const n = Number(len); + return Number.isFinite(n) && n >= 0 ? n : 0; +} + +export function deriveSentryTraceId(req: Request): string | null { + return req.headers.get('sentry-trace') ?? null; +} + +// ua_hash: SHA-256(UA + monthly-rotated pepper). Pepper key: USAGE_UA_PEPPER. +// If the pepper is unset we return null rather than a stable per-browser fingerprint. +export async function deriveUaHash(req: Request): Promise { + const pepper = process.env.USAGE_UA_PEPPER; + if (!pepper) return null; + const ua = req.headers.get('user-agent') ?? ''; + if (!ua) return null; + const data = new TextEncoder().encode(`${pepper}|${ua}`); + const buf = await crypto.subtle.digest('SHA-256', data); + return Array.from(new Uint8Array(buf), (b) => b.toString(16).padStart(2, '0')).join(''); +} + +export function deriveOriginKind(req: Request): OriginKind | null { + const origin = req.headers.get('origin') ?? ''; + const hasApiKey = + req.headers.has('x-worldmonitor-key') || req.headers.has('x-api-key'); + const hasBearer = (req.headers.get('authorization') ?? '').startsWith('Bearer '); + if (hasApiKey) return 'api-key'; + if (hasBearer) return 'oauth'; + if (!origin) return null; + try { + const host = new URL(origin).host; + const reqHost = new URL(req.url).host; + return host === reqHost ? 'browser-same-origin' : 'browser-cross-origin'; + } catch { + return 'browser-cross-origin'; + } +} + +// ---------- Circuit breaker ---------- + +interface BreakerSample { + ts: number; + ok: boolean; +} + +const breakerSamples: BreakerSample[] = []; +let breakerTripped = false; +let breakerLastNotifyTs = 0; + +function pruneOldSamples(now: number): void { + while (breakerSamples.length > 0 && now - breakerSamples[0]!.ts > CB_WINDOW_MS) { + breakerSamples.shift(); + } +} + +function recordSample(ok: boolean): void { + const now = Date.now(); + pruneOldSamples(now); + breakerSamples.push({ ts: now, ok }); + + if (breakerSamples.length < CB_MIN_SAMPLES) { + breakerTripped = false; + return; + } + let failures = 0; + for (const s of breakerSamples) if (!s.ok) failures++; + const ratio = failures / breakerSamples.length; + const wasTripped = breakerTripped; + breakerTripped = ratio > CB_TRIP_FAILURE_RATIO; + + if (breakerTripped && !wasTripped && now - breakerLastNotifyTs > CB_WINDOW_MS) { + breakerLastNotifyTs = now; + console.error('[usage-telemetry] circuit breaker tripped', { + ratio: ratio.toFixed(3), + samples: breakerSamples.length, + }); + } +} + +export function getTelemetryHealth(): 'ok' | 'degraded' | 'off' { + if (!isUsageEnabled()) return 'off'; + return breakerTripped ? 'degraded' : 'ok'; +} + +export function maybeAttachDevHealthHeader(headers: Headers): void { + if (!isDevHeaderEnabled()) return; + headers.set('x-usage-telemetry', getTelemetryHealth()); +} + +// ---------- Implicit request scope (AsyncLocalStorage) ---------- +// +// Per koala's review (#3381), this lets fetch helpers emit upstream events +// without leaf handlers having to thread a usage hook through every call. +// The gateway sets the scope before invoking matchedHandler; fetch helpers +// (fetchJson, cachedFetchJsonWithMeta) read from it lazily. +// +// AsyncLocalStorage is loaded defensively. If the runtime ever rejects the +// import (older Edge versions, sandboxed contexts), the scope helpers +// degrade to no-ops and telemetry simply skips. The gateway request event +// is unaffected — it never depended on ALS. + +export interface UsageScope { + ctx: WaitUntilCtx; + requestId: string; + customerId: string | null; + route: string; + tier: number; +} + +type ALSLike = { + run: (store: T, fn: () => R) => R; + getStore: () => T | undefined; +}; + +let scopeStore: ALSLike | null = null; + +async function getScopeStore(): Promise | null> { + if (scopeStore) return scopeStore; + try { + const mod = await import('node:async_hooks'); + scopeStore = new mod.AsyncLocalStorage(); + return scopeStore; + } catch { + return null; + } +} + +export async function runWithUsageScope(scope: UsageScope, fn: () => R | Promise): Promise { + const store = await getScopeStore(); + if (!store) return fn(); + return store.run(scope, fn) as R | Promise; +} + +export function getUsageScope(): UsageScope | undefined { + return scopeStore?.getStore(); +} + +// ---------- Sink ---------- + +export async function sendToAxiom(events: UsageEvent[]): Promise { + if (!isUsageEnabled()) return; + if (events.length === 0) return; + const token = process.env.AXIOM_API_TOKEN; + if (!token) { + if (Math.random() < SAMPLED_DROP_LOG_RATE) { + console.warn('[usage-telemetry] drop', { reason: 'no-token' }); + } + return; + } + if (breakerTripped) { + if (Math.random() < SAMPLED_DROP_LOG_RATE) { + console.warn('[usage-telemetry] drop', { reason: 'breaker-open' }); + } + return; + } + + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), TELEMETRY_TIMEOUT_MS); + try { + const resp = await fetch(AXIOM_INGEST_URL, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(events), + signal: controller.signal, + }); + if (!resp.ok) { + recordSample(false); + if (Math.random() < SAMPLED_DROP_LOG_RATE) { + console.warn('[usage-telemetry] drop', { reason: `http-${resp.status}` }); + } + return; + } + recordSample(true); + } catch (err) { + recordSample(false); + if (Math.random() < SAMPLED_DROP_LOG_RATE) { + const reason = err instanceof Error && err.name === 'AbortError' ? 'timeout' : 'fetch-error'; + console.warn('[usage-telemetry] drop', { reason }); + } + } finally { + clearTimeout(timer); + } +} + +export interface WaitUntilCtx { + waitUntil: (p: Promise) => void; +} + +export function emitUsageEvents(ctx: WaitUntilCtx, events: UsageEvent[]): void { + if (!isUsageEnabled() || events.length === 0) return; + ctx.waitUntil(sendToAxiom(events)); +} + +// Variant that returns the in-flight delivery promise instead of registering +// it on a context. Use when the caller is already inside a single +// ctx.waitUntil() chain and wants to await delivery synchronously to avoid a +// nested waitUntil registration (which Edge runtimes may drop). +export function deliverUsageEvents(events: UsageEvent[]): Promise { + if (!isUsageEnabled() || events.length === 0) return Promise.resolve(); + return sendToAxiom(events); +} diff --git a/server/alias-rewrite.ts b/server/alias-rewrite.ts index bf561f353..cda327824 100644 --- a/server/alias-rewrite.ts +++ b/server/alias-rewrite.ts @@ -13,10 +13,13 @@ * * Trivially deleted when v1 retires — just `rm` the alias files. */ +type GatewayCtx = { waitUntil: (p: Promise) => void }; + export async function rewriteToSebuf( req: Request, newPath: string, - gateway: (req: Request) => Promise, + gateway: (req: Request, ctx: GatewayCtx) => Promise, + ctx: GatewayCtx, ): Promise { const url = new URL(req.url); url.pathname = newPath; @@ -27,5 +30,5 @@ export async function rewriteToSebuf( headers: req.headers, body, }); - return gateway(rewritten); + return gateway(rewritten, ctx); } diff --git a/server/gateway.ts b/server/gateway.ts index 45f566adc..e8e6a5e70 100644 --- a/server/gateway.ts +++ b/server/gateway.ts @@ -17,7 +17,23 @@ import { mapErrorToResponse } from './error-mapper'; import { checkRateLimit, checkEndpointRateLimit, hasEndpointRatePolicy } from './_shared/rate-limit'; import { drainResponseHeaders } from './_shared/response-headers'; import { checkEntitlement, getRequiredTier, getEntitlements } from './_shared/entitlement-check'; -import { resolveSessionUserId } from './_shared/auth-session'; +import { resolveClerkSession } from './_shared/auth-session'; +import { buildUsageIdentity, type UsageIdentityInput } from './_shared/usage-identity'; +import { + deliverUsageEvents, + buildRequestEvent, + deriveRequestId, + deriveExecutionRegion, + deriveCountry, + deriveReqBytes, + deriveSentryTraceId, + deriveOriginKind, + deriveUaHash, + maybeAttachDevHealthHeader, + runWithUsageScope, + type CacheTier as UsageCacheTier, + type RequestReason, +} from './_shared/usage'; import type { ServerOptions } from '../src/generated/server/worldmonitor/seismology/v1/service_server'; export const serverOptions: ServerOptions = { onError: mapErrorToResponse }; @@ -283,18 +299,86 @@ import { PREMIUM_RPC_PATHS } from '../src/shared/premium-paths'; * Applies the full gateway pipeline: origin check → CORS → OPTIONS preflight → * API key → rate limit → route match (with POST→GET compat) → execute → cache headers. */ +export type GatewayCtx = { waitUntil: (p: Promise) => void }; + export function createDomainGateway( routes: RouteDescriptor[], -): (req: Request) => Promise { +): (req: Request, ctx?: GatewayCtx) => Promise { const router = createRouter(routes); - return async function handler(originalRequest: Request): Promise { + return async function handler(originalRequest: Request, ctx?: GatewayCtx): Promise { let request = originalRequest; const rawPathname = new URL(request.url).pathname; const pathname = rawPathname.length > 1 ? rawPathname.replace(/\/+$/, '') : rawPathname; + const t0 = Date.now(); + + // Usage-telemetry identity inputs — accumulated as gateway auth resolution progresses. + // Read at every return point; null/0 defaults are valid for early returns. + // + // x-widget-key is intentionally NOT trusted here: a header is attacker- + // controllable, and emitting it as `customer_id` would let unauthenticated + // callers poison per-customer dashboards (per koala #3403 review). We only + // populate `widgetKey` after validating it against the configured + // WIDGET_AGENT_KEY — same check used in api/widget-agent.ts. + const rawWidgetKey = request.headers.get('x-widget-key') ?? null; + const widgetAgentKey = process.env.WIDGET_AGENT_KEY ?? ''; + const validatedWidgetKey = + rawWidgetKey && widgetAgentKey && rawWidgetKey === widgetAgentKey ? rawWidgetKey : null; + const usage: UsageIdentityInput = { + sessionUserId: null, + isUserApiKey: false, + enterpriseApiKey: null, + widgetKey: validatedWidgetKey, + clerkOrgId: null, + userApiKeyCustomerRef: null, + tier: null, + }; + // Domain segment for telemetry. Path layouts: + // /api//v1/ → parts[2] = domain + // /api/v2// → parts[2] = "v2", parts[3] = domain + const _parts = pathname.split('/'); + const domain = (/^v\d+$/.test(_parts[2] ?? '') ? _parts[3] : _parts[2]) ?? ''; + const reqBytes = deriveReqBytes(request); + + function emitRequest(status: number, reason: RequestReason, cacheTier: UsageCacheTier | null, resBytes = 0): void { + if (!ctx?.waitUntil) return; + const identity = buildUsageIdentity(usage); + // Single ctx.waitUntil() registered synchronously in the request phase. + // The IIFE awaits ua_hash (SHA-256) then awaits delivery directly via + // deliverUsageEvents — no nested waitUntil call, which Edge runtimes + // (Cloudflare/Vercel) may drop after the response phase ends. + ctx.waitUntil((async () => { + const uaHash = await deriveUaHash(originalRequest); + await deliverUsageEvents([ + buildRequestEvent({ + requestId: deriveRequestId(originalRequest), + domain, + route: pathname, + method: originalRequest.method, + status, + durationMs: Date.now() - t0, + reqBytes, + resBytes, + customerId: identity.customer_id, + principalId: identity.principal_id, + authKind: identity.auth_kind, + tier: identity.tier, + country: deriveCountry(originalRequest), + executionRegion: deriveExecutionRegion(originalRequest), + executionPlane: 'vercel-edge', + originKind: deriveOriginKind(originalRequest), + cacheTier, + uaHash, + sentryTraceId: deriveSentryTraceId(originalRequest), + reason, + }), + ]); + })()); + } // Origin check — skip CORS headers for disallowed origins if (isDisallowedOrigin(request)) { + emitRequest(403, 'origin_403', null); return new Response(JSON.stringify({ error: 'Origin not allowed' }), { status: 403, headers: { 'Content-Type': 'application/json' }, @@ -310,6 +394,7 @@ export function createDomainGateway( // OPTIONS preflight if (request.method === 'OPTIONS') { + emitRequest(204, 'preflight', null); return new Response(null, { status: 204, headers: corsHeaders }); } @@ -322,7 +407,10 @@ export function createDomainGateway( // Only runs for tier-gated endpoints to avoid JWKS lookup on every request. let sessionUserId: string | null = null; if (isTierGated) { - sessionUserId = await resolveSessionUserId(request); + const session = await resolveClerkSession(request); + sessionUserId = session?.userId ?? null; + usage.sessionUserId = sessionUserId; + usage.clerkOrgId = session?.orgId ?? null; if (sessionUserId) { request = new Request(request.url, { method: request.method, @@ -354,10 +442,13 @@ export function createDomainGateway( const userKeyResult = await validateUserApiKey(wmKey); if (userKeyResult) { isUserApiKey = true; + usage.isUserApiKey = true; + usage.userApiKeyCustomerRef = userKeyResult.userId; keyCheck = { valid: true, required: true }; // Inject x-user-id for downstream entitlement checks if (!sessionUserId) { sessionUserId = userKeyResult.userId; + usage.sessionUserId = sessionUserId; request = new Request(request.url, { method: request.method, headers: (() => { @@ -371,11 +462,19 @@ export function createDomainGateway( } } + // Enterprise API key (WORLDMONITOR_VALID_KEYS): keyCheck.valid + wmKey present + // and not a wm_-prefixed user key. + if (keyCheck.valid && wmKey && !isUserApiKey && !wmKey.startsWith('wm_')) { + usage.enterpriseApiKey = wmKey; + } + // User API keys on PREMIUM_RPC_PATHS need verified pro-tier entitlement. // Admin keys (WORLDMONITOR_VALID_KEYS) bypass this since they are operator-issued. if (isUserApiKey && needsLegacyProBearerGate && sessionUserId) { const ent = await getEntitlements(sessionUserId); + if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0; if (!ent || !ent.features.apiAccess) { + emitRequest(403, 'tier_403', null); return new Response(JSON.stringify({ error: 'API access subscription required' }), { status: 403, headers: { 'Content-Type': 'application/json', ...corsHeaders }, @@ -390,11 +489,19 @@ export function createDomainGateway( const { validateBearerToken } = await import('./auth-session'); const session = await validateBearerToken(authHeader.slice(7)); if (!session.valid) { + emitRequest(401, 'auth_401', null); return new Response(JSON.stringify({ error: 'Invalid or expired session' }), { status: 401, headers: { 'Content-Type': 'application/json', ...corsHeaders }, }); } + // Capture identity for telemetry — legacy bearer auth bypasses the + // earlier resolveClerkSession() block (only runs for tier-gated routes), + // so without this premium bearer requests would emit as anonymous. + if (session.userId) { + sessionUserId = session.userId; + usage.sessionUserId = session.userId; + } // Accept EITHER a Clerk 'pro' role OR a Convex Dodo entitlement with // tier >= 1. The Dodo webhook pipeline writes Convex entitlements but // does NOT sync Clerk publicMetadata.role, so a paying subscriber's @@ -414,9 +521,11 @@ export function createDomainGateway( let allowed = session.role === 'pro'; if (!allowed && session.userId) { const ent = await getEntitlements(session.userId); + if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0; allowed = !!ent && ent.features.tier >= 1 && ent.validUntil >= Date.now(); } if (!allowed) { + emitRequest(403, 'tier_403', null); return new Response(JSON.stringify({ error: 'Pro subscription required' }), { status: 403, headers: { 'Content-Type': 'application/json', ...corsHeaders }, @@ -424,12 +533,14 @@ export function createDomainGateway( } // Valid pro session (Clerk role OR Dodo entitlement) — fall through to route handling. } else { + emitRequest(401, 'auth_401', null); return new Response(JSON.stringify({ error: keyCheck.error, _debug: (keyCheck as any)._debug }), { status: 401, headers: { 'Content-Type': 'application/json', ...corsHeaders }, }); } } else { + emitRequest(401, 'auth_401', null); return new Response(JSON.stringify({ error: keyCheck.error }), { status: 401, headers: { 'Content-Type': 'application/json', ...corsHeaders }, @@ -442,16 +553,36 @@ export function createDomainGateway( // User API keys do NOT bypass — the key owner's tier is checked normally. if (!(keyCheck.valid && wmKey && !isUserApiKey)) { const entitlementResponse = await checkEntitlement(request, pathname, corsHeaders); - if (entitlementResponse) return entitlementResponse; + if (entitlementResponse) { + const entReason: RequestReason = + entitlementResponse.status === 401 ? 'auth_401' + : entitlementResponse.status === 403 ? 'tier_403' + : 'ok'; + emitRequest(entitlementResponse.status, entReason, null); + return entitlementResponse; + } + // Allowed → record the resolved tier for telemetry. getEntitlements has + // its own Redis cache + in-flight coalescing, so the second lookup here + // does not double the cost when checkEntitlement already fetched. + if (isTierGated && sessionUserId && usage.tier === null) { + const ent = await getEntitlements(sessionUserId); + if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0; + } } // IP-based rate limiting — two-phase: endpoint-specific first, then global fallback const endpointRlResponse = await checkEndpointRateLimit(request, pathname, corsHeaders); - if (endpointRlResponse) return endpointRlResponse; + if (endpointRlResponse) { + emitRequest(endpointRlResponse.status, 'rate_limit_429', null); + return endpointRlResponse; + } if (!hasEndpointRatePolicy(pathname)) { const rateLimitResponse = await checkRateLimit(request, corsHeaders); - if (rateLimitResponse) return rateLimitResponse; + if (rateLimitResponse) { + emitRequest(rateLimitResponse.status, 'rate_limit_429', null); + return rateLimitResponse; + } } // Route matching — if POST doesn't match, convert to GET for stale clients @@ -477,21 +608,38 @@ export function createDomainGateway( if (!matchedHandler) { const allowed = router.allowedMethods(new URL(request.url).pathname); if (allowed.length > 0) { + emitRequest(405, 'ok', null); return new Response(JSON.stringify({ error: 'Method not allowed' }), { status: 405, headers: { 'Content-Type': 'application/json', Allow: allowed.join(', '), ...corsHeaders }, }); } + emitRequest(404, 'ok', null); return new Response(JSON.stringify({ error: 'Not found' }), { status: 404, headers: { 'Content-Type': 'application/json', ...corsHeaders }, }); } - // Execute handler with top-level error boundary + // Execute handler with top-level error boundary. + // Wrap in runWithUsageScope so deep fetch helpers (fetchJson, + // cachedFetchJsonWithMeta) can attribute upstream calls to this customer + // without leaf handlers having to thread a usage hook through every call. let response: Response; + const identityForScope = buildUsageIdentity(usage); + const handlerCall = matchedHandler; + const requestForHandler = request; try { - response = await matchedHandler(request); + response = await runWithUsageScope( + { + ctx: ctx ?? { waitUntil: () => {} }, + requestId: deriveRequestId(originalRequest), + customerId: identityForScope.customer_id, + route: pathname, + tier: identityForScope.tier, + }, + () => handlerCall(requestForHandler), + ); } catch (err) { console.error('[gateway] Unhandled handler error:', err); response = new Response(JSON.stringify({ message: 'Internal server error' }), { @@ -513,6 +661,7 @@ export function createDomainGateway( } // For GET 200 responses: read body once for cache-header decisions + ETag + let resolvedCacheTier: CacheTier | null = null; if (response.status === 200 && request.method === 'GET' && response.body) { const bodyBytes = await response.arrayBuffer(); @@ -524,12 +673,14 @@ export function createDomainGateway( if (mergedHeaders.get('X-No-Cache') || isUpstreamUnavailable) { mergedHeaders.set('Cache-Control', 'no-store'); mergedHeaders.set('X-Cache-Tier', 'no-store'); + resolvedCacheTier = 'no-store'; } else { const rpcName = pathname.split('/').pop() ?? ''; const envOverride = process.env[`CACHE_TIER_OVERRIDE_${rpcName.replace(/-/g, '_').toUpperCase()}`] as CacheTier | undefined; const isPremium = PREMIUM_RPC_PATHS.has(pathname) || getRequiredTier(pathname) !== null; const tier = isPremium ? 'slow-browser' as CacheTier : (envOverride && envOverride in TIER_HEADERS ? envOverride : null) ?? RPC_CACHE_TIER[pathname] ?? 'medium'; + resolvedCacheTier = tier; mergedHeaders.set('Cache-Control', TIER_HEADERS[tier]); // Only allow Vercel CDN caching for trusted origins (worldmonitor.app, Vercel previews, // Tauri). No-origin server-side requests (external scrapers) must always reach the edge @@ -563,9 +714,13 @@ export function createDomainGateway( const ifNoneMatch = request.headers.get('If-None-Match'); if (ifNoneMatch === etag) { + emitRequest(304, 'ok', resolvedCacheTier, 0); + maybeAttachDevHealthHeader(mergedHeaders); return new Response(null, { status: 304, headers: mergedHeaders }); } + emitRequest(response.status, 'ok', resolvedCacheTier, view.length); + maybeAttachDevHealthHeader(mergedHeaders); return new Response(bodyBytes, { status: response.status, statusText: response.statusText, @@ -580,6 +735,12 @@ export function createDomainGateway( mergedHeaders.delete('X-No-Cache'); } + // Streaming/non-GET-200 responses: res_bytes is best-effort 0 (Content-Length + // is often absent on chunked responses; teeing the stream would add latency). + const finalContentLen = response.headers.get('content-length'); + const finalResBytes = finalContentLen ? Number(finalContentLen) || 0 : 0; + emitRequest(response.status, 'ok', resolvedCacheTier, finalResBytes); + maybeAttachDevHealthHeader(mergedHeaders); return new Response(response.body, { status: response.status, statusText: response.statusText, diff --git a/tests/usage-telemetry-emission.test.mts b/tests/usage-telemetry-emission.test.mts new file mode 100644 index 000000000..11093a750 --- /dev/null +++ b/tests/usage-telemetry-emission.test.mts @@ -0,0 +1,369 @@ +/** + * Asserts the Axiom telemetry payload emitted by createDomainGateway() — + * specifically the four fields the round-1 Codex review flagged: + * + * - domain (must be 'shipping' for /api/v2/shipping/* routes, not 'v2') + * - customer_id (must be populated on legacy premium bearer-token success) + * - auth_kind (must reflect the resolved identity, not stay 'anon') + * - tier (recorded when entitlement-gated routes succeed; covered indirectly + * by the legacy bearer success case via the Dodo `tier` branch) + * + * Strategy: enable telemetry (USAGE_TELEMETRY=1 + AXIOM_API_TOKEN=fake), stub + * globalThis.fetch to intercept the Axiom ingest POST, and pass a real ctx + * whose waitUntil collects the in-flight Promises so we can await them after + * the gateway returns. + */ + +import assert from 'node:assert/strict'; +import { createServer, type Server } from 'node:http'; +import { afterEach, before, after, describe, it } from 'node:test'; +import { generateKeyPair, exportJWK, SignJWT } from 'jose'; + +import { createDomainGateway, type GatewayCtx } from '../server/gateway.ts'; + +interface CapturedEvent { + event_type: string; + domain: string; + route: string; + status: number; + customer_id: string | null; + auth_kind: string; + tier: number; +} + +function makeRecordingCtx(): { ctx: GatewayCtx; settled: Promise } { + const pending: Promise[] = []; + const ctx: GatewayCtx = { + waitUntil: (p) => { pending.push(p); }, + }; + // Quiescence loop: emitUsageEvents calls ctx.waitUntil from inside an + // already-pending waitUntil promise, so the array grows during drain. + // Keep awaiting until no new entries appear between iterations. + async function settled(): Promise { + let prev = -1; + while (pending.length !== prev) { + prev = pending.length; + await Promise.allSettled(pending.slice(0, prev)); + } + } + return { + ctx, + get settled() { return settled(); }, + } as { ctx: GatewayCtx; settled: Promise }; +} + +function installAxiomFetchSpy( + originalFetch: typeof fetch, + opts: { entitlementsResponse?: unknown } = {}, +): { + events: CapturedEvent[]; + restore: () => void; +} { + const events: CapturedEvent[] = []; + globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => { + const url = typeof input === 'string' ? input : input instanceof URL ? input.toString() : input.url; + if (url.includes('api.axiom.co')) { + const body = init?.body ? JSON.parse(init.body as string) as CapturedEvent[] : []; + for (const ev of body) events.push(ev); + return new Response('{}', { status: 200 }); + } + if (url.includes('/api/internal-entitlements')) { + return new Response(JSON.stringify(opts.entitlementsResponse ?? null), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }); + } + return originalFetch(input as Request | string | URL, init); + }) as typeof fetch; + return { events, restore: () => { globalThis.fetch = originalFetch; } }; +} + +const ORIGINAL_FETCH = globalThis.fetch; +const ORIGINAL_USAGE_FLAG = process.env.USAGE_TELEMETRY; +const ORIGINAL_AXIOM_TOKEN = process.env.AXIOM_API_TOKEN; +const ORIGINAL_VALID_KEYS = process.env.WORLDMONITOR_VALID_KEYS; + +afterEach(() => { + globalThis.fetch = ORIGINAL_FETCH; + if (ORIGINAL_USAGE_FLAG == null) delete process.env.USAGE_TELEMETRY; + else process.env.USAGE_TELEMETRY = ORIGINAL_USAGE_FLAG; + if (ORIGINAL_AXIOM_TOKEN == null) delete process.env.AXIOM_API_TOKEN; + else process.env.AXIOM_API_TOKEN = ORIGINAL_AXIOM_TOKEN; + if (ORIGINAL_VALID_KEYS == null) delete process.env.WORLDMONITOR_VALID_KEYS; + else process.env.WORLDMONITOR_VALID_KEYS = ORIGINAL_VALID_KEYS; +}); + +describe('gateway telemetry payload — domain extraction', () => { + it("emits domain='shipping' for /api/v2/shipping/* routes (not 'v2')", async () => { + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/v2/shipping/route-intelligence', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + const recorder = makeRecordingCtx(); + const res = await handler( + new Request('https://worldmonitor.app/api/v2/shipping/route-intelligence', { + headers: { Origin: 'https://worldmonitor.app' }, + }), + recorder.ctx, + ); + // Anonymous → 401 (premium path, missing API key + no bearer) + assert.equal(res.status, 401); + + await recorder.settled; + spy.restore(); + + assert.equal(spy.events.length, 1, 'expected exactly one telemetry event'); + const ev = spy.events[0]!; + assert.equal(ev.domain, 'shipping', `domain should strip leading vN segment, got '${ev.domain}'`); + assert.equal(ev.route, '/api/v2/shipping/route-intelligence'); + assert.equal(ev.auth_kind, 'anon'); + assert.equal(ev.customer_id, null); + assert.equal(ev.tier, 0); + }); + + it("emits domain='market' for the standard /api//v1/ layout", async () => { + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/market/v1/list-market-quotes', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + const recorder = makeRecordingCtx(); + const res = await handler( + new Request('https://worldmonitor.app/api/market/v1/list-market-quotes?symbols=AAPL', { + headers: { Origin: 'https://worldmonitor.app' }, + }), + recorder.ctx, + ); + assert.equal(res.status, 200); + + await recorder.settled; + spy.restore(); + + assert.equal(spy.events.length, 1); + assert.equal(spy.events[0]!.domain, 'market'); + }); +}); + +describe('gateway telemetry payload — bearer identity propagation', () => { + let privateKey: CryptoKey; + let jwksServer: Server; + let jwksPort: number; + + before(async () => { + const { publicKey, privateKey: pk } = await generateKeyPair('RS256'); + privateKey = pk; + + const publicJwk = await exportJWK(publicKey); + publicJwk.kid = 'telemetry-key-1'; + publicJwk.alg = 'RS256'; + publicJwk.use = 'sig'; + const jwks = { keys: [publicJwk] }; + + jwksServer = createServer((req, res) => { + if (req.url === '/.well-known/jwks.json') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(jwks)); + } else { + res.writeHead(404); + res.end(); + } + }); + await new Promise((resolve) => jwksServer.listen(0, '127.0.0.1', () => resolve())); + const addr = jwksServer.address(); + jwksPort = typeof addr === 'object' && addr ? addr.port : 0; + process.env.CLERK_JWT_ISSUER_DOMAIN = `http://127.0.0.1:${jwksPort}`; + }); + + after(async () => { + jwksServer?.close(); + delete process.env.CLERK_JWT_ISSUER_DOMAIN; + }); + + function signToken(claims: Record) { + return new SignJWT(claims) + .setProtectedHeader({ alg: 'RS256', kid: 'telemetry-key-1' }) + .setIssuer(`http://127.0.0.1:${jwksPort}`) + .setAudience('convex') + .setSubject(claims.sub as string ?? 'user_test') + .setIssuedAt() + .setExpirationTime('1h') + .sign(privateKey); + } + + it('records customer_id from a successful legacy premium bearer call', async () => { + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/resilience/v1/get-resilience-score', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + const token = await signToken({ sub: 'user_pro', plan: 'pro' }); + const recorder = makeRecordingCtx(); + const res = await handler( + new Request('https://worldmonitor.app/api/resilience/v1/get-resilience-score?countryCode=US', { + headers: { + Origin: 'https://worldmonitor.app', + Authorization: `Bearer ${token}`, + }, + }), + recorder.ctx, + ); + assert.equal(res.status, 200); + + await recorder.settled; + spy.restore(); + + assert.equal(spy.events.length, 1, 'expected exactly one telemetry event'); + const ev = spy.events[0]!; + // The whole point of fix #2: pre-fix this would have been null/anon. + assert.equal(ev.customer_id, 'user_pro', 'customer_id should be the bearer subject'); + assert.equal(ev.auth_kind, 'clerk_jwt'); + assert.equal(ev.domain, 'resilience'); + assert.equal(ev.status, 200); + }); + + it("records tier=2 for an entitlement-gated success (the path the round-1 P2 fix targets)", async () => { + // /api/market/v1/analyze-stock requires tier 2 in ENDPOINT_ENTITLEMENTS. + // Pre-fix: usage.tier stayed null → emitted as 0. Post-fix: gateway re-reads + // entitlements after checkEntitlement allows the request, so tier=2 lands on + // the wire. We exercise this by stubbing the Convex entitlements fallback — + // Redis returns null without UPSTASH env, then getEntitlements falls through + // to the Convex HTTP path which we intercept via the same fetch spy. + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + process.env.CONVEX_SITE_URL = 'https://convex.test'; + process.env.CONVEX_SERVER_SHARED_SECRET = 'test-shared-secret'; + + const fakeEntitlements = { + planKey: 'api_starter', + features: { + tier: 2, + apiAccess: true, + apiRateLimit: 1000, + maxDashboards: 10, + prioritySupport: false, + exportFormats: ['json'], + }, + validUntil: Date.now() + 60_000, + }; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH, { entitlementsResponse: fakeEntitlements }); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/market/v1/analyze-stock', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + // plan: 'api' so the legacy bearer-role short-circuit (`session.role === 'pro'`) + // does NOT fire — we want the entitlement-check path that populates usage.tier. + const token = await signToken({ sub: 'user_api', plan: 'api' }); + const recorder = makeRecordingCtx(); + const res = await handler( + new Request('https://worldmonitor.app/api/market/v1/analyze-stock?symbol=AAPL', { + headers: { + Origin: 'https://worldmonitor.app', + Authorization: `Bearer ${token}`, + }, + }), + recorder.ctx, + ); + assert.equal(res.status, 200, 'entitlement-gated request with sufficient tier should succeed'); + + await recorder.settled; + spy.restore(); + delete process.env.CONVEX_SITE_URL; + delete process.env.CONVEX_SERVER_SHARED_SECRET; + + assert.equal(spy.events.length, 1); + const ev = spy.events[0]!; + assert.equal(ev.tier, 2, `tier should reflect resolved entitlement, got ${ev.tier}`); + assert.equal(ev.customer_id, 'user_api'); + assert.equal(ev.auth_kind, 'clerk_jwt'); + assert.equal(ev.domain, 'market'); + assert.equal(ev.route, '/api/market/v1/analyze-stock'); + }); + + it('still emits with auth_kind=anon when the bearer is invalid', async () => { + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/resilience/v1/get-resilience-score', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + const recorder = makeRecordingCtx(); + const res = await handler( + new Request('https://worldmonitor.app/api/resilience/v1/get-resilience-score?countryCode=US', { + headers: { + Origin: 'https://worldmonitor.app', + Authorization: 'Bearer not-a-real-token', + }, + }), + recorder.ctx, + ); + assert.equal(res.status, 401); + + await recorder.settled; + spy.restore(); + + assert.equal(spy.events.length, 1); + const ev = spy.events[0]!; + assert.equal(ev.auth_kind, 'anon'); + assert.equal(ev.customer_id, null); + }); +}); + +describe('gateway telemetry payload — ctx-optional safety', () => { + it('handler(req) without ctx still resolves cleanly even with telemetry on', async () => { + process.env.USAGE_TELEMETRY = '1'; + process.env.AXIOM_API_TOKEN = 'test-token'; + const spy = installAxiomFetchSpy(ORIGINAL_FETCH); + + const handler = createDomainGateway([ + { + method: 'GET', + path: '/api/market/v1/list-market-quotes', + handler: async () => new Response('{"ok":true}', { status: 200 }), + }, + ]); + + const res = await handler( + new Request('https://worldmonitor.app/api/market/v1/list-market-quotes?symbols=AAPL', { + headers: { Origin: 'https://worldmonitor.app' }, + }), + ); + assert.equal(res.status, 200); + spy.restore(); + // No ctx → emit short-circuits → no events delivered. The point is that + // the handler does not throw "Cannot read properties of undefined". + assert.equal(spy.events.length, 0); + }); +});