feat(usage): per-request Axiom telemetry pipeline (gateway + upstream attribution) (#3403)

* feat(gateway): thread Vercel Edge ctx through createDomainGateway (#3381)

PR-0 of the Axiom usage-telemetry stack. Pure infra change: no telemetry
emission yet, only the signature plumbing required for ctx.waitUntil to
exist on the hot path.

- createDomainGateway returns a handler that takes (req, ctx) instead of (req)
- rewriteToSebuf propagates ctx to its target gateway
- 5 alias callsites updated to pass ctx through
- ~30 [rpc].ts callsites unchanged (export default createDomainGateway(...))

Pattern reference: api/notification-channels.ts:166.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(usage): pure UsageIdentity resolver + Axiom emit primitives (#3381)

server/_shared/usage-identity.ts
- buildUsageIdentity: pure function, consumes already-resolved gateway state.
- Static ENTERPRISE_KEY_TO_CUSTOMER map (explicit, reviewable in code).
- Does not re-verify JWTs or re-validate API keys.

server/_shared/usage.ts
- buildRequestEvent / buildUpstreamEvent: allowlisted-primitive builders only.
  Never accept Request/Response — additive field leaks become structurally
  impossible.
- emitUsageEvents → ctx.waitUntil(sendToAxiom). Direct fetch, 1.5s timeout,
  no retry, gated by USAGE_TELEMETRY=1 and AXIOM_API_TOKEN.
- Sliding-window circuit breaker (5% over 5min, min 20 samples). Trips with
  one structured console.error; subsequent drops are 1%-sampled console.warn.
- Header derivers reuse Vercel/CF headers for request_id, region, country,
  reqBytes; ua_hash null unless USAGE_UA_PEPPER is set (no stable
  fingerprinting).
- Dev-only x-usage-telemetry response header for 2-second debugging.

server/_shared/auth-session.ts
- New resolveClerkSession returning { userId, orgId } in one JWT verify so
  customer_id can be Clerk org id without a second pass. resolveSessionUserId
  kept as back-compat wrapper.

No emission wiring yet — that lands in the next commit (gateway request
event + 403 + 429).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(gateway): emit Axiom request events on every return path (#3381)

Wires the request-event side of the Axiom usage-telemetry stack. Behind
USAGE_TELEMETRY=1 — no-op when the env var is unset.

Emit points (each builds identity from accumulated gateway state):
- 403 disallowed origin → reason=origin_403
- API access subscription required (403)
- legacy bearer 401 / 403 / 401-without-bearer
- entitlement check fail-through
- endpoint rate-limit 429 → reason=rate_limit_429
- global rate-limit 429 → reason=rate_limit_429
- 405 method not allowed
- 404 not found
- 304 etag match (resolved cache tier)
- 200 GET with body (resolved cache tier, real res_bytes)
- streaming / non-GET-200 final return (res_bytes best-effort)

Identity inputs (UsageIdentityInput):
- sessionUserId / clerkOrgId from new resolveClerkSession (one JWT verify)
- isUserApiKey + userApiKeyCustomerRef from validateUserApiKey result
- enterpriseApiKey when keyCheck.valid + non-wm_ wmKey present
- widgetKey from x-widget-key header (best-effort)
- tier captured opportunistically from existing getEntitlements calls
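
A minimal sketch of how these inputs might collapse into a single identity. The precedence order below (user API key, then Clerk session, then enterprise key, then widget key, then anon) is partly an assumption: the unit tests only confirm that user_api_key wins over everything else. `resolveIdentitySketch` and `placeholderHash` are hypothetical names, and the 32-bit FNV-1a hash only stands in for the real hashKeySync.

```typescript
type AuthKind = "user_api_key" | "clerk_jwt" | "enterprise_api_key" | "widget_key" | "anon";

interface IdentityInputSketch {
  sessionUserId: string | null;
  isUserApiKey: boolean;
  enterpriseApiKey: string | null;
  widgetKey: string | null;
  clerkOrgId: string | null;
  userApiKeyCustomerRef: string | null;
  tier: number | null;
}

interface IdentitySketch {
  auth_kind: AuthKind;
  principal_id: string | null;
  customer_id: string | null;
  tier: number;
}

// Stand-in hash (32-bit FNV-1a rendered in base36). Assumption: not the real hashKeySync.
function placeholderHash(secret: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < secret.length; i++) {
    h ^= secret.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(36);
}

function resolveIdentitySketch(input: IdentityInputSketch): IdentitySketch {
  const tier = input.tier ?? 0; // unknown tier is reported as 0
  if (input.isUserApiKey) {
    return {
      auth_kind: "user_api_key",
      principal_id: input.sessionUserId,
      customer_id: input.userApiKeyCustomerRef ?? input.sessionUserId,
      tier,
    };
  }
  if (input.sessionUserId) {
    return {
      auth_kind: "clerk_jwt",
      principal_id: input.sessionUserId,
      customer_id: input.clerkOrgId ?? input.sessionUserId, // org wins when present
      tier,
    };
  }
  if (input.enterpriseApiKey) {
    return {
      auth_kind: "enterprise_api_key",
      principal_id: placeholderHash(input.enterpriseApiKey), // never the raw key
      customer_id: "enterprise-unmapped",
      tier,
    };
  }
  if (input.widgetKey) {
    return {
      auth_kind: "widget_key",
      principal_id: placeholderHash(input.widgetKey),
      customer_id: input.widgetKey,
      tier,
    };
  }
  // No identity signal: a leftover tier value is deliberately discarded.
  return { auth_kind: "anon", principal_id: null, customer_id: null, tier: 0 };
}
```

Because the function only reads already-resolved state, it can be exercised without any JWT or key validation in the loop.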

Header derivers reuse Vercel/CF metadata (x-vercel-id, x-vercel-ip-country,
cf-ipcountry, content-length, sentry-trace) — no new geo lookup, no new
crypto on the hot path. ua_hash null unless USAGE_UA_PEPPER is set.

Dev-only x-usage-telemetry response header (ok | degraded | off) attached
on the response paths for 2-second debugging in non-production.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(usage): upstream events via implicit request scope (#3381)

Closes the upstream-attribution side of the Axiom usage-telemetry stack
without requiring leaf-handler changes (per koala's review).

server/_shared/usage.ts
- AsyncLocalStorage-backed UsageScope: gateway sets it once per request,
  fetch helpers read from it lazily. Defensive import — if the runtime
  rejects node:async_hooks, scope helpers degrade to no-ops and the
  request event is unaffected.
- runWithUsageScope(scope, fn) / getUsageScope() exports.

server/gateway.ts
- Wraps matchedHandler in runWithUsageScope({ ctx, requestId, customerId,
  route, tier }) so deep fetchers can attribute upstream calls without
  threading state through every handler signature.

server/_shared/redis.ts
- cachedFetchJsonWithMeta accepts opts.usage = { provider, operation? }.
  Only the provider label is required to opt in — request_id / customer_id
  / route / tier flow implicitly from UsageScope.
- Emits on the fresh path only (cache hits don't emit; the inbound
  request event already records cache_status).
- cache_status correctly distinguishes 'miss' vs 'neg-sentinel' by
  construction, matching NEG_SENTINEL handling.
- Telemetry never throws — failures are swallowed in the lazy-import
  catch, sink itself short-circuits on USAGE_TELEMETRY=0.

server/_shared/fetch-json.ts
- New optional { provider, operation } in FetchJsonOptions. Same
  opt-in-by-provider model as cachedFetchJsonWithMeta. Auto-derives host
  from URL. Reads body via .text() so response_bytes is recorded
  (best-effort; chunked responses still report 0).
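
One detail worth keeping in mind when recording response_bytes from a decoded body: a JavaScript string's `length` counts UTF-16 code units, not bytes, so the two can diverge for non-ASCII payloads. A small sketch of a byte-accurate measurement follows; `utf8ByteLength` is a hypothetical helper, not something named in this commit.

```typescript
// Hypothetical helper: size of a decoded body once re-encoded as UTF-8.
function utf8ByteLength(text: string): number {
  return new TextEncoder().encode(text).length;
}

// "é" is one UTF-16 code unit but two UTF-8 bytes.
const sample = '{"city":"Orléans"}';
const codeUnits = sample.length;
const bytes = utf8ByteLength(sample);
```

For pure-ASCII JSON the two numbers are identical, which is why the difference is easy to miss in tests.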

Net result: any handler that uses fetchJson or cachedFetchJsonWithMeta
gets full per-customer upstream attribution by adding two fields to the
options bag. No signature changes anywhere else.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(gateway): address round-1 codex feedback on usage telemetry

- ctx is now optional on the createDomainGateway handler signature so
  direct callers (tests, non-Vercel paths) no longer crash on emit
- legacy premium bearer-token routes (resilience, shipping-v2) propagate
  session.userId into the usage accumulator so successful requests are
  attributed instead of emitting as anon
- after checkEntitlement allows a tier-gated route, re-read entitlements
  (Redis-cached + in-flight coalesced) to populate usage.tier so
  analyze-stock & co. emit the correct tier rather than 0
- domain extraction now skips a leading vN segment, so /api/v2/shipping/*
  records domain="shipping" instead of "v2"
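
The vN-skipping rule from the last bullet can be sketched roughly as below. `extractDomainSketch` is a hypothetical name and the exact parsing in the gateway may differ; only the observable behaviour (/api/v2/shipping/* yields "shipping") comes from this commit.

```typescript
// Returns the service segment of an /api/... path, skipping one leading
// version segment such as "v2". Returns null for paths outside /api.
function extractDomainSketch(pathname: string): string | null {
  const segments = pathname.split("/").filter((s) => s.length > 0);
  if (segments[0] !== "api") return null;
  const rest = segments.slice(1);
  const first = rest[0];
  if (first !== undefined && /^v\d+$/.test(first)) {
    rest.shift(); // drop the leading vN so the service name comes next
  }
  return rest[0] ?? null;
}
```

A version segment that follows the service name, as in /api/market/v1/analyze-stock, is left alone because only the first segment after /api is inspected.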

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(usage): assert telemetry payload + identity resolver + operator guide

- tests/usage-telemetry-emission.test.mts stubs globalThis.fetch to
  capture the Axiom ingest POST body and asserts the four review-flagged
  fields end-to-end through the gateway: domain on /api/v2/<svc>/* (was
  "v2"), customer_id on legacy premium bearer success (was null/anon),
  tier on entitlement-gated success via the Convex fallback path (was 0),
  plus a ctx-optional regression guard
- server/__tests__/usage-identity.test.ts unit-tests the pure
  buildUsageIdentity() resolver across every auth_kind branch, tier
  coercion, and the secret-handling invariant (raw enterprise key never
  lands in any output field)
- docs/architecture/usage-telemetry.md is the operator + dev guide:
  field reference, architecture, configuration, failure modes, local
  workflow, eight Axiom APL recipes, and runbooks for adding fields /
  new gateway return paths

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(usage): make recorder.settled robust to nested waitUntil

Promise.all(pending) snapshotted the array at call time, missing the
inner ctx.waitUntil(sendToAxiom(...)) that emitUsageEvents pushes after
the outer drain begins. Tests passed only because the fetch spy resolved
in an earlier microtask tick. Replace with a quiescence loop so the
helper survives any future async in the emit path.
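
A rough sketch of what such a quiescence loop could look like. The recorder shape and the name `createRecorderSketch` are assumptions for illustration; the test helper in the repository may be structured differently.

```typescript
interface RecorderSketch {
  ctx: { waitUntil(p: Promise<unknown>): void };
  settled(): Promise<void>;
}

function createRecorderSketch(): RecorderSketch {
  const pending: Promise<unknown>[] = [];
  const ctx = {
    waitUntil(p: Promise<unknown>): void {
      pending.push(p);
    },
  };

  async function settled(): Promise<void> {
    let drained = 0;
    // Keep draining until a pass completes without any new registrations.
    // A single Promise.all(pending) would miss promises pushed mid-drain.
    while (drained < pending.length) {
      const batch = pending.slice(drained);
      drained = pending.length;
      await Promise.all(batch.map((p) => p.catch(() => undefined)));
    }
  }

  return { ctx, settled };
}
```

The loop terminates as soon as awaiting one batch registers nothing new, so it adds no delay in the common case where there is no nesting.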

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: trigger preview

* fix(usage): address koala #3403 review — collapse nested waitUntil, widget-key validation, neg-sentinel status, auth_* reasons

P1
- Collapse nested ctx.waitUntil at all 3 emit sites (gateway.ts emitRequest,
  fetch-json.ts, redis.ts emitUpstreamFromHook). Export sendToAxiom and call
  it directly inside the outer waitUntil so Edge runtimes don't drop the
  delivery promise after the response phase.
- Validate X-Widget-Key against WIDGET_AGENT_KEY before populating usage.widgetKey
  so unauthenticated callers can't spoof per-customer attribution.
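
The collapsed shape can be sketched as a single synchronous registration that wraps the delivery promise directly. `emitEventsSketch` and `SendFn` are hypothetical names; the sketch also shows the optional-ctx guard mentioned earlier in this PR.

```typescript
interface WaitUntilCtx {
  waitUntil(p: Promise<unknown>): void;
}

type SendFn = (events: readonly object[]) => Promise<void>;

// One waitUntil per emit: the delivery promise itself is registered, so
// nothing needs to call ctx.waitUntil again after the response has been sent.
function emitEventsSketch(
  ctx: WaitUntilCtx | undefined,
  events: readonly object[],
  send: SendFn,
): void {
  if (!ctx || events.length === 0) return; // direct callers without ctx: no-op
  ctx.waitUntil(send(events).catch(() => undefined)); // telemetry never throws
}
```

Passing `send` as a parameter is only for the sketch; it keeps the example self-contained and makes the registration count easy to observe.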

P2
- Emit on OPTIONS preflight (new 'preflight' RequestReason).
- Gate cachedFetchJsonWithMeta upstreamStatus=200 on result != null so the
  neg-sentinel branch no longer reports as a successful upstream call.
- Extend RequestReason with auth_401/auth_403/tier_403 and replace
  reason:'ok' on every auth/tier-rejection emit path.
- Replace 32-bit FNV-1a with a two-round XOR-folded 64-bit variant in
  hashKeySync (collision space matters once widget-key adoption grows).

Verification
- tests/usage-telemetry-emission.test.mts — 6/6
- tests/premium-stock-gateway.test.mts + tests/gateway-cdn-origin-policy.test.mts — 15/15
- npx vitest run server/__tests__/usage-identity.test.ts — 13/13
- npx tsc --noEmit clean

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: trigger preview rebuild for AXIOM_API_TOKEN

* chore(usage): note Axiom region in ingest URL comment

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* debug(usage): unconditional logs in sendToAxiom for preview troubleshooting

Temporary — to be reverted once Axiom delivery is confirmed working in preview.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(usage): add 'live' cache tier + revert preview debug logs

- Sync UsageCacheTier with the local CacheTier in gateway.ts (main added
  'live' in PR #3402 — synthetic merge with main was failing typecheck:api).
- Revert temporary unconditional debug logs in sendToAxiom now that Axiom
  delivery is verified end-to-end on preview (event landed with all fields
  populated, including the new auth_401 reason from the koala #3403 fix).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sebastien Melki
2026-04-25 18:10:51 +03:00
committed by GitHub
parent 8655bd81bc
commit 1db05e6caa
15 changed files with 1754 additions and 26 deletions

View File

@@ -4,5 +4,5 @@ import gateway from './[rpc]';
 import { rewriteToSebuf } from '../../../server/alias-rewrite';
 // Alias for documented v1 URL. See server/alias-rewrite.ts.
-export default (req: Request) =>
-rewriteToSebuf(req, '/api/scenario/v1/run-scenario', gateway);
+export default (req: Request, ctx: { waitUntil: (p: Promise<unknown>) => void }) =>
+rewriteToSebuf(req, '/api/scenario/v1/run-scenario', gateway, ctx);

View File

@@ -4,5 +4,5 @@ import gateway from './[rpc]';
 import { rewriteToSebuf } from '../../../server/alias-rewrite';
 // Alias for documented v1 URL. See server/alias-rewrite.ts.
-export default (req: Request) =>
-rewriteToSebuf(req, '/api/scenario/v1/get-scenario-status', gateway);
+export default (req: Request, ctx: { waitUntil: (p: Promise<unknown>) => void }) =>
+rewriteToSebuf(req, '/api/scenario/v1/get-scenario-status', gateway, ctx);

View File

@@ -4,5 +4,5 @@ import gateway from './[rpc]';
 import { rewriteToSebuf } from '../../../server/alias-rewrite';
 // Alias for documented v1 URL. See server/alias-rewrite.ts.
-export default (req: Request) =>
-rewriteToSebuf(req, '/api/scenario/v1/list-scenario-templates', gateway);
+export default (req: Request, ctx: { waitUntil: (p: Promise<unknown>) => void }) =>
+rewriteToSebuf(req, '/api/scenario/v1/list-scenario-templates', gateway, ctx);

View File

@@ -4,5 +4,5 @@ import gateway from './[rpc]';
 import { rewriteToSebuf } from '../../../server/alias-rewrite';
 // Alias for documented v1 URL. See server/alias-rewrite.ts.
-export default (req: Request) =>
-rewriteToSebuf(req, '/api/supply-chain/v1/get-country-products', gateway);
+export default (req: Request, ctx: { waitUntil: (p: Promise<unknown>) => void }) =>
+rewriteToSebuf(req, '/api/supply-chain/v1/get-country-products', gateway, ctx);

View File

@@ -4,5 +4,5 @@ import gateway from './[rpc]';
 import { rewriteToSebuf } from '../../../server/alias-rewrite';
 // Alias for documented v1 URL. See server/alias-rewrite.ts.
-export default (req: Request) =>
-rewriteToSebuf(req, '/api/supply-chain/v1/get-multi-sector-cost-shock', gateway);
+export default (req: Request, ctx: { waitUntil: (p: Promise<unknown>) => void }) =>
+rewriteToSebuf(req, '/api/supply-chain/v1/get-multi-sector-cost-shock', gateway, ctx);

View File

@@ -0,0 +1,351 @@
# Usage telemetry (Axiom)
Operator + developer guide to the gateway's per-request usage telemetry pipeline.
Implements the requirements in `docs/brainstorms/2026-04-24-axiom-api-observability-requirements.md`.
---
## What it is
Every inbound API request that hits `createDomainGateway()` emits one structured
event to Axiom describing **who** called **what**, **how it was authenticated**,
**what it cost**, and **how it was served**. Deep fetch helpers
(`fetchJson`, `cachedFetchJsonWithMeta`) emit a second event type per upstream
call so customer × provider attribution is reconstructible.
It is **observability only** — never on the request-critical path. The whole
sink runs inside `ctx.waitUntil(...)` with a 1.5s timeout, no retries, and a
circuit breaker that trips on 5% failure ratio over a 5-minute window.
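As a rough illustration of the "timeout, no retries" part, here is a minimal sketch of a single bounded delivery attempt. `sendOnceSketch`, `FetchLike`, and the injected `fetchImpl` are hypothetical and exist only so the example runs without a network; the ingest URL is a parameter because the real endpoint is not shown here.

```typescript
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string; signal: AbortSignal },
) => Promise<{ ok: boolean }>;

// One attempt, bounded by timeoutMs, never throws. Returns whether the
// sink accepted the batch so a breaker could record the sample.
async function sendOnceSketch(
  url: string,
  token: string,
  events: readonly object[],
  fetchImpl: FetchLike,
  timeoutMs = 1_500,
): Promise<boolean> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const res = await fetchImpl(url, {
      method: "POST",
      headers: { Authorization: `Bearer ${token}`, "Content-Type": "application/json" },
      body: JSON.stringify(events),
      signal: controller.signal,
    });
    return res.ok;
  } catch {
    return false; // timeout or network error: drop the batch, no retry
  } finally {
    clearTimeout(timer);
  }
}
```

Returning a boolean instead of throwing keeps the caller's `ctx.waitUntil(...)` promise from ever rejecting.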
## What you get out of it
Two event types in dataset `wm_api_usage`:
### `request` (one per inbound request)
| Field | Example | Notes |
|--------------------|-------------------------------------------|----------------------------------------------|
| `event_type` | `"request"` | |
| `request_id` | `"req_xxx"` | from `x-request-id` or generated |
| `route` | `/api/market/v1/analyze-stock` | |
| `domain` | `"market"` | strips leading `vN` for `/api/v2/<svc>/…` |
| `method`, `status` | `"GET"`, `200` | |
| `duration_ms` | `412` | wall-clock at the gateway |
| `req_bytes`, `res_bytes` | | response counted only on 200/304 GET |
| `customer_id` | Clerk user ID, org ID, enterprise slug, or widget key | `null` only for anon |
| `principal_id` | user ID or **hash** of API/widget key | never the raw secret |
| `auth_kind` | `clerk_jwt` \| `user_api_key` \| `enterprise_api_key` \| `widget_key` \| `anon` | |
| `tier` | `0` free / `1` pro / `2` api / `3` enterprise | `0` if unknown |
| `cache_tier` | `fast` \| `medium` \| `slow` \| `slow-browser` \| `static` \| `daily` \| `live` \| `no-store` | only on 200/304 |
| `country`, `execution_region` | `"US"`, `"iad1"` | Vercel-provided |
| `execution_plane` | `"vercel-edge"` | |
| `origin_kind` | `api-key` \| `oauth` \| `browser-same-origin` \| `browser-cross-origin` \| `null` | derived from headers by `deriveOriginKind()`; `mcp` and `internal-cron` exist in the `OriginKind` type for upstream/future use but are not currently emitted on the request path |
| `ua_hash` | SHA-256 of the UA | hashed so PII doesn't land in Axiom; `null` unless `USAGE_UA_PEPPER` is set |
| `sentry_trace_id` | `"abc123…"` | join key into Sentry |
| `reason` | `ok` \| `origin_403` \| `rate_limit_429` \| `preflight` \| `auth_401` \| `auth_403` \| `tier_403` | `auth_*` distinguishes auth-rejection paths from genuine successes when filtering on `status` alone is ambiguous |
### `upstream` (one per outbound fetch from a request handler)
| Field | Example |
|----------------------|--------------------------|
| `request_id` | links back to the parent |
| `provider`, `host` | `"yahoo-finance"`, `"query1.finance.yahoo.com"` |
| `operation` | logical op name set by the helper |
| `status`, `duration_ms`, `request_bytes`, `response_bytes` | |
| `cache_status` | `miss` \| `fresh` \| `stale-while-revalidate` \| `neg-sentinel` |
| `customer_id`, `route`, `tier` | inherited from the inbound request via AsyncLocalStorage |
## What it answers
A non-exhaustive list — copy-paste APL queries are in the **Analysis** section below.
- Per-customer request volume, p50/p95 latency, error rate
- Per-route premium-vs-free traffic mix
- CDN cache-tier distribution per route (calibrate `RPC_CACHE_TIER`)
- Top-of-funnel for noisy abusers (`auth_kind=anon` × `country` × `route`)
- Upstream provider cost per customer (`upstream` join `request` on `request_id`)
- Bearer-vs-API-key vs anon ratio per premium route
- Region heatmaps (`execution_region` × `route`)
---
## Architecture
```
┌─────────────────────────────────────────────────────┐
│ Vercel Edge handler │
│ │
request ──► │ createDomainGateway() │
│ auth resolution → usage:UsageIdentityInput │
│ runWithUsageScope({ ctx, customerId, route, … }) │
│ └─ user handler ── fetchJson / cachedFetch... ─┼─► upstream
│ (reads scope, emits │ API
│ upstream event) │
│ emitRequest(...) at every return point ──────────┼────► Axiom
│ └─ ctx.waitUntil(emitUsageEvents(...)) │ wm_api_usage
└─────────────────────────────────────────────────────┘
```
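The implicit scope in the diagram can be approximated with `AsyncLocalStorage` from `node:async_hooks`. This is a simplified sketch: the `Sketch`-suffixed names and the scope fields are illustrative, and it uses a plain static import, whereas the commit message describes a defensive import that degrades to no-ops.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

interface UsageScopeSketch {
  requestId: string;
  customerId: string | null;
  route: string;
  tier: number;
}

const usageStorage = new AsyncLocalStorage<UsageScopeSketch>();

// Gateway side: set the scope once around the matched handler.
function runWithUsageScopeSketch<T>(scope: UsageScopeSketch, fn: () => T): T {
  return usageStorage.run(scope, fn);
}

// Fetch-helper side: read lazily; undefined when called outside a request.
function getUsageScopeSketch(): UsageScopeSketch | undefined {
  return usageStorage.getStore();
}
```

Anything called inside `fn`, including awaited async work, sees the same scope, which is what lets deep fetch helpers attribute upstream calls without extra parameters.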
Code map:
| Concern | File |
|----------------------------------------|--------------------------------------------|
| Gateway emit points + identity accumulator | `server/gateway.ts` |
| Identity resolver (pure) | `server/_shared/usage-identity.ts` |
| Event shapes, builders, Axiom sink, breaker, ALS scope | `server/_shared/usage.ts` |
| Upstream-event emission from fetch helpers | `server/_shared/redis.ts`, `server/_shared/fetch-json.ts` |
Key invariants:
1. **Builders accept allowlisted primitives only** — they never accept
`Request`, `Response`, or untyped objects, so future field additions can't
leak by structural impossibility.
2. **`emitRequest()` fires at every gateway return path** — origin block,
OPTIONS, 401/403/404/405, rate-limit 429, ETag 304, success 200, error 500.
Adding a new return path requires adding the emit, or telemetry coverage
silently regresses.
3. **`principal_id` is a hash for secret-bearing auth** (API key, widget key)
so raw secrets never land in Axiom.
4. **Telemetry failure must not affect API availability or latency** — sink is
fire-and-forget with timeout + breaker; any error path drops the event with
a 1%-sampled `console.warn`.
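Invariant 1 can be illustrated with a builder that copies fields one by one instead of spreading its input. The event shape below is a trimmed, hypothetical subset, and `buildRequestEventSketch` is not the real builder.

```typescript
interface RequestEventSketch {
  event_type: "request";
  route: string;
  method: string;
  status: number;
  duration_ms: number;
  customer_id: string | null;
}

interface RequestEventParamsSketch {
  route: string;
  method: string;
  status: number;
  durationMs: number;
  customerId: string | null;
}

// Every output field is named explicitly. Extra properties on the input
// (headers, tokens, whole Request objects) have no path into the event.
function buildRequestEventSketch(p: RequestEventParamsSketch): RequestEventSketch {
  return {
    event_type: "request",
    route: p.route,
    method: p.method,
    status: p.status,
    duration_ms: p.durationMs,
    customer_id: p.customerId,
  };
}
```

The cost is a little repetition whenever a field is added; the benefit is that a new field can only appear in Axiom if someone deliberately writes it into the builder.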
---
## Configuration
Two env vars control the pipeline. Both are independent of every other system.
| Var | Required for | Behavior when missing |
|--------------------|--------------|-------------------------------------------|
| `USAGE_TELEMETRY` | Emission | Set to `1` to enable. Anything else → emission is a no-op (zero network calls, zero allocations of the event payload). |
| `AXIOM_API_TOKEN` | Delivery | Events build but `sendToAxiom` short-circuits to a 1%-sampled `[usage-telemetry] drop { reason: 'no-token' }` warning. |
Vercel project setup:
1. Axiom → create dataset **`wm_api_usage`** (the constant in
`server/_shared/usage.ts:18`; rename if you want a different name).
2. Axiom → Settings → API Tokens → create an **Ingest** token scoped to that
dataset. Copy the `xaat-…` value.
3. Vercel → Project → Settings → Environment Variables, add for the desired
environments (Production / Preview):
```
USAGE_TELEMETRY=1
AXIOM_API_TOKEN=xaat-...
```
4. Redeploy. Axiom infers schema from the first events — no upfront schema
work needed.
### Failure modes (deploy-with-Axiom-down is safe)
| Scenario | Behavior |
|---------------------------------------|------------------------------------------------------|
| `USAGE_TELEMETRY` unset | emit is a no-op, identity object is still built but discarded |
| `USAGE_TELEMETRY=1`, no token | event built, `fetch` skipped, sampled warn |
| Axiom returns non-2xx | `recordSample(false)`, sampled warn |
| Axiom timeout (>1.5s) | `AbortController` aborts, sampled warn |
| ≥5% failure ratio over 5min (≥20 samples) | breaker trips → all sends short-circuit until ratio recovers |
| Direct gateway caller passes no `ctx` | emit is a no-op (the `ctx?.waitUntil` guard) |
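
The breaker row above can be sketched as a sliding window of timestamped samples. This is an illustrative approximation using the thresholds from the table (5% over 5 minutes, at least 20 samples); `createBreakerSketch` is a hypothetical name and the real breaker may track state differently.

```typescript
interface BreakerOptionsSketch {
  windowMs: number;
  maxFailureRatio: number;
  minSamples: number;
}

function createBreakerSketch(
  opts: BreakerOptionsSketch = { windowMs: 5 * 60_000, maxFailureRatio: 0.05, minSamples: 20 },
) {
  const samples: { at: number; ok: boolean }[] = [];

  // Drop samples that have aged out of the window.
  function prune(now: number): void {
    const cutoff = now - opts.windowMs;
    while (samples.length > 0) {
      const oldest = samples[0];
      if (oldest === undefined || oldest.at >= cutoff) break;
      samples.shift();
    }
  }

  return {
    record(ok: boolean, now: number = Date.now()): void {
      samples.push({ at: now, ok });
      prune(now);
    },
    // Open (sends short-circuit) only with enough samples and a high failure ratio.
    isOpen(now: number = Date.now()): boolean {
      prune(now);
      if (samples.length < opts.minSamples) return false;
      const failures = samples.filter((s) => !s.ok).length;
      return failures / samples.length >= opts.maxFailureRatio;
    },
  };
}
```

Because old samples age out on their own, the breaker closes again once the window no longer holds enough recent failures, without any explicit reset.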
### Kill switch
There is no in-code feature flag separate from the env vars. To disable in
production: set `USAGE_TELEMETRY=0` (or unset it) and redeploy. Existing
in-flight requests drain on the next isolate cycle.
---
## Local development & testing
### Smoke test without Axiom
Just run the dev server with neither env var set. Hit any route. The path is
fully exercised — only the Axiom POST is skipped.
```sh
vercel dev
curl http://localhost:3000/api/seismology/v1/list-earthquakes
```
In any non-`production` build, the response carries an `x-usage-telemetry`
header. Use it as a wiring check:
```sh
curl -sI http://localhost:3000/api/seismology/v1/list-earthquakes | grep -i x-usage
# x-usage-telemetry: off # USAGE_TELEMETRY unset
# x-usage-telemetry: ok # enabled, breaker closed
# x-usage-telemetry: degraded # breaker tripped — Axiom is failing
```
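The three header values map onto two pieces of state. A tiny sketch of that mapping, with `telemetryHeaderSketch` as a hypothetical helper name:

```typescript
type TelemetryHeaderSketch = "ok" | "degraded" | "off";

// off: USAGE_TELEMETRY is not enabled; degraded: enabled but the breaker
// is currently open; ok: enabled and delivering.
function telemetryHeaderSketch(enabled: boolean, breakerOpen: boolean): TelemetryHeaderSketch {
  if (!enabled) return "off";
  return breakerOpen ? "degraded" : "ok";
}
```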
### End-to-end with a real Axiom dataset
```sh
USAGE_TELEMETRY=1 AXIOM_API_TOKEN=xaat-... vercel dev
curl http://localhost:3000/api/market/v1/list-market-quotes?symbols=AAPL
```
Then in Axiom:
```kusto
['wm_api_usage']
| where _time > ago(2m)
| project _time, route, status, customer_id, auth_kind, tier, duration_ms
```
### Automated tests
Three suites cover the pipeline:
1. **Identity unit tests** — `server/__tests__/usage-identity.test.ts` cover the
pure `buildUsageIdentity()` resolver across every `auth_kind` branch.
2. **Gateway emit assertions** — `tests/usage-telemetry-emission.test.mts`
stubs `globalThis.fetch` to capture the Axiom POST body and asserts the
`domain`, `customer_id`, `auth_kind`, and `tier` fields end-to-end through
the gateway.
3. **Auth-path regression tests** — `tests/premium-stock-gateway.test.mts` and
`tests/gateway-cdn-origin-policy.test.mts` exercise the gateway without a
`ctx` argument, locking in the "telemetry must not break direct callers"
invariant.
Run them:
```sh
npx tsx --test tests/usage-telemetry-emission.test.mts \
tests/premium-stock-gateway.test.mts \
tests/gateway-cdn-origin-policy.test.mts
npx vitest run server/__tests__/usage-identity.test.ts
```
---
## Analysis recipes (Axiom APL)
All queries assume dataset `wm_api_usage`. Adjust time windows as needed.
### Per-customer request volume + error rate
```kusto
['wm_api_usage']
| where event_type == "request" and _time > ago(24h)
| summarize requests = count(),
errors_5xx = countif(status >= 500),
errors_4xx = countif(status >= 400 and status < 500),
p95_ms = percentile(duration_ms, 95)
by customer_id
| order by requests desc
```
### p50 / p95 latency per route
```kusto
['wm_api_usage']
| where event_type == "request" and _time > ago(1h)
| summarize p50 = percentile(duration_ms, 50),
p95 = percentile(duration_ms, 95),
n = count()
by route
| where n > 50
| order by p95 desc
```
### Premium vs free traffic mix per route
```kusto
['wm_api_usage']
| where event_type == "request" and _time > ago(24h)
| extend tier_bucket = case(tier >= 2, "api+ent", tier == 1, "pro", "free/anon")
| summarize n = count() by route, tier_bucket
| evaluate pivot(tier_bucket, sum(n))
| order by route asc
```
### CDN cache-tier mix per route — calibrates `RPC_CACHE_TIER`
```kusto
['wm_api_usage']
| where event_type == "request" and status == 200 and method == "GET" and _time > ago(24h)
| summarize n = count() by route, cache_tier
| evaluate pivot(cache_tier, sum(n))
| order by route asc
```
A route dominated by `slow-browser` that *should* be CDN-cached is a hint to
add an entry to `RPC_CACHE_TIER` in `server/gateway.ts`.
### Anonymous abuse hotspots
```kusto
['wm_api_usage']
| where event_type == "request" and auth_kind == "anon" and _time > ago(1h)
| summarize n = count() by route, country
| where n > 100
| order by n desc
```
### Upstream cost per customer (provider attribution)
```kusto
['wm_api_usage']
| where event_type == "upstream" and _time > ago(24h)
| summarize calls = count(),
response_bytes_mb = sum(response_bytes) / 1024.0 / 1024.0,
p95_ms = percentile(duration_ms, 95)
by customer_id, provider
| order by calls desc
```
### Cache hit ratio per provider (correctness signal)
```kusto
['wm_api_usage']
| where event_type == "upstream" and _time > ago(24h)
| summarize n = count() by provider, cache_status
| evaluate pivot(cache_status, sum(n))
| extend hit_ratio = (fresh + coalesce(['stale-while-revalidate'], 0)) * 1.0 / (fresh + miss + coalesce(['stale-while-revalidate'], 0))
| order by hit_ratio asc
```
### Sentry × Axiom join
When Sentry surfaces an exception, copy its trace ID and:
```kusto
['wm_api_usage']
| where sentry_trace_id == "<paste from Sentry>"
```
…to see the exact request envelope (route, customer, latency, cache outcome).
### Telemetry health watch
```kusto
['wm_api_usage']
| where _time > ago(1h)
| summarize events_per_min = count() by bin(_time, 1m)
| order by _time asc
```
A drop to zero with no corresponding traffic drop = breaker tripped or
Vercel/Axiom integration broken — pair it with the `[usage-telemetry] drop`
warns in Vercel logs to find the cause.
---
## Adding new telemetry fields
1. Add the field to `RequestEvent` (or `UpstreamEvent`) in
`server/_shared/usage.ts`.
2. Extend the corresponding builder (`buildRequestEvent` /
`buildUpstreamEvent`) — only allowlisted primitives, no untyped objects.
3. If the value comes from gateway state, set it on the `usage` accumulator
in `gateway.ts`. Otherwise plumb it through the builder call sites.
4. Axiom auto-discovers the new column on the next ingest. No schema migration.
5. Update this doc's field table.
## Adding a new gateway return path
If you add a new `return new Response(...)` inside `createDomainGateway()`,
**you must call `emitRequest(status, reason, cacheTier, resBytes?)` immediately
before it.** Telemetry coverage is enforced by code review, not lint. The
`reason` field uses the existing `RequestReason` union — extend it if the
return represents a new failure class.
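One way to make the emit harder to forget is to route early returns through a small helper that emits and then builds the response. This is only a sketch of that idea, not how `gateway.ts` is written: `makeResponderSketch` is hypothetical, the reason union is copied from the field table above, and the emit callback is simplified to two arguments.

```typescript
type RequestReasonSketch =
  | "ok"
  | "origin_403"
  | "rate_limit_429"
  | "preflight"
  | "auth_401"
  | "auth_403"
  | "tier_403";

type EmitRequestSketch = (status: number, reason: RequestReasonSketch) => void;

// Pairs the telemetry emit with the Response so a new return path
// cannot produce one without the other.
function makeResponderSketch(emitRequest: EmitRequestSketch) {
  return (status: number, reason: RequestReasonSketch, body: string | null = null): Response => {
    emitRequest(status, reason);
    return new Response(body, { status });
  };
}
```

A new return path would then read `return respond(429, "rate_limit_429", "Too many requests")` rather than constructing the `Response` by hand.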

View File

@@ -0,0 +1,143 @@
/**
* Pure-resolver tests for buildUsageIdentity().
*
* The resolver maps gateway-internal auth state to the four telemetry identity
* fields (auth_kind, principal_id, customer_id, tier). It is intentionally
* pure — no JWT verification, no key hashing of secrets, no I/O — so the
* branch matrix is trivially testable here.
*/
import { describe, expect, test } from 'vitest';
import { buildUsageIdentity, type UsageIdentityInput } from '../_shared/usage-identity';
function baseInput(overrides: Partial<UsageIdentityInput> = {}): UsageIdentityInput {
return {
sessionUserId: null,
isUserApiKey: false,
enterpriseApiKey: null,
widgetKey: null,
clerkOrgId: null,
userApiKeyCustomerRef: null,
tier: null,
...overrides,
};
}
describe('buildUsageIdentity — auth_kind branches', () => {
test('user_api_key takes precedence over every other signal', () => {
const ident = buildUsageIdentity(baseInput({
isUserApiKey: true,
sessionUserId: 'user_123',
userApiKeyCustomerRef: 'customer_abc',
enterpriseApiKey: 'should-be-ignored',
widgetKey: 'should-be-ignored',
tier: 2,
}));
expect(ident.auth_kind).toBe('user_api_key');
expect(ident.principal_id).toBe('user_123');
expect(ident.customer_id).toBe('customer_abc');
expect(ident.tier).toBe(2);
});
test('user_api_key falls back to sessionUserId for customer_id when no explicit ref', () => {
const ident = buildUsageIdentity(baseInput({
isUserApiKey: true,
sessionUserId: 'user_123',
tier: 1,
}));
expect(ident.customer_id).toBe('user_123');
});
test('clerk_jwt: customer_id prefers org over user when org is present', () => {
const ident = buildUsageIdentity(baseInput({
sessionUserId: 'user_123',
clerkOrgId: 'org_acme',
tier: 1,
}));
expect(ident.auth_kind).toBe('clerk_jwt');
expect(ident.principal_id).toBe('user_123');
expect(ident.customer_id).toBe('org_acme');
expect(ident.tier).toBe(1);
});
test('clerk_jwt: customer_id falls back to user when no org', () => {
const ident = buildUsageIdentity(baseInput({
sessionUserId: 'user_123',
}));
expect(ident.customer_id).toBe('user_123');
expect(ident.tier).toBe(0);
});
test('enterprise_api_key: principal_id is hashed, not raw', () => {
const ident = buildUsageIdentity(baseInput({
enterpriseApiKey: 'wm_super_secret_key',
tier: 3,
}));
expect(ident.auth_kind).toBe('enterprise_api_key');
expect(ident.principal_id).not.toBe('wm_super_secret_key');
expect(ident.principal_id).toMatch(/^[0-9a-z]+$/);
// Customer is the unmapped sentinel until a real entry is added to ENTERPRISE_KEY_TO_CUSTOMER
expect(ident.customer_id).toBe('enterprise-unmapped');
expect(ident.tier).toBe(3);
});
test('widget_key: customer_id is the widget key itself, principal_id is hashed', () => {
const ident = buildUsageIdentity(baseInput({
widgetKey: 'widget_pub_xyz',
}));
expect(ident.auth_kind).toBe('widget_key');
expect(ident.customer_id).toBe('widget_pub_xyz');
expect(ident.principal_id).not.toBe('widget_pub_xyz');
expect(ident.principal_id).toMatch(/^[0-9a-z]+$/);
expect(ident.tier).toBe(0);
});
test('anon: every field null, tier always zero', () => {
const ident = buildUsageIdentity(baseInput());
expect(ident.auth_kind).toBe('anon');
expect(ident.principal_id).toBeNull();
expect(ident.customer_id).toBeNull();
expect(ident.tier).toBe(0);
});
test('anon: tier coerces to 0 even if input.tier was set (defensive)', () => {
// No identity signal but a leftover tier value should not show up as a mystery free row.
const ident = buildUsageIdentity(baseInput({ tier: 99 }));
expect(ident.tier).toBe(0);
});
});
describe('buildUsageIdentity — tier handling', () => {
test('null tier coerces to 0 for non-anon kinds', () => {
const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: null }));
expect(ident.tier).toBe(0);
});
test('zero tier is preserved (not promoted)', () => {
const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: 0 }));
expect(ident.tier).toBe(0);
});
test('integer tiers pass through unchanged', () => {
for (const t of [0, 1, 2, 3]) {
const ident = buildUsageIdentity(baseInput({ sessionUserId: 'u', tier: t }));
expect(ident.tier).toBe(t);
}
});
});
describe('buildUsageIdentity — secret handling', () => {
test('enterprise key never appears verbatim in any output field', () => {
const secret = 'wm_ent_LEAKY_VALUE_DO_NOT_LOG';
const ident = buildUsageIdentity(baseInput({ enterpriseApiKey: secret }));
expect(JSON.stringify(ident)).not.toContain(secret);
});
test('widget key appears as customer_id (intentional — widget keys are public)', () => {
// Widget keys are embeds installed on third-party sites; treating them as
// customer attribution is the contract documented in usage-identity.ts:73-79.
const ident = buildUsageIdentity(baseInput({ widgetKey: 'widget_public_xyz' }));
expect(ident.customer_id).toBe('widget_public_xyz');
});
});

View File

@@ -15,13 +15,18 @@
 import { jwtVerify } from 'jose';
 import { getClerkJwtVerifyOptions, getJWKS } from '../auth-session';
+export interface ClerkSession {
+userId: string;
+orgId: string | null;
+}
 /**
 * Extracts and verifies a bearer token from the request.
-* Returns the userId (sub claim) on success, null on any failure.
+* Returns { userId, orgId } on success, null on any failure.
 *
 * Fail-open: errors are logged but never thrown.
 */
-export async function resolveSessionUserId(request: Request): Promise<string | null> {
+export async function resolveClerkSession(request: Request): Promise<ClerkSession | null> {
 try {
 const authHeader = request.headers.get('Authorization');
 if (!authHeader?.startsWith('Bearer ')) return null;
@@ -38,7 +43,19 @@ export async function resolveSessionUserId(request: Request): Promise<string | n
 issuer: issuerDomain,
 });
-return (payload.sub as string) ?? null;
+const userId = (payload.sub as string) ?? null;
+if (!userId) return null;
+const orgClaim = (payload as Record<string, unknown>).org as
+| Record<string, unknown>
+| undefined;
+const orgId =
+(typeof orgClaim?.id === 'string' ? orgClaim.id : null) ??
+(typeof (payload as Record<string, unknown>).org_id === 'string'
+? ((payload as Record<string, unknown>).org_id as string)
+: null);
+return { userId, orgId };
 } catch (err) {
 console.warn(
 '[auth-session] JWT verification failed:',
@@ -47,3 +64,11 @@ export async function resolveSessionUserId(request: Request): Promise<string | n
return null;
}
}
/**
* Back-compat wrapper. Prefer resolveClerkSession() for new callers.
*/
export async function resolveSessionUserId(request: Request): Promise<string | null> {
const session = await resolveClerkSession(request);
return session?.userId ?? null;
}
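The orgId fallback above accepts two claim shapes: a nested `org.id` and a flat `org_id`. A minimal standalone sketch of that lookup, assuming the payload has already been verified; `extractOrgId` is a hypothetical name used for illustration and is not part of the commit:

```typescript
// Hypothetical helper mirroring the claim lookup in resolveClerkSession.
// Assumes `payload` comes from an already-verified JWT.
function extractOrgId(payload: Record<string, unknown>): string | null {
  const org = payload.org;
  // Nested shape: { org: { id: "org_123" } }
  if (typeof org === 'object' && org !== null) {
    const id = (org as Record<string, unknown>).id;
    if (typeof id === 'string') return id;
  }
  // Flat shape: { org_id: "org_123" }
  const flat = payload.org_id;
  return typeof flat === 'string' ? flat : null;
}
```

A non-string `org.id` falls through to `org_id`, matching the `??` chain in the diff.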


@@ -1,14 +1,28 @@
import { CHROME_UA } from './constants';
import type { UsageHook } from './redis';
import { buildUpstreamEvent, getUsageScope, sendToAxiom } from './usage';
interface FetchJsonOptions {
timeoutMs?: number;
headers?: Record<string, string>;
/**
* Provider attribution for usage telemetry. When set, an upstream event
* is emitted for this call. Leaves request_id / customer_id / route / tier
* to flow implicitly from the gateway-set UsageScope (issue #3381).
*/
provider?: string;
operation?: string;
/** Escape hatch for callers outside a request scope. Rarely needed. */
usage?: UsageHook;
}
export async function fetchJson<T>(
url: string,
options: FetchJsonOptions = {},
): Promise<T | null> {
const t0 = Date.now();
let status = 0;
let responseBytes = 0;
try {
const response = await fetch(url, {
headers: {
@@ -18,9 +32,57 @@ export async function fetchJson<T>(
},
signal: AbortSignal.timeout(options.timeoutMs ?? 8_000),
});
status = response.status;
if (!response.ok) return null;
return await response.json() as T;
const text = await response.text();
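// text.length counts UTF-16 code units, not bytes. Encode to measure the
// UTF-8 size so response_bytes stays accurate for non-ASCII bodies.
responseBytes = new TextEncoder().encode(text).byteLength;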
return JSON.parse(text) as T;
} catch {
return null;
} finally {
// Emit only when the caller has labeled the provider — avoids polluting
// the dataset with "unknown" rows from internal/utility fetches.
const provider = options.usage?.provider ?? options.provider;
const operation = options.usage?.operation ?? options.operation ?? 'fetch';
if (provider) {
const durationMs = Date.now() - t0;
const explicit = options.usage;
const host = explicit?.host ?? safeHost(url);
// Single waitUntil() registered synchronously here, with no nested
// ctx.waitUntil() inside the Axiom delivery (Edge runtimes may drop
// a nested registration made after the response phase ends). The static
// import of ./usage keeps this call synchronous, so the runtime
// registers it during the request phase.
const scope = getUsageScope();
const ctx = explicit?.ctx ?? scope?.ctx;
if (ctx) {
const event = buildUpstreamEvent({
requestId: explicit?.requestId ?? scope?.requestId ?? '',
customerId: explicit?.customerId ?? scope?.customerId ?? null,
route: explicit?.route ?? scope?.route ?? '',
tier: explicit?.tier ?? scope?.tier ?? 0,
provider,
operation,
host,
status,
durationMs,
requestBytes: 0,
responseBytes,
cacheStatus: 'miss',
});
try {
ctx.waitUntil(sendToAxiom([event]));
} catch {
/* telemetry must never throw */
}
}
}
}
}
function safeHost(url: string): string {
try {
return new URL(url).host;
} catch {
return '';
}
}
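Response size in bytes and string length are not the same thing for non-ASCII bodies, because a string's `length` counts UTF-16 code units. A small sketch of the difference, assuming a UTF-8 body; `utf8ByteLength` and the sample payloads are hypothetical:

```typescript
// Hypothetical helper: size of a string once encoded as UTF-8.
function utf8ByteLength(text: string): number {
  return new TextEncoder().encode(text).byteLength;
}

// ASCII-only JSON: one byte per character.
const asciiBody = '{"ok":true}';
// "ü" takes two bytes in UTF-8 but one UTF-16 code unit.
const accentedBody = '{"city":"Zürich"}';

const asciiDelta = utf8ByteLength(asciiBody) - asciiBody.length;
const accentedDelta = utf8ByteLength(accentedBody) - accentedBody.length;
```

This measures the decoded body only; any transfer compression on the wire is not reflected.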


@@ -1,4 +1,5 @@
import { unwrapEnvelope } from './seed-envelope';
import { buildUpstreamEvent, getUsageScope, sendToAxiom } from './usage';
const REDIS_OP_TIMEOUT_MS = 1_500;
const REDIS_PIPELINE_TIMEOUT_MS = 5_000;
@@ -288,6 +289,31 @@ export async function cachedFetchJson<T extends object>(
return promise;
}
/**
* Per-call usage-telemetry hook for upstream event emission (issue #3381).
*
* The only required field is `provider` — its presence is what tells the
* helper "emit an upstream event for this call." Everything else is filled
* in by the gateway-set UsageScope (request_id, customer_id, route, tier,
* ctx) via AsyncLocalStorage. Pass overrides explicitly if you need to.
*
* Use this when calling fetchJson / cachedFetchJsonWithMeta from a code
* path that runs inside a gateway-handled request. For helpers used
* outside any request (cron, scripts), no scope exists and emission is
* skipped silently.
*/
export interface UsageHook {
provider: string;
operation?: string;
host?: string;
// Overrides — leave unset to inherit from gateway-set UsageScope.
ctx?: { waitUntil: (p: Promise<unknown>) => void };
requestId?: string;
customerId?: string | null;
route?: string;
tier?: number;
}
/**
* Like cachedFetchJson but reports the data source.
* Use when callers need to distinguish cache hits from fresh fetches
@@ -296,12 +322,17 @@ export async function cachedFetchJson<T extends object>(
* Returns { data, source } where source is:
* 'cache' — served from Redis
* 'fresh' — fetcher ran (leader) or joined an in-flight fetch (follower)
*
* If `opts.usage` is supplied, an upstream event is emitted on the fresh
* path (issue #3381). Pass-through for callers that don't care about
* telemetry — backwards-compatible.
*/
export async function cachedFetchJsonWithMeta<T extends object>(
key: string,
ttlSeconds: number,
fetcher: () => Promise<T | null>,
negativeTtlSeconds = 120,
opts?: { usage?: UsageHook },
): Promise<{ data: T | null; source: 'cache' | 'fresh' }> {
const cached = await getCachedJson(key);
if (cached === NEG_SENTINEL) return { data: null, source: 'cache' };
@@ -313,16 +344,30 @@ export async function cachedFetchJsonWithMeta<T extends object>(
return { data, source: 'fresh' };
}
const fetchT0 = Date.now();
let upstreamStatus = 0;
let cacheStatus: 'miss' | 'neg-sentinel' = 'miss';
const promise = fetcher()
.then(async (result) => {
// Only count an upstream call as a 200 when it actually returned data.
// A null result triggers the neg-sentinel branch below — these are
// empty/failed upstream calls and must NOT show up as `status=200` in
// dashboards (would poison the cache-hit-ratio recipe and per-provider
// error rates). Use status=0 for the empty branch; cache_status carries
// the structural detail.
if (result != null) {
upstreamStatus = 200;
await setCachedJson(key, result, ttlSeconds);
} else {
upstreamStatus = 0;
cacheStatus = 'neg-sentinel';
await setCachedJson(key, NEG_SENTINEL, negativeTtlSeconds);
}
return result;
})
.catch((err: unknown) => {
upstreamStatus = 0;
console.warn(`[redis] cachedFetchJsonWithMeta fetcher failed for "${key}":`, errMsg(err));
throw err;
})
@@ -331,10 +376,50 @@ export async function cachedFetchJsonWithMeta<T extends object>(
});
inflight.set(key, promise);
const data = await promise;
let data: T | null;
try {
data = await promise;
} finally {
emitUpstreamFromHook(opts?.usage, upstreamStatus, Date.now() - fetchT0, cacheStatus);
}
return { data, source: 'fresh' };
}
function emitUpstreamFromHook(
usage: UsageHook | undefined,
status: number,
durationMs: number,
cacheStatus: 'miss' | 'fresh' | 'stale-while-revalidate' | 'neg-sentinel',
): void {
// Emit only when caller labels the provider — avoids "unknown" pollution.
if (!usage?.provider) return;
// Single waitUntil() registered synchronously here — no nested
// ctx.waitUntil() inside Axiom delivery. Static import keeps the call
// synchronous so the runtime registers it during the request phase.
const scope = getUsageScope();
const ctx = usage.ctx ?? scope?.ctx;
if (!ctx) return;
const event = buildUpstreamEvent({
requestId: usage.requestId ?? scope?.requestId ?? '',
customerId: usage.customerId ?? scope?.customerId ?? null,
route: usage.route ?? scope?.route ?? '',
tier: usage.tier ?? scope?.tier ?? 0,
provider: usage.provider,
operation: usage.operation ?? 'fetch',
host: usage.host ?? '',
status,
durationMs,
requestBytes: 0,
responseBytes: 0,
cacheStatus,
});
try {
ctx.waitUntil(sendToAxiom([event]));
} catch {
/* telemetry must never throw */
}
}
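Both emit paths resolve attribution fields with the same precedence: explicit hook override, then the gateway-set scope, then a neutral default. A minimal sketch of that precedence in isolation; `resolveAttribution` and the `Attribution` type are hypothetical names, not part of the commit:

```typescript
// Hypothetical types standing in for the attribution fields of UsageScope.
interface Attribution {
  requestId: string;
  customerId: string | null;
  route: string;
  tier: number;
}

// Explicit override first, then scope, then a neutral default.
function resolveAttribution(
  explicit: Partial<Attribution> | undefined,
  scope: Attribution | undefined,
): Attribution {
  return {
    requestId: explicit?.requestId ?? scope?.requestId ?? '',
    customerId: explicit?.customerId ?? scope?.customerId ?? null,
    route: explicit?.route ?? scope?.route ?? '',
    tier: explicit?.tier ?? scope?.tier ?? 0,
  };
}

const exampleScope: Attribution = {
  requestId: 'req_1',
  customerId: 'org_1',
  route: '/api/example/v1/list',
  tier: 2,
};
```

Because `??` treats `null` as absent, an explicit `customerId: null` does not mask the scoped customer; it falls through to the scope value.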
export async function geoSearchByBox(
key: string, lon: number, lat: number,
widthKm: number, heightKm: number, count: number, raw = false,


@@ -0,0 +1,108 @@
/**
* Pure resolver: maps gateway-internal auth state to a UsageIdentity event field set.
*
* MUST NOT re-verify JWTs, re-hash keys, or re-validate API keys. The gateway has
* already done that work — this function consumes the resolved values.
*
* Tier is the user's current entitlement tier (0 = free / unknown). For non-tier-gated
* endpoints the gateway never resolves it, so we accept null/undefined and report 0.
*/
export type AuthKind =
| 'clerk_jwt'
| 'user_api_key'
| 'enterprise_api_key'
| 'widget_key'
| 'anon';
export interface UsageIdentity {
auth_kind: AuthKind;
principal_id: string | null;
customer_id: string | null;
tier: number;
}
export interface UsageIdentityInput {
sessionUserId: string | null;
isUserApiKey: boolean;
enterpriseApiKey: string | null;
widgetKey: string | null;
clerkOrgId: string | null;
userApiKeyCustomerRef: string | null;
tier: number | null;
}
// Static enterprise-key → customer map. Explicit so attribution is reviewable in code,
// not floating in env vars. Add entries here as enterprise customers are onboarded.
// The hash (not the raw key) is used as principal_id so logs never leak the secret.
const ENTERPRISE_KEY_TO_CUSTOMER: Record<string, string> = {
// 'wm_ent_xxxx': 'acme-corp',
};
export function buildUsageIdentity(input: UsageIdentityInput): UsageIdentity {
const tier = input.tier ?? 0;
if (input.isUserApiKey) {
return {
auth_kind: 'user_api_key',
principal_id: input.sessionUserId,
customer_id: input.userApiKeyCustomerRef ?? input.sessionUserId,
tier,
};
}
if (input.sessionUserId) {
return {
auth_kind: 'clerk_jwt',
principal_id: input.sessionUserId,
customer_id: input.clerkOrgId ?? input.sessionUserId,
tier,
};
}
if (input.enterpriseApiKey) {
const customer = ENTERPRISE_KEY_TO_CUSTOMER[input.enterpriseApiKey] ?? 'enterprise-unmapped';
return {
auth_kind: 'enterprise_api_key',
principal_id: hashKeySync(input.enterpriseApiKey),
customer_id: customer,
tier,
};
}
if (input.widgetKey) {
return {
auth_kind: 'widget_key',
principal_id: hashKeySync(input.widgetKey),
customer_id: input.widgetKey,
tier,
};
}
return {
auth_kind: 'anon',
principal_id: null,
customer_id: null,
tier: 0,
};
}
// Two 32-bit FNV-1a-style lanes with different seeds, concatenated into a
// ~64-bit digest. Non-cryptographic; only used to avoid logging raw key
// material. Edge crypto.subtle.digest is async, and we want a sync helper
// for the hot path. Two lanes give ~64 bits of state, dropping
// birthday-collision risk well below the widget-key population horizon
// (a single 32-bit hash starts colliding at around 65k keys).
function hashKeySync(key: string): string {
let h1 = 2166136261;
let h2 = 0x811c9dc5 ^ 0xa3b2c1d4;
for (let i = 0; i < key.length; i++) {
const c = key.charCodeAt(i);
h1 ^= c;
h1 = Math.imul(h1, 16777619);
h2 ^= c + 0x9e3779b1;
h2 = Math.imul(h2, 16777619);
}
const lo = (h1 >>> 0).toString(36);
const hi = (h2 >>> 0).toString(36);
return `${hi}${lo}`;
}
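Two properties matter for the sync hash: it is deterministic, and its output never contains the raw key. The sketch below copies the two-lane loop and, as an assumption that goes beyond the commit, pads each lane to a fixed width so the concatenation cannot be split ambiguously; `hashKeyPadded` is a hypothetical name:

```typescript
// Hypothetical variant of hashKeySync with fixed-width lanes.
function hashKeyPadded(key: string): string {
  let h1 = 2166136261;
  let h2 = 0x811c9dc5 ^ 0xa3b2c1d4;
  for (let i = 0; i < key.length; i++) {
    const c = key.charCodeAt(i);
    h1 ^= c;
    h1 = Math.imul(h1, 16777619);
    h2 ^= c + 0x9e3779b1;
    h2 = Math.imul(h2, 16777619);
  }
  // 2^32 - 1 needs at most 7 base-36 digits, so pad each lane to 7.
  const lo = (h1 >>> 0).toString(36).padStart(7, '0');
  const hi = (h2 >>> 0).toString(36).padStart(7, '0');
  return `${hi}${lo}`;
}
```

Padding changes the emitted strings, so adopting it would change existing `principal_id` values; the sketch is only meant to illustrate the properties.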

server/_shared/usage.ts (new file, 421 lines)

@@ -0,0 +1,421 @@
/**
* Axiom-based API usage observability — emit-side primitives.
*
* - Builders accept allowlisted primitives only. Never accept Request, Response,
* or untyped objects: future field additions then cannot leak extra data, by construction.
* - emitUsageEvents fires via ctx.waitUntil so the Edge isolate cannot tear down
* the unflushed POST. Direct fetch, 1.5s timeout, no retry.
* - Circuit breaker (5% failure / 5min sliding window) trips when delivery is broken.
* - Tripping logs once via console.error; drops thereafter are 1%-sampled console.warn.
* - Telemetry failure must not affect API availability or latency.
*
* Scoped to USAGE attribution. Sentry-edge already covers exceptions — do NOT
* emit error tracebacks here. Cross-link via sentry_trace_id field instead.
*/
import type { AuthKind } from './usage-identity';
const AXIOM_DATASET = 'wm_api_usage';
// US region endpoint. EU workspaces would use api.eu.axiom.co.
const AXIOM_INGEST_URL = `https://api.axiom.co/v1/datasets/${AXIOM_DATASET}/ingest`;
const TELEMETRY_TIMEOUT_MS = 1_500;
const CB_WINDOW_MS = 5 * 60 * 1_000;
const CB_TRIP_FAILURE_RATIO = 0.05;
const CB_MIN_SAMPLES = 20;
const SAMPLED_DROP_LOG_RATE = 0.01;
function isUsageEnabled(): boolean {
return process.env.USAGE_TELEMETRY === '1';
}
function isDevHeaderEnabled(): boolean {
return process.env.NODE_ENV !== 'production';
}
// ---------- Event shapes ----------
export type CacheTier =
| 'fast'
| 'medium'
| 'slow'
| 'slow-browser'
| 'static'
| 'daily'
| 'no-store'
| 'live';
export type CacheStatus = 'miss' | 'fresh' | 'stale-while-revalidate' | 'neg-sentinel';
export type ExecutionPlane = 'vercel-edge' | 'vercel-node' | 'railway-relay';
export type OriginKind =
| 'browser-same-origin'
| 'browser-cross-origin'
| 'api-key'
| 'oauth'
| 'mcp'
| 'internal-cron';
export type RequestReason =
| 'ok'
| 'origin_403'
| 'rate_limit_429'
| 'preflight'
| 'auth_401'
| 'auth_403'
| 'tier_403';
export interface RequestEvent {
_time: string;
event_type: 'request';
request_id: string;
domain: string;
route: string;
method: string;
status: number;
duration_ms: number;
req_bytes: number;
res_bytes: number;
customer_id: string | null;
principal_id: string | null;
auth_kind: AuthKind;
tier: number;
country: string | null;
execution_region: string | null;
execution_plane: ExecutionPlane;
origin_kind: OriginKind | null;
cache_tier: CacheTier | null;
ua_hash: string | null;
sentry_trace_id: string | null;
reason: RequestReason;
}
export interface UpstreamEvent {
_time: string;
event_type: 'upstream';
request_id: string;
customer_id: string | null;
route: string;
tier: number;
provider: string;
operation: string;
host: string;
status: number;
duration_ms: number;
request_bytes: number;
response_bytes: number;
cache_status: CacheStatus;
}
export type UsageEvent = RequestEvent | UpstreamEvent;
// ---------- Builders (allowlisted primitives only) ----------
export function buildRequestEvent(p: {
requestId: string;
domain: string;
route: string;
method: string;
status: number;
durationMs: number;
reqBytes: number;
resBytes: number;
customerId: string | null;
principalId: string | null;
authKind: AuthKind;
tier: number;
country: string | null;
executionRegion: string | null;
executionPlane: ExecutionPlane;
originKind: OriginKind | null;
cacheTier: CacheTier | null;
uaHash: string | null;
sentryTraceId: string | null;
reason: RequestReason;
}): RequestEvent {
return {
_time: new Date().toISOString(),
event_type: 'request',
request_id: p.requestId,
domain: p.domain,
route: p.route,
method: p.method,
status: p.status,
duration_ms: p.durationMs,
req_bytes: p.reqBytes,
res_bytes: p.resBytes,
customer_id: p.customerId,
principal_id: p.principalId,
auth_kind: p.authKind,
tier: p.tier,
country: p.country,
execution_region: p.executionRegion,
execution_plane: p.executionPlane,
origin_kind: p.originKind,
cache_tier: p.cacheTier,
ua_hash: p.uaHash,
sentry_trace_id: p.sentryTraceId,
reason: p.reason,
};
}
export function buildUpstreamEvent(p: {
requestId: string;
customerId: string | null;
route: string;
tier: number;
provider: string;
operation: string;
host: string;
status: number;
durationMs: number;
requestBytes: number;
responseBytes: number;
cacheStatus: CacheStatus;
}): UpstreamEvent {
return {
_time: new Date().toISOString(),
event_type: 'upstream',
request_id: p.requestId,
customer_id: p.customerId,
route: p.route,
tier: p.tier,
provider: p.provider,
operation: p.operation,
host: p.host,
status: p.status,
duration_ms: p.durationMs,
request_bytes: p.requestBytes,
response_bytes: p.responseBytes,
cache_status: p.cacheStatus,
};
}
// ---------- Header-derived helpers (ok to take Request — these only read primitives) ----------
export function deriveRequestId(req: Request): string {
return req.headers.get('x-vercel-id') ?? '';
}
export function deriveExecutionRegion(req: Request): string | null {
const id = req.headers.get('x-vercel-id');
if (!id) return null;
const sep = id.indexOf('::');
return sep > 0 ? id.slice(0, sep) : null;
}
export function deriveCountry(req: Request): string | null {
return (
req.headers.get('x-vercel-ip-country') ??
req.headers.get('cf-ipcountry') ??
null
);
}
export function deriveReqBytes(req: Request): number {
const len = req.headers.get('content-length');
if (!len) return 0;
const n = Number(len);
return Number.isFinite(n) && n >= 0 ? n : 0;
}
export function deriveSentryTraceId(req: Request): string | null {
return req.headers.get('sentry-trace') ?? null;
}
// ua_hash: SHA-256 of `${pepper}|${ua}`, where the pepper (USAGE_UA_PEPPER) is rotated monthly.
// If the pepper is unset we return null rather than a stable per-browser fingerprint.
export async function deriveUaHash(req: Request): Promise<string | null> {
const pepper = process.env.USAGE_UA_PEPPER;
if (!pepper) return null;
const ua = req.headers.get('user-agent') ?? '';
if (!ua) return null;
const data = new TextEncoder().encode(`${pepper}|${ua}`);
const buf = await crypto.subtle.digest('SHA-256', data);
return Array.from(new Uint8Array(buf), (b) => b.toString(16).padStart(2, '0')).join('');
}
export function deriveOriginKind(req: Request): OriginKind | null {
const origin = req.headers.get('origin') ?? '';
const hasApiKey =
req.headers.has('x-worldmonitor-key') || req.headers.has('x-api-key');
const hasBearer = (req.headers.get('authorization') ?? '').startsWith('Bearer ');
if (hasApiKey) return 'api-key';
if (hasBearer) return 'oauth';
if (!origin) return null;
try {
const host = new URL(origin).host;
const reqHost = new URL(req.url).host;
return host === reqHost ? 'browser-same-origin' : 'browser-cross-origin';
} catch {
return 'browser-cross-origin';
}
}
// ---------- Circuit breaker ----------
interface BreakerSample {
ts: number;
ok: boolean;
}
const breakerSamples: BreakerSample[] = [];
let breakerTripped = false;
let breakerLastNotifyTs = 0;
function pruneOldSamples(now: number): void {
while (breakerSamples.length > 0 && now - breakerSamples[0]!.ts > CB_WINDOW_MS) {
breakerSamples.shift();
}
}
function recordSample(ok: boolean): void {
const now = Date.now();
pruneOldSamples(now);
breakerSamples.push({ ts: now, ok });
if (breakerSamples.length < CB_MIN_SAMPLES) {
breakerTripped = false;
return;
}
let failures = 0;
for (const s of breakerSamples) if (!s.ok) failures++;
const ratio = failures / breakerSamples.length;
const wasTripped = breakerTripped;
breakerTripped = ratio > CB_TRIP_FAILURE_RATIO;
if (breakerTripped && !wasTripped && now - breakerLastNotifyTs > CB_WINDOW_MS) {
breakerLastNotifyTs = now;
console.error('[usage-telemetry] circuit breaker tripped', {
ratio: ratio.toFixed(3),
samples: breakerSamples.length,
});
}
}
export function getTelemetryHealth(): 'ok' | 'degraded' | 'off' {
if (!isUsageEnabled()) return 'off';
return breakerTripped ? 'degraded' : 'ok';
}
export function maybeAttachDevHealthHeader(headers: Headers): void {
if (!isDevHeaderEnabled()) return;
headers.set('x-usage-telemetry', getTelemetryHealth());
}
// ---------- Implicit request scope (AsyncLocalStorage) ----------
//
// Per koala's review (#3381), this lets fetch helpers emit upstream events
// without leaf handlers having to thread a usage hook through every call.
// The gateway sets the scope before invoking matchedHandler; fetch helpers
// (fetchJson, cachedFetchJsonWithMeta) read from it lazily.
//
// AsyncLocalStorage is loaded defensively. If the runtime ever rejects the
// import (older Edge versions, sandboxed contexts), the scope helpers
// degrade to no-ops and telemetry simply skips. The gateway request event
// is unaffected — it never depended on ALS.
export interface UsageScope {
ctx: WaitUntilCtx;
requestId: string;
customerId: string | null;
route: string;
tier: number;
}
type ALSLike<T> = {
run: <R>(store: T, fn: () => R) => R;
getStore: () => T | undefined;
};
let scopeStore: ALSLike<UsageScope> | null = null;
async function getScopeStore(): Promise<ALSLike<UsageScope> | null> {
if (scopeStore) return scopeStore;
try {
const mod = await import('node:async_hooks');
scopeStore = new mod.AsyncLocalStorage<UsageScope>();
return scopeStore;
} catch {
return null;
}
}
export async function runWithUsageScope<R>(scope: UsageScope, fn: () => R | Promise<R>): Promise<R> {
const store = await getScopeStore();
if (!store) return fn();
return store.run(scope, fn) as R | Promise<R>;
}
export function getUsageScope(): UsageScope | undefined {
return scopeStore?.getStore();
}
// ---------- Sink ----------
export async function sendToAxiom(events: UsageEvent[]): Promise<void> {
if (!isUsageEnabled()) return;
if (events.length === 0) return;
const token = process.env.AXIOM_API_TOKEN;
if (!token) {
if (Math.random() < SAMPLED_DROP_LOG_RATE) {
console.warn('[usage-telemetry] drop', { reason: 'no-token' });
}
return;
}
if (breakerTripped) {
if (Math.random() < SAMPLED_DROP_LOG_RATE) {
console.warn('[usage-telemetry] drop', { reason: 'breaker-open' });
}
return;
}
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), TELEMETRY_TIMEOUT_MS);
try {
const resp = await fetch(AXIOM_INGEST_URL, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(events),
signal: controller.signal,
});
if (!resp.ok) {
recordSample(false);
if (Math.random() < SAMPLED_DROP_LOG_RATE) {
console.warn('[usage-telemetry] drop', { reason: `http-${resp.status}` });
}
return;
}
recordSample(true);
} catch (err) {
recordSample(false);
if (Math.random() < SAMPLED_DROP_LOG_RATE) {
const reason = err instanceof Error && err.name === 'AbortError' ? 'timeout' : 'fetch-error';
console.warn('[usage-telemetry] drop', { reason });
}
} finally {
clearTimeout(timer);
}
}
export interface WaitUntilCtx {
waitUntil: (p: Promise<unknown>) => void;
}
export function emitUsageEvents(ctx: WaitUntilCtx, events: UsageEvent[]): void {
if (!isUsageEnabled() || events.length === 0) return;
ctx.waitUntil(sendToAxiom(events));
}
// Variant that returns the in-flight delivery promise instead of registering
// it on a context. Use when the caller is already inside a single
// ctx.waitUntil() chain and wants to await delivery directly, avoiding a
// nested waitUntil registration (which Edge runtimes may drop).
export function deliverUsageEvents(events: UsageEvent[]): Promise<void> {
if (!isUsageEnabled() || events.length === 0) return Promise.resolve();
return sendToAxiom(events);
}
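In the breaker above, `breakerTripped` is only recomputed inside `recordSample`, which runs after a delivery attempt, while `sendToAxiom` skips delivery once the breaker is tripped. The sketch below is a variation, not the commit's implementation: it evaluates the window at check time, so the breaker closes again once old failures age out. `createBreaker` and the injected `now` argument are assumptions made for testability:

```typescript
interface Sample {
  ts: number;
  ok: boolean;
}

// Hypothetical sliding-window breaker; defaults match the constants above
// (5 min window, trip above 5% failures, minimum 20 samples).
function createBreaker(windowMs = 5 * 60 * 1_000, tripRatio = 0.05, minSamples = 20) {
  const samples: Sample[] = [];

  // Drop samples that have aged out of the window.
  const prune = (now: number): void => {
    while (samples.length > 0 && now - samples[0]!.ts > windowMs) {
      samples.shift();
    }
  };

  return {
    record(ok: boolean, now: number): void {
      prune(now);
      samples.push({ ts: now, ok });
    },
    // Evaluated on every check, so the state recovers without new samples.
    isOpen(now: number): boolean {
      prune(now);
      if (samples.length < minSamples) return false;
      const failures = samples.filter((s) => !s.ok).length;
      return failures / samples.length > tripRatio;
    },
  };
}
```

Passing the clock in as a parameter keeps the sketch deterministic; production code would more likely call `Date.now()` internally.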


@@ -13,10 +13,13 @@
*
* Trivially deleted when v1 retires — just `rm` the alias files.
*/
type GatewayCtx = { waitUntil: (p: Promise<unknown>) => void };
export async function rewriteToSebuf(
req: Request,
newPath: string,
gateway: (req: Request) => Promise<Response>,
gateway: (req: Request, ctx: GatewayCtx) => Promise<Response>,
ctx: GatewayCtx,
): Promise<Response> {
const url = new URL(req.url);
url.pathname = newPath;
@@ -27,5 +30,5 @@ export async function rewriteToSebuf(
headers: req.headers,
body,
});
return gateway(rewritten);
return gateway(rewritten, ctx);
}
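The point of threading `ctx` through the rewrite helper is that the target gateway can still register background work synchronously. A simplified sketch under stated assumptions: a path string stands in for the `Request`, and `rewriteTo`, `fakeCtx`, and `fakeGateway` are hypothetical names used only for illustration:

```typescript
type GatewayCtx = { waitUntil: (p: Promise<unknown>) => void };
type Gateway = (path: string, ctx: GatewayCtx) => Promise<string>;

// Hypothetical stand-in for rewriteToSebuf: forwards ctx unchanged.
async function rewriteTo(newPath: string, gateway: Gateway, ctx: GatewayCtx): Promise<string> {
  return gateway(newPath, ctx);
}

// Fake ctx that records what was registered instead of extending a lifetime.
const pending: Promise<unknown>[] = [];
const fakeCtx: GatewayCtx = {
  waitUntil: (p) => {
    pending.push(p);
  },
};

// Fake gateway that registers telemetry before its first await.
const fakeGateway: Gateway = async (path, ctx) => {
  ctx.waitUntil(Promise.resolve(`telemetry for ${path}`));
  return `handled ${path}`;
};

// Not awaited: registration happens synchronously during the call.
const inFlight = rewriteTo('/api/example/v1/list', fakeGateway, fakeCtx);
```

The registration is visible in `pending` before `inFlight` settles, which is the ordering the gateway relies on.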


@@ -17,7 +17,23 @@ import { mapErrorToResponse } from './error-mapper';
import { checkRateLimit, checkEndpointRateLimit, hasEndpointRatePolicy } from './_shared/rate-limit';
import { drainResponseHeaders } from './_shared/response-headers';
import { checkEntitlement, getRequiredTier, getEntitlements } from './_shared/entitlement-check';
import { resolveSessionUserId } from './_shared/auth-session';
import { resolveClerkSession } from './_shared/auth-session';
import { buildUsageIdentity, type UsageIdentityInput } from './_shared/usage-identity';
import {
deliverUsageEvents,
buildRequestEvent,
deriveRequestId,
deriveExecutionRegion,
deriveCountry,
deriveReqBytes,
deriveSentryTraceId,
deriveOriginKind,
deriveUaHash,
maybeAttachDevHealthHeader,
runWithUsageScope,
type CacheTier as UsageCacheTier,
type RequestReason,
} from './_shared/usage';
import type { ServerOptions } from '../src/generated/server/worldmonitor/seismology/v1/service_server';
export const serverOptions: ServerOptions = { onError: mapErrorToResponse };
@@ -283,18 +299,86 @@ import { PREMIUM_RPC_PATHS } from '../src/shared/premium-paths';
* Applies the full gateway pipeline: origin check → CORS → OPTIONS preflight →
* API key → rate limit → route match (with POST→GET compat) → execute → cache headers.
*/
export type GatewayCtx = { waitUntil: (p: Promise<unknown>) => void };
export function createDomainGateway(
routes: RouteDescriptor[],
): (req: Request) => Promise<Response> {
): (req: Request, ctx?: GatewayCtx) => Promise<Response> {
const router = createRouter(routes);
return async function handler(originalRequest: Request): Promise<Response> {
return async function handler(originalRequest: Request, ctx?: GatewayCtx): Promise<Response> {
let request = originalRequest;
const rawPathname = new URL(request.url).pathname;
const pathname = rawPathname.length > 1 ? rawPathname.replace(/\/+$/, '') : rawPathname;
const t0 = Date.now();
// Usage-telemetry identity inputs — accumulated as gateway auth resolution progresses.
// Read at every return point; null/0 defaults are valid for early returns.
//
// x-widget-key is intentionally NOT trusted here: a header is attacker-
// controllable, and emitting it as `customer_id` would let unauthenticated
// callers poison per-customer dashboards (per koala #3403 review). We only
// populate `widgetKey` after validating it against the configured
// WIDGET_AGENT_KEY — same check used in api/widget-agent.ts.
const rawWidgetKey = request.headers.get('x-widget-key') ?? null;
const widgetAgentKey = process.env.WIDGET_AGENT_KEY ?? '';
const validatedWidgetKey =
rawWidgetKey && widgetAgentKey && rawWidgetKey === widgetAgentKey ? rawWidgetKey : null;
const usage: UsageIdentityInput = {
sessionUserId: null,
isUserApiKey: false,
enterpriseApiKey: null,
widgetKey: validatedWidgetKey,
clerkOrgId: null,
userApiKeyCustomerRef: null,
tier: null,
};
// Domain segment for telemetry. Path layouts:
// /api/<domain>/v1/<rpc> → parts[2] = domain
// /api/v2/<domain>/<rpc> → parts[2] = "v2", parts[3] = domain
const _parts = pathname.split('/');
const domain = (/^v\d+$/.test(_parts[2] ?? '') ? _parts[3] : _parts[2]) ?? '';
const reqBytes = deriveReqBytes(request);
function emitRequest(status: number, reason: RequestReason, cacheTier: UsageCacheTier | null, resBytes = 0): void {
if (!ctx?.waitUntil) return;
const identity = buildUsageIdentity(usage);
// Single ctx.waitUntil() registered synchronously in the request phase.
// The IIFE awaits ua_hash (SHA-256) then awaits delivery directly via
// deliverUsageEvents — no nested waitUntil call, which Edge runtimes
// (Cloudflare/Vercel) may drop after the response phase ends.
ctx.waitUntil((async () => {
const uaHash = await deriveUaHash(originalRequest);
await deliverUsageEvents([
buildRequestEvent({
requestId: deriveRequestId(originalRequest),
domain,
route: pathname,
method: originalRequest.method,
status,
durationMs: Date.now() - t0,
reqBytes,
resBytes,
customerId: identity.customer_id,
principalId: identity.principal_id,
authKind: identity.auth_kind,
tier: identity.tier,
country: deriveCountry(originalRequest),
executionRegion: deriveExecutionRegion(originalRequest),
executionPlane: 'vercel-edge',
originKind: deriveOriginKind(originalRequest),
cacheTier,
uaHash,
sentryTraceId: deriveSentryTraceId(originalRequest),
reason,
}),
]);
})());
}
// Origin check — skip CORS headers for disallowed origins
if (isDisallowedOrigin(request)) {
emitRequest(403, 'origin_403', null);
return new Response(JSON.stringify({ error: 'Origin not allowed' }), {
status: 403,
headers: { 'Content-Type': 'application/json' },
@@ -310,6 +394,7 @@ export function createDomainGateway(
// OPTIONS preflight
if (request.method === 'OPTIONS') {
emitRequest(204, 'preflight', null);
return new Response(null, { status: 204, headers: corsHeaders });
}
@@ -322,7 +407,10 @@ export function createDomainGateway(
// Only runs for tier-gated endpoints to avoid JWKS lookup on every request.
let sessionUserId: string | null = null;
if (isTierGated) {
sessionUserId = await resolveSessionUserId(request);
const session = await resolveClerkSession(request);
sessionUserId = session?.userId ?? null;
usage.sessionUserId = sessionUserId;
usage.clerkOrgId = session?.orgId ?? null;
if (sessionUserId) {
request = new Request(request.url, {
method: request.method,
@@ -354,10 +442,13 @@ export function createDomainGateway(
const userKeyResult = await validateUserApiKey(wmKey);
if (userKeyResult) {
isUserApiKey = true;
usage.isUserApiKey = true;
usage.userApiKeyCustomerRef = userKeyResult.userId;
keyCheck = { valid: true, required: true };
// Inject x-user-id for downstream entitlement checks
if (!sessionUserId) {
sessionUserId = userKeyResult.userId;
usage.sessionUserId = sessionUserId;
request = new Request(request.url, {
method: request.method,
headers: (() => {
@@ -371,11 +462,19 @@ export function createDomainGateway(
}
}
// Enterprise API key (WORLDMONITOR_VALID_KEYS): keyCheck.valid + wmKey present
// and not a wm_-prefixed user key.
if (keyCheck.valid && wmKey && !isUserApiKey && !wmKey.startsWith('wm_')) {
usage.enterpriseApiKey = wmKey;
}
// User API keys on PREMIUM_RPC_PATHS need verified pro-tier entitlement.
// Admin keys (WORLDMONITOR_VALID_KEYS) bypass this since they are operator-issued.
if (isUserApiKey && needsLegacyProBearerGate && sessionUserId) {
const ent = await getEntitlements(sessionUserId);
if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0;
if (!ent || !ent.features.apiAccess) {
emitRequest(403, 'tier_403', null);
return new Response(JSON.stringify({ error: 'API access subscription required' }), {
status: 403,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
@@ -390,11 +489,19 @@ export function createDomainGateway(
const { validateBearerToken } = await import('./auth-session');
const session = await validateBearerToken(authHeader.slice(7));
if (!session.valid) {
emitRequest(401, 'auth_401', null);
return new Response(JSON.stringify({ error: 'Invalid or expired session' }), {
status: 401,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
});
}
// Capture identity for telemetry — legacy bearer auth bypasses the
// earlier resolveClerkSession() block (only runs for tier-gated routes),
// so without this premium bearer requests would emit as anonymous.
if (session.userId) {
sessionUserId = session.userId;
usage.sessionUserId = session.userId;
}
// Accept EITHER a Clerk 'pro' role OR a Convex Dodo entitlement with
// tier >= 1. The Dodo webhook pipeline writes Convex entitlements but
// does NOT sync Clerk publicMetadata.role, so a paying subscriber's
@@ -414,9 +521,11 @@ export function createDomainGateway(
let allowed = session.role === 'pro';
if (!allowed && session.userId) {
const ent = await getEntitlements(session.userId);
if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0;
allowed = !!ent && ent.features.tier >= 1 && ent.validUntil >= Date.now();
}
if (!allowed) {
emitRequest(403, 'tier_403', null);
return new Response(JSON.stringify({ error: 'Pro subscription required' }), {
status: 403,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
@@ -424,12 +533,14 @@ export function createDomainGateway(
}
// Valid pro session (Clerk role OR Dodo entitlement) — fall through to route handling.
} else {
emitRequest(401, 'auth_401', null);
return new Response(JSON.stringify({ error: keyCheck.error, _debug: (keyCheck as any)._debug }), {
status: 401,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
});
}
} else {
emitRequest(401, 'auth_401', null);
return new Response(JSON.stringify({ error: keyCheck.error }), {
status: 401,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
@@ -442,16 +553,36 @@ export function createDomainGateway(
// User API keys do NOT bypass — the key owner's tier is checked normally.
if (!(keyCheck.valid && wmKey && !isUserApiKey)) {
const entitlementResponse = await checkEntitlement(request, pathname, corsHeaders);
if (entitlementResponse) return entitlementResponse;
if (entitlementResponse) {
const entReason: RequestReason =
entitlementResponse.status === 401 ? 'auth_401'
: entitlementResponse.status === 403 ? 'tier_403'
: 'ok';
emitRequest(entitlementResponse.status, entReason, null);
return entitlementResponse;
}
// Allowed → record the resolved tier for telemetry. getEntitlements has
// its own Redis cache + in-flight coalescing, so the second lookup here
// does not double the cost when checkEntitlement already fetched.
if (isTierGated && sessionUserId && usage.tier === null) {
const ent = await getEntitlements(sessionUserId);
if (ent) usage.tier = typeof ent.features.tier === 'number' ? ent.features.tier : 0;
}
}
// IP-based rate limiting — two-phase: endpoint-specific first, then global fallback
const endpointRlResponse = await checkEndpointRateLimit(request, pathname, corsHeaders);
if (endpointRlResponse) {
emitRequest(endpointRlResponse.status, 'rate_limit_429', null);
return endpointRlResponse;
}
if (!hasEndpointRatePolicy(pathname)) {
const rateLimitResponse = await checkRateLimit(request, corsHeaders);
if (rateLimitResponse) {
emitRequest(rateLimitResponse.status, 'rate_limit_429', null);
return rateLimitResponse;
}
}
// Route matching — if POST doesn't match, convert to GET for stale clients
@@ -477,21 +608,38 @@ export function createDomainGateway(
if (!matchedHandler) {
const allowed = router.allowedMethods(new URL(request.url).pathname);
if (allowed.length > 0) {
emitRequest(405, 'ok', null);
return new Response(JSON.stringify({ error: 'Method not allowed' }), {
status: 405,
headers: { 'Content-Type': 'application/json', Allow: allowed.join(', '), ...corsHeaders },
});
}
emitRequest(404, 'ok', null);
return new Response(JSON.stringify({ error: 'Not found' }), {
status: 404,
headers: { 'Content-Type': 'application/json', ...corsHeaders },
});
}
// Execute handler with top-level error boundary.
// Wrap in runWithUsageScope so deep fetch helpers (fetchJson,
// cachedFetchJsonWithMeta) can attribute upstream calls to this customer
// without leaf handlers having to thread a usage hook through every call.
let response: Response;
const identityForScope = buildUsageIdentity(usage);
const handlerCall = matchedHandler;
const requestForHandler = request;
try {
response = await runWithUsageScope(
{
ctx: ctx ?? { waitUntil: () => {} },
requestId: deriveRequestId(originalRequest),
customerId: identityForScope.customer_id,
route: pathname,
tier: identityForScope.tier,
},
() => handlerCall(requestForHandler),
);
} catch (err) {
console.error('[gateway] Unhandled handler error:', err);
response = new Response(JSON.stringify({ message: 'Internal server error' }), {
@@ -513,6 +661,7 @@ export function createDomainGateway(
}
// For GET 200 responses: read body once for cache-header decisions + ETag
let resolvedCacheTier: CacheTier | null = null;
if (response.status === 200 && request.method === 'GET' && response.body) {
const bodyBytes = await response.arrayBuffer();
@@ -524,12 +673,14 @@ export function createDomainGateway(
if (mergedHeaders.get('X-No-Cache') || isUpstreamUnavailable) {
mergedHeaders.set('Cache-Control', 'no-store');
mergedHeaders.set('X-Cache-Tier', 'no-store');
resolvedCacheTier = 'no-store';
} else {
const rpcName = pathname.split('/').pop() ?? '';
const envOverride = process.env[`CACHE_TIER_OVERRIDE_${rpcName.replace(/-/g, '_').toUpperCase()}`] as CacheTier | undefined;
const isPremium = PREMIUM_RPC_PATHS.has(pathname) || getRequiredTier(pathname) !== null;
const tier = isPremium ? 'slow-browser' as CacheTier
: (envOverride && envOverride in TIER_HEADERS ? envOverride : null) ?? RPC_CACHE_TIER[pathname] ?? 'medium';
resolvedCacheTier = tier;
mergedHeaders.set('Cache-Control', TIER_HEADERS[tier]);
// Only allow Vercel CDN caching for trusted origins (worldmonitor.app, Vercel previews,
// Tauri). No-origin server-side requests (external scrapers) must always reach the edge
@@ -563,9 +714,13 @@ export function createDomainGateway(
const ifNoneMatch = request.headers.get('If-None-Match');
if (ifNoneMatch === etag) {
emitRequest(304, 'ok', resolvedCacheTier, 0);
maybeAttachDevHealthHeader(mergedHeaders);
return new Response(null, { status: 304, headers: mergedHeaders });
}
emitRequest(response.status, 'ok', resolvedCacheTier, view.length);
maybeAttachDevHealthHeader(mergedHeaders);
return new Response(bodyBytes, {
status: response.status,
statusText: response.statusText,
@@ -580,6 +735,12 @@ export function createDomainGateway(
mergedHeaders.delete('X-No-Cache');
}
// Streaming/non-GET-200 responses: res_bytes is best-effort. Use Content-Length
// when present, else 0 (it is often absent on chunked responses; teeing the
// stream would add latency).
const finalContentLen = response.headers.get('content-length');
const finalResBytes = finalContentLen ? Number(finalContentLen) || 0 : 0;
emitRequest(response.status, 'ok', resolvedCacheTier, finalResBytes);
maybeAttachDevHealthHeader(mergedHeaders);
return new Response(response.body, {
status: response.status,
statusText: response.statusText,


@@ -0,0 +1,369 @@
/**
* Asserts the Axiom telemetry payload emitted by createDomainGateway() —
* specifically the four fields the round-1 Codex review flagged:
*
* - domain (must be 'shipping' for /api/v2/shipping/* routes, not 'v2')
* - customer_id (must be populated on legacy premium bearer-token success)
* - auth_kind (must reflect the resolved identity, not stay 'anon')
* - tier (recorded when entitlement-gated routes succeed; covered directly by
* the tier=2 entitlement-gated success case below)
*
* Strategy: enable telemetry (USAGE_TELEMETRY=1 + AXIOM_API_TOKEN=fake), stub
* globalThis.fetch to intercept the Axiom ingest POST, and pass a real ctx
* whose waitUntil collects the in-flight Promises so we can await them after
* the gateway returns.
*/
import assert from 'node:assert/strict';
import { createServer, type Server } from 'node:http';
import { afterEach, before, after, describe, it } from 'node:test';
import { generateKeyPair, exportJWK, SignJWT } from 'jose';
import { createDomainGateway, type GatewayCtx } from '../server/gateway.ts';
interface CapturedEvent {
event_type: string;
domain: string;
route: string;
status: number;
customer_id: string | null;
auth_kind: string;
tier: number;
}
function makeRecordingCtx(): { ctx: GatewayCtx; settled: Promise<void> } {
const pending: Promise<unknown>[] = [];
const ctx: GatewayCtx = {
waitUntil: (p) => { pending.push(p); },
};
// Quiescence loop: emitUsageEvents calls ctx.waitUntil from inside an
// already-pending waitUntil promise, so the array grows during drain.
// Keep awaiting until no new entries appear between iterations.
async function settled(): Promise<void> {
let prev = -1;
while (pending.length !== prev) {
prev = pending.length;
await Promise.allSettled(pending.slice(0, prev));
}
}
return {
ctx,
get settled() { return settled(); },
} as { ctx: GatewayCtx; settled: Promise<void> };
}
function installAxiomFetchSpy(
originalFetch: typeof fetch,
opts: { entitlementsResponse?: unknown } = {},
): {
events: CapturedEvent[];
restore: () => void;
} {
const events: CapturedEvent[] = [];
globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
const url = typeof input === 'string' ? input : input instanceof URL ? input.toString() : input.url;
if (url.includes('api.axiom.co')) {
const body = init?.body ? JSON.parse(init.body as string) as CapturedEvent[] : [];
for (const ev of body) events.push(ev);
return new Response('{}', { status: 200 });
}
if (url.includes('/api/internal-entitlements')) {
return new Response(JSON.stringify(opts.entitlementsResponse ?? null), {
status: 200,
headers: { 'Content-Type': 'application/json' },
});
}
return originalFetch(input as Request | string | URL, init);
}) as typeof fetch;
return { events, restore: () => { globalThis.fetch = originalFetch; } };
}
const ORIGINAL_FETCH = globalThis.fetch;
const ORIGINAL_USAGE_FLAG = process.env.USAGE_TELEMETRY;
const ORIGINAL_AXIOM_TOKEN = process.env.AXIOM_API_TOKEN;
const ORIGINAL_VALID_KEYS = process.env.WORLDMONITOR_VALID_KEYS;
afterEach(() => {
globalThis.fetch = ORIGINAL_FETCH;
if (ORIGINAL_USAGE_FLAG == null) delete process.env.USAGE_TELEMETRY;
else process.env.USAGE_TELEMETRY = ORIGINAL_USAGE_FLAG;
if (ORIGINAL_AXIOM_TOKEN == null) delete process.env.AXIOM_API_TOKEN;
else process.env.AXIOM_API_TOKEN = ORIGINAL_AXIOM_TOKEN;
if (ORIGINAL_VALID_KEYS == null) delete process.env.WORLDMONITOR_VALID_KEYS;
else process.env.WORLDMONITOR_VALID_KEYS = ORIGINAL_VALID_KEYS;
});
describe('gateway telemetry payload — domain extraction', () => {
it("emits domain='shipping' for /api/v2/shipping/* routes (not 'v2')", async () => {
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
const spy = installAxiomFetchSpy(ORIGINAL_FETCH);
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/v2/shipping/route-intelligence',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
const recorder = makeRecordingCtx();
const res = await handler(
new Request('https://worldmonitor.app/api/v2/shipping/route-intelligence', {
headers: { Origin: 'https://worldmonitor.app' },
}),
recorder.ctx,
);
// Anonymous → 401 (premium path, missing API key + no bearer)
assert.equal(res.status, 401);
await recorder.settled;
spy.restore();
assert.equal(spy.events.length, 1, 'expected exactly one telemetry event');
const ev = spy.events[0]!;
assert.equal(ev.domain, 'shipping', `domain should strip leading vN segment, got '${ev.domain}'`);
assert.equal(ev.route, '/api/v2/shipping/route-intelligence');
assert.equal(ev.auth_kind, 'anon');
assert.equal(ev.customer_id, null);
assert.equal(ev.tier, 0);
});
it("emits domain='market' for the standard /api/<domain>/v1/<rpc> layout", async () => {
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
const spy = installAxiomFetchSpy(ORIGINAL_FETCH);
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/market/v1/list-market-quotes',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
const recorder = makeRecordingCtx();
const res = await handler(
new Request('https://worldmonitor.app/api/market/v1/list-market-quotes?symbols=AAPL', {
headers: { Origin: 'https://worldmonitor.app' },
}),
recorder.ctx,
);
assert.equal(res.status, 200);
await recorder.settled;
spy.restore();
assert.equal(spy.events.length, 1);
assert.equal(spy.events[0]!.domain, 'market');
});
});
describe('gateway telemetry payload — bearer identity propagation', () => {
let privateKey: CryptoKey;
let jwksServer: Server;
let jwksPort: number;
before(async () => {
const { publicKey, privateKey: pk } = await generateKeyPair('RS256');
privateKey = pk;
const publicJwk = await exportJWK(publicKey);
publicJwk.kid = 'telemetry-key-1';
publicJwk.alg = 'RS256';
publicJwk.use = 'sig';
const jwks = { keys: [publicJwk] };
jwksServer = createServer((req, res) => {
if (req.url === '/.well-known/jwks.json') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(jwks));
} else {
res.writeHead(404);
res.end();
}
});
await new Promise<void>((resolve) => jwksServer.listen(0, '127.0.0.1', () => resolve()));
const addr = jwksServer.address();
jwksPort = typeof addr === 'object' && addr ? addr.port : 0;
process.env.CLERK_JWT_ISSUER_DOMAIN = `http://127.0.0.1:${jwksPort}`;
});
after(async () => {
jwksServer?.close();
delete process.env.CLERK_JWT_ISSUER_DOMAIN;
});
function signToken(claims: Record<string, unknown>) {
return new SignJWT(claims)
.setProtectedHeader({ alg: 'RS256', kid: 'telemetry-key-1' })
.setIssuer(`http://127.0.0.1:${jwksPort}`)
.setAudience('convex')
.setSubject((claims.sub as string | undefined) ?? 'user_test')
.setIssuedAt()
.setExpirationTime('1h')
.sign(privateKey);
}
it('records customer_id from a successful legacy premium bearer call', async () => {
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
const spy = installAxiomFetchSpy(ORIGINAL_FETCH);
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/resilience/v1/get-resilience-score',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
const token = await signToken({ sub: 'user_pro', plan: 'pro' });
const recorder = makeRecordingCtx();
const res = await handler(
new Request('https://worldmonitor.app/api/resilience/v1/get-resilience-score?countryCode=US', {
headers: {
Origin: 'https://worldmonitor.app',
Authorization: `Bearer ${token}`,
},
}),
recorder.ctx,
);
assert.equal(res.status, 200);
await recorder.settled;
spy.restore();
assert.equal(spy.events.length, 1, 'expected exactly one telemetry event');
const ev = spy.events[0]!;
// The whole point of fix #2: pre-fix this would have been null/anon.
assert.equal(ev.customer_id, 'user_pro', 'customer_id should be the bearer subject');
assert.equal(ev.auth_kind, 'clerk_jwt');
assert.equal(ev.domain, 'resilience');
assert.equal(ev.status, 200);
});
it("records tier=2 for an entitlement-gated success (the path the round-1 P2 fix targets)", async () => {
// /api/market/v1/analyze-stock requires tier 2 in ENDPOINT_ENTITLEMENTS.
// Pre-fix: usage.tier stayed null → emitted as 0. Post-fix: gateway re-reads
// entitlements after checkEntitlement allows the request, so tier=2 lands on
// the wire. We exercise this by stubbing the Convex entitlements fallback —
// Redis returns null without UPSTASH env, then getEntitlements falls through
// to the Convex HTTP path which we intercept via the same fetch spy.
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
process.env.CONVEX_SITE_URL = 'https://convex.test';
process.env.CONVEX_SERVER_SHARED_SECRET = 'test-shared-secret';
const fakeEntitlements = {
planKey: 'api_starter',
features: {
tier: 2,
apiAccess: true,
apiRateLimit: 1000,
maxDashboards: 10,
prioritySupport: false,
exportFormats: ['json'],
},
validUntil: Date.now() + 60_000,
};
const spy = installAxiomFetchSpy(ORIGINAL_FETCH, { entitlementsResponse: fakeEntitlements });
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/market/v1/analyze-stock',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
// plan: 'api' so the legacy bearer-role short-circuit (`session.role === 'pro'`)
// does NOT fire — we want the entitlement-check path that populates usage.tier.
const token = await signToken({ sub: 'user_api', plan: 'api' });
const recorder = makeRecordingCtx();
const res = await handler(
new Request('https://worldmonitor.app/api/market/v1/analyze-stock?symbol=AAPL', {
headers: {
Origin: 'https://worldmonitor.app',
Authorization: `Bearer ${token}`,
},
}),
recorder.ctx,
);
assert.equal(res.status, 200, 'entitlement-gated request with sufficient tier should succeed');
await recorder.settled;
spy.restore();
delete process.env.CONVEX_SITE_URL;
delete process.env.CONVEX_SERVER_SHARED_SECRET;
assert.equal(spy.events.length, 1);
const ev = spy.events[0]!;
assert.equal(ev.tier, 2, `tier should reflect resolved entitlement, got ${ev.tier}`);
assert.equal(ev.customer_id, 'user_api');
assert.equal(ev.auth_kind, 'clerk_jwt');
assert.equal(ev.domain, 'market');
assert.equal(ev.route, '/api/market/v1/analyze-stock');
});
it('still emits with auth_kind=anon when the bearer is invalid', async () => {
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
const spy = installAxiomFetchSpy(ORIGINAL_FETCH);
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/resilience/v1/get-resilience-score',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
const recorder = makeRecordingCtx();
const res = await handler(
new Request('https://worldmonitor.app/api/resilience/v1/get-resilience-score?countryCode=US', {
headers: {
Origin: 'https://worldmonitor.app',
Authorization: 'Bearer not-a-real-token',
},
}),
recorder.ctx,
);
assert.equal(res.status, 401);
await recorder.settled;
spy.restore();
assert.equal(spy.events.length, 1);
const ev = spy.events[0]!;
assert.equal(ev.auth_kind, 'anon');
assert.equal(ev.customer_id, null);
});
});
describe('gateway telemetry payload — ctx-optional safety', () => {
it('handler(req) without ctx still resolves cleanly even with telemetry on', async () => {
process.env.USAGE_TELEMETRY = '1';
process.env.AXIOM_API_TOKEN = 'test-token';
const spy = installAxiomFetchSpy(ORIGINAL_FETCH);
const handler = createDomainGateway([
{
method: 'GET',
path: '/api/market/v1/list-market-quotes',
handler: async () => new Response('{"ok":true}', { status: 200 }),
},
]);
const res = await handler(
new Request('https://worldmonitor.app/api/market/v1/list-market-quotes?symbols=AAPL', {
headers: { Origin: 'https://worldmonitor.app' },
}),
);
assert.equal(res.status, 200);
spy.restore();
// No ctx → emit short-circuits → no events delivered. The point is that
// the handler does not throw "Cannot read properties of undefined".
assert.equal(spy.events.length, 0);
});
});