(backend) introduce add-ons authentication backend

Add a new authentication backend for add-ons, with a core service
managing session state in cache and exposing three API endpoints.

Microsoft Add-ins recommend launching authentication from the
side panel via a dialog, then returning the JWT to the parent
context using postMessage. However, due to Django and SSO security
constraints (window.opener not preserved), this approach is not
viable.

Implement a three-step authentication flow:

- `/init`: create a session, return a short-lived transit token
  and CSRF token. Store session ID in a secure HttpOnly cookie.
- `/poll`: allow the add-on to poll until authentication is
  complete, then consume the session and clear the cookie.
- `/exchange`: exchange the transit token for a JWT, which is
  later retrieved via the `/poll` endpoint.

The add-on opens an authentication dialog, stores the transit
token in sessionStorage, and performs the exchange after login.

This approach works within iframe constraints and provides a
sufficiently secure v0 despite known limitations.
This commit is contained in:
lebaudantoine
2026-04-15 10:07:57 +02:00
committed by aleb_the_flash
parent 5e1e05b001
commit ecb710688d
14 changed files with 1869 additions and 15 deletions

View File

@@ -11,6 +11,7 @@ and this project adheres to
### Added
- ✨(backend) add metadata collection of VAD, connection and chat events
- ✨(backend) introduce add-ons authentication backend
### Fixed

View File

@@ -0,0 +1 @@
"""Meet core add-ons module."""

View File

@@ -0,0 +1,344 @@
"""Authentication session management for add-ons using temporary cache-based sessions."""
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone
from enum import Enum
from logging import getLogger
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from core.models import User
from core.services.jwt_token import JwtTokenService
logger = getLogger(__name__)
_PUBLIC_SESSION_FIELDS = frozenset(
{"state", "access_token", "token_type", "expires_in", "scope"}
)
class SessionDataError(Exception):
"""Raised when session data is invalid or malformed."""
class CSRFTokenError(Exception):
"""Raised when CSRF token verification fails."""
class TransitTokenError(Exception):
"""Raised when a transit token is invalid or expired."""
class SessionExpiredError(Exception):
"""Raised when a session has expired."""
class SessionNotFoundError(Exception):
"""Raised when a session is not found."""
class SuspiciousSessionError(Exception):
"""Raised when session state indicates a possible attack or bug."""
class SessionState(str, Enum):
"""Add-on authentication session lifecycle states."""
PENDING = "pending"
AUTHENTICATED = "authenticated"
class TransitTokenState(str, Enum):
"""Transit token lifecycle states; CONSUMED is retained to detect replay."""
PENDING = "pending"
CONSUMED = "consumed"
class TokenExchangeService:
"""Manage temporary authentication sessions for add-on JWT token exchange."""
def __init__(self):
"""Build the underlying JWT service and validate required settings."""
if not settings.ADDONS_CSRF_SECRET:
raise ImproperlyConfigured("CSRF Secret is required.")
if not settings.ADDONS_TOKEN_SCOPE:
raise ImproperlyConfigured("Token scope must be defined.")
self._token_service = JwtTokenService(
secret_key=settings.ADDONS_TOKEN_SECRET_KEY,
algorithm=settings.ADDONS_TOKEN_ALG,
issuer=settings.ADDONS_TOKEN_ISSUER,
audience=settings.ADDONS_TOKEN_AUDIENCE,
expiration_seconds=settings.ADDONS_TOKEN_TTL,
token_type=settings.ADDONS_TOKEN_TYPE,
)
@staticmethod
def _cache_key(prefix: str, token: str) -> str:
"""Build a namespaced cache key: ``addons_{prefix}_{token}``."""
return f"addons_{prefix}_{token}"
@staticmethod
def _derive_csrf_token(session_id: str) -> str:
"""Derive the CSRF token as HMAC-SHA256(session_id) under ADDONS_CSRF_SECRET."""
return hmac.new(
settings.ADDONS_CSRF_SECRET.encode("utf-8"),
session_id.encode("utf-8"),
hashlib.sha256,
).hexdigest()
@staticmethod
def _validate_session_not_expired(session_data: dict) -> int:
"""Return remaining seconds until expiry, or raise if missing/malformed/expired."""
expires_at_str = session_data.get("expires_at")
if expires_at_str is None:
raise SessionDataError("Invalid session data: missing expiration.")
try:
expires_at = datetime.fromisoformat(expires_at_str)
except ValueError as e:
raise SessionDataError("Invalid session data: malformed expiration.") from e
remaining_seconds = int(
(expires_at - datetime.now(timezone.utc)).total_seconds()
)
if remaining_seconds <= 0:
raise SessionExpiredError("Session expired.")
return remaining_seconds
def _generate_session_id(self) -> str:
"""Generate a high-entropy URL-safe session_id."""
return secrets.token_urlsafe(settings.ADDONS_RANDOM_TOKEN_BYTE_LENGTH)
def _generate_transit_token(self) -> str:
"""Generate a high-entropy URL-safe transit token."""
return secrets.token_urlsafe(settings.ADDONS_RANDOM_TOKEN_BYTE_LENGTH)
def init_session(self) -> tuple[str, str, str]:
"""Create a new pending session and its transit binding.
Returns:
(transit_token, session_id, csrf_token)
"""
session_id = self._generate_session_id()
transit_token = self._generate_transit_token()
csrf_token = self._derive_csrf_token(session_id)
expires_at = (
datetime.now(timezone.utc) + timedelta(seconds=settings.ADDONS_SESSION_TTL)
).isoformat()
session_data = {
"state": SessionState.PENDING,
"expires_at": expires_at,
"transit_token": transit_token,
}
cache.set(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id),
session_data,
settings.ADDONS_SESSION_TTL,
)
transit_token_data = {
"session_id": session_id,
"state": TransitTokenState.PENDING,
}
cache.set(
self._cache_key(settings.ADDONS_CACHE_PREFIX_TRANSIT, transit_token),
transit_token_data,
settings.ADDONS_TRANSIT_TOKEN_TTL,
)
return transit_token, session_id, csrf_token
def verify_csrf(self, session_id: str, submitted_csrf: str) -> None:
"""Constant-time verify submitted_csrf against HMAC(session_id). Raise on mismatch."""
expected_csrf = self._derive_csrf_token(session_id)
if not hmac.compare_digest(expected_csrf, submitted_csrf):
raise CSRFTokenError("Invalid CSRF token.")
def consume_transit_token(self, transit_token: str) -> str:
"""Mark transit token consumed and return its session_id.
A replay (second consume) evicts the session as a security cleanup and raises.
Raises:
TransitTokenError: If token is unknown, expired, or already consumed.
"""
cache_key = self._cache_key(settings.ADDONS_CACHE_PREFIX_TRANSIT, transit_token)
transit_token_data = cache.get(cache_key)
if transit_token_data is None:
# Indistinguishable from here: either the token was never issued (attacker
# probing or client bug) or it was issued but expired before consumption.
logger.warning(
"Transit token not found in cache (unknown or expired).",
)
raise TransitTokenError("Invalid or expired transit token.")
state = transit_token_data.get("state", None)
session_id = transit_token_data.get("session_id", None)
if not session_id:
logger.warning("Transit token data missing session_id.")
raise TransitTokenError("Invalid transit token.")
if state == TransitTokenState.CONSUMED:
logger.warning(
"Replay on session %s",
session_id,
)
# Security cleanup: a replay attempt means the transit token leaked
# (or an attacker is probing). Evict the session so the authenticated
# tokens — if they exist — can no longer be polled.
cache.delete(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id)
)
raise TransitTokenError("Transit token already consumed.")
new_transit_token_data = {
"state": TransitTokenState.CONSUMED,
"session_id": session_id,
}
cache.set(
cache_key,
new_transit_token_data,
settings.ADDONS_SESSION_TTL,
)
return session_id
@staticmethod
def is_session_pending(session_data: dict) -> bool:
"""Return True if the public session dict is still in the pending state."""
return session_data.get("state") == SessionState.PENDING
def _get_session_data(self, session_id: str) -> dict:
"""Fetch raw session data from cache, or raise SessionNotFoundError."""
if not session_id:
raise SessionNotFoundError("Session not found.")
data = cache.get(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id)
)
if data is None:
raise SessionNotFoundError("Session not found.")
return data
def get_session(self, session_id: str) -> dict:
"""Return the public session view; evict the session on authenticated read.
Raises:
SessionNotFoundError: If session is not found.
SessionDataError: If session data is missing the state field.
"""
# raises if session is not found
session_data = self._get_session_data(session_id)
if "state" not in session_data:
raise SessionDataError("Invalid session data: missing state field.")
# One-time read: clear both bindings for authenticated sessions
if session_data["state"] == SessionState.AUTHENTICATED:
cache.delete(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id)
)
# Return public fields only
return {k: v for k, v in session_data.items() if k in _PUBLIC_SESSION_FIELDS}
def _validate_transit_token_state(self, session_data: dict) -> None:
"""Assert the session's transit token exists in cache and is in CONSUMED state.
Raises:
SessionDataError: session_data is missing the transit_token field.
SuspiciousSessionError: transit entry is missing, or still pending (flow skipped).
"""
transit_token = session_data.get("transit_token", None)
if transit_token is None:
raise SessionDataError("Invalid session data: missing transit_token field.")
transit_token_data = cache.get(
self._cache_key(settings.ADDONS_CACHE_PREFIX_TRANSIT, transit_token)
)
if transit_token_data is None:
logger.warning("Transit token missing when setting access token.")
raise SuspiciousSessionError("Transit token not found.")
if transit_token_data.get("state") != TransitTokenState.CONSUMED:
logger.warning("Access token requested without completing transit flow.")
raise SuspiciousSessionError("Transit token not consumed.")
def set_access_token(self, user: User, session_id: str) -> None:
"""Authenticate a pending session by minting a JWT and storing it on the session.
Non-pending sessions are evicted as a security cleanup before raising.
Raises:
SessionNotFoundError: If session doesn't exist.
SessionDataError: If session data is malformed.
SessionExpiredError: If session has expired.
SuspiciousSessionError: If session is not pending or transit wasn't consumed.
"""
# raises if session is not found
session_data = self._get_session_data(session_id)
if session_data.get("state") != SessionState.PENDING:
logger.warning(
"Session's state is not pending. Suspicious.",
)
# Security cleanup: evict the session so any cached tokens cannot be polled.
cache.delete(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id)
)
raise SuspiciousSessionError("Session is not in pending state.")
# raises if transit_token is invalid
try:
self._validate_transit_token_state(session_data)
except SuspiciousSessionError:
# Security cleanup: evict the session.
cache.delete(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id)
)
raise
# raises if session is expired
remaining_seconds = self._validate_session_not_expired(session_data)
response = self._token_service.generate_jwt(user, settings.ADDONS_TOKEN_SCOPE)
new_data = {
"access_token": response["access_token"],
"token_type": response["token_type"],
"expires_in": response["expires_in"],
"scope": response["scope"],
"expires_at": session_data["expires_at"],
"state": SessionState.AUTHENTICATED,
}
cache.set(
self._cache_key(settings.ADDONS_CACHE_PREFIX_SESSION, session_id),
new_data,
remaining_seconds,
)

