mirror of
https://github.com/suitenumerique/docs.git
synced 2026-04-25 17:15:01 +02:00
Compare commits
3 Commits
v4.8.5
...
qbey/oidc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f30c4ff4b3 | ||
|
|
589346acba | ||
|
|
ea8b8be5f0 |
@@ -8,6 +8,10 @@ and this project adheres to
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## Added
|
||||
|
||||
- ✨(oidc) add refresh token tools #584
|
||||
|
||||
## [2.5.0] - 2025-03-18
|
||||
|
||||
## Added
|
||||
|
||||
@@ -4,3 +4,6 @@ BURST_THROTTLE_RATES="200/minute"
|
||||
DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e
|
||||
Y_PROVIDER_API_KEY=yprovider-api-key
|
||||
Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/
|
||||
|
||||
# - add a key to store the refresh token in tests
|
||||
OIDC_STORE_REFRESH_TOKEN_KEY=qnw7gZrOFLkLuZIixzuxksNORFJyjWyi5ACugNchKJY=
|
||||
|
||||
@@ -1,21 +1,59 @@
|
||||
"""Authentication Backends for the Impress core app."""
|
||||
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import requests
|
||||
from cryptography.fernet import Fernet
|
||||
from mozilla_django_oidc.auth import (
|
||||
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
||||
)
|
||||
from mozilla_django_oidc.utils import import_from_settings
|
||||
|
||||
from core.models import DuplicateEmailError, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@lru_cache(maxsize=0)
|
||||
def get_cipher_suite():
|
||||
"""Return a Fernet cipher suite."""
|
||||
key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None)
|
||||
if not key:
|
||||
raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.")
|
||||
return Fernet(key)
|
||||
|
||||
|
||||
def store_oidc_refresh_token(session, refresh_token):
|
||||
"""Store the encrypted OIDC refresh token in the session if enabled in settings."""
|
||||
if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False):
|
||||
encrypted_token = get_cipher_suite().encrypt(refresh_token.encode())
|
||||
session["oidc_refresh_token"] = encrypted_token.decode()
|
||||
|
||||
|
||||
def get_oidc_refresh_token(session):
|
||||
"""Retrieve and decrypt the OIDC refresh token from the session."""
|
||||
encrypted_token = session.get("oidc_refresh_token")
|
||||
if encrypted_token:
|
||||
return get_cipher_suite().decrypt(encrypted_token.encode()).decode()
|
||||
return None
|
||||
|
||||
|
||||
def store_tokens(session, access_token, id_token, refresh_token):
|
||||
"""Store tokens in the session if enabled in settings."""
|
||||
if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False):
|
||||
session["oidc_access_token"] = access_token
|
||||
|
||||
if import_from_settings("OIDC_STORE_ID_TOKEN", False):
|
||||
session["oidc_id_token"] = id_token
|
||||
|
||||
store_oidc_refresh_token(session, refresh_token)
|
||||
|
||||
|
||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||
|
||||
@@ -23,6 +61,40 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Initialize the OIDC Authentication Backend.
|
||||
|
||||
Adds an internal attribute to store the token_info dictionary.
|
||||
The purpose of `self._token_info` is to not duplicate code from
|
||||
the original `authenticate` method.
|
||||
This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||
is merged.
|
||||
"""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._token_info = None
|
||||
|
||||
def get_token(self, payload):
|
||||
"""
|
||||
Return token object as a dictionary.
|
||||
|
||||
Store the value to extract the refresh token in the `authenticate` method.
|
||||
"""
|
||||
self._token_info = super().get_token(payload)
|
||||
return self._token_info
|
||||
|
||||
def authenticate(self, request, **kwargs):
|
||||
"""Authenticates a user based on the OIDC code flow."""
|
||||
user = super().authenticate(request, **kwargs)
|
||||
|
||||
if user is not None:
|
||||
# Then the user successfully authenticated
|
||||
store_oidc_refresh_token(
|
||||
request.session, self._token_info.get("refresh_token")
|
||||
)
|
||||
|
||||
return user
|
||||
|
||||
def get_userinfo(self, access_token, id_token, payload):
|
||||
"""Return user details dictionary.
|
||||
|
||||
|
||||
12
src/backend/core/authentication/decorators.py
Normal file
12
src/backend/core/authentication/decorators.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""
|
||||
Decorators for the authentication app.
|
||||
|
||||
We don't want (yet) to enforce the OIDC access token to be "fresh" for all
|
||||
views, so we provide a decorator to refresh the access token only when needed.
|
||||
"""
|
||||
|
||||
from django.utils.decorators import decorator_from_middleware
|
||||
|
||||
from .middleware import RefreshOIDCAccessToken
|
||||
|
||||
refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken)
|
||||
199
src/backend/core/authentication/middleware.py
Normal file
199
src/backend/core/authentication/middleware.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""
|
||||
Module to declare a RefreshOIDCAccessToken middleware that extends the
|
||||
mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the
|
||||
access token when it expires, based on the OIDC provided refresh token.
|
||||
|
||||
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||
which is still not merged.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from urllib.parse import quote, urlencode
|
||||
|
||||
from django.http import JsonResponse
|
||||
from django.urls import reverse
|
||||
from django.utils.crypto import get_random_string
|
||||
|
||||
import requests
|
||||
from mozilla_django_oidc.middleware import SessionRefresh
|
||||
|
||||
try:
|
||||
from mozilla_django_oidc.middleware import ( # pylint: disable=unused-import
|
||||
RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken,
|
||||
)
|
||||
|
||||
# If the import is successful, raise an error to notify the user that the
|
||||
# version of mozilla_django_oidc added the expected middleware, and we don't need
|
||||
# our implementation anymore.
|
||||
# See https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||
raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken")
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from mozilla_django_oidc.utils import (
|
||||
absolutify,
|
||||
add_state_and_verifier_and_nonce_to_session,
|
||||
import_from_settings,
|
||||
)
|
||||
|
||||
from core.authentication.backends import get_oidc_refresh_token, store_tokens
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RefreshOIDCAccessToken(SessionRefresh):
|
||||
"""
|
||||
A middleware that will refresh the access token following proper OIDC protocol:
|
||||
https://auth0.com/docs/tokens/refresh-token/current
|
||||
|
||||
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||
but limited to our needs (YAGNI/KISS).
|
||||
"""
|
||||
|
||||
def _prepare_reauthorization(self, request):
|
||||
"""
|
||||
Constructs a new authorization grant request to refresh the session.
|
||||
Besides constructing the request, the state and nonce included in the
|
||||
request are registered in the current session in preparation for the
|
||||
client following through with the authorization flow.
|
||||
"""
|
||||
auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT
|
||||
client_id = self.OIDC_RP_CLIENT_ID
|
||||
state = get_random_string(self.OIDC_STATE_SIZE)
|
||||
|
||||
# Build the parameters as if we were doing a real auth handoff, except
|
||||
# we also include prompt=none.
|
||||
auth_params = {
|
||||
"response_type": "code",
|
||||
"client_id": client_id,
|
||||
"redirect_uri": absolutify(
|
||||
request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)
|
||||
),
|
||||
"state": state,
|
||||
"scope": self.OIDC_RP_SCOPES,
|
||||
"prompt": "none",
|
||||
}
|
||||
|
||||
if self.OIDC_USE_NONCE:
|
||||
nonce = get_random_string(self.OIDC_NONCE_SIZE)
|
||||
auth_params.update({"nonce": nonce})
|
||||
|
||||
# Register the one-time parameters in the session
|
||||
add_state_and_verifier_and_nonce_to_session(request, state, auth_params)
|
||||
request.session["oidc_login_next"] = request.get_full_path()
|
||||
|
||||
query = urlencode(auth_params, quote_via=quote)
|
||||
return f"{auth_url}?{query}"
|
||||
|
||||
def is_expired(self, request):
|
||||
"""Check whether the access token is expired and needs to be refreshed."""
|
||||
if not self.is_refreshable_url(request):
|
||||
logger.debug("request is not refreshable")
|
||||
return False
|
||||
|
||||
expiration = request.session.get("oidc_token_expiration", 0)
|
||||
now = time.time()
|
||||
if expiration > now:
|
||||
# The id_token is still valid, so we don't have to do anything.
|
||||
logger.debug("id token is still valid (%s > %s)", expiration, now)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def finish(self, request, prompt_reauth=True):
|
||||
"""Finish request handling and handle sending downstream responses for XHR.
|
||||
This function should only be run if the session is determind to
|
||||
be expired.
|
||||
Almost all XHR request handling in client-side code struggles
|
||||
with redirects since redirecting to a page where the user
|
||||
is supposed to do something is extremely unlikely to work
|
||||
in an XHR request. Make a special response for these kinds
|
||||
of requests.
|
||||
The use of 403 Forbidden is to match the fact that this
|
||||
middleware doesn't really want the user in if they don't
|
||||
refresh their session.
|
||||
|
||||
WARNING: this varies from the original implementation:
|
||||
- to return a 401 status code
|
||||
- to consider all requests as XHR requests
|
||||
"""
|
||||
xhr_response_json = {"error": "the authentication session has expired"}
|
||||
if prompt_reauth:
|
||||
# The id_token has expired, so we have to re-authenticate silently.
|
||||
refresh_url = self._prepare_reauthorization(request)
|
||||
xhr_response_json["refresh_url"] = refresh_url
|
||||
|
||||
xhr_response = JsonResponse(xhr_response_json, status=401)
|
||||
if "refresh_url" in xhr_response_json:
|
||||
xhr_response["refresh_url"] = xhr_response_json["refresh_url"]
|
||||
return xhr_response
|
||||
|
||||
def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements
|
||||
"""Process the request and refresh the access token if necessary."""
|
||||
if not self.is_expired(request):
|
||||
return None
|
||||
|
||||
token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
|
||||
client_id = self.get_settings("OIDC_RP_CLIENT_ID")
|
||||
client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET")
|
||||
refresh_token = get_oidc_refresh_token(request.session)
|
||||
|
||||
if not refresh_token:
|
||||
logger.debug("no refresh token stored")
|
||||
return self.finish(request, prompt_reauth=True)
|
||||
|
||||
token_payload = {
|
||||
"grant_type": "refresh_token",
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"refresh_token": refresh_token,
|
||||
}
|
||||
|
||||
req_auth = None
|
||||
if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
|
||||
# supported in https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||
# but we don't need it, so enforce error here.
|
||||
raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported")
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
token_url,
|
||||
auth=req_auth,
|
||||
data=token_payload,
|
||||
verify=import_from_settings("OIDC_VERIFY_SSL", True),
|
||||
timeout=import_from_settings("OIDC_TIMEOUT", 3),
|
||||
)
|
||||
response.raise_for_status()
|
||||
token_info = response.json()
|
||||
except requests.exceptions.Timeout:
|
||||
logger.debug("timed out refreshing access token")
|
||||
# Don't prompt for reauth as this could be a temporary problem
|
||||
return self.finish(request, prompt_reauth=False)
|
||||
except requests.exceptions.HTTPError as exc:
|
||||
status_code = exc.response.status_code
|
||||
logger.debug("http error %s when refreshing access token", status_code)
|
||||
# OAuth error response will be a 400 for various situations, including
|
||||
# an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
|
||||
return self.finish(request, prompt_reauth=status_code == 400)
|
||||
except json.JSONDecodeError:
|
||||
logger.debug("malformed response when refreshing access token")
|
||||
# Don't prompt for reauth as this could be a temporary problem
|
||||
return self.finish(request, prompt_reauth=False)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.exception(
|
||||
"unknown error occurred when refreshing access token: %s", exc
|
||||
)
|
||||
# Don't prompt for reauth as this could be a temporary problem
|
||||
return self.finish(request, prompt_reauth=False)
|
||||
|
||||
# Until we can properly validate an ID token on the refresh response
|
||||
# per the spec[1], we intentionally drop the id_token.
|
||||
# [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse
|
||||
id_token = None
|
||||
access_token = token_info.get("access_token")
|
||||
refresh_token = token_info.get("refresh_token")
|
||||
store_tokens(request.session, access_token, id_token, refresh_token)
|
||||
|
||||
return None
|
||||
@@ -10,14 +10,37 @@ from django.test.utils import override_settings
|
||||
|
||||
import pytest
|
||||
import responses
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from core import models
|
||||
from core.authentication.backends import OIDCAuthenticationBackend
|
||||
from core.authentication.backends import (
|
||||
OIDCAuthenticationBackend,
|
||||
get_oidc_refresh_token,
|
||||
store_oidc_refresh_token,
|
||||
)
|
||||
from core.factories import UserFactory
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_oidc_refresh_token_session_store(settings):
|
||||
"""Test that the OIDC refresh token is stored and retrieved from the session."""
|
||||
session = {}
|
||||
|
||||
with pytest.raises(
|
||||
ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required."
|
||||
):
|
||||
store_oidc_refresh_token(session, "test-refresh-token")
|
||||
|
||||
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||
|
||||
store_oidc_refresh_token(session, "test-refresh-token")
|
||||
assert session["oidc_refresh_token"] is not None
|
||||
assert session["oidc_refresh_token"] != "test-refresh-token"
|
||||
|
||||
assert get_oidc_refresh_token(session) == "test-refresh-token"
|
||||
|
||||
|
||||
def test_authentication_getter_existing_user_no_email(
|
||||
django_assert_num_queries, monkeypatch
|
||||
):
|
||||
@@ -547,3 +570,56 @@ def test_authentication_verify_claims_success(django_assert_num_queries, monkeyp
|
||||
assert user.full_name == "Doe"
|
||||
assert user.short_name is None
|
||||
assert user.email == "john.doe@example.com"
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_authentication_session_tokens(
|
||||
django_assert_num_queries, monkeypatch, rf, settings
|
||||
):
|
||||
"""
|
||||
Test that the session contains oidc_refresh_token and oidc_access_token after authentication.
|
||||
"""
|
||||
settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token"
|
||||
settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo"
|
||||
settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks"
|
||||
settings.OIDC_STORE_ACCESS_TOKEN = True
|
||||
settings.OIDC_STORE_REFRESH_TOKEN = True
|
||||
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||
|
||||
klass = OIDCAuthenticationBackend()
|
||||
request = rf.get("/some-url", {"state": "test-state", "code": "test-code"})
|
||||
request.session = {}
|
||||
|
||||
def verify_token_mocked(*args, **kwargs):
|
||||
return {"sub": "123", "email": "test@example.com"}
|
||||
|
||||
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked)
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
re.compile(settings.OIDC_OP_TOKEN_ENDPOINT),
|
||||
json={
|
||||
"access_token": "test-access-token",
|
||||
"refresh_token": "test-refresh-token",
|
||||
},
|
||||
status=200,
|
||||
)
|
||||
|
||||
responses.add(
|
||||
responses.GET,
|
||||
re.compile(settings.OIDC_OP_USER_ENDPOINT),
|
||||
json={"sub": "123", "email": "test@example.com"},
|
||||
status=200,
|
||||
)
|
||||
|
||||
with django_assert_num_queries(6):
|
||||
user = klass.authenticate(
|
||||
request,
|
||||
code="test-code",
|
||||
nonce="test-nonce",
|
||||
code_verifier="test-code-verifier",
|
||||
)
|
||||
|
||||
assert user is not None
|
||||
assert request.session["oidc_access_token"] == "test-access-token"
|
||||
assert get_oidc_refresh_token(request.session) == "test-refresh-token"
|
||||
|
||||
55
src/backend/core/tests/authentication/test_decorators.py
Normal file
55
src/backend/core/tests/authentication/test_decorators.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Tests for the refresh_oidc_access_token decorator in core app."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views import View
|
||||
|
||||
from core.authentication.decorators import refresh_oidc_access_token
|
||||
|
||||
|
||||
class RefreshOIDCAccessTokenView(View):
|
||||
"""
|
||||
A Django view that uses the refresh_oidc_access_token decorator to refresh
|
||||
the OIDC access token before processing the request.
|
||||
"""
|
||||
|
||||
@method_decorator(refresh_oidc_access_token)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
"""
|
||||
Overrides the dispatch method to apply the refresh_oidc_access_token decorator.
|
||||
"""
|
||||
return super().dispatch(request, *args, **kwargs)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""
|
||||
Handles GET requests.
|
||||
|
||||
Returns:
|
||||
HttpResponse: A simple HTTP response with "OK" as the content.
|
||||
"""
|
||||
return HttpResponse("OK")
|
||||
|
||||
|
||||
def test_refresh_oidc_access_token_decorator():
|
||||
"""
|
||||
Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access.
|
||||
|
||||
The test creates a mock request and patches the dispatch method to verify that it is called
|
||||
with the correct request object.
|
||||
"""
|
||||
# Create a test request
|
||||
factory = RequestFactory()
|
||||
request = factory.get("/")
|
||||
|
||||
# Mock the OIDC refresh functionality
|
||||
with patch(
|
||||
"core.authentication.middleware.RefreshOIDCAccessToken.process_request"
|
||||
) as mock_refresh:
|
||||
# Call the decorated view
|
||||
RefreshOIDCAccessTokenView.as_view()(request)
|
||||
|
||||
# Assert that the refresh method was called
|
||||
mock_refresh.assert_called_once_with(request)
|
||||
327
src/backend/core/tests/authentication/test_middleware.py
Normal file
327
src/backend/core/tests/authentication/test_middleware.py
Normal file
@@ -0,0 +1,327 @@
|
||||
"""Tests for the RefreshOIDCAccessToken middleware."""
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.test import RequestFactory
|
||||
|
||||
import pytest
|
||||
import requests.exceptions
|
||||
import responses
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
from core import factories
|
||||
from core.authentication.backends import (
|
||||
get_cipher_suite,
|
||||
get_oidc_refresh_token,
|
||||
store_oidc_refresh_token,
|
||||
)
|
||||
from core.authentication.middleware import RefreshOIDCAccessToken
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture(name="oidc_settings")
|
||||
def fixture_oidc_settings(settings):
|
||||
"""Fixture to configure OIDC settings for the tests."""
|
||||
settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token"
|
||||
settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize"
|
||||
settings.OIDC_RP_CLIENT_ID = "client_id"
|
||||
settings.OIDC_RP_CLIENT_SECRET = "client_secret"
|
||||
settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback"
|
||||
settings.OIDC_RP_SCOPES = "openid email"
|
||||
settings.OIDC_USE_NONCE = True
|
||||
settings.OIDC_STATE_SIZE = 32
|
||||
settings.OIDC_NONCE_SIZE = 32
|
||||
settings.OIDC_VERIFY_SSL = True
|
||||
settings.OIDC_TOKEN_USE_BASIC_AUTH = False
|
||||
settings.OIDC_STORE_ACCESS_TOKEN = True
|
||||
settings.OIDC_STORE_REFRESH_TOKEN = True
|
||||
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||
|
||||
get_cipher_suite.cache_clear()
|
||||
|
||||
yield settings
|
||||
|
||||
get_cipher_suite.cache_clear()
|
||||
|
||||
|
||||
def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
When the user is not authenticated, this
|
||||
is not the purpose of the middleware to manage anything.
|
||||
"""
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = AnonymousUser()
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert response is None
|
||||
|
||||
|
||||
def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
When the session does not contain a refresh token,
|
||||
the middleware should return a 401 response containing
|
||||
the URL to authenticate again.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
|
||||
request.session["oidc_access_token"] = ("expired_token",)
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, JsonResponse)
|
||||
assert response.status_code == 401
|
||||
assert response.has_header("refresh_url")
|
||||
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
|
||||
|
||||
|
||||
def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument
|
||||
"""We don't support OIDC_TOKEN_USE_BASIC_AUTH"""
|
||||
oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
with pytest.raises(RuntimeError) as excinfo:
|
||||
middleware.process_request(request)
|
||||
|
||||
assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported"
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument
|
||||
"""Test that the middleware successfully refreshes the token."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
json={"access_token": "new_token", "refresh_token": "new_refresh_token"},
|
||||
status=200,
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
request.session.save()
|
||||
|
||||
assert response is None
|
||||
assert request.session["oidc_access_token"] == "new_token"
|
||||
assert get_oidc_refresh_token(request.session) == "new_refresh_token"
|
||||
|
||||
|
||||
def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument
|
||||
"""Test that the middleware does nothing when the token is not expired."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = ("valid_token",)
|
||||
request.session["oidc_token_expiration"] = time.time() + 3600
|
||||
request.session.save()
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
|
||||
response = middleware.process_request(request)
|
||||
assert response is None
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument
|
||||
"""Test that the middleware returns a 401 response when the token refresh request times out."""
|
||||
user = factories.UserFactory()
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
body=requests.exceptions.Timeout("timeout"),
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, HttpResponse)
|
||||
assert response.status_code == 401
|
||||
assert not response.has_header("refresh_url")
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
Test that the middleware returns a 401 response when the token
|
||||
refresh request returns a 400 error.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
json={"error": "invalid_grant"},
|
||||
status=400,
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, HttpResponse)
|
||||
assert response.status_code == 401
|
||||
assert response.has_header("refresh_url")
|
||||
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
Test that the middleware returns a 401 response when
|
||||
the token refresh request returns a 404 error.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
json={"error": "invalid_grant"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, HttpResponse)
|
||||
assert response.status_code == 401
|
||||
assert not response.has_header("refresh_url")
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
Test that the middleware returns a 401 response
|
||||
when the token refresh request returns malformed JSON.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
body="malformed json",
|
||||
status=200,
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, HttpResponse)
|
||||
assert response.status_code == 401
|
||||
assert not response.has_header("refresh_url")
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument
|
||||
"""
|
||||
Test that the middleware returns a 401 response
|
||||
when the token refresh request raises an exception.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
request = RequestFactory().get("/test")
|
||||
request.user = user
|
||||
|
||||
get_response = MagicMock()
|
||||
session_middleware = SessionMiddleware(get_response)
|
||||
session_middleware.process_request(request)
|
||||
request.session["oidc_access_token"] = "old_token"
|
||||
store_oidc_refresh_token(request.session, "refresh_token")
|
||||
request.session["oidc_token_expiration"] = time.time() - 100
|
||||
request.session.save()
|
||||
|
||||
responses.add(
|
||||
responses.POST,
|
||||
"https://auth.example.com/token",
|
||||
body={"error": "invalid_grant"}, # invalid format dict
|
||||
status=200,
|
||||
)
|
||||
|
||||
middleware = RefreshOIDCAccessToken(get_response)
|
||||
response = middleware.process_request(request)
|
||||
assert isinstance(response, HttpResponse)
|
||||
assert response.status_code == 401
|
||||
assert not response.has_header("refresh_url")
|
||||
@@ -487,6 +487,17 @@ class Base(Configuration):
|
||||
environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION",
|
||||
environ_prefix=None,
|
||||
)
|
||||
OIDC_STORE_ACCESS_TOKEN = values.BooleanValue(
|
||||
default=True, environ_name="OIDC_STORE_ACCESS_TOKEN", environ_prefix=None
|
||||
)
|
||||
OIDC_STORE_REFRESH_TOKEN = values.BooleanValue(
|
||||
default=True, environ_name="OIDC_STORE_REFRESH_TOKEN", environ_prefix=None
|
||||
)
|
||||
OIDC_STORE_REFRESH_TOKEN_KEY = values.Value(
|
||||
default=None,
|
||||
environ_name="OIDC_STORE_REFRESH_TOKEN_KEY",
|
||||
environ_prefix=None,
|
||||
)
|
||||
|
||||
# WARNING: Enabling this setting allows multiple user accounts to share the same email
|
||||
# address. This may cause security issues and is not recommended for production use when
|
||||
|
||||
Reference in New Issue
Block a user