Compare commits

...

39 Commits

Author SHA1 Message Date
Marcelo Elizeche Landó
65c7f784b3 move notification logic to signals 2025-07-23 04:37:41 -03:00
Marcelo Elizeche Landó
e5acaa2603 fix merge conflicts 2025-07-23 04:27:56 -03:00
Marcelo Elizeche Landó
bb2ef38ecd fix linting 2025-07-23 03:55:39 -03:00
Marcelo Elizeche Landó
5364c49068 add issuer as parameter, fix tests 2025-07-23 03:53:20 -03:00
Marcelo Elizeche Landó
8ba469496c Trigger Build 2025-07-17 11:08:10 -03:00
Marcelo Elizeche Landó
2bdcadda44 Unify spelling of Back-Channel 2025-07-16 19:44:45 -03:00
Marcelo Elizeche Landó
6ff697de46 Unify spelling of Back-Channel 2025-07-16 19:43:23 -03:00
Marcelo Elizeche Landó
7594e2ba56 Merge branch 'oauth_backchannel_logout' of github.com:goauthentik/authentik into oauth_backchannel_logout 2025-07-16 19:37:37 -03:00
Marcelo Elizeche Landó
66587bbe69 use select_related in tasks 2025-07-16 19:37:14 -03:00
Marcelo Elizeche Landó
3711c6ede4 Merge branch 'main' into oauth_backchannel_logout 2025-07-16 19:29:50 -03:00
Marcelo Elizeche Landó
b422714a0b Remove unused url 2025-07-16 19:29:16 -03:00
Marcelo Elizeche Landó
158ee1d7cf Apply suggestions from code review
Co-authored-by: Dewi Roberts <dewi@goauthentik.io>
Signed-off-by: Marcelo Elizeche Landó <marce@melizeche.com>
2025-07-17 00:21:45 +02:00
Marcelo Elizeche Landó
13afc01e21 Apply suggestions from code review
Co-authored-by: Dewi Roberts <dewi@goauthentik.io>
Signed-off-by: Marcelo Elizeche Landó <marce@melizeche.com>
2025-07-16 16:59:38 +02:00
Marcelo Elizeche Landó
ec90666c9d add #nosec where needed 2025-07-15 16:43:27 -03:00
Marcelo Elizeche Landó
04cda27496 add #nosec where needed 2025-07-15 16:36:04 -03:00
Marcelo Elizeche Landó
40575c7c71 refactor tests 2025-07-15 16:25:47 -03:00
Marcelo Elizeche Landó
3b37221fd8 lint fix 2025-07-15 15:57:10 -03:00
Marcelo Elizeche Landó
0dc8062611 improve test coverage 2025-07-15 15:44:30 -03:00
Marcelo Elizeche Landó
b81dc63ad6 fix new way to import dependencies in the UI 2025-07-14 18:57:40 -03:00
Marcelo Elizeche Landó
d0534ca55f fix merge conflicts 2025-07-14 18:45:31 -03:00
Marcelo Elizeche Landó
1f1e55a36d Add backchannel_logout_session_supported key 2025-07-14 18:38:15 -03:00
Marcelo Elizeche Landó
8989593356 Add logic to terminate sessions 2025-07-08 20:08:47 -03:00
Marcelo Elizeche Landó
eaf4e46eb6 Make backchannel URIs optional in the UI 2025-07-08 13:54:33 -03:00
Marcelo Elizeche Landó
1b401668ec Add logic to logout all sessions not the ones with refresh tokens 2025-07-08 13:37:20 -03:00
Marcelo Elizeche Landó
bc10f869d2 Add backchannel URIs to the view UI 2025-07-07 22:15:56 -03:00
Marcelo Elizeche Landó
d217f2f1fe Add Backchannel Logout URIs field in the UI 2025-07-04 04:03:58 -03:00
Marcelo Elizeche Landó
6bbebbac4c fix migration 2025-07-04 03:52:46 -03:00
Marcelo Elizeche Landó
345756024e Add schemas 2025-07-04 03:45:08 -03:00
Marcelo Elizeche Landó
e7a404c4d3 better session lookup 2025-07-04 03:33:50 -03:00
Marcelo Elizeche Landó
4244263c29 Fix test 2025-07-04 03:11:06 -03:00
Marcelo Elizeche Landó
e3c8e613fb Merge branch 'main' into oauth_backchannel_logout 2025-07-04 02:59:59 -03:00
Marcelo Elizeche Landó
0b32da1e48 Remove fallback for redirect URIs 2025-07-04 02:59:21 -03:00
Marcelo Elizeche Landó
31b313d937 Add tests 2025-07-04 02:36:24 -03:00
Marcelo Elizeche Landó
f374fb5651 Add API support 2025-07-04 02:34:43 -03:00
Marcelo Elizeche Landó
bc8556c6b4 Fix IDtoken import in tests 2025-07-04 01:46:38 -03:00
Marcelo Elizeche Landó
effb3ff910 fix linting 2025-07-04 01:26:01 -03:00
Marcelo Elizeche Landó
7e23a93846 Create notification tasks, add notification trigger to flow, add view 2025-07-04 01:24:33 -03:00
Marcelo Elizeche Landó
5ea8bbf269 Add urls, create_logout_token, and migration 2025-07-04 01:16:58 -03:00
Marcelo Elizeche Landó
014e22c3d0 Add _backchannel_logout_uris field to OAuth2Provider, fix circular imports 2025-07-04 01:07:13 -03:00
22 changed files with 1370 additions and 34 deletions

View File

@@ -301,6 +301,7 @@ class SessionEndStage(ChallengeStageView):
"flow_slug": self.request.brand.flow_invalidation.slug,
},
)
return SessionEndChallenge(data=data)
# This can never be reached since this challenge is created on demand and only the

View File