View File

@@ -0,0 +1,229 @@
"""Add-ons API endpoints"""
from logging import getLogger
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from rest_framework import decorators, viewsets
from rest_framework import (
response as drf_response,
)
from rest_framework import status as drf_status
from core.addons.service import (
CSRFTokenError,
SessionDataError,
SessionExpiredError,
SessionNotFoundError,
SuspiciousSessionError,
TokenExchangeService,
TransitTokenError,
)
from core.api.feature_flag import FeatureFlag
from core.api.permissions import IsAuthenticated
logger = getLogger(__name__)
class SessionViewSet(viewsets.ViewSet):
"""ViewSet for managing add-on authentication sessions via token exchange.
Implements a three-step flow that lets a third-party add-on (running in an
embedded iframe) obtain an access token without exposing it to client-side
JavaScript:
1. /init: the add-on opens a session and receives a short-lived transit
token (used to bootstrap the OAuth-style exchange in a dialog) and a
CSRF token. The opaque session id is stored in an HttpOnly, Secure,
SameSite=None cookie so it can accompany cross-origin polls.
2. /poll: the add-on polls until the session transitions from pending to
authenticated. On the terminal read, the session payload (access
token, token type, expiry, etc.) is returned, the session is evicted
server-side, and the session cookie is cleared so the tokens can be
retrieved exactly once.
3. /exchange: called from the post-login callback page on our own domain,
after the user has authenticated in a dialog opened by the addon. The
transit token (carried client-side via postMessage + sessionStorage)
is redeemed here for the authenticated user's access token, which is
stored server-side against the session. Requires an authenticated
user — that user is whose access token gets bound to the session.
/init and /poll authenticate the caller through the session cookie +
CSRF token pair alone — no user login is required, since the whole point
of the flow is to bootstrap one. /exchange, by contrast, requires an
authenticated user and does not use the addonsSid cookie.
"""
throttle_classes = []
@decorators.action(
detail=False,
methods=["POST"],
url_path="init",
authentication_classes=[],
permission_classes=[],
)
@FeatureFlag.require("addons")
def init(self, request):
"""Open a new add-on authentication session.
Creates a fresh session server-side and returns the credentials the
add-on needs to drive the rest of the flow.
"""
transit_token, session_id, csrf_token = TokenExchangeService().init_session()
response = drf_response.Response(
{"transit_token": transit_token, "csrf_token": csrf_token},
status=drf_status.HTTP_201_CREATED,
)
# SameSite=None allows the cookie to be sent on cross-origin requests,
# which is required because the /poll endpoint is called from an iframe
# embedded in a third-party site. Secure=True is mandatory when SameSite=None.
# HttpOnly prevents JS access, so the cookie can only be read by the server.
response.set_cookie(
key=settings.ADDONS_SESSION_ID_COOKIE,
value=session_id,
max_age=settings.ADDONS_SESSION_TTL,
httponly=True,
secure=True,
samesite="None",
)
return response
@decorators.action(
detail=False,
methods=["POST"],
url_path="poll",
authentication_classes=[],
permission_classes=[],
)
@FeatureFlag.require("addons")
def poll(self, request):
"""Poll a session for its current state and, if terminal, consume it.
Authenticates the caller using the addonsSid cookie (set by
/init) together with the X-CSRF-Token header, which must match
the CSRF token issued for that session. The session id alone is not
sufficient — both must be presented and must correspond.
Behavior depends on the session's current state:
- **Pending**: the token exchange has not yet completed. Returns
202 Accepted with `{"state": "pending"}`. The cookie is preserved
so the add-on can keep polling.
- **Authenticated** (or any other terminal state): returns 200 OK
with the session payload (access token, token type, expiry, etc.)
and clears the `addonsSid` cookie. The session is also evicted
server-side on this terminal read, so the tokens can be retrieved
exactly once.
A CSRF mismatch is treated as a `SuspiciousOperation` rather than a
normal 4xx, so it is logged by Django's security middleware and
surfaced as a 400 without leaking which check failed.
"""
session_id = request.COOKIES.get(settings.ADDONS_SESSION_ID_COOKIE)
submitted_csrf = request.headers.get("X-CSRF-Token")
if not session_id:
return drf_response.Response(
{"detail": "Missing credentials."},
status=drf_status.HTTP_401_UNAUTHORIZED,
)
if not submitted_csrf:
return drf_response.Response(
{"detail": "Missing CSRF token."},
status=drf_status.HTTP_400_BAD_REQUEST,
)
service = TokenExchangeService()
try:
service.verify_csrf(session_id, submitted_csrf)
except CSRFTokenError as e:
raise SuspiciousOperation(str(e)) from e
try:
session = service.get_session(session_id)
except SessionNotFoundError:
return drf_response.Response(
{"detail": "Session not found."},
status=drf_status.HTTP_404_NOT_FOUND,
)
except SessionDataError:
return drf_response.Response(
{"detail": "Invalid or expired session."},
status=drf_status.HTTP_400_BAD_REQUEST,
)
if service.is_session_pending(session):
return drf_response.Response(
{"state": "pending"}, status=drf_status.HTTP_202_ACCEPTED
)
response = drf_response.Response(session, status=drf_status.HTTP_200_OK)
response.delete_cookie(
key=settings.ADDONS_SESSION_ID_COOKIE,
samesite="None",
)
return response
@decorators.action(
detail=False,
methods=["POST"],
url_path="exchange",
permission_classes=[IsAuthenticated],
)
@FeatureFlag.require("addons")
def exchange(self, request):
"""Redeem a transit token for an access token bound to the current user.
Called from the post-OIDC callback page on our own domain. The transit
token was issued by /init, passed to the authentication dialog via
postMessage, stashed in sessionStorage, and read back by this page
after login completes.
The authenticated user (request.user) is whose access token gets stored
against the session. On success, the addon's next /poll will transition
from pending to authenticated and receive the token payload.
Transit tokens are single-use: a replayed token is rejected with 400.
"""
transit_token = request.data.get("transit_token")
if not transit_token:
return drf_response.Response(
{"detail": "Missing transit_token."},
status=drf_status.HTTP_400_BAD_REQUEST,
)
service = TokenExchangeService()
try:
session_id = service.consume_transit_token(transit_token)
except TransitTokenError:
return drf_response.Response(
{"detail": "Invalid or expired transit token."},
status=drf_status.HTTP_400_BAD_REQUEST,
)
try:
service.set_access_token(request.user, session_id)
except SessionNotFoundError:
return drf_response.Response(
{"detail": "Session not found."},
status=drf_status.HTTP_404_NOT_FOUND,
)
except (SessionDataError, SessionExpiredError, SuspiciousSessionError):
return drf_response.Response(
{"detail": "Invalid or expired session."},
status=drf_status.HTTP_400_BAD_REQUEST,
)
return drf_response.Response({"status": "ok"}, status=drf_status.HTTP_200_OK)

