Compare commits

...

1 Commits

Author SHA1 Message Date
Dominic R
18d4f85579 sources/oauth: add AT Protocol source
Closes https://github.com/goauthentik/authentik/issues/22031
2026-05-04 19:46:25 -04:00
16 changed files with 965 additions and 22 deletions

View File

@@ -33,6 +33,7 @@ class SourceTypeSerializer(PassiveSerializer):
profile_url = CharField(read_only=True, allow_null=True)
oidc_well_known_url = CharField(read_only=True, allow_null=True)
oidc_jwks_url = CharField(read_only=True, allow_null=True)
client_secret_required = BooleanField()
class OAuthSourceSerializer(SourceSerializer):
@@ -65,6 +66,15 @@ class OAuthSourceSerializer(SourceSerializer):
)
source_type = registry.find_type(provider_type_name)
if not source_type.client_secret_required and "consumer_secret" not in attrs:
attrs["consumer_secret"] = ""
if (
source_type.client_secret_required
and not self.instance
and not attrs.get("consumer_secret")
):
raise ValidationError({"consumer_secret": "This field is required."})
well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
inferred_oidc_jwks_url = None
@@ -149,7 +159,7 @@ class OAuthSourceSerializer(SourceSerializer):
"authorization_code_auth_method",
]
extra_kwargs = {
"consumer_secret": {"write_only": True},
"consumer_secret": {"write_only": True, "allow_blank": True, "required": False},
"request_token_url": {"allow_blank": True},
"authorization_url": {"allow_blank": True},
"access_token_url": {"allow_blank": True},

View File

@@ -10,6 +10,7 @@ LOGGER = get_logger()
AUTHENTIK_SOURCES_OAUTH_TYPES = [
"authentik.sources.oauth.types.apple",
"authentik.sources.oauth.types.atproto",
"authentik.sources.oauth.types.azure_ad",
"authentik.sources.oauth.types.discord",
"authentik.sources.oauth.types.entra_id",

View File

@@ -271,6 +271,15 @@ class EntraIDOAuthSource(CreatableType, OAuthSource):
verbose_name_plural = _("Entra ID OAuth Sources")
class AtProtoOAuthSource(CreatableType, OAuthSource):
"""Social Login using AT Protocol."""
class Meta:
abstract = True
verbose_name = _("AT Protocol OAuth Source")
verbose_name_plural = _("AT Protocol OAuth Sources")
class OpenIDConnectOAuthSource(CreatableType, OAuthSource):
"""Login using a Generic OpenID-Connect compliant provider."""

View File

@@ -0,0 +1,284 @@
"""AT Protocol OAuth Source tests"""
from urllib.parse import parse_qs, urlparse
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat
from django.test import RequestFactory, SimpleTestCase
from jwt import decode, get_unverified_header
from requests_mock import Mocker
from authentik.sources.oauth.api.source import OAuthSourceSerializer
from authentik.sources.oauth.models import OAuthSource
from authentik.sources.oauth.types.atproto import (
BSKY_AUTHORIZATION_URL_DEFAULT,
BSKY_PAR_URL_DEFAULT,
BSKY_PUBLIC_PROFILE_URL_DEFAULT,
BSKY_TOKEN_URL_DEFAULT,
AtProtoOAuthClient,
AtProtoType,
)
ATPROTO_DID = "did:plc:z72i7hdynmk6r22z27h6tvur"
ATPROTO_PDS = "https://puffball.us-east.host.bsky.network"
ATPROTO_CLIENT_ID = "https://authentik.example/application/o/atproto/client-metadata.json"
ATPROTO_DID_DOCUMENT = {
"id": ATPROTO_DID,
"alsoKnownAs": ["at://bsky.app"],
"service": [
{
"id": "#atproto_pds",
"type": "AtprotoPersonalDataServer",
"serviceEndpoint": ATPROTO_PDS,
}
],
}
ATPROTO_PROFILE = {
"did": ATPROTO_DID,
"handle": "bsky.app",
"displayName": "Bluesky",
}
CUSTOM_ISSUER = "https://auth.example"
CUSTOM_AUTHORIZATION_URL = f"{CUSTOM_ISSUER}/oauth/authorize"
CUSTOM_PAR_URL = f"{CUSTOM_ISSUER}/oauth/par"
CUSTOM_TOKEN_URL = f"{CUSTOM_ISSUER}/oauth/token"
CUSTOM_PROFILE_URL = f"{CUSTOM_ISSUER}/xrpc/app.bsky.actor.getProfile"
def private_key_pem() -> str:
"""Generate an ES256 private key for DPoP tests."""
return (
ec.generate_private_key(ec.SECP256R1())
.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
.decode()
)
class TestTypeAtProto(SimpleTestCase):
"""AT Protocol OAuth Source tests"""
def setUp(self):
self.source = OAuthSource(
name="test",
slug="test",
provider_type="atproto",
consumer_key=ATPROTO_CLIENT_ID,
)
self.factory = RequestFactory()
def get_request(self):
request = self.factory.get("/")
request.session = {}
return request
def get_callback_request(self, issuer: str = "https://bsky.social"):
request = self.factory.get(f"/?state=state&iss={issuer}&code=code")
request.session = {
"authentik/sources/oauth/atproto/test": {
"state": "state",
"code_verifier": "verifier",
"issuer": issuer,
"private_key": private_key_pem(),
"dpop_nonce": "nonce-1",
"login_hint": None,
"expected_did": None,
}
}
return request
def test_enroll_context(self):
"""Test AT Protocol enrollment context."""
ak_context = AtProtoType().get_base_user_properties(
source=self.source,
info=ATPROTO_PROFILE,
)
self.assertEqual(ak_context["username"], ATPROTO_PROFILE["handle"])
self.assertEqual(ak_context["name"], ATPROTO_PROFILE["displayName"])
self.assertIsNone(ak_context["email"])
def test_serializer_allows_missing_secret(self):
"""Test AT Protocol sources can be created without a client secret."""
serializer = OAuthSourceSerializer()
validated = serializer.validate(
{
"name": "test-atproto",
"slug": "test-atproto",
"provider_type": "atproto",
"consumer_key": ATPROTO_CLIENT_ID,
}
)
self.assertEqual(validated["consumer_secret"], "")
@Mocker()
def test_redirect_uses_par_dpop_pkce_and_no_secret(self, mock: Mocker):
"""Test authorization starts with a DPoP-bound pushed authorization request."""
mock.post(
BSKY_PAR_URL_DEFAULT,
json={"request_uri": "urn:request:123"},
headers={"DPoP-Nonce": "nonce-1"},
)
request = self.get_request()
client = AtProtoOAuthClient(self.source, request, callback="/callback/")
redirect_url = client.get_redirect_url({"scope": ["atproto", "transition:generic"]})
parsed_redirect = urlparse(redirect_url)
parsed_query = parse_qs(parsed_redirect.query)
parsed_redirect_url = (
f"{parsed_redirect.scheme}://{parsed_redirect.netloc}{parsed_redirect.path}"
)
self.assertEqual(parsed_redirect_url, BSKY_AUTHORIZATION_URL_DEFAULT)
self.assertEqual(parsed_query["client_id"], [ATPROTO_CLIENT_ID])
self.assertEqual(parsed_query["request_uri"], ["urn:request:123"])
self.assertEqual(len(mock.request_history), 1)
par_request = mock.request_history[0]
self.assertIn("DPoP", par_request.headers)
self.assertEqual(par_request.text.count("client_secret"), 0)
self.assertIn("client_id=https%3A%2F%2Fauthentik.example", par_request.text)
self.assertIn("code_challenge_method=S256", par_request.text)
self.assertIn("scope=atproto+transition%3Ageneric", par_request.text)
header = get_unverified_header(par_request.headers["DPoP"])
payload = decode(par_request.headers["DPoP"], options={"verify_signature": False})
self.assertEqual(header["typ"], "dpop+jwt")
self.assertEqual(header["alg"], "ES256")
self.assertEqual(payload["htm"], "POST")
self.assertEqual(payload["htu"], BSKY_PAR_URL_DEFAULT)
@Mocker()
def test_custom_urls_override_bluesky_defaults(self, mock: Mocker):
"""Test non-Bluesky AT Protocol endpoint configuration."""
source = OAuthSource(
name="test",
slug="test",
provider_type="atproto",
consumer_key=ATPROTO_CLIENT_ID,
authorization_url=CUSTOM_AUTHORIZATION_URL,
request_token_url=CUSTOM_PAR_URL,
access_token_url=CUSTOM_TOKEN_URL,
profile_url=CUSTOM_PROFILE_URL,
)
mock.post(
CUSTOM_PAR_URL,
json={"request_uri": "urn:request:custom"},
headers={"DPoP-Nonce": "nonce-custom"},
)
request = self.get_request()
client = AtProtoOAuthClient(source, request, callback="/callback/")
redirect_url = client.get_redirect_url({"scope": ["atproto"]})
parsed_redirect = urlparse(redirect_url)
self.assertEqual(
f"{parsed_redirect.scheme}://{parsed_redirect.netloc}{parsed_redirect.path}",
CUSTOM_AUTHORIZATION_URL,
)
self.assertEqual(request.session[client.session_key]["issuer"], CUSTOM_ISSUER)
self.assertEqual(mock.request_history[0].url, CUSTOM_PAR_URL)
@Mocker()
def test_access_token_validates_subject_scope_and_issuer(self, mock: Mocker):
"""Test callback token response validation."""
mock.post(
BSKY_TOKEN_URL_DEFAULT,
json={
"access_token": "access",
"refresh_token": "refresh",
"token_type": "DPoP",
"expires_in": 300,
"sub": ATPROTO_DID,
"scope": "atproto transition:generic",
},
headers={"DPoP-Nonce": "nonce-2"},
)
mock.get(f"https://plc.directory/{ATPROTO_DID}", json=ATPROTO_DID_DOCUMENT)
mock.get(
f"{ATPROTO_PDS}/.well-known/oauth-protected-resource",
json={"authorization_servers": ["https://bsky.social"]},
)
request = self.get_callback_request()
client = AtProtoOAuthClient(self.source, request, callback="/callback/")
token = client.get_access_token()
self.assertEqual(token["sub"], ATPROTO_DID)
self.assertEqual(token["pds_url"], ATPROTO_PDS)
token_request = mock.request_history[0]
self.assertIn("DPoP", token_request.headers)
self.assertEqual(token_request.text.count("client_secret"), 0)
self.assertIn("code_verifier=verifier", token_request.text)
@Mocker()
def test_access_token_rejects_non_dpop_token_type(self, mock: Mocker):
"""Test callback rejects token responses that are not DPoP-bound."""
mock.post(
BSKY_TOKEN_URL_DEFAULT,
json={
"access_token": "access",
"token_type": "Bearer",
"sub": ATPROTO_DID,
"scope": "atproto",
},
headers={"DPoP-Nonce": "nonce-2"},
)
client = AtProtoOAuthClient(self.source, self.get_callback_request(), callback="/callback/")
token = client.get_access_token()
self.assertEqual(token["error"], "Token response did not include a DPoP token type.")
@Mocker()
def test_did_web_localhost_uses_http_for_local_testing(self, mock: Mocker):
"""Test did:web localhost resolution for the local AT Protocol simulator."""
mock.get("http://localhost:8787/.well-known/did.json", json={"id": "did:web:localhost"})
client = AtProtoOAuthClient(self.source, self.get_request(), callback="/callback/")
document = client.get_did_document("did:web:localhost%3A8787")
self.assertEqual(document["id"], "did:web:localhost")
@Mocker()
def test_profile_info(self, mock: Mocker):
"""Test public Bluesky profile lookup."""
mock.get(BSKY_PUBLIC_PROFILE_URL_DEFAULT, json=ATPROTO_PROFILE)
client = AtProtoOAuthClient(self.source, self.get_request(), callback="/callback/")
profile = client.get_profile_info({"sub": ATPROTO_DID})
self.assertEqual(profile["did"], ATPROTO_DID)
self.assertEqual(profile["handle"], "bsky.app")
@Mocker()
def test_profile_info_with_transition_email(self, mock: Mocker):
"""Test private session email lookup when transition:email is granted."""
mock.get(BSKY_PUBLIC_PROFILE_URL_DEFAULT, json=ATPROTO_PROFILE)
mock.get(
f"{ATPROTO_PDS}/xrpc/com.atproto.server.getSession",
json={"email": "user@example.com", "emailConfirmed": True},
headers={"DPoP-Nonce": "nonce-3"},
)
request = self.get_request()
request.session = {
"authentik/sources/oauth/atproto/test": {
"state": "state",
"code_verifier": "verifier",
"issuer": "https://bsky.social",
"private_key": private_key_pem(),
"dpop_nonce": "nonce-2",
"login_hint": None,
"expected_did": None,
}
}
client = AtProtoOAuthClient(self.source, request, callback="/callback/")
profile = client.get_profile_info(
{
"sub": ATPROTO_DID,
"scope": "atproto transition:email",
"access_token": "access",
"pds_url": ATPROTO_PDS,
}
)
self.assertEqual(profile["email"], "user@example.com")
session_request = mock.request_history[1]
self.assertEqual(session_request.headers["Authorization"], "DPoP access")
payload = decode(session_request.headers["DPoP"], options={"verify_signature": False})
self.assertIn("ath", payload)

View File

@@ -0,0 +1,486 @@
"""AT Protocol OAuth Views"""
from time import time
from typing import Any
from urllib.parse import parse_qs, quote, unquote, urlencode, urlparse, urlunparse
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.hashes import SHA256, Hash
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
load_pem_private_key,
)
from django.templatetags.static import static
from django.urls import reverse
from django.utils.crypto import constant_time_compare, get_random_string
from jwt import encode
from jwt.algorithms import ECAlgorithm
from jwt.utils import base64url_encode
from requests.exceptions import RequestException
from structlog.stdlib import get_logger
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.utils import pkce_s256_challenge
from authentik.sources.oauth.clients.base import BaseOAuthClient
from authentik.sources.oauth.models import OAuthSource, PKCEMethod
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.sources.oauth.views.redirect import OAuthRedirect
LOGGER = get_logger()
# Bluesky defaults. AT Protocol OAuth requires these endpoint roles, but
# non-Bluesky deployments can use different hosts through the source URL fields.
BSKY_AUTHORIZATION_URL_DEFAULT = "https://bsky.social/oauth/authorize"
BSKY_TOKEN_URL_DEFAULT = "https://bsky.social/oauth/token" # nosec
BSKY_PAR_URL_DEFAULT = "https://bsky.social/oauth/par"
BSKY_ISSUER_DEFAULT = "https://bsky.social"
BSKY_PUBLIC_PROFILE_URL_DEFAULT = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile"
HTTP_STATUS_BAD_REQUEST = 400
SESSION_KEY_ATPROTO = "authentik/sources/oauth/atproto"
class AtProtoOAuthClient(BaseOAuthClient):
"""AT Protocol OAuth client.
AT Protocol looks like OAuth2 from a distance, but the required security
profile is different enough that sharing the generic OAuth2 client would
hide important behavior: PAR is mandatory, access tokens are DPoP-bound,
public clients use metadata URLs instead of secrets, and the token subject
is the user's DID rather than an OIDC userinfo subject.
"""
def get_client_id(self) -> str:
"""Return the public client metadata URL."""
return self.source.consumer_key
@property
def session_key(self) -> str:
return f"{SESSION_KEY_ATPROTO}/{self.source.slug}"
def get_authorization_url(self) -> str:
if self.source.source_type.urls_customizable and self.source.authorization_url:
return self.source.authorization_url
return self.source.source_type.authorization_url or BSKY_AUTHORIZATION_URL_DEFAULT
def get_token_url(self) -> str:
if self.source.source_type.urls_customizable and self.source.access_token_url:
return self.source.access_token_url
return self.source.source_type.access_token_url or BSKY_TOKEN_URL_DEFAULT
def get_par_url(self) -> str:
if self.source.source_type.urls_customizable and self.source.request_token_url:
return self.source.request_token_url
return self.source.source_type.request_token_url or BSKY_PAR_URL_DEFAULT
def get_issuer(self) -> str:
parsed_url = urlparse(self.get_authorization_url())
if parsed_url.scheme and parsed_url.netloc:
return f"{parsed_url.scheme}://{parsed_url.netloc}"
return BSKY_ISSUER_DEFAULT
def get_redirect_args(self) -> dict[str, str]:
"""AT Protocol redirects are built from PAR responses instead."""
raise NotImplementedError
def get_redirect_url(self, parameters=None):
"""Create a PAR request and redirect with request_uri."""
request_uri = self.create_pushed_authorization_request(parameters or {})
parsed_url = urlparse(self.get_authorization_url())
parsed_args = parse_qs(parsed_url.query)
args = {
"client_id": self.get_client_id(),
"request_uri": request_uri,
}
args.update(parsed_args)
params = urlencode(args, quote_via=quote, doseq=True)
return urlunparse(parsed_url._replace(query=params))
def create_pushed_authorization_request(self, parameters: dict[str, Any]) -> str:
"""Create the pushed authorization request and persist session data."""
state = get_random_string(32)
code_verifier = generate_id(length=128)
private_key = ec.generate_private_key(ec.SECP256R1())
login_hint = parameters.pop("login_hint", None)
scope = parameters.pop("scope", [])
if isinstance(scope, str):
scopes = scope.split()
else:
scopes = list(scope)
if "atproto" not in scopes:
scopes.append("atproto")
# The DPoP key and PKCE verifier must survive the browser redirect so
# the callback can prove it is the same client that created the PAR.
session_data = {
"state": state,
"code_verifier": code_verifier,
"issuer": self.get_issuer(),
"private_key": private_key.private_bytes(
Encoding.PEM,
PrivateFormat.PKCS8,
NoEncryption(),
).decode(),
"dpop_nonce": None,
"login_hint": login_hint,
"expected_did": self.resolve_identifier(login_hint) if login_hint else None,
}
self.request.session[self.session_key] = session_data
# AT Protocol starts the browser flow with a PAR request. The browser
# only receives a request_uri, not the full authorization parameters.
body = {
"client_id": self.get_client_id(),
"response_type": "code",
"redirect_uri": self.request.build_absolute_uri(self.callback),
"scope": " ".join(sorted(set(scopes))),
"state": state,
"code_challenge": pkce_s256_challenge(code_verifier),
"code_challenge_method": PKCEMethod.S256,
}
if login_hint:
body["login_hint"] = login_hint
body.update(parameters)
response = self.request_with_dpop("post", self.get_par_url(), data=body)
try:
request_uri = response.json().get("request_uri")
except ValueError as exc:
raise RequestException("PAR response was not valid JSON", response=response) from exc
if not request_uri:
raise RequestException("PAR response did not include request_uri", response=response)
return request_uri
def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
"""Fetch the initial access token from the callback code."""
session_data = self.request.session.get(self.session_key)
if not session_data:
LOGGER.warning("No AT Protocol OAuth session found")
return {"error": "No AT Protocol OAuth session found."}
if not constant_time_compare(session_data["state"], self.get_request_arg("state", "")):
LOGGER.warning("AT Protocol OAuth state check failed")
return {"error": "State check failed."}
issuer = self.get_request_arg("iss")
if not issuer or not constant_time_compare(session_data["issuer"], issuer):
LOGGER.warning("AT Protocol OAuth issuer check failed", issuer=issuer)
return {"error": "Issuer check failed."}
code = self.get_request_arg("code")
if not code:
return {"error": self.get_request_arg("error_description") or "No token received."}
data = {
"grant_type": "authorization_code",
"client_id": self.get_client_id(),
"redirect_uri": self.request.build_absolute_uri(self.callback),
"code": code,
"code_verifier": session_data["code_verifier"],
}
try:
response = self.request_with_dpop("post", self.get_token_url(), data=data)
token = response.json()
except ValueError as exc:
LOGGER.warning("AT Protocol token response was not valid JSON", exc=exc)
return None
except RequestException as exc:
LOGGER.warning(
"Unable to fetch AT Protocol access token",
exc=exc,
response=exc.response.text if exc.response is not None else str(exc),
)
return None
validation_error = self.validate_token_response(token, session_data, issuer)
if validation_error:
return {"error": validation_error}
return token
def validate_token_response(
self,
token: dict[str, Any],
session_data: dict[str, Any],
issuer: str,
) -> str | None:
"""Validate AT Protocol token claims and attach the verified PDS URL."""
# The token response identifies the account by DID. That DID becomes
# the stable source connection identifier in authentik.
did = token.get("sub")
if not did:
return "Token response did not include an account DID."
if "atproto" not in token.get("scope", "").split():
return "Token response did not include the atproto scope."
if token.get("token_type") != "DPoP":
return "Token response did not include a DPoP token type."
expected_did = session_data.get("expected_did")
if expected_did and not constant_time_compare(expected_did, did):
LOGGER.warning("AT Protocol OAuth subject check failed", expected=expected_did, did=did)
return "Subject check failed."
# Verify the DID document's PDS points back to the authorization server
# that issued the callback, otherwise a token could claim another DID.
pds_url = self.get_pds_url_for_subject(did, issuer)
if not pds_url:
return "Issuer is not authoritative for this account."
token["pds_url"] = pds_url
return None
def get_profile_info(self, token: dict[str, Any]) -> dict[str, Any] | None:
"""Fetch public profile data for the authenticated DID."""
did = token.get("sub")
if not did:
return None
profile_url = BSKY_PUBLIC_PROFILE_URL_DEFAULT
if self.source.source_type.urls_customizable and self.source.profile_url:
profile_url = self.source.profile_url
response = self.session.get(profile_url, params={"actor": did})
try:
response.raise_for_status()
except RequestException as exc:
LOGGER.warning(
"Unable to fetch AT Protocol profile",
exc=exc,
response=exc.response.text if exc.response is not None else str(exc),
)
return {"did": did}
profile = response.json()
profile["did"] = did
if "transition:email" in token.get("scope", "").split() and token.get("pds_url"):
profile.update(self.get_session_info(token))
return profile
def request_with_dpop(self, method: str, url: str, **kwargs):
"""Make a DPoP request, retrying once when the server provides a fresh nonce."""
response = self.do_dpop_request(method, url, **kwargs)
if response.status_code == HTTP_STATUS_BAD_REQUEST and response.headers.get("DPoP-Nonce"):
self.update_dpop_nonce(response.headers["DPoP-Nonce"])
response = self.do_dpop_request(method, url, **kwargs)
response.raise_for_status()
nonce = response.headers.get("DPoP-Nonce")
if not nonce:
raise RequestException("DPoP response did not include DPoP-Nonce", response=response)
self.update_dpop_nonce(nonce)
return response
def get_session_info(self, token: dict[str, Any]) -> dict[str, Any]:
"""Fetch private session data when transition:email was granted."""
pds_url = token["pds_url"].rstrip("/")
session_url = f"{pds_url}/xrpc/com.atproto.server.getSession"
headers = {
"Authorization": f"DPoP {token['access_token']}",
}
response = self.do_dpop_request(
"get",
session_url,
headers=headers,
access_token=token["access_token"],
)
if response.status_code == HTTP_STATUS_BAD_REQUEST and response.headers.get("DPoP-Nonce"):
self.update_dpop_nonce(response.headers["DPoP-Nonce"])
response = self.do_dpop_request(
"get",
session_url,
headers=headers,
access_token=token["access_token"],
)
try:
response.raise_for_status()
except RequestException as exc:
LOGGER.warning(
"Unable to fetch AT Protocol session info",
exc=exc,
response=exc.response.text if exc.response is not None else str(exc),
)
return {}
nonce = response.headers.get("DPoP-Nonce")
if nonce:
self.update_dpop_nonce(nonce)
try:
return response.json()
except ValueError as exc:
LOGGER.warning("AT Protocol session response was not valid JSON", exc=exc)
return {}
def do_dpop_request(self, method: str, url: str, **kwargs):
access_token = kwargs.pop("access_token", None)
headers = dict(kwargs.pop("headers", {}))
headers["Accept"] = "application/json"
headers["DPoP"] = self.build_dpop_proof(method, url, access_token)
return self.session.request(method, url, headers=headers, **kwargs)
def build_dpop_proof(self, method: str, url: str, access_token: str | None = None) -> str:
session_data = self.request.session[self.session_key]
private_key = load_pem_private_key(session_data["private_key"].encode(), password=None)
if not isinstance(private_key, EllipticCurvePrivateKey):
raise TypeError("DPoP private key must be an EC key")
payload = {
"jti": generate_id(),
"htm": method.upper(),
"htu": url,
"iat": int(time()),
}
if session_data.get("dpop_nonce"):
payload["nonce"] = session_data["dpop_nonce"]
if access_token:
# Resource requests bind the proof to the access token with ath.
digest = Hash(SHA256())
digest.update(access_token.encode())
payload["ath"] = base64url_encode(digest.finalize()).decode()
public_jwk = ECAlgorithm.to_jwk(private_key.public_key(), as_dict=True)
public_jwk.pop("kid", None)
return encode(
payload,
private_key,
algorithm="ES256",
headers={
"typ": "dpop+jwt",
"jwk": public_jwk,
},
)
def update_dpop_nonce(self, nonce: str) -> None:
session_data = self.request.session[self.session_key]
session_data["dpop_nonce"] = nonce
self.request.session[self.session_key] = session_data
def get_request_arg(self, key: str, default: Any | None = None) -> Any:
if self.request.method == "POST":
return self.request.POST.get(key, default)
return self.request.GET.get(key, default)
def resolve_identifier(self, identifier: str | None) -> str | None:
"""Resolve a handle or DID to a DID."""
if not identifier:
return None
if identifier.startswith("did:"):
return identifier
response = self.session.get(
f"{self.get_issuer()}/xrpc/com.atproto.identity.resolveHandle",
params={"handle": identifier.removeprefix("@")},
)
try:
response.raise_for_status()
except RequestException as exc:
LOGGER.warning(
"Unable to resolve AT Protocol login hint",
identifier=identifier,
exc=exc,
)
return None
try:
return response.json().get("did")
except ValueError as exc:
LOGGER.warning("AT Protocol handle resolution response was not valid JSON", exc=exc)
return None
def get_pds_url_for_subject(self, did: str, issuer: str) -> str | None:
"""Verify that the DID's PDS resolves to the callback issuer."""
try:
did_document = self.get_did_document(did)
pds_url = self.get_pds_url(did_document)
if not pds_url:
LOGGER.warning("DID document does not include an atproto PDS", did=did)
return None
resource_metadata = self.session.get(
f"{pds_url.rstrip('/')}/.well-known/oauth-protected-resource"
)
resource_metadata.raise_for_status()
try:
authorization_servers = resource_metadata.json().get("authorization_servers", [])
except ValueError as exc:
raise RequestException(
"OAuth protected resource metadata was not valid JSON",
response=resource_metadata,
) from exc
except RequestException as exc:
LOGGER.warning("Unable to verify AT Protocol issuer", did=did, issuer=issuer, exc=exc)
return None
if issuer in authorization_servers:
return pds_url
return None
def get_did_document(self, did: str) -> dict[str, Any]:
if did.startswith("did:plc:"):
response = self.session.get(f"https://plc.directory/{did}")
elif did.startswith("did:web:"):
# did:web resolves by fetching a DID document from the hostname in the DID.
# The AT Protocol local simulator uses did:web:localhost, which cannot use
# HTTPS locally; real did:web identities should resolve over HTTPS.
did_parts = [unquote(part) for part in did.removeprefix("did:web:").split(":")]
host = did_parts[0]
path = "/".join(did_parts[1:])
scheme = "http" if host.startswith(("localhost", "127.0.0.1")) else "https"
did_path = f"{path}/did.json" if path else ".well-known/did.json"
response = self.session.get(f"{scheme}://{host}/{did_path}")
else:
raise RequestException(f"Unsupported DID method: {did}")
response.raise_for_status()
try:
return response.json()
except ValueError as exc:
raise RequestException("DID document was not valid JSON", response=response) from exc
def get_pds_url(self, did_document: dict[str, Any]) -> str | None:
for service in did_document.get("service", []):
if service.get("id") == "#atproto_pds":
return service.get("serviceEndpoint")
if service.get("type") == "AtprotoPersonalDataServer":
return service.get("serviceEndpoint")
return None
class AtProtoOAuthRedirect(OAuthRedirect):
"""AT Protocol OAuth redirect."""
client_class = AtProtoOAuthClient
def get_additional_parameters(self, source: OAuthSource): # pragma: no cover
return {
"scope": ["atproto"],
}
class AtProtoOAuthCallback(OAuthCallback):
"""AT Protocol OAuth callback."""
client_class = AtProtoOAuthClient
def get_callback_url(self, source: OAuthSource) -> str:
return reverse(
"authentik_sources_oauth:oauth-client-callback",
kwargs={"source_slug": source.slug},
)
def get_user_id(self, info: dict[str, Any]) -> str | None:
return info.get("did")
@registry.register()
class AtProtoType(SourceType):
"""AT Protocol Type definition"""
callback_view = AtProtoOAuthCallback
redirect_view = AtProtoOAuthRedirect
verbose_name = "AT Protocol"
name = "atproto"
# Defaults target Bluesky. They are editable because other AT Protocol
# authorization servers can expose the same endpoint roles on different URLs.
authorization_url = BSKY_AUTHORIZATION_URL_DEFAULT
request_token_url = BSKY_PAR_URL_DEFAULT
access_token_url = BSKY_TOKEN_URL_DEFAULT
profile_url = BSKY_PUBLIC_PROFILE_URL_DEFAULT
urls_customizable = True
pkce = PKCEMethod.S256
client_secret_required = False
def icon_url(self) -> str:
return static("authentik/sources/atproto.svg")
def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
return {
"username": info.get("handle") or info.get("did"),
"email": info.get("email"),
"name": info.get("displayName") or info.get("handle"),
}

View File

@@ -42,6 +42,8 @@ class SourceType:
oidc_jwks_url: str | None = None
pkce: PKCEMethod = PKCEMethod.NONE
client_secret_required = True
authorization_code_auth_method: AuthorizationCodeAuthMethod = (
AuthorizationCodeAuthMethod.BASIC_AUTH
)

View File

@@ -12967,6 +12967,7 @@
"type": "string",
"enum": [
"apple",
"atproto",
"openidconnect",
"entraid",
"azuread",
@@ -13038,7 +13039,6 @@
},
"consumer_secret": {
"type": "string",
"minLength": 1,
"title": "Consumer secret"
},
"additional_scopes": {

View File

@@ -162,7 +162,7 @@ export interface OAuthSourceRequest {
* @type {string}
* @memberof OAuthSourceRequest
*/
consumerSecret: string;
consumerSecret?: string;
/**
*
* @type {string}
@@ -203,7 +203,6 @@ export function instanceOfOAuthSourceRequest(value: object): value is OAuthSourc
if (!("slug" in value) || value["slug"] === undefined) return false;
if (!("providerType" in value) || value["providerType"] === undefined) return false;
if (!("consumerKey" in value) || value["consumerKey"] === undefined) return false;
if (!("consumerSecret" in value) || value["consumerSecret"] === undefined) return false;
return true;
}
@@ -252,7 +251,7 @@ export function OAuthSourceRequestFromJSONTyped(
profileUrl: json["profile_url"] == null ? undefined : json["profile_url"],
pkce: json["pkce"] == null ? undefined : PKCEMethodEnumFromJSON(json["pkce"]),
consumerKey: json["consumer_key"],
consumerSecret: json["consumer_secret"],
consumerSecret: json["consumer_secret"] == null ? undefined : json["consumer_secret"],
additionalScopes: json["additional_scopes"] == null ? undefined : json["additional_scopes"],
oidcWellKnownUrl:
json["oidc_well_known_url"] == null ? undefined : json["oidc_well_known_url"],

View File

@@ -18,6 +18,7 @@
*/
export const ProviderTypeEnum = {
Apple: "apple",
Atproto: "atproto",
Openidconnect: "openidconnect",
Entraid: "entraid",
Azuread: "azuread",

View File

@@ -72,6 +72,12 @@ export interface SourceType {
* @memberof SourceType
*/
readonly oidcJwksUrl: string | null;
/**
*
* @type {boolean}
* @memberof SourceType
*/
clientSecretRequired: boolean;
}
/**
@@ -87,6 +93,8 @@ export function instanceOfSourceType(value: object): value is SourceType {
if (!("profileUrl" in value) || value["profileUrl"] === undefined) return false;
if (!("oidcWellKnownUrl" in value) || value["oidcWellKnownUrl"] === undefined) return false;
if (!("oidcJwksUrl" in value) || value["oidcJwksUrl"] === undefined) return false;
if (!("clientSecretRequired" in value) || value["clientSecretRequired"] === undefined)
return false;
return true;
}
@@ -108,6 +116,7 @@ export function SourceTypeFromJSONTyped(json: any, ignoreDiscriminator: boolean)
profileUrl: json["profile_url"],
oidcWellKnownUrl: json["oidc_well_known_url"],
oidcJwksUrl: json["oidc_jwks_url"],
clientSecretRequired: json["client_secret_required"],
};
}
@@ -135,5 +144,6 @@ export function SourceTypeToJSONTyped(
name: value["name"],
verbose_name: value["verboseName"],
urls_customizable: value["urlsCustomizable"],
client_secret_required: value["clientSecretRequired"],
};
}

View File

@@ -44643,7 +44643,6 @@ components:
consumer_secret:
type: string
writeOnly: true
minLength: 1
additional_scopes:
type: string
oidc_well_known_url:
@@ -44660,7 +44659,6 @@ components:
token request flow
required:
- consumer_key
- consumer_secret
- name
- provider_type
- slug
@@ -49969,7 +49967,6 @@ components:
consumer_secret:
type: string
writeOnly: true
minLength: 1
additional_scopes:
type: string
oidc_well_known_url:
@@ -52675,6 +52672,7 @@ components:
ProviderTypeEnum:
enum:
- apple
- atproto
- openidconnect
- entraid
- azuread
@@ -56337,9 +56335,12 @@ components:
type: string
readOnly: true
nullable: true
client_secret_required:
type: boolean
required:
- access_token_url
- authorization_url
- client_secret_required
- name
- oidc_jwks_url
- oidc_well_known_url

View File

@@ -0,0 +1,3 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 640 640">
<path fill="#1185fe" d="M320 291.1C293.9 240.4 222.9 145.9 156.9 99.3C93.6 54.6 69.5 62.3 53.6 69.5C35.3 77.8 32 105.9 32 122.4C32 138.9 41.1 258 47 277.9C66.5 343.6 136.1 365.8 200.2 358.6C106.3 372.6 22.9 406.8 132.3 528.5C252.6 653.1 297.1 501.8 320 425.1C342.9 501.8 369.2 647.6 505.6 528.5C608 425.1 533.7 372.5 439.8 358.6C503.9 365.7 573.4 343.5 593 277.9C598.9 258 608 139 608 122.4C608 105.8 604.7 77.7 586.4 69.5C570.6 62.4 546.4 54.6 483.2 99.3C417.1 145.9 346.1 240.4 320 291.1Z"/>
</svg>

After

Width:  |  Height:  |  Size: 566 B

View File

@@ -30,6 +30,7 @@ import {
GroupMatchingModeEnum,
OAuthSource,
OAuthSourceRequest,
PatchedOAuthSourceRequest,
PKCEMethodEnum,
ProviderTypeEnum,
SourcesApi,
@@ -81,6 +82,20 @@ export class OAuthSourceForm extends BaseSourceForm<OAuthSource> {
//#region Lifecycle
private get isAtProtocolSource(): boolean {
return (
this.providerType?.name === ProviderTypeEnum.Atproto ||
this.modelName?.includes("atproto") === true
);
}
private get isClientSecretRequired(): boolean {
if (this.isAtProtocolSource) {
return false;
}
return this.providerType?.clientSecretRequired !== false;
}
protected async loadInstance(pk: string): Promise<OAuthSource> {
const source = await new SourcesApi(DEFAULT_CONFIG).sourcesOauthRetrieve({
slug: pk,
@@ -97,16 +112,20 @@ export class OAuthSourceForm extends BaseSourceForm<OAuthSource> {
protected async send(data: OAuthSource): Promise<OAuthSource> {
data.providerType = (this.providerType?.name || "") as ProviderTypeEnum;
const requestData = data as unknown as OAuthSourceRequest & PatchedOAuthSourceRequest;
if (!this.isClientSecretRequired) {
requestData.consumerSecret = "";
}
if (this.instance) {
return new SourcesApi(DEFAULT_CONFIG).sourcesOauthPartialUpdate({
slug: this.instance.slug,
patchedOAuthSourceRequest: data,
patchedOAuthSourceRequest: requestData,
});
}
return new SourcesApi(DEFAULT_CONFIG).sourcesOauthCreate({
oAuthSourceRequest: data as unknown as OAuthSourceRequest,
oAuthSourceRequest: requestData,
});
}
@@ -186,9 +205,11 @@ export class OAuthSourceForm extends BaseSourceForm<OAuthSource> {
autocomplete="off"
/>
<p class="pf-c-form__helper-text">
${msg(
"URL used to request the initial token. This URL is only required for OAuth 1.",
)}
${this.isAtProtocolSource
? msg("URL used to create pushed authorization requests.")
: msg(
"URL used to request the initial token. This URL is only required for OAuth 1.",
)}
</p>
</ak-form-element-horizontal> `
: nothing}
@@ -405,16 +426,22 @@ export class OAuthSourceForm extends BaseSourceForm<OAuthSource> {
spellcheck="false"
required
/>
<p class="pf-c-form__helper-text">${msg("Also known as Client ID.")}</p>
<p class="pf-c-form__helper-text">
${this.isAtProtocolSource
? msg("Client metadata URL.")
: msg("Also known as Client ID.")}
</p>
</ak-form-element-horizontal>
<ak-secret-textarea-input
label=${msg("Consumer secret")}
name="consumerSecret"
input-hint="code"
help=${msg("Also known as Client Secret.")}
?required=${!this.instance}
?revealed=${!this.instance}
></ak-secret-textarea-input>
${this.isClientSecretRequired
? html`<ak-secret-textarea-input
label=${msg("Consumer secret")}
name="consumerSecret"
input-hint="code"
help=${msg("Also known as Client Secret.")}
?required=${!this.instance}
?revealed=${!this.instance}
></ak-secret-textarea-input>`
: nothing}
<ak-form-element-horizontal label=${msg("Scopes")} name="additionalScopes">
<input
type="text"

View File

@@ -37,6 +37,8 @@ export function ProviderToLabel(provider?: ProviderTypeEnum): string {
return "";
case ProviderTypeEnum.Apple:
return "Apple";
case ProviderTypeEnum.Atproto:
return "AT Protocol";
case ProviderTypeEnum.Azuread:
return "Azure Active Directory (Deprecated)";
case ProviderTypeEnum.Discord:

View File

@@ -622,6 +622,7 @@ const items = [
},
items: [
"users-sources/sources/social-logins/apple/index",
"users-sources/sources/social-logins/atproto/index",
"users-sources/sources/social-logins/discord/index",
{
type: "category",

View File

@@ -0,0 +1,107 @@
---
title: Log in with AT Protocol
sidebar_label: AT Protocol
tags:
- source
- atproto
- bluesky
- oauth
---
AT Protocol is the account and data protocol used by Bluesky. A user has a stable decentralized identifier (DID), usually signs in with a handle such as `alice.bsky.social`, and stores account data on a Personal Data Server (PDS).
Use an AT Protocol source when you want users to log in to authentik with a Bluesky or other AT Protocol account. authentik redirects the user to the AT Protocol authorization server, verifies the returned DID, fetches the user's profile, and links the DID to an authentik user.
## What you need
For Bluesky, you do not create an OAuth app in a developer console and you do not get a client secret. You publish a small client metadata JSON document on a public HTTPS URL, and that URL becomes the OAuth `client_id`.
You need:
- A public authentik URL, such as `https://authentik.company`.
- An AT Protocol source slug, such as `atproto`.
- A public HTTPS URL for client metadata, such as `https://metadata.company/authentik-atproto.json`.
## How login works
1. The user clicks the AT Protocol source on the authentik login page.
2. authentik creates a pushed authorization request (PAR) with PKCE and DPoP.
3. The browser is redirected to the AT Protocol authorization server.
4. The user signs in and approves the request.
5. The authorization server redirects back to authentik with a code.
6. authentik exchanges the code using DPoP, receives the user's DID, and verifies that the DID's PDS trusts the issuer.
7. authentik fetches profile data and links the DID to an authentik user.
## Client metadata
Host a JSON document at the exact URL used as `client_id`:
```json
{
"client_id": "https://metadata.company/authentik-atproto.json",
"client_name": "authentik",
"redirect_uris": ["https://authentik.company/source/oauth/callback/atproto/"],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"scope": "atproto",
"token_endpoint_auth_method": "none",
"application_type": "web",
"dpop_bound_access_tokens": true
}
```
The callback URL must use the same slug as the authentik source.
To request email from Bluesky, set `scope` to `atproto transition:email` in the metadata and add `transition:email` to the authentik source scopes. Email is optional; authentik can still identify the account by DID and use the AT Protocol handle as the username.
## authentik configuration
1. Log in to authentik as an administrator and open the authentik Admin interface.
2. Navigate to **Directory** > **Federation and Social login**.
3. Click **New Source**.
4. Select **AT Protocol OAuth Source**.
5. Configure the source:
- **Name**: a display name, such as `Bluesky`.
- **Slug**: the slug used in the metadata redirect URI, such as `atproto`.
- **Consumer Key**: the client metadata URL, for example `https://metadata.company/authentik-atproto.json`.
- **Consumer Secret**: leave empty. AT Protocol does not use one.
- **Scopes**: optional extra scopes, such as `transition:email`.
6. Keep the default URL settings for Bluesky, or replace them for another AT Protocol server.
7. Click **Finish**.
:::info Display new source on login screen
For instructions on how to display the new source on the authentik login page, refer to the [Add sources to default login page documentation](../../index.md#add-sources-to-default-login-page).
:::
:::info Embed new source in flow :ak-enterprise
For instructions on embedding the new source within a flow, such as an authorization flow, refer to the [Source Stage documentation](../../../../../add-secure-apps/flows-stages/stages/source/).
:::
## User matching
By default, authentik maps:
- **Username**: the AT Protocol handle when available, otherwise the DID.
- **Name**: the profile display name when available, otherwise the handle.
- **Email**: the email returned when `transition:email` is granted and supported.
## Troubleshooting
### The source form asks for a consumer secret
AT Protocol sources should not require a consumer secret. Leave it empty.
### Bluesky rejects the client
Check that:
- The client metadata URL is public HTTPS.
- The metadata `client_id` exactly matches the metadata URL.
- The metadata `redirect_uris` value exactly matches `https://authentik.company/source/oauth/callback/atproto/`.
- The authentik source slug matches the callback URL path.
- `token_endpoint_auth_method` is `none`.
- `dpop_bound_access_tokens` is `true`.
## Resources
- [AT Protocol OAuth specification](https://atproto.com/specs/oauth)