mirror of
https://github.com/goauthentik/authentik
synced 2026-04-25 17:15:26 +02:00
admin/files: sign custom-domain S3 URLs for the final host (#21704)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
from collections.abc import Generator, Iterator
|
||||
from contextlib import contextmanager
|
||||
from tempfile import SpooledTemporaryFile
|
||||
from urllib.parse import urlsplit
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
@@ -164,16 +164,19 @@ class S3Backend(ManageableBackend):
|
||||
)
|
||||
|
||||
def _file_url(name: str, request: HttpRequest | None) -> str:
|
||||
client = self.client
|
||||
params = {
|
||||
"Bucket": self.bucket_name,
|
||||
"Key": f"{self.base_path}/{name}",
|
||||
}
|
||||
|
||||
url = self.client.generate_presigned_url(
|
||||
"get_object",
|
||||
Params=params,
|
||||
ExpiresIn=expires_in,
|
||||
HttpMethod="GET",
|
||||
operation_name = "GetObject"
|
||||
operation_model = client.meta.service_model.operation_model(operation_name)
|
||||
request_dict = client._convert_to_request_dict(
|
||||
params,
|
||||
operation_model,
|
||||
endpoint_url=client.meta.endpoint_url,
|
||||
context={"is_presign_request": True},
|
||||
)
|
||||
|
||||
# Support custom domain for S3-compatible storage (so not AWS)
|
||||
@@ -183,9 +186,8 @@ class S3Backend(ManageableBackend):
|
||||
CONFIG.get(f"storage.{self.name}.custom_domain", None),
|
||||
)
|
||||
if custom_domain:
|
||||
parsed = urlsplit(url)
|
||||
scheme = "https" if use_https else "http"
|
||||
path = parsed.path
|
||||
path = request_dict["url_path"]
|
||||
|
||||
# When using path-style addressing, the presigned URL contains the bucket
|
||||
# name in the path (e.g., /bucket-name/key). Since custom_domain must
|
||||
@@ -200,9 +202,22 @@ class S3Backend(ManageableBackend):
|
||||
if not path.startswith("/"):
|
||||
path = f"/{path}"
|
||||
|
||||
url = f"{scheme}://{custom_domain}{path}?{parsed.query}"
|
||||
custom_base = urlsplit(f"{scheme}://{custom_domain}")
|
||||
|
||||
return url
|
||||
# Sign the final public URL instead of signing the internal S3 endpoint and
|
||||
# rewriting it afterwards. Presigned SigV4 URLs include the host header in the
|
||||
# canonical request, so post-sign host changes break strict backends like RustFS.
|
||||
public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
|
||||
request_dict["url_path"] = public_path
|
||||
request_dict["url"] = urlunsplit(
|
||||
(custom_base.scheme, custom_base.netloc, public_path, "", "")
|
||||
)
|
||||
|
||||
return client._request_signer.generate_presigned_url(
|
||||
request_dict,
|
||||
operation_name,
|
||||
expires_in=expires_in,
|
||||
)
|
||||
|
||||
if use_cache:
|
||||
return self._cache_get_or_set(name, request, _file_url, expires_in)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from unittest import skipUnless
|
||||
from urllib.parse import parse_qs, urlsplit
|
||||
|
||||
from botocore.exceptions import UnsupportedSignatureVersionError
|
||||
from django.test import TestCase
|
||||
@@ -168,6 +169,44 @@ class TestS3Backend(FileTestS3BackendMixin, TestCase):
|
||||
f"URL: {url}",
|
||||
)
|
||||
|
||||
@CONFIG.patch("storage.s3.secure_urls", False)
|
||||
@CONFIG.patch("storage.s3.addressing_style", "path")
|
||||
def test_file_url_custom_domain_resigns_for_custom_host(self):
|
||||
"""Test presigned URLs are signed for the custom domain host.
|
||||
|
||||
Host-changing custom domains must produce a signature query string for
|
||||
the public host, not reuse the internal endpoint signature.
|
||||
"""
|
||||
bucket_name = self.media_s3_bucket_name
|
||||
key_name = "application-icons/test.svg"
|
||||
custom_domain = f"files.example.test:8020/{bucket_name}"
|
||||
|
||||
endpoint_signed_url = self.media_s3_backend.client.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={
|
||||
"Bucket": bucket_name,
|
||||
"Key": f"{self.media_s3_backend.base_path}/{key_name}",
|
||||
},
|
||||
ExpiresIn=900,
|
||||
HttpMethod="GET",
|
||||
)
|
||||
|
||||
with CONFIG.patch("storage.media.s3.custom_domain", custom_domain):
|
||||
custom_url = self.media_s3_backend.file_url(key_name, use_cache=False)
|
||||
|
||||
endpoint_parts = urlsplit(endpoint_signed_url)
|
||||
custom_parts = urlsplit(custom_url)
|
||||
|
||||
self.assertEqual(custom_parts.scheme, "http")
|
||||
self.assertEqual(custom_parts.netloc, "files.example.test:8020")
|
||||
self.assertEqual(parse_qs(custom_parts.query)["X-Amz-SignedHeaders"], ["host"])
|
||||
self.assertNotEqual(
|
||||
custom_parts.query,
|
||||
endpoint_parts.query,
|
||||
"Custom-domain URLs must be signed for the public host, not reuse the endpoint "
|
||||
"signature query string.",
|
||||
)
|
||||
|
||||
def test_themed_urls_without_theme_variable(self):
|
||||
"""Test themed_urls returns None when filename has no %(theme)s"""
|
||||
result = self.media_s3_backend.themed_urls("logo.png")
|
||||
|
||||
Reference in New Issue
Block a user