mirror of
https://github.com/suitenumerique/docs.git
synced 2026-05-05 06:32:21 +02:00
Compare commits
3 Commits
documentat
...
qbey/oidc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f30c4ff4b3 | ||
|
|
589346acba | ||
|
|
ea8b8be5f0 |
@@ -8,6 +8,10 @@ and this project adheres to
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## Added
|
||||||
|
|
||||||
|
- ✨(oidc) add refresh token tools #584
|
||||||
|
|
||||||
## [2.5.0] - 2025-03-18
|
## [2.5.0] - 2025-03-18
|
||||||
|
|
||||||
## Added
|
## Added
|
||||||
|
|||||||
@@ -4,3 +4,6 @@ BURST_THROTTLE_RATES="200/minute"
|
|||||||
DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e
|
DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e
|
||||||
Y_PROVIDER_API_KEY=yprovider-api-key
|
Y_PROVIDER_API_KEY=yprovider-api-key
|
||||||
Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/
|
Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/
|
||||||
|
|
||||||
|
# - add a key to store the refresh token in tests
|
||||||
|
OIDC_STORE_REFRESH_TOKEN_KEY=qnw7gZrOFLkLuZIixzuxksNORFJyjWyi5ACugNchKJY=
|
||||||
|
|||||||
@@ -1,21 +1,59 @@
|
|||||||
"""Authentication Backends for the Impress core app."""
|
"""Authentication Backends for the Impress core app."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.exceptions import SuspiciousOperation
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
from mozilla_django_oidc.auth import (
|
from mozilla_django_oidc.auth import (
|
||||||
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
||||||
)
|
)
|
||||||
|
from mozilla_django_oidc.utils import import_from_settings
|
||||||
|
|
||||||
from core.models import DuplicateEmailError, User
|
from core.models import DuplicateEmailError, User
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=0)
|
||||||
|
def get_cipher_suite():
|
||||||
|
"""Return a Fernet cipher suite."""
|
||||||
|
key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None)
|
||||||
|
if not key:
|
||||||
|
raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.")
|
||||||
|
return Fernet(key)
|
||||||
|
|
||||||
|
|
||||||
|
def store_oidc_refresh_token(session, refresh_token):
|
||||||
|
"""Store the encrypted OIDC refresh token in the session if enabled in settings."""
|
||||||
|
if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False):
|
||||||
|
encrypted_token = get_cipher_suite().encrypt(refresh_token.encode())
|
||||||
|
session["oidc_refresh_token"] = encrypted_token.decode()
|
||||||
|
|
||||||
|
|
||||||
|
def get_oidc_refresh_token(session):
|
||||||
|
"""Retrieve and decrypt the OIDC refresh token from the session."""
|
||||||
|
encrypted_token = session.get("oidc_refresh_token")
|
||||||
|
if encrypted_token:
|
||||||
|
return get_cipher_suite().decrypt(encrypted_token.encode()).decode()
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def store_tokens(session, access_token, id_token, refresh_token):
|
||||||
|
"""Store tokens in the session if enabled in settings."""
|
||||||
|
if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False):
|
||||||
|
session["oidc_access_token"] = access_token
|
||||||
|
|
||||||
|
if import_from_settings("OIDC_STORE_ID_TOKEN", False):
|
||||||
|
session["oidc_id_token"] = id_token
|
||||||
|
|
||||||
|
store_oidc_refresh_token(session, refresh_token)
|
||||||
|
|
||||||
|
|
||||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||||
|
|
||||||
@@ -23,6 +61,40 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
|||||||
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Initialize the OIDC Authentication Backend.
|
||||||
|
|
||||||
|
Adds an internal attribute to store the token_info dictionary.
|
||||||
|
The purpose of `self._token_info` is to not duplicate code from
|
||||||
|
the original `authenticate` method.
|
||||||
|
This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||||
|
is merged.
|
||||||
|
"""
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._token_info = None
|
||||||
|
|
||||||
|
def get_token(self, payload):
|
||||||
|
"""
|
||||||
|
Return token object as a dictionary.
|
||||||
|
|
||||||
|
Store the value to extract the refresh token in the `authenticate` method.
|
||||||
|
"""
|
||||||
|
self._token_info = super().get_token(payload)
|
||||||
|
return self._token_info
|
||||||
|
|
||||||
|
def authenticate(self, request, **kwargs):
|
||||||
|
"""Authenticates a user based on the OIDC code flow."""
|
||||||
|
user = super().authenticate(request, **kwargs)
|
||||||
|
|
||||||
|
if user is not None:
|
||||||
|
# Then the user successfully authenticated
|
||||||
|
store_oidc_refresh_token(
|
||||||
|
request.session, self._token_info.get("refresh_token")
|
||||||
|
)
|
||||||
|
|
||||||
|
return user
|
||||||
|
|
||||||
def get_userinfo(self, access_token, id_token, payload):
|
def get_userinfo(self, access_token, id_token, payload):
|
||||||
"""Return user details dictionary.
|
"""Return user details dictionary.
|
||||||
|
|
||||||
|
|||||||
12
src/backend/core/authentication/decorators.py
Normal file
12
src/backend/core/authentication/decorators.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
"""
|
||||||
|
Decorators for the authentication app.
|
||||||
|
|
||||||
|
We don't want (yet) to enforce the OIDC access token to be "fresh" for all
|
||||||
|
views, so we provide a decorator to refresh the access token only when needed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from django.utils.decorators import decorator_from_middleware
|
||||||
|
|
||||||
|
from .middleware import RefreshOIDCAccessToken
|
||||||
|
|
||||||
|
refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken)
|
||||||
199
src/backend/core/authentication/middleware.py
Normal file
199
src/backend/core/authentication/middleware.py
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
"""
|
||||||
|
Module to declare a RefreshOIDCAccessToken middleware that extends the
|
||||||
|
mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the
|
||||||
|
access token when it expires, based on the OIDC provided refresh token.
|
||||||
|
|
||||||
|
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||||
|
which is still not merged.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from urllib.parse import quote, urlencode
|
||||||
|
|
||||||
|
from django.http import JsonResponse
|
||||||
|
from django.urls import reverse
|
||||||
|
from django.utils.crypto import get_random_string
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from mozilla_django_oidc.middleware import SessionRefresh
|
||||||
|
|
||||||
|
try:
|
||||||
|
from mozilla_django_oidc.middleware import ( # pylint: disable=unused-import
|
||||||
|
RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken,
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the import is successful, raise an error to notify the user that the
|
||||||
|
# version of mozilla_django_oidc added the expected middleware, and we don't need
|
||||||
|
# our implementation anymore.
|
||||||
|
# See https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||||
|
raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken")
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
from mozilla_django_oidc.utils import (
|
||||||
|
absolutify,
|
||||||
|
add_state_and_verifier_and_nonce_to_session,
|
||||||
|
import_from_settings,
|
||||||
|
)
|
||||||
|
|
||||||
|
from core.authentication.backends import get_oidc_refresh_token, store_tokens
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RefreshOIDCAccessToken(SessionRefresh):
|
||||||
|
"""
|
||||||
|
A middleware that will refresh the access token following proper OIDC protocol:
|
||||||
|
https://auth0.com/docs/tokens/refresh-token/current
|
||||||
|
|
||||||
|
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||||
|
but limited to our needs (YAGNI/KISS).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _prepare_reauthorization(self, request):
|
||||||
|
"""
|
||||||
|
Constructs a new authorization grant request to refresh the session.
|
||||||
|
Besides constructing the request, the state and nonce included in the
|
||||||
|
request are registered in the current session in preparation for the
|
||||||
|
client following through with the authorization flow.
|
||||||
|
"""
|
||||||
|
auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT
|
||||||
|
client_id = self.OIDC_RP_CLIENT_ID
|
||||||
|
state = get_random_string(self.OIDC_STATE_SIZE)
|
||||||
|
|
||||||
|
# Build the parameters as if we were doing a real auth handoff, except
|
||||||
|
# we also include prompt=none.
|
||||||
|
auth_params = {
|
||||||
|
"response_type": "code",
|
||||||
|
"client_id": client_id,
|
||||||
|
"redirect_uri": absolutify(
|
||||||
|
request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)
|
||||||
|
),
|
||||||
|
"state": state,
|
||||||
|
"scope": self.OIDC_RP_SCOPES,
|
||||||
|
"prompt": "none",
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.OIDC_USE_NONCE:
|
||||||
|
nonce = get_random_string(self.OIDC_NONCE_SIZE)
|
||||||
|
auth_params.update({"nonce": nonce})
|
||||||
|
|
||||||
|
# Register the one-time parameters in the session
|
||||||
|
add_state_and_verifier_and_nonce_to_session(request, state, auth_params)
|
||||||
|
request.session["oidc_login_next"] = request.get_full_path()
|
||||||
|
|
||||||
|
query = urlencode(auth_params, quote_via=quote)
|
||||||
|
return f"{auth_url}?{query}"
|
||||||
|
|
||||||
|
def is_expired(self, request):
|
||||||
|
"""Check whether the access token is expired and needs to be refreshed."""
|
||||||
|
if not self.is_refreshable_url(request):
|
||||||
|
logger.debug("request is not refreshable")
|
||||||
|
return False
|
||||||
|
|
||||||
|
expiration = request.session.get("oidc_token_expiration", 0)
|
||||||
|
now = time.time()
|
||||||
|
if expiration > now:
|
||||||
|
# The id_token is still valid, so we don't have to do anything.
|
||||||
|
logger.debug("id token is still valid (%s > %s)", expiration, now)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def finish(self, request, prompt_reauth=True):
|
||||||
|
"""Finish request handling and handle sending downstream responses for XHR.
|
||||||
|
This function should only be run if the session is determind to
|
||||||
|
be expired.
|
||||||
|
Almost all XHR request handling in client-side code struggles
|
||||||
|
with redirects since redirecting to a page where the user
|
||||||
|
is supposed to do something is extremely unlikely to work
|
||||||
|
in an XHR request. Make a special response for these kinds
|
||||||
|
of requests.
|
||||||
|
The use of 403 Forbidden is to match the fact that this
|
||||||
|
middleware doesn't really want the user in if they don't
|
||||||
|
refresh their session.
|
||||||
|
|
||||||
|
WARNING: this varies from the original implementation:
|
||||||
|
- to return a 401 status code
|
||||||
|
- to consider all requests as XHR requests
|
||||||
|
"""
|
||||||
|
xhr_response_json = {"error": "the authentication session has expired"}
|
||||||
|
if prompt_reauth:
|
||||||
|
# The id_token has expired, so we have to re-authenticate silently.
|
||||||
|
refresh_url = self._prepare_reauthorization(request)
|
||||||
|
xhr_response_json["refresh_url"] = refresh_url
|
||||||
|
|
||||||
|
xhr_response = JsonResponse(xhr_response_json, status=401)
|
||||||
|
if "refresh_url" in xhr_response_json:
|
||||||
|
xhr_response["refresh_url"] = xhr_response_json["refresh_url"]
|
||||||
|
return xhr_response
|
||||||
|
|
||||||
|
def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements
|
||||||
|
"""Process the request and refresh the access token if necessary."""
|
||||||
|
if not self.is_expired(request):
|
||||||
|
return None
|
||||||
|
|
||||||
|
token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
|
||||||
|
client_id = self.get_settings("OIDC_RP_CLIENT_ID")
|
||||||
|
client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET")
|
||||||
|
refresh_token = get_oidc_refresh_token(request.session)
|
||||||
|
|
||||||
|
if not refresh_token:
|
||||||
|
logger.debug("no refresh token stored")
|
||||||
|
return self.finish(request, prompt_reauth=True)
|
||||||
|
|
||||||
|
token_payload = {
|
||||||
|
"grant_type": "refresh_token",
|
||||||
|
"client_id": client_id,
|
||||||
|
"client_secret": client_secret,
|
||||||
|
"refresh_token": refresh_token,
|
||||||
|
}
|
||||||
|
|
||||||
|
req_auth = None
|
||||||
|
if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
|
||||||
|
# supported in https://github.com/mozilla/mozilla-django-oidc/pull/377
|
||||||
|
# but we don't need it, so enforce error here.
|
||||||
|
raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported")
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
token_url,
|
||||||
|
auth=req_auth,
|
||||||
|
data=token_payload,
|
||||||
|
verify=import_from_settings("OIDC_VERIFY_SSL", True),
|
||||||
|
timeout=import_from_settings("OIDC_TIMEOUT", 3),
|
||||||
|
)
|
||||||
|
response.raise_for_status()
|
||||||
|
token_info = response.json()
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
logger.debug("timed out refreshing access token")
|
||||||
|
# Don't prompt for reauth as this could be a temporary problem
|
||||||
|
return self.finish(request, prompt_reauth=False)
|
||||||
|
except requests.exceptions.HTTPError as exc:
|
||||||
|
status_code = exc.response.status_code
|
||||||
|
logger.debug("http error %s when refreshing access token", status_code)
|
||||||
|
# OAuth error response will be a 400 for various situations, including
|
||||||
|
# an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
|
||||||
|
return self.finish(request, prompt_reauth=status_code == 400)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.debug("malformed response when refreshing access token")
|
||||||
|
# Don't prompt for reauth as this could be a temporary problem
|
||||||
|
return self.finish(request, prompt_reauth=False)
|
||||||
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
|
logger.exception(
|
||||||
|
"unknown error occurred when refreshing access token: %s", exc
|
||||||
|
)
|
||||||
|
# Don't prompt for reauth as this could be a temporary problem
|
||||||
|
return self.finish(request, prompt_reauth=False)
|
||||||
|
|
||||||
|
# Until we can properly validate an ID token on the refresh response
|
||||||
|
# per the spec[1], we intentionally drop the id_token.
|
||||||
|
# [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse
|
||||||
|
id_token = None
|
||||||
|
access_token = token_info.get("access_token")
|
||||||
|
refresh_token = token_info.get("refresh_token")
|
||||||
|
store_tokens(request.session, access_token, id_token, refresh_token)
|
||||||
|
|
||||||
|
return None
|
||||||
@@ -10,14 +10,37 @@ from django.test.utils import override_settings
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import responses
|
import responses
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
from core import models
|
from core import models
|
||||||
from core.authentication.backends import OIDCAuthenticationBackend
|
from core.authentication.backends import (
|
||||||
|
OIDCAuthenticationBackend,
|
||||||
|
get_oidc_refresh_token,
|
||||||
|
store_oidc_refresh_token,
|
||||||
|
)
|
||||||
from core.factories import UserFactory
|
from core.factories import UserFactory
|
||||||
|
|
||||||
pytestmark = pytest.mark.django_db
|
pytestmark = pytest.mark.django_db
|
||||||
|
|
||||||
|
|
||||||
|
def test_oidc_refresh_token_session_store(settings):
|
||||||
|
"""Test that the OIDC refresh token is stored and retrieved from the session."""
|
||||||
|
session = {}
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required."
|
||||||
|
):
|
||||||
|
store_oidc_refresh_token(session, "test-refresh-token")
|
||||||
|
|
||||||
|
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||||
|
|
||||||
|
store_oidc_refresh_token(session, "test-refresh-token")
|
||||||
|
assert session["oidc_refresh_token"] is not None
|
||||||
|
assert session["oidc_refresh_token"] != "test-refresh-token"
|
||||||
|
|
||||||
|
assert get_oidc_refresh_token(session) == "test-refresh-token"
|
||||||
|
|
||||||
|
|
||||||
def test_authentication_getter_existing_user_no_email(
|
def test_authentication_getter_existing_user_no_email(
|
||||||
django_assert_num_queries, monkeypatch
|
django_assert_num_queries, monkeypatch
|
||||||
):
|
):
|
||||||
@@ -547,3 +570,56 @@ def test_authentication_verify_claims_success(django_assert_num_queries, monkeyp
|
|||||||
assert user.full_name == "Doe"
|
assert user.full_name == "Doe"
|
||||||
assert user.short_name is None
|
assert user.short_name is None
|
||||||
assert user.email == "john.doe@example.com"
|
assert user.email == "john.doe@example.com"
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_authentication_session_tokens(
|
||||||
|
django_assert_num_queries, monkeypatch, rf, settings
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Test that the session contains oidc_refresh_token and oidc_access_token after authentication.
|
||||||
|
"""
|
||||||
|
settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token"
|
||||||
|
settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo"
|
||||||
|
settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks"
|
||||||
|
settings.OIDC_STORE_ACCESS_TOKEN = True
|
||||||
|
settings.OIDC_STORE_REFRESH_TOKEN = True
|
||||||
|
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||||
|
|
||||||
|
klass = OIDCAuthenticationBackend()
|
||||||
|
request = rf.get("/some-url", {"state": "test-state", "code": "test-code"})
|
||||||
|
request.session = {}
|
||||||
|
|
||||||
|
def verify_token_mocked(*args, **kwargs):
|
||||||
|
return {"sub": "123", "email": "test@example.com"}
|
||||||
|
|
||||||
|
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked)
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
re.compile(settings.OIDC_OP_TOKEN_ENDPOINT),
|
||||||
|
json={
|
||||||
|
"access_token": "test-access-token",
|
||||||
|
"refresh_token": "test-refresh-token",
|
||||||
|
},
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.GET,
|
||||||
|
re.compile(settings.OIDC_OP_USER_ENDPOINT),
|
||||||
|
json={"sub": "123", "email": "test@example.com"},
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
with django_assert_num_queries(6):
|
||||||
|
user = klass.authenticate(
|
||||||
|
request,
|
||||||
|
code="test-code",
|
||||||
|
nonce="test-nonce",
|
||||||
|
code_verifier="test-code-verifier",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert user is not None
|
||||||
|
assert request.session["oidc_access_token"] == "test-access-token"
|
||||||
|
assert get_oidc_refresh_token(request.session) == "test-refresh-token"
|
||||||
|
|||||||
55
src/backend/core/tests/authentication/test_decorators.py
Normal file
55
src/backend/core/tests/authentication/test_decorators.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
"""Tests for the refresh_oidc_access_token decorator in core app."""
|
||||||
|
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from django.http import HttpResponse
|
||||||
|
from django.test import RequestFactory
|
||||||
|
from django.utils.decorators import method_decorator
|
||||||
|
from django.views import View
|
||||||
|
|
||||||
|
from core.authentication.decorators import refresh_oidc_access_token
|
||||||
|
|
||||||
|
|
||||||
|
class RefreshOIDCAccessTokenView(View):
|
||||||
|
"""
|
||||||
|
A Django view that uses the refresh_oidc_access_token decorator to refresh
|
||||||
|
the OIDC access token before processing the request.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@method_decorator(refresh_oidc_access_token)
|
||||||
|
def dispatch(self, request, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Overrides the dispatch method to apply the refresh_oidc_access_token decorator.
|
||||||
|
"""
|
||||||
|
return super().dispatch(request, *args, **kwargs)
|
||||||
|
|
||||||
|
def get(self, request, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Handles GET requests.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HttpResponse: A simple HTTP response with "OK" as the content.
|
||||||
|
"""
|
||||||
|
return HttpResponse("OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_refresh_oidc_access_token_decorator():
|
||||||
|
"""
|
||||||
|
Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access.
|
||||||
|
|
||||||
|
The test creates a mock request and patches the dispatch method to verify that it is called
|
||||||
|
with the correct request object.
|
||||||
|
"""
|
||||||
|
# Create a test request
|
||||||
|
factory = RequestFactory()
|
||||||
|
request = factory.get("/")
|
||||||
|
|
||||||
|
# Mock the OIDC refresh functionality
|
||||||
|
with patch(
|
||||||
|
"core.authentication.middleware.RefreshOIDCAccessToken.process_request"
|
||||||
|
) as mock_refresh:
|
||||||
|
# Call the decorated view
|
||||||
|
RefreshOIDCAccessTokenView.as_view()(request)
|
||||||
|
|
||||||
|
# Assert that the refresh method was called
|
||||||
|
mock_refresh.assert_called_once_with(request)
|
||||||
327
src/backend/core/tests/authentication/test_middleware.py
Normal file
327
src/backend/core/tests/authentication/test_middleware.py
Normal file
@@ -0,0 +1,327 @@
|
|||||||
|
"""Tests for the RefreshOIDCAccessToken middleware."""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
from django.contrib.auth.models import AnonymousUser
|
||||||
|
from django.contrib.sessions.middleware import SessionMiddleware
|
||||||
|
from django.http import HttpResponse, JsonResponse
|
||||||
|
from django.test import RequestFactory
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import requests.exceptions
|
||||||
|
import responses
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
from core import factories
|
||||||
|
from core.authentication.backends import (
|
||||||
|
get_cipher_suite,
|
||||||
|
get_oidc_refresh_token,
|
||||||
|
store_oidc_refresh_token,
|
||||||
|
)
|
||||||
|
from core.authentication.middleware import RefreshOIDCAccessToken
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.django_db
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="oidc_settings")
|
||||||
|
def fixture_oidc_settings(settings):
|
||||||
|
"""Fixture to configure OIDC settings for the tests."""
|
||||||
|
settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token"
|
||||||
|
settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize"
|
||||||
|
settings.OIDC_RP_CLIENT_ID = "client_id"
|
||||||
|
settings.OIDC_RP_CLIENT_SECRET = "client_secret"
|
||||||
|
settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback"
|
||||||
|
settings.OIDC_RP_SCOPES = "openid email"
|
||||||
|
settings.OIDC_USE_NONCE = True
|
||||||
|
settings.OIDC_STATE_SIZE = 32
|
||||||
|
settings.OIDC_NONCE_SIZE = 32
|
||||||
|
settings.OIDC_VERIFY_SSL = True
|
||||||
|
settings.OIDC_TOKEN_USE_BASIC_AUTH = False
|
||||||
|
settings.OIDC_STORE_ACCESS_TOKEN = True
|
||||||
|
settings.OIDC_STORE_REFRESH_TOKEN = True
|
||||||
|
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
|
||||||
|
|
||||||
|
get_cipher_suite.cache_clear()
|
||||||
|
|
||||||
|
yield settings
|
||||||
|
|
||||||
|
get_cipher_suite.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
When the user is not authenticated, this
|
||||||
|
is not the purpose of the middleware to manage anything.
|
||||||
|
"""
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = AnonymousUser()
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert response is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
When the session does not contain a refresh token,
|
||||||
|
the middleware should return a 401 response containing
|
||||||
|
the URL to authenticate again.
|
||||||
|
"""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
|
||||||
|
request.session["oidc_access_token"] = ("expired_token",)
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, JsonResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.has_header("refresh_url")
|
||||||
|
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
|
||||||
|
|
||||||
|
|
||||||
|
def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""We don't support OIDC_TOKEN_USE_BASIC_AUTH"""
|
||||||
|
oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True
|
||||||
|
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
with pytest.raises(RuntimeError) as excinfo:
|
||||||
|
middleware.process_request(request)
|
||||||
|
|
||||||
|
assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported"
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""Test that the middleware successfully refreshes the token."""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
json={"access_token": "new_token", "refresh_token": "new_refresh_token"},
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
assert response is None
|
||||||
|
assert request.session["oidc_access_token"] == "new_token"
|
||||||
|
assert get_oidc_refresh_token(request.session) == "new_refresh_token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""Test that the middleware does nothing when the token is not expired."""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = ("valid_token",)
|
||||||
|
request.session["oidc_token_expiration"] = time.time() + 3600
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert response is None
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""Test that the middleware returns a 401 response when the token refresh request times out."""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
body=requests.exceptions.Timeout("timeout"),
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, HttpResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert not response.has_header("refresh_url")
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
Test that the middleware returns a 401 response when the token
|
||||||
|
refresh request returns a 400 error.
|
||||||
|
"""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
json={"error": "invalid_grant"},
|
||||||
|
status=400,
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, HttpResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.has_header("refresh_url")
|
||||||
|
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
Test that the middleware returns a 401 response when
|
||||||
|
the token refresh request returns a 404 error.
|
||||||
|
"""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
json={"error": "invalid_grant"},
|
||||||
|
status=404,
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, HttpResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert not response.has_header("refresh_url")
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
Test that the middleware returns a 401 response
|
||||||
|
when the token refresh request returns malformed JSON.
|
||||||
|
"""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
body="malformed json",
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, HttpResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert not response.has_header("refresh_url")
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument
|
||||||
|
"""
|
||||||
|
Test that the middleware returns a 401 response
|
||||||
|
when the token refresh request raises an exception.
|
||||||
|
"""
|
||||||
|
user = factories.UserFactory()
|
||||||
|
request = RequestFactory().get("/test")
|
||||||
|
request.user = user
|
||||||
|
|
||||||
|
get_response = MagicMock()
|
||||||
|
session_middleware = SessionMiddleware(get_response)
|
||||||
|
session_middleware.process_request(request)
|
||||||
|
request.session["oidc_access_token"] = "old_token"
|
||||||
|
store_oidc_refresh_token(request.session, "refresh_token")
|
||||||
|
request.session["oidc_token_expiration"] = time.time() - 100
|
||||||
|
request.session.save()
|
||||||
|
|
||||||
|
responses.add(
|
||||||
|
responses.POST,
|
||||||
|
"https://auth.example.com/token",
|
||||||
|
body={"error": "invalid_grant"}, # invalid format dict
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
middleware = RefreshOIDCAccessToken(get_response)
|
||||||
|
response = middleware.process_request(request)
|
||||||
|
assert isinstance(response, HttpResponse)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert not response.has_header("refresh_url")
|
||||||
@@ -487,6 +487,17 @@ class Base(Configuration):
|
|||||||
environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION",
|
environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION",
|
||||||
environ_prefix=None,
|
environ_prefix=None,
|
||||||
)
|
)
|
||||||
|
OIDC_STORE_ACCESS_TOKEN = values.BooleanValue(
|
||||||
|
default=True, environ_name="OIDC_STORE_ACCESS_TOKEN", environ_prefix=None
|
||||||
|
)
|
||||||
|
OIDC_STORE_REFRESH_TOKEN = values.BooleanValue(
|
||||||
|
default=True, environ_name="OIDC_STORE_REFRESH_TOKEN", environ_prefix=None
|
||||||
|
)
|
||||||
|
OIDC_STORE_REFRESH_TOKEN_KEY = values.Value(
|
||||||
|
default=None,
|
||||||
|
environ_name="OIDC_STORE_REFRESH_TOKEN_KEY",
|
||||||
|
environ_prefix=None,
|
||||||
|
)
|
||||||
|
|
||||||
# WARNING: Enabling this setting allows multiple user accounts to share the same email
|
# WARNING: Enabling this setting allows multiple user accounts to share the same email
|
||||||
# address. This may cause security issues and is not recommended for production use when
|
# address. This may cause security issues and is not recommended for production use when
|
||||||
|
|||||||
Reference in New Issue
Block a user