Compare commits

..

1 Commits

Author SHA1 Message Date
Teffen Ellis
20d5228de5 web: Remember me controller clean up. 2026-01-26 19:17:27 +01:00
905 changed files with 9402 additions and 159656 deletions

View File

@@ -215,9 +215,6 @@ runs:
--head "$CHERRY_PICK_BRANCH" \
--label "cherry-pick")
# Assign the PR to the original author
gh pr edit "$NEW_PR" --add-assignee "$PR_AUTHOR" || true
echo "✅ Created cherry-pick PR $NEW_PR for $TARGET_BRANCH"
# Comment on original PR
@@ -257,9 +254,6 @@ runs:
--head "$CHERRY_PICK_BRANCH" \
--label "cherry-pick")
# Assign the PR to the original author
gh pr edit "$NEW_PR" --add-assignee "$PR_AUTHOR" || true
echo "⚠️ Created conflict resolution PR $NEW_PR for $TARGET_BRANCH"
# Comment on original PR

View File

@@ -22,7 +22,7 @@ runs:
sudo rm -rf /usr/local/lib/android
- name: Install uv
if: ${{ contains(inputs.dependencies, 'python') }}
uses: astral-sh/setup-uv@eac588ad8def6316056a12d4907a9d4d84ff7a3b # v5
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v5
with:
enable-cache: true
- name: Setup python
@@ -58,7 +58,7 @@ runs:
run: |
export PSQL_TAG=${{ inputs.postgresql_version }}
docker compose -f .github/actions/setup/compose.yml up -d
cd web && npm ci
cd web && npm i
- name: Generate config
if: ${{ contains(inputs.dependencies, 'python') }}
shell: uv run python {0}

View File

