Compare commits

...

4 Commits

Author SHA1 Message Date
Connor Peshek
43b73995ba cleanup 2026-02-27 19:18:57 -06:00
Connor Peshek
b0284256ea remove empty exception file 2026-02-26 16:53:51 -06:00
Connor Peshek
ad4f81e5b0 update imports 2026-02-26 04:27:17 -06:00
Connor Peshek
a98b8fccdf providers/saml: verify logoutrequest/response signatures 2026-02-26 03:29:43 -06:00
9 changed files with 691 additions and 29 deletions

View File

@@ -1,4 +1,4 @@
"""authentik SAML IDP Exceptions"""
"""Common SAML Exceptions"""
from authentik.lib.sentry import SentryIgnoredException

View File

@@ -0,0 +1,81 @@
"""LogoutResponse parser"""
import binascii
import zlib
from base64 import b64decode
from dataclasses import dataclass
from defusedxml import ElementTree
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL, SAML_STATUS_SUCCESS
from authentik.common.saml.exceptions import CannotHandleAssertion
@dataclass(slots=True)
class LogoutResponse:
"""Logout Response"""
id: str | None = None
in_response_to: str | None = None
issuer: str | None = None
status: str | None = None
relay_state: str | None = None
class LogoutResponseParser:
"""LogoutResponse Parser"""
def _parse_xml(
self, decoded_xml: str | bytes, relay_state: str | None = None
) -> LogoutResponse:
root = ElementTree.fromstring(decoded_xml)
response = LogoutResponse(
id=root.attrib.get("ID"),
in_response_to=root.attrib.get("InResponseTo"),
)
# Extract Issuer
issuers = root.findall(f"{{{NS_SAML_ASSERTION}}}Issuer")
if not issuers:
issuers = root.findall(f"{{{NS_SAML_PROTOCOL}}}Issuer")
if len(issuers) > 0:
response.issuer = issuers[0].text
# Extract Status
status_elements = root.findall(f"{{{NS_SAML_PROTOCOL}}}Status")
if len(status_elements) > 0:
status_codes = status_elements[0].findall(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
if len(status_codes) > 0:
response.status = status_codes[0].attrib.get("Value")
response.relay_state = relay_state
return response
def parse(self, saml_response: str, relay_state: str | None = None) -> LogoutResponse:
"""Validate and parse raw response with enveloped signature (POST binding)."""
try:
decoded_xml = b64decode(saml_response.encode())
except UnicodeDecodeError, binascii.Error:
raise CannotHandleAssertion("Cannot decode SAML response.") from None
return self._parse_xml(decoded_xml, relay_state)
def parse_detached(self, saml_response: str, relay_state: str | None = None) -> LogoutResponse:
"""Validate and parse raw response with detached signature (Redirect binding)."""
try:
decoded_data = b64decode(saml_response)
try:
decoded_xml = zlib.decompress(decoded_data, -15).decode("utf-8")
except zlib.error:
decoded_xml = decoded_data.decode("utf-8")
except UnicodeDecodeError, binascii.Error, zlib.error:
raise CannotHandleAssertion("Cannot decode SAML response.") from None
return self._parse_xml(decoded_xml, relay_state)
def verify_status(self, response: LogoutResponse):
"""Verify that the LogoutResponse has a successful status."""
if response.status != SAML_STATUS_SUCCESS:
raise CannotHandleAssertion(f"LogoutResponse status is not success: {response.status}")

View File

@@ -0,0 +1,93 @@
"""Shared SAML signature verification utilities"""
from base64 import b64decode
from urllib.parse import quote_plus
import xmlsec
from authentik.common.saml.constants import (
NS_MAP,
SIGN_ALGORITHM_TRANSFORM_MAP,
)
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.lib.xml import lxml_from_string
def verify_enveloped_signature(raw_xml: bytes, verification_kp, xpath: str):
"""Verify an enveloped XML signature.
Args:
raw_xml: The raw XML bytes
verification_kp: CertificateKeyPair with certificate_data
xpath: XPath to signature node, e.g. '/samlp:LogoutRequest/ds:Signature'
"""
root = lxml_from_string(raw_xml)
xmlsec.tree.add_ids(root, ["ID"])
signature_nodes = root.xpath(xpath, namespaces=NS_MAP)
if len(signature_nodes) < 1:
raise CannotHandleAssertion(
"Verification Certificate configured, but message is not signed."
)
signature_node = signature_nodes[0]
try:
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(
verification_kp.certificate_data,
xmlsec.constants.KeyDataFormatCertPem,
None,
)
ctx.key = key
ctx.verify(signature_node)
except xmlsec.Error as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc
def verify_detached_signature(
saml_param_name: str,
saml_value: str,
relay_state: str | None,
signature: str | None,
sig_alg: str | None,
verification_kp,
):
"""Verify a detached redirect-binding signature.
Args:
saml_param_name: "SAMLRequest" or "SAMLResponse"
saml_value: The raw base64+deflated SAML message value
relay_state: RelayState value, if present
signature: Base64-encoded signature from query params
sig_alg: Signature algorithm URI from query params
verification_kp: CertificateKeyPair with certificate_data
"""
if not (signature and sig_alg):
raise CannotHandleAssertion(
"Verification Certificate configured, but message is not signed."
)
querystring = f"{saml_param_name}={quote_plus(saml_value)}&"
if relay_state is not None:
querystring += f"RelayState={quote_plus(relay_state)}&"
querystring += f"SigAlg={quote_plus(sig_alg)}"
dsig_ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(
verification_kp.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
)
dsig_ctx.key = key
sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
sig_alg, xmlsec.constants.TransformRsaSha1
)
try:
dsig_ctx.verify_binary(
querystring.encode("utf-8"),
sign_algorithm_transform,
b64decode(signature),
)
except xmlsec.Error as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc

