(oidc) add refresh token tools

This provides a way to to refresh the OIDC access token.

The OIDC token will be used to request data to a resource server.
This code is highly related to
mozilla/mozilla-django-oidc#377

The refresh token is encrypted in the session.
This commit is contained in:
Quentin BEY
2025-04-02 15:17:58 +02:00
parent 270f06d6a5
commit 47930a205e
11 changed files with 832 additions and 5 deletions

View File

@@ -12,5 +12,6 @@ and this project adheres to
- ✨(tools) extract domain from email address #2
- ✨(oidc) add the authentication backends #2
- ✨(oidc) add refresh token tools #3
[unreleased]: https://github.com/suitenumerique/django-lasuite/commits/main/

View File

@@ -0,0 +1,109 @@
# Using the OIDC Authentication Backend to request a resource server
Once your project is configured with the OIDC authentication backend, you can use it to request resources from a resource server. This guide will help you set up and use the `ResourceServerBackend` for token introspection and secure API access.
## Configuration
You need to follow the steps from [how-to-use-oidc-backend.md](how-to-use-oidc-backend.md)
## Additional Settings for Resource Server Communication
To enable your application to communicate with protected resource servers, you'll need to configure token storage in your Django settings:
```python
# Store OIDC tokens in the session
OIDC_STORE_ACCESS_TOKEN = True # Store the access token in the session
OIDC_STORE_REFRESH_TOKEN = True # Store the encrypted refresh token in the session
# Required for refresh token encryption
OIDC_STORE_REFRESH_TOKEN_KEY = "your-32-byte-encryption-key==" # Must be a valid Fernet key (32 url-safe base64-encoded bytes)
```
### Purpose of Each Setting
1. **`OIDC_STORE_ACCESS_TOKEN`**: When set to `True`, the access token received from the OIDC provider will be stored in the user's session. This token is required for making authenticated requests to protected resource servers.
2. **`OIDC_STORE_REFRESH_TOKEN`**: When set to `True`, enables storing the refresh token in the user's session. The refresh token allows your application to request a new access token when the current one expires without requiring user re-authentication.
3. **`OIDC_STORE_REFRESH_TOKEN_KEY`**: This is a cryptographic key used to encrypt the refresh token before storing it in the session. This provides an additional layer of security since refresh tokens are sensitive credentials that can be used to obtain new access tokens.
## Generating a Secure Refresh Token Key
You can generate a secure Fernet key using Python:
```python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(key.decode()) # Add this value to your settings
```
## Using the Stored Tokens
Once you have configured these settings, your application can use the stored tokens to make authenticated requests to resource servers:
```python
import requests
from django.http import JsonResponse
def call_resource_server(request):
# Get the access token from the session
access_token = request.session.get('oidc_access_token')
if not access_token:
return JsonResponse({'error': 'Not authenticated'}, status=401)
# Make an authenticated request to the resource server
response = requests.get(
'https://resource-server.example.com/api/resource',
headers={'Authorization': f'Bearer {access_token}'},
)
return JsonResponse(response.json())
```
## Token Refresh management
### View Based Token Refresh (via decorator)
Request the access token refresh only on specific views using the `refresh_oidc_access_token` decorator:
```python
from lasuite.oidc_login.decorators import refresh_oidc_access_token
class SomeViewSet(GenericViewSet):
@method_decorator(refresh_oidc_access_token)
def some_action(self, request):
# Your action logic here
# The call to the resource server
access_token = request.session.get('oidc_access_token')
requests.get(
'https://resource-server.example.com/api/resource',
headers={'Authorization': f'Bearer {access_token}'},
)
```
This will trigger the token refresh process only when the `some_action` method is called.
If the access token is expired, it will attempt to refresh it using the stored refresh token.
### Automatic Token Refresh (via middleware)
You can also use the `RefreshOIDCAccessToken` middleware to automatically refresh expired tokens:
```python
# Add to your MIDDLEWARE setting
MIDDLEWARE = [
# Other middleware...
'lasuite.oidc_login.middleware.RefreshOIDCAccessToken',
]
```
This middleware will:
1. Check if the current access token is expired
2. Use the stored refresh token to obtain a new access token
3. Update the session with the new token
4. Continue processing the request with the fresh token
If token refresh fails, the middleware will return a 401 response with a `refresh_url` header to redirect the user to re-authenticate.

View File

