worldmonitor/server/_shared/rate-limit.ts
Elie Habib 5c955691a9 feat(energy-atlas): live tanker map layer + contract (parity PR 3, plan U7-U8) (#3402)
* feat(energy-atlas): live tanker map layer + contract (PR 3, plan U7-U8)

Lands the third and final parity-push surface — per-vessel tanker positions
inside chokepoint bounding boxes, refreshed every 60s. Closes the visual
gap with peer reference energy-intel sites for the live AIS tanker view.

Per docs/plans/2026-04-25-003-feat-energy-parity-pushup-plan.md PR 3.
Codex-approved through 8 review rounds against origin/main @ 050073354.

U7 — Contract changes (relay + handler + proto + gateway + rate-limit + test):

- scripts/ais-relay.cjs: parallel `tankerReports` Map populated for AIS
  ship type 80-89 (tanker class) per ITU-R M.1371. SEPARATE from the
  existing `candidateReports` Map (military-only) so the existing
  military-detection consumer's contract stays unchanged. Snapshot
  endpoint extended to accept `bbox=swLat,swLon,neLat,neLon` + `tankers=true`
  query params, with bbox-filtering applied server-side. Tanker reports
  cleaned up on the same retention window as candidate reports; capped
  at 200 per response (10× headroom for global storage).
- proto/worldmonitor/maritime/v1/{get_,}vessel_snapshot.proto:
  - new `bool include_tankers = 6` request field
  - new `repeated SnapshotCandidateReport tanker_reports = 7` response
    field (reuses existing message shape; parallel to candidate_reports)
- server/worldmonitor/maritime/v1/get-vessel-snapshot.ts: REPLACES the
  prior 5-minute `with|without` cache with a request-keyed cache —
  (includeCandidates, includeTankers, quantizedBbox) — at 60s TTL for
  the live-tanker path and 5min TTL for the existing density/disruption
  consumers. Also adds 1° bbox quantization for cache-key reuse and a
  10° max-bbox guard (BboxTooLargeError) to prevent malicious clients
  from pulling all tankers through one query.
- server/gateway.ts: NEW `'live'` cache tier. CacheTier union extended;
  TIER_HEADERS + TIER_CDN_CACHE both gain entries with `s-maxage=60,
  stale-while-revalidate=60`. RPC_CACHE_TIER maps the maritime endpoint
  from `'no-store'` to `'live'` so the CDN absorbs concurrent identical
  requests across all viewers (without this, N viewers × 6 chokepoints
  hit AISStream upstream linearly).
- server/_shared/rate-limit.ts: ENDPOINT_RATE_POLICIES entry for the
  maritime endpoint at 60 req/min/IP — enough headroom for one user's
  6-chokepoint tab plus refreshes; flags only true scrape-class traffic.
- tests/route-cache-tier.test.mjs: regex extended to include `live` so
  the every-route-has-an-explicit-tier check still recognises the new
  mapping. Without this, the new tier would silently drop the maritime
  route from the validator's route map.
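The relay-side tanker selection above can be sketched as follows. `isTankerClass` and `filterSnapshot` are illustrative stand-ins for the ais-relay.cjs internals (the relay itself is CommonJS JavaScript), not its actual identifiers; only the 80-89 type range, the `bbox=swLat,swLon,neLat,neLon` shape, and the 200-report cap come from the change description:

```typescript
interface TankerReport {
  mmsi: number;
  lat: number;
  lon: number;
  shipType: number;
}

// AIS ship types 80-89 are the tanker class per ITU-R M.1371.
function isTankerClass(shipType: number): boolean {
  return shipType >= 80 && shipType <= 89;
}

// Server-side bbox filter plus hard cap, mirroring the snapshot endpoint's
// bbox query param and 200-per-response limit.
function filterSnapshot(
  reports: TankerReport[],
  bbox?: { swLat: number; swLon: number; neLat: number; neLon: number },
  cap = 200,
): TankerReport[] {
  const inBbox = bbox
    ? reports.filter(
        (r) =>
          r.lat >= bbox.swLat && r.lat <= bbox.neLat &&
          r.lon >= bbox.swLon && r.lon <= bbox.neLon,
      )
    : reports;
  return inBbox.slice(0, cap);
}
```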
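The handler's request-keyed cache can be illustrated with a hypothetical key builder. The (includeCandidates, includeTankers, quantizedBbox) triple, the 1° quantization, and the 10° guard are from the description above; the floor-based rounding and the exact key string shape are assumptions:

```typescript
const MAX_BBOX_SPAN_DEG = 10;

// Size-guard failure type (renamed to BboxValidationError in a later commit).
class BboxTooLargeError extends Error {}

function snapshotCacheKey(
  includeCandidates: boolean,
  includeTankers: boolean,
  bbox?: { swLat: number; swLon: number; neLat: number; neLon: number },
): string {
  let quantized = "none";
  if (bbox) {
    if (
      bbox.neLat - bbox.swLat > MAX_BBOX_SPAN_DEG ||
      bbox.neLon - bbox.swLon > MAX_BBOX_SPAN_DEG
    ) {
      throw new BboxTooLargeError("bbox exceeds 10 degree maximum span");
    }
    // 1 degree quantization: nearby viewports share one cache slot.
    quantized = [bbox.swLat, bbox.swLon, bbox.neLat, bbox.neLon]
      .map((d) => Math.floor(d))
      .join(",");
  }
  return `${includeCandidates}|${includeTankers}|${quantized}`;
}
```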

U8 — LiveTankersLayer consumer:

- src/services/live-tankers.ts: per-chokepoint fetcher with 60s in-memory
  cache. Promise.allSettled — never .all — so one chokepoint failing
  doesn't blank the whole layer (failed zones serve last-known data).
  Sources bbox centroids from src/config/chokepoint-registry.ts
  (CORRECT location — server/.../_chokepoint-ids.ts strips lat/lon).
  Default chokepoint set: hormuz_strait, suez, bab_el_mandeb,
  malacca_strait, panama, bosphorus.
- src/components/DeckGLMap.ts: new `createLiveTankersLayer()` ScatterplotLayer
  styled by speed (anchored amber when speed < 0.5 kn, underway cyan,
  unknown gray); new `loadLiveTankers()` async loader with abort-controller
  cancellation. Layer instantiated when `mapLayers.liveTankers && this.liveTankers.length > 0`.
- src/config/map-layer-definitions.ts: `LayerDefinition` for `liveTankers`
  with `renderers: ['flat'], deckGLOnly: true` (matches existing
  storageFacilities/fuelShortages pattern). Added to `VARIANT_LAYER_ORDER.energy`
  near `ais` so getLayersForVariant() and sanitizeLayersForVariant()
  include it on the energy variant — without this addition the layer
  would be silently stripped even when toggled on.
- src/types/index.ts: `liveTankers?: boolean` on the MapLayers union.
- src/config/panels.ts: ENERGY_MAP_LAYERS + ENERGY_MOBILE_MAP_LAYERS
  both gain `liveTankers: true`. Default `false` everywhere else.
- src/services/maritime/index.ts: existing snapshot consumer pinned to
  `includeTankers: false` to satisfy the proto's new required field;
  preserves identical behavior for the AIS-density / military-detection
  surfaces.
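The allSettled-with-fallback pattern from the live-tankers service can be sketched as below. `fetchAllZones`, `lastKnown`, and the injected `fetchZone` callback are invented names for illustration; the real service keys its cache differently:

```typescript
interface Tanker { mmsi: number; lat: number; lon: number }

// Last successful payload per chokepoint, served when a zone's fetch fails.
const lastKnown = new Map<string, Tanker[]>();

async function fetchAllZones(
  zoneIds: string[],
  fetchZone: (id: string) => Promise<Tanker[]>,
): Promise<Tanker[]> {
  // allSettled, never all: one failing chokepoint must not blank the layer.
  const settled = await Promise.allSettled(zoneIds.map((id) => fetchZone(id)));
  const out: Tanker[] = [];
  settled.forEach((result, i) => {
    const id = zoneIds[i];
    if (result.status === "fulfilled") {
      lastKnown.set(id, result.value); // refresh the fallback copy
      out.push(...result.value);
    } else {
      out.push(...(lastKnown.get(id) ?? [])); // failed zone: last-known data
    }
  });
  return out;
}
```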

Tests:
- npm run typecheck clean.
- 5 unit tests in tests/live-tankers-service.test.mjs cover the default
  chokepoint set (rejects ids that aren't in CHOKEPOINT_REGISTRY), the
  60s cache TTL pin (must match gateway 'live' tier s-maxage), and bbox
  derivation (±2° padding, total span under the 10° handler guard).
- tests/route-cache-tier.test.mjs continues to pass after the regex
  extension; the new maritime tier is correctly extracted.

Defense in depth:
- THREE-layer cache (CDN 'live' tier → handler bbox-keyed 60s → service
  in-memory 60s) means concurrent users hit the relay sub-linearly.
- Server-side 200-vessel cap on tanker_reports + client-side cap;
  protects layer render perf even on a runaway relay payload.
- Bbox-size guard (10° max) prevents a single global-bbox query from
  exfiltrating every tanker.
- Per-IP rate limit at 60/min covers normal use; flags scrape-class only.
- Existing military-detection contract preserved: `candidate_reports`
  field semantics unchanged; consumers self-select via include_tankers
  vs include_candidates rather than the response field changing meaning.

* fix(energy-atlas): wire LiveTankers loop + 400 bbox-range guard (PR3 review)

Three findings from review of #3402:

P1 — loadLiveTankers() was never called (DeckGLMap.ts:2999):
- Add ensureLiveTankersLoop() / stopLiveTankersLoop() helpers paired with
  the layer-enabled / layer-disabled branches in updateLayers(). The
  ensure helper kicks an immediate load + a 60s setInterval; idempotent
  so calling it on every layers update is safe.
- Wire stopLiveTankersLoop() into destroy() and into the layer-disabled
  branch so we don't hammer the relay when the layer is off.
- Layer factory now runs only when liveTankers.length > 0; ensureLoop
  fires on every observed-enabled tick so first-paint kicks the load
  even before the first tanker arrives.
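The ensure/stop pairing above can be reduced to this sketch. The 60 000 ms period follows the commit text; the class shape and the injected `load` callback are illustrative (in DeckGLMap these are methods and a field, not a standalone class):

```typescript
class LiveTankersLoop {
  private timer: ReturnType<typeof setInterval> | null = null;

  constructor(private readonly load: () => void) {}

  // Idempotent: safe to call on every layers update. Kicks an immediate
  // load for first paint, then refreshes every 60s.
  ensure(): void {
    if (this.timer !== null) return;
    this.load();
    this.timer = setInterval(this.load, 60_000);
  }

  // Wired into destroy() and the layer-disabled branch so the relay is
  // not polled while the layer is off.
  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}
```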

P1 — bbox lat/lon range guard (get-vessel-snapshot.ts:253):
- Out-of-range bboxes (e.g. ne_lat=200) previously passed the size
  guard (200-195=5° < 10°) but failed at the relay, which silently
  drops the bbox param and returns a global capped subset — making
  the layer appear to "work" with stale phantom data.
- Add isValidLatLon() check inside extractAndValidateBbox(): every
  corner must satisfy [-90, 90] / [-180, 180] before the size guard
  runs. Failure throws BboxValidationError.
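The ordering matters: range validation must run before the span check, or `ne_lat=200` with `sw_lat=195` sails through. A sketch under assumed names (the real code throws BboxValidationError and lives inside extractAndValidateBbox):

```typescript
function isValidLatLon(lat: number, lon: number): boolean {
  return lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180;
}

function validateBbox(swLat: number, swLon: number, neLat: number, neLon: number): void {
  // Range guard first: every corner must be a real coordinate.
  if (!isValidLatLon(swLat, swLon) || !isValidLatLon(neLat, neLon)) {
    throw new Error("bbox corner out of lat/lon range");
  }
  // Only then the 10 degree max-span guard.
  if (neLat - swLat > 10 || neLon - swLon > 10) {
    throw new Error("bbox exceeds 10 degree maximum span");
  }
}
```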

P2 — BboxTooLargeError surfaced as 500 instead of 400:
- server/error-mapper.ts maps errors to HTTP status by checking
  `'statusCode' in error`. The previous BboxTooLargeError extended
  Error without that property, so the mapper fell through to
  "unhandled error" → 500.
- Rename to BboxValidationError, add `readonly statusCode = 400`.
  Mapper now surfaces it as HTTP 400 with a descriptive reason.
- Keep BboxTooLargeError as a backwards-compat alias so existing
  imports / tests don't break.
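Why the `statusCode` property fixes the 500: the mapper duck-types on the property's presence. A minimal sketch; `httpStatusFor` is a stand-in for server/error-mapper.ts, not its actual code:

```typescript
class BboxValidationError extends Error {
  readonly statusCode = 400;
}

// Backwards-compat alias so existing imports and tests keep working.
const BboxTooLargeError = BboxValidationError;

// Stand-in for the mapper's status selection: errors carrying a statusCode
// surface as that status; everything else falls through to 500.
function httpStatusFor(error: unknown): number {
  if (error instanceof Error && "statusCode" in error) {
    return (error as Error & { statusCode: number }).statusCode;
  }
  return 500;
}
```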

Tests:
- Updated tests/server-handlers.test.mjs structural test to pin the
  new class name + statusCode + lat/lon range checks. 24 tests pass.
- typecheck (src + api) clean.

* fix(energy-atlas): thread AbortSignal through fetchLiveTankers (PR3 review #2)

P2 — AbortController was created + aborted but signal was never passed
into the actual fetch path (DeckGLMap.ts:3048 / live-tankers.ts:100):
- Toggling the layer off, destroying the map, or starting a new refresh
  did not actually cancel in-flight network work. A slow older refresh
  could complete after a newer one and overwrite this.liveTankers with
  stale data.

Threading:
- fetchLiveTankers() now accepts `options.signal: AbortSignal`. Signal
  is passed through to client.getVesselSnapshot() per chokepoint via
  the Connect-RPC client's standard `{ signal }` option.
- Per-zone abort handling: bail early if signal is already aborted
  before the fetch starts (saves a wasted RPC + cache write); re-check
  after the fetch resolves so a slow resolver can't clobber cache
  after the caller cancelled.
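The per-zone check-abort-twice pattern can be sketched like this; `fetchZoneWithSignal` and the injected `rpc` callback are hypothetical names standing in for the service's per-chokepoint path and the Connect-RPC `{ signal }` call:

```typescript
interface Snapshot { tankers: number[] }

async function fetchZoneWithSignal(
  zoneId: string,
  rpc: (zoneId: string, opts: { signal?: AbortSignal }) => Promise<Snapshot>,
  cache: Map<string, Snapshot>,
  signal?: AbortSignal,
): Promise<Snapshot | null> {
  if (signal?.aborted) return null; // bail before the RPC: saves a wasted call
  const snapshot = await rpc(zoneId, { signal });
  if (signal?.aborted) return null; // re-check: no cache write after cancel
  cache.set(zoneId, snapshot);
  return snapshot;
}
```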

Stale-result race guard in DeckGLMap.loadLiveTankers:
- Capture controller in a local before storing on this.liveTankersAbort.
- After fetchLiveTankers resolves, drop the result if EITHER:
  - controller.signal is now aborted (newer load cancelled this one)
  - this.liveTankersAbort points to a different controller (a newer
    load already started + replaced us in the field)
- Without these guards, an older fetch that completed despite
  signal.aborted could still write to this.liveTankers and call
  updateLayers, racing with the newer load.
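The guard pair reduces to comparing controller identity after the await. A self-contained sketch (a toy `Loader` class; `current` plays the role of this.liveTankersAbort):

```typescript
class Loader {
  private current: AbortController | null = null;
  data: string[] = [];

  async load(fetcher: (signal: AbortSignal) => Promise<string[]>): Promise<void> {
    this.current?.abort();                    // cancel any in-flight load
    const controller = new AbortController(); // capture in a local first
    this.current = controller;
    const result = await fetcher(controller.signal);
    // Drop stale results: EITHER this load was cancelled, OR a newer load
    // already replaced us in the field.
    if (controller.signal.aborted || this.current !== controller) return;
    this.data = result;
  }
}
```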

Tests: 1 new signature-pin test in tests/live-tankers-service.test.mts
verifies fetchLiveTankers accepts options.signal — guards against future
edits silently dropping the parameter and re-introducing the race.
6 tests pass. typecheck clean.

* fix(energy-atlas): bound vessel-snapshot cache via LRU eviction (PR3 review)

Greptile P2 finding: the in-process cache Map grows unbounded across the
serverless instance lifetime. Each distinct (includeCandidates,
includeTankers, quantizedBbox) triple creates a slot that's never evicted.
With 1° quantization and a misbehaving client the keyspace is ~64,000
entries — realistic load is ~12, so a 128-slot cap leaves 10x headroom
while making OOM impossible.

Implementation:
- SNAPSHOT_CACHE_MAX_SLOTS = 128.
- evictIfNeeded() walks insertion order and evicts the first slot whose
  inFlight is null. Slots with active fetches are skipped to avoid
  orphaning awaiting callers; we accept brief over-cap growth until
  in-flight settles.
- touchSlot() re-inserts a slot at the end of Map insertion order on
  hit / in-flight join / fresh write so it counts as most-recently-used.
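The Map-order LRU described above can be sketched as follows (a 4-slot toy cap instead of 128, string values instead of snapshot payloads; `writeSlot` is an invented wrapper showing how the pieces compose):

```typescript
interface Slot { value: string; inFlight: Promise<string> | null }

const MAX_SLOTS = 4; // SNAPSHOT_CACHE_MAX_SLOTS = 128 in the real handler
const cache = new Map<string, Slot>();

// Re-insert at the end of Map insertion order so insertion order doubles
// as recency order.
function touchSlot(key: string): void {
  const slot = cache.get(key);
  if (!slot) return;
  cache.delete(key);
  cache.set(key, slot);
}

function evictIfNeeded(): void {
  if (cache.size <= MAX_SLOTS) return;
  for (const [key, slot] of cache) {      // walks oldest-first
    if (slot.inFlight !== null) continue; // never orphan awaiting callers
    cache.delete(key);
    if (cache.size <= MAX_SLOTS) return;
  }
  // Every over-cap slot was in-flight: accept brief over-cap growth.
}

function writeSlot(key: string, value: string): void {
  cache.set(key, { value, inFlight: null });
  touchSlot(key);
  evictIfNeeded();
}
```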
2026-04-25 17:56:23 +04:00

210 lines
6.6 KiB
TypeScript
import { Ratelimit, type Duration } from '@upstash/ratelimit';
import { Redis } from '@upstash/redis';

let ratelimit: Ratelimit | null = null;

function getRatelimit(): Ratelimit | null {
  if (ratelimit) return ratelimit;
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  ratelimit = new Ratelimit({
    redis: new Redis({ url, token }),
    limiter: Ratelimit.slidingWindow(600, '60 s'),
    prefix: 'rl',
    analytics: false,
  });
  return ratelimit;
}
function getClientIp(request: Request): string {
  // With Cloudflare proxy → Vercel, x-real-ip is the CF edge IP (shared across users).
  // cf-connecting-ip is the actual client IP set by Cloudflare — prefer it.
  // x-forwarded-for is client-settable and MUST NOT be trusted for rate limiting.
  return (
    request.headers.get('cf-connecting-ip') ||
    request.headers.get('x-real-ip') ||
    request.headers.get('x-forwarded-for')?.split(',')[0]?.trim() ||
    '0.0.0.0'
  );
}
function tooManyRequestsResponse(
  limit: number,
  reset: number,
  corsHeaders: Record<string, string>,
): Response {
  return new Response(JSON.stringify({ error: 'Too many requests' }), {
    status: 429,
    headers: {
      'Content-Type': 'application/json',
      'X-RateLimit-Limit': String(limit),
      'X-RateLimit-Remaining': '0',
      'X-RateLimit-Reset': String(reset),
      'Retry-After': String(Math.ceil((reset - Date.now()) / 1000)),
      ...corsHeaders,
    },
  });
}
export async function checkRateLimit(
  request: Request,
  corsHeaders: Record<string, string>,
): Promise<Response | null> {
  const rl = getRatelimit();
  if (!rl) return null;
  const ip = getClientIp(request);
  try {
    const { success, limit, reset } = await rl.limit(ip);
    if (!success) {
      return tooManyRequestsResponse(limit, reset, corsHeaders);
    }
    return null;
  } catch {
    return null;
  }
}
// --- Per-endpoint rate limiting ---

interface EndpointRatePolicy {
  limit: number;
  window: Duration;
}

// Exported so scripts/enforce-rate-limit-policies.mjs can import it directly
// (#3278) instead of regex-parsing this file. Internal callers should keep
// using checkEndpointRateLimit / hasEndpointRatePolicy below — the export is
// for tooling, not new runtime callers.
export const ENDPOINT_RATE_POLICIES: Record<string, EndpointRatePolicy> = {
  '/api/news/v1/summarize-article-cache': { limit: 3000, window: '60 s' },
  '/api/intelligence/v1/classify-event': { limit: 600, window: '60 s' },
  // Legacy /api/sanctions-entity-search rate limit was 30/min per IP. Preserve
  // that budget now that LookupSanctionEntity proxies OpenSanctions live.
  '/api/sanctions/v1/lookup-sanction-entity': { limit: 30, window: '60 s' },
  // Lead capture: preserve the 3/hr and 5/hr budgets from legacy api/contact.js
  // and api/register-interest.js. Lower limits than normal IP rate limit since
  // these hit Convex + Resend per request.
  '/api/leads/v1/submit-contact': { limit: 3, window: '1 h' },
  '/api/leads/v1/register-interest': { limit: 5, window: '1 h' },
  // Scenario engine: legacy /api/scenario/v1/run capped at 10 jobs/min/IP via
  // inline Upstash INCR. Gateway now enforces the same budget with per-IP
  // keying in checkEndpointRateLimit.
  '/api/scenario/v1/run-scenario': { limit: 10, window: '60 s' },
  // Live tanker map (Energy Atlas): one user with 6 chokepoints × 1 call/min
  // = 6 req/min/IP base load. 60/min headroom covers tab refreshes + zoom
  // pans within a single user without flagging legitimate traffic.
  '/api/maritime/v1/get-vessel-snapshot': { limit: 60, window: '60 s' },
};
const endpointLimiters = new Map<string, Ratelimit>();

function getEndpointRatelimit(pathname: string): Ratelimit | null {
  const policy = ENDPOINT_RATE_POLICIES[pathname];
  if (!policy) return null;
  const cached = endpointLimiters.get(pathname);
  if (cached) return cached;
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  const rl = new Ratelimit({
    redis: new Redis({ url, token }),
    limiter: Ratelimit.slidingWindow(policy.limit, policy.window),
    prefix: 'rl:ep',
    analytics: false,
  });
  endpointLimiters.set(pathname, rl);
  return rl;
}

export function hasEndpointRatePolicy(pathname: string): boolean {
  return pathname in ENDPOINT_RATE_POLICIES;
}
export async function checkEndpointRateLimit(
  request: Request,
  pathname: string,
  corsHeaders: Record<string, string>,
): Promise<Response | null> {
  const rl = getEndpointRatelimit(pathname);
  if (!rl) return null;
  const ip = getClientIp(request);
  try {
    const { success, limit, reset } = await rl.limit(`${pathname}:${ip}`);
    if (!success) {
      return tooManyRequestsResponse(limit, reset, corsHeaders);
    }
    return null;
  } catch {
    return null;
  }
}
// --- In-handler scoped rate limits ---
//
// Handlers that need a per-subscope cap *in addition to* the gateway-level
// endpoint policy (e.g. a tighter budget for one request variant) use this
// helper. Gateway's checkEndpointRateLimit still runs first — this is a
// second stage.

const scopedLimiters = new Map<string, Ratelimit>();

function getScopedRatelimit(scope: string, limit: number, window: Duration): Ratelimit | null {
  const cacheKey = `${scope}|${limit}|${window}`;
  const cached = scopedLimiters.get(cacheKey);
  if (cached) return cached;
  const url = process.env.UPSTASH_REDIS_REST_URL;
  const token = process.env.UPSTASH_REDIS_REST_TOKEN;
  if (!url || !token) return null;
  const rl = new Ratelimit({
    redis: new Redis({ url, token }),
    limiter: Ratelimit.slidingWindow(limit, window),
    prefix: 'rl:scope',
    analytics: false,
  });
  scopedLimiters.set(cacheKey, rl);
  return rl;
}

export interface ScopedRateLimitResult {
  allowed: boolean;
  limit: number;
  reset: number;
}
/**
 * Returns whether the request is under the scoped budget. `scope` is an
 * opaque namespace (e.g. `${pathname}#desktop`); `identifier` is usually the
 * client IP but can be any stable caller identifier. Fail-open on Redis errors
 * to stay consistent with checkRateLimit / checkEndpointRateLimit semantics.
 */
export async function checkScopedRateLimit(
  scope: string,
  limit: number,
  window: Duration,
  identifier: string,
): Promise<ScopedRateLimitResult> {
  const rl = getScopedRatelimit(scope, limit, window);
  if (!rl) return { allowed: true, limit, reset: 0 };
  try {
    const result = await rl.limit(`${scope}:${identifier}`);
    return { allowed: result.success, limit: result.limit, reset: result.reset };
  } catch {
    return { allowed: true, limit, reset: 0 };
  }
}
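A hypothetical handler-side call pattern for this second stage, assuming the gateway's checkEndpointRateLimit already ran. The `#desktop` subscope, the 20/min budget, and `handleDesktopVariant` are invented for illustration; the `ScopedCheck` type matches checkScopedRateLimit's signature above and is stubbed here so the sketch is self-contained:

```typescript
type Duration = `${number} s` | `${number} m` | `${number} h`;

interface ScopedRateLimitResult { allowed: boolean; limit: number; reset: number }

type ScopedCheck = (
  scope: string, limit: number, window: Duration, identifier: string,
) => Promise<ScopedRateLimitResult>;

// Tighter per-variant budget layered on top of the endpoint policy.
async function handleDesktopVariant(ip: string, check: ScopedCheck): Promise<number> {
  const { allowed } = await check(
    "/api/maritime/v1/get-vessel-snapshot#desktop", 20, "60 s", ip,
  );
  if (!allowed) return 429; // real handler would also set Retry-After from `reset`
  return 200;
}
```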