@@ -56,29 +56,30 @@ jobs:
release: ${{ inputs.release }}
- name: Login to Docker Hub
if: ${{ inputs.registry_dockerhub }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
username: ${{ secrets.DOCKER_CORP_USERNAME }}
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
- name: Login to GitHub Container Registry
if: ${{ inputs.registry_ghcr }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
- name: make empty clients
if: ${{ inputs.release }}
run: |
mkdir -p ./gen-ts-api
mkdir -p ./gen-go-api
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
with:
node-version-file: web/package.json
cache: "npm"
cache-dependency-path: web/package-lock.json
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
with:
go-version-file: "go.mod"
- name: Generate API Clients
run: |
make gen-client-ts
make gen-client-go
- name: generate ts client
run: make gen-client-ts
- name: Build Docker Image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # v6
id: push
@@ -95,7 +96,7 @@ jobs:
platforms: linux/${{ inputs.image_arch }}
cache-from: type=registry,ref=${{ steps.ev.outputs.attestImageNames }}:buildcache-${{ inputs.image_arch }}
cache-to: ${{ steps.ev.outputs.cacheTo }}
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
with:

View File

@@ -79,25 +79,25 @@ jobs:
image-name: ${{ inputs.image_name }}
- name: Login to Docker Hub
if: ${{ inputs.registry_dockerhub }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
username: ${{ secrets.DOCKER_CORP_USERNAME }}
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
- name: Login to GitHub Container Registry
if: ${{ inputs.registry_ghcr }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- uses: int128/docker-manifest-create-action@1a059c021f1d5e9f2bd39de745d5dd3a0ef6df90 # v2
- uses: int128/docker-manifest-create-action@a39573caa37b6a8a03302d43b57c3f48635096e2 # v2
id: build
with:
tags: ${{ matrix.tag }}
sources: |
${{ steps.ev.outputs.attestImageNames }}@${{ needs.build-server-amd64.outputs.image-digest }}
${{ steps.ev.outputs.attestImageNames }}@${{ needs.build-server-arm64.outputs.image-digest }}
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
with:
subject-name: ${{ steps.ev.outputs.attestImageNames }}

View File

@@ -41,7 +41,7 @@ jobs:
- working-directory: website/
name: Install Dependencies
run: npm ci
- uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
- uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
with:
path: |
${{ github.workspace }}/website/api/.docusaurus

View File

@@ -15,8 +15,6 @@ on:
jobs:
lint:
runs-on: ubuntu-latest
env:
NODE_ENV: production
strategy:
fail-fast: false
matrix:
@@ -32,8 +30,7 @@ jobs:
run: npm run ${{ matrix.command }}
build-docs:
runs-on: ubuntu-latest
env:
NODE_ENV: production
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
@@ -49,8 +46,7 @@ jobs:
run: npm run build
build-integrations:
runs-on: ubuntu-latest
env:
NODE_ENV: production
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
@@ -89,7 +85,7 @@ jobs:
image-name: ghcr.io/goauthentik/dev-docs
- name: Login to Container Registry
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
@@ -105,7 +101,7 @@ jobs:
context: .
cache-from: type=registry,ref=ghcr.io/goauthentik/dev-docs:buildcache
cache-to: ${{ steps.ev.outputs.shouldPush == 'true' && 'type=registry,ref=ghcr.io/goauthentik/dev-docs:buildcache,mode=max' || '' }}
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
with:

View File

@@ -187,8 +187,6 @@ jobs:
glob: tests/e2e/test_provider_saml* tests/e2e/test_source_saml*
- name: ldap
glob: tests/e2e/test_provider_ldap* tests/e2e/test_source_ldap*
- name: ws-fed
glob: tests/e2e/test_provider_ws_fed*
- name: radius
glob: tests/e2e/test_provider_radius*
- name: scim
@@ -205,7 +203,7 @@ jobs:
run: |
docker compose -f tests/e2e/compose.yml up -d --quiet-pull
- id: cache-web
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
with:
path: web/dist
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b
@@ -248,7 +246,7 @@ jobs:
run: |
docker compose -f tests/openid_conformance/compose.yml up -d --quiet-pull
- id: cache-web
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
with:
path: web/dist
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b

View File

@@ -102,7 +102,7 @@ jobs:
image-name: ghcr.io/goauthentik/dev-${{ matrix.type }}
- name: Login to Container Registry
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
@@ -122,7 +122,7 @@ jobs:
context: .
cache-from: type=registry,ref=ghcr.io/goauthentik/dev-${{ matrix.type }}:buildcache
cache-to: ${{ steps.ev.outputs.shouldPush == 'true' && format('type=registry,ref=ghcr.io/goauthentik/dev-{0}:buildcache,mode=max', matrix.type) || '' }}
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
with:

View File

@@ -38,7 +38,7 @@ jobs:
token: ${{ steps.generate_token.outputs.token }}
- name: Compress images
id: compress
uses: calibreapp/image-actions@d9c8ee5c3dc52ae4622c82ead88d658f4b16b65f # main
uses: calibreapp/image-actions@420075c115b26f8785e293c5bd5bef0911c506e5 # main
with:
GITHUB_TOKEN: ${{ steps.generate_token.outputs.token }}
compressOnly: ${{ github.event_name != 'pull_request' }}

View File

@@ -40,7 +40,7 @@ jobs:
registry-url: "https://registry.npmjs.org"
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@8cba46e29c11878d930bca7870bb54394d3e8b21 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
with:
files: |
${{ matrix.package }}/package.json

View File

@@ -44,7 +44,7 @@ jobs:
with:
image-name: ghcr.io/goauthentik/docs
- name: Login to GitHub Container Registry
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
@@ -58,7 +58,7 @@ jobs:
push: true
platforms: linux/amd64,linux/arm64
context: .
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
if: true
with:
@@ -87,11 +87,6 @@ jobs:
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
with:
go-version-file: "go.mod"
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
with:
node-version-file: web/package.json
cache: "npm"
cache-dependency-path: web/package-lock.json
- name: Set up QEMU
uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
- name: Set up Docker Buildx
@@ -103,17 +98,17 @@ jobs:
DOCKER_USERNAME: ${{ secrets.DOCKER_CORP_USERNAME }}
with:
image-name: ghcr.io/goauthentik/${{ matrix.type }},authentik/${{ matrix.type }}
- name: Generate API Clients
- name: make empty clients
run: |
make gen-client-ts
make gen-client-go
mkdir -p ./gen-ts-api
mkdir -p ./gen-go-api
- name: Docker Login Registry
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
username: ${{ secrets.DOCKER_CORP_USERNAME }}
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
- name: Login to GitHub Container Registry
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
@@ -129,7 +124,7 @@ jobs:
file: lifecycle/container/${{ matrix.type }}.Dockerfile
platforms: linux/amd64,linux/arm64
context: .
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
id: attest
with:
subject-name: ${{ steps.ev.outputs.attestImageNames }}
@@ -160,17 +155,10 @@ jobs:
node-version-file: web/package.json
cache: "npm"
cache-dependency-path: web/package-lock.json
- name: Install web dependencies
working-directory: web/
run: |
npm ci
- name: Generate API Clients
run: |
make gen-client-ts
make gen-client-go
- name: Build web
working-directory: web/
run: |
npm ci
npm run build-proxy
- name: Build outpost
run: |
@@ -199,7 +187,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
- uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # v6.0.0
- uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708 # v5
with:
role-to-assume: "arn:aws:iam::016170277896:role/github_goauthentik_authentik"
aws-region: ${{ env.AWS_REGION }}
@@ -217,12 +205,12 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
- name: Run test suite in final docker images
run: |
echo "PG_PASS=$(openssl rand 32 | base64 -w 0)" >> lifecycle/container/.env
echo "AUTHENTIK_SECRET_KEY=$(openssl rand 32 | base64 -w 0)" >> lifecycle/container/.env
docker compose -f lifecycle/container/compose.yml pull -q
docker compose -f lifecycle/container/compose.yml up --no-start
docker compose -f lifecycle/container/compose.yml start postgresql
docker compose -f lifecycle/container/compose.yml run -u root server test-all
echo "PG_PASS=$(openssl rand 32 | base64 -w 0)" >> .env
echo "AUTHENTIK_SECRET_KEY=$(openssl rand 32 | base64 -w 0)" >> .env
docker compose pull -q
docker compose up --no-start
docker compose start postgresql
docker compose run -u root server test-all
sentry-release:
needs:
- build-server

View File

@@ -55,8 +55,6 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
with:
ref: "version-${{ needs.check-inputs.outputs.major_version }}"
- name: Setup authentik env
uses: ./.github/actions/setup
- run: make test-docker
bump-authentik:
name: Bump authentik version
@@ -91,7 +89,6 @@ jobs:
# ID from https://api.github.com/users/authentik-automation[bot]
git config --global user.name '${{ steps.app-token.outputs.app-slug }}[bot]'
git config --global user.email '${{ steps.get-user-id.outputs.user-id }}+${{ steps.app-token.outputs.app-slug }}[bot]@users.noreply.github.com'
git pull
git commit -a -m "release: ${{ inputs.version }}" --allow-empty
git tag "version/${{ inputs.version }}" HEAD -m "version/${{ inputs.version }}"
git push --follow-tags
@@ -175,25 +172,21 @@ jobs:
if: "${{ inputs.release_reason == 'feature' }}"
run: |
changelog_url="https://docs.goauthentik.io/docs/releases/${{ needs.check-inputs.outputs.major_version }}"
reason="{{ inputs.release_reason }}"
jq \
--arg version "${{ inputs.version }}" \
--arg changelog "See ${changelog_url}" \
--arg changelog_url "${changelog_url}" \
--arg reason "${reason}" \
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url | .stable.reason = $reason' version.json > version.new.json
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url' version.json > version.new.json
mv version.new.json version.json
- name: Bump version
if: "${{ inputs.release_reason != 'feature' }}"
run: |
changelog_url="https://docs.goauthentik.io/docs/releases/${{ needs.check-inputs.outputs.major_version }}#fixed-in-$(echo -n ${{ inputs.version}} | sed 's/\.//g')"
reason="{{ inputs.release_reason }}"
jq \
--arg version "${{ inputs.version }}" \
--arg changelog "See ${changelog_url}" \
--arg changelog_url "${changelog_url}" \
--arg reason "${reason}" \
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url | .stable.reason = $reason' version.json > version.new.json
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url' version.json > version.new.json
mv version.new.json version.json
- name: Create pull request
uses: peter-evans/create-pull-request@c0f553fe549906ede9cf27b5156039d195d2ece0 # v7

View File

@@ -5,6 +5,7 @@ SHELL := /usr/bin/env bash
PWD = $(shell pwd)
UID = $(shell id -u)
GID = $(shell id -g)
NPM_VERSION = $(shell python -m scripts.generate_semver)
PY_SOURCES = authentik packages tests scripts lifecycle .github
DOCKER_IMAGE ?= "authentik:test"
@@ -49,14 +50,6 @@ ifeq ($(UNAME_S),Darwin)
endif
endif
NPM_VERSION :=
UV_EXISTS := $(shell command -v uv 2> /dev/null)
ifdef UV_EXISTS
NPM_VERSION := $(shell $(UV) run python -m scripts.generate_semver)
else
NPM_VERSION = $(shell python -m scripts.generate_semver)
endif
all: lint-fix lint gen web test ## Lint, build, and test everything
HELP_WIDTH := $(shell grep -h '^[a-z][^ ]*:.*\#\#' $(MAKEFILE_LIST) 2>/dev/null | \
@@ -84,7 +77,7 @@ lint-fix: lint-codespell ## Lint and automatically fix errors in the python sou
lint-codespell: ## Reports spelling errors.
$(UV) run codespell -w
lint: ci-bandit ci-mypy ## Lint the python and golang sources
lint: ci-bandit ## Lint the python and golang sources
golangci-lint run -v
core-install:
@@ -148,11 +141,11 @@ bump: ## Bump authentik version. Usage: make bump version=20xx.xx.xx
ifndef version
$(error Usage: make bump version=20xx.xx.xx )
endif
$(eval current_version := $(shell cat ${PWD}/internal/constants/VERSION))
$(SED_INPLACE) 's/^version = ".*"/version = "$(version)"/' ${PWD}/pyproject.toml
$(SED_INPLACE) 's/^VERSION = ".*"/VERSION = "$(version)"/' ${PWD}/authentik/__init__.py
$(SED_INPLACE) 's/^version = ".*"/version = "$(version)"/' pyproject.toml
$(SED_INPLACE) 's/^VERSION = ".*"/VERSION = "$(version)"/' authentik/__init__.py
$(MAKE) gen-build gen-compose aws-cfn
$(SED_INPLACE) "s/\"${current_version}\"/\"$(version)\"/" ${PWD}/package.json ${PWD}/package-lock.json ${PWD}/web/package.json ${PWD}/web/package-lock.json
npm version --no-git-tag-version --allow-same-version $(version)
cd ${PWD}/web && npm version --no-git-tag-version --allow-same-version $(version)
echo -n $(version) > ${PWD}/internal/constants/VERSION
#########################
@@ -213,15 +206,24 @@ gen-client-ts: gen-clean-ts ## Build and install the authentik API for Typescri
gen-client-py: gen-clean-py ## Build and install the authentik API for Python
mkdir -p ${PWD}/${GEN_API_PY}
ifeq ($(wildcard ${PWD}/${GEN_API_PY}/.*),)
git clone --depth 1 https://github.com/goauthentik/client-python.git ${PWD}/${GEN_API_PY}
else
cd ${PWD}/${GEN_API_PY} && git pull
endif
cp ${PWD}/schema.yml ${PWD}/${GEN_API_PY}
make -C ${PWD}/${GEN_API_PY} build version=${NPM_VERSION}
gen-client-go: gen-clean-go ## Build and install the authentik API for Golang
gen-client-go: ## Build and install the authentik API for Golang
mkdir -p ${PWD}/${GEN_API_GO}
ifeq ($(wildcard ${PWD}/${GEN_API_GO}/.*),)
git clone --depth 1 https://github.com/goauthentik/client-go.git ${PWD}/${GEN_API_GO}
else
cd ${PWD}/${GEN_API_GO} && git reset --hard
cd ${PWD}/${GEN_API_GO} && git pull
endif
cp ${PWD}/schema.yml ${PWD}/${GEN_API_GO}
make -C ${PWD}/${GEN_API_GO} build version=${NPM_VERSION}
make -C ${PWD}/${GEN_API_GO} build
go mod edit -replace goauthentik.io/api/v3=./${GEN_API_GO}
gen-dev-config: ## Generate a local development config file

View File

@@ -20,8 +20,8 @@ Even if the issue is not a CVE, we still greatly appreciate your help in hardeni
| Version | Supported |
| ---------- | ---------- |
| 2025.10.x | ✅ |
| 2025.12.x | ✅ |
| 2026.2.x | ✅ |
## Reporting a Vulnerability

View File

@@ -3,7 +3,7 @@
from functools import lru_cache
from os import environ
VERSION = "2026.2.1"
VERSION = "2026.2.0-rc1"
ENV_GIT_HASH_KEY = "GIT_BUILD_HASH"

View File

@@ -18,6 +18,7 @@ from rest_framework.views import APIView
from authentik import authentik_full_version
from authentik.core.api.utils import PassiveSerializer
from authentik.enterprise.license import LicenseKey
from authentik.lib.config import CONFIG
from authentik.lib.utils.reflection import get_env
from authentik.outposts.apps import MANAGED_OUTPOST
@@ -25,15 +26,6 @@ from authentik.outposts.models import Outpost
from authentik.rbac.permissions import HasPermission
def fips_enabled():
try:
from authentik.enterprise.license import LicenseKey
return backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
except ModuleNotFoundError:
return None
class RuntimeDict(TypedDict):
"""Runtime information"""
@@ -88,7 +80,9 @@ class SystemInfoSerializer(PassiveSerializer):
"architecture": platform.machine(),
"authentik_version": authentik_full_version(),
"environment": get_env(),
"openssl_fips_enabled": fips_enabled(),
"openssl_fips_enabled": (
backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
),
"openssl_version": OPENSSL_VERSION,
"platform": platform.platform(),
"python_version": python_version,

View File

@@ -1,3 +1,5 @@
import mimetypes
from django.db.models import Q
from django.utils.translation import gettext as _
from drf_spectacular.utils import extend_schema
@@ -10,14 +12,13 @@ from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from authentik.admin.files.backends.base import get_content_type
from authentik.admin.files.fields import FileField as AkFileField
from authentik.admin.files.manager import get_file_manager
from authentik.admin.files.usage import FileApiUsage
from authentik.admin.files.validation import validate_upload_file_name
from authentik.api.validation import validate
from authentik.core.api.used_by import DeleteAction, UsedBySerializer
from authentik.core.api.utils import PassiveSerializer, ThemedUrlsSerializer
from authentik.core.api.utils import PassiveSerializer
from authentik.events.models import Event, EventAction
from authentik.lib.utils.reflection import get_apps
from authentik.rbac.permissions import HasPermission
@@ -25,6 +26,11 @@ from authentik.rbac.permissions import HasPermission
MAX_FILE_SIZE_BYTES = 25 * 1024 * 1024 # 25MB
def get_mime_from_filename(filename: str) -> str:
mime_type, _ = mimetypes.guess_type(filename)
return mime_type or "application/octet-stream"
class FileView(APIView):
pagination_class = None
parser_classes = [MultiPartParser]
@@ -47,7 +53,6 @@ class FileView(APIView):
name = CharField()
mime_type = CharField()
url = CharField()
themed_urls = ThemedUrlsSerializer(required=False, allow_null=True)
@extend_schema(
parameters=[FileListParameters],
@@ -75,9 +80,8 @@ class FileView(APIView):
FileView.FileListSerializer(
data={
"name": file,
"url": manager.file_url(file, request),
"mime_type": get_content_type(file),
"themed_urls": manager.themed_urls(file, request),
"url": manager.file_url(file),
"mime_type": get_mime_from_filename(file),
}
)
for file in files
@@ -146,7 +150,7 @@ class FileView(APIView):
"pk": name,
"name": name,
"usage": usage.value,
"mime_type": get_content_type(name),
"mime_type": get_mime_from_filename(name),
},
).from_http(request)

View File

@@ -1,4 +1,3 @@
import mimetypes
from collections.abc import Callable, Generator, Iterator
from typing import cast
@@ -11,32 +10,6 @@ from authentik.admin.files.usage import FileUsage
CACHE_PREFIX = "goauthentik.io/admin/files"
LOGGER = get_logger()
# Theme variable placeholder for theme-specific files like logo-%(theme)s.png
THEME_VARIABLE = "%(theme)s"
def get_content_type(name: str) -> str:
"""Get MIME type for a file based on its extension."""
content_type, _ = mimetypes.guess_type(name)
return content_type or "application/octet-stream"
def get_valid_themes() -> list[str]:
"""Get valid themes that can be substituted for %(theme)s."""
from authentik.brands.api import Themes
return [t.value for t in Themes if t != Themes.AUTOMATIC]
def has_theme_variable(name: str) -> bool:
"""Check if filename contains %(theme)s variable."""
return THEME_VARIABLE in name
def substitute_theme(name: str, theme: str) -> str:
"""Replace %(theme)s with the given theme."""
return name.replace(THEME_VARIABLE, theme)
class Backend:
"""
@@ -102,29 +75,6 @@ class Backend:
"""
raise NotImplementedError
def themed_urls(
self,
name: str,
request: HttpRequest | None = None,
) -> dict[str, str] | None:
"""
Get URLs for each theme variant when filename contains %(theme)s.
Args:
name: File path potentially containing %(theme)s
request: Optional Django HttpRequest for URL building
Returns:
Dict mapping theme to URL if %(theme)s present, None otherwise
"""
if not has_theme_variable(name):
return None
return {
theme: self.file_url(substitute_theme(name, theme), request, use_cache=True)
for theme in get_valid_themes()
}
class ManageableBackend(Backend):
"""

View File

@@ -46,25 +46,3 @@ class PassthroughBackend(Backend):
) -> str:
"""Return the URL as-is for passthrough files."""
return name
def themed_urls(
self,
name: str,
request: HttpRequest | None = None,
) -> dict[str, str] | None:
"""Support themed URLs for external URLs with %(theme)s placeholder.
If the external URL contains %(theme)s, substitute it for each theme.
We can't verify that themed variants exist at the external location,
but we trust the user to provide valid URLs.
"""
from authentik.admin.files.backends.base import (
get_valid_themes,
has_theme_variable,
substitute_theme,
)
if not has_theme_variable(name):
return None
return {theme: substitute_theme(name, theme) for theme in get_valid_themes()}

View File

@@ -9,7 +9,7 @@ from botocore.exceptions import ClientError
from django.db import connection
from django.http.request import HttpRequest
from authentik.admin.files.backends.base import ManageableBackend, get_content_type
from authentik.admin.files.backends.base import ManageableBackend
from authentik.admin.files.usage import FileUsage
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import timedelta_from_string
@@ -204,7 +204,6 @@ class S3Backend(ManageableBackend):
Key=f"{self.base_path}/{name}",
Body=content,
ACL="private",
ContentType=get_content_type(name),
)
@contextmanager
@@ -220,7 +219,6 @@ class S3Backend(ManageableBackend):
Key=f"{self.base_path}/{name}",
ExtraArgs={
"ACL": "private",
"ContentType": get_content_type(name),
},
)

View File

@@ -165,31 +165,3 @@ class TestFileBackend(FileTestFileBackendMixin, TestCase):
def test_file_exists_false(self):
"""Test file_exists returns False for nonexistent file"""
self.assertFalse(self.backend.file_exists("does_not_exist.txt"))
def test_themed_urls_without_theme_variable(self):
"""Test themed_urls returns None when filename has no %(theme)s"""
file_name = "logo.png"
result = self.backend.themed_urls(file_name)
self.assertIsNone(result)
def test_themed_urls_with_theme_variable(self):
"""Test themed_urls returns dict of URLs for each theme"""
file_name = "logo-%(theme)s.png"
result = self.backend.themed_urls(file_name)
self.assertIsInstance(result, dict)
self.assertIn("light", result)
self.assertIn("dark", result)
# Check URLs contain the substituted theme
self.assertIn("logo-light.png", result["light"])
self.assertIn("logo-dark.png", result["dark"])
def test_themed_urls_multiple_theme_variables(self):
"""Test themed_urls with multiple %(theme)s in path"""
file_name = "%(theme)s/logo-%(theme)s.svg"
result = self.backend.themed_urls(file_name)
self.assertIsInstance(result, dict)
self.assertIn("light/logo-light.svg", result["light"])
self.assertIn("dark/logo-dark.svg", result["dark"])

View File

@@ -145,71 +145,3 @@ class TestS3Backend(FileTestS3BackendMixin, TestCase):
f"Bucket name '{bucket_name}' appears {bucket_occurrences} times in URL, expected 1. "
f"URL: {url}",
)
def test_themed_urls_without_theme_variable(self):
"""Test themed_urls returns None when filename has no %(theme)s"""
result = self.media_s3_backend.themed_urls("logo.png")
self.assertIsNone(result)
def test_themed_urls_with_theme_variable(self):
"""Test themed_urls returns dict of presigned URLs for each theme"""
result = self.media_s3_backend.themed_urls("logo-%(theme)s.png")
self.assertIsInstance(result, dict)
self.assertIn("light", result)
self.assertIn("dark", result)
# Check URLs are valid presigned URLs with correct file paths
self.assertIn("logo-light.png", result["light"])
self.assertIn("logo-dark.png", result["dark"])
self.assertIn("X-Amz-Signature=", result["light"])
self.assertIn("X-Amz-Signature=", result["dark"])
def test_themed_urls_multiple_theme_variables(self):
"""Test themed_urls with multiple %(theme)s in path"""
result = self.media_s3_backend.themed_urls("%(theme)s/logo-%(theme)s.svg")
self.assertIsInstance(result, dict)
self.assertIn("light/logo-light.svg", result["light"])
self.assertIn("dark/logo-dark.svg", result["dark"])
def test_save_file_sets_content_type_svg(self):
"""Test save_file sets correct ContentType for SVG files"""
self.media_s3_backend.save_file("test.svg", b"<svg></svg>")
response = self.media_s3_backend.client.head_object(
Bucket=self.media_s3_bucket_name,
Key="media/public/test.svg",
)
self.assertEqual(response["ContentType"], "image/svg+xml")
def test_save_file_sets_content_type_png(self):
"""Test save_file sets correct ContentType for PNG files"""
self.media_s3_backend.save_file("test.png", b"\x89PNG\r\n\x1a\n")
response = self.media_s3_backend.client.head_object(
Bucket=self.media_s3_bucket_name,
Key="media/public/test.png",
)
self.assertEqual(response["ContentType"], "image/png")
def test_save_file_stream_sets_content_type(self):
"""Test save_file_stream sets correct ContentType"""
with self.media_s3_backend.save_file_stream("test.css") as f:
f.write(b"body { color: red; }")
response = self.media_s3_backend.client.head_object(
Bucket=self.media_s3_bucket_name,
Key="media/public/test.css",
)
self.assertEqual(response["ContentType"], "text/css")
def test_save_file_unknown_extension_octet_stream(self):
"""Test save_file sets octet-stream for unknown extensions"""
self.media_s3_backend.save_file("test.unknownext123", b"data")
response = self.media_s3_backend.client.head_object(
Bucket=self.media_s3_bucket_name,
Key="media/public/test.unknownext123",
)
self.assertEqual(response["ContentType"], "application/octet-stream")

View File

@@ -88,28 +88,6 @@ class FileManager:
LOGGER.warning(f"Could not find file backend for file: {name}")
return ""
def themed_urls(
self,
name: str | None,
request: HttpRequest | Request | None = None,
) -> dict[str, str] | None:
"""
Get URLs for each theme variant when filename contains %(theme)s.
Returns dict mapping theme to URL if %(theme)s present, None otherwise.
"""
if not name:
return None
if isinstance(request, Request):
request = request._request
for backend in self.backends:
if backend.supports_file(name):
return backend.themed_urls(name, request)
return None
def _check_manageable(self) -> None:
if not self.manageable:
raise ImproperlyConfigured("No file management backend configured.")

View File

@@ -5,6 +5,7 @@ from io import BytesIO
from django.test import TestCase
from django.urls import reverse
from authentik.admin.files.api import get_mime_from_filename
from authentik.admin.files.manager import FileManager
from authentik.admin.files.tests.utils import FileTestFileBackendMixin
from authentik.admin.files.usage import FileUsage
@@ -93,9 +94,8 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
self.assertIn(
{
"name": "/static/authentik/sources/ldap.png",
"url": "http://testserver/static/authentik/sources/ldap.png",
"url": "/static/authentik/sources/ldap.png",
"mime_type": "image/png",
"themed_urls": None,
},
response.data,
)
@@ -129,9 +129,8 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
self.assertIn(
{
"name": "/static/authentik/sources/ldap.png",
"url": "http://testserver/static/authentik/sources/ldap.png",
"url": "/static/authentik/sources/ldap.png",
"mime_type": "image/png",
"themed_urls": None,
},
response.data,
)
@@ -201,64 +200,30 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
self.assertEqual(response.status_code, 400)
self.assertIn("field is required", str(response.data))
def test_list_files_includes_themed_urls_none(self):
"""Test listing files includes themed_urls as None for non-themed files"""
manager = FileManager(FileUsage.MEDIA)
file_name = "test-no-theme.png"
manager.save_file(file_name, b"test content")
response = self.client.get(
reverse("authentik_api:files", query={"search": file_name, "manageableOnly": "true"})
)
class TestGetMimeFromFilename(TestCase):
"""Test get_mime_from_filename function"""
self.assertEqual(response.status_code, 200)
file_entry = next((f for f in response.data if f["name"] == file_name), None)
self.assertIsNotNone(file_entry)
self.assertIn("themed_urls", file_entry)
self.assertIsNone(file_entry["themed_urls"])
def test_image_png(self):
"""Test PNG image MIME type"""
self.assertEqual(get_mime_from_filename("test.png"), "image/png")
manager.delete_file(file_name)
def test_image_jpeg(self):
"""Test JPEG image MIME type"""
self.assertEqual(get_mime_from_filename("test.jpg"), "image/jpeg")
def test_list_files_includes_themed_urls_dict(self):
"""Test listing files includes themed_urls as dict for themed files"""
manager = FileManager(FileUsage.MEDIA)
file_name = "logo-%(theme)s.svg"
manager.save_file("logo-light.svg", b"<svg>light</svg>")
manager.save_file("logo-dark.svg", b"<svg>dark</svg>")
manager.save_file(file_name, b"<svg>placeholder</svg>")
def test_image_svg(self):
"""Test SVG image MIME type"""
self.assertEqual(get_mime_from_filename("test.svg"), "image/svg+xml")
response = self.client.get(
reverse("authentik_api:files", query={"search": "%(theme)s", "manageableOnly": "true"})
)
def test_text_plain(self):
"""Test text file MIME type"""
self.assertEqual(get_mime_from_filename("test.txt"), "text/plain")
self.assertEqual(response.status_code, 200)
file_entry = next((f for f in response.data if f["name"] == file_name), None)
self.assertIsNotNone(file_entry)
self.assertIn("themed_urls", file_entry)
self.assertIsInstance(file_entry["themed_urls"], dict)
self.assertIn("light", file_entry["themed_urls"])
self.assertIn("dark", file_entry["themed_urls"])
def test_unknown_extension(self):
"""Test unknown extension returns octet-stream"""
self.assertEqual(get_mime_from_filename("test.unknown"), "application/octet-stream")
manager.delete_file(file_name)
manager.delete_file("logo-light.svg")
manager.delete_file("logo-dark.svg")
def test_upload_file_with_theme_variable(self):
"""Test uploading file with %(theme)s in name"""
manager = FileManager(FileUsage.MEDIA)
file_name = "brand-logo-%(theme)s.svg"
file_content = b"<svg></svg>"
response = self.client.post(
reverse("authentik_api:files"),
{
"file": BytesIO(file_content),
"name": file_name,
"usage": FileUsage.MEDIA.value,
},
format="multipart",
)
self.assertEqual(response.status_code, 200)
self.assertTrue(manager.file_exists(file_name))
manager.delete_file(file_name)
def test_no_extension(self):
"""Test no extension returns octet-stream"""
self.assertEqual(get_mime_from_filename("test"), "application/octet-stream")

View File

@@ -1,7 +1,6 @@
"""Test file service layer"""
from unittest import skipUnless
from urllib.parse import urlparse
from django.http import HttpRequest
from django.test import TestCase
@@ -105,71 +104,3 @@ class TestResolveFileUrlS3Backend(FileTestS3BackendMixin, TestCase):
# S3 URLs should be returned as-is (already absolute)
self.assertTrue(result.startswith("http://s3.test:8080/test"))
class TestThemedUrls(FileTestFileBackendMixin, TestCase):
"""Test FileManager.themed_urls method"""
def test_themed_urls_none_path(self):
"""Test themed_urls returns None for None path"""
manager = FileManager(FileUsage.MEDIA)
result = manager.themed_urls(None)
self.assertIsNone(result)
def test_themed_urls_empty_path(self):
"""Test themed_urls returns None for empty path"""
manager = FileManager(FileUsage.MEDIA)
result = manager.themed_urls("")
self.assertIsNone(result)
def test_themed_urls_no_theme_variable(self):
"""Test themed_urls returns None when no %(theme)s in path"""
manager = FileManager(FileUsage.MEDIA)
result = manager.themed_urls("logo.png")
self.assertIsNone(result)
def test_themed_urls_with_theme_variable(self):
"""Test themed_urls returns dict of URLs for each theme"""
manager = FileManager(FileUsage.MEDIA)
result = manager.themed_urls("logo-%(theme)s.png")
self.assertIsInstance(result, dict)
self.assertIn("light", result)
self.assertIn("dark", result)
self.assertIn("logo-light.png", result["light"])
self.assertIn("logo-dark.png", result["dark"])
def test_themed_urls_with_request(self):
"""Test themed_urls builds absolute URLs with request"""
mock_request = HttpRequest()
mock_request.META = {
"HTTP_HOST": "example.com",
"SERVER_NAME": "example.com",
}
manager = FileManager(FileUsage.MEDIA)
result = manager.themed_urls("logo-%(theme)s.svg", mock_request)
self.assertIsInstance(result, dict)
light_url = urlparse(result["light"])
dark_url = urlparse(result["dark"])
self.assertEqual(light_url.scheme, "http")
self.assertEqual(light_url.netloc, "example.com")
self.assertEqual(dark_url.scheme, "http")
self.assertEqual(dark_url.netloc, "example.com")
def test_themed_urls_passthrough_with_theme_variable(self):
"""Test themed_urls returns dict for passthrough URLs with %(theme)s"""
manager = FileManager(FileUsage.MEDIA)
# External URLs with %(theme)s should return themed URLs
result = manager.themed_urls("https://example.com/logo-%(theme)s.png")
self.assertIsInstance(result, dict)
self.assertEqual(result["light"], "https://example.com/logo-light.png")
self.assertEqual(result["dark"], "https://example.com/logo-dark.png")
def test_themed_urls_passthrough_without_theme_variable(self):
"""Test themed_urls returns None for passthrough URLs without %(theme)s"""
manager = FileManager(FileUsage.MEDIA)
# External URLs without %(theme)s should return None
result = manager.themed_urls("https://example.com/logo.png")
self.assertIsNone(result)

View File

@@ -4,7 +4,6 @@ from pathlib import PurePosixPath
from django.core.exceptions import ValidationError
from django.utils.translation import gettext as _
from authentik.admin.files.backends.base import THEME_VARIABLE
from authentik.admin.files.backends.passthrough import PassthroughBackend
from authentik.admin.files.backends.static import StaticBackend
from authentik.admin.files.usage import FileUsage
@@ -13,6 +12,10 @@ from authentik.admin.files.usage import FileUsage
MAX_FILE_NAME_LENGTH = 1024
MAX_PATH_COMPONENT_LENGTH = 255
# Theme variable placeholder that can be used in file paths
# This allows for theme-specific files like logo-%(theme)s.png
THEME_VARIABLE = "%(theme)s"
def validate_file_name(name: str) -> None:
if PassthroughBackend(FileUsage.MEDIA).supports_file(name) or StaticBackend(
@@ -41,16 +44,16 @@ def validate_upload_file_name(
raise ValidationError(_("File name cannot be empty"))
# Allow %(theme)s placeholder for theme-specific files
# Replace with placeholder for validation, then check the result
# We temporarily replace it for validation, then check the result
name_for_validation = name.replace(THEME_VARIABLE, "theme")
# Same regex is used in the frontend as well (with %(theme)s handling)
# Same regex is used in the frontend as well (without %(theme)s handling there)
if not re.match(r"^[a-zA-Z0-9._/-]+$", name_for_validation):
raise ValidationError(
_(
"File name can only contain letters (a-z, A-Z), numbers (0-9), "
"dots (.), hyphens (-), underscores (_), forward slashes (/), "
"and the placeholder %(theme)s for theme-specific files"
"and the special placeholder %(theme)s for theme-specific files"
)
)

View File

@@ -13,10 +13,10 @@ from rest_framework.exceptions import AuthenticationFailed
from rest_framework.request import Request
from structlog.stdlib import get_logger
from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
from authentik.core.middleware import CTX_AUTH_VIA
from authentik.core.models import Token, TokenIntents, User, UserTypes
from authentik.outposts.models import Outpost
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
LOGGER = get_logger()
_tmp = Path(gettempdir())

View File

@@ -11,12 +11,12 @@ from rest_framework.exceptions import AuthenticationFailed
from authentik.api.authentication import IPCUser, TokenAuthentication
from authentik.blueprints.tests import reconcile_app
from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
from authentik.core.models import Token, TokenIntents, UserTypes
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.models import Outpost
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider

View File

@@ -1,8 +1,6 @@
"""Schema generation tests"""
from pathlib import Path
from tempfile import gettempdir
from uuid import uuid4
from django.core.management import call_command
from django.urls import reverse
@@ -31,14 +29,15 @@ class TestSchemaGeneration(APITestCase):
def test_build_schema(self):
"""Test schema build command"""
tmp = Path(gettempdir())
blueprint_file = tmp / f"{str(uuid4())}.json"
api_file = tmp / f"{str(uuid4())}.yml"
blueprint_file = Path("blueprints/schema.json")
api_file = Path("schema.yml")
blueprint_file.unlink()
api_file.unlink()
with (
CONFIG.patch("debug", True),
CONFIG.patch("tenants.enabled", True),
CONFIG.patch("outposts.disable_embedded_outpost", True),
):
call_command("build_schema", blueprint_file=blueprint_file, api_file=api_file)
call_command("build_schema")
self.assertTrue(blueprint_file.exists())
self.assertTrue(api_file.exists())

View File

@@ -18,7 +18,7 @@ entries:
name: foo
title: foo
permissions:
- permission: authentik_flows.view_flow
- permission: view_flow
user: !KeyOf user
- permission: authentik_flows.view_flow
- permission: view_flow
role: !KeyOf role

View File

@@ -43,6 +43,8 @@ def get_attrs(obj: SerializerModel) -> dict[str, Any]:
continue
if _field.read_only:
data.pop(field_name, None)
if _field.get_initial() == data.get(field_name, None):
data.pop(field_name, None)
if field_name.endswith("_set"):
data.pop(field_name, None)
return data

View File

@@ -15,7 +15,7 @@ from django.db.models import Model
from django.db.models.query_utils import Q
from django.db.transaction import atomic
from django.db.utils import IntegrityError
from guardian.models import RoleObjectPermission
from guardian.models import RoleObjectPermission, UserObjectPermission
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import BaseSerializer, Serializer
from structlog.stdlib import BoundLogger, get_logger
@@ -41,6 +41,7 @@ from authentik.core.models import (
UserSourceConnection,
)
from authentik.endpoints.models import Connector
from authentik.enterprise.license import LicenseKey
from authentik.events.logs import LogEvent, capture_logs
from authentik.events.utils import cleanse_dict
from authentik.flows.models import Stage
@@ -70,6 +71,7 @@ def excluded_models() -> list[type[Model]]:
ContentType,
Permission,
RoleObjectPermission,
UserObjectPermission,
# Base classes
Provider,
Source,
@@ -139,19 +141,10 @@ class Importer:
def default_context(self):
"""Default context"""
context = {
return {
"goauthentik.io/enterprise/licensed": LicenseKey.get_total().status().is_valid,
"goauthentik.io/rbac/models": rbac_models(),
"goauthentik.io/enterprise/licensed": False,
}
try:
from authentik.enterprise.license import LicenseKey
context["goauthentik.io/enterprise/licensed"] = (
LicenseKey.get_total().status().is_valid,
)
except ModuleNotFoundError:
pass
return context
@staticmethod
def from_string(yaml_input: str, context: dict | None = None) -> Importer:

View File

@@ -6,12 +6,7 @@ from django.db import models
from drf_spectacular.utils import extend_schema, extend_schema_field
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import (
CharField,
ChoiceField,
ListField,
SerializerMethodField,
)
from rest_framework.fields import CharField, ChoiceField, ListField, SerializerMethodField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import AllowAny
from rest_framework.request import Request
@@ -21,7 +16,7 @@ from rest_framework.viewsets import ModelViewSet
from authentik.brands.models import Brand
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer, ThemedUrlsSerializer
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.rbac.filters import SecretKeyFilter
from authentik.tenants.api.settings import FlagJSONField
from authentik.tenants.flags import Flag
@@ -95,9 +90,7 @@ class CurrentBrandSerializer(PassiveSerializer):
matched_domain = CharField(source="domain")
branding_title = CharField()
branding_logo = CharField(source="branding_logo_url")
branding_logo_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
branding_favicon = CharField(source="branding_favicon_url")
branding_favicon_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
branding_custom_css = CharField()
ui_footer_links = ListField(
child=FooterLinkSerializer(),

View File

@@ -89,26 +89,14 @@ class Brand(SerializerModel):
"""Get branding_logo URL"""
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_logo)
def branding_logo_themed_urls(self) -> dict[str, str] | None:
"""Get themed URLs for branding_logo if it contains %(theme)s"""
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_logo)
def branding_favicon_url(self) -> str:
"""Get branding_favicon URL"""
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_favicon)
def branding_favicon_themed_urls(self) -> dict[str, str] | None:
"""Get themed URLs for branding_favicon if it contains %(theme)s"""
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_favicon)
def branding_default_flow_background_url(self) -> str:
"""Get branding_default_flow_background URL"""
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_default_flow_background)
def branding_default_flow_background_themed_urls(self) -> dict[str, str] | None:
"""Get themed URLs for branding_default_flow_background if it contains %(theme)s"""
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_default_flow_background)
@property
def serializer(self) -> type[Serializer]:
from authentik.brands.api import BrandSerializer

View File

@@ -6,6 +6,7 @@ from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.blueprints.tests import apply_blueprint
from authentik.brands.api import Themes
from authentik.brands.models import Brand
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_brand
@@ -32,14 +33,12 @@ class TestBrands(APITestCase):
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "authentik",
"branding_custom_css": "",
"matched_domain": brand.domain,
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -54,14 +53,12 @@ class TestBrands(APITestCase):
).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "custom",
"branding_custom_css": "",
"matched_domain": "bar.baz",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -73,14 +70,12 @@ class TestBrands(APITestCase):
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "authentik",
"branding_custom_css": "",
"matched_domain": "fallback",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -97,14 +92,12 @@ class TestBrands(APITestCase):
response,
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "authentik",
"branding_custom_css": "",
"matched_domain": "authentik-default",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -122,14 +115,12 @@ class TestBrands(APITestCase):
response,
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "authentik",
"branding_custom_css": "",
"matched_domain": "authentik-default",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -140,14 +131,12 @@ class TestBrands(APITestCase):
).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "custom",
"branding_custom_css": "",
"matched_domain": "bar.baz",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -163,14 +152,12 @@ class TestBrands(APITestCase):
).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "custom-strong",
"branding_custom_css": "",
"matched_domain": "foo.bar.baz",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -186,14 +173,12 @@ class TestBrands(APITestCase):
).content.decode(),
{
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
"branding_logo_themed_urls": None,
"branding_favicon": "/static/dist/assets/icons/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "custom-weak",
"branding_custom_css": "",
"matched_domain": "bar.baz",
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},
@@ -269,14 +254,12 @@ class TestBrands(APITestCase):
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
{
"branding_logo": "https://goauthentik.io/img/icon.png",
"branding_logo_themed_urls": None,
"branding_favicon": "https://goauthentik.io/img/icon.png",
"branding_favicon_themed_urls": None,
"branding_title": "authentik",
"branding_custom_css": "",
"matched_domain": brand.domain,
"ui_footer_links": [],
"ui_theme": "automatic",
"ui_theme": Themes.AUTOMATIC,
"default_locale": "",
"flags": self.default_flags,
},

View File

@@ -3,7 +3,7 @@
from typing import Any
from django.db.models import Case, F, IntegerField, Q, Value, When
from django.db.models.functions import Concat, Length
from django.db.models.functions import Length
from django.http.request import HttpRequest
from django.utils.html import _json_script_escapes
from django.utils.safestring import mark_safe
@@ -26,8 +26,7 @@ def get_brand_for_request(request: HttpRequest) -> Brand:
domain_length=Length("domain"),
match_priority=Case(
When(
condition=Q(host_domain__iexact=F("domain"))
| Q(host_domain__iendswith=Concat(Value("."), F("domain"))),
condition=Q(host_domain__iendswith=F("domain")),
then=F("domain_length"),
),
default=Value(-1),

View File

@@ -24,7 +24,7 @@ from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import Application, User
from authentik.events.logs import LogEventSerializer, capture_logs
from authentik.policies.api.exec import PolicyTestResultSerializer
@@ -53,9 +53,6 @@ class ApplicationSerializer(ModelSerializer):
)
meta_icon_url = ReadOnlyField(source="get_meta_icon")
meta_icon_themed_urls = ThemedUrlsSerializer(
source="get_meta_icon_themed_urls", read_only=True, allow_null=True
)
def get_launch_url(self, app: Application) -> str | None:
"""Allow formatting of launch URL"""
@@ -66,7 +63,7 @@ class ApplicationSerializer(ModelSerializer):
user = self.context["request"].user
# Cache serialized user data to avoid N+1 when formatting launch URLs
# for multiple applications. UserSerializer accesses user.groups which
# for multiple applications. UserSerializer accesses user.ak_groups which
# would otherwise trigger a query for each application.
if user is not None:
if "_cached_user_data" not in self.context:
@@ -105,7 +102,6 @@ class ApplicationSerializer(ModelSerializer):
"meta_launch_url",
"meta_icon",
"meta_icon_url",
"meta_icon_themed_urls",
"meta_description",
"meta_publisher",
"policy_engine_mode",

View File

@@ -16,15 +16,11 @@ from rest_framework.viewsets import ViewSet
from authentik.api.validation import validate
from authentik.core.api.users import ParamUserSerializer
from authentik.core.api.utils import MetaNameSerializer
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import EndpointDevice
from authentik.stages.authenticator import device_classes, devices_for_user
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
try:
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import EndpointDevice
except ModuleNotFoundError:
EndpointDevice = None
class DeviceSerializer(MetaNameSerializer):
"""Serializer for authenticator devices"""
@@ -47,7 +43,7 @@ class DeviceSerializer(MetaNameSerializer):
"""Get extra description"""
if isinstance(instance, WebAuthnDevice):
return instance.device_type.description if instance.device_type else None
if EndpointDevice and isinstance(instance, EndpointDevice):
if isinstance(instance, EndpointDevice):
return instance.data.get("deviceSignals", {}).get("deviceModel")
return None
@@ -55,7 +51,7 @@ class DeviceSerializer(MetaNameSerializer):
"""Get external Device ID"""
if isinstance(instance, WebAuthnDevice):
return instance.device_type.aaguid if instance.device_type else None
if EndpointDevice and isinstance(instance, EndpointDevice):
if isinstance(instance, EndpointDevice):
return instance.data.get("deviceSignals", {}).get("deviceModel")
return None

View File

@@ -10,6 +10,7 @@ from rest_framework.request import Request
from rest_framework.response import Response
from authentik.core.api.utils import PassiveSerializer
from authentik.enterprise.apps import EnterpriseConfig
from authentik.lib.models import DeprecatedMixin
from authentik.lib.utils.reflection import all_subclasses
@@ -60,25 +61,19 @@ class TypesMixin:
continue
instance = subclass()
try:
type_signature = {
"name": subclass._meta.verbose_name,
"description": subclass.__doc__,
"component": instance.component,
"model_name": subclass._meta.model_name,
"icon_url": getattr(instance, "icon_url", None),
"requires_enterprise": False,
"deprecated": isinstance(instance, DeprecatedMixin),
}
try:
from authentik.enterprise.apps import EnterpriseConfig
type_signature["requires_enterprise"] = isinstance(
subclass._meta.app_config, EnterpriseConfig
)
except ModuleNotFoundError:
pass
data.append(type_signature)
data.append(
{
"name": subclass._meta.verbose_name,
"description": subclass.__doc__,
"component": instance.component,
"model_name": subclass._meta.model_name,
"icon_url": getattr(instance, "icon_url", None),
"requires_enterprise": isinstance(
subclass._meta.app_config, EnterpriseConfig
),
"deprecated": isinstance(instance, DeprecatedMixin),
}
)
except NotImplementedError:
continue
if additional:

View File

@@ -18,14 +18,10 @@ from authentik.core.models import Provider
class ProviderSerializer(ModelSerializer, MetaNameSerializer):
"""Provider Serializer"""
assigned_application_slug = ReadOnlyField(source="application.slug", allow_null=True)
assigned_application_name = ReadOnlyField(source="application.name", allow_null=True)
assigned_backchannel_application_slug = ReadOnlyField(
source="backchannel_application.slug", allow_null=True
)
assigned_backchannel_application_name = ReadOnlyField(
source="backchannel_application.name", allow_null=True
)
assigned_application_slug = ReadOnlyField(source="application.slug")
assigned_application_name = ReadOnlyField(source="application.name")
assigned_backchannel_application_slug = ReadOnlyField(source="backchannel_application.slug")
assigned_backchannel_application_name = ReadOnlyField(source="backchannel_application.name")
component = SerializerMethodField()

View File

@@ -14,7 +14,7 @@ from structlog.stdlib import get_logger
from authentik.core.api.object_types import TypesMixin
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import MetaNameSerializer, ModelSerializer, ThemedUrlsSerializer
from authentik.core.api.utils import MetaNameSerializer, ModelSerializer
from authentik.core.models import GroupSourceConnection, Source, UserSourceConnection
from authentik.core.types import UserSettingSerializer
from authentik.policies.engine import PolicyEngine
@@ -28,7 +28,6 @@ class SourceSerializer(ModelSerializer, MetaNameSerializer):
managed = ReadOnlyField()
component = SerializerMethodField()
icon_url = ReadOnlyField()
icon_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
def get_component(self, obj: Source) -> str:
"""Get object component so that we know how to edit the object"""
@@ -58,7 +57,6 @@ class SourceSerializer(ModelSerializer, MetaNameSerializer):
"user_path_template",
"icon",
"icon_url",
"icon_themed_urls",
]

View File

@@ -75,8 +75,7 @@ class TokenSerializer(ManagedSerializer, ModelSerializer):
except ValueError:
pass
expires = attrs.get("expires")
if expires is not None and expires > max_token_lifetime_dt:
if "expires" in attrs and attrs.get("expires") > max_token_lifetime_dt:
raise ValidationError(
{
"expires": (

View File

@@ -30,6 +30,7 @@ from drf_spectacular.utils import (
extend_schema_field,
inline_serializer,
)
from guardian.shortcuts import get_objects_for_user
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
@@ -41,7 +42,6 @@ from rest_framework.fields import (
IntegerField,
ListField,
SerializerMethodField,
UUIDField,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
@@ -72,14 +72,12 @@ from authentik.core.middleware import (
from authentik.core.models import (
USER_ATTRIBUTE_TOKEN_EXPIRING,
USER_PATH_SERVICE_ACCOUNT,
USERNAME_MAX_LENGTH,
Group,
Session,
Token,
TokenIntents,
User,
UserTypes,
default_token_duration,
)
from authentik.endpoints.connectors.agent.auth import AgentAuth
from authentik.events.models import Event, EventAction
@@ -89,7 +87,6 @@ from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
from authentik.flows.views.executor import QS_KEY_TOKEN
from authentik.lib.avatars import get_avatar
from authentik.lib.utils.reflection import ConditionalInheritance
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
from authentik.rbac.api.roles import RoleSerializer
from authentik.rbac.decorators import permission_required
from authentik.rbac.models import Role, get_permission_choices
@@ -132,6 +129,7 @@ class UserSerializer(ModelSerializer):
groups = PrimaryKeyRelatedField(
allow_empty=True,
many=True,
source="ak_groups",
queryset=Group.objects.all().order_by("name"),
default=list,
)
@@ -145,7 +143,7 @@ class UserSerializer(ModelSerializer):
roles_obj = SerializerMethodField(allow_null=True)
uid = CharField(read_only=True)
username = CharField(
max_length=USERNAME_MAX_LENGTH,
max_length=150,
validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
)
@@ -167,7 +165,7 @@ class UserSerializer(ModelSerializer):
def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
if not self._should_include_groups:
return None
return PartialGroupSerializer(instance.groups, many=True).data
return PartialGroupSerializer(instance.ak_groups, many=True).data
@extend_schema_field(RoleSerializer(many=True))
def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
@@ -241,14 +239,14 @@ class UserSerializer(ModelSerializer):
and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
):
raise ValidationError(_("Can't change internal service account to other user type."))
raise ValidationError("Can't change internal service account to other user type.")
if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
raise ValidationError(_("Setting a user to internal service account is not allowed."))
raise ValidationError("Setting a user to internal service account is not allowed.")
return user_type
def validate(self, attrs: dict) -> dict:
if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
raise ValidationError(_("Can't modify internal service account users"))
raise ValidationError("Can't modify internal service account users")
return super().validate(attrs)
class Meta:
@@ -400,18 +398,6 @@ class UserServiceAccountSerializer(PassiveSerializer):
)
class UserRecoveryLinkSerializer(PassiveSerializer):
"""Payload to create a recovery link"""
token_duration = CharField(required=False)
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
"""Payload to create and email a recovery link"""
email_stage = UUIDField()
class UsersFilter(FilterSet):
"""Filter for users"""
@@ -435,7 +421,7 @@ class UsersFilter(FilterSet):
last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")
is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
is_superuser = BooleanFilter(field_name="ak_groups", method="filter_is_superuser")
uuid = UUIDFilter(field_name="uuid")
path = CharFilter(field_name="path")
@@ -444,12 +430,12 @@ class UsersFilter(FilterSet):
type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")
groups_by_name = ModelMultipleChoiceFilter(
field_name="groups__name",
field_name="ak_groups__name",
to_field_name="name",
queryset=Group.objects.all().order_by("name"),
)
groups_by_pk = ModelMultipleChoiceFilter(
field_name="groups",
field_name="ak_groups",
queryset=Group.objects.all().order_by("name"),
)
@@ -465,22 +451,22 @@ class UsersFilter(FilterSet):
def filter_is_superuser(self, queryset, name, value):
if value:
return queryset.filter(groups__is_superuser=True).distinct()
return queryset.exclude(groups__is_superuser=True).distinct()
return queryset.filter(ak_groups__is_superuser=True).distinct()
return queryset.exclude(ak_groups__is_superuser=True).distinct()
def filter_attributes(self, queryset, name, value):
"""Filter attributes by query args"""
try:
value = loads(value)
except ValueError:
raise ValidationError(_("filter: failed to parse JSON")) from None
raise ValidationError(detail="filter: failed to parse JSON") from None
if not isinstance(value, dict):
raise ValidationError(_("filter: value must be key:value mapping"))
raise ValidationError(detail="filter: value must be key:value mapping")
qs = {}
for key, _value in value.items():
qs[f"attributes__{key}"] = _value
try:
__ = len(queryset.filter(**qs))
_ = len(queryset.filter(**qs))
return queryset.filter(**qs)
except ValueError:
return queryset
@@ -544,7 +530,7 @@ class UserViewSet(
def get_queryset(self):
base_qs = User.objects.all().exclude_anonymous()
if self.serializer_class(context={"request": self.request})._should_include_groups:
base_qs = base_qs.prefetch_related("groups")
base_qs = base_qs.prefetch_related("ak_groups")
if self.serializer_class(context={"request": self.request})._should_include_roles:
base_qs = base_qs.prefetch_related("roles")
return base_qs
@@ -558,16 +544,14 @@ class UserViewSet(
def list(self, request, *args, **kwargs):
return super().list(request, *args, **kwargs)
def _create_recovery_link(
self, token_duration: str | None, for_email=False
) -> tuple[str, Token]:
def _create_recovery_link(self, for_email=False) -> tuple[str, Token]:
"""Create a recovery link (when the current brand has a recovery flow set),
that can either be shown to an admin or sent to the user directly"""
brand: Brand = self.request.brand
brand: Brand = self.request._request.brand
# Check that there is a recovery flow, if not return an error
flow = brand.flow_recovery
if not flow:
raise ValidationError({"non_field_errors": _("No recovery flow set.")})
raise ValidationError({"non_field_errors": "No recovery flow set."})
user: User = self.get_object()
planner = FlowPlanner(flow)
planner.allow_empty_flows = True
@@ -581,15 +565,11 @@ class UserViewSet(
)
except FlowNonApplicableException:
raise ValidationError(
{"non_field_errors": _("Recovery flow not applicable to user")}
{"non_field_errors": "Recovery flow not applicable to user"}
) from None
_plan = FlowToken.pickle(plan)
if for_email:
_plan = pickle_flow_token_for_email(plan)
expires = default_token_duration()
if token_duration:
timedelta_string_validator(token_duration)
expires = now() + timedelta_from_string(token_duration)
token, __ = FlowToken.objects.update_or_create(
identifier=f"{user.uid}-password-reset",
defaults={
@@ -597,7 +577,6 @@ class UserViewSet(
"flow": flow,
"_plan": _plan,
"revoke_on_execution": not for_email,
"expires": expires,
},
)
querystring = urlencode({QS_KEY_TOKEN: token.key})
@@ -745,60 +724,60 @@ class UserViewSet(
@permission_required("authentik_core.reset_user_password")
@extend_schema(
request=UserRecoveryLinkSerializer,
responses={
"200": LinkSerializer(many=False),
},
request=None,
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(UserRecoveryLinkSerializer)
def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
def recovery(self, request: Request, pk: int) -> Response:
"""Create a temporary link that a user can use to recover their account"""
link, _ = self._create_recovery_link(
token_duration=body.validated_data.get("token_duration")
)
link, _ = self._create_recovery_link()
return Response({"link": link})
@permission_required("authentik_core.reset_user_password")
@extend_schema(
request=UserRecoveryEmailSerializer,
parameters=[
OpenApiParameter(
name="email_stage",
location=OpenApiParameter.QUERY,
type=OpenApiTypes.STR,
required=True,
)
],
responses={
"204": OpenApiResponse(description="Successfully sent recover email"),
},
request=None,
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(UserRecoveryEmailSerializer)
def recovery_email(
self, request: Request, pk: int, body: UserRecoveryEmailSerializer
) -> Response:
def recovery_email(self, request: Request, pk: int) -> Response:
"""Send an email with a temporary link that a user can use to recover their account"""
email_error_message = _("User does not have an email address set.")
stage_error_message = _("Email stage not found.")
user: User = self.get_object()
if not user.email:
for_user: User = self.get_object()
if for_user.email == "":
LOGGER.debug("User doesn't have an email address")
raise ValidationError({"non_field_errors": email_error_message})
if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
LOGGER.debug("Email stage does not exist")
raise ValidationError({"non_field_errors": stage_error_message})
if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
LOGGER.debug("User has no view access to email stage")
raise ValidationError({"non_field_errors": stage_error_message})
link, token = self._create_recovery_link(
token_duration=body.validated_data.get("token_duration"), for_email=True
)
raise ValidationError({"non_field_errors": "User does not have an email address set."})
link, token = self._create_recovery_link(for_email=True)
# Lookup the email stage to assure the current user can access it
stages = get_objects_for_user(
request.user, "authentik_stages_email.view_emailstage"
).filter(pk=request.query_params.get("email_stage"))
if not stages.exists():
LOGGER.debug("Email stage does not exist/user has no permissions")
raise ValidationError({"non_field_errors": "Email stage does not exist."})
email_stage: EmailStage = stages.first()
message = TemplateEmailMessage(
subject=_(stage.subject),
to=[(user.name, user.email)],
template_name=stage.template,
language=user.locale(request),
subject=_(email_stage.subject),
to=[(for_user.name, for_user.email)],
template_name=email_stage.template,
language=for_user.locale(request),
template_context={
"url": link,
"user": user,
"user": for_user,
"expires": token.expires,
},
)
send_mails(stage, message)
send_mails(email_stage, message)
return Response(status=204)
@permission_required("authentik_core.impersonate")

View File

@@ -127,10 +127,3 @@ class LinkSerializer(PassiveSerializer):
"""Returns a single link"""
link = CharField()
class ThemedUrlsSerializer(PassiveSerializer):
"""Themed URLs - maps theme names to URLs for light and dark themes"""
light = CharField(required=False, allow_null=True)
dark = CharField(required=False, allow_null=True)

View File

@@ -1,47 +0,0 @@
# Generated by Django 5.2.10 on 2026-01-19 21:46
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0056_user_roles"),
("authentik_rbac", "0010_remove_role_group_alter_role_name"),
]
operations = [
migrations.RemoveField(
model_name="user",
name="user_permissions",
),
migrations.AlterField(
model_name="group",
name="roles",
field=models.ManyToManyField(
blank=True, related_name="groups", to="authentik_rbac.role"
),
),
migrations.RemoveField(
model_name="user",
name="groups",
),
migrations.RenameField(
model_name="user",
old_name="ak_groups",
new_name="groups",
),
migrations.AlterModelOptions(
name="user",
options={
"permissions": [
("reset_user_password", "Reset Password"),
("impersonate", "Can impersonate other users"),
("preview_user", "Can preview user data sent to providers"),
("view_user_applications", "View applications the user has access to"),
],
"verbose_name": "User",
"verbose_name_plural": "Users",
},
),
]

View File

@@ -1,8 +1,6 @@
"""authentik core models"""
import re
import traceback
from datetime import datetime, timedelta
from datetime import datetime
from enum import StrEnum
from hashlib import sha256
from typing import Any, Self
@@ -17,6 +15,7 @@ from django.contrib.sessions.base_session import AbstractBaseSession
from django.core.validators import validate_slug
from django.db import models
from django.db.models import Q, QuerySet, options
from django.db.models.constants import LOOKUP_SEP
from django.http import HttpRequest
from django.utils.functional import cached_property
from django.utils.timezone import now
@@ -44,7 +43,6 @@ from authentik.lib.models import (
DomainlessFormattedURLValidator,
SerializerModel,
)
from authentik.lib.utils.inheritance import get_deepest_child
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.models import PolicyBindingModel
from authentik.rbac.models import Role
@@ -52,7 +50,6 @@ from authentik.tenants.models import DEFAULT_TOKEN_DURATION, DEFAULT_TOKEN_LENGT
from authentik.tenants.utils import get_current_tenant, get_unique_identifier
LOGGER = get_logger()
USERNAME_MAX_LENGTH = 150
USER_PATH_SYSTEM_PREFIX = "goauthentik.io"
_USER_ATTR_PREFIX = f"{USER_PATH_SYSTEM_PREFIX}/user"
USER_ATTRIBUTE_DEBUG = f"{_USER_ATTR_PREFIX}/debug"
@@ -186,7 +183,7 @@ class Group(SerializerModel, AttributesMixin):
default=False, help_text=_("Users added to this group will be superusers.")
)
roles = models.ManyToManyField("authentik_rbac.Role", related_name="groups", blank=True)
roles = models.ManyToManyField("authentik_rbac.Role", related_name="ak_groups", blank=True)
parents = models.ManyToManyField(
"Group",
@@ -235,7 +232,7 @@ class Group(SerializerModel, AttributesMixin):
def all_roles(self) -> QuerySet[Role]:
"""Get all roles of this group and all of its ancestors."""
return Role.objects.filter(
groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
ak_groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
).distinct()
def get_managed_role(self, create=False):
@@ -243,7 +240,7 @@ class Group(SerializerModel, AttributesMixin):
name = managed_role_name(self)
role, created = Role.objects.get_or_create(name=name, managed=name)
if created:
role.groups.add(self)
role.ak_groups.add(self)
return role
else:
return Role.objects.filter(name=managed_role_name(self)).first()
@@ -358,17 +355,13 @@ class UserManager(DjangoUserManager):
class User(SerializerModel, AttributesMixin, AbstractUser):
"""authentik User model, based on django's contrib auth user model."""
# Overwriting PermissionsMixin: permissions are handled by roles.
# (This knowingly violates the Liskov substitution principle. It is better to fail loudly.)
user_permissions = None
uuid = models.UUIDField(default=uuid4, editable=False, unique=True)
name = models.TextField(help_text=_("User's display name."))
path = models.TextField(default="users")
type = models.TextField(choices=UserTypes.choices, default=UserTypes.INTERNAL)
sources = models.ManyToManyField("Source", through="UserSourceConnection")
groups = models.ManyToManyField("Group", related_name="users")
ak_groups = models.ManyToManyField("Group", related_name="users")
roles = models.ManyToManyField("authentik_rbac.Role", related_name="users", blank=True)
password_change_date = models.DateTimeField(auto_now_add=True)
@@ -382,6 +375,8 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
permissions = [
("reset_user_password", _("Reset Password")),
("impersonate", _("Can impersonate other users")),
("assign_user_permissions", _("Can assign permissions to users")),
("unassign_user_permissions", _("Can unassign permissions from users")),
("preview_user", _("Can preview user data sent to providers")),
("view_user_applications", _("View applications the user has access to")),
]
@@ -405,11 +400,11 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
def all_groups(self) -> QuerySet[Group]:
"""Recursively get all groups this user is a member of."""
return self.groups.all().with_ancestors()
return self.ak_groups.all().with_ancestors()
def all_roles(self) -> QuerySet[Role]:
"""Get all roles of this user and all of its groups (recursively)."""
return Role.objects.filter(Q(users=self) | Q(groups__in=self.all_groups())).distinct()
return Role.objects.filter(Q(users=self) | Q(ak_groups__in=self.all_groups())).distinct()
def get_managed_role(self, create=False):
if create:
@@ -513,54 +508,6 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
"""superuser == staff user"""
return self.is_superuser # type: ignore
# TODO: remove this after 2026.
@property
def ak_groups(self):
"""This is a proxy for a renamed, deprecated field."""
from authentik.events.models import Event, EventAction
deprecation = "authentik.core.models.User.ak_groups"
replacement = "authentik.core.models.User.groups"
message_logger = (
f"{deprecation} is deprecated and will be removed in a future version of "
f"authentik. Please use {replacement} instead."
)
message_event = (
f"{message_logger} This event will not be repeated until it expires (by "
"default: in 30 days). See authentik logs for every will invocation of this "
"deprecation."
)
stacktrace = traceback.format_stack()
# The last line is this function, the next-to-last line is its caller
cause = stacktrace[-2] if len(stacktrace) > 1 else "Unknown, see stacktrace in logs"
if search := re.search(r'"(.*?)"', cause):
cause = f"Property mapping or Expression policy named {search.group(1)}"
LOGGER.warning(
"deprecation used",
message=message_logger,
deprecation=deprecation,
replacement=replacement,
cause=cause,
stacktrace=stacktrace,
)
if not Event.filter_not_expired(
action=EventAction.CONFIGURATION_WARNING,
context__deprecation=deprecation,
context__cause=cause,
).exists():
event = Event.new(
EventAction.CONFIGURATION_WARNING,
deprecation=deprecation,
replacement=replacement,
message=message_event,
cause=cause,
)
event.expires = datetime.now() + timedelta(days=30)
event.save()
return self.groups
def set_password(self, raw_password, signal=True, sender=None, request=None):
if self.pk and signal:
from authentik.core.signals import password_changed
@@ -766,14 +713,6 @@ class Application(SerializerModel, PolicyBindingModel):
return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon)
@property
def get_meta_icon_themed_urls(self) -> dict[str, str] | None:
"""Get themed URLs for meta_icon if it contains %(theme)s"""
if not self.meta_icon:
return None
return get_file_manager(FileUsage.MEDIA).themed_urls(self.meta_icon)
def get_launch_url(self, user: User | None = None, user_data: dict | None = None) -> str | None:
"""Get launch URL if set, otherwise attempt to get launch URL based on provider.
@@ -803,7 +742,25 @@ class Application(SerializerModel, PolicyBindingModel):
"""Get casted provider instance. Needs Application queryset with_provider"""
if not self.provider:
return None
return get_deepest_child(self.provider)
candidates = []
base_class = Provider
for subclass in base_class.objects.get_queryset()._get_subclasses_recurse(base_class):
parent = self.provider
for level in subclass.split(LOOKUP_SEP):
try:
parent = getattr(parent, level)
except AttributeError:
break
if parent in candidates:
continue
idx = subclass.count(LOOKUP_SEP)
if type(parent) is not base_class:
idx += 1
candidates.insert(idx, parent)
if not candidates:
return None
return candidates[-1]
def backchannel_provider_for[T: Provider](self, provider_type: type[T], **kwargs) -> T | None:
"""Get Backchannel provider for a specific type"""
@@ -970,14 +927,6 @@ class Source(ManagedModel, SerializerModel, PolicyBindingModel):
return get_file_manager(FileUsage.MEDIA).file_url(self.icon)
@property
def icon_themed_urls(self) -> dict[str, str] | None:
"""Get themed URLs for icon if it contains %(theme)s"""
if not self.icon:
return None
return get_file_manager(FileUsage.MEDIA).themed_urls(self.icon)
def get_user_path(self) -> str:
"""Get user path, fallback to default for formatting errors"""
try:

View File

@@ -392,10 +392,10 @@ class GroupUpdateStage(StageView):
groups.append(group)
with transaction.atomic():
self.user.groups.remove(
*self.user.groups.filter(groupsourceconnection__source=self.source)
self.user.ak_groups.remove(
*self.user.ak_groups.filter(groupsourceconnection__source=self.source)
)
self.user.groups.add(*groups)
self.user.ak_groups.add(*groups)
return True

View File

@@ -44,24 +44,19 @@
{% endblock %}
</div>
</main>
<footer
name="site-footer"
aria-label="{% trans 'Site footer' %}"
class="pf-c-login__footer pf-m-dark">
<div name="flow-links" aria-label="{% trans 'Flow links' %}">
<ul class="pf-c-list pf-m-inline" part="list">
{% for link in footer_links %}
<li part="list-item">
<a part="list-item-link" href="{{ link.href }}">{{ link.name }}</a>
</li>
{% endfor %}
<li part="list-item">
<span>
{% trans 'Powered by authentik' %}
</span>
</li>
</ul>
</div>
<footer aria-label="Site footer" class="pf-c-login__footer pf-m-dark">
<ul class="pf-c-list pf-m-inline">
{% for link in footer_links %}
<li>
<a href="{{ link.href }}">{{ link.name }}</a>
</li>
{% endfor %}
<li>
<span>
{% trans 'Powered by authentik' %}
</span>
</li>
</ul>
</footer>
</div>
</div>

View File

@@ -38,7 +38,7 @@ class TestApplicationEntitlements(APITestCase):
def test_group(self):
"""Test direct group"""
group = Group.objects.create(name=generate_id())
self.user.groups.add(group)
self.user.ak_groups.add(group)
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=group, order=0)
ents = self.user.app_entitlements(self.app)
@@ -50,7 +50,7 @@ class TestApplicationEntitlements(APITestCase):
parent = Group.objects.create(name=generate_id())
group = Group.objects.create(name=generate_id())
group.parents.add(parent)
self.user.groups.add(group)
self.user.ak_groups.add(group)
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=parent, order=0)
ents = self.user.app_entitlements(self.app)

View File

@@ -107,8 +107,6 @@ class TestApplicationsAPI(APITestCase):
"provider_obj": {
"assigned_application_name": "allowed",
"assigned_application_slug": "allowed",
"assigned_backchannel_application_name": None,
"assigned_backchannel_application_slug": None,
"authentication_flow": None,
"invalidation_flow": None,
"authorization_flow": str(self.provider.authorization_flow.pk),
@@ -127,7 +125,6 @@ class TestApplicationsAPI(APITestCase):
"open_in_new_tab": True,
"meta_icon": "",
"meta_icon_url": None,
"meta_icon_themed_urls": None,
"meta_description": "",
"meta_publisher": "",
"policy_engine_mode": "any",
@@ -165,8 +162,6 @@ class TestApplicationsAPI(APITestCase):
"provider_obj": {
"assigned_application_name": "allowed",
"assigned_application_slug": "allowed",
"assigned_backchannel_application_name": None,
"assigned_backchannel_application_slug": None,
"authentication_flow": None,
"invalidation_flow": None,
"authorization_flow": str(self.provider.authorization_flow.pk),
@@ -185,7 +180,6 @@ class TestApplicationsAPI(APITestCase):
"open_in_new_tab": True,
"meta_icon": "",
"meta_icon_url": None,
"meta_icon_themed_urls": None,
"meta_description": "",
"meta_publisher": "",
"policy_engine_mode": "any",
@@ -195,7 +189,6 @@ class TestApplicationsAPI(APITestCase):
"meta_description": "",
"meta_icon": "",
"meta_icon_url": None,
"meta_icon_themed_urls": None,
"meta_launch_url": "",
"open_in_new_tab": False,
"meta_publisher": "",

View File

@@ -122,8 +122,8 @@ class TestGroupsAPI(APITestCase):
def test_superuser_update_no_perm(self):
"""Test updating a superuser group without permission"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
self.login_user.assign_perms_to_managed_role("authentik_core.view_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.change_group", group)
self.login_user.assign_perms_to_managed_role("view_group", group)
self.login_user.assign_perms_to_managed_role("change_group", group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
@@ -139,8 +139,8 @@ class TestGroupsAPI(APITestCase):
"""Test updating a superuser group without permission
and without changing the superuser status"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
self.login_user.assign_perms_to_managed_role("authentik_core.view_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.change_group", group)
self.login_user.assign_perms_to_managed_role("view_group", group)
self.login_user.assign_perms_to_managed_role("change_group", group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),

View File

@@ -54,7 +54,7 @@ class TestSourceFlowManager(FlowTestCase):
)
self.assertTrue(stage.handle_groups())
self.assertTrue(Group.objects.filter(name="group 1").exists())
self.assertTrue(self.user.groups.filter(name="group 1").exists())
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
self.assertTrue(
GroupOAuthSourceConnection.objects.filter(
group=Group.objects.get(name="group 1"), source=self.source
@@ -88,7 +88,7 @@ class TestSourceFlowManager(FlowTestCase):
)
self.assertTrue(stage.handle_groups())
self.assertTrue(Group.objects.filter(name="group 1").exists())
self.assertTrue(self.user.groups.filter(name="group 1").exists())
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
self.assertTrue(
GroupOAuthSourceConnection.objects.filter(
group=Group.objects.get(name="group 1"), source=self.source
@@ -123,7 +123,7 @@ class TestSourceFlowManager(FlowTestCase):
)
self.assertTrue(stage.handle_groups())
self.assertTrue(Group.objects.filter(name="group 1").exists())
self.assertTrue(self.user.groups.filter(name="group 1").exists())
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
self.assertTrue(
GroupOAuthSourceConnection.objects.filter(group=group, source=self.source).exists()
)
@@ -155,7 +155,7 @@ class TestSourceFlowManager(FlowTestCase):
)
self.assertTrue(stage.handle_groups())
self.assertTrue(Group.objects.filter(name="group 1").exists())
self.assertTrue(self.user.groups.filter(name="group 1").exists())
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
self.assertTrue(
GroupOAuthSourceConnection.objects.filter(
group=Group.objects.get(name="group 1"), source=self.source
@@ -189,7 +189,7 @@ class TestSourceFlowManager(FlowTestCase):
request=request,
)
self.assertFalse(stage.handle_groups())
self.assertFalse(self.user.groups.filter(name="group 1").exists())
self.assertFalse(self.user.ak_groups.filter(name="group 1").exists())
self.assertFalse(
GroupOAuthSourceConnection.objects.filter(group=group, source=self.source).exists()
)
@@ -201,7 +201,7 @@ class TestSourceFlowManager(FlowTestCase):
other_group = Group.objects.create(name="other group")
old_group = Group.objects.create(name="old group")
new_group = Group.objects.create(name="new group")
self.user.groups.set([other_group, old_group])
self.user.ak_groups.set([other_group, old_group])
GroupOAuthSourceConnection.objects.create(
group=old_group, source=self.source, identifier=old_group.name
)
@@ -231,7 +231,7 @@ class TestSourceFlowManager(FlowTestCase):
request=request,
)
self.assertTrue(stage.handle_groups())
self.assertFalse(self.user.groups.filter(name="old group").exists())
self.assertTrue(self.user.groups.filter(name="other group").exists())
self.assertTrue(self.user.groups.filter(name="new group").exists())
self.assertEqual(self.user.groups.count(), 2)
self.assertFalse(self.user.ak_groups.filter(name="old group").exists())
self.assertTrue(self.user.ak_groups.filter(name="other group").exists())
self.assertTrue(self.user.ak_groups.filter(name="new group").exists())
self.assertEqual(self.user.ak_groups.count(), 2)

View File

@@ -3,7 +3,6 @@
from django.test.testcases import TestCase
from authentik.core.models import User
from authentik.events.models import Event
from authentik.lib.generators import generate_id
@@ -19,17 +18,3 @@ class TestUsers(TestCase):
self.assertTrue(user.has_perm(perm))
user.remove_perms_from_managed_role(perm)
self.assertFalse(user.has_perm(perm))
def test_user_ak_groups(self):
"""Test user.ak_groups is a proxy for user.groups"""
user = User.objects.create(username=generate_id())
self.assertEqual(user.ak_groups, user.groups)
def test_user_ak_groups_event(self):
"""Test user.ak_groups creates exactly one event"""
user = User.objects.create(username=generate_id())
self.assertEqual(Event.objects.count(), 0)
user.ak_groups.all()
self.assertEqual(Event.objects.count(), 1)
user.ak_groups.all()
self.assertEqual(Event.objects.count(), 1)

View File

@@ -1,10 +1,9 @@
"""Test Users API"""
from datetime import datetime, timedelta
from datetime import datetime
from json import loads
from django.urls.base import reverse
from django.utils.timezone import now
from rest_framework.test import APITestCase
from authentik.brands.models import Brand
@@ -128,62 +127,13 @@ class TestUsersAPI(APITestCase):
)
self.assertEqual(response.status_code, 200)
def test_recovery_duration(self):
"""Test user recovery token duration"""
Token.objects.all().delete()
flow = create_test_flow(
FlowDesignation.RECOVERY,
authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
)
brand: Brand = create_test_brand()
brand.flow_recovery = flow
brand.save()
self.client.force_login(self.admin)
response = self.client.post(
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
data={"token_duration": "days=33"},
)
self.assertEqual(response.status_code, 200)
expires = Token.objects.first().expires
expected_expires = now() + timedelta(days=33)
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
def test_recovery_duration_update(self):
"""Test user recovery token duration update"""
Token.objects.all().delete()
flow = create_test_flow(
FlowDesignation.RECOVERY,
authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
)
brand: Brand = create_test_brand()
brand.flow_recovery = flow
brand.save()
self.client.force_login(self.admin)
response = self.client.post(
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
data={"token_duration": "days=33"},
)
self.assertEqual(response.status_code, 200)
expires = Token.objects.first().expires
expected_expires = now() + timedelta(days=33)
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
response = self.client.post(
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
data={"token_duration": "days=66"},
)
expires = Token.objects.first().expires
expected_expires = now() + timedelta(days=66)
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
def test_recovery_email_no_flow(self):
"""Test user recovery link (no recovery flow set)"""
self.client.force_login(self.admin)
self.user.email = ""
self.user.save()
stage = EmailStage.objects.create(name="email")
response = self.client.post(
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
data={"email_stage": stage.pk},
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
)
self.assertEqual(response.status_code, 400)
self.assertJSONEqual(
@@ -192,8 +142,7 @@ class TestUsersAPI(APITestCase):
self.user.email = "foo@bar.baz"
self.user.save()
response = self.client.post(
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
data={"email_stage": stage.pk},
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
)
self.assertEqual(response.status_code, 400)
self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
@@ -211,7 +160,7 @@ class TestUsersAPI(APITestCase):
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
)
self.assertEqual(response.status_code, 400)
self.assertJSONEqual(response.content, {"email_stage": ["This field is required."]})
self.assertJSONEqual(response.content, {"non_field_errors": "Email stage does not exist."})
def test_recovery_email(self):
"""Test user recovery link"""
@@ -229,8 +178,8 @@ class TestUsersAPI(APITestCase):
reverse(
"authentik_api:user-recovery-email",
kwargs={"pk": self.user.pk},
),
data={"email_stage": stage.pk},
)
+ f"?email_stage={stage.pk}"
)
self.assertEqual(response.status_code, 204)

View File

@@ -7,8 +7,6 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
from cryptography.x509.oid import NameOID
from django.db import models
@@ -23,8 +21,6 @@ class PrivateKeyAlg(models.TextChoices):
RSA = "rsa", _("rsa")
ECDSA = "ecdsa", _("ecdsa")
ED25519 = "ed25519", _("Ed25519")
ED448 = "ed448", _("Ed448")
class CertificateBuilder:
@@ -60,10 +56,6 @@ class CertificateBuilder:
return rsa.generate_private_key(
public_exponent=65537, key_size=4096, backend=default_backend()
)
if self.alg == PrivateKeyAlg.ED25519:
return Ed25519PrivateKey.generate()
if self.alg == PrivateKeyAlg.ED448:
return Ed448PrivateKey.generate()
raise ValueError(f"Invalid alg: {self.alg}")
def build(
@@ -106,25 +98,18 @@ class CertificateBuilder:
self.__builder = self.__builder.add_extension(
x509.SubjectAlternativeName(alt_names), critical=True
)
algo = hashes.SHA256()
# EdDSA doesn't take a hash algorithm
if isinstance(self.__private_key, (Ed25519PrivateKey | Ed448PrivateKey)):
algo = None
self.__certificate = self.__builder.sign(
private_key=self.__private_key,
algorithm=algo,
algorithm=hashes.SHA256(),
backend=default_backend(),
)
@property
def private_key(self):
"""Return private key in PEM format"""
format = serialization.PrivateFormat.TraditionalOpenSSL
if isinstance(self.__private_key, (Ed25519PrivateKey | Ed448PrivateKey)):
format = serialization.PrivateFormat.PKCS8
return self.__private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=format,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")

View File

@@ -78,7 +78,7 @@ def generate_key_id_legacy(key_data: str) -> str:
"""Generate Key ID using MD5 (legacy format for backwards compatibility)."""
if not key_data:
return ""
return md5(key_data.encode("utf-8"), usedforsecurity=False).hexdigest() # nosec
return md5(key_data.encode("utf-8")).hexdigest() # nosec
class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):

View File

@@ -196,10 +196,8 @@ class TestCrypto(APITestCase):
"""Test certificate export (download)"""
keypair = create_test_cert()
user = create_test_user()
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
user.assign_perms_to_managed_role(
"authentik_crypto.view_certificatekeypair_certificate", keypair
)
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
user.assign_perms_to_managed_role("view_certificatekeypair_certificate", keypair)
self.client.force_login(user)
response = self.client.get(
reverse(
@@ -222,8 +220,8 @@ class TestCrypto(APITestCase):
"""Test private_key export (download)"""
keypair = create_test_cert()
user = create_test_user()
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
user.assign_perms_to_managed_role("view_certificatekeypair_key", keypair)
self.client.force_login(user)
response = self.client.get(
reverse(

View File

@@ -1,6 +1,5 @@
from hashlib import sha256
from json import loads
from unittest.mock import PropertyMock, patch
from django.urls import reverse
from jwt import encode
@@ -233,43 +232,3 @@ class TestEndpointStage(FlowTestCase):
plan = plan()
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
self.assertEqual(plan.context[PLAN_CONTEXT_DEVICE], self.device)
def test_endpoint_stage_connector_no_stage_optional(self):
flow = create_test_flow()
stage = EndpointStage.objects.create(connector=self.connector, mode=StageMode.OPTIONAL)
FlowStageBinding.objects.create(stage=stage, target=flow, order=0)
with patch(
"authentik.endpoints.connectors.agent.models.AgentConnector.stage",
PropertyMock(return_value=None),
):
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
plan = plan()
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
self.assertNotIn(PLAN_CONTEXT_DEVICE, plan.context)
def test_endpoint_stage_connector_no_stage_required(self):
flow = create_test_flow()
stage = EndpointStage.objects.create(connector=self.connector, mode=StageMode.REQUIRED)
FlowStageBinding.objects.create(stage=stage, target=flow, order=0)
with patch(
"authentik.endpoints.connectors.agent.models.AgentConnector.stage",
PropertyMock(return_value=None),
):
with self.assertFlowFinishes() as plan:
res = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertStageResponse(
res,
component="ak-stage-access-denied",
error_message="Invalid stage configuration",
)
plan = plan()
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
self.assertNotIn(PLAN_CONTEXT_DEVICE, plan.context)

View File

@@ -1,4 +1,4 @@
from authentik.endpoints.models import EndpointStage, StageMode
from authentik.endpoints.models import EndpointStage
from authentik.flows.stage import StageView
PLAN_CONTEXT_ENDPOINT_CONNECTOR = "endpoint_connector"
@@ -6,24 +6,15 @@ PLAN_CONTEXT_ENDPOINT_CONNECTOR = "endpoint_connector"
class EndpointStageView(StageView):
def _get_inner(self) -> StageView | None:
def _get_inner(self):
stage: EndpointStage = self.executor.current_stage
inner_stage: type[StageView] | None = stage.connector.stage
if not inner_stage:
return None
return self.executor.stage_ok()
return inner_stage(self.executor, request=self.request)
def dispatch(self, request, *args, **kwargs):
inner = self._get_inner()
if inner is None:
stage: EndpointStage = self.executor.current_stage
if stage.mode == StageMode.OPTIONAL:
return self.executor.stage_ok()
else:
return self.executor.stage_invalid("Invalid stage configuration")
return inner.dispatch(request, *args, **kwargs)
return self._get_inner().dispatch(request, *args, **kwargs)
def cleanup(self):
inner = self._get_inner()
if inner is not None:
return inner.cleanup()
return self._get_inner().cleanup()

View File

@@ -60,18 +60,20 @@ class TestEndpointFacts(APITestCase):
]
}
)
self.assertCountEqual(
device.cached_facts.data["software"],
[
{
"name": "software-a",
"version": "1.2.3.4",
"source": "package",
},
{
"name": "software-b",
"version": "5.6.7.8",
"source": "package",
},
],
self.assertEqual(
device.cached_facts.data,
{
"software": [
{
"name": "software-a",
"version": "1.2.3.4",
"source": "package",
},
{
"name": "software-b",
"version": "5.6.7.8",
"source": "package",
},
]
},
)

View File

@@ -10,7 +10,6 @@ from jwt import PyJWTError, decode, encode, get_unverified_header
from rest_framework.exceptions import ValidationError
from structlog.stdlib import get_logger
from authentik.common.oauth.constants import TOKEN_TYPE
from authentik.core.models import AuthenticatedSession, Session, User
from authentik.core.sessions import SessionStore
from authentik.crypto.apps import MANAGED_KEY
@@ -27,6 +26,7 @@ from authentik.events.models import Event, EventAction
from authentik.events.signals import SESSION_LOGIN_EVENT
from authentik.flows.planner import PLAN_CONTEXT_DEVICE
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.constants import TOKEN_TYPE
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import JWTAlgorithms
from authentik.root.middleware import SessionMiddleware

View File

@@ -15,7 +15,6 @@ from django.core.cache import cache
from django.db.models.query import QuerySet
from django.utils.timezone import now
from jwt import PyJWTError, decode, get_unverified_header
from jwt.algorithms import ECAlgorithm
from rest_framework.exceptions import ValidationError
from rest_framework.fields import (
ChoiceField,
@@ -110,20 +109,13 @@ class LicenseKey:
intermediate.verify_directly_issued_by(get_licensing_key())
except InvalidSignature, TypeError, ValueError, Error:
raise ValidationError("Unable to verify license") from None
_validate_curve_original = ECAlgorithm._validate_curve
try:
# authentik's license are generated with `algorithm="ES512"` and signed with
# a key of curve `secp384r1`. Starting with version 2.11.0, pyjwt enforces the spec, see
# https://github.com/jpadilla/pyjwt/commit/5b8622773358e56d3d3c0a9acf404809ff34433a
# authentik will change its license generation to `algorithm="ES384"` in 2026.
# TODO: remove this when the last incompatible license runs out.
ECAlgorithm._validate_curve = lambda *_: True
body = from_dict(
LicenseKey,
decode(
jwt,
our_cert.public_key(),
algorithms=["ES384", "ES512"],
algorithms=["ES512"],
audience=get_license_aud(),
options={"verify_exp": check_expiry, "verify_signature": check_expiry},
),
@@ -133,8 +125,6 @@ class LicenseKey:
if unverified["aud"] != get_license_aud():
raise ValidationError("Invalid Install ID in license") from None
raise ValidationError("Unable to verify license") from None
finally:
ECAlgorithm._validate_curve = _validate_curve_original
return body
@staticmethod

View File

@@ -1,150 +0,0 @@
from datetime import datetime
from django.db.models import BooleanField as ModelBooleanField
from django.db.models import Case, Q, Value, When
from django_filters.rest_framework import BooleanFilter, FilterSet
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework.fields import IntegerField, SerializerMethodField
from rest_framework.mixins import CreateModelMixin
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer
from authentik.enterprise.lifecycle.models import LifecycleIteration, ReviewState
from authentik.enterprise.lifecycle.utils import (
ContentTypeField,
ReviewerGroupSerializer,
ReviewerUserSerializer,
admin_link_for_model,
parse_content_type,
start_of_day,
)
from authentik.lib.utils.time import timedelta_from_string
class LifecycleIterationSerializer(EnterpriseRequiredMixin, ModelSerializer):
content_type = ContentTypeField()
object_verbose = SerializerMethodField()
object_admin_url = SerializerMethodField(read_only=True)
grace_period_end = SerializerMethodField(read_only=True)
reviews = ReviewSerializer(many=True, read_only=True, source="review_set.all")
user_can_review = SerializerMethodField(read_only=True)
reviewer_groups = ReviewerGroupSerializer(
many=True, read_only=True, source="rule.reviewer_groups"
)
min_reviewers = IntegerField(read_only=True, source="rule.min_reviewers")
reviewers = ReviewerUserSerializer(many=True, read_only=True, source="rule.reviewers")
next_review_date = SerializerMethodField(read_only=True)
class Meta:
model = LifecycleIteration
fields = [
"id",
"content_type",
"object_id",
"object_verbose",
"object_admin_url",
"state",
"opened_on",
"grace_period_end",
"next_review_date",
"reviews",
"user_can_review",
"reviewer_groups",
"min_reviewers",
"reviewers",
]
read_only_fields = fields
def get_object_verbose(self, iteration: LifecycleIteration) -> str:
return str(iteration.object)
def get_object_admin_url(self, iteration: LifecycleIteration) -> str:
return admin_link_for_model(iteration.object)
def get_grace_period_end(self, iteration: LifecycleIteration) -> datetime:
return start_of_day(
iteration.opened_on + timedelta_from_string(iteration.rule.grace_period)
)
def get_next_review_date(self, iteration: LifecycleIteration) -> datetime:
return start_of_day(iteration.opened_on + timedelta_from_string(iteration.rule.interval))
def get_user_can_review(self, iteration: LifecycleIteration) -> bool:
return iteration.user_can_review(self.context["request"].user)
class LifecycleIterationFilterSet(FilterSet):
user_is_reviewer = BooleanFilter(field_name="user_is_reviewer", lookup_expr="exact")
class IterationViewSet(EnterpriseRequiredMixin, CreateModelMixin, GenericViewSet):
queryset = LifecycleIteration.objects.all()
serializer_class = LifecycleIterationSerializer
ordering = ["-opened_on"]
ordering_fields = ["state", "content_type__model", "opened_on", "grace_period_end"]
filterset_class = LifecycleIterationFilterSet
def get_queryset(self):
user = self.request.user
return self.queryset.annotate(
user_is_reviewer=Case(
When(
Q(rule__reviewers=user)
| Q(rule__reviewer_groups__in=user.groups.all().with_ancestors()),
then=Value(True),
),
default=Value(False),
output_field=ModelBooleanField(),
)
).distinct()
@action(
detail=False,
methods=["get"],
url_path=r"latest/(?P<content_type>[^/]+)/(?P<object_id>[^/]+)",
)
def latest_iteration(self, request: Request, content_type: str, object_id: str) -> Response:
ct = parse_content_type(content_type)
try:
obj = (
self.get_queryset()
.filter(
content_type__app_label=ct["app_label"],
content_type__model=ct["model"],
object_id=object_id,
)
.latest("opened_on")
)
except LifecycleIteration.DoesNotExist:
return Response(status=404)
serializer = self.get_serializer(obj)
return Response(serializer.data)
@extend_schema(
operation_id="lifecycle_iterations_list_open",
responses={200: LifecycleIterationSerializer(many=True)},
)
@action(
detail=False,
methods=["get"],
url_path=r"open",
)
def open_iterations(self, request: Request):
iterations = self.get_queryset().filter(
Q(state=ReviewState.PENDING) | Q(state=ReviewState.OVERDUE)
)
iterations = self.filter_queryset(iterations)
page = self.paginate_queryset(iterations)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(iterations, many=True)
return Response(serializer.data)

View File

@@ -1,33 +0,0 @@
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.mixins import CreateModelMixin
from rest_framework.viewsets import GenericViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.lifecycle.models import LifecycleIteration, Review
from authentik.enterprise.lifecycle.utils import ReviewerUserSerializer
class ReviewSerializer(EnterpriseRequiredMixin, ModelSerializer):
reviewer = ReviewerUserSerializer(read_only=True)
class Meta:
model = Review
fields = ["id", "iteration", "reviewer", "timestamp", "note"]
read_only_fields = ["id", "timestamp", "reviewer"]
def validate_iteration(self, iteration: LifecycleIteration) -> LifecycleIteration:
user = self.context["request"].user
if not iteration.user_can_review(user):
raise ValidationError(_("You are not allowed to submit a review for this object."))
return iteration
class ReviewViewSet(EnterpriseRequiredMixin, CreateModelMixin, GenericViewSet):
queryset = Review.objects.all()
serializer_class = ReviewSerializer
def perform_create(self, serializer: ReviewSerializer) -> None:
review = serializer.save(reviewer=self.request.user)
review.iteration.on_review(self.request)

View File

@@ -1,113 +0,0 @@
from django.utils.translation import gettext as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import SerializerMethodField
from rest_framework.relations import SlugRelatedField
from rest_framework.viewsets import ModelViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import User
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.lifecycle.models import LifecycleRule
from authentik.enterprise.lifecycle.utils import (
ContentTypeField,
ReviewerGroupSerializer,
ReviewerUserSerializer,
)
from authentik.lib.utils.time import timedelta_from_string
class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
content_type = ContentTypeField()
target_verbose = SerializerMethodField()
reviewer_groups_obj = ReviewerGroupSerializer(
many=True, read_only=True, source="reviewer_groups"
)
reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")
class Meta:
model = LifecycleRule
fields = [
"id",
"name",
"content_type",
"object_id",
"interval",
"grace_period",
"reviewer_groups",
"reviewer_groups_obj",
"min_reviewers",
"min_reviewers_is_per_group",
"reviewers",
"reviewers_obj",
"notification_transports",
"target_verbose",
]
read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
def get_target_verbose(self, rule: LifecycleRule) -> str:
if rule.object_id is None:
return rule.content_type.model_class()._meta.verbose_name_plural
else:
return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
def validate_object_id(self, value: str) -> str | None:
if value == "":
return None
return value
def validate(self, attrs: dict) -> dict:
if (
attrs.get("object_id") is not None
and not attrs["content_type"]
.get_all_objects_for_this_type(pk=attrs["object_id"])
.exists()
):
raise ValidationError({"object_id": _("Object does not exist")})
if "reviewer_groups" in attrs or "reviewers" in attrs:
reviewer_groups = attrs.get(
"reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
)
reviewers = attrs.get(
"reviewers", self.instance.reviewers.all() if self.instance else []
)
if len(reviewer_groups) == 0 and len(reviewers) == 0:
raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
if "grace_period" in attrs or "interval" in attrs:
grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
interval = attrs.get("interval", getattr(self.instance, "interval", None))
if (
grace_period is not None
and interval is not None
and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
):
raise ValidationError(
{"grace_period": _("Grace period must be shorter than the interval.")}
)
if "content_type" in attrs or "object_id" in attrs:
content_type = attrs.get("content_type", getattr(self.instance, "content_type", None))
object_id = attrs.get("object_id", getattr(self.instance, "object_id", None))
if content_type is not None and object_id is None:
existing = LifecycleRule.objects.filter(
content_type=content_type, object_id__isnull=True
)
if self.instance:
existing = existing.exclude(pk=self.instance.pk)
if existing.exists():
raise ValidationError(
{
"content_type": _(
"Only one type-wide rule for each object type is allowed."
)
}
)
return attrs
class LifecycleRuleViewSet(ModelViewSet):
queryset = LifecycleRule.objects.all()
serializer_class = LifecycleRuleSerializer
search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"]
ordering = ["name"]
ordering_fields = ["name", "content_type__model"]
filterset_fields = ["content_type__model"]

View File

@@ -1,22 +0,0 @@
from authentik.enterprise.apps import EnterpriseConfig
from authentik.lib.utils.time import fqdn_rand
from authentik.tasks.schedules.common import ScheduleSpec
class ReportsConfig(EnterpriseConfig):
name = "authentik.enterprise.lifecycle"
label = "authentik_lifecycle"
verbose_name = "authentik Enterprise.Lifecycle"
default = True
@property
def tenant_schedule_specs(self) -> list[ScheduleSpec]:
from authentik.enterprise.lifecycle.tasks import apply_lifecycle_rules
return [
ScheduleSpec(
actor=apply_lifecycle_rules,
crontab=f"{fqdn_rand('lifecycle_apply_lifecycle_rules')} "
f"{fqdn_rand('lifecycle_apply_lifecycle_rules', 24)} * * *",
)
]

View File

@@ -1,154 +0,0 @@
# Generated by Django 5.2.11 on 2026-02-09 15:57
import authentik.lib.utils.time
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_core", "0057_remove_user_groups_remove_user_user_permissions_and_more"),
("authentik_events", "0016_alter_event_action"),
("contenttypes", "0002_remove_content_type_name"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="LifecycleRule",
fields=[
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
("name", models.TextField(unique=True)),
("object_id", models.TextField(default=None, null=True)),
(
"interval",
models.TextField(
default="days=60",
validators=[authentik.lib.utils.time.timedelta_string_validator],
),
),
(
"grace_period",
models.TextField(
default="days=30",
validators=[authentik.lib.utils.time.timedelta_string_validator],
),
),
("min_reviewers", models.PositiveSmallIntegerField(default=1)),
("min_reviewers_is_per_group", models.BooleanField(default=False)),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to="contenttypes.contenttype"
),
),
(
"notification_transports",
models.ManyToManyField(
blank=True,
help_text="Select which transports should be used to notify the reviewers. If none are selected, the notification will only be shown in the authentik UI.",
to="authentik_events.notificationtransport",
),
),
("reviewer_groups", models.ManyToManyField(blank=True, to="authentik_core.group")),
("reviewers", models.ManyToManyField(blank=True, to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name="LifecycleIteration",
fields=[
(
"managed",
models.TextField(
default=None,
help_text="Objects that are managed by authentik. These objects are created and updated automatically. This flag only indicates that an object can be overwritten by migrations. You can still modify the objects via the API, but expect changes to be overwritten in a later update.",
null=True,
unique=True,
verbose_name="Managed by authentik",
),
),
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
("object_id", models.TextField()),
(
"state",
models.CharField(
choices=[
("REVIEWED", "Reviewed"),
("PENDING", "Pending"),
("OVERDUE", "Overdue"),
("CANCELED", "Canceled"),
],
default="PENDING",
max_length=10,
),
),
("opened_on", models.DateField(auto_now_add=True)),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to="contenttypes.contenttype"
),
),
(
"rule",
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="authentik_lifecycle.lifecyclerule",
),
),
],
),
migrations.CreateModel(
name="Review",
fields=[
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
("timestamp", models.DateTimeField(auto_now_add=True)),
("note", models.TextField(null=True)),
(
"iteration",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="authentik_lifecycle.lifecycleiteration",
),
),
(
"reviewer",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
),
),
],
),
migrations.AddIndex(
model_name="lifecyclerule",
index=models.Index(fields=["content_type"], name="authentik_l_content_4e3a6a_idx"),
),
migrations.AddConstraint(
model_name="lifecyclerule",
constraint=models.UniqueConstraint(
condition=models.Q(("object_id__isnull", True)),
fields=("content_type",),
name="uniq_lifecycle_rule_ct_null_object",
),
),
migrations.AlterUniqueTogether(
name="lifecyclerule",
unique_together={("content_type", "object_id")},
),
migrations.AddIndex(
model_name="lifecycleiteration",
index=models.Index(
fields=["content_type", "opened_on"], name="authentik_l_content_09c32a_idx"
),
),
migrations.AlterUniqueTogether(
name="review",
unique_together={("iteration", "reviewer")},
),
]

View File

@@ -1,18 +0,0 @@
# Generated by Django 5.2.11 on 2026-02-13 09:33
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_lifecycle", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name="lifecycleiteration",
name="opened_on",
field=models.DateTimeField(auto_now_add=True),
),
]

View File

@@ -1,292 +0,0 @@
from datetime import timedelta
from uuid import uuid4
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q, QuerySet
from django.db.models.fields import Field
from django.db.models.functions import Cast
from django.http import HttpRequest
from django.utils import timezone
from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer
from authentik.blueprints.models import ManagedModel
from authentik.core.models import Group, User
from authentik.enterprise.lifecycle.utils import link_for_model, start_of_day
from authentik.events.models import Event, EventAction, NotificationSeverity, NotificationTransport
from authentik.lib.models import SerializerModel
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
class LifecycleRule(SerializerModel):
id = models.UUIDField(primary_key=True, default=uuid4)
name = models.TextField(unique=True)
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.TextField(null=True, default=None)
object = GenericForeignKey("content_type", "object_id")
interval = models.TextField(
default="days=60",
validators=[timedelta_string_validator],
)
# Grace period starts after a review is due
grace_period = models.TextField(
default="days=30",
validators=[timedelta_string_validator],
)
# The review has to be conducted by `min_reviewers` members of `reviewer_groups`
# (total or per group depending on `min_reviewers_is_per_group` flag) as well
# as all of `reviewers`
reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
min_reviewers = models.PositiveSmallIntegerField(default=1)
min_reviewers_is_per_group = models.BooleanField(default=False)
reviewers = models.ManyToManyField("authentik_core.User", blank=True)
notification_transports = models.ManyToManyField(
NotificationTransport,
help_text=_(
"Select which transports should be used to notify the reviewers. If none are "
"selected, the notification will only be shown in the authentik UI."
),
blank=True,
)
class Meta:
indexes = [models.Index(fields=["content_type"])]
unique_together = [["content_type", "object_id"]]
constraints = [
models.UniqueConstraint(
fields=["content_type"],
condition=Q(object_id__isnull=True),
name="uniq_lifecycle_rule_ct_null_object",
)
]
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
return LifecycleRuleSerializer
def _get_pk_field(self) -> Field:
model = self.content_type.model_class()
pk = model._meta.pk
while hasattr(pk, "target_field"):
pk = pk.target_field
return pk.__class__()
def get_objects(self) -> QuerySet:
qs = self.content_type.get_all_objects_for_this_type()
if self.object_id:
qs = qs.filter(pk=self.object_id)
else:
qs = qs.exclude(
pk__in=LifecycleRule.objects.filter(
content_type=self.content_type, object_id__isnull=False
).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
)
return qs
def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
filter = ~Q(content_type=self.content_type)
if self.object_id:
filter = filter | ~Q(object_id=self.object_id)
filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
return self.lifecycleiteration_set.filter(filter)
def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
return self.lifecycleiteration_set.filter(
opened_on__lt=start_of_day(
timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
),
state=ReviewState.PENDING,
)
def _get_newly_due_objects(self) -> QuerySet:
recent_iteration_ids = LifecycleIteration.objects.filter(
content_type=self.content_type,
object_id__isnull=False,
opened_on__gte=start_of_day(
timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
),
).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
return self.get_objects().exclude(pk__in=recent_iteration_ids)
def apply(self):
self._get_stale_iterations().update(state=ReviewState.CANCELED)
for iteration in self._get_newly_overdue_iterations():
iteration.make_overdue()
for obj in self._get_newly_due_objects():
LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
reviewers = self.reviewers.all()
if (
iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
< reviewers.count()
):
return False
if self.reviewer_groups.count() == 0:
return True
if self.min_reviewers_is_per_group:
for g in self.reviewer_groups.all():
if (
iteration.review_set.filter(
reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
)
.distinct()
.count()
< self.min_reviewers
):
return False
return True
else:
return (
iteration.review_set.filter(
reviewer__groups__in=self.reviewer_groups.all().with_descendants()
)
.distinct()
.count()
>= self.min_reviewers
)
def get_reviewers(self) -> QuerySet[User]:
return User.objects.filter(
Q(id__in=self.reviewers.all().values_list("pk", flat=True))
| Q(groups__in=self.reviewer_groups.all().with_descendants())
).distinct()
def notify_reviewers(self, event: Event, severity: str):
from authentik.enterprise.lifecycle.tasks import send_notification
for transport in self.notification_transports.all():
for user in self.get_reviewers():
send_notification.send_with_options(
args=(transport.pk, event.pk, user.pk, severity),
rel_obj=transport,
)
if transport.send_once:
break
class ReviewState(models.TextChoices):
REVIEWED = "REVIEWED", _("Reviewed")
PENDING = "PENDING", _("Pending")
OVERDUE = "OVERDUE", _("Overdue")
CANCELED = "CANCELED", _("Canceled")
class LifecycleIteration(SerializerModel, ManagedModel):
id = models.UUIDField(primary_key=True, default=uuid4)
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.TextField(null=False)
object = GenericForeignKey("content_type", "object_id")
rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)
state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
opened_on = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [models.Index(fields=["content_type", "opened_on"])]
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer
return LifecycleIterationSerializer
def _get_model_name(self) -> str:
return self.content_type.name.lower()
def _get_event_args(self) -> dict:
return {
"target": self.object,
"hyperlink": link_for_model(self.object),
"hyperlink_label": _(f"Go to {self._get_model_name()}"),
"lifecycle_iteration": self.id,
}
def initialize(self):
event = Event.new(
EventAction.REVIEW_INITIATED,
message=_(f"Access review is due for {self.content_type.name} {str(self.object)}"),
**self._get_event_args(),
)
event.save()
self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
def make_overdue(self):
self.state = ReviewState.OVERDUE
event = Event.new(
EventAction.REVIEW_OVERDUE,
message=_(f"Access review is overdue for {self.content_type.name} {str(self.object)}"),
**self._get_event_args(),
)
event.save()
self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
self.save()
@staticmethod
def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
iteration = LifecycleIteration.objects.create(
content_type=content_type, object_id=object_id, rule=rule
)
iteration.initialize()
return iteration
def make_reviewed(self, request: HttpRequest):
self.state = ReviewState.REVIEWED
event = Event.new(
EventAction.REVIEW_COMPLETED,
message=_(f"Access review completed for {self.content_type.name} {str(self.object)}"),
**self._get_event_args(),
).from_http(request)
event.save()
self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
self.save()
def on_review(self, request: HttpRequest):
if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
raise AssertionError("Review is not pending or overdue")
if self.rule.is_satisfied_for_iteration(self):
self.make_reviewed(request)
def user_can_review(self, user: User) -> bool:
if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
return False
if self.review_set.filter(reviewer=user).exists():
return False
groups = self.rule.reviewer_groups.all()
if groups:
for group in groups:
if group.is_member(user):
return True
return False
else:
return user in self.rule.get_reviewers()
class Review(SerializerModel):
id = models.UUIDField(primary_key=True, default=uuid4)
iteration = models.ForeignKey(LifecycleIteration, on_delete=models.CASCADE)
reviewer = models.ForeignKey("authentik_core.User", on_delete=models.CASCADE)
timestamp = models.DateTimeField(auto_now_add=True)
note = models.TextField(null=True)
class Meta:
unique_together = [["iteration", "reviewer"]]
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer
return ReviewSerializer

View File

@@ -1,22 +0,0 @@
from django.db.models import Q
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from authentik.enterprise.lifecycle.models import LifecycleRule, ReviewState
@receiver(post_save, sender=LifecycleRule)
def post_rule_save(sender, instance: LifecycleRule, created: bool, **_):
from authentik.enterprise.lifecycle.tasks import apply_lifecycle_rule
apply_lifecycle_rule.send_with_options(
args=(instance.id,),
rel_obj=instance,
)
@receiver(pre_delete, sender=LifecycleRule)
def pre_rule_delete(sender, instance: LifecycleRule, **_):
instance.lifecycleiteration_set.filter(
Q(state=ReviewState.PENDING) | Q(state=ReviewState.OVERDUE)
).update(state=ReviewState.CANCELED)

View File

@@ -1,45 +0,0 @@
from django.utils.translation import gettext_lazy as _
from dramatiq import actor
from authentik.core.models import User
from authentik.enterprise.lifecycle.models import LifecycleRule
from authentik.events.models import Event, Notification, NotificationTransport
@actor(description=_("Dispatch tasks to validate lifecycle rules."))
def apply_lifecycle_rules():
for rule in LifecycleRule.objects.all():
apply_lifecycle_rule.send_with_options(
args=(rule.id,),
rel_obj=rule,
)
@actor(description=_("Apply lifecycle rule."))
def apply_lifecycle_rule(rule_id: str):
rule = LifecycleRule.objects.filter(pk=rule_id).first()
if rule:
rule.apply()
@actor(description=_("Send lifecycle rule notification."))
def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
event = Event.objects.filter(pk=event_pk).first()
if not event:
return
user = User.objects.filter(pk=user_pk).first()
if not user:
return
notification = Notification(
severity=severity,
body=event.summary,
event=event,
user=user,
hyperlink=event.hyperlink,
hyperlink_label=event.hyperlink_label,
)
transport = NotificationTransport.objects.filter(pk=transport_pk).first()
if not transport:
return
transport.send(notification)

View File

@@ -1,425 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import Application, Group
from authentik.core.tests.utils import create_test_admin_user, create_test_user
from authentik.enterprise.lifecycle.models import LifecycleIteration, LifecycleRule, ReviewState
from authentik.enterprise.reports.tests.utils import patch_license
from authentik.lib.generators import generate_id
@patch_license
class TestLifecycleRuleAPI(APITestCase):
def setUp(self):
self.user = create_test_admin_user()
self.client.force_login(self.user)
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
self.content_type = ContentType.objects.get_for_model(Application)
self.reviewer_group = Group.objects.create(name=generate_id())
def test_list_rules(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(reverse("authentik_api:lifecyclerule-list"))
self.assertEqual(response.status_code, 200)
self.assertGreaterEqual(len(response.data["results"]), 1)
def test_create_rule_with_reviewer_group(self):
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": str(self.app.pk),
"interval": "days=30",
"grace_period": "days=10",
"reviewer_groups": [str(self.reviewer_group.pk)],
"reviewers": [],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data["object_id"], str(self.app.pk))
self.assertEqual(response.data["interval"], "days=30")
def test_create_rule_with_explicit_reviewer(self):
reviewer = create_test_user()
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": str(self.app.pk),
"interval": "days=60",
"grace_period": "days=15",
"reviewer_groups": [],
"reviewers": [str(reviewer.uuid)],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 201)
self.assertIn(reviewer.uuid, response.data["reviewers"])
def test_create_rule_type_level(self):
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": None,
"interval": "days=90",
"grace_period": "days=30",
"reviewer_groups": [str(self.reviewer_group.pk)],
"reviewers": [],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 201)
self.assertIsNone(response.data["object_id"])
def test_create_rule_fails_without_reviewers(self):
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": str(self.app.pk),
"interval": "days=30",
"grace_period": "days=10",
"reviewer_groups": [],
"reviewers": [],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 400)
def test_create_rule_fails_grace_period_longer_than_interval(self):
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": str(self.app.pk),
"interval": "days=10",
"grace_period": "days=30",
"reviewer_groups": [str(self.reviewer_group.pk)],
"reviewers": [],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 400)
self.assertIn("grace_period", response.data)
def test_create_rule_fails_invalid_object_id(self):
response = self.client.post(
reverse("authentik_api:lifecyclerule-list"),
{
"name": generate_id(),
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": "00000000-0000-0000-0000-000000000000",
"interval": "days=30",
"grace_period": "days=10",
"reviewer_groups": [str(self.reviewer_group.pk)],
"reviewers": [],
"min_reviewers": 1,
},
)
self.assertEqual(response.status_code, 400)
self.assertIn("object_id", response.data)
def test_retrieve_rule(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk})
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["id"], str(rule.pk))
def test_update_rule(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
interval="days=30",
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.patch(
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk}),
{"interval": "days=60"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["interval"], "days=60")
def test_delete_rule(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.delete(
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk})
)
self.assertEqual(response.status_code, 204)
self.assertFalse(LifecycleRule.objects.filter(pk=rule.pk).exists())
@patch_license
class TestIterationAPI(APITestCase):
def setUp(self):
self.user = create_test_admin_user()
self.client.force_login(self.user)
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
self.content_type = ContentType.objects.get_for_model(Application)
self.reviewer_group = Group.objects.create(name=generate_id())
self.reviewer_group.users.add(self.user)
def test_open_iterations(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(reverse("authentik_api:lifecycleiteration-open-iterations"))
self.assertEqual(response.status_code, 200)
self.assertGreaterEqual(len(response.data["results"]), 1)
for iteration in response.data["results"]:
self.assertEqual(iteration["state"], ReviewState.PENDING)
def test_open_iterations_filter_user_is_reviewer(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(
reverse("authentik_api:lifecycleiteration-open-iterations"),
{"user_is_reviewer": "true"},
)
self.assertEqual(response.status_code, 200)
# User is in reviewer_group, so should see the iteration
self.assertGreaterEqual(len(response.data["results"]), 1)
def test_latest_iteration(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(
reverse(
"authentik_api:lifecycleiteration-latest-iteration",
kwargs={
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": str(self.app.pk),
},
)
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["object_id"], str(self.app.pk))
def test_latest_iteration_not_found(self):
response = self.client.get(
reverse(
"authentik_api:lifecycleiteration-latest-iteration",
kwargs={
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
"object_id": "00000000-0000-0000-0000-000000000000",
},
)
)
self.assertEqual(response.status_code, 404)
def test_iteration_includes_user_can_review(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
)
rule.reviewer_groups.add(self.reviewer_group)
response = self.client.get(reverse("authentik_api:lifecycleiteration-open-iterations"))
self.assertEqual(response.status_code, 200)
self.assertGreaterEqual(len(response.data["results"]), 1)
# user_can_review should be present
self.assertIn("user_can_review", response.data["results"][0])
@patch_license
class TestReviewAPI(APITestCase):
def setUp(self):
self.user = create_test_admin_user()
self.client.force_login(self.user)
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
self.content_type = ContentType.objects.get_for_model(Application)
self.reviewer_group = Group.objects.create(name=generate_id())
self.reviewer_group.users.add(self.user)
def test_create_review(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=1,
)
rule.reviewer_groups.add(self.reviewer_group)
# Get the auto-created iteration
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
response = self.client.post(
reverse("authentik_api:review-list"),
{
"iteration": str(iteration.pk),
"note": "Reviewed and approved",
},
)
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data["iteration"], iteration.pk)
self.assertEqual(response.data["note"], "Reviewed and approved")
self.assertEqual(response.data["reviewer"]["pk"], self.user.pk)
def test_create_review_completes_iteration(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=1,
)
rule.reviewer_groups.add(self.reviewer_group)
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
self.assertEqual(iteration.state, ReviewState.PENDING)
response = self.client.post(
reverse("authentik_api:review-list"),
{
"iteration": str(iteration.pk),
},
)
self.assertEqual(response.status_code, 201)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_create_review_sets_reviewer_from_request(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=1,
)
rule.reviewer_groups.add(self.reviewer_group)
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
response = self.client.post(
reverse("authentik_api:review-list"),
{
"iteration": str(iteration.pk),
},
)
self.assertEqual(response.status_code, 201)
# Reviewer should be the logged-in user
self.assertEqual(response.data["reviewer"]["pk"], self.user.pk)
def test_non_reviewer_cannot_review(self):
other_group = Group.objects.create(name=generate_id())
other_user = create_test_user()
other_group.users.add(other_user)
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=1,
)
rule.reviewer_groups.add(other_group)
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
# Current user is not in the reviewer group
self.assertFalse(iteration.user_can_review(self.user))
def test_non_reviewer_review_via_api_rejected(self):
other_group = Group.objects.create(name=generate_id())
other_user = create_test_user()
other_group.users.add(other_user)
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=1,
)
rule.reviewer_groups.add(other_group)
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
# Current user (self.user) is NOT in the reviewer group
response = self.client.post(
reverse("authentik_api:review-list"),
{"iteration": str(iteration.pk)},
)
self.assertEqual(response.status_code, 400)
def test_duplicate_review_via_api_rejected(self):
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=self.content_type,
object_id=str(self.app.pk),
min_reviewers=2,
)
rule.reviewer_groups.add(self.reviewer_group)
iteration = LifecycleIteration.objects.get(
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
)
# First review should succeed
response = self.client.post(
reverse("authentik_api:review-list"),
{"iteration": str(iteration.pk)},
)
self.assertEqual(response.status_code, 201)
# Second review by same user should be rejected
response = self.client.post(
reverse("authentik_api:review-list"),
{"iteration": str(iteration.pk)},
)
self.assertEqual(response.status_code, 400)

View File

@@ -1,845 +0,0 @@
import datetime as dt
from datetime import timedelta
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType
from django.test import RequestFactory, TestCase
from django.utils import timezone
from authentik.core.models import Application, Group
from authentik.core.tests.utils import create_test_user
from authentik.enterprise.lifecycle.models import (
LifecycleIteration,
LifecycleRule,
Review,
ReviewState,
)
from authentik.events.models import (
Event,
EventAction,
NotificationSeverity,
NotificationTransport,
)
from authentik.lib.generators import generate_id
from authentik.rbac.models import Role
class TestLifecycleModels(TestCase):
def setUp(self):
self.factory = RequestFactory()
def _get_request(self):
return self.factory.get("/")
def _create_object(self, model):
if model is Application:
return Application.objects.create(name=generate_id(), slug=generate_id())
if model is Role:
return Role.objects.create(name=generate_id())
if model is Group:
return Group.objects.create(name=generate_id())
raise AssertionError(f"Unsupported model {model}")
def _create_rule_for_object(self, obj, **kwargs) -> LifecycleRule:
content_type = ContentType.objects.get_for_model(obj)
return LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(obj.pk),
**kwargs,
)
def _create_rule_for_type(self, model, **kwargs) -> LifecycleRule:
content_type = ContentType.objects.get_for_model(model)
return LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=None,
**kwargs,
)
def test_iteration_start_supported_objects(self):
"""Ensure iterations are automatically started for applications, roles, and groups."""
for model in (Application, Role, Group):
with self.subTest(model=model.__name__):
obj = self._create_object(model)
content_type = ContentType.objects.get_for_model(obj)
before_events = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
rule = self._create_rule_for_object(obj)
# Verify iteration was created automatically
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
self.assertEqual(iteration.state, ReviewState.PENDING)
self.assertEqual(iteration.object, obj)
self.assertEqual(iteration.rule, rule)
self.assertEqual(
Event.objects.filter(action=EventAction.REVIEW_INITIATED).count(),
before_events + 1,
)
def test_review_requires_all_explicit_reviewers(self):
obj = Group.objects.create(name=generate_id())
rule = self._create_rule_for_object(obj)
reviewer_one = create_test_user()
reviewer_two = create_test_user()
rule.reviewers.add(reviewer_one, reviewer_two)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
Review.objects.create(iteration=iteration, reviewer=reviewer_one)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.PENDING)
Review.objects.create(iteration=iteration, reviewer=reviewer_two)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
self.assertTrue(Event.objects.filter(action=EventAction.REVIEW_COMPLETED).exists())
def test_review_min_reviewers_from_groups(self):
"""Group-based reviews complete once the minimum number of reviewers review."""
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj, min_reviewers=2)
reviewer_group = Group.objects.create(name=generate_id())
reviewer_one = create_test_user()
reviewer_two = create_test_user()
reviewer_group.users.add(reviewer_one, reviewer_two)
rule.reviewer_groups.add(reviewer_group)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
Review.objects.create(iteration=iteration, reviewer=reviewer_one)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.PENDING)
Review.objects.create(iteration=iteration, reviewer=reviewer_two)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_review_explicit_and_group_reviewers(self):
"""Reviews require both explicit reviewers AND min_reviewers from groups."""
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj, min_reviewers=1)
reviewer_group = Group.objects.create(name=generate_id())
group_member = create_test_user()
reviewer_group.users.add(group_member)
rule.reviewer_groups.add(reviewer_group)
explicit_reviewer = create_test_user()
rule.reviewers.add(explicit_reviewer)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
# Only group member reviews - not satisfied (explicit reviewer missing)
Review.objects.create(iteration=iteration, reviewer=group_member)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.PENDING)
# Explicit reviewer reviews - now satisfied
Review.objects.create(iteration=iteration, reviewer=explicit_reviewer)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_review_min_reviewers_per_group(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj, min_reviewers=1, min_reviewers_is_per_group=True)
group_one = Group.objects.create(name=generate_id())
group_two = Group.objects.create(name=generate_id())
member_group_one = create_test_user()
member_group_two = create_test_user()
group_one.users.add(member_group_one)
group_two.users.add(member_group_two)
rule.reviewer_groups.add(group_one, group_two)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
# Only member from group_one reviews - not satisfied (need member from each group)
Review.objects.create(iteration=iteration, reviewer=member_group_one)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.PENDING)
# Member from group_two reviews - now satisfied
Review.objects.create(iteration=iteration, reviewer=member_group_two)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_review_reviewers_from_child_groups(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj, min_reviewers=1)
parent_group = Group.objects.create(name=generate_id())
child_group = Group.objects.create(name=generate_id())
child_group.parents.add(parent_group)
child_member = create_test_user()
child_group.users.add(child_member)
rule.reviewer_groups.add(parent_group)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
# Child group member should be able to review
self.assertTrue(iteration.user_can_review(child_member))
Review.objects.create(iteration=iteration, reviewer=child_member)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_review_reviewers_from_nested_child_groups(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj, min_reviewers=2)
grandparent = Group.objects.create(name=generate_id())
parent = Group.objects.create(name=generate_id())
child = Group.objects.create(name=generate_id())
parent.parents.add(grandparent)
child.parents.add(parent)
parent_member = create_test_user()
child_member = create_test_user()
parent.users.add(parent_member)
child.users.add(child_member)
rule.reviewer_groups.add(grandparent)
content_type = ContentType.objects.get_for_model(obj)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
request = self._get_request()
# Both nested members should be able to review
self.assertTrue(iteration.user_can_review(parent_member))
self.assertTrue(iteration.user_can_review(child_member))
Review.objects.create(iteration=iteration, reviewer=parent_member)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.PENDING)
Review.objects.create(iteration=iteration, reviewer=child_member)
iteration.on_review(request)
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.REVIEWED)
def test_notify_reviewers_send_once(self):
obj = Group.objects.create(name=generate_id())
rule = self._create_rule_for_object(obj)
reviewer_one = create_test_user()
reviewer_two = create_test_user()
rule.reviewers.add(reviewer_one, reviewer_two)
transport_once = NotificationTransport.objects.create(
name=generate_id(),
send_once=True,
)
transport_all = NotificationTransport.objects.create(
name=generate_id(),
send_once=False,
)
rule.notification_transports.add(transport_once, transport_all)
event = Event.new(EventAction.REVIEW_INITIATED, target=obj)
event.save()
with patch(
"authentik.enterprise.lifecycle.tasks.send_notification.send_with_options"
) as send_with_options:
rule.notify_reviewers(event, NotificationSeverity.NOTICE)
reviewer_pks = {reviewer_one.pk, reviewer_two.pk}
self.assertEqual(send_with_options.call_count, len(reviewer_pks) + 1)
calls = [call.kwargs["args"] for call in send_with_options.call_args_list]
once_calls = [args for args in calls if args[0] == transport_once.pk]
all_calls = [args for args in calls if args[0] == transport_all.pk]
self.assertEqual(len(once_calls), 1)
self.assertEqual(len(all_calls), len(reviewer_pks))
self.assertIn(once_calls[0][2], reviewer_pks)
self.assertEqual({args[2] for args in all_calls}, reviewer_pks)
def test_apply_marks_overdue_and_opens_due_reviews(self):
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
rule_overdue = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_one.pk),
interval="days=365",
grace_period="days=10",
)
# Get the automatically created iteration and backdate it past the grace period
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app_one.pk), rule=rule_overdue
)
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=(timezone.now() - timedelta(days=20))
)
# Apply again to trigger overdue logic
rule_overdue.apply()
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.OVERDUE)
self.assertEqual(
LifecycleIteration.objects.filter(
content_type=content_type, object_id=str(app_one.pk)
).count(),
1,
)
LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_two.pk),
interval="days=30",
grace_period="days=10",
)
self.assertEqual(
LifecycleIteration.objects.filter(
content_type=content_type, object_id=str(app_two.pk)
).count(),
1,
)
new_iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app_two.pk)
)
self.assertEqual(new_iteration.state, ReviewState.PENDING)
def test_apply_idempotent(self):
app_due = Application.objects.create(name=generate_id(), slug=generate_id())
app_overdue = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
initiated_before = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
overdue_before = Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count()
rule_due = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_due.pk),
interval="days=30",
grace_period="days=30",
)
reviewer = create_test_user()
rule_due.reviewers.add(reviewer)
transport = NotificationTransport.objects.create(name=generate_id())
rule_due.notification_transports.add(transport)
rule_overdue = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_overdue.pk),
interval="days=365",
grace_period="days=10",
)
overdue_iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app_overdue.pk), rule=rule_overdue
)
LifecycleIteration.objects.filter(pk=overdue_iteration.pk).update(
opened_on=(timezone.now() - timedelta(days=20))
)
# Apply overdue rule to mark iteration as overdue
rule_overdue.apply()
due_iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app_due.pk)
)
overdue_iteration.refresh_from_db()
self.assertEqual(due_iteration.state, ReviewState.PENDING)
self.assertEqual(overdue_iteration.state, ReviewState.OVERDUE)
initiated_after_first = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
overdue_after_first = Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count()
# Both rules created iterations on save
self.assertEqual(initiated_after_first, initiated_before + 2)
self.assertEqual(overdue_after_first, overdue_before + 1)
# Apply again - should be idempotent
rule_due.apply()
rule_overdue.apply()
due_iteration.refresh_from_db()
overdue_iteration.refresh_from_db()
self.assertEqual(due_iteration.state, ReviewState.PENDING)
self.assertEqual(overdue_iteration.state, ReviewState.OVERDUE)
self.assertEqual(
Event.objects.filter(action=EventAction.REVIEW_INITIATED).count(),
initiated_after_first,
)
self.assertEqual(
Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count(),
overdue_after_first,
)
def test_rule_matches_entire_type(self):
"""A rule with object_id=None matches all objects of that type."""
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=None,
interval="days=30",
grace_period="days=10",
)
objects = list(rule.get_objects())
self.assertIn(app_one, objects)
self.assertIn(app_two, objects)
def test_rule_type_excludes_objects_with_specific_rules(self):
app_with_rule = Application.objects.create(name=generate_id(), slug=generate_id())
app_without_rule = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
# Create a specific rule for app_with_rule
LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_with_rule.pk),
interval="days=30",
)
# Create a type-level rule
type_rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=None,
interval="days=60",
)
objects = list(type_rule.get_objects())
self.assertNotIn(app_with_rule, objects)
self.assertIn(app_without_rule, objects)
def test_rule_type_apply_creates_iterations_for_all_objects(self):
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=None,
interval="days=30",
grace_period="days=10",
)
self.assertTrue(
LifecycleIteration.objects.filter(
content_type=content_type, object_id=str(app_one.pk)
).exists()
)
self.assertTrue(
LifecycleIteration.objects.filter(
content_type=content_type, object_id=str(app_two.pk)
).exists()
)
def test_delete_rule_cancels_open_iterations(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
content_type = ContentType.objects.get_for_model(obj)
pending_iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
self.assertEqual(pending_iteration.state, ReviewState.PENDING)
overdue_iteration = LifecycleIteration.objects.create(
content_type=content_type,
object_id=str(obj.pk),
rule=rule,
state=ReviewState.OVERDUE,
)
reviewed_iteration = LifecycleIteration.objects.create(
content_type=content_type,
object_id=str(obj.pk),
rule=rule,
state=ReviewState.REVIEWED,
)
rule.delete()
pending_iteration.refresh_from_db()
overdue_iteration.refresh_from_db()
reviewed_iteration.refresh_from_db()
self.assertEqual(pending_iteration.state, ReviewState.CANCELED)
self.assertEqual(overdue_iteration.state, ReviewState.CANCELED)
self.assertEqual(reviewed_iteration.state, ReviewState.REVIEWED) # Not affected
def test_update_rule_target_cancels_stale_iterations(self):
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app_one.pk),
interval="days=30",
)
iteration_for_app_one = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app_one.pk), rule=rule
)
self.assertEqual(iteration_for_app_one.state, ReviewState.PENDING)
# Change rule target to app_two - save() triggers apply() which cancels stale iterations
rule.object_id = str(app_two.pk)
rule.save()
iteration_for_app_one.refresh_from_db()
self.assertEqual(iteration_for_app_one.state, ReviewState.CANCELED)
def test_update_rule_content_type_cancels_stale_iterations(self):
app = Application.objects.create(name=generate_id(), slug=generate_id())
group = Group.objects.create(name=generate_id())
app_content_type = ContentType.objects.get_for_model(Application)
group_content_type = ContentType.objects.get_for_model(Group)
# Creating rule triggers automatic apply() which creates a iteration for app
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=app_content_type,
object_id=str(app.pk),
interval="days=30",
)
iteration = LifecycleIteration.objects.get(
content_type=app_content_type, object_id=str(app.pk), rule=rule
)
self.assertEqual(iteration.state, ReviewState.PENDING)
# Change content type to Group - save() triggers apply() which cancels stale iterations
rule.content_type = group_content_type
rule.object_id = str(group.pk)
rule.save()
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.CANCELED)
def test_user_can_review_checks_group_hierarchy(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
parent_group = Group.objects.create(name=generate_id())
child_group = Group.objects.create(name=generate_id())
child_group.parents.add(parent_group)
parent_member = create_test_user()
child_member = create_test_user()
non_member = create_test_user()
parent_group.users.add(parent_member)
child_group.users.add(child_member)
rule.reviewer_groups.add(parent_group)
content_type = ContentType.objects.get_for_model(obj)
# iteration is created automatically when rule is saved
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
self.assertTrue(iteration.user_can_review(parent_member))
self.assertTrue(iteration.user_can_review(child_member))
self.assertFalse(iteration.user_can_review(non_member))
def test_user_cannot_review_twice(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
reviewer = create_test_user()
rule.reviewers.add(reviewer)
content_type = ContentType.objects.get_for_model(obj)
# iteration is created automatically when rule is saved
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
self.assertTrue(iteration.user_can_review(reviewer))
Review.objects.create(iteration=iteration, reviewer=reviewer)
self.assertFalse(iteration.user_can_review(reviewer))
def test_user_cannot_review_completed_iteration(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
reviewer = create_test_user()
rule.reviewers.add(reviewer)
content_type = ContentType.objects.get_for_model(obj)
# Get the automatically created pending iteration and test with different states
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(obj.pk), rule=rule
)
for state in (ReviewState.REVIEWED, ReviewState.CANCELED):
iteration.state = state
iteration.save()
self.assertFalse(iteration.user_can_review(reviewer))
def test_get_reviewers_includes_child_group_members(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
parent_group = Group.objects.create(name=generate_id())
child_group = Group.objects.create(name=generate_id())
child_group.parents.add(parent_group)
parent_member = create_test_user()
child_member = create_test_user()
parent_group.users.add(parent_member)
child_group.users.add(child_member)
rule.reviewer_groups.add(parent_group)
reviewers = list(rule.get_reviewers())
self.assertIn(parent_member, reviewers)
self.assertIn(child_member, reviewers)
def test_get_reviewers_includes_explicit_reviewers(self):
obj = Application.objects.create(name=generate_id(), slug=generate_id())
rule = self._create_rule_for_object(obj)
explicit_reviewer = create_test_user()
rule.reviewers.add(explicit_reviewer)
group = Group.objects.create(name=generate_id())
group_member = create_test_user()
group.users.add(group_member)
rule.reviewer_groups.add(group)
reviewers = list(rule.get_reviewers())
self.assertIn(explicit_reviewer, reviewers)
self.assertIn(group_member, reviewers)
class TestLifecycleDateBoundaries(TestCase):
"""Verify that start_of_day normalization ensures correct overdue/due
detection regardless of exact task execution time within a day.
The daily task may run at any point during the day. The start_of_day
normalization in _get_newly_overdue_iterations and _get_newly_due_objects
ensures that the boundary is always at midnight, so millisecond variations
in task execution time do not affect results."""
def _create_rule_and_iteration(self, grace_period="days=1", interval="days=365"):
app = Application.objects.create(name=generate_id(), slug=generate_id())
content_type = ContentType.objects.get_for_model(Application)
rule = LifecycleRule.objects.create(
name=generate_id(),
content_type=content_type,
object_id=str(app.pk),
interval=interval,
grace_period=grace_period,
)
iteration = LifecycleIteration.objects.get(
content_type=content_type, object_id=str(app.pk), rule=rule
)
return app, rule, iteration
def test_overdue_iteration_opened_yesterday(self):
"""grace_period=1 day: iteration opened yesterday at any time is overdue today."""
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
for opened_on in [
dt.datetime(2025, 6, 14, 0, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 14, 12, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 14, 23, 59, 59, 999999, tzinfo=dt.UTC),
]:
with self.subTest(opened_on=opened_on):
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=opened_on, state=ReviewState.PENDING
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
def test_not_overdue_iteration_opened_today(self):
"""grace_period=1 day: iteration opened today at any time is NOT overdue."""
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
for opened_on in [
dt.datetime(2025, 6, 15, 0, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 23, 59, 59, 999999, tzinfo=dt.UTC),
]:
with self.subTest(opened_on=opened_on):
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=opened_on, state=ReviewState.PENDING
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertNotIn(iteration, list(rule._get_newly_overdue_iterations()))
def test_overdue_independent_of_task_execution_time(self):
"""Overdue detection gives the same result whether the task runs at 00:00:01 or 23:59:59."""
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
opened_on = dt.datetime(2025, 6, 14, 18, 0, 0, tzinfo=dt.UTC)
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=opened_on, state=ReviewState.PENDING
)
for task_time in [
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 12, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
]:
with self.subTest(task_time=task_time):
with patch("django.utils.timezone.now", return_value=task_time):
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
def test_overdue_boundary_multi_day_grace_period(self):
"""grace_period=30 days: overdue after 30 full days, not after 29."""
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=30")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
# Opened 30 days ago (May 16), should go overdue
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=dt.datetime(2025, 5, 16, 12, 0, 0, tzinfo=dt.UTC),
state=ReviewState.PENDING,
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
# Opened 29 days ago (May 17), should NOT go overdue
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=dt.datetime(2025, 5, 17, 12, 0, 0, tzinfo=dt.UTC),
state=ReviewState.PENDING,
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertNotIn(iteration, list(rule._get_newly_overdue_iterations()))
def test_due_object_iteration_opened_yesterday(self):
"""interval=1 day: object with iteration opened yesterday is due for a new review."""
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
for opened_on in [
dt.datetime(2025, 6, 14, 0, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 14, 12, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 14, 23, 59, 59, 999999, tzinfo=dt.UTC),
]:
with self.subTest(opened_on=opened_on):
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertIn(app, list(rule._get_newly_due_objects()))
def test_not_due_object_iteration_opened_today(self):
"""interval=1 day: object with iteration opened today is NOT due."""
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
for opened_on in [
dt.datetime(2025, 6, 15, 0, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 23, 59, 59, 999999, tzinfo=dt.UTC),
]:
with self.subTest(opened_on=opened_on):
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertNotIn(app, list(rule._get_newly_due_objects()))
def test_due_independent_of_task_execution_time(self):
"""Due detection gives the same result whether the task runs at 00:00:01 or 23:59:59."""
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
opened_on = dt.datetime(2025, 6, 14, 18, 0, 0, tzinfo=dt.UTC)
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
for task_time in [
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 12, 0, 0, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
]:
with self.subTest(task_time=task_time):
with patch("django.utils.timezone.now", return_value=task_time):
self.assertIn(app, list(rule._get_newly_due_objects()))
def test_due_boundary_multi_day_interval(self):
"""interval=30 days: due after 30 full days, not after 29."""
app, rule, iteration = self._create_rule_and_iteration(interval="days=30")
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
# Previous review opened 30 days ago (May 16), review is due for the object
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=dt.datetime(2025, 5, 16, 12, 0, 0, tzinfo=dt.UTC)
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertIn(app, list(rule._get_newly_due_objects()))
# Previous review opened 29 days ago (May 17), new review is NOT due
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=dt.datetime(2025, 5, 17, 12, 0, 0, tzinfo=dt.UTC)
)
with patch("django.utils.timezone.now", return_value=fixed_now):
self.assertNotIn(app, list(rule._get_newly_due_objects()))
def test_apply_overdue_at_boundary(self):
"""apply() marks iteration overdue when grace period just expired,
regardless of what time the daily task runs."""
_, rule, iteration = self._create_rule_and_iteration(
grace_period="days=1", interval="days=365"
)
opened_on = dt.datetime(2025, 6, 14, 20, 0, 0, tzinfo=dt.UTC)
for task_time in [
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
]:
with self.subTest(task_time=task_time):
LifecycleIteration.objects.filter(pk=iteration.pk).update(
opened_on=opened_on, state=ReviewState.PENDING
)
with patch("django.utils.timezone.now", return_value=task_time):
rule.apply()
iteration.refresh_from_db()
self.assertEqual(iteration.state, ReviewState.OVERDUE)

View File

@@ -1,11 +0,0 @@
"""API URLs"""
from authentik.enterprise.lifecycle.api.iterations import IterationViewSet
from authentik.enterprise.lifecycle.api.reviews import ReviewViewSet
from authentik.enterprise.lifecycle.api.rules import LifecycleRuleViewSet
api_urlpatterns = [
("lifecycle/iterations", IterationViewSet),
("lifecycle/reviews", ReviewViewSet),
("lifecycle/rules", LifecycleRuleViewSet),
]

View File

@@ -1,75 +0,0 @@
from datetime import datetime
from urllib import parse
from django.contrib.contenttypes.models import ContentType
from django.db.models import Model
from django.urls import reverse
from rest_framework.serializers import ChoiceField, Serializer, UUIDField
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import Application, Group, User
from authentik.rbac.models import Role
def parse_content_type(value: str) -> dict:
app_label, model = value.split(".")
return {"app_label": app_label, "model": model}
def model_choices() -> list[tuple[str, str]]:
return [
("authentik_core.application", "Application"),
("authentik_core.group", "Group"),
("authentik_rbac.role", "Role"),
]
def admin_link_for_model(model: Model) -> str:
if isinstance(model, Application):
url = f"/core/applications/{model.slug}"
elif isinstance(model, Group):
url = f"/identity/groups/{model.pk}"
elif isinstance(model, Role):
url = f"/identity/roles/{model.pk}"
else:
raise TypeError("Unsupported model")
return url + ";" + parse.quote('{"page":"page-lifecycle"}')
def link_for_model(model: Model) -> str:
return f"{reverse("authentik_core:if-admin")}#{admin_link_for_model(model)}"
def start_of_day(dt: datetime) -> datetime:
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
class ContentTypeField(ChoiceField):
def __init__(self, **kwargs):
super().__init__(choices=model_choices(), **kwargs)
def to_representation(self, content_type: ContentType) -> str:
return f"{content_type.app_label}.{content_type.model}"
def to_internal_value(self, data: str) -> ContentType:
return ContentType.objects.get(**parse_content_type(data))
class GenericForeignKeySerializer(Serializer):
content_type = ContentTypeField()
object_id = UUIDField()
class ReviewerGroupSerializer(ModelSerializer):
class Meta:
model = Group
fields = [
"pk",
"name",
]
class ReviewerUserSerializer(ModelSerializer):
class Meta:
model = User
fields = ["pk", "uuid", "username", "name"]

View File

@@ -135,21 +135,21 @@ class GoogleWorkspaceProvider(OutgoingSyncProvider, BackchannelProvider):
return GoogleWorkspaceGroupClient(self)
raise ValueError(f"Invalid model {model}")
def get_object_qs(self, type: type[User | Group], **kwargs) -> QuerySet[User | Group]:
def get_object_qs(self, type: type[User | Group]) -> QuerySet[User | Group]:
if type == User:
# Get queryset of all users with consistent ordering
# according to the provider's settings
base = User.objects.all().exclude_anonymous().filter(**kwargs)
base = User.objects.all().exclude_anonymous()
if self.exclude_users_service_account:
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
)
if self.filter_group:
base = base.filter(groups__in=[self.filter_group])
base = base.filter(ak_groups__in=[self.filter_group])
return base.order_by("pk")
if type == Group:
# Get queryset of all groups with consistent ordering
return Group.objects.all().filter(**kwargs).order_by("pk")
return Group.objects.all().order_by("pk")
raise ValueError(f"Invalid type {type}")
@classmethod

View File

@@ -78,8 +78,7 @@ class MicrosoftEntraUserClient(MicrosoftEntraSyncClient[User, MicrosoftEntraProv
def create(self, user: User):
"""Create user from scratch and create a connection object"""
microsoft_user = self.to_schema(user, None)
if microsoft_user.user_principal_name:
self.check_email_valid(microsoft_user.user_principal_name)
self.check_email_valid(microsoft_user.user_principal_name)
with transaction.atomic():
try:
response = self._request(self.client.users.post(microsoft_user))
@@ -119,8 +118,7 @@ class MicrosoftEntraUserClient(MicrosoftEntraSyncClient[User, MicrosoftEntraProv
def update(self, user: User, connection: MicrosoftEntraProviderUser):
"""Update existing user"""
microsoft_user = self.to_schema(user, connection)
if microsoft_user.user_principal_name:
self.check_email_valid(microsoft_user.user_principal_name)
self.check_email_valid(microsoft_user.user_principal_name)
response = self._request(
self.client.users.by_user_id(connection.microsoft_id).patch(microsoft_user)
)

View File

@@ -124,21 +124,21 @@ class MicrosoftEntraProvider(OutgoingSyncProvider, BackchannelProvider):
return MicrosoftEntraGroupClient(self)
raise ValueError(f"Invalid model {model}")
def get_object_qs(self, type: type[User | Group], **kwargs) -> QuerySet[User | Group]:
def get_object_qs(self, type: type[User | Group]) -> QuerySet[User | Group]:
if type == User:
# Get queryset of all users with consistent ordering
# according to the provider's settings
base = User.objects.all().exclude_anonymous().filter(**kwargs)
base = User.objects.all().exclude_anonymous()
if self.exclude_users_service_account:
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
)
if self.filter_group:
base = base.filter(groups__in=[self.filter_group])
base = base.filter(ak_groups__in=[self.filter_group])
return base.order_by("pk")
if type == Group:
# Get queryset of all groups with consistent ordering
return Group.objects.all().filter(**kwargs).order_by("pk")
return Group.objects.all().order_by("pk")
raise ValueError(f"Invalid type {type}")
@classmethod

View File

@@ -43,7 +43,7 @@ def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created:
"path": USER_PATH_PROVIDERS_SSF,
},
)
user.assign_perms_to_managed_role("authentik_providers_ssf.add_stream", instance)
user.assign_perms_to_managed_role("add_stream", instance)
token, token_created = Token.objects.update_or_create(
identifier=identifier,
defaults={

View File

@@ -1,86 +0,0 @@
"""WSFederationProvider API Views"""
from django.http import HttpRequest
from django.urls import reverse
from rest_framework.fields import CharField, SerializerMethodField, URLField
from authentik.core.api.providers import ProviderSerializer
from authentik.core.models import Provider
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
from authentik.enterprise.providers.ws_federation.processors.metadata import MetadataProcessor
from authentik.providers.saml.api.providers import SAMLProviderSerializer, SAMLProviderViewSet
class WSFederationProviderSerializer(EnterpriseRequiredMixin, SAMLProviderSerializer):
"""WSFederationProvider Serializer"""
reply_url = URLField(source="acs_url")
wtrealm = CharField(source="audience")
url_wsfed = SerializerMethodField()
def get_url_download_metadata(self, instance: WSFederationProvider) -> str:
"""Get metadata download URL"""
if "request" not in self._context:
return ""
request: HttpRequest = self._context["request"]._request
try:
return request.build_absolute_uri(
reverse(
"authentik_providers_ws_federation:metadata-download",
kwargs={"application_slug": instance.application.slug},
)
)
except Provider.application.RelatedObjectDoesNotExist:
return request.build_absolute_uri(
reverse(
"authentik_api:wsfederationprovider-metadata",
kwargs={
"pk": instance.pk,
},
)
+ "?download"
)
def get_url_wsfed(self, instance: WSFederationProvider) -> str:
"""Get WS-Fed url"""
if "request" not in self._context:
return ""
request: HttpRequest = self._context["request"]._request
return request.build_absolute_uri(reverse("authentik_providers_ws_federation:wsfed"))
class Meta(SAMLProviderSerializer.Meta):
model = WSFederationProvider
fields = ProviderSerializer.Meta.fields + [
"reply_url",
"wtrealm",
"assertion_valid_not_before",
"assertion_valid_not_on_or_after",
"session_valid_not_on_or_after",
"property_mappings",
"name_id_mapping",
"authn_context_class_ref_mapping",
"digest_algorithm",
"signature_algorithm",
"signing_kp",
"encryption_kp",
"sign_assertion",
"sign_logout_request",
"default_name_id_policy",
"url_download_metadata",
"url_wsfed",
]
extra_kwargs = ProviderSerializer.Meta.extra_kwargs
class WSFederationProviderViewSet(SAMLProviderViewSet):
"""WSFederationProvider Viewset"""
queryset = WSFederationProvider.objects.all()
serializer_class = WSFederationProviderSerializer
filterset_fields = "__all__"
ordering = ["name"]
search_fields = ["name"]
metadata_generator_class = MetadataProcessor
import_metadata = None

View File

@@ -1,13 +0,0 @@
"""WSFed app config"""
from authentik.enterprise.apps import EnterpriseConfig
class AuthentikEnterpriseProviderWSFederatopm(EnterpriseConfig):
"""authentik enterprise ws federation app config"""
name = "authentik.enterprise.providers.ws_federation"
label = "authentik_providers_ws_federation"
verbose_name = "authentik Enterprise.Providers.WS-Federation"
default = True
mountpoint = "application/wsfed/"

View File

@@ -1,37 +0,0 @@
# Generated by Django 5.2.10 on 2026-01-18 23:25
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("authentik_providers_saml", "0020_samlprovider_logout_method_and_more"),
]
operations = [
migrations.CreateModel(
name="WSFederationProvider",
fields=[
(
"samlprovider_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="authentik_providers_saml.samlprovider",
),
),
],
options={
"verbose_name": "WS-Federation Provider",
"verbose_name_plural": "WS-Federation Providers",
},
bases=("authentik_providers_saml.samlprovider",),
),
]

View File

@@ -1,36 +0,0 @@
from django.templatetags.static import static
from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import Serializer
from authentik.providers.saml.models import SAMLProvider
class WSFederationProvider(SAMLProvider):
"""WS-Federation for applications which support WS-Fed."""
# Alias'd fields:
# - acs_url -> reply_url
# - audience -> realm / wtrealm
@property
def serializer(self) -> type[Serializer]:
from authentik.enterprise.providers.ws_federation.api.providers import (
WSFederationProviderSerializer,
)
return WSFederationProviderSerializer
@property
def icon_url(self) -> str | None:
return static("authentik/sources/wsfed.svg")
@property
def component(self) -> str:
return "ak-provider-wsfed-form"
def __str__(self):
return f"WS-Federation Provider {self.name}"
class Meta:
verbose_name = _("WS-Federation Provider")
verbose_name_plural = _("WS-Federation Providers")

View File

@@ -1,39 +0,0 @@
from authentik.common.saml.constants import NS_MAP as _map
WS_FED_ACTION_SIGN_IN = "wsignin1.0"
WS_FED_ACTION_SIGN_OUT = "wsignout1.0"
WS_FED_ACTION_SIGN_OUT_CLEANUP = "wsignoutcleanup1.0"
WS_FED_POST_KEY_ACTION = "wa"
WS_FED_POST_KEY_RESULT = "wresult"
WS_FED_POST_KEY_CONTEXT = "wctx"
WSS_TOKEN_TYPE_SAML2 = (
"http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLV2.0" # nosec
)
WSS_KEY_IDENTIFIER_SAML_ID = (
"http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLID"
)
NS_WS_FED_PROTOCOL = "http://docs.oasis-open.org/wsfed/federation/200706"
NS_WS_FED_TRUST = "http://schemas.xmlsoap.org/ws/2005/02/trust"
NS_WSI = "http://www.w3.org/2001/XMLSchema-instance"
NS_ADDRESSING = "http://www.w3.org/2005/08/addressing"
NS_POLICY = "http://schemas.xmlsoap.org/ws/2004/09/policy"
NS_WSS_SEC = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
NS_WSS_UTILITY = (
"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
)
NS_WSS_D3P1 = "http://docs.oasis-open.org/wss/oasis-wss-wssecurity-secext-1.1.xsd"
NS_MAP = {
**_map,
"fed": NS_WS_FED_PROTOCOL,
"xsi": NS_WSI,
"wsa": NS_ADDRESSING,
"t": NS_WS_FED_TRUST,
"wsu": NS_WSS_UTILITY,
"wsp": NS_POLICY,
"wssec": NS_WSS_SEC,
"d3p1": NS_WSS_D3P1,
}

View File

@@ -1,40 +0,0 @@
from django.urls import reverse
from lxml.etree import SubElement, _Element # nosec
from authentik.common.saml.constants import NS_SAML_METADATA
from authentik.enterprise.providers.ws_federation.processors.constants import (
NS_ADDRESSING,
NS_MAP,
NS_WS_FED_PROTOCOL,
NS_WSI,
)
from authentik.providers.saml.processors.metadata import MetadataProcessor as BaseMetadataProcessor
class MetadataProcessor(BaseMetadataProcessor):
def add_children(self, entity_descriptor: _Element):
self.add_role_descriptor_sts(entity_descriptor)
super().add_children(entity_descriptor)
def add_endpoint(self, parent: _Element, name: str):
endpoint = SubElement(parent, f"{{{NS_WS_FED_PROTOCOL}}}{name}", nsmap=NS_MAP)
endpoint_ref = SubElement(endpoint, f"{{{NS_ADDRESSING}}}EndpointReference", nsmap=NS_MAP)
address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address", nsmap=NS_MAP)
address.text = self.http_request.build_absolute_uri(
reverse("authentik_providers_ws_federation:wsfed")
)
def add_role_descriptor_sts(self, entity_descriptor: _Element):
role_descriptor = SubElement(
entity_descriptor, f"{{{NS_SAML_METADATA}}}RoleDescriptor", nsmap=NS_MAP
)
role_descriptor.attrib[f"{{{NS_WSI}}}type"] = "fed:SecurityTokenServiceType"
role_descriptor.attrib["protocolSupportEnumeration"] = NS_WS_FED_PROTOCOL
signing_descriptor = self.get_signing_key_descriptor()
if signing_descriptor is not None:
role_descriptor.append(signing_descriptor)
self.add_endpoint(role_descriptor, "SecurityTokenServiceEndpoint")
self.add_endpoint(role_descriptor, "PassiveRequestorEndpoint")

View File

@@ -1,162 +0,0 @@
from dataclasses import dataclass
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from lxml import etree # nosec
from lxml.etree import Element, SubElement, _Element # nosec
from authentik.core.models import Application
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
from authentik.enterprise.providers.ws_federation.processors.constants import (
NS_ADDRESSING,
NS_MAP,
NS_POLICY,
NS_WS_FED_TRUST,
NS_WSS_D3P1,
NS_WSS_SEC,
NS_WSS_UTILITY,
WS_FED_ACTION_SIGN_IN,
WS_FED_POST_KEY_ACTION,
WS_FED_POST_KEY_CONTEXT,
WS_FED_POST_KEY_RESULT,
WSS_KEY_IDENTIFIER_SAML_ID,
WSS_TOKEN_TYPE_SAML2,
)
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.utils import delete_none_values
from authentik.providers.saml.processors.assertion import AssertionProcessor
from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
from authentik.providers.saml.utils.time import get_time_string
@dataclass()
class SignInRequest:
wa: str
wtrealm: str
wreply: str
wctx: str | None
@staticmethod
def parse(request: HttpRequest) -> SignInRequest:
action = request.GET.get("wa")
if action != WS_FED_ACTION_SIGN_IN:
raise ValueError("Invalid action")
realm = request.GET.get("wtrealm")
if not realm:
raise ValueError("Missing Realm")
req = SignInRequest(
wa=action,
wtrealm=realm,
wreply=request.GET.get("wreply"),
wctx=request.GET.get("wctx", ""),
)
_, provider = req.get_app_provider()
if not req.wreply:
req.wreply = provider.acs_url
if not req.wreply.startswith(provider.acs_url):
raise ValueError("Invalid wreply")
return req
def get_app_provider(self):
provider: WSFederationProvider = get_object_or_404(
WSFederationProvider, audience=self.wtrealm
)
application = get_object_or_404(Application, provider=provider)
return application, provider
class SignInProcessor:
provider: WSFederationProvider
request: HttpRequest
sign_in_request: SignInRequest
saml_processor: AssertionProcessor
def __init__(
self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
):
self.provider = provider
self.request = request
self.sign_in_request = sign_in_request
self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
self.saml_processor.provider.audience = self.sign_in_request.wtrealm
if self.provider.signing_kp:
self.saml_processor.provider.sign_assertion = True
def create_response_token(self):
root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
root.append(self.response_add_lifetime())
root.append(self.response_add_applies_to())
root.append(self.response_add_requested_security_token())
root.append(
self.response_add_attached_reference(
"RequestedAttachedReference", self.saml_processor._assertion_id
)
)
root.append(
self.response_add_attached_reference(
"RequestedUnattachedReference", self.saml_processor._assertion_id
)
)
token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
token_type.text = WSS_TOKEN_TYPE_SAML2
request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
return root
def response_add_lifetime(self) -> _Element:
"""Add Lifetime element"""
lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
created.text = get_time_string()
expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
expires.text = get_time_string(
timedelta_from_string(self.provider.session_valid_not_on_or_after)
)
return lifetime
def response_add_applies_to(self) -> _Element:
"""Add AppliesTo element"""
applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
address.text = self.sign_in_request.wtrealm
return applies_to
def response_add_requested_security_token(self) -> _Element:
"""Add RequestedSecurityToken and child assertion"""
token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
token.append(self.saml_processor.get_assertion())
return token
def response_add_attached_reference(self, tag: str, value: str) -> _Element:
ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
key_identifier.text = value
return ref
def response(self) -> dict[str, str]:
root = self.create_response_token()
assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
if self.provider.signing_kp:
self.saml_processor._sign(assertion)
str_token = etree.tostring(root).decode("utf-8") # nosec
return delete_none_values(
{
WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
WS_FED_POST_KEY_RESULT: str_token,
WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
}
)

View File

@@ -1,44 +0,0 @@
from dataclasses import dataclass
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from authentik.core.models import Application
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
from authentik.enterprise.providers.ws_federation.processors.constants import WS_FED_ACTION_SIGN_OUT
@dataclass()
class SignOutRequest:
wa: str
wtrealm: str
wreply: str
@staticmethod
def parse(request: HttpRequest) -> SignOutRequest:
action = request.GET.get("wa")
if action != WS_FED_ACTION_SIGN_OUT:
raise ValueError("Invalid action")
realm = request.GET.get("wtrealm")
if not realm:
raise ValueError("Missing Realm")
req = SignOutRequest(
wa=action,
wtrealm=realm,
wreply=request.GET.get("wreply"),
)
_, provider = req.get_app_provider()
if not req.wreply:
req.wreply = provider.acs_url
if not req.wreply.startswith(provider.acs_url):
raise ValueError("Invalid wreply")
return req
def get_app_provider(self):
provider: WSFederationProvider = get_object_or_404(
WSFederationProvider, audience=self.wtrealm
)
application = get_object_or_404(Application, provider=provider)
return application, provider

View File

@@ -1,93 +0,0 @@
"""WS-Fed Provider signals"""
from urllib.parse import urlencode, urlparse, urlunparse
from django.dispatch import receiver
from django.http import HttpRequest
from django.urls import reverse
from django.utils import timezone
from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, User
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
from authentik.enterprise.providers.ws_federation.processors.constants import (
WS_FED_ACTION_SIGN_OUT_CLEANUP,
WS_FED_POST_KEY_ACTION,
)
from authentik.flows.models import in_memory_stage
from authentik.flows.views.executor import FlowExecutorView
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.saml.models import SAMLBindings, SAMLSession
from authentik.providers.saml.views.flows import (
PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS,
PLAN_CONTEXT_SAML_RELAY_STATE,
)
from authentik.stages.user_logout.models import UserLogoutStage
from authentik.stages.user_logout.stage import flow_pre_user_logout
LOGGER = get_logger()
@receiver(flow_pre_user_logout)
def handle_ws_fed_iframe_pre_user_logout(
sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
):
"""Handle WS-Fed iframe logout when user logs out via flow"""
# Only proceed if this is actually a UserLogoutStage
if not isinstance(executor.current_stage, UserLogoutStage):
return
if not user.is_authenticated:
return
auth_session = AuthenticatedSession.from_request(request, user)
if not auth_session:
return
wsfed_sessions = SAMLSession.objects.filter(
session=auth_session,
user=user,
expires__gt=timezone.now(),
expiring=True,
# Only get WS-Federation provider sessions
provider__wsfederationprovider__isnull=False,
).select_related("provider__wsfederationprovider")
if not wsfed_sessions.exists():
LOGGER.debug("No sessions requiring IFrame frontchannel logout")
return
saml_sessions = []
relay_state = request.build_absolute_uri(
reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
)
# Store return URL in plan context as fallback if SP doesn't echo relay_state
executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
for session in wsfed_sessions:
provider: WSFederationProvider = session.provider.wsfederationprovider
parts = urlparse(str(provider.acs_url))
parts = parts._replace(
query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP})
)
logout_data = {
"url": urlunparse(parts),
"provider_name": provider.name,
"binding": SAMLBindings.REDIRECT,
}
saml_sessions.append(logout_data)
if saml_sessions:
executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
# Stage already exists, don't reinject it
if not any(
binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
):
iframe_stage = in_memory_stage(IframeLogoutStageView)
executor.plan.insert_stage(iframe_stage, index=1)
LOGGER.debug("WSFed iframe sessions gathered")

Some files were not shown because too many files have changed in this diff Show More