@@ -1,14 +1,17 @@
"""Authentication Backends for OIDC."""
import logging
from functools import lru_cache
import requests
from cryptography.fernet import Fernet
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.utils.translation import gettext_lazy as _
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from mozilla_django_oidc.utils import import_from_settings
logger = logging.getLogger(__name__)
@@ -20,6 +23,41 @@ OIDC_USER_SUB_FIELD = getattr(
) # Default to 'sub' if not set in settings
@lru_cache(maxsize=1)
def get_cipher_suite():
"""Return a Fernet cipher suite."""
key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None)
if not key:
raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.")
return Fernet(key)
def store_oidc_refresh_token(session, refresh_token):
"""Store the encrypted OIDC refresh token in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False):
encrypted_token = get_cipher_suite().encrypt(refresh_token.encode())
session["oidc_refresh_token"] = encrypted_token.decode()
def get_oidc_refresh_token(session):
"""Retrieve and decrypt the OIDC refresh token from the session."""
encrypted_token = session.get("oidc_refresh_token")
if encrypted_token:
return get_cipher_suite().decrypt(encrypted_token.encode()).decode()
return None
def store_tokens(session, access_token, id_token, refresh_token):
"""Store tokens in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False):
session["oidc_access_token"] = access_token
if import_from_settings("OIDC_STORE_ID_TOKEN", False):
session["oidc_id_token"] = id_token
store_oidc_refresh_token(session, refresh_token)
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""
Custom OpenID Connect (OIDC) Authentication Backend.
@@ -28,6 +66,36 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
in the User model, and handles signed and/or encrypted UserInfo response.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the OIDC Authentication Backend.
Adds an internal attribute to store the token_info dictionary.
The purpose of `self._token_info` is to not duplicate code from
the original `authenticate` method.
This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377
is merged.
"""
super().__init__(*args, **kwargs)
self._token_info = None
def get_token(self, payload):
"""
Return token object as a dictionary.
Store the value to extract the refresh token in the `authenticate` method.
"""
self._token_info = super().get_token(payload)
return self._token_info
def authenticate(self, request, **kwargs):
"""Authenticate a user based on the OIDC code flow."""
user = super().authenticate(request, **kwargs)
if user is not None:
# Then the user successfully authenticated
store_oidc_refresh_token(request.session, self._token_info.get("refresh_token"))
return user
def get_extra_claims(self, user_info):
"""
Return extra claims from user_info.

View File

@@ -0,0 +1,11 @@
"""
Decorators for the authentication app.
We don't want (yet) to enforce the OIDC access token to be "fresh" for all
views, so we provide a decorator to refresh the access token only when needed.
"""
from django.utils.decorators import decorator_from_middleware
from .middleware import RefreshOIDCAccessToken
refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken)

View File