View File

@@ -19,18 +19,12 @@ from authentik.common.saml.constants import (
RSA_SHA512,
SAML_NAME_ID_FORMAT_UNSPECIFIED,
)
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.lib.xml import lxml_from_string
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
from authentik.sources.saml.models import SAMLNameIDPolicy
ERROR_CANNOT_DECODE_REQUEST = "Cannot decode SAML request."
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
"Verification Certificate configured, but request is not signed."
)
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
@dataclass(slots=True)
class AuthNRequest:
@@ -88,7 +82,7 @@ class AuthNRequestParser:
try:
decoded_xml = b64decode(saml_request.encode())
except UnicodeDecodeError:
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
raise CannotHandleAssertion("Cannot decode SAML request.") from None
verifier = self.provider.verification_kp
if not verifier:
@@ -99,7 +93,9 @@ class AuthNRequestParser:
signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
# No signatures, no verifier configured -> decode xml directly
if len(signature_nodes) < 1:
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
raise CannotHandleAssertion(
"Verification Certificate configured, but request is not signed."
)
signature_node = signature_nodes[0]
@@ -114,7 +110,7 @@ class AuthNRequestParser:
ctx.key = key
ctx.verify(signature_node)
except xmlsec.Error as exc:
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
raise CannotHandleAssertion("Failed to verify signature") from exc
return self._parse_xml(decoded_xml, relay_state)
@@ -129,14 +125,16 @@ class AuthNRequestParser:
try:
decoded_xml = decode_base64_and_inflate(saml_request)
except UnicodeDecodeError:
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
raise CannotHandleAssertion("Cannot decode SAML request.") from None
verifier = self.provider.verification_kp
if not verifier:
return self._parse_xml(decoded_xml, relay_state)
if verifier and not (signature and sig_alg):
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
raise CannotHandleAssertion(
"Verification Certificate configured, but request is not signed."
)
if signature and sig_alg:
querystring = f"SAMLRequest={quote_plus(saml_request)}&"
@@ -168,11 +166,11 @@ class AuthNRequestParser:
b64decode(signature),
)
except xmlsec.Error as exc:
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
raise CannotHandleAssertion("Failed to verify signature") from exc
try:
return self._parse_xml(decoded_xml, relay_state)
except ParseError as exc:
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
raise CannotHandleAssertion("Failed to verify signature") from exc
def idp_initiated(self) -> AuthNRequest:
"""Create IdP Initiated AuthNRequest"""

View File

