Compare commits

...

5 Commits

4 changed files with 308 additions and 54 deletions

View File

@@ -320,14 +320,6 @@ class AssertionProcessor:
ns=xmlsec.constants.DSigNs,
)
assertion.append(signature)
if self.provider.encryption_kp:
encryption = xmlsec.template.encrypted_data_create(
assertion,
xmlsec.constants.TransformAes128Cbc,
self._assertion_id,
ns=xmlsec.constants.DSigNs,
)
assertion.append(encryption)
assertion.append(self.get_assertion_subject())
assertion.append(self.get_assertion_conditions())
@@ -403,12 +395,15 @@ class AssertionProcessor:
def _encrypt(self, element: Element, parent: Element):
"""Encrypt SAMLResponse EncryptedAssertion Element"""
# Create a standalone copy so namespace declarations are included in the encrypted content
element_xml = etree.tostring(element)
standalone_element = etree.fromstring(element_xml)
# Remove the original element from the tree since we're replacing it with encrypted version
parent.remove(element)
manager = xmlsec.KeysManager()
key = xmlsec.Key.from_memory(
self.provider.encryption_kp.key_data,
xmlsec.constants.KeyDataFormatPem,
)
key.load_cert_from_memory(
self.provider.encryption_kp.certificate_data,
xmlsec.constants.KeyDataFormatCertPem,
)
@@ -429,24 +424,25 @@ class AssertionProcessor:
xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
try:
enc_data = encryption_context.encrypt_xml(enc_data, element)
enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
except xmlsec.Error as exc:
raise InvalidEncryption() from exc
parent.remove(enc_data)
container.append(enc_data)
def build_response(self) -> str:
"""Build string XML Response and sign if signing is enabled."""
root_response = self.get_response()
if self.provider.signing_kp:
if self.provider.sign_assertion:
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
self._sign(assertion)
if self.provider.sign_response:
response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
self._sign(response)
# Sign assertion first (before encryption)
if self.provider.signing_kp and self.provider.sign_assertion:
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
self._sign(assertion)
# Encrypt assertion (this replaces Assertion with EncryptedAssertion)
if self.provider.encryption_kp:
assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
self._encrypt(assertion, root_response)
# Sign response AFTER encryption so signature covers the encrypted content
if self.provider.signing_kp and self.provider.sign_response:
response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
self._sign(response)
return etree.tostring(root_response).decode("utf-8") # nosec

View File

@@ -150,6 +150,257 @@ class TestAuthNRequest(TestCase):
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_request_encrypt_cert_only(self):
"""Test SAML encryption with certificate-only keypair (no private key).
This tests the scenario where the IdP (provider) only has the SP's public
certificate for encryption, without a private key. This is the expected
real-world scenario since the SP would never share their private key.
"""
# Create a full keypair for the source (SP) - it needs the private key to decrypt
full_keypair = create_test_cert()
# Create a certificate-only keypair for the provider (IdP)
# This simulates having only the SP's public certificate
cert_only = CertificateKeyPair.objects.create(
name=generate_id(),
certificate_data=full_keypair.certificate_data,
key_data="", # No private key
)
self.provider.encryption_kp = cert_only
self.provider.save()
self.source.encryption_kp = full_keypair
self.source.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
# This should work with only the certificate (public key) for encryption
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Now parse the response (source) - decryption requires the private key
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_request_sign_response_and_encrypt(self):
"""Test SAML with sign_response enabled AND encryption.
This tests the fix for signature invalidation when encryption is enabled.
The response must be signed AFTER encryption, not before, because encryption
replaces the Assertion with EncryptedAssertion which changes the response content.
"""
self.provider.sign_response = True
self.provider.sign_assertion = False
self.provider.encryption_kp = self.cert
self.provider.save()
self.source.encryption_kp = self.cert
self.source.signed_response = True
self.source.signed_assertion = False # Only response is signed, not assertion
self.source.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Verify the response contains EncryptedAssertion and a signature
response_xml = fromstring(response)
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
self.assertEqual(
len(response_xml.xpath("//samlp:Response/ds:Signature", namespaces=NS_MAP)), 1
)
# Now parse the response (source) - this will verify the signature and decrypt
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_request_sign_assertion_and_encrypt(self):
"""Test SAML with sign_assertion enabled AND encryption.
The assertion signature should be inside the encrypted content and
remain valid after decryption.
"""
self.provider.sign_response = False
self.provider.sign_assertion = True
self.provider.encryption_kp = self.cert
self.provider.save()
self.source.encryption_kp = self.cert
self.source.signed_assertion = True
self.source.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Verify the response contains EncryptedAssertion
response_xml = fromstring(response)
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
# Now parse the response (source) - this will decrypt and verify assertion signature
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_request_sign_both_and_encrypt(self):
"""Test SAML with both sign_assertion and sign_response enabled AND encryption.
This is the most complex scenario: assertion is signed, then encrypted,
then the response is signed. Both signatures should be valid.
"""
self.provider.sign_response = True
self.provider.sign_assertion = True
self.provider.encryption_kp = self.cert
self.provider.save()
self.source.encryption_kp = self.cert
self.source.signed_assertion = True
self.source.signed_response = True
self.source.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Verify the response contains EncryptedAssertion and response signature
response_xml = fromstring(response)
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
self.assertEqual(
len(response_xml.xpath("//samlp:Response/ds:Signature", namespaces=NS_MAP)), 1
)
# Now parse the response (source) - this will verify response signature,
# decrypt, then verify assertion signature
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_encrypted_assertion_namespace_preservation(self):
"""Test that encrypted assertions include namespace declarations.
When an assertion is encrypted, the resulting decrypted XML must include
the necessary namespace declarations (xmlns:saml) since it's now a standalone
document fragment, no longer inheriting namespaces from the parent Response.
"""
self.provider.encryption_kp = self.cert
self.provider.save()
self.source.encryption_kp = self.cert
self.source.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Parse the encrypted response
response_xml = fromstring(response)
encrypted_assertion = response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)[0]
encrypted_data = encrypted_assertion.xpath("//xenc:EncryptedData", namespaces=NS_MAP)[0]
# Decrypt the assertion manually to verify namespace is present
import xmlsec
manager = xmlsec.KeysManager()
key = xmlsec.Key.from_memory(self.cert.key_data, xmlsec.constants.KeyDataFormatPem, None)
manager.add_key(key)
enc_ctx = xmlsec.EncryptionContext(manager)
decrypted = enc_ctx.decrypt(encrypted_data)
# The decrypted assertion should have xmlns:saml namespace declaration
decrypted_str = etree.tostring(decrypted).decode()
self.assertIn("xmlns:saml", decrypted_str)
# Also verify full round-trip works (source can parse it)
http_request.POST = QueryDict(mutable=True)
http_request.POST["SAMLResponse"] = b64encode(response.encode()).decode()
response_parser = ResponseProcessor(self.source, http_request)
response_parser.parse()
def test_encrypted_response_schema_validation(self):
"""Test that encrypted SAML responses validate against the SAML schema.
The response with EncryptedAssertion must be valid per saml-schema-protocol-2.0.xsd.
This ensures we don't have invalid elements like EncryptedData inside Assertion.
"""
self.provider.encryption_kp = self.cert
self.provider.save()
http_request = self.request_factory.get("/", user=get_anonymous_user())
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
# To get an assertion we need a parsed request (parsed by provider)
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
# Now create a response and convert it to string (provider)
response_proc = AssertionProcessor(self.provider, http_request, parsed_request)
response = response_proc.build_response()
# Validate against SAML schema
schema = etree.XMLSchema(
etree.parse("schemas/saml-schema-protocol-2.0.xsd", parser=etree.XMLParser()) # nosec
)
self.assertTrue(schema.validate(lxml_from_string(response)))
# Verify structure: should have EncryptedAssertion, not Assertion with EncryptedData inside
response_xml = fromstring(response)
self.assertEqual(len(response_xml.xpath("//saml:EncryptedAssertion", namespaces=NS_MAP)), 1)
self.assertEqual(len(response_xml.xpath("//saml:Assertion", namespaces=NS_MAP)), 0)
def test_request_signed(self):
"""Test full SAML Request/Response flow, fully signed"""
http_request = self.request_factory.get("/", user=get_anonymous_user())