@@ -0,0 +1,194 @@
"""
Module to declare a RefreshOIDCAccessToken middleware that extends the
mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the
access token when it expires, based on the OIDC provided refresh token.
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
which is still not merged.
"""
import json
import logging
import time
from urllib.parse import quote, urlencode
import requests
from django.http import JsonResponse
from django.urls import reverse
from django.utils.crypto import get_random_string
from mozilla_django_oidc.middleware import SessionRefresh
from rest_framework.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED
from lasuite.oidc_login.backends import get_oidc_refresh_token, store_tokens
try:
from mozilla_django_oidc.middleware import (
RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken, # noqa F401
)
# If the import is successful, raise an error to notify the user that the
# version of mozilla_django_oidc added the expected middleware, and we don't need
# our implementation anymore.
# See https://github.com/mozilla/mozilla-django-oidc/pull/377
raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken")
except ImportError:
pass
from mozilla_django_oidc.utils import (
absolutify,
add_state_and_verifier_and_nonce_to_session,
import_from_settings,
)
logger = logging.getLogger(__name__)
class RefreshOIDCAccessToken(SessionRefresh):
"""
A middleware that will refresh the access token following proper OIDC protocol:
https://auth0.com/docs/tokens/refresh-token/current
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
but limited to our needs (YAGNI/KISS).
"""
def _prepare_reauthorization(self, request):
"""
Construct a new authorization grant request to refresh the session.
Besides constructing the request, the state and nonce included in the
request are registered in the current session in preparation for the
client following through with the authorization flow.
"""
auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT
client_id = self.OIDC_RP_CLIENT_ID
state = get_random_string(self.OIDC_STATE_SIZE)
# Build the parameters as if we were doing a real auth handoff, except
# we also include prompt=none.
auth_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": absolutify(request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)),
"state": state,
"scope": self.OIDC_RP_SCOPES,
"prompt": "none",
}
if self.OIDC_USE_NONCE:
nonce = get_random_string(self.OIDC_NONCE_SIZE)
auth_params.update({"nonce": nonce})
# Register the one-time parameters in the session
add_state_and_verifier_and_nonce_to_session(request, state, auth_params)
request.session["oidc_login_next"] = request.get_full_path()
query = urlencode(auth_params, quote_via=quote)
return f"{auth_url}?{query}"
def is_expired(self, request):
"""Check whether the access token is expired and needs to be refreshed."""
if not self.is_refreshable_url(request):
logger.debug("request is not refreshable")
return False
expiration = request.session.get("oidc_token_expiration", 0)
now = time.time()
if expiration > now:
# The id_token is still valid, so we don't have to do anything.
logger.debug("id token is still valid (%s > %s)", expiration, now)
return False
return True
def finish(self, request, prompt_reauth=True):
"""
Finish request handling and handle sending downstream responses for XHR.
This function should only be run if the session is determined to
be expired.
Almost all XHR request handling in client-side code struggles
with redirects since redirecting to a page where the user
is supposed to do something is extremely unlikely to work
in an XHR request. Make a special response for these kinds
of requests.
The use of 403 Forbidden is to match the fact that this
middleware doesn't really want the user in if they don't
refresh their session.
WARNING: this varies from the original implementation:
- to return a 401 status code
- to consider all requests as XHR requests
"""
xhr_response_json = {"error": "the authentication session has expired"}
if prompt_reauth:
# The id_token has expired, so we have to re-authenticate silently.
refresh_url = self._prepare_reauthorization(request)
xhr_response_json["refresh_url"] = refresh_url
xhr_response = JsonResponse(xhr_response_json, status=HTTP_401_UNAUTHORIZED)
if "refresh_url" in xhr_response_json:
xhr_response["refresh_url"] = xhr_response_json["refresh_url"]
return xhr_response
def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements
"""Process the request and refresh the access token if necessary."""
if not self.is_expired(request):
return None
token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
client_id = self.get_settings("OIDC_RP_CLIENT_ID")
client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET")
refresh_token = get_oidc_refresh_token(request.session)
if not refresh_token:
logger.debug("no refresh token stored")
return self.finish(request, prompt_reauth=True)
token_payload = {
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token,
}
req_auth = None
if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
# supported in https://github.com/mozilla/mozilla-django-oidc/pull/377
# but we don't need it, so enforce error here.
raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported")
try:
response = requests.post(
token_url,
auth=req_auth,
data=token_payload,
verify=import_from_settings("OIDC_VERIFY_SSL", True),
timeout=import_from_settings("OIDC_TIMEOUT", 3),
)
response.raise_for_status()
token_info = response.json()
except requests.exceptions.Timeout:
logger.debug("timed out refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except requests.exceptions.HTTPError as exc:
status_code = exc.response.status_code
logger.debug("http error %s when refreshing access token", status_code)
# OAuth error response will be a 400 for various situations, including
# an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
return self.finish(request, prompt_reauth=status_code == HTTP_400_BAD_REQUEST)
except json.JSONDecodeError:
logger.debug("malformed response when refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except Exception as exc: # pylint: disable=broad-except
logger.exception("unknown error occurred when refreshing access token: %s", exc)
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
# Until we can properly validate an ID token on the refresh response
# per the spec[1], we intentionally drop the id_token.
# [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse
id_token = None
access_token = token_info.get("access_token")
refresh_token = token_info.get("refresh_token")
store_tokens(request.session, access_token, id_token, refresh_token)
return None

View File

@@ -133,7 +133,7 @@ class ResourceServerBackend:
requested_scopes = introspection_response.get("scope", None).split(" ")
if set(self._scopes).isdisjoint(set(requested_scopes)):
message = "Introspection response contains any required scopes."
message = "Introspection response is missing required scopes."
logger.debug(message)
raise SuspiciousOperation(message)
@@ -273,7 +273,7 @@ class JWTResourceServerBackend(ResourceServerBackend):
try:
public_key_set = self._authorization_server_client.import_public_keys()
except (TypeError, ValueError, AttributeError, HTTPError) as err:
message = "Could get authorization server JWKS"
message = "Could not get authorization server JWKS"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err

View File

@@ -1,10 +1,14 @@
"""Unit tests for the Authentication Backends."""
import re
import pytest
import responses
from cryptography.fernet import Fernet
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
from lasuite.oidc_login.backends import OIDCAuthenticationBackend
from lasuite.oidc_login.backends import OIDCAuthenticationBackend, get_oidc_refresh_token, store_oidc_refresh_token
from .. import factories
@@ -13,6 +17,23 @@ pytestmark = pytest.mark.django_db
User = get_user_model()
def test_oidc_refresh_token_session_store(settings):
"""Test that the OIDC refresh token is stored and retrieved from the session."""
settings.OIDC_STORE_REFRESH_TOKEN = True
session = {}
with pytest.raises(ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required."):
store_oidc_refresh_token(session, "test-refresh-token")
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
store_oidc_refresh_token(session, "test-refresh-token")
assert session["oidc_refresh_token"] is not None
assert session["oidc_refresh_token"] != "test-refresh-token"
assert get_oidc_refresh_token(session) == "test-refresh-token"
def test_authentication_getter_existing_user_no_email(django_assert_num_queries, monkeypatch):
"""If an existing user matches the user's info sub, the user should be returned."""
klass = OIDCAuthenticationBackend()
@@ -259,3 +280,52 @@ def test_authentication_getter_existing_disabled_user_via_email(django_assert_nu
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
assert User.objects.count() == 1
@responses.activate
def test_authentication_session_tokens(django_assert_num_queries, monkeypatch, rf, settings):
"""Test the session contains oidc_refresh_token and oidc_access_token after authentication."""
settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token"
settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo"
settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks"
settings.OIDC_STORE_ACCESS_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
klass = OIDCAuthenticationBackend()
request = rf.get("/some-url", {"state": "test-state", "code": "test-code"})
request.session = {}
def verify_token_mocked(*args, **kwargs):
return {"sub": "123", "email": "test@example.com"}
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked)
responses.add(
responses.POST,
re.compile(settings.OIDC_OP_TOKEN_ENDPOINT),
json={
"access_token": "test-access-token",
"refresh_token": "test-refresh-token",
},
status=200,
)
responses.add(
responses.GET,
re.compile(settings.OIDC_OP_USER_ENDPOINT),
json={"sub": "123", "email": "test@example.com"},
status=200,
)
with django_assert_num_queries(3):
user = klass.authenticate(
request,
code="test-code",
nonce="test-nonce",
code_verifier="test-code-verifier",
)
assert user is not None
assert request.session["oidc_access_token"] == "test-access-token"
assert get_oidc_refresh_token(request.session) == "test-refresh-token"

View File

@@ -0,0 +1,51 @@
"""Tests for the refresh_oidc_access_token decorator in core app."""
from unittest.mock import patch
from django.http import HttpResponse
from django.test import RequestFactory
from django.utils.decorators import method_decorator
from django.views import View
from lasuite.oidc_login.decorators import refresh_oidc_access_token
class RefreshOIDCAccessTokenView(View):
"""
A Django view that uses the refresh_oidc_access_token decorator to refresh
the OIDC access token before processing the request.
"""
@method_decorator(refresh_oidc_access_token)
def dispatch(self, request, *args, **kwargs):
"""Override the dispatch method to apply the refresh_oidc_access_token decorator."""
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
"""
Handle GET requests.
Returns:
HttpResponse: A simple HTTP response with "OK" as the content.
"""
return HttpResponse("OK")
def test_refresh_oidc_access_token_decorator():
"""
Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access.
The test creates a mock request and patches the dispatch method to verify that it is called
with the correct request object.
"""
# Create a test request
factory = RequestFactory()
request = factory.get("/")
# Mock the OIDC refresh functionality
with patch("lasuite.oidc_login.middleware.RefreshOIDCAccessToken.process_request") as mock_refresh:
# Call the decorated view
RefreshOIDCAccessTokenView.as_view()(request)
# Assert that the refresh method was called
mock_refresh.assert_called_once_with(request)

View File

@@ -0,0 +1,322 @@
"""Tests for the RefreshOIDCAccessToken middleware."""
import time
from unittest.mock import MagicMock
import factories
import pytest
import requests.exceptions
import responses
from cryptography.fernet import Fernet
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.http import HttpResponse, JsonResponse
from django.test import RequestFactory
from lasuite.oidc_login.backends import get_cipher_suite, get_oidc_refresh_token, store_oidc_refresh_token
from lasuite.oidc_login.middleware import RefreshOIDCAccessToken
pytestmark = pytest.mark.django_db
@pytest.fixture(name="oidc_settings")
def fixture_oidc_settings(settings):
"""Fixture to configure OIDC settings for the tests."""
settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token"
settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize"
settings.OIDC_RP_CLIENT_ID = "client_id"
settings.OIDC_RP_CLIENT_SECRET = "client_secret"
settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback"
settings.OIDC_RP_SCOPES = "openid email"
settings.OIDC_USE_NONCE = True
settings.OIDC_STATE_SIZE = 32
settings.OIDC_NONCE_SIZE = 32
settings.OIDC_VERIFY_SSL = True
settings.OIDC_TOKEN_USE_BASIC_AUTH = False
settings.OIDC_STORE_ACCESS_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
get_cipher_suite.cache_clear()
yield settings
get_cipher_suite.cache_clear()
def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument
"""
When the user is not authenticated, this
is not the purpose of the middleware to manage anything.
"""
request = RequestFactory().get("/test")
request.user = AnonymousUser()
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert response is None
def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument
"""
When the session does not contain a refresh token,
the middleware should return a 401 response containing
the URL to authenticate again.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "expired_token"
request.session["oidc_token_expiration"] = time.time() - 100
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, JsonResponse)
assert response.status_code == 401
assert response.has_header("refresh_url")
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument
"""We don't support OIDC_TOKEN_USE_BASIC_AUTH."""
oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
middleware = RefreshOIDCAccessToken(get_response)
with pytest.raises(RuntimeError) as excinfo:
middleware.process_request(request)
assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported"
@responses.activate
def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware successfully refreshes the token."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"access_token": "new_token", "refresh_token": "new_refresh_token"},
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
request.session.save()
assert response is None
assert request.session["oidc_access_token"] == "new_token"
assert get_oidc_refresh_token(request.session) == "new_refresh_token"
def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware does nothing when the token is not expired."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = ("valid_token",)
request.session["oidc_token_expiration"] = time.time() + 3600
request.session.save()
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert response is None
@responses.activate
def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware returns a 401 response when the token refresh request times out."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body=requests.exceptions.Timeout("timeout"),
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response when the token
refresh request returns a 400 error.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"error": "invalid_grant"},
status=400,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert response.has_header("refresh_url")
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
@responses.activate
def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response when
the token refresh request returns a 404 error.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"error": "invalid_grant"},
status=404,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response
when the token refresh request returns malformed JSON.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body="malformed json",
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response
when the token refresh request raises an exception.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body={"error": "invalid_grant"}, # invalid format dict
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")

View File

@@ -280,7 +280,7 @@ def test_jwt_resource_server_backend_introspect_public_key_import_failure(
)
with patch.object(Logger, "debug") as mock_logger_debug:
expected_message = "Could get authorization server JWKS"
expected_message = "Could not get authorization server JWKS"
with pytest.raises(SuspiciousOperation, match=expected_message):
jwt_resource_server_backend._introspect(token)
@@ -317,7 +317,7 @@ def test_verify_user_info_wrong_scopes(resource_server_backend):
"""Test '_verify_user_info' with wrong requested scopes."""
introspection_response = {"active": True, "scope": "wrong-scopes"}
expected_message = "Introspection response contains any required scopes."
expected_message = "Introspection response is missing required scopes."
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._verify_user_info(introspection_response)

View File

@@ -110,6 +110,7 @@ ALLOW_LOGOUT_GET_METHOD = True
OIDC_OP_TOKEN_ENDPOINT = None
OIDC_OP_USER_ENDPOINT = None
OIDC_OP_LOGOUT_ENDPOINT = None
OIDC_OP_AUTHORIZATION_ENDPOINT = None
OIDC_RP_CLIENT_ID = "lasuite"
OIDC_RP_CLIENT_SECRET = "lasuite"
USER_OIDC_FIELDS_TO_FULLNAME = ["first_name", "last_name"]