@@ -6,9 +6,8 @@ from dataclasses import dataclass
from defusedxml import ElementTree
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import ERROR_CANNOT_DECODE_REQUEST
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
@@ -74,7 +73,7 @@ class LogoutRequestParser:
try:
decoded_xml = b64decode(saml_request.encode())
except UnicodeDecodeError:
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
raise CannotHandleAssertion("Cannot decode SAML request.") from None
return self._parse_xml(decoded_xml, relay_state)
def parse_detached(
@@ -86,6 +85,6 @@ class LogoutRequestParser:
try:
decoded_xml = decode_base64_and_inflate(saml_request)
except UnicodeDecodeError:
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
raise CannotHandleAssertion("Cannot decode SAML request.") from None
return self._parse_xml(decoded_xml, relay_state)

View File

@@ -7,13 +7,15 @@ from django.test import RequestFactory, TestCase
from django.urls import reverse
from authentik.common.saml.constants import SAML_NAME_ID_FORMAT_EMAIL
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_brand, create_test_cert, create_test_flow
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLBindings, SAMLLogoutMethods, SAMLProvider
from authentik.providers.saml.processors.logout_request import LogoutRequestProcessor
from authentik.providers.saml.processors.logout_request_parser import LogoutRequest
from authentik.providers.saml.processors.logout_response_processor import LogoutResponseProcessor
from authentik.providers.saml.views.flows import (
PLAN_CONTEXT_SAML_RELAY_STATE,
)
@@ -63,6 +65,13 @@ class TestSPInitiatedSLOViews(TestCase):
relay_state="https://sp.example.com/return",
)
# Create a LogoutResponseProcessor for generating valid test responses
self._response_processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=LogoutRequest(id="test-id", issuer="https://sp.example.com"),
destination="https://idp.example.com/sls",
)
def test_redirect_view_handles_logout_request(self):
"""Test that redirect view properly handles a logout request"""
# Generate encoded logout request
@@ -102,7 +111,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_redirect(),
"RelayState": relay_state,
},
)
@@ -125,7 +134,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_redirect(),
"RelayState": relay_state,
},
)
@@ -148,7 +157,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_redirect(),
},
)
# Create a flow plan with the return URL
@@ -171,7 +180,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_redirect(),
},
)
request.session = {}
@@ -239,7 +248,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_post(),
"RelayState": relay_state,
},
)
@@ -262,7 +271,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_post(),
},
)
# Create a flow plan with the return URL
@@ -424,7 +433,7 @@ class TestSPInitiatedSLOViews(TestCase):
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "dummy-response",
"SAMLResponse": self._response_processor.encode_redirect(),
"RelayState": "/some/invalid/path", # Use a path that starts with /
},
)
@@ -725,3 +734,406 @@ class TestSPInitiatedSLOLogoutMethods(TestCase):
# Verify relay state was captured
logout_request = view.plan_context.get("authentik/providers/saml/logout_request")
self.assertEqual(logout_request.relay_state, expected_relay_state)
class TestSignatureVerification(TestCase):
"""Test SAML signature verification for LogoutRequest and LogoutResponse"""
def setUp(self):
"""Set up test fixtures"""
self.factory = RequestFactory()
self.brand = create_test_brand()
self.flow = create_test_flow()
self.invalidation_flow = create_test_flow()
self.cert = create_test_cert()
# Create provider with signing and verification keypairs
self.provider = SAMLProvider.objects.create(
name="test-sig-provider",
authorization_flow=self.flow,
invalidation_flow=self.invalidation_flow,
acs_url="https://sp.example.com/acs",
sls_url="https://sp.example.com/sls",
issuer="https://idp.example.com",
sp_binding="redirect",
sls_binding="redirect",
signing_kp=self.cert,
sign_logout_request=True,
sign_logout_response=True,
)
self.application = Application.objects.create(
name="test-sig-app",
slug="test-sig-app",
provider=self.provider,
)
def test_logout_response_redirect_no_verification_kp_accepted(self):
"""LogoutResponse without verification_kp should be accepted without signature"""
# Provider has no verification_kp — should accept unsigned response
self.provider.verification_kp = None
self.provider.save()
# Generate a valid logout response
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination="https://idp.example.com/sls",
)
encoded_response = processor.encode_redirect()
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": encoded_response,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
# Should redirect to relay state (accepted)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "https://sp.example.com/return")
def test_logout_response_redirect_verification_kp_no_signature_rejected(self):
"""LogoutResponse with verification_kp but no signature should be rejected"""
self.provider.verification_kp = self.cert
self.provider.save()
# Generate an unsigned logout response
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination="https://idp.example.com/sls",
)
# encode_redirect() does NOT add signature to XML (it's detached for redirect)
encoded_response = processor.encode_redirect()
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": encoded_response,
"RelayState": "https://sp.example.com/return",
# No Signature or SigAlg params
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
# Should redirect to root (rejected)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
def test_logout_response_redirect_valid_signature_accepted(self):
"""LogoutResponse with valid detached signature should be accepted"""
self.provider.verification_kp = self.cert
self.provider.save()
# Generate a signed logout response URL (has Signature + SigAlg params)
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination=f"https://idp.example.com/slo/redirect/{self.application.slug}/",
relay_state="https://sp.example.com/return",
)
redirect_url = processor.get_redirect_url()
# Parse the URL to get query params
from urllib.parse import parse_qs, urlparse
parsed = urlparse(redirect_url)
params = parse_qs(parsed.query)
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": params["SAMLResponse"][0],
"RelayState": params["RelayState"][0],
"Signature": params["Signature"][0],
"SigAlg": params["SigAlg"][0],
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
# Should redirect to relay state (accepted)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "https://sp.example.com/return")
def test_logout_response_redirect_garbage_rejected(self):
"""LogoutResponse with garbage SAMLResponse should be rejected gracefully"""
self.provider.verification_kp = None
self.provider.save()
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLResponse": "not-valid-base64-!!!",
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
# Should redirect to root (rejected gracefully)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
def test_logout_request_redirect_verification_kp_no_signature_rejected(self):
"""LogoutRequest with verification_kp but no Signature/SigAlg should be rejected"""
self.provider.verification_kp = self.cert
self.provider.save()
# Generate an unsigned logout request
processor = LogoutRequestProcessor(
provider=self.provider,
user=None,
destination="https://idp.example.com/sls",
name_id="test@example.com",
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
session_index="test-session-123",
)
encoded_request = processor.encode_redirect()
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLRequest": encoded_request,
"RelayState": "https://sp.example.com/return",
# No Signature or SigAlg params
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
view.resolve_provider_application()
result = view.check_saml_request()
self.assertIsNotNone(result)
self.assertEqual(result.status_code, 400)
def test_logout_request_redirect_valid_signature_accepted(self):
"""LogoutRequest with valid detached signature should be accepted"""
self.provider.verification_kp = self.cert
self.provider.save()
# Generate a signed logout request URL
processor = LogoutRequestProcessor(
provider=self.provider,
user=None,
destination="https://idp.example.com/sls",
name_id="test@example.com",
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
session_index="test-session-123",
relay_state="https://sp.example.com/return",
)
redirect_url = processor.get_redirect_url()
# Parse the URL to get query params
from urllib.parse import parse_qs, urlparse
parsed = urlparse(redirect_url)
params = parse_qs(parsed.query)
request = self.factory.get(
f"/slo/redirect/{self.application.slug}/",
{
"SAMLRequest": params["SAMLRequest"][0],
"RelayState": params["RelayState"][0],
"Signature": params["Signature"][0],
"SigAlg": params["SigAlg"][0],
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingRedirectView()
view.setup(request, application_slug=self.application.slug)
view.resolve_provider_application()
result = view.check_saml_request()
self.assertIsNone(result) # None means success
def test_logout_response_post_no_verification_kp_accepted(self):
"""POST LogoutResponse without verification_kp should be accepted"""
self.provider.verification_kp = None
self.provider.save()
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination="https://idp.example.com/sls",
)
encoded_response = processor.encode_post()
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLResponse": encoded_response,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingPOSTView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "https://sp.example.com/return")
def test_logout_response_post_verification_kp_no_signature_rejected(self):
"""POST LogoutResponse with verification_kp but no enveloped signature should fail"""
self.provider.verification_kp = self.cert
# Disable signing so the response won't have a signature
self.provider.sign_logout_response = False
self.provider.save()
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination="https://idp.example.com/sls",
)
encoded_response = processor.encode_post()
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLResponse": encoded_response,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingPOSTView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
# Should redirect to root (rejected)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
def test_logout_response_post_valid_signature_accepted(self):
"""POST LogoutResponse with valid enveloped signature should be accepted"""
self.provider.verification_kp = self.cert
self.provider.sign_logout_response = True
self.provider.save()
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
processor = LogoutResponseProcessor(
provider=self.provider,
logout_request=logout_request,
destination="https://idp.example.com/sls",
)
encoded_response = processor.encode_post()
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLResponse": encoded_response,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingPOSTView()
view.setup(request, application_slug=self.application.slug)
response = view.dispatch(request, application_slug=self.application.slug)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "https://sp.example.com/return")
def test_logout_request_post_verification_kp_no_signature_rejected(self):
"""POST LogoutRequest with verification_kp but no signature should be rejected"""
self.provider.verification_kp = self.cert
# Disable signing so the request won't have a signature
self.provider.sign_logout_request = False
self.provider.save()
processor = LogoutRequestProcessor(
provider=self.provider,
user=None,
destination="https://idp.example.com/sls",
name_id="test@example.com",
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
session_index="test-session-123",
)
encoded_request = processor.encode_post()
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLRequest": encoded_request,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingPOSTView()
view.setup(request, application_slug=self.application.slug)
view.resolve_provider_application()
result = view.check_saml_request()
self.assertIsNotNone(result)
self.assertEqual(result.status_code, 400)
def test_logout_request_post_valid_signature_accepted(self):
"""POST LogoutRequest with valid enveloped signature should be accepted"""
self.provider.verification_kp = self.cert
self.provider.sign_logout_request = True
self.provider.save()
processor = LogoutRequestProcessor(
provider=self.provider,
user=None,
destination="https://idp.example.com/sls",
name_id="test@example.com",
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
session_index="test-session-123",
)
encoded_request = processor.encode_post()
request = self.factory.post(
f"/slo/post/{self.application.slug}/",
{
"SAMLRequest": encoded_request,
"RelayState": "https://sp.example.com/return",
},
)
request.session = {}
request.brand = self.brand
view = SPInitiatedSLOBindingPOSTView()
view.setup(request, application_slug=self.application.slug)
view.resolve_provider_application()
result = view.check_saml_request()
self.assertIsNone(result) # None means success

