Compare commits

...

3 Commits

Author SHA1 Message Date
Quentin BEY
f30c4ff4b3 (oidc) encrypt the refresh token in session
Enforce refresh token encryption for the session storage.
2025-03-19 17:06:56 +01:00
Quentin BEY
589346acba (oidc) store refresh token in session
This code is not very clean but it reduces the footprint
to add refresh token storage in session.
This could be cleaned up once the PR it merged:
https://github.com/mozilla/mozilla-django-oidc/pull/377
2025-03-19 17:05:36 +01:00
Quentin BEY
ea8b8be5f0 (oidc) store and refresh tokens
This provides a way to to refresh the OIDC access token.
The OIDC token will be used to request data to a resource server.
This code is highly related to
https://github.com/mozilla/mozilla-django-oidc/pull/377
2025-03-19 17:05:36 +01:00
9 changed files with 760 additions and 1 deletions

View File

@@ -8,6 +8,10 @@ and this project adheres to
## [Unreleased]
## Added
- ✨(oidc) add refresh token tools #584
## [2.5.0] - 2025-03-18
## Added

View File

@@ -4,3 +4,6 @@ BURST_THROTTLE_RATES="200/minute"
DJANGO_SERVER_TO_SERVER_API_TOKENS=test-e2e
Y_PROVIDER_API_KEY=yprovider-api-key
Y_PROVIDER_API_BASE_URL=http://y-provider:4444/api/
# - add a key to store the refresh token in tests
OIDC_STORE_REFRESH_TOKEN_KEY=qnw7gZrOFLkLuZIixzuxksNORFJyjWyi5ACugNchKJY=

View File

