(oidc) add backend logout endpoint

This endpoint can be called by the IdP to kill all the user
sessions.

In order to allow to kill only one session, we need to store the
session ID when login from IdP...
This commit is contained in:
Quentin BEY
2025-10-07 18:22:56 +02:00
parent c771bae725
commit b5d1027787
5 changed files with 1085 additions and 7 deletions

View File

@@ -0,0 +1,57 @@
# Using the OIDC Authentication Backend With Back-Channel Logout
This guide explains how to integrate and configure the `OIDCBackChannelLogoutView` in your Django project for OpenID Connect (OIDC) authentication.
## Installation
To use the OIDC authentication backend with back-channel logout support, who obviously need to have set up the OIDC authentication backend first.
If you haven't done so, please refer to the [Using the OIDC Authentication Backend](how-to-use-oidc-backend.md) guide.
## Configuration
### Settings
You need to have the [Using the OIDC Authentication Backend](how-to-use-oidc-backend.md) settings configured first.
Then, add the following to your Django settings:
```python
# A db backed session engine is required to support back-channel logout
SESSION_ENGINE = "django.contrib.sessions.backends.cached_db"
# OR "django.contrib.sessions.backends.db" if you don't want caching
# New required OIDC settings
OIDC_OP_URL="https://your-provider.com"
```
### URLs
The project OIDC URLs `lasuite.oidc_login.urls` already include the back-channel logout URL,
so you just need to include them in your project's `urls.py` if you haven't done so already:
```python
from django.urls import include, path
urlpatterns = [
# Your other URLs
path('', include('lasuite.oidc_login.urls')),
]
```
The back-channel logout endpoint will be available at `<base-URL>/back-channel-logout/`.
### Set up the OIDC Provider accordingly
Make sure to configure your OIDC Provider to send back-channel logout requests
to your Django application's back-channel logout endpoint.
For instance, in Keycloak, you can set the "Backchannel Logout URL" in the client settings -> "Logout settings":
- Turn off "Front channel logout"
- Set "Backchannel Logout URL" to `<base-URL>/back-channel-logout/` (like http://app-dev:8071/api/v1.0/backchannel-logout/)
- Enable/Disable "Backchannel Logout Session Required" as per your requirements
Note the "Backchannel Logout Session Required" requires the `sid` claim to be sent in the token info at login to be
able to match session.

View File

@@ -3,7 +3,6 @@
import re
import requests
from gitlint.rules import CommitMessageTitle, LineRule, RuleViolation

View File

@@ -3,7 +3,7 @@
from django.urls import path
from mozilla_django_oidc.urls import urlpatterns as mozilla_oidc_urls
from .views import OIDCLogoutCallbackView, OIDCLogoutView
from .views import OIDCBackChannelLogoutView, OIDCLogoutCallbackView, OIDCLogoutView
urlpatterns = [
# Override the default 'logout/' path from Mozilla Django OIDC with our custom view.
@@ -13,5 +13,10 @@ urlpatterns = [
OIDCLogoutCallbackView.as_view(),
name="oidc_logout_callback",
),
path(
"backchannel-logout/",
OIDCBackChannelLogoutView.as_view(),
name="oidc_backchannel_logout",
),
*mozilla_oidc_urls,
]

View File

@@ -1,16 +1,29 @@
"""Authentication Views for the OIDC authentication backend."""
import copy
import logging
from importlib import import_module
from urllib.parse import urlencode
from django.conf import settings
from django.contrib import auth
from django.contrib.auth import get_user_model
from django.contrib.sessions.models import Session
from django.core.cache import cache
from django.core.exceptions import SuspiciousOperation
from django.http import HttpResponseRedirect
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.urls import reverse
from django.utils import crypto
from mozilla_django_oidc.utils import (
absolutify,
)
from django.utils import crypto, timezone
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from joserfc import jwt
from joserfc._keys import KeySet
from jwt import get_unverified_header
from jwt.exceptions import DecodeError, InvalidTokenError
from jwt.utils import force_bytes
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from mozilla_django_oidc.utils import absolutify, import_from_settings
from mozilla_django_oidc.views import (
OIDCAuthenticationCallbackView as MozillaOIDCAuthenticationCallbackView,
)
@@ -21,6 +34,10 @@ from mozilla_django_oidc.views import (
OIDCLogoutView as MozillaOIDCOIDCLogoutView,
)
User = get_user_model()
logger = logging.getLogger(__name__)
class OIDCLogoutView(MozillaOIDCOIDCLogoutView):
"""
@@ -143,6 +160,463 @@ class OIDCLogoutCallbackView(MozillaOIDCOIDCLogoutView):
return HttpResponseRedirect(self.redirect_url)
@method_decorator(csrf_exempt, name="dispatch")
class OIDCBackChannelLogoutView(View):
"""
View to handle OIDC back-channel logout requests.
This view implements the OpenID Connect Back-Channel Logout 1.0 specification
(https://openid.net/specs/openid-connect-backchannel-1_0.html).
The identity provider (IdP) sends a JWT Logout Token to this URL via
a server-to-server POST request when a user logs out from
another application or directly on the IdP.
"""
http_method_names = ["post"]
# OIDC Back-Channel Logout specification constants
LOGOUT_EVENT_URI = "http://schemas.openid.net/event/backchannel-logout"
LOGOUT_TOKEN_TYPE = "logout+jwt" # noqa: S105
# Recommended maximum duration for Logout Token expiration (2 minutes)
MAX_TOKEN_AGE_SECONDS = 120
# Cache key prefix for JTI storage
JTI_CACHE_PREFIX = "oidc_backchannel_jti:"
# Cache timeout (5 minutes - should be longer than MAX_TOKEN_AGE_SECONDS)
JTI_CACHE_TIMEOUT = 300
@staticmethod
def get_settings(attr, *args):
"""Retrieve a parameter from Django settings."""
return import_from_settings(attr, *args)
def post(self, request, *args, **kwargs): # noqa: PLR0911
"""
Process the back-channel logout request.
According to section 2.8 of the spec, returns:
- 200 OK if logout succeeded
- 204 No Content is also accepted (some Web frameworks)
- 400 Bad Request if the request is invalid or logout fails
Returns:
HttpResponse: 200/204 on success, 400 on error
"""
try:
logout_token = self.get_logout_token(request)
if not logout_token:
logger.error("No logout_token provided in the request")
return self.error_response("invalid_request", "Missing logout_token parameter")
payload = self.validate_logout_token(logout_token)
if not payload:
return self.error_response("invalid_request", "Invalid logout token")
# Check for presence of sub or sid (spec 2.4)
sub = payload.get("sub")
sid = payload.get("sid")
if not sub and not sid:
logger.error("Neither sub nor sid present in the logout token")
return self.error_response("invalid_request", "Token must contain either sub or sid claim")
# Check for token replay with jti
jti = payload.get("jti")
if jti and not self.check_and_store_jti(jti):
logger.error("Logout token with jti=%s already processed (replay attack)", jti)
return self.error_response("invalid_request", "Token already processed")
# Log out user sessions
success = self.logout_user_sessions(sub, sid)
if success:
logger.info("Back-channel logout successful for sub=%s, sid=%s", sub, sid)
# According to the spec, return 204 No Content (or 200 OK w/ content)
response = HttpResponse(status=204)
else:
logger.warning("Logout failed for sub=%s, sid=%s", sub, sid)
return self.error_response("invalid_request", "Logout failed")
# Spec 2.8: add Cache-Control: no-store
response["Cache-Control"] = "no-store"
return response
except Exception as e:
logger.exception("Error processing logout: %s", e)
return self.error_response("invalid_request", "Internal server error during logout")
def error_response(self, error, error_description=None):
"""
Create an error response compliant with the spec (section 2.8).
Args:
error: OAuth 2.0 error code
error_description: Optional error description
Returns:
JsonResponse: 400 Bad Request with JSON body
"""
response_data = {"error": error}
if error_description:
response_data["error_description"] = error_description
response = JsonResponse(response_data, status=400)
response["Cache-Control"] = "no-store"
return response
def get_logout_token(self, request):
"""
Extract the logout_token from the POST request.
According to section 2.5 of the spec, the token must be in
the request body as application/x-www-form-urlencoded.
Args:
request: The Django HTTP request
Returns:
str: The logout_token or None
"""
return request.POST.get("logout_token")
def validate_logout_token(self, logout_token): # noqa: PLR0911
"""
Validate and decode the logout_token JWT according to section 2.6 of the spec.
Complete validation including:
1. Type verification (typ header = "logout+jwt" recommended)
2. Signature validation with OP's JWKS keys
3. Issuer verification (iss)
4. Audience verification (aud) - must contain client_id
5. Expiration verification (exp) - recommended <= 2 minutes
6. iat (issued at) verification
7. Presence of jti claim (unique identifier)
8. Presence of events claim with logout URI
9. Absence of nonce claim (FORBIDDEN by spec to avoid confusion with ID Token)
10. Presence of sub or sid
Args:
logout_token: The JWT to validate
Returns:
dict: Token payload if valid, None otherwise
"""
try:
# Check token type (recommended but not mandatory for compatibility)
logout_token = force_bytes(logout_token)
try:
header = get_unverified_header(logout_token)
token_type = header.get("typ")
if token_type and token_type.lower() != self.LOGOUT_TOKEN_TYPE:
logger.warning("Unexpected token type: %s (expected: %s)", token_type, self.LOGOUT_TOKEN_TYPE)
# Don't reject for compatibility with existing implementations
except DecodeError:
logger.error("Unable to decode JWT header")
return None
backend = OIDCAuthenticationBackend()
# Retrieve OIDC provider's public key for signature validation
jwks_client = backend.retrieve_matching_jwk(logout_token)
# Decode token with complete validation
keyset = KeySet.import_key_set({"keys": [jwks_client]})
decoded_jwt = jwt.decode(logout_token, keyset, algorithms=["RS256", "ES256"])
claims_requests = jwt.JWTClaimsRegistry(
now=int(timezone.now().timestamp()),
iss={"value": settings.OIDC_OP_URL, "essential": True},
aud={"value": backend.OIDC_RP_CLIENT_ID, "essential": True},
exp={"essential": True},
iat={"essential": True},
)
claims_requests.validate(decoded_jwt.claims)
payload = decoded_jwt.claims
# Validation according to the spec (section 2.6)
# 1. Verify that the 'events' claim exists and contains the correct URI
if not self.is_valid_logout_event(payload):
return None
# 2. IMPORTANT: Verify ABSENCE of nonce (spec 2.4)
if "nonce" in payload:
logger.error("Logout token contains a 'nonce' claim (FORBIDDEN by the spec)")
return None
# 3. Verify presence of jti (unique identifier, REQUIRED)
if "jti" not in payload:
logger.error("Logout token does not contain 'jti' claim (REQUIRED)")
return None
# 4. Verify token is not too old (security recommendation)
iat = payload.get("iat")
exp = payload.get("exp")
if iat and exp:
token_lifetime = exp - iat
if token_lifetime > self.MAX_TOKEN_AGE_SECONDS:
logger.warning(
"Logout token has a lifetime of %ss (recommended: <= %ss)",
token_lifetime,
self.MAX_TOKEN_AGE_SECONDS,
)
return payload
except InvalidTokenError as e:
logger.exception("Invalid JWT token: %s", e)
return None
except Exception as e:
logger.exception("Error validating token: %s", e)
return None
def is_valid_logout_event(self, payload):
"""
Verify that the payload contains a valid logout event.
According to section 2.4 of the spec, the 'events' claim must:
- Be a JSON object
- Contain the key "http://schemas.openid.net/event/backchannel-logout"
- The value must be a JSON object (recommended: empty object {})
Args:
payload: The decoded JWT payload
Returns:
bool: True if the event is valid
"""
events = payload.get("events")
if not events or not isinstance(events, dict):
logger.error("Invalid token: 'events' claim absent or not an object")
return False
if self.LOGOUT_EVENT_URI not in events:
logger.error("Incorrect event type: %s not found in events", self.LOGOUT_EVENT_URI)
return False
# The value must be a JSON object
event_value = events[self.LOGOUT_EVENT_URI]
if not isinstance(event_value, dict):
logger.error("Logout event value must be a JSON object")
return False
return True
def check_and_store_jti(self, jti):
"""
Check if the jti has already been processed and store it to prevent replay.
Uses Django's cache framework (configured in settings.py) to store JTIs.
This allows the implementation to work correctly across multiple server instances
and persist data appropriately based on the configured cache backend.
Recommended cache backends for production:
- Redis: Shared state across instances, fast, with TTL support
- Memcached: Similar benefits to Redis
- Database: Persistent but slower
The cache timeout is set to 5 minutes (longer than MAX_TOKEN_AGE_SECONDS)
to ensure tokens can't be replayed even if received near expiration.
Args:
jti: The unique token identifier
Returns:
bool: True if the jti is new, False if it has already been processed
"""
cache_key = f"{self.JTI_CACHE_PREFIX}{jti}"
# Try to add the jti to cache (atomic operation)
# add() returns False if key already exists, True if successfully added
was_added = cache.add(cache_key, True, timeout=self.JTI_CACHE_TIMEOUT)
if not was_added:
logger.warning("JTI %s already exists in cache (replay attack detected)", jti)
return False
logger.debug("JTI %s stored in cache with %ss timeout", jti, self.JTI_CACHE_TIMEOUT)
return True
def logout_user_sessions(self, sub, sid=None): # noqa: PLR0912,PLR0915
"""
Log out sessions associated with a user (section 2.7 of the spec).
According to the spec:
- If sid is present: log out only that specific session
- If only sub is present: log out ALL user sessions
- If both are present: implementation can choose
This method should also:
- Revoke refresh tokens (except those with offline_access)
- If the RP is also an OP, propagate logout to downstream RPs
Args:
sub: User's subject identifier (unique OIDC identifier)
sid: Session ID (optional, to target a specific session)
Returns:
bool: True if at least one session was logged out, False otherwise
"""
session_store = import_module(settings.SESSION_ENGINE).SessionStore()
user = None
sessions_deleted = 0
if settings.SESSION_ENGINE not in [
"django.contrib.sessions.backends.db",
"django.contrib.sessions.backends.cached_db",
]:
logger.error(
"OIDC back-channel logout requires database-backed sessions. Current SESSION_ENGINE: %s",
settings.SESSION_ENGINE,
)
return False
# Case 1: sub is provided - resolve user first (original behavior)
if sub is not None:
try:
user = User.objects.get(sub=sub)
except User.DoesNotExist:
logger.warning("User with sub=%s not found", sub)
# According to the spec, if the user is already logged out, consider it a success
return True
except User.MultipleObjectsReturned:
logger.error("Multiple users with the same sub=%s", sub)
return False
# Iterate through all active sessions
sessions = Session.objects.filter(expire_date__gte=timezone.now())
for session in sessions:
try:
session_data = session.get_decoded()
session_user_id = session_data.get("_auth_user_id")
if session_user_id and str(session_user_id) == str(user.pk):
# If a specific sid is provided, check for match
session_sid = session_data.get("oidc_sid")
if session_sid and sid and session_sid == sid:
session_store.delete(session.session_key)
sessions_deleted += 1
break # Only one session matches the sid
if not session_sid or not sid:
# Delete all user sessions or sessions without sid
session_store.delete(session.session_key)
sessions_deleted += 1
except Exception as e:
logger.error("Error processing session: %s", e)
continue
# Case 2: only sid is provided - find session first, then resolve user lazily
elif sid is not None:
# Iterate through all active sessions looking for the specific sid
sessions = Session.objects.filter(expire_date__gte=timezone.now())
for session in sessions:
try:
session_data = session.get_decoded()
session_sid = session_data.get("oidc_sid")
if session_sid != sid:
continue
# Found matching session, now lazily resolve the user
session_user_id = session_data.get("_auth_user_id")
if session_user_id:
try:
user = User.objects.get(pk=session_user_id)
session_store.delete(session.session_key)
sessions_deleted += 1
logger.info("Session %s deleted for user %s", sid, user.pk)
break # Only one session matches the sid
except User.DoesNotExist:
logger.warning("User with pk=%s not found for session %s", session_user_id, sid)
# Still delete the orphaned session
session_store.delete(session.session_key)
sessions_deleted += 1
logger.info("Orphaned session %s deleted", sid)
break
except (ValueError, TypeError) as e:
logger.error("Invalid user_id %s in session %s: %s", session_user_id, sid, e)
continue
else:
logger.warning("No user_id found in session %s", sid)
# Still delete the session without user context
session_store.delete(session.session_key)
sessions_deleted += 1
logger.info("Anonymous session %s deleted", sid)
break
except Exception as e:
logger.error("Error processing session: %s", e)
continue
else:
# Neither sub nor sid provided - this shouldn't happen due to validation in post()
logger.error("Neither sub nor sid provided for logout")
return False
if sessions_deleted > 0:
if user:
logger.info("%s session(s) deleted for user %s", sessions_deleted, user.pk)
else:
logger.info("%s session(s) deleted", sessions_deleted)
# According to spec 2.7, if the user is already logged out, it's a success
elif user:
logger.info("No active session found for user %s (already logged out)", user.pk)
else:
logger.info("No active session found for sid %s (already logged out)", sid)
# Only call user-related methods if we successfully resolved a user
if user:
self.revoke_refresh_tokens(user)
self.propagate_logout_to_downstream_rps(user, sub, sid)
return True
def revoke_refresh_tokens(self, user):
"""
Revoke user's refresh tokens (section 2.7 of the spec).
According to the spec:
- Refresh tokens WITHOUT offline_access MUST be revoked
- Refresh tokens WITH offline_access MUST NOT be revoked normally
NOTE: This implementation depends on the token management system.
To be implemented if needed by the project
Args:
user: The user whose tokens should be revoked
"""
def propagate_logout_to_downstream_rps(self, user, sub, sid):
"""
Propagate logout to downstream RPs if this RP is also an OP.
According to section 2.7 of the spec, if the RP receiving the logout
is itself an OP serving other RPs, it should propagate
the logout by sending logout requests to its own RPs.
NOTE: To be implemented if needed by the project.
Args:
user: The user to log out
sub: Subject identifier
sid: Session ID (optional)
"""
class OIDCAuthenticationCallbackView(MozillaOIDCAuthenticationCallbackView):
"""Custom callback view for handling the silent login flow."""

View File

@@ -1,11 +1,21 @@
"""Unit tests for the Authentication Views."""
import base64
import json
import time
from hashlib import sha256
from unittest import mock
from urllib.parse import parse_qs, urlparse
import jwt as pyjwt
import pytest
import responses
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.contrib.sessions.models import Session
from django.core.exceptions import SuspiciousOperation
from django.test import RequestFactory
from django.urls import reverse
@@ -15,11 +25,14 @@ from rest_framework.test import APIClient
from lasuite.oidc_login.views import (
OIDCAuthenticationCallbackView,
OIDCAuthenticationRequestView,
OIDCBackChannelLogoutView,
OIDCLogoutCallbackView,
OIDCLogoutView,
)
from tests import factories
UserModel = get_user_model()
pytestmark = pytest.mark.django_db
@@ -347,3 +360,533 @@ def test_view_callback_failure_url_silent_login(mocked_success_url):
mocked_success_url.assert_called_once()
assert returned_url == "foo"
assert not request.session.get("silent")
def test_backchannel_missing_logout_token_returns_400():
"""POST without logout_token should return 400 with JSON and no-store header."""
view = OIDCBackChannelLogoutView.as_view()
url = "/oidc/backchannel-logout/"
# No logout_token in body
request = RequestFactory().post(url, data={})
response = view(request)
assert response.status_code == 400
assert response["Cache-Control"] == "no-store"
body = json.loads(response.content)
assert body["error"] == "invalid_request"
assert body["error_description"] == "Missing logout_token parameter"
@mock.patch.object(OIDCBackChannelLogoutView, "validate_logout_token", return_value=None)
@mock.patch.object(OIDCBackChannelLogoutView, "get_logout_token", return_value="token")
def test_backchannel_invalid_token_returns_400(mock_get, mock_validate):
"""Test that invalid logout token returns 400 error response."""
view = OIDCBackChannelLogoutView.as_view()
request = RequestFactory().post("/oidc/backchannel-logout/", data={"logout_token": "token"})
response = view(request)
assert response.status_code == 400
data = json.loads(response.content)
assert data["error"] == "invalid_request"
assert data["error_description"] == "Invalid logout token"
@mock.patch.object(
OIDCBackChannelLogoutView,
"validate_logout_token",
return_value={"events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}}, "jti": "j"},
)
@mock.patch.object(OIDCBackChannelLogoutView, "get_logout_token", return_value="token")
def test_backchannel_missing_sub_and_sid_returns_400(mock_get, mock_validate):
"""Test that logout token missing both sub and sid claims returns 400 error."""
view = OIDCBackChannelLogoutView.as_view()
request = RequestFactory().post("/oidc/backchannel-logout/", data={"logout_token": "token"})
response = view(request)
assert response.status_code == 400
data = json.loads(response.content)
assert data["error_description"] == "Token must contain either sub or sid claim"
@mock.patch.object(OIDCBackChannelLogoutView, "logout_user_sessions", return_value=True)
@mock.patch.object(OIDCBackChannelLogoutView, "check_and_store_jti", return_value=False)
@mock.patch.object(
OIDCBackChannelLogoutView,
"validate_logout_token",
return_value={"sub": "user-sub", "events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}}, "jti": "abc"},
)
@mock.patch.object(OIDCBackChannelLogoutView, "get_logout_token", return_value="token")
def test_backchannel_jti_replay_returns_400(mock_get, mock_validate, mock_check, mock_logout_sessions):
"""Test that replayed JTI (token already processed) returns 400 error."""
view = OIDCBackChannelLogoutView.as_view()
request = RequestFactory().post("/oidc/backchannel-logout/", data={"logout_token": "token"})
response = view(request)
assert response.status_code == 400
data = json.loads(response.content)
assert data["error_description"] == "Token already processed"
@mock.patch.object(OIDCBackChannelLogoutView, "logout_user_sessions", return_value=True)
@mock.patch.object(OIDCBackChannelLogoutView, "check_and_store_jti", return_value=True)
@mock.patch.object(
OIDCBackChannelLogoutView,
"validate_logout_token",
return_value={"sub": "user-sub", "events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}}, "jti": "abc"},
)
@mock.patch.object(OIDCBackChannelLogoutView, "get_logout_token", return_value="token")
def test_backchannel_success_returns_204_and_header(mock_get, mock_validate, mock_check, mock_logout_sessions):
"""Test successful backchannel logout returns 204 with no-store header."""
view = OIDCBackChannelLogoutView.as_view()
request = RequestFactory().post("/oidc/backchannel-logout/", data={"logout_token": "token"})
response = view(request)
assert response.status_code == 204
assert response["Cache-Control"] == "no-store"
@mock.patch.object(OIDCBackChannelLogoutView, "logout_user_sessions", return_value=False)
@mock.patch.object(OIDCBackChannelLogoutView, "check_and_store_jti", return_value=True)
@mock.patch.object(
OIDCBackChannelLogoutView,
"validate_logout_token",
return_value={"sub": "user-sub", "events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}}, "jti": "abc"},
)
@mock.patch.object(OIDCBackChannelLogoutView, "get_logout_token", return_value="token")
def test_backchannel_logout_failure_returns_400(mock_get, mock_validate, mock_check, mock_logout_sessions):
"""Test that logout failure returns 400 error response."""
view = OIDCBackChannelLogoutView.as_view()
request = RequestFactory().post("/oidc/backchannel-logout/", data={"logout_token": "token"})
response = view(request)
assert response.status_code == 400
data = json.loads(response.content)
assert data["error_description"] == "Logout failed"
def test_backchannel_error_response_helper():
"""Test the error_response helper method returns properly formatted error response."""
view = OIDCBackChannelLogoutView()
resp = view.error_response("invalid_request", "desc")
assert resp.status_code == 400
assert resp["Cache-Control"] == "no-store"
assert json.loads(resp.content) == {"error": "invalid_request", "error_description": "desc"}
def test_is_valid_logout_event_true_and_false_cases():
"""Test validation of logout event structure in various scenarios."""
view = OIDCBackChannelLogoutView()
# Missing events
assert not view.is_valid_logout_event({})
# Events not a dict
assert not view.is_valid_logout_event({"events": "x"})
# Wrong key
assert not view.is_valid_logout_event({"events": {"wrong": {}}})
# Value not a dict
assert not view.is_valid_logout_event({"events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: "x"}})
# Valid
assert view.is_valid_logout_event({"events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}}})
def test_check_and_store_jti_cache_behavior():
"""Test JTI caching behavior to prevent token replay attacks."""
view = OIDCBackChannelLogoutView()
assert view.check_and_store_jti("jti-1") is True
# Replay should be rejected
assert view.check_and_store_jti("jti-1") is False
def test_logout_user_sessions_user_not_found():
"""Test logout behavior when user with given sub is not found."""
view = OIDCBackChannelLogoutView()
# Non-existing sub should be treated as success per spec
assert view.logout_user_sessions("non-existent-sub") is True
def test_logout_user_sessions_delete_all_sessions():
"""Test that all user sessions are deleted when no specific sid is provided."""
user = factories.UserFactory()
# Create two authenticated sessions using two clients
client1 = APIClient()
client1.force_login(user)
key1 = client1.session.session_key
client2 = APIClient()
client2.force_login(user)
key2 = client2.session.session_key
view = OIDCBackChannelLogoutView()
ok = view.logout_user_sessions(user.sub)
assert ok is True
# Sessions should be gone
remaining = Session.objects.filter(session_key__in=[key1, key2])
assert remaining.count() == 0
def test_logout_user_sessions_with_specific_sid_only_one_deleted():
"""Test that only the session with specific sid is deleted when sid is provided."""
user = factories.UserFactory()
# Create two authenticated sessions and set sid on each
client_keep = APIClient()
client_keep.force_login(user)
session_keep = client_keep.session
session_keep["oidc_sid"] = "keep"
session_keep.save()
key_keep = session_keep.session_key
client_delete = APIClient()
client_delete.force_login(user)
session_delete = client_delete.session
session_delete["oidc_sid"] = "delete"
session_delete.save()
key_delete = session_delete.session_key
view = OIDCBackChannelLogoutView()
ok = view.logout_user_sessions(user.sub, sid="delete")
assert ok is True
assert Session.objects.filter(session_key=key_delete).count() == 0
assert Session.objects.filter(session_key=key_keep).count() == 1
def test_logout_user_sessions_multiple_users_same_sub_error(monkeypatch):
"""If multiple users with same sub exist, the method should return False."""
# Create two users, and then force the query to return MultipleObjectsReturned for the same sub
# Sanity: ORM would normally not allow duplicate subs due to unique=True, but
# we simulate the MultipleObjectsReturned branch by monkeypatching get() to raise.
def raise_multiple(*args, **kwargs): # noqa: ARG001
raise UserModel.MultipleObjectsReturned()
monkeypatch.setattr(UserModel.objects, "get", raise_multiple)
view = OIDCBackChannelLogoutView()
assert view.logout_user_sessions("dup-sub") is False
@pytest.mark.django_db(transaction=True)
@responses.activate
@pytest.mark.parametrize("oidc_sid", ["sid-123", None])
def test_backchannel_full_flow_no_mock(live_server, settings, oidc_sid):
"""End-to-end backchannel logout with live JWKS and real JWT signature validation."""
# Configure OIDC settings to point to live JWKS
issuer = live_server.url
settings.OIDC_RP_SIGN_ALGO = "RS256"
settings.OIDC_OP_URL = issuer
settings.OIDC_RP_CLIENT_ID = "test-client"
settings.OIDC_OP_JWKS_ENDPOINT = f"{issuer}/.well-known/jwks.json"
# Generate RSA keypair and corresponding JWK
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
public_numbers = key.public_key().public_numbers()
def b64u(x: bytes) -> str:
return base64.urlsafe_b64encode(x).rstrip(b"=").decode("ascii")
n = b64u(public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, "big"))
e = b64u(public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, "big"))
kid = b64u(sha256(n.encode("ascii")).digest())
jwk = {"kty": "RSA", "n": n, "e": e, "alg": "RS256", "use": "sig", "kid": kid}
jwks = {"keys": [jwk]}
# Mock the JWKS HTTP endpoint with responses
responses.add(
responses.GET,
settings.OIDC_OP_JWKS_ENDPOINT,
json=jwks,
status=200,
content_type="application/json",
)
# Create a user with a session and sid
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
session = client.session
session["oidc_sid"] = oidc_sid
session.save()
# Build logout token
now = int(time.time())
payload = {
"iss": issuer,
"aud": settings.OIDC_RP_CLIENT_ID,
"iat": now,
"exp": now + 60,
"jti": f"jti-{crypto.get_random_string(12)}",
"sub": user.sub,
"events": {OIDCBackChannelLogoutView.LOGOUT_EVENT_URI: {}},
}
if oidc_sid:
payload["sid"] = oidc_sid
token = pyjwt.encode(
payload,
private_pem,
algorithm="RS256",
headers={"alg": "RS256", "kid": kid, "typ": "logout+jwt"},
)
# Post backchannel logout
url = reverse("oidc_backchannel_logout")
resp = client.post(url, data={"logout_token": token})
assert resp.status_code == 204
assert resp["Cache-Control"] == "no-store"
# Session should be deleted
assert Session.objects.filter(session_key=session.session_key).count() == 0
def test_logout_user_sessions_sid_only_user_found():
"""Test logout behavior when only sid is provided and user is found."""
user = factories.UserFactory()
# Create session with oidc_sid
client = APIClient()
client.force_login(user)
session = client.session
session["oidc_sid"] = "test-sid-123"
session.save()
session_key = session.session_key
view = OIDCBackChannelLogoutView()
# Mock the user-related methods to verify they're called
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="test-sid-123")
assert result is True
# Session should be deleted
assert Session.objects.filter(session_key=session_key).count() == 0
# User-related methods should be called since user was resolved
mock_revoke.assert_called_once_with(user)
mock_propagate.assert_called_once_with(user, None, "test-sid-123")
def test_logout_user_sessions_sid_only_no_matching_session():
"""Test logout behavior when only sid is provided but no matching session found."""
user = factories.UserFactory()
# Create session with different oidc_sid
client = APIClient()
client.force_login(user)
session = client.session
session["oidc_sid"] = "different-sid"
session.save()
session_key = session.session_key
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="test-sid-123")
# Should succeed (per spec, already logged out is success)
assert result is True
# Session should still exist
assert Session.objects.filter(session_key=session_key).count() == 1
# User-related methods should not be called since no user was resolved
mock_revoke.assert_not_called()
mock_propagate.assert_not_called()
def test_logout_user_sessions_sid_only_orphaned_session():
"""Test logout behavior when only sid is provided but user no longer exists."""
user = factories.UserFactory()
# Create session with oidc_sid
client = APIClient()
client.force_login(user)
session = client.session
session["oidc_sid"] = "test-sid-123"
session.save()
session_key = session.session_key
# Delete the user, leaving orphaned session
user.delete()
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="test-sid-123")
assert result is True
# Orphaned session should be deleted
assert Session.objects.filter(session_key=session_key).count() == 0
# User-related methods should not be called since user doesn't exist
mock_revoke.assert_not_called()
mock_propagate.assert_not_called()
def test_logout_user_sessions_sid_only_anonymous_session():
"""Test logout behavior when only sid is provided for anonymous session."""
# Create anonymous session with oidc_sid but no _auth_user_id
client = APIClient()
# Force create a session without login
session = client.session
session.create() # Force session creation
session["oidc_sid"] = "test-sid-123"
session.save()
session_key = session.session_key
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="test-sid-123")
assert result is True
# Anonymous session should be deleted
assert Session.objects.filter(session_key=session_key).count() == 0
# User-related methods should not be called since no user was resolved
mock_revoke.assert_not_called()
mock_propagate.assert_not_called()
def test_logout_user_sessions_sid_only_invalid_user_id():
"""Test logout behavior when session has invalid user_id format."""
# Create session with invalid user_id
client = APIClient()
session = client.session
session.create() # Force session creation
session["oidc_sid"] = "test-sid-123"
session["_auth_user_id"] = "invalid-user-id"
session.save()
session_key = client.session.session_key
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="test-sid-123")
# Should succeed even though user_id is invalid
assert result is True
# Session should still exist since we continue on error
assert Session.objects.filter(session_key=session_key).count() == 1
# User-related methods should not be called
mock_revoke.assert_not_called()
mock_propagate.assert_not_called()
def test_logout_user_sessions_sid_only_multiple_sessions_only_matching_deleted():
"""Test that only the session with matching sid is deleted when multiple sessions exist."""
user1 = factories.UserFactory()
user2 = factories.UserFactory()
# Create session for user1 with target sid
client1 = APIClient()
client1.force_login(user1)
session1 = client1.session
session1["oidc_sid"] = "target-sid"
session1.save()
key1 = session1.session_key
# Create session for user2 with different sid
client2 = APIClient()
client2.force_login(user2)
session2 = client2.session
session2["oidc_sid"] = "other-sid"
session2.save()
key2 = session2.session_key
# Create another session for user1 with different sid
client3 = APIClient()
client3.force_login(user1)
session3 = client3.session
session3["oidc_sid"] = "another-sid"
session3.save()
key3 = session3.session_key
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid="target-sid")
assert result is True
# Only the session with matching sid should be deleted
assert Session.objects.filter(session_key=key1).count() == 0 # deleted
assert Session.objects.filter(session_key=key2).count() == 1 # kept
assert Session.objects.filter(session_key=key3).count() == 1 # kept
# User-related methods should be called for user1
mock_revoke.assert_called_once_with(user1)
mock_propagate.assert_called_once_with(user1, None, "target-sid")
def test_logout_user_sessions_neither_sub_nor_sid():
"""Test logout behavior when neither sub nor sid is provided."""
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=None, sid=None)
assert result is False
# User-related methods should not be called
mock_revoke.assert_not_called()
mock_propagate.assert_not_called()
def test_logout_user_sessions_both_sub_and_sid_user_resolved_first():
"""Test that when both sub and sid are provided, user is resolved first (original behavior)."""
user = factories.UserFactory()
# Create session with oidc_sid
client = APIClient()
client.force_login(user)
session = client.session
session["oidc_sid"] = "test-sid-123"
session.save()
session_key = session.session_key
view = OIDCBackChannelLogoutView()
with (
mock.patch.object(view, "revoke_refresh_tokens") as mock_revoke,
mock.patch.object(view, "propagate_logout_to_downstream_rps") as mock_propagate,
):
result = view.logout_user_sessions(sub=user.sub, sid="test-sid-123")
assert result is True
# Session should be deleted
assert Session.objects.filter(session_key=session_key).count() == 0
# User-related methods should be called
mock_revoke.assert_called_once_with(user)
mock_propagate.assert_called_once_with(user, user.sub, "test-sid-123")