Compare commits

...

1 Commits

Author SHA1 Message Date
Jens Langhammer
43628f308d initial steps for concurrent execution
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2025-10-03 19:51:57 +02:00
8 changed files with 217 additions and 33 deletions

View File

@@ -54,6 +54,7 @@ class Challenge(PassiveSerializer):
flow_info = ContextualFlowInfo(required=False)
component = CharField(default="")
xid = CharField(required=False)
response_errors = DictField(
child=ErrorDetailSerializer(many=True), allow_empty=True, required=False

View File

@@ -143,10 +143,12 @@ class FlowPlan:
request: HttpRequest,
flow: Flow,
allowed_silent_types: list["StageView"] | None = None,
**get_params,
) -> HttpResponse:
"""Redirect to the flow executor for this flow plan"""
from authentik.flows.views.executor import (
SESSION_KEY_PLAN,
FlowContainer,
FlowExecutorView,
)
@@ -157,6 +159,7 @@ class FlowPlan:
# No unskippable stages found, so we can directly return the response of the last stage
final_stage: type[StageView] = self.bindings[-1].stage.view
temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
temp_exec.container = FlowContainer(request)
temp_exec.current_stage = self.bindings[-1].stage
temp_exec.current_stage_view = final_stage
temp_exec.setup(request, flow.slug)
@@ -174,6 +177,9 @@ class FlowPlan:
):
get_qs["inspector"] = "available"
for key, value in get_params:
get_qs[key] = value
return redirect_with_qs(
"authentik_core:if-flow",
get_qs,

View File

@@ -192,6 +192,7 @@ class ChallengeStageView(StageView):
)
flow_info.is_valid()
challenge.initial_data["flow_info"] = flow_info.data
challenge.initial_data["xid"] = self.executor.container.exec_id
if isinstance(challenge, WithUserInfoChallenge):
# If there's a pending user, update the `username` field
# this field is only used by password managers.

View File