@@ -1,21 +1,59 @@
"""Authentication Backends for the Impress core app."""
import logging
from functools import lru_cache
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.utils.translation import gettext_lazy as _
import requests
from cryptography.fernet import Fernet
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from mozilla_django_oidc.utils import import_from_settings
from core.models import DuplicateEmailError, User
logger = logging.getLogger(__name__)
@lru_cache(maxsize=0)
def get_cipher_suite():
"""Return a Fernet cipher suite."""
key = import_from_settings("OIDC_STORE_REFRESH_TOKEN_KEY", None)
if not key:
raise ValueError("OIDC_STORE_REFRESH_TOKEN_KEY setting is required.")
return Fernet(key)
def store_oidc_refresh_token(session, refresh_token):
"""Store the encrypted OIDC refresh token in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_REFRESH_TOKEN", False):
encrypted_token = get_cipher_suite().encrypt(refresh_token.encode())
session["oidc_refresh_token"] = encrypted_token.decode()
def get_oidc_refresh_token(session):
"""Retrieve and decrypt the OIDC refresh token from the session."""
encrypted_token = session.get("oidc_refresh_token")
if encrypted_token:
return get_cipher_suite().decrypt(encrypted_token.encode()).decode()
return None
def store_tokens(session, access_token, id_token, refresh_token):
"""Store tokens in the session if enabled in settings."""
if import_from_settings("OIDC_STORE_ACCESS_TOKEN", False):
session["oidc_access_token"] = access_token
if import_from_settings("OIDC_STORE_ID_TOKEN", False):
session["oidc_id_token"] = id_token
store_oidc_refresh_token(session, refresh_token)
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
@@ -23,6 +61,40 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the OIDC Authentication Backend.
Adds an internal attribute to store the token_info dictionary.
The purpose of `self._token_info` is to not duplicate code from
the original `authenticate` method.
This won't be needed after https://github.com/mozilla/mozilla-django-oidc/pull/377
is merged.
"""
super().__init__(*args, **kwargs)
self._token_info = None
def get_token(self, payload):
"""
Return token object as a dictionary.
Store the value to extract the refresh token in the `authenticate` method.
"""
self._token_info = super().get_token(payload)
return self._token_info
def authenticate(self, request, **kwargs):
"""Authenticates a user based on the OIDC code flow."""
user = super().authenticate(request, **kwargs)
if user is not None:
# Then the user successfully authenticated
store_oidc_refresh_token(
request.session, self._token_info.get("refresh_token")
)
return user
def get_userinfo(self, access_token, id_token, payload):
"""Return user details dictionary.

View File

@@ -0,0 +1,12 @@
"""
Decorators for the authentication app.
We don't want (yet) to enforce the OIDC access token to be "fresh" for all
views, so we provide a decorator to refresh the access token only when needed.
"""
from django.utils.decorators import decorator_from_middleware
from .middleware import RefreshOIDCAccessToken
refresh_oidc_access_token = decorator_from_middleware(RefreshOIDCAccessToken)

View File

@@ -0,0 +1,199 @@
"""
Module to declare a RefreshOIDCAccessToken middleware that extends the
mozilla_django_oidc.middleware.SessionRefresh middleware to refresh the
access token when it expires, based on the OIDC provided refresh token.
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
which is still not merged.
"""
import json
import logging
import time
from urllib.parse import quote, urlencode
from django.http import JsonResponse
from django.urls import reverse
from django.utils.crypto import get_random_string
import requests
from mozilla_django_oidc.middleware import SessionRefresh
try:
from mozilla_django_oidc.middleware import ( # pylint: disable=unused-import
RefreshOIDCAccessToken as MozillaRefreshOIDCAccessToken,
)
# If the import is successful, raise an error to notify the user that the
# version of mozilla_django_oidc added the expected middleware, and we don't need
# our implementation anymore.
# See https://github.com/mozilla/mozilla-django-oidc/pull/377
raise RuntimeError("This version of mozilla_django_oidc has RefreshOIDCAccessToken")
except ImportError:
pass
from mozilla_django_oidc.utils import (
absolutify,
add_state_and_verifier_and_nonce_to_session,
import_from_settings,
)
from core.authentication.backends import get_oidc_refresh_token, store_tokens
logger = logging.getLogger(__name__)
class RefreshOIDCAccessToken(SessionRefresh):
"""
A middleware that will refresh the access token following proper OIDC protocol:
https://auth0.com/docs/tokens/refresh-token/current
This is based on https://github.com/mozilla/mozilla-django-oidc/pull/377
but limited to our needs (YAGNI/KISS).
"""
def _prepare_reauthorization(self, request):
"""
Constructs a new authorization grant request to refresh the session.
Besides constructing the request, the state and nonce included in the
request are registered in the current session in preparation for the
client following through with the authorization flow.
"""
auth_url = self.OIDC_OP_AUTHORIZATION_ENDPOINT
client_id = self.OIDC_RP_CLIENT_ID
state = get_random_string(self.OIDC_STATE_SIZE)
# Build the parameters as if we were doing a real auth handoff, except
# we also include prompt=none.
auth_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": absolutify(
request, reverse(self.OIDC_AUTHENTICATION_CALLBACK_URL)
),
"state": state,
"scope": self.OIDC_RP_SCOPES,
"prompt": "none",
}
if self.OIDC_USE_NONCE:
nonce = get_random_string(self.OIDC_NONCE_SIZE)
auth_params.update({"nonce": nonce})
# Register the one-time parameters in the session
add_state_and_verifier_and_nonce_to_session(request, state, auth_params)
request.session["oidc_login_next"] = request.get_full_path()
query = urlencode(auth_params, quote_via=quote)
return f"{auth_url}?{query}"
def is_expired(self, request):
"""Check whether the access token is expired and needs to be refreshed."""
if not self.is_refreshable_url(request):
logger.debug("request is not refreshable")
return False
expiration = request.session.get("oidc_token_expiration", 0)
now = time.time()
if expiration > now:
# The id_token is still valid, so we don't have to do anything.
logger.debug("id token is still valid (%s > %s)", expiration, now)
return False
return True
def finish(self, request, prompt_reauth=True):
"""Finish request handling and handle sending downstream responses for XHR.
This function should only be run if the session is determind to
be expired.
Almost all XHR request handling in client-side code struggles
with redirects since redirecting to a page where the user
is supposed to do something is extremely unlikely to work
in an XHR request. Make a special response for these kinds
of requests.
The use of 403 Forbidden is to match the fact that this
middleware doesn't really want the user in if they don't
refresh their session.
WARNING: this varies from the original implementation:
- to return a 401 status code
- to consider all requests as XHR requests
"""
xhr_response_json = {"error": "the authentication session has expired"}
if prompt_reauth:
# The id_token has expired, so we have to re-authenticate silently.
refresh_url = self._prepare_reauthorization(request)
xhr_response_json["refresh_url"] = refresh_url
xhr_response = JsonResponse(xhr_response_json, status=401)
if "refresh_url" in xhr_response_json:
xhr_response["refresh_url"] = xhr_response_json["refresh_url"]
return xhr_response
def process_request(self, request): # noqa: PLR0911 # pylint: disable=too-many-return-statements
"""Process the request and refresh the access token if necessary."""
if not self.is_expired(request):
return None
token_url = self.get_settings("OIDC_OP_TOKEN_ENDPOINT")
client_id = self.get_settings("OIDC_RP_CLIENT_ID")
client_secret = self.get_settings("OIDC_RP_CLIENT_SECRET")
refresh_token = get_oidc_refresh_token(request.session)
if not refresh_token:
logger.debug("no refresh token stored")
return self.finish(request, prompt_reauth=True)
token_payload = {
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token,
}
req_auth = None
if self.get_settings("OIDC_TOKEN_USE_BASIC_AUTH", False):
# supported in https://github.com/mozilla/mozilla-django-oidc/pull/377
# but we don't need it, so enforce error here.
raise RuntimeError("OIDC_TOKEN_USE_BASIC_AUTH is not supported")
try:
response = requests.post(
token_url,
auth=req_auth,
data=token_payload,
verify=import_from_settings("OIDC_VERIFY_SSL", True),
timeout=import_from_settings("OIDC_TIMEOUT", 3),
)
response.raise_for_status()
token_info = response.json()
except requests.exceptions.Timeout:
logger.debug("timed out refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except requests.exceptions.HTTPError as exc:
status_code = exc.response.status_code
logger.debug("http error %s when refreshing access token", status_code)
# OAuth error response will be a 400 for various situations, including
# an expired token. https://datatracker.ietf.org/doc/html/rfc6749#section-5.2
return self.finish(request, prompt_reauth=status_code == 400)
except json.JSONDecodeError:
logger.debug("malformed response when refreshing access token")
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
except Exception as exc: # pylint: disable=broad-except
logger.exception(
"unknown error occurred when refreshing access token: %s", exc
)
# Don't prompt for reauth as this could be a temporary problem
return self.finish(request, prompt_reauth=False)
# Until we can properly validate an ID token on the refresh response
# per the spec[1], we intentionally drop the id_token.
# [1]: https://openid.net/specs/openid-connect-core-1_0.html#RefreshTokenResponse
id_token = None
access_token = token_info.get("access_token")
refresh_token = token_info.get("refresh_token")
store_tokens(request.session, access_token, id_token, refresh_token)
return None

View File

@@ -10,14 +10,37 @@ from django.test.utils import override_settings
import pytest
import responses
from cryptography.fernet import Fernet
from core import models
from core.authentication.backends import OIDCAuthenticationBackend
from core.authentication.backends import (
OIDCAuthenticationBackend,
get_oidc_refresh_token,
store_oidc_refresh_token,
)
from core.factories import UserFactory
pytestmark = pytest.mark.django_db
def test_oidc_refresh_token_session_store(settings):
"""Test that the OIDC refresh token is stored and retrieved from the session."""
session = {}
with pytest.raises(
ValueError, match="OIDC_STORE_REFRESH_TOKEN_KEY setting is required."
):
store_oidc_refresh_token(session, "test-refresh-token")
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
store_oidc_refresh_token(session, "test-refresh-token")
assert session["oidc_refresh_token"] is not None
assert session["oidc_refresh_token"] != "test-refresh-token"
assert get_oidc_refresh_token(session) == "test-refresh-token"
def test_authentication_getter_existing_user_no_email(
django_assert_num_queries, monkeypatch
):
@@ -547,3 +570,56 @@ def test_authentication_verify_claims_success(django_assert_num_queries, monkeyp
assert user.full_name == "Doe"
assert user.short_name is None
assert user.email == "john.doe@example.com"
@responses.activate
def test_authentication_session_tokens(
django_assert_num_queries, monkeypatch, rf, settings
):
"""
Test that the session contains oidc_refresh_token and oidc_access_token after authentication.
"""
settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token"
settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo"
settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks"
settings.OIDC_STORE_ACCESS_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
klass = OIDCAuthenticationBackend()
request = rf.get("/some-url", {"state": "test-state", "code": "test-code"})
request.session = {}
def verify_token_mocked(*args, **kwargs):
return {"sub": "123", "email": "test@example.com"}
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked)
responses.add(
responses.POST,
re.compile(settings.OIDC_OP_TOKEN_ENDPOINT),
json={
"access_token": "test-access-token",
"refresh_token": "test-refresh-token",
},
status=200,
)
responses.add(
responses.GET,
re.compile(settings.OIDC_OP_USER_ENDPOINT),
json={"sub": "123", "email": "test@example.com"},
status=200,
)
with django_assert_num_queries(6):
user = klass.authenticate(
request,
code="test-code",
nonce="test-nonce",
code_verifier="test-code-verifier",
)
assert user is not None
assert request.session["oidc_access_token"] == "test-access-token"
assert get_oidc_refresh_token(request.session) == "test-refresh-token"

View File

@@ -0,0 +1,55 @@
"""Tests for the refresh_oidc_access_token decorator in core app."""
from unittest.mock import patch
from django.http import HttpResponse
from django.test import RequestFactory
from django.utils.decorators import method_decorator
from django.views import View
from core.authentication.decorators import refresh_oidc_access_token
class RefreshOIDCAccessTokenView(View):
"""
A Django view that uses the refresh_oidc_access_token decorator to refresh
the OIDC access token before processing the request.
"""
@method_decorator(refresh_oidc_access_token)
def dispatch(self, request, *args, **kwargs):
"""
Overrides the dispatch method to apply the refresh_oidc_access_token decorator.
"""
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
"""
Handles GET requests.
Returns:
HttpResponse: A simple HTTP response with "OK" as the content.
"""
return HttpResponse("OK")
def test_refresh_oidc_access_token_decorator():
"""
Tests the refresh_oidc_access_token decorator is called on RefreshOIDCAccessTokenView access.
The test creates a mock request and patches the dispatch method to verify that it is called
with the correct request object.
"""
# Create a test request
factory = RequestFactory()
request = factory.get("/")
# Mock the OIDC refresh functionality
with patch(
"core.authentication.middleware.RefreshOIDCAccessToken.process_request"
) as mock_refresh:
# Call the decorated view
RefreshOIDCAccessTokenView.as_view()(request)
# Assert that the refresh method was called
mock_refresh.assert_called_once_with(request)

View File

@@ -0,0 +1,327 @@
"""Tests for the RefreshOIDCAccessToken middleware."""
import time
from unittest.mock import MagicMock
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.http import HttpResponse, JsonResponse
from django.test import RequestFactory
import pytest
import requests.exceptions
import responses
from cryptography.fernet import Fernet
from core import factories
from core.authentication.backends import (
get_cipher_suite,
get_oidc_refresh_token,
store_oidc_refresh_token,
)
from core.authentication.middleware import RefreshOIDCAccessToken
pytestmark = pytest.mark.django_db
@pytest.fixture(name="oidc_settings")
def fixture_oidc_settings(settings):
"""Fixture to configure OIDC settings for the tests."""
settings.OIDC_OP_TOKEN_ENDPOINT = "https://auth.example.com/token"
settings.OIDC_OP_AUTHORIZATION_ENDPOINT = "https://auth.example.com/authorize"
settings.OIDC_RP_CLIENT_ID = "client_id"
settings.OIDC_RP_CLIENT_SECRET = "client_secret"
settings.OIDC_AUTHENTICATION_CALLBACK_URL = "oidc_authentication_callback"
settings.OIDC_RP_SCOPES = "openid email"
settings.OIDC_USE_NONCE = True
settings.OIDC_STATE_SIZE = 32
settings.OIDC_NONCE_SIZE = 32
settings.OIDC_VERIFY_SSL = True
settings.OIDC_TOKEN_USE_BASIC_AUTH = False
settings.OIDC_STORE_ACCESS_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
get_cipher_suite.cache_clear()
yield settings
get_cipher_suite.cache_clear()
def test_anonymous_user(oidc_settings): # pylint: disable=unused-argument
"""
When the user is not authenticated, this
is not the purpose of the middleware to manage anything.
"""
request = RequestFactory().get("/test")
request.user = AnonymousUser()
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert response is None
def test_no_refresh_token(oidc_settings): # pylint: disable=unused-argument
"""
When the session does not contain a refresh token,
the middleware should return a 401 response containing
the URL to authenticate again.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = ("expired_token",)
request.session["oidc_token_expiration"] = time.time() - 100
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, JsonResponse)
assert response.status_code == 401
assert response.has_header("refresh_url")
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
def test_basic_auth_disabled(oidc_settings): # pylint: disable=unused-argument
"""We don't support OIDC_TOKEN_USE_BASIC_AUTH"""
oidc_settings.OIDC_TOKEN_USE_BASIC_AUTH = True
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
middleware = RefreshOIDCAccessToken(get_response)
with pytest.raises(RuntimeError) as excinfo:
middleware.process_request(request)
assert str(excinfo.value) == "OIDC_TOKEN_USE_BASIC_AUTH is not supported"
@responses.activate
def test_successful_token_refresh(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware successfully refreshes the token."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"access_token": "new_token", "refresh_token": "new_refresh_token"},
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
request.session.save()
assert response is None
assert request.session["oidc_access_token"] == "new_token"
assert get_oidc_refresh_token(request.session) == "new_refresh_token"
def test_non_expired_token(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware does nothing when the token is not expired."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = ("valid_token",)
request.session["oidc_token_expiration"] = time.time() + 3600
request.session.save()
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert response is None
@responses.activate
def test_refresh_token_request_timeout(oidc_settings): # pylint: disable=unused-argument
"""Test that the middleware returns a 401 response when the token refresh request times out."""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body=requests.exceptions.Timeout("timeout"),
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_error_400(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response when the token
refresh request returns a 400 error.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"error": "invalid_grant"},
status=400,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert response.has_header("refresh_url")
assert response["refresh_url"].startswith("https://auth.example.com/authorize")
@responses.activate
def test_refresh_token_request_error(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response when
the token refresh request returns a 404 error.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
json={"error": "invalid_grant"},
status=404,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_malformed_json_error(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response
when the token refresh request returns malformed JSON.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body="malformed json",
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")
@responses.activate
def test_refresh_token_request_exception(oidc_settings): # pylint: disable=unused-argument
"""
Test that the middleware returns a 401 response
when the token refresh request raises an exception.
"""
user = factories.UserFactory()
request = RequestFactory().get("/test")
request.user = user
get_response = MagicMock()
session_middleware = SessionMiddleware(get_response)
session_middleware.process_request(request)
request.session["oidc_access_token"] = "old_token"
store_oidc_refresh_token(request.session, "refresh_token")
request.session["oidc_token_expiration"] = time.time() - 100
request.session.save()
responses.add(
responses.POST,
"https://auth.example.com/token",
body={"error": "invalid_grant"}, # invalid format dict
status=200,
)
middleware = RefreshOIDCAccessToken(get_response)
response = middleware.process_request(request)
assert isinstance(response, HttpResponse)
assert response.status_code == 401
assert not response.has_header("refresh_url")

View File

@@ -487,6 +487,17 @@ class Base(Configuration):
environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION",
environ_prefix=None,
)
OIDC_STORE_ACCESS_TOKEN = values.BooleanValue(
default=True, environ_name="OIDC_STORE_ACCESS_TOKEN", environ_prefix=None
)
OIDC_STORE_REFRESH_TOKEN = values.BooleanValue(
default=True, environ_name="OIDC_STORE_REFRESH_TOKEN", environ_prefix=None
)
OIDC_STORE_REFRESH_TOKEN_KEY = values.Value(
default=None,
environ_name="OIDC_STORE_REFRESH_TOKEN_KEY",
environ_prefix=None,
)
# WARNING: Enabling this setting allows multiple user accounts to share the same email
# address. This may cause security issues and is not recommended for production use when