View File

@@ -14,6 +14,7 @@ class FeatureFlag:
"storage_event": "RECORDING_STORAGE_EVENT_ENABLE",
"subtitle": "ROOM_SUBTITLE_ENABLED",
"file_upload": "FILE_UPLOAD_ENABLED",
"addons": "ADDONS_ENABLED",
}
@classmethod

View File

@@ -0,0 +1,601 @@
"""
Tests for add-ons API /sessions/init and /sessions/poll endpoints
"""
# pylint: disable=redefined-outer-name,unused-argument
import re
from unittest.mock import patch
import pytest
from rest_framework.test import APIClient
from core.addons.service import (
SessionDataError,
SessionExpiredError,
SessionNotFoundError,
SuspiciousSessionError,
TokenExchangeService,
)
from core.factories import UserFactory
pytestmark = pytest.mark.django_db
# ================================
# endpoint /addons/sessions/init/
# ================================
def test_init_feature_flag_disabled(client, settings):
"""Should return 404 on POST when feature is disabled."""
settings.ADDONS_ENABLED = False
response = client.post("/api/v1.0/addons/sessions/init/")
assert response.status_code == 404
def test_init_only_accepts_post():
"""Should return 201 JSON with only transit_token and csrf_token."""
response = APIClient().post("/api/v1.0/addons/sessions/init/")
assert response.status_code == 201
assert response["Content-Type"] == "application/json"
response_data = response.json()
# session_id must only be delivered via cookie, not in the same channel as csrf_token.
assert set(response_data.keys()) == {"transit_token", "csrf_token"}
transit_token = response_data["transit_token"]
# URL-safe base64 alphabet: A-Z, a-z, 0-9, -, _
assert re.match(r"^[A-Za-z0-9_-]+$", transit_token)
csrf_token = response_data["csrf_token"]
# HMAC-SHA256 → 64-character hex string.
assert re.match(r"^[a-f0-9]{64}$", csrf_token)
assert csrf_token != transit_token
def test_init_rejects_non_post_methods():
"""Should return 405 Method Not Allowed on GET."""
response = APIClient().get("/api/v1.0/addons/sessions/init/")
assert response.status_code == 405
def test_init_generates_unique_tokens_across_calls():
"""Should generate a distinct transit_token and csrf_token for every call."""
api_client = APIClient()
tokens = set()
csrf_tokens = set()
for _ in range(5):
response = api_client.post("/api/v1.0/addons/sessions/init/")
tokens.add(response.json()["transit_token"])
csrf_tokens.add(response.json()["csrf_token"])
assert len(tokens) == 5
assert len(csrf_tokens) == 5
def test_init_cookie_authorizes_subsequent_poll():
"""Should issue a session cookie that, with the returned csrf_token, authorizes /poll."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
assert "addonsSid" in init_response.cookies
csrf_token = init_response.json()["csrf_token"]
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 202
assert poll_response.json() == {"state": "pending"}
def test_init_session_id_cookie_attributes(settings):
"""Should set the session cookie with the security attributes required for iframe embedding."""
response = APIClient().post("/api/v1.0/addons/sessions/init/")
cookies = response.cookies
assert list(cookies) == ["addonsSid"] # only this cookie
cookie = cookies["addonsSid"]
assert re.match(r"^[A-Za-z0-9_-]+$", cookie.value), "URL-safe base64 expected"
assert cookie["httponly"] is True, (
"HttpOnly required — cookie must not be JS-readable"
)
assert cookie["secure"] is True, (
"Secure required — cookie must not travel over HTTP"
)
assert cookie["samesite"] == "None", (
"SameSite=None required for cross-origin iframe"
)
assert cookie["max-age"] == settings.ADDONS_SESSION_TTL
def test_init_session_id_cookie_respects_configured_name(settings):
"""Should name the session cookie according to the ADDONS_SESSION_ID_COOKIE setting."""
api_client = APIClient()
settings.ADDONS_SESSION_ID_COOKIE = "mockSessionSid"
response = api_client.post("/api/v1.0/addons/sessions/init/")
assert "mockSessionSid" in response.cookies
assert response.cookies.get("mockSessionSid") is not None
# =================================
# endpoint /addons/sessions/poll/
# =================================
def test_poll_feature_flag_disabled(client, settings):
"""Should return 404 on POST when feature is disabled."""
settings.ADDONS_ENABLED = False
response = client.post("/api/v1.0/addons/sessions/poll/")
assert response.status_code == 404
def test_poll_rejects_missing_csrf_token():
"""Should reject requests that carry the sessionSid cookie but omit the CSRF header."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
# X-CSRF-Token is deliberately omitted
poll_response = api_client.post("/api/v1.0/addons/sessions/poll/")
assert poll_response.status_code == 400
assert poll_response.json() == {"detail": "Missing CSRF token."}
def test_poll_missing_cookie():
"""Should return 401 when no sessionSid cookie is present."""
api_client = APIClient()
poll_response = api_client.post("/api/v1.0/addons/sessions/poll/")
assert poll_response.status_code == 401
assert poll_response.json() == {"detail": "Missing credentials."}
def test_poll_rejects_invalid_csrf_token():
"""Should reject requests carrying an invalid CSRF token."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN="invalid-csrf-token",
)
# SuspiciousOperation translates to 400 via Django's exception middleware.
assert poll_response.status_code == 400
@patch(
"core.addons.service.TokenExchangeService._get_session_data",
side_effect=SessionNotFoundError("Session not found."),
)
def test_poll_session_not_found(mock_get_session_data):
"""Should return 404 when the session is not found."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
csrf_token = init_response.json()["csrf_token"]
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 404
assert poll_response.json() == {"detail": "Session not found."}
@patch(
"core.addons.service.TokenExchangeService._get_session_data",
side_effect=SessionDataError("Session corrupted."),
)
def test_poll_session_corrupted(mock_get_session_data):
"""Should return 400 when the session is corrupted."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
csrf_token = init_response.json()["csrf_token"]
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 400
assert poll_response.json() == {"detail": "Invalid or expired session."}
def test_poll_session_authenticated():
"""Should return tokens and tears down the polling channel when authenticated."""
api_client = APIClient()
init_response = api_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
session_id_cookie = init_response.cookies["addonsSid"]
csrf_token = init_response.json()["csrf_token"]
transit_token = init_response.json()["transit_token"]
# Simulate Authentication done in the opened dialog
service = TokenExchangeService()
service.consume_transit_token(transit_token)
service.set_access_token(UserFactory(), session_id_cookie.value)
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 200
response_data = poll_response.json()
access_token = response_data.pop("access_token")
assert isinstance(access_token, str) and access_token # non-empty string
assert response_data == {
"expires_in": 7200,
"scope": "room:create",
"state": "authenticated",
"token_type": "Bearer",
}
# Verify the server cleared the addonsSid cookie
cleared_cookie = poll_response.cookies["addonsSid"]
assert cleared_cookie.value == ""
assert cleared_cookie["max-age"] == 0
# Server cleared the addonsSid cookie; APIClient drops it → no credentials.
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 401
# Replay the original addonsSid: session was evicted on terminal read.
api_client.cookies["addonsSid"] = session_id_cookie.value
poll_response = api_client.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 404
assert poll_response.json() == {"detail": "Session not found."}
def test_poll_two_clients_do_not_interfere():
"""Two clients poll independently; CSRF tokens are bound to their own session."""
client_a = APIClient()
client_b = APIClient()
init_a = client_a.post("/api/v1.0/addons/sessions/init/")
init_b = client_b.post("/api/v1.0/addons/sessions/init/")
assert init_a.status_code == 201
assert init_b.status_code == 201
csrf_a = init_a.json()["csrf_token"]
csrf_b = init_b.json()["csrf_token"]
poll_id_a = init_a.cookies["addonsSid"].value
poll_id_b = init_b.cookies["addonsSid"].value
# Sessions must be distinct.
assert csrf_a != csrf_b
assert poll_id_a != poll_id_b
# Each client polls its own session.
poll_a = client_a.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_a,
)
poll_b = client_b.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_b,
)
assert poll_a.status_code == 202
assert poll_b.status_code == 202
# Cross-use (A's cookie + B's CSRF) must be rejected.
cross_response = client_a.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_b,
)
assert cross_response.status_code == 400
# A's session transitioning to authenticated must not affect B.
with patch(
"core.addons.service.TokenExchangeService._get_session_data",
return_value={
"state": "authenticated",
"expires_at": "foo",
"access_token": "mock-token",
"token_type": "Bearer",
"expires_in": 100,
},
):
poll_a = client_a.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_a,
)
assert poll_a.status_code == 200
poll_b = client_b.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_b,
)
assert poll_b.status_code == 202
def test_poll_csrf_attack_does_not_disrupt_legitimate_client():
"""CSRF attack using the pollId cookie must fail without burning the session."""
legitimate = APIClient()
init_response = legitimate.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
csrf_token = init_response.json()["csrf_token"]
session_id_value = init_response.cookies["addonsSid"].value
# Attacker has the cookie (SameSite=None) but not the CSRF token.
attacker = APIClient()
attacker.cookies["addonsSid"] = session_id_value
# No CSRF header
attack_no_csrf = attacker.post("/api/v1.0/addons/sessions/poll/")
assert attack_no_csrf.status_code == 400
assert attack_no_csrf.json() == {"detail": "Missing CSRF token."}
# Fabricated CSRF token
attack_bad_csrf = attacker.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN="attacker-guessed-token",
)
assert attack_bad_csrf.status_code == 400
# Legitimate client's session is still usable.
legitimate_poll = legitimate.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert legitimate_poll.status_code == 202
assert legitimate_poll.json() == {"state": "pending"}
# =====================================
# endpoint /addons/sessions/exchange/
# =====================================
def test_exchange_feature_flag_disabled(settings):
"""Should return 404 on POST when feature is disabled."""
settings.ADDONS_ENABLED = False
api_client = APIClient()
api_client.force_authenticate(user=UserFactory())
response = api_client.post("/api/v1.0/addons/sessions/exchange/")
assert response.status_code == 404
def test_exchange_requires_authentication():
"""Should return 401 when the caller is not authenticated."""
api_client = APIClient()
response = api_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": "irrelevant"},
format="json",
)
assert response.status_code == 401
def test_exchange_rejects_missing_transit_token():
"""Should return 400 when the request body has no transit_token."""
api_client = APIClient()
api_client.force_authenticate(user=UserFactory())
response = api_client.post(
"/api/v1.0/addons/sessions/exchange/",
{},
format="json",
)
assert response.status_code == 400
assert response.json() == {"detail": "Missing transit_token."}
def test_exchange_rejects_empty_transit_token():
"""Should return 400 when transit_token is present but empty."""
api_client = APIClient()
api_client.force_authenticate(user=UserFactory())
response = api_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": ""},
format="json",
)
assert response.status_code == 400
assert response.json() == {"detail": "Missing transit_token."}
def test_exchange_rejects_invalid_transit_token():
"""Should return 400 when the transit token is unknown or malformed."""
api_client = APIClient()
api_client.force_authenticate(user=UserFactory())
response = api_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": "not-a-real-transit-token"},
format="json",
)
assert response.status_code == 400
assert response.json() == {"detail": "Invalid or expired transit token."}
def test_exchange_rejects_replayed_transit_token():
"""Should return 400 when a transit token is reused after being consumed."""
init_client = APIClient()
init_response = init_client.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
transit_token = init_response.json()["transit_token"]
auth_client = APIClient()
auth_client.force_authenticate(user=UserFactory())
first = auth_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert first.status_code == 200
second = auth_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert second.status_code == 400
assert second.json() == {"detail": "Invalid or expired transit token."}
def test_exchange_success_enables_poll_to_complete():
"""Should bind tokens to the session so the polling completes."""
# 1. Taskpane opens a session.
taskpane = APIClient()
init_response = taskpane.post("/api/v1.0/addons/sessions/init/")
assert init_response.status_code == 201
transit_token = init_response.json()["transit_token"]
csrf_token = init_response.json()["csrf_token"]
# 2. Dialog completes OIDC; post-login page (authenticated, separate
# client — no addonsSid cookie) calls /exchange with the transit token.
dialog = APIClient()
dialog.force_authenticate(user=UserFactory())
exchange_response = dialog.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert exchange_response.status_code == 200
assert exchange_response.json() == {"status": "ok"}
# 3. Taskpane's next poll transitions from pending → authenticated.
poll_response = taskpane.post(
"/api/v1.0/addons/sessions/poll/",
HTTP_X_CSRF_TOKEN=csrf_token,
)
assert poll_response.status_code == 200
response_data = poll_response.json()
assert response_data["state"] == "authenticated"
assert response_data["token_type"] == "Bearer"
assert isinstance(response_data["access_token"], str)
assert response_data["access_token"]
@patch(
"core.addons.service.TokenExchangeService.set_access_token",
side_effect=SessionNotFoundError("Session not found."),
)
def test_exchange_returns_when_session_missing(mock_set_access_token):
"""Should return 404 when the session bound to the transit token is gone."""
init_response = APIClient().post("/api/v1.0/addons/sessions/init/")
transit_token = init_response.json()["transit_token"]
auth_client = APIClient()
auth_client.force_authenticate(user=UserFactory())
response = auth_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert response.status_code == 404
assert response.json() == {"detail": "Session not found."}
@pytest.mark.parametrize(
"service_error",
[SessionDataError, SessionExpiredError, SuspiciousSessionError],
)
def test_exchange_on_invalid_session(service_error):
"""Should return 400 on malformed, expired, or suspicious sessions."""
init_response = APIClient().post("/api/v1.0/addons/sessions/init/")
transit_token = init_response.json()["transit_token"]
auth_client = APIClient()
auth_client.force_authenticate(user=UserFactory())
with patch(
"core.addons.service.TokenExchangeService.set_access_token",
side_effect=service_error("boom"),
):
response = auth_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert response.status_code == 400
assert response.json() == {"detail": "Invalid or expired session."}
def test_exchange_rejects_non_post_methods():
"""Should return 405 Method Not Allowed on non-POST verbs."""
api_client = APIClient()
api_client.force_authenticate(user=UserFactory())
for method in ("get", "put", "patch", "delete"):
response = getattr(api_client, method)("/api/v1.0/addons/sessions/exchange/")
assert response.status_code == 405, f"{method.upper()} should be rejected"
def test_exchange_binds_to_authenticated_user():
"""Should pass the authenticated user to set_access_token."""
init_response = APIClient().post("/api/v1.0/addons/sessions/init/")
transit_token = init_response.json()["transit_token"]
expected_user = UserFactory()
auth_client = APIClient()
auth_client.force_authenticate(user=expected_user)
with patch(
"core.addons.service.TokenExchangeService.set_access_token",
return_value=None,
) as mock_set:
response = auth_client.post(
"/api/v1.0/addons/sessions/exchange/",
{"transit_token": transit_token},
format="json",
)
assert response.status_code == 200
mock_set.assert_called_once()
called_user, _called_session_id = mock_set.call_args.args
assert called_user == expected_user

View File

@@ -0,0 +1,586 @@
"""
Unit tests for TokenExchangeService.
"""
# pylint: disable=redefined-outer-name,unused-argument,protected-access
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
import pytest
from core.addons.service import (
_PUBLIC_SESSION_FIELDS,
CSRFTokenError,
SessionDataError,
SessionExpiredError,
SessionNotFoundError,
SessionState,
SuspiciousSessionError,
TokenExchangeService,
TransitTokenError,
TransitTokenState,
)
from ...factories import UserFactory
pytestmark = pytest.mark.django_db
@pytest.fixture
def service():
"""Fresh service instance per test."""
return TokenExchangeService()
# ==============================
# init
# ==============================
def test_init_service_improperly_configured_secret_token(settings):
"""Should raise ImproperlyConfigured when ADDONS_TOKEN_SECRET_KEY is unset."""
settings.ADDONS_TOKEN_SECRET_KEY = None
with pytest.raises(ImproperlyConfigured, match="Secret key is required."):
TokenExchangeService()
def test_init_service_improperly_configured_token_scope(settings):
"""Should raise ImproperlyConfigured when ADDONS_TOKEN_SCOPE is empty."""
settings.ADDONS_TOKEN_SCOPE = None
with pytest.raises(ImproperlyConfigured, match="Token scope must be defined."):
TokenExchangeService()
def test_init_service_raises_when_csrf_secret_missing(settings):
"""Should raise ImproperlyConfigured when ADDONS_CSRF_SECRET is unset."""
settings.ADDONS_CSRF_SECRET = None
with pytest.raises(ImproperlyConfigured, match="CSRF Secret is required."):
TokenExchangeService()
# ==============================
# init_session
# ==============================
def test_init_session_returns_three_distinct_tokens(service):
"""Should return (transit_token, session_id, csrf_token), all distinct and non-empty."""
transit_token, session_id, csrf_token = service.init_session()
assert transit_token
assert session_id
assert csrf_token
assert len({transit_token, session_id, csrf_token}) == 3
def test_init_session_starts_in_pending_state(service):
"""Should return a pending initialized session."""
transit_token, session_id, _ = service.init_session()
session_data = cache.get(f"addons_sid_{session_id}")
assert session_data["state"] == SessionState.PENDING
assert "access_token" not in session_data
transit_data = cache.get(f"addons_transit_{transit_token}")
# Transit token should be bind to the same session
assert transit_data.get("session_id") == session_id
assert transit_data.get("state") == TransitTokenState.PENDING
def test_init_session_bind_transit_token_with_session(service):
"""Should bind transit_token with the initialized session."""
transit_token, session_id, _ = service.init_session()
transit_data = cache.get(f"addons_transit_{transit_token}")
assert transit_data.get("session_id") == session_id
assert transit_data.get("state") == TransitTokenState.PENDING
def test_init_session_creates_independent_cache_entries(service):
"""Should write to distinct cache keys when called multiple times."""
transit_a, session_id_a, csrf_a = service.init_session()
transit_b, session_id_b, csrf_b = service.init_session()
assert transit_a != transit_b
assert session_id_a != session_id_b
assert csrf_a != csrf_b
def test_init_session_csrf_token_is_derived_from_session_id(service, settings):
"""Should derive the csrf_token as HMAC(session_id, ADDONS_CSRF_SECRET)."""
_, session_id, csrf_token = service.init_session()
# Same inputs, same output: derivation is pure.
assert csrf_token == service._derive_csrf_token(session_id)
assert csrf_token == service._derive_csrf_token(
session_id
) # deterministic across calls
assert len(csrf_token) == 64
assert all(c in "0123456789abcdef" for c in csrf_token)
# CSRF token is bound to the secret: rotating it invalidates outstanding tokens.
settings.ADDONS_CSRF_SECRET = "another-secret-entirely"
assert csrf_token != service._derive_csrf_token(session_id)
# CSRF token is bound to the session_id: same secret, different session ≠ same token.
settings.ADDONS_CSRF_SECRET = "secret-key-padded-for-minimum-len!-addons" # restore
_, other_session_id, _ = service.init_session()
assert service._derive_csrf_token(session_id) != service._derive_csrf_token(
other_session_id
)
def test_init_session_tokens_have_sufficient_entropy(service):
"""Should be long enough by default that collision is negligible."""
transit_token, session_id, csrf_token = service.init_session()
assert len(transit_token) >= 40
assert len(session_id) >= 40
assert len(csrf_token) == 64
def test_init_session_respects_configured_ttls(service, settings):
"""Should respect their respective TTL configured through settings."""
transit_token, session_id, _ = service.init_session()
session_a_ttl = cache.ttl(f"addons_sid_{session_id}")
transit_a_ttl = cache.ttl(f"addons_transit_{transit_token}")
# By default, transit token has a shorter TTL
assert transit_a_ttl < session_a_ttl
settings.ADDONS_SESSION_TTL = 3000
settings.ADDONS_TRANSIT_TOKEN_TTL = 60
transit_token_b, session_id_b, _ = service.init_session()
session_b_ttl = cache.ttl(f"addons_sid_{session_id_b}")
transit_b_ttl = cache.ttl(f"addons_transit_{transit_token_b}")
assert abs(session_b_ttl - 3000) <= 2
assert abs(transit_b_ttl - 60) <= 2
# ==============================
# verify_csrf
# ==============================
def test_verify_csrf_accepts_matching_token(service):
"""Should verify against its session_id."""
_, session_id, csrf_token = service.init_session()
assert service.verify_csrf(session_id, csrf_token) is None
def test_verify_csrf_is_deterministic_for_same_session(service):
"""Should yield the same token when deriving CSRF twice."""
_, session_id, csrf_token = service.init_session()
# Verify once, then verify again, both must succeed because
# _derive_csrf_token is a pure function of session_id + secret.
# without raising exceptions;
assert service.verify_csrf(session_id, csrf_token) is None
assert service.verify_csrf(session_id, csrf_token) is None
def test_verify_csrf_rejects_after_secret_rotation(service, settings):
"""Should invalidate tokens issued under the old secret when ADDONS_CSRF_SECRET was rotated."""
_, session_id, csrf_token = service.init_session()
# Rotate the secret
settings.ADDONS_CSRF_SECRET = "different-secret-entirely"
with pytest.raises(CSRFTokenError, match="Invalid CSRF token."):
service.verify_csrf(session_id, csrf_token)
def test_verify_csrf_rejects_foreign_token(service):
"""Should reject against another csrf_token."""
_, session_id_a, _ = service.init_session()
_, _, csrf_b = service.init_session()
with pytest.raises(CSRFTokenError, match="Invalid CSRF token."):
service.verify_csrf(session_id_a, csrf_b)
def test_verify_csrf_rejects_random_token(service):
"""Should reject against a random csrf_token."""
_, session_id_a, _ = service.init_session()
with pytest.raises(CSRFTokenError, match="Invalid CSRF token."):
service.verify_csrf(session_id_a, "wrong-csrf-value")
def test_verify_csrf_rejects_empty_token(service):
"""Should reject against an empty csrf_token."""
_, session_id_a, _ = service.init_session()
with pytest.raises(CSRFTokenError, match="Invalid CSRF token."):
service.verify_csrf(session_id_a, "")
def test_verify_csrf_is_case_sensitive(service):
"""Should be case-sensitive (HMAC output is lowercase hex)."""
_, session_id, csrf_token = service.init_session()
with pytest.raises(CSRFTokenError, match="Invalid CSRF token."):
service.verify_csrf(session_id, csrf_token.upper())
# ==============================
# get_session
# ==============================
def test_get_session_raises_when_missing(service):
"""Should raise SessionNotFoundError for an unknown session_id."""
with pytest.raises(SessionNotFoundError, match="Session not found."):
service.get_session("nonexistent-session-id")
def test_get_session_authenticated_returns_token_then_evicts(service):
"""Should return tokens once and evict session when authenticated."""
user = UserFactory()
transit_token_a, session_id_a, _ = service.init_session()
_, session_id_b, _ = service.init_session()
# Authenticate the session
service.consume_transit_token(transit_token_a)
service.set_access_token(user, session_id_a)
# First read: returns the token payload.
result = service.get_session(session_id_a)
assert result["state"] == SessionState.AUTHENTICATED
assert "access_token" in result
# Assert session_a is evicted from the cache
session_data_a = cache.get(f"addons_sid_{session_id_a}")
assert session_data_a is None
# Second read: binding was evicted.
with pytest.raises(SessionNotFoundError, match="Session not found."):
service.get_session(session_id_a)
# Assert session_b is untouched
session_data_b = cache.get(f"addons_sid_{session_id_b}")
assert session_data_b is not None
assert session_data_b.get("state") == SessionState.PENDING
def test_get_session_pending_preserve_cache(service):
"""Should keep session state in cache when the session is pending."""
_, session_id, _ = service.init_session()
# First read: returns the pending session.
result_1 = service.get_session(session_id)
assert result_1["state"] == SessionState.PENDING
assert "access_token" not in result_1
# Second read: returns the pending session.
result_2 = service.get_session(session_id)
assert result_2["state"] == SessionState.PENDING
assert "access_token" not in result_2
def test_get_session_pending_only_exposes_public_fields(service):
"""Should only return whitelisted public fields when session is pending."""
_, session_id, _ = service.init_session()
session = service.get_session(session_id)
assert set(session.keys()) <= _PUBLIC_SESSION_FIELDS
assert session["state"] == SessionState.PENDING
assert "expires_at" not in session
assert "transit_token" not in session
def test_get_session_authenticated_only_exposes_public_fields(service):
"""Should only return whitelisted public fields when session is authenticated."""
transit_token, session_id, _ = service.init_session()
# Authenticate the session
user = UserFactory()
service.consume_transit_token(transit_token)
service.set_access_token(user, session_id)
session = service.get_session(session_id)
assert session["state"] == SessionState.AUTHENTICATED
assert set(session.keys()) <= _PUBLIC_SESSION_FIELDS
assert "expires_at" not in session
assert "transit_token" not in session
def test_get_session_empty_string(service):
"""Should raise SessionNotFoundError if session is empty."""
with pytest.raises(SessionNotFoundError, match="Session not found."):
service.get_session("")
def test_get_session_corrupted_session_data(service):
"""Should raise SessionDataError if session's data is corrupted."""
session_id = "mock-corrupted-session-id"
cache.set(f"addons_sid_{session_id}", {"invalid": "invalid-value"})
with pytest.raises(
SessionDataError, match="Invalid session data: missing state field."
):
service.get_session(session_id)
# ==============================
# consume_transit_token
# ==============================
def test_consume_transit_token_returns_session_id(service):
"""Should return the session_id the transit token was bound to."""
_, session_id, _ = service.init_session()
transit_token = cache.get(f"addons_sid_{session_id}")["transit_token"]
returned_session_id = service.consume_transit_token(transit_token)
assert returned_session_id == session_id
def test_consume_transit_token_replay_raises(service):
"""Should raise on the second consume of the same transit token."""
transit_token, _, _ = service.init_session()
service.consume_transit_token(transit_token)
with pytest.raises(TransitTokenError, match="Transit token already consumed."):
service.consume_transit_token(transit_token)
def test_consume_transit_token_replay_evicts_session(service):
"""Should evict the session as security cleanup when a replay is detected."""
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
assert service.get_session(session_id)
with pytest.raises(TransitTokenError):
service.consume_transit_token(transit_token)
# After replay, the session is gone.
with pytest.raises(SessionNotFoundError):
service.get_session(session_id)
def test_consume_transit_token_raises_on_unknown_token(service):
"""Should raise TransitTokenError when the transit token is unknown or expired."""
with pytest.raises(TransitTokenError, match="Invalid or expired transit token."):
service.consume_transit_token("nonexistent-transit-token")
def test_consume_transit_token_replay_when_session_already_gone(service):
"""Should still detect replay even if the session was evicted independently."""
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
# Simulate session evicted independently
cache.delete(f"addons_sid_{session_id}")
with pytest.raises(TransitTokenError, match="Transit token already consumed."):
service.consume_transit_token(transit_token)
def test_consume_transit_token_extends_ttl_for_replay_detection(service, settings):
"""Should extend the consumed transit entry's TTL to session length."""
settings.ADDONS_SESSION_TTL = 3000
settings.ADDONS_TRANSIT_TOKEN_TTL = 60
transit_token, _, _ = service.init_session()
# Before consume: transit has the short TTL.
assert cache.ttl(f"addons_transit_{transit_token}") <= 60 + 1
service.consume_transit_token(transit_token)
# After consume: TTL is extended to session length.
assert cache.ttl(f"addons_transit_{transit_token}") > 60
# ==============================
# set_access_token
# ==============================
def test_set_access_token_writes_jwt_fields_to_session(service, settings):
"""Should populate the session with JWT fields and flip state to authenticated."""
user = UserFactory()
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
service.set_access_token(user, session_id)
session = service.get_session(session_id)
assert session["state"] == SessionState.AUTHENTICATED
assert session["access_token"]
assert session["token_type"] == settings.ADDONS_TOKEN_TYPE
assert session["expires_in"] == settings.ADDONS_TOKEN_TTL
assert session["scope"] == settings.ADDONS_TOKEN_SCOPE
def test_set_access_token_preserves_remaining_ttl(service, settings):
"""Should inherit the pending session's remaining TTL rather than resetting it."""
settings.ADDONS_SESSION_TTL = 3000
user = UserFactory()
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
ttl_before = cache.ttl(f"addons_sid_{session_id}")
service.set_access_token(user, session_id)
ttl_after = cache.ttl(f"addons_sid_{session_id}")
# TTL must not jump back to full — allow small tolerance for execution time.
assert ttl_after <= ttl_before + 1
# And it shouldn't have somehow grown beyond the session length either.
assert ttl_after <= 3000
def test_authenticating_one_session_leaves_others_pending(service):
"""Should leave other pending sessions untouched when authenticating one."""
user = UserFactory()
transit_a, session_id_a, _ = service.init_session()
_, session_id_b, _ = service.init_session()
service.consume_transit_token(transit_a)
service.set_access_token(user, session_id_a)
session_b = service.get_session(session_id_b)
assert session_b["state"] == SessionState.PENDING
assert "access_token" not in session_b
def test_set_access_token_raises_when_transit_entry_missing(service):
"""Should raise when the transit cache entry is gone (TTL expired or evicted)."""
user = UserFactory()
transit_token, session_id, _ = service.init_session()
# Manually delete the transit entry, simulating expiry or eviction.
cache.delete(f"addons_transit_{transit_token}")
with pytest.raises(SuspiciousSessionError, match="Transit token not found."):
service.set_access_token(user, session_id)
def test_set_access_token_raises_if_transit_token_not_consumed(service):
"""Should refuse to authenticate a session whose transit token hasn't been consumed."""
user = UserFactory()
_, session_id, _ = service.init_session()
with pytest.raises(SuspiciousSessionError, match="Transit token not consumed."):
service.set_access_token(user, session_id)
assert cache.get(f"addons_sid_{session_id}") is None
def test_set_access_token_raises_on_missing_transit_token_field(service):
"""Should raise SessionDataError when session data is missing the transit_token field."""
user = UserFactory()
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
corrupted = cache.get(f"addons_sid_{session_id}")
del corrupted["transit_token"]
cache.set(f"addons_sid_{session_id}", corrupted, 3600)
with pytest.raises(SessionDataError, match="missing transit_token field"):
service.set_access_token(user, session_id)
def test_set_access_token_raises_if_double_authenticated(service):
"""Should raise and wipe the session on double-auth while leaving the transit token intact."""
user = UserFactory()
transit_token, _, _ = service.init_session()
session_id = service.consume_transit_token(transit_token)
service.set_access_token(user, session_id)
with pytest.raises(
SuspiciousSessionError, match="Session is not in pending state."
):
service.set_access_token(user, session_id)
# Nuke session data as a security cleanup
session_data = cache.get(f"addons_sid_{session_id}")
assert session_data is None
transit_data = cache.get(f"addons_transit_{transit_token}")
assert transit_data.get("state") == TransitTokenState.CONSUMED
def test_set_access_token_raises_when_session_missing(service):
"""Should raise SessionNotFoundError when called with an unknown session_id."""
user = UserFactory()
with pytest.raises(SessionNotFoundError, match="Session not found."):
service.set_access_token(user, "nonexistent-session-id")
def test_set_access_token_rejects_malformed_expires_at(service):
"""Should raise SessionDataError when the cached expires_at is not valid ISO 8601."""
user = UserFactory()
transit_token, _, _ = service.init_session()
session_id = service.consume_transit_token(transit_token)
# Corrupt the cached session directly.
corrupted = cache.get(f"addons_sid_{session_id}")
corrupted["expires_at"] = "not-an-iso-string"
cache.set(f"addons_sid_{session_id}", corrupted, 3600)
with pytest.raises(SessionDataError, match="malformed expiration"):
service.set_access_token(user, session_id)
def test_set_access_token_rejects_missing_expires_at(service):
"""Should raise SessionDataError when the cached session is missing the expires_at field."""
user = UserFactory()
transit_token, _, _ = service.init_session()
session_id = service.consume_transit_token(transit_token)
corrupted = cache.get(f"addons_sid_{session_id}")
del corrupted["expires_at"]
cache.set(f"addons_sid_{session_id}", corrupted, 3600)
with pytest.raises(SessionDataError, match="missing expiration"):
service.set_access_token(user, session_id)
def test_set_access_token_raises_when_session_expired(service):
"""Should raise SessionExpiredError when the cached session's expires_at is in the past."""
user = UserFactory()
transit_token, session_id, _ = service.init_session()
service.consume_transit_token(transit_token)
# Simulate expiry: rewrite expires_at into the past.
corrupted = cache.get(f"addons_sid_{session_id}")
corrupted["expires_at"] = "2020-01-01T00:00:00+00:00"
cache.set(f"addons_sid_{session_id}", corrupted, 3600)
with pytest.raises(SessionExpiredError, match="Session expired."):
service.set_access_token(user, session_id)

