mirror of
https://github.com/goauthentik/authentik
synced 2026-04-25 17:15:26 +02:00
cleanup
This commit is contained in:
@@ -10,8 +10,6 @@ from defusedxml import ElementTree
|
||||
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL, SAML_STATUS_SUCCESS
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
|
||||
ERROR_CANNOT_DECODE_RESPONSE = "Cannot decode SAML response."
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LogoutResponse:
|
||||
@@ -62,7 +60,7 @@ class LogoutResponseParser:
|
||||
try:
|
||||
decoded_xml = b64decode(saml_response.encode())
|
||||
except UnicodeDecodeError, binascii.Error:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_RESPONSE) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML response.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def parse_detached(self, saml_response: str, relay_state: str | None = None) -> LogoutResponse:
|
||||
@@ -74,7 +72,7 @@ class LogoutResponseParser:
|
||||
except zlib.error:
|
||||
decoded_xml = decoded_data.decode("utf-8")
|
||||
except UnicodeDecodeError, binascii.Error, zlib.error:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_RESPONSE) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML response.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def verify_status(self, response: LogoutResponse):
|
||||
|
||||
@@ -12,11 +12,6 @@ from authentik.common.saml.constants import (
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.lib.xml import lxml_from_string
|
||||
|
||||
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
|
||||
"Verification Certificate configured, but message is not signed."
|
||||
)
|
||||
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
|
||||
|
||||
|
||||
def verify_enveloped_signature(raw_xml: bytes, verification_kp, xpath: str):
|
||||
"""Verify an enveloped XML signature.
|
||||
@@ -31,7 +26,9 @@ def verify_enveloped_signature(raw_xml: bytes, verification_kp, xpath: str):
|
||||
signature_nodes = root.xpath(xpath, namespaces=NS_MAP)
|
||||
|
||||
if len(signature_nodes) < 1:
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but message is not signed."
|
||||
)
|
||||
|
||||
signature_node = signature_nodes[0]
|
||||
|
||||
@@ -45,7 +42,7 @@ def verify_enveloped_signature(raw_xml: bytes, verification_kp, xpath: str):
|
||||
ctx.key = key
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
|
||||
def verify_detached_signature(
|
||||
@@ -67,7 +64,9 @@ def verify_detached_signature(
|
||||
verification_kp: CertificateKeyPair with certificate_data
|
||||
"""
|
||||
if not (signature and sig_alg):
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but message is not signed."
|
||||
)
|
||||
|
||||
querystring = f"{saml_param_name}={quote_plus(saml_value)}&"
|
||||
if relay_state is not None:
|
||||
@@ -91,4 +90,4 @@ def verify_detached_signature(
|
||||
b64decode(signature),
|
||||
)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
@@ -25,12 +25,6 @@ from authentik.providers.saml.models import SAMLProvider
|
||||
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
|
||||
from authentik.sources.saml.models import SAMLNameIDPolicy
|
||||
|
||||
ERROR_CANNOT_DECODE_REQUEST = "Cannot decode SAML request."
|
||||
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class AuthNRequest:
|
||||
@@ -88,7 +82,7 @@ class AuthNRequestParser:
|
||||
try:
|
||||
decoded_xml = b64decode(saml_request.encode())
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
verifier = self.provider.verification_kp
|
||||
if not verifier:
|
||||
@@ -99,7 +93,9 @@ class AuthNRequestParser:
|
||||
signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
|
||||
# No signatures, no verifier configured -> decode xml directly
|
||||
if len(signature_nodes) < 1:
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
|
||||
signature_node = signature_nodes[0]
|
||||
|
||||
@@ -114,7 +110,7 @@ class AuthNRequestParser:
|
||||
ctx.key = key
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
@@ -129,14 +125,16 @@ class AuthNRequestParser:
|
||||
try:
|
||||
decoded_xml = decode_base64_and_inflate(saml_request)
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
verifier = self.provider.verification_kp
|
||||
if not verifier:
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
if verifier and not (signature and sig_alg):
|
||||
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
|
||||
raise CannotHandleAssertion(
|
||||
"Verification Certificate configured, but request is not signed."
|
||||
)
|
||||
|
||||
if signature and sig_alg:
|
||||
querystring = f"SAMLRequest={quote_plus(saml_request)}&"
|
||||
@@ -168,11 +166,11 @@ class AuthNRequestParser:
|
||||
b64decode(signature),
|
||||
)
|
||||
except xmlsec.Error as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
try:
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
except ParseError as exc:
|
||||
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
|
||||
raise CannotHandleAssertion("Failed to verify signature") from exc
|
||||
|
||||
def idp_initiated(self) -> AuthNRequest:
|
||||
"""Create IdP Initiated AuthNRequest"""
|
||||
|
||||
@@ -8,7 +8,6 @@ from defusedxml import ElementTree
|
||||
from authentik.common.saml.constants import NS_SAML_ASSERTION, NS_SAML_PROTOCOL
|
||||
from authentik.common.saml.exceptions import CannotHandleAssertion
|
||||
from authentik.providers.saml.models import SAMLProvider
|
||||
from authentik.providers.saml.processors.authn_request_parser import ERROR_CANNOT_DECODE_REQUEST
|
||||
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
|
||||
|
||||
|
||||
@@ -74,7 +73,7 @@ class LogoutRequestParser:
|
||||
try:
|
||||
decoded_xml = b64decode(saml_request.encode())
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
def parse_detached(
|
||||
@@ -86,6 +85,6 @@ class LogoutRequestParser:
|
||||
try:
|
||||
decoded_xml = decode_base64_and_inflate(saml_request)
|
||||
except UnicodeDecodeError:
|
||||
raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
|
||||
raise CannotHandleAssertion("Cannot decode SAML request.") from None
|
||||
|
||||
return self._parse_xml(decoded_xml, relay_state)
|
||||
|
||||
@@ -72,14 +72,6 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
destination="https://idp.example.com/sls",
|
||||
)
|
||||
|
||||
def _encode_logout_response_redirect(self):
|
||||
"""Generate a valid encoded LogoutResponse for redirect binding."""
|
||||
return self._response_processor.encode_redirect()
|
||||
|
||||
def _encode_logout_response_post(self):
|
||||
"""Generate a valid encoded LogoutResponse for POST binding."""
|
||||
return self._response_processor.encode_post()
|
||||
|
||||
def test_redirect_view_handles_logout_request(self):
|
||||
"""Test that redirect view properly handles a logout request"""
|
||||
# Generate encoded logout request
|
||||
@@ -119,7 +111,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_redirect(),
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -142,7 +134,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_redirect(),
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -165,7 +157,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_redirect(),
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
},
|
||||
)
|
||||
# Create a flow plan with the return URL
|
||||
@@ -188,7 +180,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_redirect(),
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
},
|
||||
)
|
||||
request.session = {}
|
||||
@@ -256,7 +248,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_post(),
|
||||
"SAMLResponse": self._response_processor.encode_post(),
|
||||
"RelayState": relay_state,
|
||||
},
|
||||
)
|
||||
@@ -279,7 +271,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.post(
|
||||
f"/slo/post/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_post(),
|
||||
"SAMLResponse": self._response_processor.encode_post(),
|
||||
},
|
||||
)
|
||||
# Create a flow plan with the return URL
|
||||
@@ -441,7 +433,7 @@ class TestSPInitiatedSLOViews(TestCase):
|
||||
request = self.factory.get(
|
||||
f"/slo/redirect/{self.application.slug}/",
|
||||
{
|
||||
"SAMLResponse": self._encode_logout_response_redirect(),
|
||||
"SAMLResponse": self._response_processor.encode_redirect(),
|
||||
"RelayState": "/some/invalid/path", # Use a path that starts with /
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user