mirror of
https://github.com/goauthentik/authentik
synced 2026-05-05 22:52:42 +02:00
Compare commits
4 Commits
docs/invit
...
logoutresp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
43b73995ba | ||
|
|
b0284256ea | ||
|
|
ad4f81e5b0 | ||
|
|
a98b8fccdf |
@@ -1,4 +1,4 @@
|
||||
"""authentik SAML IDP Exceptions"""
|
||||
"""Common SAML Exceptions"""
|
||||
|
||||
from authentik.lib.sentry import SentryIgnoredException
|
||||
|
||||
0
authentik/common/saml/parsers/__init__.py
Normal file
0
authentik/common/saml/parsers/__init__.py
Normal file
81
authentik/common/saml/parsers/logout_response.py
Normal file
81
authentik/common/saml/parsers/logout_response.py
Normal file
@@ -0,0 +1,81 @@
|
||||
"""LogoutResponse parser"""
|
||||
|
||||
import binascii
|
||||
import zlib
|
||||
from base64 import b64decode
|
||||
from dataclasses import dataclass
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL, SAML_STATUS_SUCCESS
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LogoutResponse:
|
||||
"""Logout Response"""
|
||||
|
||||
id: str | None = None
|
||||
|
||||
in_response_to: str | None = None
|
||||
|
||||
issuer: str | None = None
|
||||
|
||||
status: str | None = None
|
||||
|
||||
relay_state: str | None = None
|
||||
|
||||
|
||||
class LogoutResponseParser:
|
||||
"""LogoutResponse Parser"""
|
||||
|
||||
def _parse_xml(
|
||||
self, decoded_xml: str | bytes, relay_state: str | None = None
|
||||
) -> LogoutResponse:
|
||||
root = ElementTree.fromstring(decoded_xml)
|
||||
response = LogoutResponse(
|
||||
id=root.attrib.get("ID"),
|
||||
in_response_to=root.attrib.get("InResponseTo"),
|
||||
)
|
||||
|
||||
# Extract Issuer
|
||||
issuers = root.findall(f"{{{NS_SAML_ASSERTION}}}Issuer")
|
||||
if not issuers:
|
||||
issuers = root.findall(f"{{{NS_SAML_PROTOCOL}}}Issuer")
|
||||
if len(issuers) > 0:
|
||||
response.issuer = issuers[0].text
|
||||
|
||||
# Extract Status
|
||||
status_elements = root.findall(f"{{{NS_SAML_PROTOCOL}}}Status")
|
||||
if len(status_elements) > 0:
|
||||
status_codes = status_elements[0].findall(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
|
||||
if len(status_codes) > 0:
|
||||
response.status = status_codes[0].attrib.get("Value")
|
||||
|
||||
response.relay_state = relay_state
|
||||
return response
|
||||
|
||||
def parse(self, saml_response: str, relay_state: str | None = None) -> LogoutResponse:
|
||||
"""Validate and parse raw response with enveloped signature (POST binding)."""
|
||||
try:
|
||||
decoded_xml = b64decode(saml_response.encode())
|
||||
except UnicodeDecodeError, binascii.Error:
|
||||
raise CannotHandleAssertion("Cannot decode SAML response.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def parse_detached(self, saml_response: str, relay_state: str | None = None) -> LogoutResponse:
|
||||
"""Validate and parse raw response with detached signature (Redirect binding)."""
|
||||
try:
|
||||
decoded_data = b64decode(saml_response)
|
||||
try:
|
||||
decoded_xml = zlib.decompress(decoded_data, -15).decode("utf-8")
|
||||
except zlib.error:
|
||||
decoded_xml = decoded_data.decode("utf-8")
|
||||
except UnicodeDecodeError, binascii.Error, zlib.error:
|
||||
raise CannotHandleAssertion("Cannot decode SAML response.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def verify_status(self, response: LogoutResponse):
|
||||
"""Verify that the LogoutResponse has a successful status."""
|
||||
if response.status != SAML_STATUS_SUCCESS:
|
||||
raise CannotHandleAssertion(f"LogoutResponse status is not success: {response.status}")
|
||||
93
authentik/common/saml/parsers/verify.py
Normal file
93
authentik/common/saml/parsers/verify.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Shared SAML signature verification utilities"""
|
||||
|
||||
from base64 import b64decode
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
import xmlsec
|
||||
|
||||
from authentik.common.saml.constants import (
|
||||
NS_MAP,
|
||||
SIGN_ALGORITHM_TRANSFORM_MAP,
|
||||
)
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.lib.xml import lxml_from_string
|
||||
|
||||
|
||||
def verify_enveloped_signature(raw_xml: bytes, verification_kp, xpath: str):
|
||||
"""Verify an enveloped XML signature.
|
||||
|
||||
Args:
|
||||
raw_xml: The raw XML bytes
|
||||
verification_kp: CertificateKeyPair with certificate_data
|
||||
xpath: XPath to signature node, e.g. '/samlp:LogoutRequest/ds:Signature'
|
||||
"""
|
||||
root = lxml_from_string(raw_xml)
|
||||
xmlsec.tree.add_ids(root, ["ID"])
|
||||
signature_nodes = root.xpath(xpath, namespaces=NS_MAP)
|
||||
|
||||
if len(signature_nodes) < 1:
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but message is not signed."
|
||||
)
|
||||
|
||||
signature_node = signature_nodes[0]
|
||||
|
||||
try:
|
||||
ctx = xmlsec.SignatureContext()
|
||||
key = xmlsec.Key.from_memory(
|
||||
verification_kp.certificate_data,
|
||||
xmlsec.constants.KeyDataFormatCertPem,
|
||||
None,
|
||||
)
|
||||
ctx.key = key
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
|
||||
def verify_detached_signature(
|
||||
saml_param_name: str,
|
||||
saml_value: str,
|
||||
relay_state: str | None,
|
||||
signature: str | None,
|
||||
sig_alg: str | None,
|
||||
verification_kp,
|
||||
):
|
||||
"""Verify a detached redirect-binding signature.
|
||||
|
||||
Args:
|
||||
saml_param_name: "SAMLRequest" or "SAMLResponse"
|
||||
saml_value: The raw base64+deflated SAML message value
|
||||
relay_state: RelayState value, if present
|
||||
signature: Base64-encoded signature from query params
|
||||
sig_alg: Signature algorithm URI from query params
|
||||
verification_kp: CertificateKeyPair with certificate_data
|
||||
"""
|
||||
if not (signature and sig_alg):
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but message is not signed."
|
||||
)
|
||||
|
||||
querystring = f"{saml_param_name}={quote_plus(saml_value)}&"
|
||||
if relay_state is not None:
|
||||
querystring += f"RelayState={quote_plus(relay_state)}&"
|
||||
querystring += f"SigAlg={quote_plus(sig_alg)}"
|
||||
|
||||
dsig_ctx = xmlsec.SignatureContext()
|
||||
key = xmlsec.Key.from_memory(
|
||||
verification_kp.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
|
||||
)
|
||||
dsig_ctx.key = key
|
||||
|
||||
sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
|
||||
sig_alg, xmlsec.constants.TransformRsaSha1
|
||||
)
|
||||
|
||||
try:
|
||||
dsig_ctx.verify_binary(
|
||||
querystring.encode("utf-8"),
|
||||
sign_algorithm_transform,
|
||||
b64decode(signature),
|
||||
)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
@@ -19,18 +19,12 @@ from authentik.common.saml.constants import (
|
||||
RSA_SHA512,
|
||||
SAML_NAME_ID_FORMAT_UNSPECIFIED,
|
||||
)
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.lib.xml import lxml_from_string
|
||||
from authentik.providers.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import SAMLProvider
|
||||
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
|
||||
from authentik.sources.saml.models import SAMLNameIDPolicy
|
||||
|
||||
ERROR_CANNOT_DECODE_REQUEST = "Cannot decode SAML request."
|
||||
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class AuthNRequest:
|
||||
@@ -88,7 +82,7 @@ class AuthNRequestParser:
|
||||
try:
|
||||
decoded_xml = b64decode(saml_request.encode())
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
verifier = self.provider.verification_kp
|
||||
if not verifier:
|
||||
@@ -99,7 +93,9 @@ class AuthNRequestParser:
|
||||
signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
|
||||
# No signatures, no verifier configured -> decode xml directly
|
||||
if len(signature_nodes) < 1:
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
|
||||
signature_node = signature_nodes[0]
|
||||
|
||||
@@ -114,7 +110,7 @@ class AuthNRequestParser:
|
||||
ctx.key = key
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
@@ -129,14 +125,16 @@ class AuthNRequestParser:
|
||||
try:
|
||||
decoded_xml = decode_base64_and_inflate(saml_request)
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
verifier = self.provider.verification_kp
|
||||
if not verifier:
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
if verifier and not (signature and sig_alg):
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
|
||||
if signature and sig_alg:
|
||||
querystring = f"SAMLRequest={quote_plus(saml_request)}&"
|
||||
@@ -168,11 +166,11 @@ class AuthNRequestParser:
|
||||
b64decode(signature),
|
||||
)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
try:
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
except ParseError as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
def idp_initiated(self) -> AuthNRequest:
|
||||
"""Create IdP Initiated AuthNRequest"""
|
||||
|
||||
@@ -6,9 +6,8 @@ from dataclasses import dataclass
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL
|
||||
from authentik.providers.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import SAMLProvider
|
||||
from authentik.providers.saml.processors.authn_request_parser import ERROR_CANNOT_DECODE_REQUEST
|
||||
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
|
||||
|
||||
|
||||
@@ -74,7 +73,7 @@ class LogoutRequestParser:
|
||||
try:
|
||||
decoded_xml = b64decode(saml_request.encode())
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def parse_detached(
|
||||
@@ -86,6 +85,6 @@ class LogoutRequestParser:
|
||||
try:
|
||||
decoded_xml = decode_base64_and_inflate(saml_request)
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
@@ -7,13 +7,15 @@ from django.test import RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
|
||||
from authentik.common.saml.constants import SAML_NAME_ID_FORMAT_EMAIL
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.core.models import Application
|
||||
from authentik.core.tests.utils import create_test_brand, create_test_cert, create_test_flow
|
||||
from authentik.flows.planner import FlowPlan
|
||||
from authentik.flows.views.executor import SESSION_KEY_PLAN
|
||||
from authentik.providers.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import SAMLBindings, SAMLLogoutMethods, SAMLProvider
|
||||
from authentik.providers.saml.processors.logout_request import LogoutRequestProcessor
|
||||
from authentik.providers.saml.processors.logout_request_parser import LogoutRequest
|
||||
from authentik.providers.saml.processors.logout_response_processor import LogoutResponseProcessor
|
||||
from authentik.providers.saml.views.flows import (
|
||||
PLAN_CONTEXT_SAML_RELAY_STATE,
|
||||
)
|
||||
@@ -63,6 +65,13 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
relay_state="https://sp.example.com/return",
|
||||
)
|
||||
|
||||
# Create a LogoutResponseProcessor for generating valid test responses
|
||||
self._response_processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=LogoutRequest(id="test-id", issuer="https://sp.example.com"),
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
|
||||
def test_redirect_view_handles_logout_request(self):
|
||||
"""Test that redirect view properly handles a logout request"""
|
||||
# Generate encoded logout request
|
||||
@@ -102,7 +111,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -125,7 +134,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -148,7 +157,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
},
|
||||
)
|
||||
# Create a flow plan with the return URL
|
||||
@@ -171,7 +180,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
@@ -239,7 +248,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_post(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -262,7 +271,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_post(),
|
||||
},
|
||||
)
|
||||
# Create a flow plan with the return URL
|
||||
@@ -424,7 +433,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "dummy-response",
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": "/some/invalid/path", # Use a path that starts with /
|
||||
},
|
||||
)
|
||||
@@ -725,3 +734,406 @@ class TestSPInitiatedSLOLogoutMethods(TestCase):
|
||||
# Verify relay state was captured
|
||||
logout_request = view.plan_context.get("authentik/providers/saml/logout_request")
|
||||
self.assertEqual(logout_request.relay_state, expected_relay_state)
|
||||
|
||||
|
||||
class TestSignatureVerification(TestCase):
|
||||
"""Test SAML signature verification for LogoutRequest and LogoutResponse"""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures"""
|
||||
self.factory = RequestFactory()
|
||||
self.brand = create_test_brand()
|
||||
self.flow = create_test_flow()
|
||||
self.invalidation_flow = create_test_flow()
|
||||
self.cert = create_test_cert()
|
||||
|
||||
# Create provider with signing and verification keypairs
|
||||
self.provider = SAMLProvider.objects.create(
|
||||
name="test-sig-provider",
|
||||
authorization_flow=self.flow,
|
||||
invalidation_flow=self.invalidation_flow,
|
||||
acs_url="https://sp.example.com/acs",
|
||||
sls_url="https://sp.example.com/sls",
|
||||
issuer="https://idp.example.com",
|
||||
sp_binding="redirect",
|
||||
sls_binding="redirect",
|
||||
signing_kp=self.cert,
|
||||
sign_logout_request=True,
|
||||
sign_logout_response=True,
|
||||
)
|
||||
|
||||
self.application = Application.objects.create(
|
||||
name="test-sig-app",
|
||||
slug="test-sig-app",
|
||||
provider=self.provider,
|
||||
)
|
||||
|
||||
def test_logout_response_redirect_no_verification_kp_accepted(self):
|
||||
"""LogoutResponse without verification_kp should be accepted without signature"""
|
||||
# Provider has no verification_kp — should accept unsigned response
|
||||
self.provider.verification_kp = None
|
||||
self.provider.save()
|
||||
|
||||
# Generate a valid logout response
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
encoded_response = processor.encode_redirect()
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": encoded_response,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
# Should redirect to relay state (accepted)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, "https://sp.example.com/return")
|
||||
|
||||
def test_logout_response_redirect_verification_kp_no_signature_rejected(self):
|
||||
"""LogoutResponse with verification_kp but no signature should be rejected"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.save()
|
||||
|
||||
# Generate an unsigned logout response
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
# encode_redirect() does NOT add signature to XML (it's detached for redirect)
|
||||
encoded_response = processor.encode_redirect()
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": encoded_response,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
# No Signature or SigAlg params
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
# Should redirect to root (rejected)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
|
||||
|
||||
def test_logout_response_redirect_valid_signature_accepted(self):
|
||||
"""LogoutResponse with valid detached signature should be accepted"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.save()
|
||||
|
||||
# Generate a signed logout response URL (has Signature + SigAlg params)
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination=f"https://idp.example.com/slo/redirect/{self.application.slug}/",
|
||||
relay_state="https://sp.example.com/return",
|
||||
)
|
||||
redirect_url = processor.get_redirect_url()
|
||||
|
||||
# Parse the URL to get query params
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
parsed = urlparse(redirect_url)
|
||||
params = parse_qs(parsed.query)
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": params["SAMLResponse"][0],
|
||||
"RelayState": params["RelayState"][0],
|
||||
"Signature": params["Signature"][0],
|
||||
"SigAlg": params["SigAlg"][0],
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
# Should redirect to relay state (accepted)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, "https://sp.example.com/return")
|
||||
|
||||
def test_logout_response_redirect_garbage_rejected(self):
|
||||
"""LogoutResponse with garbage SAMLResponse should be rejected gracefully"""
|
||||
self.provider.verification_kp = None
|
||||
self.provider.save()
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": "not-valid-base64-!!!",
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
# Should redirect to root (rejected gracefully)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
|
||||
|
||||
def test_logout_request_redirect_verification_kp_no_signature_rejected(self):
|
||||
"""LogoutRequest with verification_kp but no Signature/SigAlg should be rejected"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.save()
|
||||
|
||||
# Generate an unsigned logout request
|
||||
processor = LogoutRequestProcessor(
|
||||
provider=self.provider,
|
||||
user=None,
|
||||
destination="https://idp.example.com/sls",
|
||||
name_id="test@example.com",
|
||||
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
|
||||
session_index="test-session-123",
|
||||
)
|
||||
encoded_request = processor.encode_redirect()
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLRequest": encoded_request,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
# No Signature or SigAlg params
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
view.resolve_provider_application()
|
||||
|
||||
result = view.check_saml_request()
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_logout_request_redirect_valid_signature_accepted(self):
|
||||
"""LogoutRequest with valid detached signature should be accepted"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.save()
|
||||
|
||||
# Generate a signed logout request URL
|
||||
processor = LogoutRequestProcessor(
|
||||
provider=self.provider,
|
||||
user=None,
|
||||
destination="https://idp.example.com/sls",
|
||||
name_id="test@example.com",
|
||||
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
|
||||
session_index="test-session-123",
|
||||
relay_state="https://sp.example.com/return",
|
||||
)
|
||||
redirect_url = processor.get_redirect_url()
|
||||
|
||||
# Parse the URL to get query params
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
parsed = urlparse(redirect_url)
|
||||
params = parse_qs(parsed.query)
|
||||
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLRequest": params["SAMLRequest"][0],
|
||||
"RelayState": params["RelayState"][0],
|
||||
"Signature": params["Signature"][0],
|
||||
"SigAlg": params["SigAlg"][0],
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingRedirectView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
view.resolve_provider_application()
|
||||
|
||||
result = view.check_saml_request()
|
||||
self.assertIsNone(result) # None means success
|
||||
|
||||
def test_logout_response_post_no_verification_kp_accepted(self):
|
||||
"""POST LogoutResponse without verification_kp should be accepted"""
|
||||
self.provider.verification_kp = None
|
||||
self.provider.save()
|
||||
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
encoded_response = processor.encode_post()
|
||||
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": encoded_response,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingPOSTView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, "https://sp.example.com/return")
|
||||
|
||||
def test_logout_response_post_verification_kp_no_signature_rejected(self):
|
||||
"""POST LogoutResponse with verification_kp but no enveloped signature should fail"""
|
||||
self.provider.verification_kp = self.cert
|
||||
# Disable signing so the response won't have a signature
|
||||
self.provider.sign_logout_response = False
|
||||
self.provider.save()
|
||||
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
encoded_response = processor.encode_post()
|
||||
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": encoded_response,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingPOSTView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
# Should redirect to root (rejected)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, reverse("authentik_core:root-redirect"))
|
||||
|
||||
def test_logout_response_post_valid_signature_accepted(self):
|
||||
"""POST LogoutResponse with valid enveloped signature should be accepted"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.sign_logout_response = True
|
||||
self.provider.save()
|
||||
|
||||
logout_request = LogoutRequest(id="test-id", issuer="https://sp.example.com")
|
||||
processor = LogoutResponseProcessor(
|
||||
provider=self.provider,
|
||||
logout_request=logout_request,
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
encoded_response = processor.encode_post()
|
||||
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": encoded_response,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingPOSTView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
response = view.dispatch(request, application_slug=self.application.slug)
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response.url, "https://sp.example.com/return")
|
||||
|
||||
def test_logout_request_post_verification_kp_no_signature_rejected(self):
|
||||
"""POST LogoutRequest with verification_kp but no signature should be rejected"""
|
||||
self.provider.verification_kp = self.cert
|
||||
# Disable signing so the request won't have a signature
|
||||
self.provider.sign_logout_request = False
|
||||
self.provider.save()
|
||||
|
||||
processor = LogoutRequestProcessor(
|
||||
provider=self.provider,
|
||||
user=None,
|
||||
destination="https://idp.example.com/sls",
|
||||
name_id="test@example.com",
|
||||
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
|
||||
session_index="test-session-123",
|
||||
)
|
||||
encoded_request = processor.encode_post()
|
||||
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLRequest": encoded_request,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingPOSTView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
view.resolve_provider_application()
|
||||
|
||||
result = view.check_saml_request()
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_logout_request_post_valid_signature_accepted(self):
|
||||
"""POST LogoutRequest with valid enveloped signature should be accepted"""
|
||||
self.provider.verification_kp = self.cert
|
||||
self.provider.sign_logout_request = True
|
||||
self.provider.save()
|
||||
|
||||
processor = LogoutRequestProcessor(
|
||||
provider=self.provider,
|
||||
user=None,
|
||||
destination="https://idp.example.com/sls",
|
||||
name_id="test@example.com",
|
||||
name_id_format=SAML_NAME_ID_FORMAT_EMAIL,
|
||||
session_index="test-session-123",
|
||||
)
|
||||
encoded_request = processor.encode_post()
|
||||
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLRequest": encoded_request,
|
||||
"RelayState": "https://sp.example.com/return",
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
request.brand = self.brand
|
||||
|
||||
view = SPInitiatedSLOBindingPOSTView()
|
||||
view.setup(request, application_slug=self.application.slug)
|
||||
view.resolve_provider_application()
|
||||
|
||||
result = view.check_saml_request()
|
||||
self.assertIsNone(result) # None means success
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""SP-initiated SAML Single Logout Views"""
|
||||
|
||||
from base64 import b64decode
|
||||
|
||||
from django.http import Http404, HttpRequest, HttpResponse
|
||||
from django.shortcuts import get_object_or_404, redirect
|
||||
from django.utils.decorators import method_decorator
|
||||
@@ -7,6 +9,12 @@ from django.views.decorators.clickjacking import xframe_options_sameorigin
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.common.saml.parsers.logout_response import LogoutResponseParser
|
||||
from authentik.common.saml.parsers.verify import (
|
||||
verify_detached_signature,
|
||||
verify_enveloped_signature,
|
||||
)
|
||||
from authentik.core.models import Application, AuthenticatedSession
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.flows.models import Flow, in_memory_stage
|
||||
@@ -16,7 +24,6 @@ from authentik.flows.views.executor import SESSION_KEY_PLAN
|
||||
from authentik.lib.views import bad_request_message
|
||||
from authentik.policies.views import PolicyAccessView
|
||||
from authentik.providers.iframe_logout import IframeLogoutStageView
|
||||
from authentik.providers.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import (
|
||||
SAMLBindings,
|
||||
SAMLLogoutMethods,
|
||||
@@ -36,6 +43,8 @@ from authentik.providers.saml.views.flows import (
|
||||
REQUEST_KEY_RELAY_STATE,
|
||||
REQUEST_KEY_SAML_REQUEST,
|
||||
REQUEST_KEY_SAML_RESPONSE,
|
||||
REQUEST_KEY_SAML_SIG_ALG,
|
||||
REQUEST_KEY_SAML_SIGNATURE,
|
||||
)
|
||||
|
||||
LOGGER = get_logger()
|
||||
@@ -203,6 +212,35 @@ class SPInitiatedSLOBindingRedirectView(SPInitiatedSLOView):
|
||||
# IDP SLO, so we want to redirect to our next provider
|
||||
if REQUEST_KEY_SAML_RESPONSE in request.GET:
|
||||
relay_state = request.GET.get(REQUEST_KEY_RELAY_STATE, "")
|
||||
|
||||
# Resolve provider for signature verification
|
||||
try:
|
||||
application = Application.objects.get(slug=kwargs.get("application_slug", ""))
|
||||
provider = SAMLProvider.objects.get(pk=application.provider_id)
|
||||
except Application.DoesNotExist, SAMLProvider.DoesNotExist:
|
||||
return redirect("authentik_core:root-redirect")
|
||||
|
||||
# Parse and verify LogoutResponse
|
||||
try:
|
||||
parser = LogoutResponseParser()
|
||||
logout_response = parser.parse_detached(
|
||||
request.GET[REQUEST_KEY_SAML_RESPONSE],
|
||||
relay_state=relay_state or None,
|
||||
)
|
||||
parser.verify_status(logout_response)
|
||||
if provider.verification_kp:
|
||||
verify_detached_signature(
|
||||
"SAMLResponse",
|
||||
request.GET[REQUEST_KEY_SAML_RESPONSE],
|
||||
relay_state or None,
|
||||
request.GET.get(REQUEST_KEY_SAML_SIGNATURE),
|
||||
request.GET.get(REQUEST_KEY_SAML_SIG_ALG),
|
||||
provider.verification_kp,
|
||||
)
|
||||
except CannotHandleAssertion as exc:
|
||||
LOGGER.warning("Failed to verify SAML LogoutResponse", exc=str(exc))
|
||||
return redirect("authentik_core:root-redirect")
|
||||
|
||||
if relay_state:
|
||||
return redirect(relay_state)
|
||||
|
||||
@@ -230,6 +268,15 @@ class SPInitiatedSLOBindingRedirectView(SPInitiatedSLOView):
|
||||
self.request.GET[REQUEST_KEY_SAML_REQUEST],
|
||||
relay_state=self.request.GET.get(REQUEST_KEY_RELAY_STATE, None),
|
||||
)
|
||||
if self.provider.verification_kp:
|
||||
verify_detached_signature(
|
||||
"SAMLRequest",
|
||||
self.request.GET[REQUEST_KEY_SAML_REQUEST],
|
||||
self.request.GET.get(REQUEST_KEY_RELAY_STATE),
|
||||
self.request.GET.get(REQUEST_KEY_SAML_SIGNATURE),
|
||||
self.request.GET.get(REQUEST_KEY_SAML_SIG_ALG),
|
||||
self.provider.verification_kp,
|
||||
)
|
||||
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
|
||||
except CannotHandleAssertion as exc:
|
||||
Event.new(
|
||||
@@ -254,6 +301,32 @@ class SPInitiatedSLOBindingPOSTView(SPInitiatedSLOView):
|
||||
# IDP SLO, so we want to redirect to our next provider
|
||||
if REQUEST_KEY_SAML_RESPONSE in request.POST:
|
||||
relay_state = request.POST.get(REQUEST_KEY_RELAY_STATE, "")
|
||||
|
||||
# Resolve provider for signature verification
|
||||
try:
|
||||
application = Application.objects.get(slug=kwargs.get("application_slug", ""))
|
||||
provider = SAMLProvider.objects.get(pk=application.provider_id)
|
||||
except Application.DoesNotExist, SAMLProvider.DoesNotExist:
|
||||
return redirect("authentik_core:root-redirect")
|
||||
|
||||
# Parse and verify LogoutResponse
|
||||
try:
|
||||
parser = LogoutResponseParser()
|
||||
logout_response = parser.parse(
|
||||
request.POST[REQUEST_KEY_SAML_RESPONSE],
|
||||
relay_state=relay_state or None,
|
||||
)
|
||||
parser.verify_status(logout_response)
|
||||
if provider.verification_kp:
|
||||
verify_enveloped_signature(
|
||||
b64decode(request.POST[REQUEST_KEY_SAML_RESPONSE].encode()),
|
||||
provider.verification_kp,
|
||||
"/samlp:LogoutResponse/ds:Signature",
|
||||
)
|
||||
except CannotHandleAssertion as exc:
|
||||
LOGGER.warning("Failed to verify SAML LogoutResponse", exc=str(exc))
|
||||
return redirect("authentik_core:root-redirect")
|
||||
|
||||
if relay_state:
|
||||
return redirect(relay_state)
|
||||
|
||||
@@ -282,6 +355,12 @@ class SPInitiatedSLOBindingPOSTView(SPInitiatedSLOView):
|
||||
payload[REQUEST_KEY_SAML_REQUEST],
|
||||
relay_state=payload.get(REQUEST_KEY_RELAY_STATE, None),
|
||||
)
|
||||
if self.provider.verification_kp:
|
||||
verify_enveloped_signature(
|
||||
b64decode(payload[REQUEST_KEY_SAML_REQUEST].encode()),
|
||||
self.provider.verification_kp,
|
||||
"/samlp:LogoutRequest/ds:Signature",
|
||||
)
|
||||
self.plan_context[PLAN_CONTEXT_SAML_LOGOUT_REQUEST] = logout_request
|
||||
except CannotHandleAssertion as exc:
|
||||
LOGGER.info(str(exc))
|
||||
|
||||
@@ -8,6 +8,7 @@ from django.views.decorators.clickjacking import xframe_options_sameorigin
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.core.models import Application
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.flows.exceptions import FlowNonApplicableException
|
||||
@@ -16,7 +17,6 @@ from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO,
|
||||
from authentik.flows.views.executor import SESSION_KEY_POST
|
||||
from authentik.lib.views import bad_request_message
|
||||
from authentik.policies.views import BufferedPolicyAccessView
|
||||
from authentik.providers.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import SAMLBindings, SAMLProvider
|
||||
from authentik.providers.saml.processors.authn_request_parser import AuthNRequestParser
|
||||
from authentik.providers.saml.views.flows import (
|
||||
|
||||
Reference in New Issue
Block a user