diff --git a/CHANGELOG.md b/CHANGELOG.md index cae0120..9d18d61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,5 +12,6 @@ and this project adheres to - ✨(tools) extract domain from email address #2 - ✨(oidc) add the authentication backends #2 +- ✨(oidc) add refresh token tools #3 [unreleased]: https://github.com/suitenumerique/django-lasuite/commits/main/ diff --git a/documentation/how-to-use-oidc-call-to-resource-server.md b/documentation/how-to-use-oidc-call-to-resource-server.md new file mode 100644 index 0000000..c816173 --- /dev/null +++ b/documentation/how-to-use-oidc-call-to-resource-server.md @@ -0,0 +1,109 @@ +# Using the OIDC Authentication Backend to request a resource server + +Once your project is configured with the OIDC authentication backend, you can use it to request resources from a resource server. This guide will help you set up and use the `ResourceServerBackend` for token introspection and secure API access. + +## Configuration + +You need to follow the steps from [how-to-use-oidc-backend.md](how-to-use-oidc-backend.md) + +## Additional Settings for Resource Server Communication + +To enable your application to communicate with protected resource servers, you'll need to configure token storage in your Django settings: + +```python +# Store OIDC tokens in the session +OIDC_STORE_ACCESS_TOKEN = True # Store the access token in the session +OIDC_STORE_REFRESH_TOKEN = True # Store the encrypted refresh token in the session + +# Required for refresh token encryption +OIDC_STORE_REFRESH_TOKEN_KEY = "your-32-byte-encryption-key==" # Must be a valid Fernet key (32 url-safe base64-encoded bytes) +``` + +### Purpose of Each Setting + +1. **`OIDC_STORE_ACCESS_TOKEN`**: When set to `True`, the access token received from the OIDC provider will be stored in the user's session. This token is required for making authenticated requests to protected resource servers. + +2. **`OIDC_STORE_REFRESH_TOKEN`**: When set to `True`, enables storing the refresh token in the user's session. The refresh token allows your application to request a new access token when the current one expires without requiring user re-authentication. + +3. **`OIDC_STORE_REFRESH_TOKEN_KEY`**: This is a cryptographic key used to encrypt the refresh token before storing it in the session. This provides an additional layer of security since refresh tokens are sensitive credentials that can be used to obtain new access tokens. + +## Generating a Secure Refresh Token Key + +You can generate a secure Fernet key using Python: + +```python +from cryptography.fernet import Fernet +key = Fernet.generate_key() +print(key.decode()) # Add this value to your settings +``` + +## Using the Stored Tokens + +Once you have configured these settings, your application can use the stored tokens to make authenticated requests to resource servers: + +```python +import requests +from django.http import JsonResponse + +def call_resource_server(request): + # Get the access token from the session + access_token = request.session.get('oidc_access_token') + + if not access_token: + return JsonResponse({'error': 'Not authenticated'}, status=401) + + # Make an authenticated request to the resource server + response = requests.get( + 'https://resource-server.example.com/api/resource', + headers={'Authorization': f'Bearer {access_token}'}, + ) + + return JsonResponse(response.json()) +``` + +## Token Refresh management + +### View Based Token Refresh (via decorator) + +Request the access token refresh only on specific views using the `refresh_oidc_access_token` decorator: + +```python +from lasuite.oidc_login.decorators import refresh_oidc_access_token + +class SomeViewSet(GenericViewSet): + + @method_decorator(refresh_oidc_access_token) + def some_action(self, request): + # Your action logic here + + # The call to the resource server + access_token = request.session.get('oidc_access_token') + requests.get( + 'https://resource-server.example.com/api/resource', + headers={'Authorization': f'Bearer {access_token}'}, + ) +``` + +This will trigger the token refresh process only when the `some_action` method is called. +If the access token is expired, it will attempt to refresh it using the stored refresh token. + +### Automatic Token Refresh (via middleware) + +You can also use the `RefreshOIDCAccessToken` middleware to automatically refresh expired tokens: + +```python +# Add to your MIDDLEWARE setting +MIDDLEWARE = [ + # Other middleware... + 'lasuite.oidc_login.middleware.RefreshOIDCAccessToken', +] +``` + +This middleware will: +1. Check if the current access token is expired +2. Use the stored refresh token to obtain a new access token +3. Update the session with the new token +4. Continue processing the request with the fresh token + +If token refresh fails, the middleware will return a 401 response with a `refresh_url` header to redirect the user to re-authenticate. + diff --git a/src/lasuite/oidc_login/backends.py b/src/lasuite/oidc_login/backends.py index 042a273..f660980 100644 --- a/src/lasuite/oidc_login/backends.py +++ b/src/lasuite/oidc_login/backends.py @@ -1,14 +1,17 @@ """Authentication Backends for OIDC.""" import logging +from functools import lru_cache import requests +from cryptography.fernet import Fernet from django.conf import settings from django.core.exceptions import SuspiciousOperation from django.utils.translation import gettext_lazy as _ from mozilla_django_oidc.auth import ( OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend, ) +from mozilla_django_oidc.utils import import_from_settings logger = logging.getLogger(__name__) @@ -20,6 +23,41 @@ OIDC_USER_SUB_FIELD = getattr( ) # Default to 'sub' if not set in settings +@lru_cache(maxsize=1) +def get_cipher_suite(): + """Return a Fernet cipher suite.""" + key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None) + if not key: + raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.") + return Fernet(key) + + +def store_oidc_refresh_token(session, refresh_token): + """Store the encrypted OIDC refresh token in the session if enabled in settings.""" + if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False): + encrypted_token = get_cipher_suite().encrypt(refresh_token.encode()) + session["oidc_refresh_token"] = encrypted_token.decode() + + +def get_oidc_refresh_token(session): + """Retrieve and decrypt the OIDC refresh token from the session.""" + encrypted_token = session.get("oidc_refresh_token") + if encrypted_token: + return get_cipher_suite().decrypt(encrypted_token.encode()).decode() + return None + + +def store_tokens(session, access_token, id_token, refresh_token): + """Store tokens in the session if enabled in settings.""" + if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False): + session["oidc_access_token"] = access_token + + if import_from_settings("OIDC_STORE_ID_TOKEN", False): + session["oidc_id_token"] = id_token + + store_oidc_refresh_token(session, refresh_token) + + class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): """ Custom OpenID Connect (OIDC) Authentication Backend. @@ -28,6 +66,36 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): in the User model, and handles signed and/or encrypted UserInfo response. """ + def __init__(self, *args, **kwargs): + """ + Initialize the OIDC Authentication Backend. + Adds an internal attribute to store the token_info dictionary. + The purpose of `self._token_info` is to not duplicate code from + the original `authenticate` method. + This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377 + is merged. + """ + super().__init__(*args, **kwargs) + self._token_info = None + + def get_token(self, payload): + """ + Return token object as a dictionary. + Store the value to extract the refresh token in the `authenticate` method. + """ + self._token_info = super().get_token(payload) + return self._token_info + + def authenticate(self, request, **kwargs): + """Authenticate a user based on the OIDC code flow.""" + user = super().authenticate(request, **kwargs) + + if user is not None: + # Then the user successfully authenticated + store_oidc_refresh_token(request.session, self._token_info.get("refresh_token")) + + return user + def get_extra_claims(self, user_info): """ Return extra claims from user_info. diff --git a/src/lasuite/oidc_login/decorators.py b/src/lasuite/oidc_login/decorators.py new file mode 100644 index 0000000..c4ec1c7 --- /dev/null +++ b/src/lasuite/oidc_login/decorators.py @@ -0,0 +1,11 @@ +""" +Decorators for the authentication app. +We don't want (yet) to enforce the OIDC access token to be "fresh" for all +views, so we provide a decorator to refresh the access token only when needed. +""" + +from django.utils.decorators import decorator_from_middleware + +from .middleware import RefreshOIDCAccessToken + +refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken) diff --git a/src/lasuite/oidc_login/middleware.py b/src/lasuite/oidc_login/middleware.py new file mode 100644 index 0000000..6f3147f --- /dev/null +++ b/src/lasuite/oidc_login/middleware.py @@ -0,0 +1,194 @@ +""" +Module to declare a RefreshOIDCAccessToken middleware that extends the +mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the +access token when it expires, based on the OIDC provided refresh token. +This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377 +which is still not merged. +""" + +import json +import logging +import time +from urllib.parse import quote, urlencode + +import requests +from django.http import JsonResponse +from django.urls import reverse +from django.utils.crypto import get_random_string +from mozilla_django_oidc.middleware import SessionRefresh +from rest_framework.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED + +from lasuite.oidc_login.backends import get_oidc_refresh_token, store_tokens + +try: + from mozilla_django_oidc.middleware import ( + RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken, # noqa F401 + ) + + # If the import is successful, raise an error to notify the user that the + # version of mozilla_django_oidc added the expected middleware, and we don't need + # our implementation anymore. + # See https://github.com/mozilla/mozilla-django-oidc/pull/377 + raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken") +except ImportError: + pass + +from mozilla_django_oidc.utils import ( + absolutify, + add_state_and_verifier_and_nonce_to_session, + import_from_settings, +) + +logger = logging.getLogger(__name__) + + +class RefreshOIDCAccessToken(SessionRefresh): + """ + A middleware that will refresh the access token following proper OIDC protocol: + https://auth0.com/docs/tokens/refresh-token/current + This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377 + but limited to our needs (YAGNI/KISS). + """ + + def _prepare_reauthorization(self, request): + """ + Construct a new authorization grant request to refresh the session. + Besides constructing the request, the state and nonce included in the + request are registered in the current session in preparation for the + client following through with the authorization flow. + """ + auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT + client_id = self.OIDC_RP_CLIENT_ID + state = get_random_string(self.OIDC_STATE_SIZE) + + # Build the parameters as if we were doing a real auth handoff, except + # we also include prompt=none. + auth_params = { + "response_type": "code", + "client_id": client_id, + "redirect_uri": absolutify(request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)), + "state": state, + "scope": self.OIDC_RP_SCOPES, + "prompt": "none", + } + + if self.OIDC_USE_NONCE: + nonce = get_random_string(self.OIDC_NONCE_SIZE) + auth_params.update({"nonce": nonce}) + + # Register the one-time parameters in the session + add_state_and_verifier_and_nonce_to_session(request, state, auth_params) + request.session["oidc_login_next"] = request.get_full_path() + + query = urlencode(auth_params, quote_via=quote) + return f"{auth_url}?{query}" + + def is_expired(self, request): + """Check whether the access token is expired and needs to be refreshed.""" + if not self.is_refreshable_url(request): + logger.debug("request is not refreshable") + return False + + expiration = request.session.get("oidc_token_expiration", 0) + now = time.time() + if expiration > now: + # The id_token is still valid, so we don't have to do anything. + logger.debug("id token is still valid (%s > %s)", expiration, now) + return False + + return True + + def finish(self, request, prompt_reauth=True): + """ + Finish request handling and handle sending downstream responses for XHR. + + This function should only be run if the session is determined to + be expired. + Almost all XHR request handling in client-side code struggles + with redirects since redirecting to a page where the user + is supposed to do something is extremely unlikely to work + in an XHR request. Make a special response for these kinds + of requests. + The use of 403 Forbidden is to match the fact that this + middleware doesn't really want the user in if they don't + refresh their session. + WARNING: this varies from the original implementation: + - to return a 401 status code + - to consider all requests as XHR requests + """ + xhr_response_json = {"error": "the authentication session has expired"} + if prompt_reauth: + # The id_token has expired, so we have to re-authenticate silently. + refresh_url = self._prepare_reauthorization(request) + xhr_response_json["refresh_url"] = refresh_url + + xhr_response = JsonResponse(xhr_response_json, status=HTTP_401_UNAUTHORIZED) + if "refresh_url" in xhr_response_json: + xhr_response["refresh_url"] = xhr_response_json["refresh_url"] + return xhr_response + + def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements + """Process the request and refresh the access token if necessary.""" + if not self.is_expired(request): + return None + + token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT") + client_id = self.get_settings("OIDC_RP_CLIENT_ID") + client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET") + refresh_token = get_oidc_refresh_token(request.session) + + if not refresh_token: + logger.debug("no refresh token stored") + return self.finish(request, prompt_reauth=True) + + token_payload = { + "grant_type": "refresh_token", + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + + req_auth = None + if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False): + # supported in https://github.com/mozilla/mozilla-django-oidc/pull/377 + # but we don't need it, so enforce error here. + raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported") + + try: + response = requests.post( + token_url, + auth=req_auth, + data=token_payload, + verify=import_from_settings("OIDC_VERIFY_SSL", True), + timeout=import_from_settings("OIDC_TIMEOUT", 3), + ) + response.raise_for_status() + token_info = response.json() + except requests.exceptions.Timeout: + logger.debug("timed out refreshing access token") + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + except requests.exceptions.HTTPError as exc: + status_code = exc.response.status_code + logger.debug("http error %s when refreshing access token", status_code) + # OAuth error response will be a 400 for various situations, including + # an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2 + return self.finish(request, prompt_reauth=status_code == HTTP_400_BAD_REQUEST) + except json.JSONDecodeError: + logger.debug("malformed response when refreshing access token") + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + except Exception as exc: # pylint: disable=broad-except + logger.exception("unknown error occurred when refreshing access token: %s", exc) + # Don't prompt for reauth as this could be a temporary problem + return self.finish(request, prompt_reauth=False) + + # Until we can properly validate an ID token on the refresh response + # per the spec[1], we intentionally drop the id_token. + # [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse + id_token = None + access_token = token_info.get("access_token") + refresh_token = token_info.get("refresh_token") + store_tokens(request.session, access_token, id_token, refresh_token) + + return None diff --git a/src/lasuite/oidc_resource_server/backend.py b/src/lasuite/oidc_resource_server/backend.py index 4f67817..c084d85 100644 --- a/src/lasuite/oidc_resource_server/backend.py +++ b/src/lasuite/oidc_resource_server/backend.py @@ -133,7 +133,7 @@ class ResourceServerBackend: requested_scopes = introspection_response.get("scope", None).split(" ") if set(self._scopes).isdisjoint(set(requested_scopes)): - message = "Introspection response contains any required scopes." + message = "Introspection response is missing required scopes." logger.debug(message) raise SuspiciousOperation(message) @@ -273,7 +273,7 @@ class JWTResourceServerBackend(ResourceServerBackend): try: public_key_set = self._authorization_server_client.import_public_keys() except (TypeError, ValueError, AttributeError, HTTPError) as err: - message = "Could get authorization server JWKS" + message = "Could not get authorization server JWKS" logger.debug("%s. Exception:", message, exc_info=True) raise SuspiciousOperation(message) from err diff --git a/tests/oidc_login/test_backends.py b/tests/oidc_login/test_backends.py index 5d06c1c..a037ffe 100644 --- a/tests/oidc_login/test_backends.py +++ b/tests/oidc_login/test_backends.py @@ -1,10 +1,14 @@ """Unit tests for the Authentication Backends.""" +import re + import pytest +import responses +from cryptography.fernet import Fernet from django.contrib.auth import get_user_model from django.core.exceptions import SuspiciousOperation -from lasuite.oidc_login.backends import OIDCAuthenticationBackend +from lasuite.oidc_login.backends import OIDCAuthenticationBackend, get_oidc_refresh_token, store_oidc_refresh_token from .. import factories @@ -13,6 +17,23 @@ pytestmark = pytest.mark.django_db User = get_user_model() +def test_oidc_refresh_token_session_store(settings): + """Test that the OIDC refresh token is stored and retrieved from the session.""" + settings.OIDC_STORE_REFRESH_TOKEN = True + session = {} + + with pytest.raises(ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required."): + store_oidc_refresh_token(session, "test-refresh-token") + + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + store_oidc_refresh_token(session, "test-refresh-token") + assert session["oidc_refresh_token"] is not None + assert session["oidc_refresh_token"] != "test-refresh-token" + + assert get_oidc_refresh_token(session) == "test-refresh-token" + + def test_authentication_getter_existing_user_no_email(django_assert_num_queries, monkeypatch): """If an existing user matches the user's info sub, the user should be returned.""" klass = OIDCAuthenticationBackend() @@ -259,3 +280,52 @@ def test_authentication_getter_existing_disabled_user_via_email(django_assert_nu klass.get_or_create_user(access_token="test-token", id_token=None, payload=None) assert User.objects.count() == 1 + + +@responses.activate +def test_authentication_session_tokens(django_assert_num_queries, monkeypatch, rf, settings): + """Test the session contains oidc_refresh_token and oidc_access_token after authentication.""" + settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token" + settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo" + settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks" + settings.OIDC_STORE_ACCESS_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + klass = OIDCAuthenticationBackend() + request = rf.get("/some-url", {"state": "test-state", "code": "test-code"}) + request.session = {} + + def verify_token_mocked(*args, **kwargs): + return {"sub": "123", "email": "test@example.com"} + + monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked) + + responses.add( + responses.POST, + re.compile(settings.OIDC_OP_TOKEN_ENDPOINT), + json={ + "access_token": "test-access-token", + "refresh_token": "test-refresh-token", + }, + status=200, + ) + + responses.add( + responses.GET, + re.compile(settings.OIDC_OP_USER_ENDPOINT), + json={"sub": "123", "email": "test@example.com"}, + status=200, + ) + + with django_assert_num_queries(3): + user = klass.authenticate( + request, + code="test-code", + nonce="test-nonce", + code_verifier="test-code-verifier", + ) + + assert user is not None + assert request.session["oidc_access_token"] == "test-access-token" + assert get_oidc_refresh_token(request.session) == "test-refresh-token" diff --git a/tests/oidc_login/test_decorators.py b/tests/oidc_login/test_decorators.py new file mode 100644 index 0000000..5f72b94 --- /dev/null +++ b/tests/oidc_login/test_decorators.py @@ -0,0 +1,51 @@ +"""Tests for the refresh_oidc_access_token decorator in core app.""" + +from unittest.mock import patch + +from django.http import HttpResponse +from django.test import RequestFactory +from django.utils.decorators import method_decorator +from django.views import View + +from lasuite.oidc_login.decorators import refresh_oidc_access_token + + +class RefreshOIDCAccessTokenView(View): + """ + A Django view that uses the refresh_oidc_access_token decorator to refresh + the OIDC access token before processing the request. + """ + + @method_decorator(refresh_oidc_access_token) + def dispatch(self, request, *args, **kwargs): + """Override the dispatch method to apply the refresh_oidc_access_token decorator.""" + return super().dispatch(request, *args, **kwargs) + + def get(self, request, *args, **kwargs): + """ + Handle GET requests. + + Returns: + HttpResponse: A simple HTTP response with "OK" as the content. + + """ + return HttpResponse("OK") + + +def test_refresh_oidc_access_token_decorator(): + """ + Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access. + The test creates a mock request and patches the dispatch method to verify that it is called + with the correct request object. + """ + # Create a test request + factory = RequestFactory() + request = factory.get("/") + + # Mock the OIDC refresh functionality + with patch("lasuite.oidc_login.middleware.RefreshOIDCAccessToken.process_request") as mock_refresh: + # Call the decorated view + RefreshOIDCAccessTokenView.as_view()(request) + + # Assert that the refresh method was called + mock_refresh.assert_called_once_with(request) diff --git a/tests/oidc_login/test_middleware.py b/tests/oidc_login/test_middleware.py new file mode 100644 index 0000000..5365e3f --- /dev/null +++ b/tests/oidc_login/test_middleware.py @@ -0,0 +1,322 @@ +"""Tests for the RefreshOIDCAccessToken middleware.""" + +import time +from unittest.mock import MagicMock + +import factories +import pytest +import requests.exceptions +import responses +from cryptography.fernet import Fernet +from django.contrib.auth.models import AnonymousUser +from django.contrib.sessions.middleware import SessionMiddleware +from django.http import HttpResponse, JsonResponse +from django.test import RequestFactory + +from lasuite.oidc_login.backends import get_cipher_suite, get_oidc_refresh_token, store_oidc_refresh_token +from lasuite.oidc_login.middleware import RefreshOIDCAccessToken + +pytestmark = pytest.mark.django_db + + +@pytest.fixture(name="oidc_settings") +def fixture_oidc_settings(settings): + """Fixture to configure OIDC settings for the tests.""" + settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token" + settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize" + settings.OIDC_RP_CLIENT_ID = "client_id" + settings.OIDC_RP_CLIENT_SECRET = "client_secret" + settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback" + settings.OIDC_RP_SCOPES = "openid email" + settings.OIDC_USE_NONCE = True + settings.OIDC_STATE_SIZE = 32 + settings.OIDC_NONCE_SIZE = 32 + settings.OIDC_VERIFY_SSL = True + settings.OIDC_TOKEN_USE_BASIC_AUTH = False + settings.OIDC_STORE_ACCESS_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN = True + settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key() + + get_cipher_suite.cache_clear() + + yield settings + + get_cipher_suite.cache_clear() + + +def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument + """ + When the user is not authenticated, this + is not the purpose of the middleware to manage anything. + """ + request = RequestFactory().get("/test") + request.user = AnonymousUser() + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert response is None + + +def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument + """ + When the session does not contain a refresh token, + the middleware should return a 401 response containing + the URL to authenticate again. + """ + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = "expired_token" + request.session["oidc_token_expiration"] = time.time() - 100 + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, JsonResponse) + assert response.status_code == 401 + assert response.has_header("refresh_url") + assert response["refresh_url"].startswith("https://auth.example.com/authorize") + + +def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument + """We don't support OIDC_TOKEN_USE_BASIC_AUTH.""" + oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True + + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + middleware = RefreshOIDCAccessToken(get_response) + with pytest.raises(RuntimeError) as excinfo: + middleware.process_request(request) + + assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported" + + +@responses.activate +def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware successfully refreshes the token.""" + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"access_token": "new_token", "refresh_token": "new_refresh_token"}, + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + request.session.save() + + assert response is None + assert request.session["oidc_access_token"] == "new_token" + assert get_oidc_refresh_token(request.session) == "new_refresh_token" + + +def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware does nothing when the token is not expired.""" + user = factories.UserFactory() + + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = ("valid_token",) + request.session["oidc_token_expiration"] = time.time() + 3600 + request.session.save() + + middleware = RefreshOIDCAccessToken(get_response) + + response = middleware.process_request(request) + assert response is None + + +@responses.activate +def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument + """Test that the middleware returns a 401 response when the token refresh request times out.""" + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body=requests.exceptions.Timeout("timeout"), + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response when the token + refresh request returns a 400 error. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"error": "invalid_grant"}, + status=400, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert response.has_header("refresh_url") + assert response["refresh_url"].startswith("https://auth.example.com/authorize") + + +@responses.activate +def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response when + the token refresh request returns a 404 error. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + json={"error": "invalid_grant"}, + status=404, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response + when the token refresh request returns malformed JSON. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body="malformed json", + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") + + +@responses.activate +def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument + """ + Test that the middleware returns a 401 response + when the token refresh request raises an exception. + """ + user = factories.UserFactory() + request = RequestFactory().get("/test") + request.user = user + + get_response = MagicMock() + session_middleware = SessionMiddleware(get_response) + session_middleware.process_request(request) + request.session["oidc_access_token"] = "old_token" + store_oidc_refresh_token(request.session, "refresh_token") + request.session["oidc_token_expiration"] = time.time() - 100 + request.session.save() + + responses.add( + responses.POST, + "https://auth.example.com/token", + body={"error": "invalid_grant"}, # invalid format dict + status=200, + ) + + middleware = RefreshOIDCAccessToken(get_response) + response = middleware.process_request(request) + assert isinstance(response, HttpResponse) + assert response.status_code == 401 + assert not response.has_header("refresh_url") diff --git a/tests/oidc_resource_server/test_backend.py b/tests/oidc_resource_server/test_backend.py index f6f071a..45c6e46 100644 --- a/tests/oidc_resource_server/test_backend.py +++ b/tests/oidc_resource_server/test_backend.py @@ -280,7 +280,7 @@ def test_jwt_resource_server_backend_introspect_public_key_import_failure( ) with patch.object(Logger, "debug") as mock_logger_debug: - expected_message = "Could get authorization server JWKS" + expected_message = "Could not get authorization server JWKS" with pytest.raises(SuspiciousOperation, match=expected_message): jwt_resource_server_backend._introspect(token) @@ -317,7 +317,7 @@ def test_verify_user_info_wrong_scopes(resource_server_backend): """Test '_verify_user_info' with wrong requested scopes.""" introspection_response = {"active": True, "scope": "wrong-scopes"} - expected_message = "Introspection response contains any required scopes." + expected_message = "Introspection response is missing required scopes." with patch.object(Logger, "debug") as mock_logger_debug: with pytest.raises(SuspiciousOperation, match=expected_message): resource_server_backend._verify_user_info(introspection_response) diff --git a/tests/test_project/settings.py b/tests/test_project/settings.py index c9189db..ff3ffea 100644 --- a/tests/test_project/settings.py +++ b/tests/test_project/settings.py @@ -110,6 +110,7 @@ ALLOW_LOGOUT_GET_METHOD = True OIDC_OP_TOKEN_ENDPOINT = None OIDC_OP_USER_ENDPOINT = None OIDC_OP_LOGOUT_ENDPOINT = None +OIDC_OP_AUTHORIZATION_ENDPOINT = None OIDC_RP_CLIENT_ID = "lasuite" OIDC_RP_CLIENT_SECRET = "lasuite" USER_OIDC_FIELDS_TO_FULLNAME = ["first_name", "last_name"]