@@ -29,7 +29,7 @@ window.authentik.flow = {
{% block body %}
<ak-skip-to-content></ak-skip-to-content>
<ak-message-container></ak-message-container>
<ak-flow-executor flowSlug="{{ flow.slug }}">
<ak-flow-executor flowSlug="{{ flow.slug }}" xid="{{ xid }}">
<ak-loading></ak-loading>
</ak-flow-executor>
{% endblock %}

View File

@@ -1,6 +1,7 @@
"""authentik multi-stage authentication engine"""
from copy import deepcopy
from uuid import uuid4
from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
@@ -63,6 +64,7 @@ from authentik.policies.engine import PolicyEngine
LOGGER = get_logger()
# Argument used to redirect user after login
NEXT_ARG_NAME = "next"
SESSION_KEY_PLAN_CONTAINER = "authentik/flows/plan_container/%s"
SESSION_KEY_PLAN = "authentik/flows/plan"
SESSION_KEY_APPLICATION_PRE = "authentik/flows/application_pre"
SESSION_KEY_GET = "authentik/flows/get"
@@ -70,6 +72,7 @@ SESSION_KEY_POST = "authentik/flows/post"
SESSION_KEY_HISTORY = "authentik/flows/history"
QS_KEY_TOKEN = "flow_token" # nosec
QS_QUERY = "query"
QS_EXEC_ID = "xid"
def challenge_types():
@@ -96,6 +99,88 @@ class InvalidStageError(SentryIgnoredException):
"""Error raised when a challenge from a stage is not valid"""
class FlowContainer:
"""Allow for multiple concurrent flow executions in the same session"""
def __init__(self, request: HttpRequest, exec_id: str | None = None) -> None:
self.request = request
self.exec_id = exec_id
@staticmethod
def new(request: HttpRequest):
exec_id = str(uuid4())
request.session[SESSION_KEY_PLAN_CONTAINER % exec_id] = {}
return FlowContainer(request, exec_id)
def exists(self) -> bool:
"""Check if flow exists in container/session"""
return SESSION_KEY_PLAN in self.session
def save(self):
self.request.session.modified = True
@property
def session(self):
# Backwards compatibility: store session plan/etc directly in session
if not self.exec_id:
return self.request.session
self.request.session.setdefault(SESSION_KEY_PLAN_CONTAINER % self.exec_id, {})
return self.request.session.get(SESSION_KEY_PLAN_CONTAINER % self.exec_id, {})
@property
def plan(self) -> FlowPlan:
return self.session.get(SESSION_KEY_PLAN)
def to_redirect(
self,
request: HttpRequest,
flow: Flow,
allowed_silent_types: list[StageView] | None = None,
**get_params,
) -> HttpResponse:
get_params[QS_EXEC_ID] = self.exec_id
return self.plan.to_redirect(
request, flow, allowed_silent_types=allowed_silent_types, **get_params
)
@plan.setter
def plan(self, value: FlowPlan):
self.session[SESSION_KEY_PLAN] = value
self.request.session.modified = True
self.save()
@property
def application_pre(self):
return self.session.get(SESSION_KEY_APPLICATION_PRE)
@property
def get(self) -> QueryDict:
return self.session.get(SESSION_KEY_GET)
@get.setter
def get(self, value: QueryDict):
self.session[SESSION_KEY_GET] = value
self.save()
@property
def post(self) -> QueryDict:
return self.session.get(SESSION_KEY_POST)
@post.setter
def post(self, value: QueryDict):
self.session[SESSION_KEY_POST] = value
self.save()
@property
def history(self) -> list[FlowPlan]:
return self.session.get(SESSION_KEY_HISTORY)
@history.setter
def history(self, value: list[FlowPlan]):
self.session[SESSION_KEY_HISTORY] = value
self.save()
@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowExecutorView(APIView):
"""Flow executor, passing requests to Stage Views"""
@@ -103,8 +188,9 @@ class FlowExecutorView(APIView):
permission_classes = [AllowAny]
flow: Flow = None
plan: FlowPlan | None = None
container: FlowContainer
current_binding: FlowStageBinding | None = None
current_stage: Stage
current_stage_view: View
@@ -160,10 +246,12 @@ class FlowExecutorView(APIView):
if QS_KEY_TOKEN in get_params:
plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
if plan:
self.request.session[SESSION_KEY_PLAN] = plan
container = FlowContainer.new(request)
container.plan = plan
# Early check if there's an active Plan for the current session
if SESSION_KEY_PLAN in self.request.session:
self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
self.container = FlowContainer(request, request.GET.get(QS_EXEC_ID))
if self.container.exists():
self.plan: FlowPlan = self.container.plan
if self.plan.flow_pk != self.flow.pk.hex:
self._logger.warning(
"f(exec): Found existing plan for other flow, deleting plan",
@@ -176,13 +264,14 @@ class FlowExecutorView(APIView):
self._logger.debug("f(exec): Continuing existing plan")
# Initial flow request, check if we have an upstream query string passed in
request.session[SESSION_KEY_GET] = get_params
self.container.get = get_params
# Don't check session again as we've either already loaded the plan or we need to plan
if not self.plan:
request.session[SESSION_KEY_HISTORY] = []
self.container.history = []
self._logger.debug("f(exec): No active Plan found, initiating planner")
try:
self.plan = self._initiate_plan()
self.container.plan = self.plan
except FlowNonApplicableException as exc:
self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
return self.handle_invalid_flow(exc)
@@ -255,12 +344,19 @@ class FlowExecutorView(APIView):
request=OpenApiTypes.NONE,
parameters=[
OpenApiParameter(
name="query",
name=QS_QUERY,
location=OpenApiParameter.QUERY,
required=True,
description="Querystring as received",
type=OpenApiTypes.STR,
)
),
OpenApiParameter(
name=QS_EXEC_ID,
location=OpenApiParameter.QUERY,
required=False,
description="Flow execution ID",
type=OpenApiTypes.STR,
),
],
operation_id="flows_executor_get",
)
@@ -287,8 +383,8 @@ class FlowExecutorView(APIView):
span.set_data("authentik Stage", self.current_stage_view)
span.set_data("authentik Flow", self.flow.slug)
stage_response = self.current_stage_view.dispatch(request)
return to_stage_response(request, stage_response)
except Exception as exc: # noqa
return to_stage_response(request, stage_response, self.container.exec_id)
except Exception as exc:
return self.handle_exception(exc)
@extend_schema(
@@ -306,12 +402,19 @@ class FlowExecutorView(APIView):
),
parameters=[
OpenApiParameter(
name="query",
name=QS_QUERY,
location=OpenApiParameter.QUERY,
required=True,
description="Querystring as received",
type=OpenApiTypes.STR,
)
),
OpenApiParameter(
name=QS_EXEC_ID,
location=OpenApiParameter.QUERY,
required=True,
description="Flow execution ID",
type=OpenApiTypes.STR,
),
],
operation_id="flows_executor_solve",
)
@@ -338,14 +441,15 @@ class FlowExecutorView(APIView):
span.set_data("authentik Stage", self.current_stage_view)
span.set_data("authentik Flow", self.flow.slug)
stage_response = self.current_stage_view.dispatch(request)
return to_stage_response(request, stage_response)
return to_stage_response(request, stage_response, self.container.exec_id)
except Exception as exc: # noqa
return self.handle_exception(exc)
def _initiate_plan(self) -> FlowPlan:
planner = FlowPlanner(self.flow)
plan = planner.plan(self.request)
self.request.session[SESSION_KEY_PLAN] = plan
container = FlowContainer.new(self.request)
container.plan = plan
try:
# Call the has_stages getter to check that
# there are no issues with the class we might've gotten
@@ -369,7 +473,7 @@ class FlowExecutorView(APIView):
except FlowNonApplicableException as exc:
self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
return self.handle_invalid_flow(exc)
self.request.session[SESSION_KEY_PLAN] = plan
self.container.plan = plan
kwargs = self.kwargs
kwargs.update({"flow_slug": self.flow.slug})
return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
@@ -391,9 +495,13 @@ class FlowExecutorView(APIView):
)
self.cancel()
if next_param and not is_url_absolute(next_param):
return to_stage_response(self.request, redirect_with_qs(next_param))
return to_stage_response(
self.request, redirect_with_qs(next_param), self.container.exec_id
)
return to_stage_response(
self.request, self.stage_invalid(error_message=_("Invalid next URL"))
self.request,
self.stage_invalid(error_message=_("Invalid next URL")),
self.container.exec_id,
)
def stage_ok(self) -> HttpResponse:
@@ -407,7 +515,7 @@ class FlowExecutorView(APIView):
self.current_stage_view.cleanup()
self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
self.plan.pop()
self.request.session[SESSION_KEY_PLAN] = self.plan
self.container.plan = self.plan
if self.plan.bindings:
self._logger.debug(
"f(exec): Continuing with next stage",
@@ -450,6 +558,7 @@ class FlowExecutorView(APIView):
def cancel(self):
"""Cancel current flow execution"""
# TODO: Clean up container
keys_to_delete = [
SESSION_KEY_APPLICATION_PRE,
SESSION_KEY_PLAN,
@@ -472,8 +581,8 @@ class CancelView(View):
def get(self, request: HttpRequest) -> HttpResponse:
"""View which canels the currently active plan"""
if SESSION_KEY_PLAN in request.session:
del request.session[SESSION_KEY_PLAN]
if FlowContainer(request, request.GET.get(QS_EXEC_ID)).exists():
del request.session[SESSION_KEY_PLAN_CONTAINER % request.GET.get(QS_EXEC_ID)]
LOGGER.debug("Canceled current plan")
return redirect("authentik_flows:default-invalidation")
@@ -521,19 +630,12 @@ class ToDefaultFlow(View):
def dispatch(self, request: HttpRequest) -> HttpResponse:
flow = self.get_flow()
# If user already has a pending plan, clear it so we don't have to later.
if SESSION_KEY_PLAN in self.request.session:
plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
if plan.flow_pk != flow.pk.hex:
LOGGER.warning(
"f(def): Found existing plan for other flow, deleting plan",
flow_slug=flow.slug,
)
del self.request.session[SESSION_KEY_PLAN]
return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
get_qs = request.GET.copy()
get_qs[QS_EXEC_ID] = str(uuid4())
return redirect_with_qs("authentik_core:if-flow", get_qs, flow_slug=flow.slug)
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
def to_stage_response(request: HttpRequest, source: HttpResponse, xid: str) -> HttpResponse:
"""Convert normal HttpResponse into JSON Response"""
if (
isinstance(source, HttpResponseRedirect)
@@ -552,6 +654,7 @@ def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpRespons
RedirectChallenge(
{
"to": str(redirect_url),
"xid": xid,
}
)
)
@@ -560,6 +663,7 @@ def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpRespons
ShellChallenge(
{
"body": source.render().content.decode("utf-8"),
"xid": xid,
}
)
)
@@ -569,6 +673,7 @@ def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpRespons
ShellChallenge(
{
"body": source.content.decode("utf-8"),
"xid": xid,
}
)
)
@@ -600,4 +705,6 @@ class ConfigureFlowInitView(LoginRequiredMixin, View):
except FlowNonApplicableException:
LOGGER.warning("Flow not applicable to user")
raise Http404 from None
return plan.to_redirect(request, stage.configure_flow)
container = FlowContainer.new(request)
container.plan = plan
return container.to_redirect(request, stage.configure_flow)