View File

@@ -77,11 +77,17 @@ class ResponseProcessor:
self._root_xml = b64decode(raw_response.encode())
self._root = fromstring(self._root_xml)
# Verify response signature BEFORE decryption (signature covers encrypted content)
if self._source.verification_kp and self._source.signed_response:
self._verify_response_signature()
if self._source.encryption_kp:
self._decrypt_response()
if self._source.verification_kp:
self._verify_signed()
# Verify assertion signature AFTER decryption (signature is inside encrypted content)
if self._source.verification_kp and self._source.signed_assertion:
self._verify_assertion_signature()
self._verify_request_id()
self._verify_status()
@@ -114,45 +120,45 @@ class ResponseProcessor:
decrypted_assertion,
)
def _verify_signed(self):
"""Verify SAML Response's Signature"""
signatures = []
def _verify_signature(self, signature_node):
"""Verify a single signature node"""
xmlsec.tree.add_ids(self._root, ["ID"])
if self._source.signed_response:
signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(
self._source.verification_kp.certificate_data,
xmlsec.constants.KeyDataFormatCertPem,
)
ctx.key = key
if len(signature_nodes) != 1:
raise InvalidSignature("No Signature exists in the Response element.")
signatures.extend(signature_nodes)
ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
try:
ctx.verify(signature_node)
except xmlsec.Error as exc:
raise InvalidSignature(
"The signature of the SAML object is either missing or invalid."
) from exc
LOGGER.debug("Successfully verified signature")
if self._source.signed_assertion:
signature_nodes = self._root.xpath(
"/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
)
def _verify_response_signature(self):
"""Verify SAML Response's Signature (before decryption)"""
signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
if len(signature_nodes) != 1:
raise InvalidSignature("No Signature exists in the Assertion element.")
signatures.extend(signature_nodes)
if len(signature_nodes) != 1:
raise InvalidSignature("No Signature exists in the Response element.")
if len(signatures) == 0:
raise InvalidSignature()
self._verify_signature(signature_nodes[0])
for signature_node in signatures:
xmlsec.tree.add_ids(self._root, ["ID"])
def _verify_assertion_signature(self):
"""Verify SAML Assertion's Signature (after decryption)"""
signature_nodes = self._root.xpath(
"/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
)
ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(
self._source.verification_kp.certificate_data,
xmlsec.constants.KeyDataFormatCertPem,
)
ctx.key = key
if len(signature_nodes) != 1:
raise InvalidSignature("No Signature exists in the Assertion element.")
ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
try:
ctx.verify(signature_node)
except xmlsec.Error as exc:
raise InvalidSignature() from exc
LOGGER.debug("Successfully verified signature")
self._verify_signature(signature_nodes[0])
def _verify_request_id(self):
if self._source.allow_idp_initiated:

View File

@@ -297,6 +297,7 @@ export function renderForm({
>
<ak-crypto-certificate-search
.certificate=${provider.encryptionKp}
nokey
></ak-crypto-certificate-search>
<p class="pf-c-form__helper-text">
${msg("When selected, assertions will be encrypted using this keypair.")}