View File

@@ -1,5 +1,7 @@
"""SP-initiated SAML Single Logout Views"""
from base64 import b64decode
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.decorators import method_decorator
@@ -7,6 +9,12 @@ from django.views.decorators.clickjacking import xframe_options_sameorigin
from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.common.saml.parsers.logout_response import LogoutResponseParser
from authentik.common.saml.parsers.verify import (
verify_detached_signature,
verify_enveloped_signature,
)
from authentik.core.models import Application, AuthenticatedSession
from authentik.events.models import Event, EventAction
from authentik.flows.models import Flow, in_memory_stage
@@ -16,7 +24,6 @@ from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.views import bad_request_message
from authentik.policies.views import PolicyAccessView
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import (
SAMLBindings,
SAMLLogoutMethods,
@@ -36,6 +43,8 @@ from authentik.providers.saml.views.flows import (
REQUEST_KEY_RELAY_STATE,
REQUEST_KEY_SAML_REQUEST,
REQUEST_KEY_SAML_RESPONSE,
REQUEST_KEY_SAML_SIG_ALG,
REQUEST_KEY_SAML_SIGNATURE,
)
LOGGER = get_logger()
@@ -203,6 +212,35 @@ class SPInitiatedSLOBindingRedirectView(SPInitiatedSLOView):
# IDP SLO, so we want to redirect to our next provider
if REQUEST_KEY_SAML_RESPONSE in request.GET:
relay_state = request.GET.get(REQUEST_KEY_RELAY_STATE, "")
# Resolve provider for signature verification
try:
application = Application.objects.get(slug=kwargs.get("application_slug", ""))
provider = SAMLProvider.objects.get(pk=application.provider_id)
except Application.DoesNotExist, SAMLProvider.DoesNotExist:
return redirect("authentik_core:root-redirect")
# Parse and verify LogoutResponse
try:
parser = LogoutResponseParser()
logout_response = parser.parse_detached(
request.GET[REQUEST_KEY_SAML_RESPONSE],
relay_state=relay_state or None,
)
parser.verify_status(logout_response)
if provider.verification_kp:
verify_detached_signature(
"SAMLResponse",
request.GET[REQUEST_KEY_SAML_RESPONSE],
relay_state or None,
request.GET.get(REQUEST_KEY_SAML_SIGNATURE),
request.GET.get(REQUEST_KEY_SAML_SIG_ALG),
provider.verification_kp,
)
except CannotHandleAssertion as exc:
LOGGER.warning("Failed to verify SAML LogoutResponse", exc=str(exc))
return redirect("authentik_core:root-redirect")
if relay_state:
return redirect(relay_state)
@@ -230,6 +268,15 @@ class SPInitiatedSLOBindingRedirectView(SPInitiatedSLOView):
self.request.GET[REQUEST_KEY_SAML_REQUEST],
relay_state=self.request.GET.get(REQUEST_KEY_RELAY_STATE, None),
)
if self.provider.verification_kp:
verify_detached_signature(
"SAMLRequest",
self.request.GET[REQUEST_KEY_SAML_REQUEST],
self.request.GET.get(REQUEST_KEY_RELAY_STATE),
self.request.GET.get(REQUEST_KEY_SAML_SIGNATURE),
self.request.GET.get(REQUEST_KEY_SAML_SIG_ALG),
self.provider.verification_kp,
)
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
except CannotHandleAssertion as exc:
Event.new(
@@ -254,6 +301,32 @@ class SPInitiatedSLOBindingPOSTView(SPInitiatedSLOView):
# IDP SLO, so we want to redirect to our next provider
if REQUEST_KEY_SAML_RESPONSE in request.POST:
relay_state = request.POST.get(REQUEST_KEY_RELAY_STATE, "")
# Resolve provider for signature verification
try:
application = Application.objects.get(slug=kwargs.get("application_slug", ""))
provider = SAMLProvider.objects.get(pk=application.provider_id)
except Application.DoesNotExist, SAMLProvider.DoesNotExist:
return redirect("authentik_core:root-redirect")
# Parse and verify LogoutResponse
try:
parser = LogoutResponseParser()
logout_response = parser.parse(
request.POST[REQUEST_KEY_SAML_RESPONSE],
relay_state=relay_state or None,
)
parser.verify_status(logout_response)
if provider.verification_kp:
verify_enveloped_signature(
b64decode(request.POST[REQUEST_KEY_SAML_RESPONSE].encode()),
provider.verification_kp,
"/samlp:LogoutResponse/ds:Signature",
)
except CannotHandleAssertion as exc:
LOGGER.warning("Failed to verify SAML LogoutResponse", exc=str(exc))
return redirect("authentik_core:root-redirect")
if relay_state:
return redirect(relay_state)
@@ -282,6 +355,12 @@ class SPInitiatedSLOBindingPOSTView(SPInitiatedSLOView):
payload[REQUEST_KEY_SAML_REQUEST],
relay_state=payload.get(REQUEST_KEY_RELAY_STATE, None),
)
if self.provider.verification_kp:
verify_enveloped_signature(
b64decode(payload[REQUEST_KEY_SAML_REQUEST].encode()),
self.provider.verification_kp,
"/samlp:LogoutRequest/ds:Signature",
)
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
except CannotHandleAssertion as exc:
LOGGER.info(str(exc))

View File

@@ -8,6 +8,7 @@ from django.views.decorators.clickjacking import xframe_options_sameorigin
from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger
from authentik.common.saml.exceptions import CannotHandleAssertion
from authentik.core.models import Application
from authentik.events.models import Event, EventAction
from authentik.flows.exceptions import FlowNonApplicableException
@@ -16,7 +17,6 @@ from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO,
from authentik.flows.views.executor import SESSION_KEY_POST
from authentik.lib.views import bad_request_message
from authentik.policies.views import BufferedPolicyAccessView
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLBindings, SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import AuthNRequestParser
from authentik.providers.saml.views.flows import (