Compare commits

...

2 Commits

Author SHA1 Message Date
Jens Langhammer
ab0c066506 refactor
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2026-03-20 18:01:39 +01:00
Jens Langhammer
adb48f3a26 stages/user_login: init DBSC
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2026-03-20 16:00:16 +01:00
3 changed files with 153 additions and 5 deletions

View File

@@ -28,6 +28,9 @@ from authentik.stages.user_login.middleware import (
SESSION_KEY_BINDING_NET,
)
from authentik.stages.user_login.models import UserLoginStage
from authentik.stages.user_login.views import (
set_dbsc_reg_header,
)
from authentik.tenants.utils import get_unique_identifier
COOKIE_NAME_KNOWN_DEVICE = "authentik_device"
@@ -97,11 +100,10 @@ class UserLoginStageView(ChallengeStageView):
f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
).hexdigest()
def set_known_device_cookie(self, user: User):
def set_known_device_cookie(self, response: HttpResponse, user: User):
"""Set a cookie, valid longer than the session, which denotes that this user
has logged in on this device before."""
delta = timedelta_from_string(self.executor.current_stage.remember_device)
response = self.executor.stage_ok()
if delta.total_seconds() < 1:
return response
expiry = datetime.now() + delta
@@ -118,7 +120,6 @@ class UserLoginStageView(ChallengeStageView):
domain=settings.SESSION_COOKIE_DOMAIN,
samesite=settings.SESSION_COOKIE_SAMESITE,
)
return response
def is_known_device(self, user: User):
"""Returns `True` if the login happened on a "known" device, by the same user."""
@@ -180,6 +181,7 @@ class UserLoginStageView(ChallengeStageView):
Session.objects.filter(
authenticatedsession__user=user,
).exclude(session_key=self.request.session.session_key).delete()
response = self.executor.stage_ok()
if remember is None:
return self.set_known_device_cookie(user)
return self.executor.stage_ok()
self.set_known_device_cookie(response, user)
return set_dbsc_reg_header(self.request, response)

View File

@@ -1,7 +1,23 @@
"""API URLs"""
from django.urls import path
from authentik.stages.user_login.api import UserLoginStageViewSet
from authentik.stages.user_login.views import (
DeviceBoundSessionCredentailsStart,
DeviceBoundSessionCredentialRefresh,
)
api_urlpatterns = [
("stages/user_login", UserLoginStageViewSet),
path(
"dbsc/start/",
DeviceBoundSessionCredentailsStart.as_view(),
name="dbsc-start",
),
path(
"dbsc/refresh/",
DeviceBoundSessionCredentialRefresh.as_view(),
name="dbsc-refresh",
),
]

View File

@@ -0,0 +1,130 @@
from hashlib import sha256
from hmac import compare_digest
from http.cookies import Morsel
from json import dumps
from typing import Any
from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest, JsonResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from jwt import PyJWTError, decode_complete
from structlog.stdlib import get_logger
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.models import JWTAlgorithms
SESSION_KEY_DBSC_CHALLENGE = "goauthentik.io/stages/user_login/dbsc/challenge"
HEADER_DSBC_REGISTRATION = "Secure-Session-Registration"
HEADER_DBSC_RESPONSE = "Secure-Session-Response"
DBSC_ALGS = [JWTAlgorithms.ES256, JWTAlgorithms.RS256]
DBSC_COOKIE_NAME = "ak_tk"
LOGGER = get_logger()
def cookie_to_attrs(cookie: Morsel) -> str:
"""Convert a cookie to its attributes in a string form (without expiry attributes and
without name and value)"""
cookie_clone: Morsel = cookie.copy()
cookie_clone.pop("expires", None)
cookie_clone.pop("max-age", None)
return (
cookie_clone.OutputString()
# Remove cookie name and value
.replace(f"{cookie_clone.key}={cookie_clone.value}", "")
# Remove leading semicolon
.lstrip(";")
# Remove surrounding spaces
.strip()
)
def set_dbsc_reg_header(request: HttpRequest, response: HttpResponse):
"""Generate Device-bound session credentials challenge and set header"""
challenge = generate_id()
request.session[SESSION_KEY_DBSC_CHALLENGE] = challenge
dbsc_start_url = reverse("authentik_api:dbsc-start")
response[HEADER_DSBC_REGISTRATION] = (
f'({" ".join(DBSC_ALGS)}); path="{dbsc_start_url}"; challenge="{challenge}"'
)
return response
def validate_dbsc_challenge(request: HttpRequest) -> dict[str, Any]:
"""Validate Device-bound session credentials"""
response = request.headers.get(HEADER_DBSC_RESPONSE)
if not response:
raise ValueError()
try:
decoded = decode_complete(
response,
algorithms=DBSC_ALGS,
options={"verify_signature": False},
)
except PyJWTError as exc:
LOGGER.warning("Invalid DBSC jwt", exc=exc)
raise ValueError() from None
if decoded["header"]["typ"] != "dbsc+jwt":
LOGGER.warning("DBSC JWT with incorrect typ")
raise ValueError()
if not compare_digest(
request.session[SESSION_KEY_DBSC_CHALLENGE], decoded["payload"]["jti"]
):
LOGGER.warning("DBSC challenge mismatch")
raise ValueError()
return decoded
@method_decorator(csrf_exempt, name="dispatch")
class DeviceBoundSessionCredentailsStart(LoginRequiredMixin, View):
def post(self, request: HttpRequest):
try:
validate_dbsc_challenge(request)
except ValueError:
return HttpResponseBadRequest()
LOGGER.info("Registered for device-bound session credentials")
response = HttpResponse(content_type="application/json")
response["Cache-Control"] = "no-store"
response.set_cookie(
DBSC_COOKIE_NAME,
"foo",
10,
path=settings.SESSION_COOKIE_PATH,
domain=settings.SESSION_COOKIE_DOMAIN,
httponly=True,
samesite=settings.SESSION_COOKIE_SAMESITE,
)
response.content = dumps(
{
"session_identifier": sha256(
self.request.session.session_key.encode("ascii")
).hexdigest(),
"refresh_url": reverse("authentik_api:dbsc-refresh"),
"scope": {
"origin": request._current_scheme_host,
"include_site": False,
},
"credentials": [
{
"type": "cookie",
"name": DBSC_COOKIE_NAME,
"attributes": cookie_to_attrs(response.cookies[DBSC_COOKIE_NAME]),
}
],
}
)
return response
@method_decorator(csrf_exempt, name="dispatch")
class DeviceBoundSessionCredentialRefresh(View):
def post(self, request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated:
return JsonResponse({"continue": False})
print(request.GET)
print(request.POST)
print(request.headers)
return HttpResponse()