View File

@@ -7,6 +7,7 @@ from ua_parser.user_agent_parser import Parse
from authentik.core.views.interface import InterfaceView
from authentik.flows.models import Flow
from authentik.flows.views.executor import QS_EXEC_ID
class FlowInterfaceView(InterfaceView):
@@ -17,6 +18,7 @@ class FlowInterfaceView(InterfaceView):
kwargs["flow"] = flow
kwargs["flow_background_url"] = flow.background_url(self.request)
kwargs["inspector"] = "inspector" in self.request.GET
kwargs["xid"] = self.request.GET.get(QS_EXEC_ID)
return super().get_context_data(**kwargs)
def compat_needs_sfe(self) -> bool:

2
go.mod
View File

@@ -96,3 +96,5 @@ require (
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace goauthentik.io/api/v3 => ./gen-go-api

View File

@@ -8138,6 +8138,11 @@ paths:
type: string
description: Querystring as received
required: true
- in: query
name: xid
schema:
type: string
description: Flow execution ID
tags:
- flows
security:
@@ -8178,6 +8183,12 @@ paths:
type: string
description: Querystring as received
required: true
- in: query
name: xid
schema:
type: string
description: Flow execution ID
required: true
tags:
- flows
requestBody:
@@ -39574,6 +39585,8 @@ components:
component:
type: string
default: ak-stage-access-denied
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -39696,6 +39709,8 @@ components:
component:
type: string
default: ak-source-oauth-apple
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40031,6 +40046,8 @@ components:
component:
type: string
default: ak-stage-authenticator-duo
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40187,6 +40204,8 @@ components:
component:
type: string
default: ak-stage-authenticator-email
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40441,6 +40460,8 @@ components:
component:
type: string
default: ak-stage-authenticator-sms
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40601,6 +40622,8 @@ components:
component:
type: string
default: ak-stage-authenticator-static
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40719,6 +40742,8 @@ components:
component:
type: string
default: ak-stage-authenticator-totp
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40943,6 +40968,8 @@ components:
component:
type: string
default: ak-stage-authenticator-validate
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -40996,6 +41023,8 @@ components:
component:
type: string
default: ak-stage-authenticator-webauthn
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -41158,6 +41187,8 @@ components:
component:
type: string
default: ak-stage-autosubmit
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -41452,6 +41483,8 @@ components:
component:
type: string
default: ak-stage-captcha
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -41866,6 +41899,8 @@ components:
component:
type: string
default: ak-stage-consent
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -42661,6 +42696,8 @@ components:
component:
type: string
default: ak-stage-dummy
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -42863,6 +42900,8 @@ components:
component:
type: string
default: ak-stage-email
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -43844,6 +43883,8 @@ components:
component:
type: string
default: ak-stage-flow-error
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -44172,6 +44213,8 @@ components:
component:
type: string
default: xak-flow-frame
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -45142,6 +45185,8 @@ components:
component:
type: string
default: ak-stage-identification
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -47848,6 +47893,8 @@ components:
component:
type: string
default: ak-provider-oauth2-device-code
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -47876,6 +47923,8 @@ components:
component:
type: string
default: ak-provider-oauth2-device-code-finish
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -50671,6 +50720,8 @@ components:
component:
type: string
default: ak-stage-password
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -54630,6 +54681,8 @@ components:
component:
type: string
default: ak-source-plex
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -55155,6 +55208,8 @@ components:
component:
type: string
default: ak-stage-prompt
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -56368,6 +56423,8 @@ components:
component:
type: string
default: xak-flow-redirect
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -58413,6 +58470,8 @@ components:
component:
type: string
default: ak-stage-session-end
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -58583,6 +58642,8 @@ components:
component:
type: string
default: xak-flow-shell
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -59270,6 +59331,8 @@ components:
component:
type: string
default: ak-source-telegram
xid:
type: string
response_errors:
type: object
additionalProperties:
@@ -60255,6 +60318,8 @@ components:
component:
type: string
default: ak-stage-user-login
xid:
type: string
response_errors:
type: object
additionalProperties: