diff --git a/authentik/admin/files/backends/s3.py b/authentik/admin/files/backends/s3.py index f50f43b073..2a867a3a58 100644 --- a/authentik/admin/files/backends/s3.py +++ b/authentik/admin/files/backends/s3.py @@ -1,7 +1,7 @@ from collections.abc import Generator, Iterator from contextlib import contextmanager from tempfile import SpooledTemporaryFile -from urllib.parse import urlsplit +from urllib.parse import urlsplit, urlunsplit import boto3 from botocore.config import Config @@ -164,16 +164,19 @@ class S3Backend(ManageableBackend): ) def _file_url(name: str, request: HttpRequest | None) -> str: + client = self.client params = { "Bucket": self.bucket_name, "Key": f"{self.base_path}/{name}", } - url = self.client.generate_presigned_url( - "get_object", - Params=params, - ExpiresIn=expires_in, - HttpMethod="GET", + operation_name = "GetObject" + operation_model = client.meta.service_model.operation_model(operation_name) + request_dict = client._convert_to_request_dict( + params, + operation_model, + endpoint_url=client.meta.endpoint_url, + context={"is_presign_request": True}, ) # Support custom domain for S3-compatible storage (so not AWS) @@ -183,9 +186,8 @@ class S3Backend(ManageableBackend): CONFIG.get(f"storage.{self.name}.custom_domain", None), ) if custom_domain: - parsed = urlsplit(url) scheme = "https" if use_https else "http" - path = parsed.path + path = request_dict["url_path"] # When using path-style addressing, the presigned URL contains the bucket # name in the path (e.g., /bucket-name/key). Since custom_domain must @@ -200,9 +202,22 @@ class S3Backend(ManageableBackend): if not path.startswith("/"): path = f"/{path}" - url = f"{scheme}://{custom_domain}{path}?{parsed.query}" + custom_base = urlsplit(f"{scheme}://{custom_domain}") - return url + # Sign the final public URL instead of signing the internal S3 endpoint and + # rewriting it afterwards. Presigned SigV4 URLs include the host header in the + # canonical request, so post-sign host changes break strict backends like RustFS. + public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path + request_dict["url_path"] = public_path + request_dict["url"] = urlunsplit( + (custom_base.scheme, custom_base.netloc, public_path, "", "") + ) + + return client._request_signer.generate_presigned_url( + request_dict, + operation_name, + expires_in=expires_in, + ) if use_cache: return self._cache_get_or_set(name, request, _file_url, expires_in) diff --git a/authentik/admin/files/backends/tests/test_s3_backend.py b/authentik/admin/files/backends/tests/test_s3_backend.py index 769d886f40..23aa8ddfb9 100644 --- a/authentik/admin/files/backends/tests/test_s3_backend.py +++ b/authentik/admin/files/backends/tests/test_s3_backend.py @@ -1,4 +1,5 @@ from unittest import skipUnless +from urllib.parse import parse_qs, urlsplit from botocore.exceptions import UnsupportedSignatureVersionError from django.test import TestCase @@ -168,6 +169,44 @@ class TestS3Backend(FileTestS3BackendMixin, TestCase): f"URL: {url}", ) + @CONFIG.patch("storage.s3.secure_urls", False) + @CONFIG.patch("storage.s3.addressing_style", "path") + def test_file_url_custom_domain_resigns_for_custom_host(self): + """Test presigned URLs are signed for the custom domain host. + + Host-changing custom domains must produce a signature query string for + the public host, not reuse the internal endpoint signature. + """ + bucket_name = self.media_s3_bucket_name + key_name = "application-icons/test.svg" + custom_domain = f"files.example.test:8020/{bucket_name}" + + endpoint_signed_url = self.media_s3_backend.client.generate_presigned_url( + "get_object", + Params={ + "Bucket": bucket_name, + "Key": f"{self.media_s3_backend.base_path}/{key_name}", + }, + ExpiresIn=900, + HttpMethod="GET", + ) + + with CONFIG.patch("storage.media.s3.custom_domain", custom_domain): + custom_url = self.media_s3_backend.file_url(key_name, use_cache=False) + + endpoint_parts = urlsplit(endpoint_signed_url) + custom_parts = urlsplit(custom_url) + + self.assertEqual(custom_parts.scheme, "http") + self.assertEqual(custom_parts.netloc, "files.example.test:8020") + self.assertEqual(parse_qs(custom_parts.query)["X-Amz-SignedHeaders"], ["host"]) + self.assertNotEqual( + custom_parts.query, + endpoint_parts.query, + "Custom-domain URLs must be signed for the public host, not reuse the endpoint " + "signature query string.", + ) + def test_themed_urls_without_theme_variable(self): """Test themed_urls returns None when filename has no %(theme)s""" result = self.media_s3_backend.themed_urls("logo.png")