mirror of
https://github.com/goauthentik/authentik
synced 2026-05-05 22:52:42 +02:00
Compare commits
5 Commits
a11y-modal
...
saml-sig-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6155b9c8bc | ||
|
|
578f8726f3 | ||
|
|
c49006c8bf | ||
|
|
56c6913f05 | ||
|
|
7c22845cf0 |
@@ -320,14 +320,6 @@ class AssertionProcessor:
|
||||
ns=xmlsec.constants.DSigNs,
|
||||
)
|
||||
assertion.append(signature)
|
||||
if self.provider.encryption_kp:
|
||||
encryption = xmlsec.template.encrypted_data_create(
|
||||
assertion,
|
||||
xmlsec.constants.TransformAes128Cbc,
|
||||
self._assertion_id,
|
||||
ns=xmlsec.constants.DSigNs,
|
||||
)
|
||||
assertion.append(encryption)
|
||||
|
||||
assertion.append(self.get_assertion_subject())
|
||||
assertion.append(self.get_assertion_conditions())
|
||||
@@ -403,12 +395,15 @@ class AssertionProcessor:
|
||||
|
||||
def _encrypt(self, element: Element, parent: Element):
|
||||
"""Encrypt SAMLResponse EncryptedAssertion Element"""
|
||||
# Create a standalone copy so namespace declarations are included in the encrypted content
|
||||
element_xml = etree.tostring(element)
|
||||
standalone_element = etree.fromstring(element_xml)
|
||||
|
||||
# Remove the original element from the tree since we're replacing it with encrypted version
|
||||
parent.remove(element)
|
||||
|
||||
manager = xmlsec.KeysManager()
|
||||
key = xmlsec.Key.from_memory(
|
||||
self.provider.encryption_kp.key_data,
|
||||
xmlsec.constants.KeyDataFormatPem,
|
||||
)
|
||||
key.load_cert_from_memory(
|
||||
self.provider.encryption_kp.certificate_data,
|
||||
xmlsec.constants.KeyDataFormatCertPem,
|
||||
)
|
||||
@@ -429,24 +424,25 @@ class AssertionProcessor:
|
||||
xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
|
||||
|
||||
try:
|
||||
enc_data = encryption_context.encrypt_xml(enc_data, element)
|
||||
enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
|
||||
except xmlsec.Error as exc:
|
||||
raise InvalidEncryption() from exc
|
||||
|
||||
parent.remove(enc_data)
|
||||
container.append(enc_data)
|
||||
|
||||
def build_response(self) -> str:
|
||||
"""Build string XML Response and sign if signing is enabled."""
|
||||
root_response = self.get_response()
|
||||
if self.provider.signing_kp:
|
||||
if self.provider.sign_assertion:
|
||||
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
|
||||
self._sign(assertion)
|
||||
if self.provider.sign_response:
|
||||
response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
|
||||
self._sign(response)
|
||||
# Sign assertion first (before encryption)
|
||||
if self.provider.signing_kp and self.provider.sign_assertion:
|
||||
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
|
||||
self._sign(assertion)
|
||||
# Encrypt assertion (this replaces Assertion with EncryptedAssertion)
|
||||
if self.provider.encryption_kp:
|
||||
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
|
||||
self._encrypt(assertion, root_response)
|
||||
# Sign response AFTER encryption so signature covers the encrypted content
|
||||
if self.provider.signing_kp and self.provider.sign_response:
|
||||
response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
|
||||
self._sign(response)
|
||||
return etree.tostring(root_response).decode("utf-8") # nosec
|
||||
|
||||
@@ -150,6 +150,257 @@ class TestAuthNRequest(TestCase):
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_request_encrypt_cert_only(self):
|
||||
"""Test SAML encryption with certificate-only keypair (no private key).
|
||||
|
||||
This tests the scenario where the IdP (provider) only has the SP's public
|
||||
certificate for encryption, without a private key. This is the expected
|
||||
real-world scenario since the SP would never share their private key.
|
||||
"""
|
||||
# Create a full keypair for the source (SP) - it needs the private key to decrypt
|
||||
full_keypair = create_test_cert()
|
||||
|
||||
# Create a certificate-only keypair for the provider (IdP)
|
||||
# This simulates having only the SP's public certificate
|
||||
cert_only = CertificateKeyPair.objects.create(
|
||||
name=generate_id(),
|
||||
certificate_data=full_keypair.certificate_data,
|
||||
key_data="", # No private key
|
||||
)
|
||||
|
||||
self.provider.encryption_kp = cert_only
|
||||
self.provider.save()
|
||||
self.source.encryption_kp = full_keypair
|
||||
self.source.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
# This should work with only the certificate (public key) for encryption
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Now parse the response (source) - decryption requires the private key
|
||||
http_request.POST = QueryDict(mutable=True)
|
||||
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
|
||||
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_request_sign_response_and_encrypt(self):
|
||||
"""Test SAML with sign_response enabled AND encryption.
|
||||
|
||||
This tests the fix for signature invalidation when encryption is enabled.
|
||||
The response must be signed AFTER encryption, not before, because encryption
|
||||
replaces the Assertion with EncryptedAssertion which changes the response content.
|
||||
"""
|
||||
self.provider.sign_response = True
|
||||
self.provider.sign_assertion = False
|
||||
self.provider.encryption_kp = self.cert
|
||||
self.provider.save()
|
||||
self.source.encryption_kp = self.cert
|
||||
self.source.signed_response = True
|
||||
self.source.signed_assertion = False # Only response is signed, not assertion
|
||||
self.source.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Verify the response contains EncryptedAssertion and a signature
|
||||
response_xml = fromstring(response)
|
||||
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
|
||||
self.assertEqual(
|
||||
len(response_xml.xpath("//samlp:Response/ds:Signature", namespaces=NS_MAP)), 1
|
||||
)
|
||||
|
||||
# Now parse the response (source) - this will verify the signature and decrypt
|
||||
http_request.POST = QueryDict(mutable=True)
|
||||
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
|
||||
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_request_sign_assertion_and_encrypt(self):
|
||||
"""Test SAML with sign_assertion enabled AND encryption.
|
||||
|
||||
The assertion signature should be inside the encrypted content and
|
||||
remain valid after decryption.
|
||||
"""
|
||||
self.provider.sign_response = False
|
||||
self.provider.sign_assertion = True
|
||||
self.provider.encryption_kp = self.cert
|
||||
self.provider.save()
|
||||
self.source.encryption_kp = self.cert
|
||||
self.source.signed_assertion = True
|
||||
self.source.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Verify the response contains EncryptedAssertion
|
||||
response_xml = fromstring(response)
|
||||
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
|
||||
|
||||
# Now parse the response (source) - this will decrypt and verify assertion signature
|
||||
http_request.POST = QueryDict(mutable=True)
|
||||
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
|
||||
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_request_sign_both_and_encrypt(self):
|
||||
"""Test SAML with both sign_assertion and sign_response enabled AND encryption.
|
||||
|
||||
This is the most complex scenario: assertion is signed, then encrypted,
|
||||
then the response is signed. Both signatures should be valid.
|
||||
"""
|
||||
self.provider.sign_response = True
|
||||
self.provider.sign_assertion = True
|
||||
self.provider.encryption_kp = self.cert
|
||||
self.provider.save()
|
||||
self.source.encryption_kp = self.cert
|
||||
self.source.signed_assertion = True
|
||||
self.source.signed_response = True
|
||||
self.source.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Verify the response contains EncryptedAssertion and response signature
|
||||
response_xml = fromstring(response)
|
||||
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
|
||||
self.assertEqual(
|
||||
len(response_xml.xpath("//samlp:Response/ds:Signature", namespaces=NS_MAP)), 1
|
||||
)
|
||||
|
||||
# Now parse the response (source) - this will verify response signature,
|
||||
# decrypt, then verify assertion signature
|
||||
http_request.POST = QueryDict(mutable=True)
|
||||
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
|
||||
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_encrypted_assertion_namespace_preservation(self):
|
||||
"""Test that encrypted assertions include namespace declarations.
|
||||
|
||||
When an assertion is encrypted, the resulting decrypted XML must include
|
||||
the necessary namespace declarations (xmlns:saml) since it's now a standalone
|
||||
document fragment, no longer inheriting namespaces from the parent Response.
|
||||
"""
|
||||
self.provider.encryption_kp = self.cert
|
||||
self.provider.save()
|
||||
self.source.encryption_kp = self.cert
|
||||
self.source.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Parse the encrypted response
|
||||
response_xml = fromstring(response)
|
||||
encrypted_assertion = response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)[0]
|
||||
encrypted_data = encrypted_assertion.xpath("//xenc:EncryptedData", namespaces=NS_MAP)[0]
|
||||
|
||||
# Decrypt the assertion manually to verify namespace is present
|
||||
import xmlsec
|
||||
|
||||
manager = xmlsec.KeysManager()
|
||||
key = xmlsec.Key.from_memory(self.cert.key_data, xmlsec.constants.KeyDataFormatPem, None)
|
||||
manager.add_key(key)
|
||||
enc_ctx = xmlsec.EncryptionContext(manager)
|
||||
decrypted = enc_ctx.decrypt(encrypted_data)
|
||||
|
||||
# The decrypted assertion should have xmlns:saml namespace declaration
|
||||
decrypted_str = etree.tostring(decrypted).decode()
|
||||
self.assertIn("xmlns:saml", decrypted_str)
|
||||
|
||||
# Also verify full round-trip works (source can parse it)
|
||||
http_request.POST = QueryDict(mutable=True)
|
||||
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
|
||||
|
||||
response_parser = ResponseProcessor(self.source, http_request)
|
||||
response_parser.parse()
|
||||
|
||||
def test_encrypted_response_schema_validation(self):
|
||||
"""Test that encrypted SAML responses validate against the SAML schema.
|
||||
|
||||
The response with EncryptedAssertion must be valid per saml-schema-protocol-2.0.xsd.
|
||||
This ensures we don't have invalid elements like EncryptedData inside Assertion.
|
||||
"""
|
||||
self.provider.encryption_kp = self.cert
|
||||
self.provider.save()
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
request = request_proc.build_auth_n()
|
||||
|
||||
# To get an assertion we need a parsed request (parsed by provider)
|
||||
parsed_request = AuthNRequestParser(self.provider).parse(
|
||||
b64encode(request.encode()).decode(), "test_state"
|
||||
)
|
||||
# Now create a response and convert it to string (provider)
|
||||
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
|
||||
response = response_proc.build_response()
|
||||
|
||||
# Validate against SAML schema
|
||||
schema = etree.XMLSchema(
|
||||
etree.parse("schemas/saml-schema-protocol-2.0.xsd", parser=etree.XMLParser()) # nosec
|
||||
)
|
||||
self.assertTrue(schema.validate(lxml_from_string(response)))
|
||||
|
||||
# Verify structure: should have EncryptedAssertion, not Assertion with EncryptedData inside
|
||||
response_xml = fromstring(response)
|
||||
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
|
||||
self.assertEqual(len(response_xml.xpath("//saml:Assertion", namespaces=NS_MAP)), 0)
|
||||
|
||||
def test_request_signed(self):
|
||||
"""Test full SAML Request/Response flow, fully signed"""
|
||||
http_request = self.request_factory.get("/", user=get_anonymous_user())
|
||||
|
||||
@@ -77,11 +77,17 @@ class ResponseProcessor:
|
||||
self._root_xml = b64decode(raw_response.encode())
|
||||
self._root = fromstring(self._root_xml)
|
||||
|
||||
# Verify response signature BEFORE decryption (signature covers encrypted content)
|
||||
if self._source.verification_kp and self._source.signed_response:
|
||||
self._verify_response_signature()
|
||||
|
||||
if self._source.encryption_kp:
|
||||
self._decrypt_response()
|
||||
|
||||
if self._source.verification_kp:
|
||||
self._verify_signed()
|
||||
# Verify assertion signature AFTER decryption (signature is inside encrypted content)
|
||||
if self._source.verification_kp and self._source.signed_assertion:
|
||||
self._verify_assertion_signature()
|
||||
|
||||
self._verify_request_id()
|
||||
self._verify_status()
|
||||
|
||||
@@ -114,45 +120,45 @@ class ResponseProcessor:
|
||||
decrypted_assertion,
|
||||
)
|
||||
|
||||
def _verify_signed(self):
|
||||
"""Verify SAML Response's Signature"""
|
||||
signatures = []
|
||||
def _verify_signature(self, signature_node):
|
||||
"""Verify a single signature node"""
|
||||
xmlsec.tree.add_ids(self._root, ["ID"])
|
||||
|
||||
if self._source.signed_response:
|
||||
signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
|
||||
ctx = xmlsec.SignatureContext()
|
||||
key = xmlsec.Key.from_memory(
|
||||
self._source.verification_kp.certificate_data,
|
||||
xmlsec.constants.KeyDataFormatCertPem,
|
||||
)
|
||||
ctx.key = key
|
||||
|
||||
if len(signature_nodes) != 1:
|
||||
raise InvalidSignature("No Signature exists in the Response element.")
|
||||
signatures.extend(signature_nodes)
|
||||
ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
|
||||
try:
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise InvalidSignature(
|
||||
"The signature of the SAML object is either missing or invalid."
|
||||
) from exc
|
||||
LOGGER.debug("Successfully verified signature")
|
||||
|
||||
if self._source.signed_assertion:
|
||||
signature_nodes = self._root.xpath(
|
||||
"/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
|
||||
)
|
||||
def _verify_response_signature(self):
|
||||
"""Verify SAML Response's Signature (before decryption)"""
|
||||
signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
|
||||
|
||||
if len(signature_nodes) != 1:
|
||||
raise InvalidSignature("No Signature exists in the Assertion element.")
|
||||
signatures.extend(signature_nodes)
|
||||
if len(signature_nodes) != 1:
|
||||
raise InvalidSignature("No Signature exists in the Response element.")
|
||||
|
||||
if len(signatures) == 0:
|
||||
raise InvalidSignature()
|
||||
self._verify_signature(signature_nodes[0])
|
||||
|
||||
for signature_node in signatures:
|
||||
xmlsec.tree.add_ids(self._root, ["ID"])
|
||||
def _verify_assertion_signature(self):
|
||||
"""Verify SAML Assertion's Signature (after decryption)"""
|
||||
signature_nodes = self._root.xpath(
|
||||
"/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
|
||||
)
|
||||
|
||||
ctx = xmlsec.SignatureContext()
|
||||
key = xmlsec.Key.from_memory(
|
||||
self._source.verification_kp.certificate_data,
|
||||
xmlsec.constants.KeyDataFormatCertPem,
|
||||
)
|
||||
ctx.key = key
|
||||
if len(signature_nodes) != 1:
|
||||
raise InvalidSignature("No Signature exists in the Assertion element.")
|
||||
|
||||
ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
|
||||
try:
|
||||
ctx.verify(signature_node)
|
||||
except xmlsec.Error as exc:
|
||||
raise InvalidSignature() from exc
|
||||
LOGGER.debug("Successfully verified signature")
|
||||
self._verify_signature(signature_nodes[0])
|
||||
|
||||
def _verify_request_id(self):
|
||||
if self._source.allow_idp_initiated:
|
||||
|
||||
@@ -297,6 +297,7 @@ export function renderForm({
|
||||
>
|
||||
<ak-crypto-certificate-search
|
||||
.certificate=${provider.encryptionKp}
|
||||
nokey
|
||||
></ak-crypto-certificate-search>
|
||||
<p class="pf-c-form__helper-text">
|
||||
${msg("When selected, assertions will be encrypted using this keypair.")}
|
||||
|
||||
Reference in New Issue
Block a user