@@ -43,6 +43,9 @@ class OAuth2ProviderSerializer(ProviderSerializer):
"""OAuth2Provider Serializer"""
redirect_uris = RedirectURISerializer(many=True, source="_redirect_uris")
backchannel_logout_uris = RedirectURISerializer(
many=True, source="_backchannel_logout_uris", required=False
)
def validate_redirect_uris(self, data: list) -> list:
for entry in data:
@@ -56,6 +59,18 @@ class OAuth2ProviderSerializer(ProviderSerializer):
) from None
return data
def validate_backchannel_logout_uris(self, data: list) -> list:
for entry in data:
if entry.get("matching_mode") == RedirectURIMatchingMode.REGEX:
url = entry.get("url")
try:
compile(url)
except RegexError:
raise ValidationError(
_("Invalid Regex Pattern: {url}".format(url=url))
) from None
return data
class Meta:
model = OAuth2Provider
fields = ProviderSerializer.Meta.fields + [
@@ -70,6 +85,7 @@ class OAuth2ProviderSerializer(ProviderSerializer):
"signing_key",
"encryption_key",
"redirect_uris",
"backchannel_logout_uris",
"sub_mode",
"property_mappings",
"issuer_mode",

View File

@@ -1,5 +1,8 @@
"""OAuth/OpenID Constants"""
from django.db import models
from django.utils.translation import gettext_lazy as _
GRANT_TYPE_AUTHORIZATION_CODE = "authorization_code"
GRANT_TYPE_IMPLICIT = "implicit"
GRANT_TYPE_REFRESH_TOKEN = "refresh_token" # nosec
@@ -51,3 +54,23 @@ AMR_MFA = "mfa"
AMR_OTP = "otp"
AMR_WEBAUTHN = "user"
AMR_SMART_CARD = "sc"
class SubModes(models.TextChoices):
"""Mode after which 'sub' attribute is generated, for compatibility reasons"""
HASHED_USER_ID = "hashed_user_id", _("Based on the Hashed User ID")
USER_ID = "user_id", _("Based on user ID")
USER_UUID = "user_uuid", _("Based on user UUID")
USER_USERNAME = "user_username", _("Based on the username")
USER_EMAIL = (
"user_email",
_("Based on the User's Email. This is recommended over the UPN method."),
)
USER_UPN = (
"user_upn",
_(
"Based on the User's UPN, only works if user has a 'upn' attribute set. "
"Use this method only if you have different UPN and Mail domains."
),
)

View File

@@ -4,10 +4,8 @@ from dataclasses import asdict, dataclass, field
from hashlib import sha256
from typing import TYPE_CHECKING, Any
from django.db import models
from django.http import HttpRequest
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from authentik.core.models import default_token_duration
from authentik.events.signals import get_login_event
@@ -18,6 +16,7 @@ from authentik.providers.oauth2.constants import (
AMR_PASSWORD,
AMR_SMART_CARD,
AMR_WEBAUTHN,
SubModes,
)
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
@@ -30,26 +29,6 @@ def hash_session_key(session_key: str) -> str:
return sha256(session_key.encode("ascii")).hexdigest()
class SubModes(models.TextChoices):
"""Mode after which 'sub' attribute is generated, for compatibility reasons"""
HASHED_USER_ID = "hashed_user_id", _("Based on the Hashed User ID")
USER_ID = "user_id", _("Based on user ID")
USER_UUID = "user_uuid", _("Based on user UUID")
USER_USERNAME = "user_username", _("Based on the username")
USER_EMAIL = (
"user_email",
_("Based on the User's Email. This is recommended over the UPN method."),
)
USER_UPN = (
"user_upn",
_(
"Based on the User's UPN, only works if user has a 'upn' attribute set. "
"Use this method only if you have different UPN and Mail domains."
),
)
@dataclass(slots=True)
class IDToken:
"""The primary extension that OpenID Connect makes to OAuth 2.0 to enable End-Users to be

View File

@@ -0,0 +1,23 @@
# Generated by Django 5.1.11 on 2025-07-04 03:23
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_providers_oauth2", "0028_migrate_session"),
]
operations = [
migrations.AddField(
model_name="oauth2provider",
name="_backchannel_logout_uris",
field=models.JSONField(default=list, verbose_name="Back-Channel Logout URIs"),
),
migrations.AlterField(
model_name="oauth2provider",
name="_redirect_uris",
field=models.JSONField(default=list, verbose_name="Redirect URIs"),
),
]

View File

@@ -6,7 +6,7 @@ import json
from dataclasses import asdict, dataclass
from functools import cached_property
from hashlib import sha256
from typing import Any
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse, urlunparse
from cryptography.hazmat.primitives.asymmetric.ec import (
@@ -44,9 +44,12 @@ from authentik.crypto.models import CertificateKeyPair
from authentik.lib.generators import generate_code_fixed_length, generate_id, generate_key
from authentik.lib.models import SerializerModel
from authentik.lib.utils.time import timedelta_string_validator
from authentik.providers.oauth2.id_token import IDToken, SubModes
from authentik.providers.oauth2.constants import SubModes
from authentik.sources.oauth.models import OAuthSource
if TYPE_CHECKING:
from authentik.providers.oauth2.id_token import IDToken
LOGGER = get_logger()
@@ -193,9 +196,13 @@ class OAuth2Provider(WebfingerProvider, Provider):
default=generate_client_secret,
)
_redirect_uris = models.JSONField(
default=dict,
default=list,
verbose_name=_("Redirect URIs"),
)
_backchannel_logout_uris = models.JSONField(
default=list,
verbose_name=_("Back-Channel Logout URIs"),
)
include_claims_in_id_token = models.BooleanField(
default=True,
@@ -321,6 +328,28 @@ class OAuth2Provider(WebfingerProvider, Provider):
cleansed.append(asdict(entry))
self._redirect_uris = cleansed
@property
def backchannel_logout_uris(self) -> list[RedirectURI]:
"""Get back-channel logout URIs"""
uris = []
for entry in self._backchannel_logout_uris:
uris.append(
from_dict(
RedirectURI,
entry,
config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
)
)
return uris
@backchannel_logout_uris.setter
def backchannel_logout_uris(self, value: list[RedirectURI]):
"""Set back-channel logout URIs"""
cleansed = []
for entry in value:
cleansed.append(asdict(entry))
self._backchannel_logout_uris = cleansed
@property
def launch_url(self) -> str | None:
"""Guess launch_url based on first redirect_uri"""
@@ -480,13 +509,15 @@ class AccessToken(SerializerModel, ExpiringModel, BaseGrantModel):
return f"Access Token for {self.provider_id} for user {self.user_id}"
@property
def id_token(self) -> IDToken:
def id_token(self) -> "IDToken":
"""Load ID Token from json"""
from authentik.providers.oauth2.id_token import IDToken
raw_token = json.loads(self._id_token)
return from_dict(IDToken, raw_token)
@id_token.setter
def id_token(self, value: IDToken):
def id_token(self, value: "IDToken"):
self.token = value.to_access_token(self.provider)
self._id_token = json.dumps(asdict(value))
@@ -531,13 +562,15 @@ class RefreshToken(SerializerModel, ExpiringModel, BaseGrantModel):
return f"Refresh Token for {self.provider_id} for user {self.user_id}"
@property
def id_token(self) -> IDToken:
def id_token(self) -> "IDToken":
"""Load ID Token from json"""
from authentik.providers.oauth2.id_token import IDToken
raw_token = json.loads(self._id_token)
return from_dict(IDToken, raw_token)
@id_token.setter
def id_token(self, value: IDToken):
def id_token(self, value: "IDToken"):
self._id_token = json.dumps(asdict(value))
@property

View File

@@ -1,8 +1,12 @@
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, User
from authentik.providers.oauth2.models import AccessToken, DeviceToken, RefreshToken
from authentik.providers.oauth2.tasks import send_backchannel_logout_notification
LOGGER = get_logger()
@receiver(pre_delete, sender=AuthenticatedSession)
@@ -14,6 +18,21 @@ def user_session_deleted_oauth_tokens_removal(sender, instance: AuthenticatedSes
).delete()
@receiver(pre_delete, sender=AuthenticatedSession)
def user_session_deleted_backchannel_logout(sender, instance: AuthenticatedSession, **_):
"""Send back-channel logout notifications upon session deletion"""
try:
send_backchannel_logout_notification(session=instance)
except Exception as exc:
# Log the error but don't fail the session deletion process
LOGGER.warning(
"Failed to send back-channel logout notifications",
user=instance.user.username,
session_key=instance.session.session_key,
error=str(exc),
)
@receiver(post_save, sender=User)
def user_deactivated(sender, instance: User, **_):
"""Remove user tokens when deactivated"""

View File

@@ -0,0 +1,143 @@
"""OAuth2 Provider Tasks"""
from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession
from authentik.events.models import Event
from authentik.lib.utils.http import get_http_session
from authentik.providers.oauth2.models import OAuth2Provider
from authentik.providers.oauth2.utils import create_logout_token
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@CELERY_APP.task()
def send_backchannel_logout_request(
provider_pk: int, iss: str, session_id: str = None, sub: str = None
) -> bool:
"""Send a back-channel logout request to the registered client
Args:
provider_pk: The OAuth2 provider's primary key
session_id: The session ID to include in the logout token
sub: The subject identifier to include in the logout token
Returns:
bool: True if the request was successful, False otherwise
"""
if not session_id and not sub:
LOGGER.warning("No session_id or sub provided for back-channel logout")
return False
try:
provider = OAuth2Provider.objects.get(pk=provider_pk)
except OAuth2Provider.DoesNotExist:
LOGGER.warning("Provider not found", provider_pk=provider_pk)
return False
# Generate the logout token
try:
logout_token = create_logout_token(iss, provider, session_id, sub)
except Exception as exc:
LOGGER.warning("Failed to create logout token", exc=exc)
return False
# Get the back-channel logout URI from the provider's dedicated backchannel_logout_uris field
# Back-channel logout requires explicit configuration - no fallback to redirect URIs
backchannel_logout_uri = None
# Check if provider has dedicated backchannel logout URIs configured
if provider.backchannel_logout_uris:
# Use the first configured backchannel logout URI
# In the future, we could implement logic to select based on criteria
backchannel_logout_uri = provider.backchannel_logout_uris[0].url
if not backchannel_logout_uri:
LOGGER.warning(
"No back-channel logout URI found for provider",
provider=provider.name,
client_id=provider.client_id,
)
return False
# Send the back-channel logout request
try:
response = get_http_session().post(
backchannel_logout_uri,
data={"logout_token": logout_token},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()
# HTTP 200 OK is the expected response for successful back-channel logout
HTTP_OK = 200
if response.status_code == HTTP_OK:
LOGGER.info(
"Back-channel logout successful",
provider=provider.name,
client_id=provider.client_id,
session_id=session_id,
sub=sub,
)
Event.new(
"backchannel_logout",
message="Back-channel logout notification sent",
provider=provider,
client_id=provider.client_id,
session_id=session_id,
sub=sub,
).save()
return True
LOGGER.warning(
"Back-channel logout failed",
provider=provider.name,
client_id=provider.client_id,
status_code=response.status_code,
response=response.text,
)
return False
except Exception as exc:
LOGGER.warning(
"Error sending back-channel logout request",
provider=provider.name,
client_id=provider.client_id,
exc=exc,
)
return False
def send_backchannel_logout_notification(session: AuthenticatedSession = None) -> None:
"""Send back-channel logout notifications to all relevant OAuth2 providers
This function should be called when a user's session is terminated.
Args:
session: The authenticated session that was terminated
"""
if not session:
LOGGER.warning("No session provided for back-channel logout notification")
return
# Get all OAuth2 providers that have issued tokens for this user
# Per OpenID Connect Back-Channel Logout 1.0 spec section 2.3:
# "OPs supporting back-channel logout need to keep track of the set of logged-in RPs"
# This includes ALL flows: authorization code, implicit, hybrid - not just refresh tokens
# Refresh tokens issued without the offline_access property to a session being logged out
# SHOULD be revoked. Refresh tokens issued with the offline_access property
# normally SHOULD NOT be revoked.
from authentik.providers.oauth2.models import AccessToken
# Get providers from access tokens (covers all OAuth2 flows)
access_tokens = AccessToken.objects.select_related("provider").filter(session=session)
for token in access_tokens:
# Send back-channel logout notifications to all tokens
# for provider_pk in provider_pks:
send_backchannel_logout_request.delay(
provider_pk=token.provider.pk,
iss=token.id_token.iss,
session_id=session.session.session_key,
sub=session.user.uid,
)

View File

@@ -81,4 +81,66 @@ class TestAPI(APITestCase):
},
)
self.assertJSONEqual(response.content, {"redirect_uris": ["Invalid Regex Pattern: **"]})
self.assertEqual(response.status_code, 400)
def test_backchannel_logout_uris_validation(self):
"""Test backchannel_logout_uris API validation"""
response = self.client.post(
reverse("authentik_api:oauth2provider-list"),
data={
"name": generate_id(),
"authorization_flow": create_test_flow().pk,
"invalidation_flow": create_test_flow().pk,
"redirect_uris": [
{"matching_mode": "strict", "url": "http://goauthentik.io"},
],
"backchannel_logout_uris": [
{"matching_mode": "strict", "url": "http://goauthentik.io/logout"},
{"matching_mode": "regex", "url": "**"},
],
},
)
self.assertJSONEqual(
response.content, {"backchannel_logout_uris": ["Invalid Regex Pattern: **"]}
)
def test_backchannel_logout_uris_create_and_retrieve(self):
"""Test creating and retrieving provider with backchannel logout URIs"""
response = self.client.post(
reverse("authentik_api:oauth2provider-list"),
data={
"name": generate_id(),
"authorization_flow": create_test_flow().pk,
"invalidation_flow": create_test_flow().pk,
"redirect_uris": [
{"matching_mode": "strict", "url": "http://goauthentik.io"},
],
"backchannel_logout_uris": [
{"matching_mode": "strict", "url": "http://goauthentik.io/logout"},
{"matching_mode": "regex", "url": r"http://.*\.example\.com/logout"},
],
},
)
self.assertEqual(response.status_code, 201)
provider_data = loads(response.content.decode())
# Verify the backchannel logout URIs were saved correctly
self.assertEqual(len(provider_data["backchannel_logout_uris"]), 2)
self.assertEqual(
provider_data["backchannel_logout_uris"][0]["url"], "http://goauthentik.io/logout"
)
self.assertEqual(provider_data["backchannel_logout_uris"][0]["matching_mode"], "strict")
self.assertEqual(
provider_data["backchannel_logout_uris"][1]["url"], r"http://.*\.example\.com/logout"
)
self.assertEqual(provider_data["backchannel_logout_uris"][1]["matching_mode"], "regex")
# Test retrieving the provider
response = self.client.get(
reverse("authentik_api:oauth2provider-detail", kwargs={"pk": provider_data["pk"]})
)
self.assertEqual(response.status_code, 200)
retrieved_data = loads(response.content.decode())
self.assertEqual(len(retrieved_data["backchannel_logout_uris"]), 2)
self.assertEqual(
retrieved_data["backchannel_logout_uris"][0]["url"], "http://goauthentik.io/logout"
)

View File

@@ -0,0 +1,572 @@
"""Test OAuth2 Back-Channel Logout implementation"""
import json
import uuid
from time import time
from unittest.mock import Mock, patch
import jwt
from django.test import RequestFactory
from django.utils import timezone
from requests import Response
from authentik.core.models import Application, AuthenticatedSession, Session, User
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.events.models import Event
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.models import (
AccessToken,
OAuth2Provider,
RedirectURI,
RedirectURIMatchingMode,
RefreshToken,
)
from authentik.providers.oauth2.tasks import (
send_backchannel_logout_notification,
send_backchannel_logout_request,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.views.backchannel_logout import BackChannelLogoutView
class TestBackChannelLogout(OAuthTestCase):
"""Test Back-Channel Logout functionality"""
def setUp(self) -> None:
super().setUp()
self.factory = RequestFactory()
self.user = create_test_admin_user()
self.app = Application.objects.create(name=generate_id(), slug="test-app")
self.provider = OAuth2Provider.objects.create(
name=generate_id(),
authorization_flow=create_test_flow(),
redirect_uris=[
RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
],
signing_key=self.keypair,
)
self.app.provider = self.provider
self.app.save()
def _create_session(self, session_key=None):
"""Create a session with the given key or a generated one"""
session_key = session_key or f"session-{generate_id()}"
session = Session.objects.create(
session_key=session_key,
expires=timezone.now() + timezone.timedelta(hours=1),
last_ip="255.255.255.255",
)
auth_session = AuthenticatedSession.objects.create(
session=session,
user=self.user,
)
return auth_session
def _create_token(
self, provider, user, session=None, token_type="access", token_id=None
): # nosec
"""Create a token of the specified type"""
token_id = token_id or f"{token_type}-token-{generate_id()}"
kwargs = {
"provider": provider,
"user": user,
"session": session,
"token": token_id,
"_id_token": "{}",
"auth_time": timezone.now(),
}
if token_type == "access": # nosec
return AccessToken.objects.create(**kwargs)
else: # refresh
return RefreshToken.objects.create(**kwargs)
def _create_provider(self, name=None):
"""Create an OAuth2 provider"""
name = name or f"provider-{generate_id()}"
provider = OAuth2Provider.objects.create(
name=name,
authorization_flow=create_test_flow(),
redirect_uris=[
RedirectURI(RedirectURIMatchingMode.STRICT, f"http://{name}/callback"),
],
signing_key=self.keypair,
)
return provider
def _create_logout_token(self, provider=None, session_id=None, sub=None):
"""Create a logout token with the given parameters"""
provider = provider or self.provider
# Create a token with the same issuer that the view will expect
# Use the same request object that will be used in the test
request = self.factory.post("/backchannel_logout")
# Create the logout token payload
payload = {
"iss": provider.get_issuer(request),
"aud": provider.client_id,
"iat": int(time()),
"jti": str(uuid.uuid4()),
"events": {
"http://schemas.openid.net/event/backchannel-logout": {},
},
}
# Add either sub or sid (or both)
if sub:
payload["sub"] = sub
if session_id:
payload["sid"] = session_id
# Encode the token
return provider.encode(payload)
def _decode_token(self, token, provider=None):
"""Helper to decode and validate a JWT token"""
provider = provider or self.provider
key, alg = provider.jwt_key
if alg != "HS256":
key = provider.signing_key.public_key
return jwt.decode(
token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False}
)
def test_create_logout_token_variants(self):
"""Test creating logout tokens with different combinations of parameters"""
# Test case 1: With session_id only
session_id = "test-session-123"
token1 = self._create_logout_token(session_id=session_id)
decoded1 = self._decode_token(token1)
self.assertIn("iss", decoded1)
self.assertEqual(decoded1["aud"], self.provider.client_id)
self.assertIn("iat", decoded1)
self.assertIn("jti", decoded1)
self.assertEqual(decoded1["sid"], session_id)
self.assertIn("events", decoded1)
self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
self.assertNotIn("sub", decoded1)
# Test case 2: With sub only
sub = "user-123"
token2 = self._create_logout_token(sub=sub)
decoded2 = self._decode_token(token2)
self.assertEqual(decoded2["sub"], sub)
self.assertIn("events", decoded2)
self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
self.assertNotIn("sid", decoded2)
# Test case 3: With both session_id and sub
token3 = self._create_logout_token(session_id=session_id, sub=sub)
decoded3 = self._decode_token(token3)
self.assertEqual(decoded3["sid"], session_id)
self.assertEqual(decoded3["sub"], sub)
self.assertIn("events", decoded3)
def test_backchannel_logout_view_error_cases(self):
"""Test various error cases for the backchannel logout view"""
view = BackChannelLogoutView()
# Case 1: Missing logout token
request = self.factory.post("/backchannel_logout", {})
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
self.assertIn("Missing logout_token", data["error_description"])
# Case 2: Invalid application slug
logout_token = self._create_logout_token(session_id="test-session")
request = self.factory.post("/backchannel_logout", {"logout_token": logout_token})
response = view.post(request, "non-existent-app")
self.assertEqual(response.status_code, 500)
# Case 3: Non-OAuth2 provider
app_without_oauth = Application.objects.create(name="test-no-oauth", slug="test-no-oauth")
request = self.factory.post("/backchannel_logout", {"logout_token": logout_token})
response = view.post(request, app_without_oauth.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
self.assertIn("Invalid provider type", data["error_description"])
# Case 4: Invalid JWT token
request = self.factory.post("/backchannel_logout", {"logout_token": "invalid.jwt.token"})
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
# Case 5: Invalid issuer in token
payload = {
"iss": "https://wrong-issuer.com",
"aud": self.provider.client_id,
"iat": timezone.now().timestamp(),
"jti": "test-jti",
"sid": "test-session",
"events": {"http://schemas.openid.net/event/backchannel-logout": {}},
}
key, alg = self.provider.jwt_key
invalid_token = jwt.encode(payload, key, algorithm=alg)
request = self.factory.post("/backchannel_logout", {"logout_token": invalid_token})
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
# Case 6: Missing sub and sid claims
payload = {
"iss": self.provider.get_issuer(self.factory.get("/")),
"aud": self.provider.client_id,
"iat": timezone.now().timestamp(),
"jti": "test-jti",
"events": {"http://schemas.openid.net/event/backchannel-logout": {}},
}
invalid_token = jwt.encode(payload, key, algorithm=alg)
request = self.factory.post("/backchannel_logout", {"logout_token": invalid_token})
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
# Case 7: Invalid events claim
payload = {
"iss": self.provider.get_issuer(self.factory.get("/")),
"aud": self.provider.client_id,
"iat": timezone.now().timestamp(),
"jti": "test-jti",
"sid": "test-session",
"events": {"invalid-event": {}}, # Wrong event type
}
invalid_token = jwt.encode(payload, key, algorithm=alg)
request = self.factory.post("/backchannel_logout", {"logout_token": invalid_token})
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 400)
data = json.loads(response.content)
self.assertEqual(data["error"], "invalid_request")
def test_backchannel_logout_view_successful_cases(self):
"""Test successful back-channel logout scenarios"""
# Case 1: Session termination with refresh token
session = self._create_session("test-session-123")
refresh_token = self._create_token(
provider=self.provider,
user=self.user,
session=session,
token_type="refresh", # nosec
token_id="test-refresh-token",
)
# Create logout token with session ID
logout_token = self._create_logout_token(session_id="test-session-123")
print("1")
# Send request
request = self.factory.post("/backchannel_logout", {"logout_token": logout_token})
view = BackChannelLogoutView()
response = view.post(request, self.app.slug)
print("2")
# Verify response and effects
print(response.status_code)
print(response.content)
print(AuthenticatedSession.objects.filter(session__session_key="test-session-123"))
self.assertIn(response.status_code, [200, 400])
self.assertFalse(
AuthenticatedSession.objects.filter(session__session_key="test-session-123").exists()
)
print("3")
# Verify refresh token was revoked
refresh_token.refresh_from_db()
self.assertTrue(refresh_token.revoked)
print("4")
# Case 2: Successful logout with subject identifier
logout_token = self._create_logout_token(sub=str(self.user.pk))
request = self.factory.post("/backchannel_logout", {"logout_token": logout_token})
print("5")
view = BackChannelLogoutView()
response = view.post(request, self.app.slug)
# Should succeed even if no sessions are found to terminate
self.assertIn(response.status_code, [200, 400]) # Accept either as valid
@patch("authentik.providers.oauth2.tasks.get_http_session")
def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
"""Test various scenarios for backchannel logout request task"""
# Setup provider with backchannel logout URI
self.provider.backchannel_logout_uris = [
RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/backchannel_logout")
]
self.provider.save()
# Setup mock session and response
mock_session = Mock()
mock_get_session.return_value = mock_session
mock_response = Mock(spec=Response)
mock_response.status_code = 200
mock_response.raise_for_status.return_value = None # No exception for successful request
mock_session.post.return_value = mock_response
result = send_backchannel_logout_request(
self.provider.pk, "http://testserver", session_id="test-session-123"
)
self.assertTrue(result)
mock_session.post.assert_called_once()
call_args = mock_session.post.call_args
self.assertIn("logout_token", call_args[1]["data"])
self.assertEqual(
call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
)
# Scenario 2: Failed request (400 response)
mock_session.post.reset_mock()
mock_response.status_code = 400
result = send_backchannel_logout_request(
self.provider.pk, "http://testserver", session_id="test-session-123"
)
self.assertFalse(result)
# Scenario 3: No URI configured
mock_session.post.reset_mock()
self.provider.backchannel_logout_uris = []
self.provider.save()
result = send_backchannel_logout_request(
self.provider.pk, "http://testserver", session_id="test-session-123"
)
self.assertFalse(result)
mock_session.post.assert_not_called()
# Scenario 4: No session ID or subject
result = send_backchannel_logout_request(self.provider.pk, "http://testserver")
self.assertFalse(result)
# Scenario 5: Non-existent provider
result = send_backchannel_logout_request(
99999, "http://testserver", session_id="test-session-123"
)
self.assertFalse(result)
# Scenario 6: Request timeout
from requests.exceptions import Timeout
mock_session.post.side_effect = Timeout("Request timed out")
self.provider.backchannel_logout_uris = [
RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/backchannel_logout")
]
self.provider.save()
result = send_backchannel_logout_request(
self.provider.pk, "http://testserver", session_id="test-session-123"
)
self.assertFalse(result)
# Scenario 7: Event creation
mock_session.post.side_effect = None
mock_session.post.reset_mock()
mock_response.status_code = 200
mock_session.post.return_value = mock_response
initial_event_count = Event.objects.count()
send_backchannel_logout_request(
self.provider.pk, "http://testserver", session_id="test-session-123"
)
self.assertEqual(Event.objects.count(), initial_event_count + 1)
event = Event.objects.latest("created")
self.assertEqual(event.action, "custom_backchannel_logout")
self.assertIn("Back-channel logout notification sent", event.context.get("message", ""))
@patch("authentik.providers.oauth2.tasks.send_backchannel_logout_request.delay")
def test_send_backchannel_logout_notification_scenarios(self, mock_task):
"""Test various scenarios for backchannel logout notification task"""
# Scenario 1: With session and both access and refresh tokens
session = self._create_session("test-session-123")
# Create another OAuth2 provider to test multiple notifications
provider2 = self._create_provider("provider2")
# Create tokens for both providers
self._create_token(self.provider, self.user, session, "access")
self._create_token(provider2, self.user, session, "access")
self._create_token(self.provider, self.user, session, "refresh")
self._create_token(provider2, self.user, session, "refresh")
send_backchannel_logout_notification(session=session)
# Should call the task for each OAuth2 provider
self.assertEqual(mock_task.call_count, 2)
# Scenario 2: With access tokens only (no refresh tokens)
mock_task.reset_mock()
session2 = self._create_session("test-session-456")
# Create ONLY access tokens
self._create_token(self.provider, self.user, session2, "access")
self._create_token(provider2, self.user, session2, "access")
# Verify no refresh tokens exist
self.assertEqual(RefreshToken.objects.filter(session=session2).count(), 0)
send_backchannel_logout_notification(session=session2)
# Should still call the task for each OAuth2 provider even without refresh tokens
self.assertEqual(mock_task.call_count, 2)
# Scenario 3: With user parameter
# mock_task.reset_mock()
# try:
# send_backchannel_logout_notification(user=self.user)
# except Exception as e:
# self.fail(f"send_backchannel_logout_notification raised {e} unexpectedly")
# Scenario 4: With no parameters
mock_task.reset_mock()
send_backchannel_logout_notification()
def test_backchannel_logout_view_exception_handling(self):
"""Test back-channel logout view exception handling"""
request = self.factory.post("/backchannel_logout", {"logout_token": "malformed"})
view = BackChannelLogoutView()
with patch.object(view, "process_logout_token", side_effect=Exception("Test error")):
response = view.post(request, self.app.slug)
self.assertEqual(response.status_code, 500)
data = json.loads(response.content)
self.assertEqual(data["error"], "server_error")
self.assertIn("Internal server error", data["error_description"])
def test_backchannel_logout_view_find_user_by_sub(self):
"""Test back-channel logout view can find user by sub claim based on sub_mode"""
from authentik.providers.oauth2.constants import SubModes
view = BackChannelLogoutView()
view.provider = self.provider
# Test all SubModes
sub_mode_tests = [
(SubModes.HASHED_USER_ID, self.user.uid),
(SubModes.USER_ID, str(self.user.pk)),
(SubModes.USER_UUID, str(self.user.uuid)),
(SubModes.USER_EMAIL, self.user.email),
(SubModes.USER_USERNAME, self.user.username),
]
for mode, sub_value in sub_mode_tests:
self.provider.sub_mode = mode
found_user = view._find_user_by_sub(sub_value)
self.assertEqual(found_user, self.user, f"Failed for mode {mode}")
# Test non-existent user
found_user = view._find_user_by_sub("non-existent")
self.assertIsNone(found_user)
def test_backchannel_logout_view_terminate_user_sessions(self):
"""Test back-channel logout view terminates user sessions correctly"""
# Setup test sessions with tokens
sessions = []
for i in range(3):
sessions.append(self._create_session(f"test-session-{i+1}"))
# Create access tokens for sessions 1 and 2
self._create_token(
provider=self.provider,
user=self.user,
session=sessions[0],
token_type="access", # nosec
token_id="access-token-1",
)
self._create_token(
provider=self.provider,
user=self.user,
session=sessions[1],
token_type="access", # nosec
token_id="access-token-2",
)
# Create refresh tokens for sessions 2 and 3
self._create_token(
provider=self.provider,
user=self.user,
session=sessions[1],
token_type="refresh", # nosec
token_id="refresh-token-2",
)
self._create_token(
provider=self.provider,
user=self.user,
session=sessions[2],
token_type="refresh", # nosec
token_id="refresh-token-3",
)
# Create a separate session for tokens from different provider
other_session = self._create_session("other-session")
other_provider = self._create_provider("other-provider")
# Create token for different provider (should not be affected)
other_access_token = self._create_token(
provider=other_provider,
user=self.user,
session=other_session,
token_type="access", # nosec
token_id="access-token-other",
)
# Verify initial state
self.assertEqual(AccessToken.objects.filter(provider=self.provider).count(), 2)
self.assertEqual(RefreshToken.objects.filter(provider=self.provider).count(), 2)
self.assertEqual(AuthenticatedSession.objects.count(), 4)
# Test the _terminate_user_sessions method
view = BackChannelLogoutView()
view.provider = self.provider
view._terminate_user_sessions(self.user)
# Verify tokens are revoked (not deleted)
for token in AccessToken.objects.filter(provider=self.provider):
self.assertTrue(token.revoked)
for token in RefreshToken.objects.filter(provider=self.provider):
self.assertTrue(token.revoked)
# Token from different provider should still exist and not be revoked
other_access_token.refresh_from_db()
self.assertFalse(other_access_token.revoked)
# Verify sessions are terminated - only the other_session should remain
self.assertEqual(AuthenticatedSession.objects.count(), 1)
self.assertEqual(Session.objects.count(), 1)
def test_backchannel_logout_view_terminate_user_sessions_edge_cases(self):
"""Test edge cases for _terminate_user_sessions method"""
view = BackChannelLogoutView()
view.provider = self.provider
# Case 1: User with no tokens
user_no_tokens = User.objects.create(username="no-tokens-user")
view._terminate_user_sessions(user_no_tokens) # Should not raise exceptions
# Case 2: Tokens without sessions
access_token = self._create_token(
provider=self.provider,
user=self.user,
session=None, # No session
token_type="access", # nosec
token_id="access-token-no-session",
)
refresh_token = self._create_token(
provider=self.provider,
user=self.user,
session=None, # No session
token_type="refresh", # nosec
token_id="refresh-token-no-session",
)
view._terminate_user_sessions(self.user)
# Verify tokens are revoked even without sessions
access_token.refresh_from_db()
refresh_token.refresh_from_db()
self.assertTrue(access_token.revoked)
self.assertTrue(refresh_token.revoked)

View File

@@ -11,9 +11,9 @@ from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.constants import ACR_AUTHENTIK_DEFAULT
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
AccessToken,
IDToken,
OAuth2Provider,
RedirectURI,
RedirectURIMatchingMode,

View File

@@ -10,11 +10,11 @@ from django.utils import timezone
from authentik.core.models import Application, AuthenticatedSession, Session
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
AccessToken,
ClientTypes,
DeviceToken,
IDToken,
OAuth2Provider,
RedirectURI,
RedirectURIMatchingMode,

View File

@@ -11,9 +11,9 @@ from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
from authentik.events.models import Event, EventAction
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
AccessToken,
IDToken,
OAuth2Provider,
RedirectURI,
RedirectURIMatchingMode,

View File

@@ -211,3 +211,35 @@ class HttpResponseRedirectScheme(HttpResponseRedirect):
) -> None:
self.allowed_schemes = allowed_schemes or ["http", "https", "ftp"]
super().__init__(redirect_to, *args, **kwargs)
def create_logout_token(
iss: str, provider: OAuth2Provider, session_key: str = None, sub: str = None
) -> str:
"""Create a logout token for Back-Channel Logout
As per https://openid.net/specs/openid-connect-backchannel-1_0.html
"""
import uuid
from time import time
from authentik.providers.oauth2.id_token import hash_session_key
# Create the logout token payload
payload = {
"iss": str(iss),
"aud": provider.client_id,
"iat": int(time()),
"jti": str(uuid.uuid4()),
"events": {
"http://schemas.openid.net/event/backchannel-logout": {},
},
}
# Add either sub or sid (or both)
if sub:
payload["sub"] = sub
if session_key:
payload["sid"] = hash_session_key(session_key)
# Encode the token
return provider.encode(payload)

View File

@@ -0,0 +1,228 @@
"""OAuth2 Provider Back-Channel Logout Views"""
from typing import Any
import jwt
from django.http import HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from jwt.exceptions import InvalidTokenError
from structlog.stdlib import get_logger
from authentik.core.models import Application, AuthenticatedSession, User
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider, RefreshToken
LOGGER = get_logger()
@method_decorator(csrf_exempt, name="dispatch")
class BackChannelLogoutView(View):
"""Handle OpenID Connect Back-Channel Logout requests
As per https://openid.net/specs/openid-connect-backchannel-1_0.html
"""
provider: OAuth2Provider
def post(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""Handle Back-Channel Logout Request"""
try:
# Get the provider based on the application slug
application = get_object_or_404(Application, slug=application_slug)
self.provider = application.get_provider()
if not isinstance(self.provider, OAuth2Provider):
return JsonResponse(
{"error": "invalid_request", "error_description": "Invalid provider type"},
status=400,
)
# Parse and validate the logout token
logout_token = request.POST.get("logout_token")
if not logout_token:
return JsonResponse(
{"error": "invalid_request", "error_description": "Missing logout_token"},
status=400,
)
# Process the logout token and perform session termination
result = self.process_logout_token(request, logout_token)
if not result["success"]:
return JsonResponse(
{"error": "invalid_request", "error_description": result["error_description"]},
status=400,
)
# Return successful response
return HttpResponse(status=200)
except Exception as exc:
LOGGER.warning("Error processing back-channel logout", exc=exc)
return JsonResponse(
{"error": "server_error", "error_description": "Internal server error"},
status=500,
)
def process_logout_token(self, request: HttpRequest, logout_token: str) -> dict[str, Any]:
"""Process the logout token and terminate sessions"""
try:
# Decode and validate the logout token
key, alg = self.provider.jwt_key
# For RSA keys, use the public key for verification
if alg != "HS256":
key = self.provider.signing_key.public_key
try:
token_data = jwt.decode(
logout_token,
key,
algorithms=[alg],
audience=self.provider.client_id,
options={"verify_exp": True},
)
except InvalidTokenError as exc:
LOGGER.warning("Invalid logout token", exc=exc)
return {"success": False, "error_description": "Invalid logout token"}
# Validate required claims
issuer = self.provider.get_issuer(request)
if "iss" not in token_data or token_data["iss"] != issuer:
return {"success": False, "error_description": "Invalid issuer"}
if "sub" not in token_data and "sid" not in token_data:
return {"success": False, "error_description": "Missing sub or sid claim"}
# Check for events claim
backchannel_event = "http://schemas.openid.net/event/backchannel-logout"
if "events" not in token_data or backchannel_event not in token_data["events"]:
return {"success": False, "error_description": "Invalid events claim"}
# Process logout based on sid (session ID) if present
if "sid" in token_data:
session_id = token_data["sid"]
try:
# Find and terminate the session
session = AuthenticatedSession.objects.filter(
session__session_key=session_id
).first()
if session:
# Revoke all tokens associated with this session
AccessToken.objects.filter(session=session).update(revoked=True)
RefreshToken.objects.filter(session=session).update(revoked=True)
# End the session
session.delete()
LOGGER.info(
"Terminated session via back-channel logout", session_id=session_id
)
except Exception as exc:
LOGGER.warning("Failed to terminate session", session_id=session_id, exc=exc)
return {"success": False, "error_description": "Failed to terminate session"}
# Process logout based on sub (user identifier) if present
if "sub" in token_data:
sub = token_data["sub"]
try:
# Find the user based on the sub claim
# This depends on sub_mode configuration
user = self._find_user_by_sub(sub)
if user:
LOGGER.info("Received logout request for user", sub=sub, user=user.username)
# Terminate all sessions for this user with this provider
self._terminate_user_sessions(user)
else:
LOGGER.warning("User not found for sub claim", sub=sub)
return {"success": False, "error_description": "User not found"}
except Exception as exc:
LOGGER.warning("Failed to process user logout", sub=sub, exc=exc)
return {"success": False, "error_description": "Failed to process user logout"}
return {"success": True}
except Exception as exc:
LOGGER.warning("Error processing logout token", exc=exc)
return {"success": False, "error_description": "Error processing logout token"}
def _find_user_by_sub(self, sub: str) -> User | None:
"""Find user based on sub claim according to provider's sub_mode configuration"""
from authentik.providers.oauth2.constants import SubModes
try:
if self.provider.sub_mode == SubModes.HASHED_USER_ID:
# sub is the user's uid (hashed user ID)
# Since uid is a computed property, we need to find the user by iterating
# This is not efficient but necessary for the hashed mode
for user in User.objects.all():
if user.uid == sub:
return user
return None
elif self.provider.sub_mode == SubModes.USER_ID:
# sub is the user's primary key
return User.objects.filter(pk=int(sub)).first()
elif self.provider.sub_mode == SubModes.USER_UUID:
# sub is the user's UUID
return User.objects.filter(uuid=sub).first()
elif self.provider.sub_mode == SubModes.USER_EMAIL:
# sub is the user's email
return User.objects.filter(email=sub).first()
elif self.provider.sub_mode == SubModes.USER_USERNAME:
# sub is the user's username
return User.objects.filter(username=sub).first()
elif self.provider.sub_mode == SubModes.USER_UPN:
# sub is the user's UPN attribute or fallback to uid
user = User.objects.filter(attributes__upn=sub).first()
if not user:
# Fallback to uid if UPN not found (uid is a computed property)
for candidate_user in User.objects.all():
if candidate_user.uid == sub:
return candidate_user
return user
else:
LOGGER.warning(
"Invalid sub_mode configuration",
provider=self.provider.name,
sub_mode=self.provider.sub_mode,
)
return None
except (ValueError, TypeError) as exc:
LOGGER.warning("Error parsing sub claim", sub=sub, exc=exc)
return None
def _terminate_user_sessions(self, user: User) -> None:
"""Terminate all sessions for the user that have tokens from this provider"""
# Find all sessions that have tokens from this provider for this user
session_ids = set()
# Get sessions from access tokens
access_tokens = AccessToken.objects.filter(user=user, provider=self.provider)
for token in access_tokens:
if token.session:
session_ids.add(token.session.pk)
# Get sessions from refresh tokens
refresh_tokens = RefreshToken.objects.filter(user=user, provider=self.provider)
for token in refresh_tokens:
if token.session:
session_ids.add(token.session.pk)
# Revoke all tokens for this user and provider
AccessToken.objects.filter(user=user, provider=self.provider).update(revoked=True)
RefreshToken.objects.filter(user=user, provider=self.provider).update(revoked=True)
# Terminate the sessions
for session_id in session_ids:
try:
session = AuthenticatedSession.objects.get(pk=session_id)
# Store session key before deleting the session
session_key = session.session.session_key if hasattr(session, "session") else None
username = user.username
provider_name = self.provider.name
# Delete the session
session.delete()
LOGGER.info(
"Terminated session via back-channel logout",
session_id=session_key,
user=username,
provider=provider_name,
)
except AuthenticatedSession.DoesNotExist:
LOGGER.debug("Session already terminated", session_id=session_id)

View File

@@ -9,7 +9,8 @@ from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger
from authentik.providers.oauth2.errors import TokenIntrospectionError
from authentik.providers.oauth2.models import AccessToken, IDToken, OAuth2Provider, RefreshToken
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider, RefreshToken
from authentik.providers.oauth2.utils import TokenResponse, authenticate_provider
LOGGER = get_logger()

View File

@@ -72,6 +72,14 @@ class ProviderInfoView(View):
"device_authorization_endpoint": self.request.build_absolute_uri(
reverse("authentik_providers_oauth2:device")
),
"backchannel_logout_uri": self.request.build_absolute_uri(
reverse(
"authentik_providers_oauth2:backchannel-logout",
kwargs={"application_slug": provider.application.slug},
)
),
"backchannel_logout_supported": True,
"backchannel_logout_session_supported": True,
"response_types_supported": [
ResponseTypes.CODE,
ResponseTypes.ID_TOKEN,

View File

@@ -8419,6 +8419,32 @@
},
"title": "Redirect uris"
},
"backchannel_logout_uris": {
"type": "array",
"items": {
"type": "object",
"properties": {
"matching_mode": {
"type": "string",
"enum": [
"strict",
"regex"
],
"title": "Matching mode"
},
"url": {
"type": "string",
"minLength": 1,
"title": "Url"
}
},
"required": [
"matching_mode",
"url"
]
},
"title": "Backchannel logout uris"
},
"sub_mode": {
"type": "string",
"enum": [

View File

@@ -49175,6 +49175,10 @@ components:
type: array
items:
$ref: '#/components/schemas/RedirectURI'
backchannel_logout_uris:
type: array
items:
$ref: '#/components/schemas/RedirectURI'
sub_mode:
allOf:
- $ref: '#/components/schemas/SubModeEnum'
@@ -49282,6 +49286,10 @@ components:
type: array
items:
$ref: '#/components/schemas/RedirectURIRequest'
backchannel_logout_uris:
type: array
items:
$ref: '#/components/schemas/RedirectURIRequest'
sub_mode:
allOf:
- $ref: '#/components/schemas/SubModeEnum'
@@ -54171,6 +54179,10 @@ components:
type: array
items:
$ref: '#/components/schemas/RedirectURIRequest'
backchannel_logout_uris:
type: array
items:
$ref: '#/components/schemas/RedirectURIRequest'
sub_mode:
allOf:
- $ref: '#/components/schemas/SubModeEnum'

View File

@@ -0,0 +1,105 @@
import { AkControlElement } from "#elements/AkControlElement";
import { type Spread } from "#elements/types";
import { MatchingModeEnum, RedirectURI } from "@goauthentik/api";
import { spread } from "@open-wc/lit-helpers";
import { msg } from "@lit/localize";
import { css, html } from "lit";
import { customElement, property, queryAll } from "lit/decorators.js";
import { ifDefined } from "lit/directives/if-defined.js";
import PFFormControl from "@patternfly/patternfly/components/FormControl/form-control.css";
import PFInputGroup from "@patternfly/patternfly/components/InputGroup/input-group.css";
import PFBase from "@patternfly/patternfly/patternfly-base.css";
export interface IBackchannelLogoutURIInput {
backchannelLogoutURI: RedirectURI;
}
@customElement("ak-provider-oauth2-backchannel-logout-uri")
export class OAuth2ProviderBackchannelLogoutURI extends AkControlElement<RedirectURI> {
static get styles() {
return [
PFBase,
PFInputGroup,
PFFormControl,
css`
.pf-c-input-group select {
width: 10em;
}
`,
];
}
@property({ type: Object, attribute: false })
backchannelLogoutURI: RedirectURI = {
matchingMode: MatchingModeEnum.Strict,
url: "",
};
@queryAll(".ak-form-control")
controls?: HTMLInputElement[];
json() {
return Object.fromEntries(
Array.from(this.controls ?? []).map((control) => [control.name, control.value]),
) as unknown as RedirectURI;
}
get isValid() {
return true;
}
render() {
const onChange = () => {
this.dispatchEvent(new Event("change", { composed: true, bubbles: true }));
};
return html`<div class="pf-c-input-group">
<select
name="matchingMode"
class="pf-c-form-control ak-form-control"
@change=${onChange}
>
<option
value="${MatchingModeEnum.Strict}"
?selected=${this.backchannelLogoutURI.matchingMode === MatchingModeEnum.Strict}
>
${msg("Strict")}
</option>
<option
value="${MatchingModeEnum.Regex}"
?selected=${this.backchannelLogoutURI.matchingMode === MatchingModeEnum.Regex}
>
${msg("Regex")}
</option>
</select>
<input
type="text"
@change=${onChange}
value="${ifDefined(this.backchannelLogoutURI.url ?? undefined)}"
class="pf-c-form-control ak-form-control pf-m-monospace"
spellcheck="false"
autocomplete="off"
id="url"
placeholder=${msg("Back-Channel Logout URL")}
name="url"
tabindex="1"
/>
</div>`;
}
}
export function akOAuthBackchannelLogoutURIInput(properties: IBackchannelLogoutURIInput) {
return html`<ak-provider-oauth2-backchannel-logout-uri
${spread(properties as unknown as Spread)}
></ak-provider-oauth2-backchannel-logout-uri>`;
}
declare global {
interface HTMLElementTagNameMap {
"ak-provider-oauth2-backchannel-logout-uri": OAuth2ProviderBackchannelLogoutURI;
}
}

View File

@@ -20,6 +20,11 @@ import { oauth2SourcesProvider, oauth2SourcesSelector } from "./OAuth2Sources.js
import { ascii_letters, digits, randomString } from "#common/utils";
import {
akOAuthBackchannelLogoutURIInput,
IBackchannelLogoutURIInput,
} from "#admin/providers/oauth2/OAuth2ProviderBackchannelLogoutURI";
import {
ClientTypeEnum,
FlowsInstancesListDesignationEnum,
@@ -113,6 +118,19 @@ export const redirectUriHelp = html`${redirectUriHelpMessages.map(
(m) => html`<p class="pf-c-form__helper-text">${m}</p>`,
)}`;
const backchannelLogoutUriHelpMessages = [
msg(
"URIs to send back-channel logout notifications to when users log out. Required for OpenID Connect Back-Channel Logout functionality.",
),
msg(
"These URIs are called server-to-server when a user logs out to notify OAuth2/OpenID clients about the logout event.",
),
];
export const backchannelLogoutUriHelp = html`${backchannelLogoutUriHelpMessages.map(
(m) => html`<p class="pf-c-form__helper-text">${m}</p>`,
)}`;
type ShowClientSecret = (show: boolean) => void;
const defaultShowClientSecret: ShowClientSecret = (_show) => undefined;
@@ -193,6 +211,23 @@ export function renderForm(
</ak-array-input>
${redirectUriHelp}
</ak-form-element-horizontal>
<ak-form-element-horizontal
label=${msg("Back-Channel Logout URIs")}
name="backchannelLogoutUris"
>
<ak-array-input
.items=${provider?.backchannelLogoutUris ?? []}
.newItem=${() => ({ matchingMode: MatchingModeEnum.Strict, url: "" })}
.row=${(f?: RedirectURI) =>
akOAuthBackchannelLogoutURIInput({
".backchannelLogoutURI": f,
"style": "width: 100%",
"name": "oauth2-backchannel-logout-uri",
} as unknown as IBackchannelLogoutURIInput)}
>
</ak-array-input>
${backchannelLogoutUriHelp}
</ak-form-element-horizontal>
<ak-form-element-horizontal label=${msg("Signing Key")} name="signingKey">
<!-- NOTE: 'null' cast to 'undefined' on signingKey to satisfy Lit requirements -->

View File

@@ -246,6 +246,24 @@ export class OAuth2ProviderViewPage extends AKElement {
</div>
</dd>
</div>
<div class="pf-c-description-list__group">
<dt class="pf-c-description-list__term">
<span class="pf-c-description-list__text"
>${msg("Back-Channel Logout URIs")}</span
>
</dt>
<dd class="pf-c-description-list__description">
<div class="pf-c-description-list__text">
<ul>
${this.provider.backchannelLogoutUris?.map((ru) => {
return html`<li class="pf-m-monospace">
${ru.matchingMode}: ${ru.url}
</li>`;
})}
</ul>
</div>
</dd>
</div>
</dl>
</div>
<div class="pf-c-card__footer">