View File

@@ -4,8 +4,9 @@ from django.conf import settings
from django.urls import include, path
from lasuite.oidc_login.urls import urlpatterns as oidc_urls
from rest_framework.routers import DefaultRouter
from rest_framework.routers import DefaultRouter, SimpleRouter
from core.addons import viewsets as addons_viewsets
from core.api import get_frontend_configuration, viewsets
from core.external_api import viewsets as external_viewsets
@@ -18,15 +19,19 @@ router.register("files", viewsets.FileViewSet, basename="files")
router.register(
"resource-accesses", viewsets.ResourceAccessViewSet, basename="resource_accesses"
)
router.register(
"addons/sessions",
addons_viewsets.SessionViewSet,
basename="addons_sessions",
)
# - External API
external_router = DefaultRouter()
external_router = SimpleRouter()
external_router.register(
"application",
external_viewsets.ApplicationViewSet,
basename="external_application",
)
external_router.register(
"rooms",
external_viewsets.RoomViewSet,

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-03-12 13:46+0000\n"
"POT-Creation-Date: 2026-04-21 14:09+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -94,11 +94,11 @@ msgstr ""
"Sie müssen Administrator oder Eigentümer eines Raums sein, um Zugriffe "
"hinzuzufügen."
#: core/api/serializers.py:516
#: core/api/serializers.py:509
msgid "This file extension is not allowed."
msgstr "Diese Dateiendung ist nicht erlaubt."
#: core/api/serializers.py:533
#: core/api/viewsets.py:1090
msgid "You have reached the maximum number of files for this type."
msgstr "Sie haben die maximale Anzahl an Dateien dieses Typs erreicht."

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-03-12 13:46+0000\n"
"POT-Creation-Date: 2026-04-21 14:09+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -92,11 +92,11 @@ msgstr "Creator is me"
msgid "You must be administrator or owner of a room to add accesses to it."
msgstr "You must be administrator or owner of a room to add accesses to it."
#: core/api/serializers.py:516
#: core/api/serializers.py:509
msgid "This file extension is not allowed."
msgstr "This file extension is not allowed."
#: core/api/serializers.py:533
#: core/api/viewsets.py:1090
msgid "You have reached the maximum number of files for this type."
msgstr "You have reached the maximum number of files for this type."

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-03-12 13:46+0000\n"
"POT-Creation-Date: 2026-04-21 14:09+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: antoine.lebaud@mail.numerique.gouv.fr\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -95,11 +95,11 @@ msgstr ""
"Vous devez être administrateur ou propriétaire d'une salle pour y ajouter "
"des accès."
#: core/api/serializers.py:516
#: core/api/serializers.py:509
msgid "This file extension is not allowed."
msgstr "Cette extension n'est pas autorisée"
#: core/api/serializers.py:533
#: core/api/viewsets.py:1090
msgid "You have reached the maximum number of files for this type."
msgstr "Vous avez atteint le nombre maximum de fichiers de ce type"

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-03-12 13:46+0000\n"
"POT-Creation-Date: 2026-04-21 14:09+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -93,11 +93,11 @@ msgid "You must be administrator or owner of a room to add accesses to it."
msgstr ""
"Je moet beheerder of eigenaar van een ruimte zijn om toegang toe te voegen."
#: core/api/serializers.py:516
#: core/api/serializers.py:509
msgid "This file extension is not allowed."
msgstr "Deze bestandsextensie is niet toegestaan."
#: core/api/serializers.py:533
#: core/api/viewsets.py:1090
msgid "You have reached the maximum number of files for this type."
msgstr "Het maximale aantal bestanden voor dit type is bereikt."

View File

@@ -871,6 +871,88 @@ class Base(Configuration):
environ_prefix=None,
)
# Addons
ADDONS_ENABLED = values.BooleanValue(
False,
environ_name="ADDONS_ENABLED",
environ_prefix=None,
)
ADDONS_SESSION_TTL = values.PositiveIntegerValue(
3600,
environ_name="ADDONS_SESSION_TTL",
environ_prefix=None,
)
ADDONS_TRANSIT_TOKEN_TTL = values.PositiveIntegerValue(
120,
environ_name="ADDONS_TRANSIT_TOKEN_TTL",
environ_prefix=None,
)
ADDONS_CSRF_SECRET = SecretFileValue(
None,
environ_name="ADDONS_CSRF_SECRET",
environ_prefix=None,
)
ADDONS_CACHE_PREFIX_SESSION = values.Value(
"sid",
environ_name="ADDONS_CACHE_PREFIX_SESSION",
environ_prefix=None,
)
ADDONS_CACHE_PREFIX_TRANSIT = values.Value(
"transit",
environ_name="ADDONS_CACHE_PREFIX_TRANSIT",
environ_prefix=None,
)
ADDONS_TOKEN_AUDIENCE = values.Value(
"addons",
environ_name="ADDONS_TOKEN_AUDIENCE",
environ_prefix=None,
)
ADDONS_TOKEN_ISSUER = values.Value(
"lasuite-meet",
environ_name="ADDONS_TOKEN_ISSUER",
environ_prefix=None,
)
ADDONS_TOKEN_TTL = values.PositiveIntegerValue(
7200,
environ_name="ADDONS_TOKEN_TTL",
environ_prefix=None,
)
ADDONS_TOKEN_ALG = values.Value(
"HS256",
environ_name="ADDONS_TOKEN_ALG",
environ_prefix=None,
)
ADDONS_TOKEN_TYPE = values.Value(
"Bearer",
environ_name="ADDONS_TOKEN_TYPE",
environ_prefix=None,
)
ADDONS_TOKEN_SECRET_KEY = SecretFileValue(
None,
environ_name="ADDONS_TOKEN_SECRET_KEY",
environ_prefix=None,
)
ADDONS_TOKEN_SCOPE = values.Value(
"room:create",
environ_name="ADDONS_TOKEN_SCOPE",
environ_prefix=None,
)
ADDONS_RANDOM_TOKEN_BYTE_LENGTH = values.PositiveIntegerValue(
60,
environ_name="ADDONS_RANDOM_TOKEN_BYTE_LENGTH",
environ_prefix=None,
)
ADDONS_SESSION_ID_COOKIE = values.Value(
"addonsSid",
environ_name="ADDONS_SESSION_ID_COOKIE",
environ_prefix=None,
)
ADDONS_PENDING_SESSION_KEY = values.Value(
"addons_sid",
environ_name="ADDONS_PENDING_SESSION_KEY",
environ_prefix=None,
)
# pylint: disable=invalid-name
@property
def ENVIRONMENT(self):
@@ -1003,6 +1085,10 @@ class Test(Base):
CELERY_TASK_ALWAYS_EAGER = True
FILE_UPLOAD_ENABLED = True
ADDONS_ENABLED = True
ADDONS_CSRF_SECRET = "secret-key-padded-for-minimum-len!-addons" # noqa:S105
ADDONS_TOKEN_SECRET_KEY = "secret-key-padded-for-minimum-len!-addons" # noqa:S105
def __init__(self):
# pylint: disable=invalid-name
self.INSTALLED_APPS += ["drf_spectacular_sidecar"]