mirror of
https://github.com/goauthentik/authentik
synced 2026-05-14 10:56:52 +02:00
Compare commits
1 Commits
version/20
...
remember-m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20d5228de5 |
6
.github/actions/cherry-pick/action.yml
vendored
6
.github/actions/cherry-pick/action.yml
vendored
@@ -215,9 +215,6 @@ runs:
|
||||
--head "$CHERRY_PICK_BRANCH" \
|
||||
--label "cherry-pick")
|
||||
|
||||
# Assign the PR to the original author
|
||||
gh pr edit "$NEW_PR" --add-assignee "$PR_AUTHOR" || true
|
||||
|
||||
echo "✅ Created cherry-pick PR $NEW_PR for $TARGET_BRANCH"
|
||||
|
||||
# Comment on original PR
|
||||
@@ -257,9 +254,6 @@ runs:
|
||||
--head "$CHERRY_PICK_BRANCH" \
|
||||
--label "cherry-pick")
|
||||
|
||||
# Assign the PR to the original author
|
||||
gh pr edit "$NEW_PR" --add-assignee "$PR_AUTHOR" || true
|
||||
|
||||
echo "⚠️ Created conflict resolution PR $NEW_PR for $TARGET_BRANCH"
|
||||
|
||||
# Comment on original PR
|
||||
|
||||
4
.github/actions/setup/action.yml
vendored
4
.github/actions/setup/action.yml
vendored
@@ -22,7 +22,7 @@ runs:
|
||||
sudo rm -rf /usr/local/lib/android
|
||||
- name: Install uv
|
||||
if: ${{ contains(inputs.dependencies, 'python') }}
|
||||
uses: astral-sh/setup-uv@eac588ad8def6316056a12d4907a9d4d84ff7a3b # v5
|
||||
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v5
|
||||
with:
|
||||
enable-cache: true
|
||||
- name: Setup python
|
||||
@@ -58,7 +58,7 @@ runs:
|
||||
run: |
|
||||
export PSQL_TAG=${{ inputs.postgresql_version }}
|
||||
docker compose -f .github/actions/setup/compose.yml up -d
|
||||
cd web && npm ci
|
||||
cd web && npm i
|
||||
- name: Generate config
|
||||
if: ${{ contains(inputs.dependencies, 'python') }}
|
||||
shell: uv run python {0}
|
||||
|
||||
@@ -56,29 +56,30 @@ jobs:
|
||||
release: ${{ inputs.release }}
|
||||
- name: Login to Docker Hub
|
||||
if: ${{ inputs.registry_dockerhub }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_CORP_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
|
||||
- name: Login to GitHub Container Registry
|
||||
if: ${{ inputs.registry_ghcr }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
|
||||
- name: make empty clients
|
||||
if: ${{ inputs.release }}
|
||||
run: |
|
||||
mkdir -p ./gen-ts-api
|
||||
mkdir -p ./gen-go-api
|
||||
- name: Setup node
|
||||
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
|
||||
with:
|
||||
node-version-file: web/package.json
|
||||
cache: "npm"
|
||||
cache-dependency-path: web/package-lock.json
|
||||
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
- name: Generate API Clients
|
||||
run: |
|
||||
make gen-client-ts
|
||||
make gen-client-go
|
||||
- name: generate ts client
|
||||
run: make gen-client-ts
|
||||
- name: Build Docker Image
|
||||
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # v6
|
||||
id: push
|
||||
@@ -95,7 +96,7 @@ jobs:
|
||||
platforms: linux/${{ inputs.image_arch }}
|
||||
cache-from: type=registry,ref=${{ steps.ev.outputs.attestImageNames }}:buildcache-${{ inputs.image_arch }}
|
||||
cache-to: ${{ steps.ev.outputs.cacheTo }}
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
|
||||
with:
|
||||
|
||||
8
.github/workflows/_reusable-docker-build.yml
vendored
8
.github/workflows/_reusable-docker-build.yml
vendored
@@ -79,25 +79,25 @@ jobs:
|
||||
image-name: ${{ inputs.image_name }}
|
||||
- name: Login to Docker Hub
|
||||
if: ${{ inputs.registry_dockerhub }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_CORP_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
|
||||
- name: Login to GitHub Container Registry
|
||||
if: ${{ inputs.registry_ghcr }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
- uses: int128/docker-manifest-create-action@1a059c021f1d5e9f2bd39de745d5dd3a0ef6df90 # v2
|
||||
- uses: int128/docker-manifest-create-action@a39573caa37b6a8a03302d43b57c3f48635096e2 # v2
|
||||
id: build
|
||||
with:
|
||||
tags: ${{ matrix.tag }}
|
||||
sources: |
|
||||
${{ steps.ev.outputs.attestImageNames }}@${{ needs.build-server-amd64.outputs.image-digest }}
|
||||
${{ steps.ev.outputs.attestImageNames }}@${{ needs.build-server-arm64.outputs.image-digest }}
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
with:
|
||||
subject-name: ${{ steps.ev.outputs.attestImageNames }}
|
||||
|
||||
2
.github/workflows/ci-api-docs.yml
vendored
2
.github/workflows/ci-api-docs.yml
vendored
@@ -41,7 +41,7 @@ jobs:
|
||||
- working-directory: website/
|
||||
name: Install Dependencies
|
||||
run: npm ci
|
||||
- uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
|
||||
- uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
|
||||
with:
|
||||
path: |
|
||||
${{ github.workspace }}/website/api/.docusaurus
|
||||
|
||||
12
.github/workflows/ci-docs.yml
vendored
12
.github/workflows/ci-docs.yml
vendored
@@ -15,8 +15,6 @@ on:
|
||||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
NODE_ENV: production
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -32,8 +30,7 @@ jobs:
|
||||
run: npm run ${{ matrix.command }}
|
||||
build-docs:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
NODE_ENV: production
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
|
||||
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
|
||||
@@ -49,8 +46,7 @@ jobs:
|
||||
run: npm run build
|
||||
build-integrations:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
NODE_ENV: production
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
|
||||
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
|
||||
@@ -89,7 +85,7 @@ jobs:
|
||||
image-name: ghcr.io/goauthentik/dev-docs
|
||||
- name: Login to Container Registry
|
||||
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
@@ -105,7 +101,7 @@ jobs:
|
||||
context: .
|
||||
cache-from: type=registry,ref=ghcr.io/goauthentik/dev-docs:buildcache
|
||||
cache-to: ${{ steps.ev.outputs.shouldPush == 'true' && 'type=registry,ref=ghcr.io/goauthentik/dev-docs:buildcache,mode=max' || '' }}
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
|
||||
with:
|
||||
|
||||
6
.github/workflows/ci-main.yml
vendored
6
.github/workflows/ci-main.yml
vendored
@@ -187,8 +187,6 @@ jobs:
|
||||
glob: tests/e2e/test_provider_saml* tests/e2e/test_source_saml*
|
||||
- name: ldap
|
||||
glob: tests/e2e/test_provider_ldap* tests/e2e/test_source_ldap*
|
||||
- name: ws-fed
|
||||
glob: tests/e2e/test_provider_ws_fed*
|
||||
- name: radius
|
||||
glob: tests/e2e/test_provider_radius*
|
||||
- name: scim
|
||||
@@ -205,7 +203,7 @@ jobs:
|
||||
run: |
|
||||
docker compose -f tests/e2e/compose.yml up -d --quiet-pull
|
||||
- id: cache-web
|
||||
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
|
||||
uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
|
||||
with:
|
||||
path: web/dist
|
||||
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b
|
||||
@@ -248,7 +246,7 @@ jobs:
|
||||
run: |
|
||||
docker compose -f tests/openid_conformance/compose.yml up -d --quiet-pull
|
||||
- id: cache-web
|
||||
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v4
|
||||
uses: actions/cache@8b402f58fbc84540c8b491a91e594a4576fec3d7 # v4
|
||||
with:
|
||||
path: web/dist
|
||||
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b
|
||||
|
||||
4
.github/workflows/ci-outpost.yml
vendored
4
.github/workflows/ci-outpost.yml
vendored
@@ -102,7 +102,7 @@ jobs:
|
||||
image-name: ghcr.io/goauthentik/dev-${{ matrix.type }}
|
||||
- name: Login to Container Registry
|
||||
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
@@ -122,7 +122,7 @@ jobs:
|
||||
context: .
|
||||
cache-from: type=registry,ref=ghcr.io/goauthentik/dev-${{ matrix.type }}:buildcache
|
||||
cache-to: ${{ steps.ev.outputs.shouldPush == 'true' && format('type=registry,ref=ghcr.io/goauthentik/dev-{0}:buildcache,mode=max', matrix.type) || '' }}
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
if: ${{ steps.ev.outputs.shouldPush == 'true' }}
|
||||
with:
|
||||
|
||||
2
.github/workflows/gen-image-compress.yml
vendored
2
.github/workflows/gen-image-compress.yml
vendored
@@ -38,7 +38,7 @@ jobs:
|
||||
token: ${{ steps.generate_token.outputs.token }}
|
||||
- name: Compress images
|
||||
id: compress
|
||||
uses: calibreapp/image-actions@d9c8ee5c3dc52ae4622c82ead88d658f4b16b65f # main
|
||||
uses: calibreapp/image-actions@420075c115b26f8785e293c5bd5bef0911c506e5 # main
|
||||
with:
|
||||
GITHUB_TOKEN: ${{ steps.generate_token.outputs.token }}
|
||||
compressOnly: ${{ github.event_name != 'pull_request' }}
|
||||
|
||||
2
.github/workflows/packages-npm-publish.yml
vendored
2
.github/workflows/packages-npm-publish.yml
vendored
@@ -40,7 +40,7 @@ jobs:
|
||||
registry-url: "https://registry.npmjs.org"
|
||||
- name: Get changed files
|
||||
id: changed-files
|
||||
uses: tj-actions/changed-files@8cba46e29c11878d930bca7870bb54394d3e8b21 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
|
||||
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
|
||||
with:
|
||||
files: |
|
||||
${{ matrix.package }}/package.json
|
||||
|
||||
44
.github/workflows/release-publish.yml
vendored
44
.github/workflows/release-publish.yml
vendored
@@ -44,7 +44,7 @@ jobs:
|
||||
with:
|
||||
image-name: ghcr.io/goauthentik/docs
|
||||
- name: Login to GitHub Container Registry
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
@@ -58,7 +58,7 @@ jobs:
|
||||
push: true
|
||||
platforms: linux/amd64,linux/arm64
|
||||
context: .
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
if: true
|
||||
with:
|
||||
@@ -87,11 +87,6 @@ jobs:
|
||||
- uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
- uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # v5
|
||||
with:
|
||||
node-version-file: web/package.json
|
||||
cache: "npm"
|
||||
cache-dependency-path: web/package-lock.json
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
|
||||
- name: Set up Docker Buildx
|
||||
@@ -103,17 +98,17 @@ jobs:
|
||||
DOCKER_USERNAME: ${{ secrets.DOCKER_CORP_USERNAME }}
|
||||
with:
|
||||
image-name: ghcr.io/goauthentik/${{ matrix.type }},authentik/${{ matrix.type }}
|
||||
- name: Generate API Clients
|
||||
- name: make empty clients
|
||||
run: |
|
||||
make gen-client-ts
|
||||
make gen-client-go
|
||||
mkdir -p ./gen-ts-api
|
||||
mkdir -p ./gen-go-api
|
||||
- name: Docker Login Registry
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_CORP_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_CORP_PASSWORD }}
|
||||
- name: Login to GitHub Container Registry
|
||||
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
|
||||
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
@@ -129,7 +124,7 @@ jobs:
|
||||
file: lifecycle/container/${{ matrix.type }}.Dockerfile
|
||||
platforms: linux/amd64,linux/arm64
|
||||
context: .
|
||||
- uses: actions/attest-build-provenance@96278af6caaf10aea03fd8d33a09a777ca52d62f # v3
|
||||
- uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3
|
||||
id: attest
|
||||
with:
|
||||
subject-name: ${{ steps.ev.outputs.attestImageNames }}
|
||||
@@ -160,17 +155,10 @@ jobs:
|
||||
node-version-file: web/package.json
|
||||
cache: "npm"
|
||||
cache-dependency-path: web/package-lock.json
|
||||
- name: Install web dependencies
|
||||
working-directory: web/
|
||||
run: |
|
||||
npm ci
|
||||
- name: Generate API Clients
|
||||
run: |
|
||||
make gen-client-ts
|
||||
make gen-client-go
|
||||
- name: Build web
|
||||
working-directory: web/
|
||||
run: |
|
||||
npm ci
|
||||
npm run build-proxy
|
||||
- name: Build outpost
|
||||
run: |
|
||||
@@ -199,7 +187,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
|
||||
- uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # v6.0.0
|
||||
- uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708 # v5
|
||||
with:
|
||||
role-to-assume: "arn:aws:iam::016170277896:role/github_goauthentik_authentik"
|
||||
aws-region: ${{ env.AWS_REGION }}
|
||||
@@ -217,12 +205,12 @@ jobs:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
|
||||
- name: Run test suite in final docker images
|
||||
run: |
|
||||
echo "PG_PASS=$(openssl rand 32 | base64 -w 0)" >> lifecycle/container/.env
|
||||
echo "AUTHENTIK_SECRET_KEY=$(openssl rand 32 | base64 -w 0)" >> lifecycle/container/.env
|
||||
docker compose -f lifecycle/container/compose.yml pull -q
|
||||
docker compose -f lifecycle/container/compose.yml up --no-start
|
||||
docker compose -f lifecycle/container/compose.yml start postgresql
|
||||
docker compose -f lifecycle/container/compose.yml run -u root server test-all
|
||||
echo "PG_PASS=$(openssl rand 32 | base64 -w 0)" >> .env
|
||||
echo "AUTHENTIK_SECRET_KEY=$(openssl rand 32 | base64 -w 0)" >> .env
|
||||
docker compose pull -q
|
||||
docker compose up --no-start
|
||||
docker compose start postgresql
|
||||
docker compose run -u root server test-all
|
||||
sentry-release:
|
||||
needs:
|
||||
- build-server
|
||||
|
||||
11
.github/workflows/release-tag.yml
vendored
11
.github/workflows/release-tag.yml
vendored
@@ -55,8 +55,6 @@ jobs:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v5
|
||||
with:
|
||||
ref: "version-${{ needs.check-inputs.outputs.major_version }}"
|
||||
- name: Setup authentik env
|
||||
uses: ./.github/actions/setup
|
||||
- run: make test-docker
|
||||
bump-authentik:
|
||||
name: Bump authentik version
|
||||
@@ -91,7 +89,6 @@ jobs:
|
||||
# ID from https://api.github.com/users/authentik-automation[bot]
|
||||
git config --global user.name '${{ steps.app-token.outputs.app-slug }}[bot]'
|
||||
git config --global user.email '${{ steps.get-user-id.outputs.user-id }}+${{ steps.app-token.outputs.app-slug }}[bot]@users.noreply.github.com'
|
||||
git pull
|
||||
git commit -a -m "release: ${{ inputs.version }}" --allow-empty
|
||||
git tag "version/${{ inputs.version }}" HEAD -m "version/${{ inputs.version }}"
|
||||
git push --follow-tags
|
||||
@@ -175,25 +172,21 @@ jobs:
|
||||
if: "${{ inputs.release_reason == 'feature' }}"
|
||||
run: |
|
||||
changelog_url="https://docs.goauthentik.io/docs/releases/${{ needs.check-inputs.outputs.major_version }}"
|
||||
reason="{{ inputs.release_reason }}"
|
||||
jq \
|
||||
--arg version "${{ inputs.version }}" \
|
||||
--arg changelog "See ${changelog_url}" \
|
||||
--arg changelog_url "${changelog_url}" \
|
||||
--arg reason "${reason}" \
|
||||
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url | .stable.reason = $reason' version.json > version.new.json
|
||||
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url' version.json > version.new.json
|
||||
mv version.new.json version.json
|
||||
- name: Bump version
|
||||
if: "${{ inputs.release_reason != 'feature' }}"
|
||||
run: |
|
||||
changelog_url="https://docs.goauthentik.io/docs/releases/${{ needs.check-inputs.outputs.major_version }}#fixed-in-$(echo -n ${{ inputs.version}} | sed 's/\.//g')"
|
||||
reason="{{ inputs.release_reason }}"
|
||||
jq \
|
||||
--arg version "${{ inputs.version }}" \
|
||||
--arg changelog "See ${changelog_url}" \
|
||||
--arg changelog_url "${changelog_url}" \
|
||||
--arg reason "${reason}" \
|
||||
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url | .stable.reason = $reason' version.json > version.new.json
|
||||
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url' version.json > version.new.json
|
||||
mv version.new.json version.json
|
||||
- name: Create pull request
|
||||
uses: peter-evans/create-pull-request@c0f553fe549906ede9cf27b5156039d195d2ece0 # v7
|
||||
|
||||
32
Makefile
32
Makefile
@@ -5,6 +5,7 @@ SHELL := /usr/bin/env bash
|
||||
PWD = $(shell pwd)
|
||||
UID = $(shell id -u)
|
||||
GID = $(shell id -g)
|
||||
NPM_VERSION = $(shell python -m scripts.generate_semver)
|
||||
PY_SOURCES = authentik packages tests scripts lifecycle .github
|
||||
DOCKER_IMAGE ?= "authentik:test"
|
||||
|
||||
@@ -49,14 +50,6 @@ ifeq ($(UNAME_S),Darwin)
|
||||
endif
|
||||
endif
|
||||
|
||||
NPM_VERSION :=
|
||||
UV_EXISTS := $(shell command -v uv 2> /dev/null)
|
||||
ifdef UV_EXISTS
|
||||
NPM_VERSION := $(shell $(UV) run python -m scripts.generate_semver)
|
||||
else
|
||||
NPM_VERSION = $(shell python -m scripts.generate_semver)
|
||||
endif
|
||||
|
||||
all: lint-fix lint gen web test ## Lint, build, and test everything
|
||||
|
||||
HELP_WIDTH := $(shell grep -h '^[a-z][^ ]*:.*\#\#' $(MAKEFILE_LIST) 2>/dev/null | \
|
||||
@@ -84,7 +77,7 @@ lint-fix: lint-codespell ## Lint and automatically fix errors in the python sou
|
||||
lint-codespell: ## Reports spelling errors.
|
||||
$(UV) run codespell -w
|
||||
|
||||
lint: ci-bandit ci-mypy ## Lint the python and golang sources
|
||||
lint: ci-bandit ## Lint the python and golang sources
|
||||
golangci-lint run -v
|
||||
|
||||
core-install:
|
||||
@@ -148,11 +141,11 @@ bump: ## Bump authentik version. Usage: make bump version=20xx.xx.xx
|
||||
ifndef version
|
||||
$(error Usage: make bump version=20xx.xx.xx )
|
||||
endif
|
||||
$(eval current_version := $(shell cat ${PWD}/internal/constants/VERSION))
|
||||
$(SED_INPLACE) 's/^version = ".*"/version = "$(version)"/' ${PWD}/pyproject.toml
|
||||
$(SED_INPLACE) 's/^VERSION = ".*"/VERSION = "$(version)"/' ${PWD}/authentik/__init__.py
|
||||
$(SED_INPLACE) 's/^version = ".*"/version = "$(version)"/' pyproject.toml
|
||||
$(SED_INPLACE) 's/^VERSION = ".*"/VERSION = "$(version)"/' authentik/__init__.py
|
||||
$(MAKE) gen-build gen-compose aws-cfn
|
||||
$(SED_INPLACE) "s/\"${current_version}\"/\"$(version)\"/" ${PWD}/package.json ${PWD}/package-lock.json ${PWD}/web/package.json ${PWD}/web/package-lock.json
|
||||
npm version --no-git-tag-version --allow-same-version $(version)
|
||||
cd ${PWD}/web && npm version --no-git-tag-version --allow-same-version $(version)
|
||||
echo -n $(version) > ${PWD}/internal/constants/VERSION
|
||||
|
||||
#########################
|
||||
@@ -213,15 +206,24 @@ gen-client-ts: gen-clean-ts ## Build and install the authentik API for Typescri
|
||||
|
||||
gen-client-py: gen-clean-py ## Build and install the authentik API for Python
|
||||
mkdir -p ${PWD}/${GEN_API_PY}
|
||||
ifeq ($(wildcard ${PWD}/${GEN_API_PY}/.*),)
|
||||
git clone --depth 1 https://github.com/goauthentik/client-python.git ${PWD}/${GEN_API_PY}
|
||||
else
|
||||
cd ${PWD}/${GEN_API_PY} && git pull
|
||||
endif
|
||||
cp ${PWD}/schema.yml ${PWD}/${GEN_API_PY}
|
||||
make -C ${PWD}/${GEN_API_PY} build version=${NPM_VERSION}
|
||||
|
||||
gen-client-go: gen-clean-go ## Build and install the authentik API for Golang
|
||||
gen-client-go: ## Build and install the authentik API for Golang
|
||||
mkdir -p ${PWD}/${GEN_API_GO}
|
||||
ifeq ($(wildcard ${PWD}/${GEN_API_GO}/.*),)
|
||||
git clone --depth 1 https://github.com/goauthentik/client-go.git ${PWD}/${GEN_API_GO}
|
||||
else
|
||||
cd ${PWD}/${GEN_API_GO} && git reset --hard
|
||||
cd ${PWD}/${GEN_API_GO} && git pull
|
||||
endif
|
||||
cp ${PWD}/schema.yml ${PWD}/${GEN_API_GO}
|
||||
make -C ${PWD}/${GEN_API_GO} build version=${NPM_VERSION}
|
||||
make -C ${PWD}/${GEN_API_GO} build
|
||||
go mod edit -replace goauthentik.io/api/v3=./${GEN_API_GO}
|
||||
|
||||
gen-dev-config: ## Generate a local development config file
|
||||
|
||||
@@ -20,8 +20,8 @@ Even if the issue is not a CVE, we still greatly appreciate your help in hardeni
|
||||
|
||||
| Version | Supported |
|
||||
| ---------- | ---------- |
|
||||
| 2025.10.x | ✅ |
|
||||
| 2025.12.x | ✅ |
|
||||
| 2026.2.x | ✅ |
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from functools import lru_cache
|
||||
from os import environ
|
||||
|
||||
VERSION = "2026.2.1"
|
||||
VERSION = "2026.2.0-rc1"
|
||||
ENV_GIT_HASH_KEY = "GIT_BUILD_HASH"
|
||||
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from rest_framework.views import APIView
|
||||
|
||||
from authentik import authentik_full_version
|
||||
from authentik.core.api.utils import PassiveSerializer
|
||||
from authentik.enterprise.license import LicenseKey
|
||||
from authentik.lib.config import CONFIG
|
||||
from authentik.lib.utils.reflection import get_env
|
||||
from authentik.outposts.apps import MANAGED_OUTPOST
|
||||
@@ -25,15 +26,6 @@ from authentik.outposts.models import Outpost
|
||||
from authentik.rbac.permissions import HasPermission
|
||||
|
||||
|
||||
def fips_enabled():
|
||||
try:
|
||||
from authentik.enterprise.license import LicenseKey
|
||||
|
||||
return backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
|
||||
except ModuleNotFoundError:
|
||||
return None
|
||||
|
||||
|
||||
class RuntimeDict(TypedDict):
|
||||
"""Runtime information"""
|
||||
|
||||
@@ -88,7 +80,9 @@ class SystemInfoSerializer(PassiveSerializer):
|
||||
"architecture": platform.machine(),
|
||||
"authentik_version": authentik_full_version(),
|
||||
"environment": get_env(),
|
||||
"openssl_fips_enabled": fips_enabled(),
|
||||
"openssl_fips_enabled": (
|
||||
backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
|
||||
),
|
||||
"openssl_version": OPENSSL_VERSION,
|
||||
"platform": platform.platform(),
|
||||
"python_version": python_version,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import mimetypes
|
||||
|
||||
from django.db.models import Q
|
||||
from django.utils.translation import gettext as _
|
||||
from drf_spectacular.utils import extend_schema
|
||||
@@ -10,14 +12,13 @@ from rest_framework.request import Request
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from authentik.admin.files.backends.base import get_content_type
|
||||
from authentik.admin.files.fields import FileField as AkFileField
|
||||
from authentik.admin.files.manager import get_file_manager
|
||||
from authentik.admin.files.usage import FileApiUsage
|
||||
from authentik.admin.files.validation import validate_upload_file_name
|
||||
from authentik.api.validation import validate
|
||||
from authentik.core.api.used_by import DeleteAction, UsedBySerializer
|
||||
from authentik.core.api.utils import PassiveSerializer, ThemedUrlsSerializer
|
||||
from authentik.core.api.utils import PassiveSerializer
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.lib.utils.reflection import get_apps
|
||||
from authentik.rbac.permissions import HasPermission
|
||||
@@ -25,6 +26,11 @@ from authentik.rbac.permissions import HasPermission
|
||||
MAX_FILE_SIZE_BYTES = 25 * 1024 * 1024 # 25MB
|
||||
|
||||
|
||||
def get_mime_from_filename(filename: str) -> str:
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
return mime_type or "application/octet-stream"
|
||||
|
||||
|
||||
class FileView(APIView):
|
||||
pagination_class = None
|
||||
parser_classes = [MultiPartParser]
|
||||
@@ -47,7 +53,6 @@ class FileView(APIView):
|
||||
name = CharField()
|
||||
mime_type = CharField()
|
||||
url = CharField()
|
||||
themed_urls = ThemedUrlsSerializer(required=False, allow_null=True)
|
||||
|
||||
@extend_schema(
|
||||
parameters=[FileListParameters],
|
||||
@@ -75,9 +80,8 @@ class FileView(APIView):
|
||||
FileView.FileListSerializer(
|
||||
data={
|
||||
"name": file,
|
||||
"url": manager.file_url(file, request),
|
||||
"mime_type": get_content_type(file),
|
||||
"themed_urls": manager.themed_urls(file, request),
|
||||
"url": manager.file_url(file),
|
||||
"mime_type": get_mime_from_filename(file),
|
||||
}
|
||||
)
|
||||
for file in files
|
||||
@@ -146,7 +150,7 @@ class FileView(APIView):
|
||||
"pk": name,
|
||||
"name": name,
|
||||
"usage": usage.value,
|
||||
"mime_type": get_content_type(name),
|
||||
"mime_type": get_mime_from_filename(name),
|
||||
},
|
||||
).from_http(request)
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import mimetypes
|
||||
from collections.abc import Callable, Generator, Iterator
|
||||
from typing import cast
|
||||
|
||||
@@ -11,32 +10,6 @@ from authentik.admin.files.usage import FileUsage
|
||||
CACHE_PREFIX = "goauthentik.io/admin/files"
|
||||
LOGGER = get_logger()
|
||||
|
||||
# Theme variable placeholder for theme-specific files like logo-%(theme)s.png
|
||||
THEME_VARIABLE = "%(theme)s"
|
||||
|
||||
|
||||
def get_content_type(name: str) -> str:
|
||||
"""Get MIME type for a file based on its extension."""
|
||||
content_type, _ = mimetypes.guess_type(name)
|
||||
return content_type or "application/octet-stream"
|
||||
|
||||
|
||||
def get_valid_themes() -> list[str]:
|
||||
"""Get valid themes that can be substituted for %(theme)s."""
|
||||
from authentik.brands.api import Themes
|
||||
|
||||
return [t.value for t in Themes if t != Themes.AUTOMATIC]
|
||||
|
||||
|
||||
def has_theme_variable(name: str) -> bool:
|
||||
"""Check if filename contains %(theme)s variable."""
|
||||
return THEME_VARIABLE in name
|
||||
|
||||
|
||||
def substitute_theme(name: str, theme: str) -> str:
|
||||
"""Replace %(theme)s with the given theme."""
|
||||
return name.replace(THEME_VARIABLE, theme)
|
||||
|
||||
|
||||
class Backend:
|
||||
"""
|
||||
@@ -102,29 +75,6 @@ class Backend:
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def themed_urls(
|
||||
self,
|
||||
name: str,
|
||||
request: HttpRequest | None = None,
|
||||
) -> dict[str, str] | None:
|
||||
"""
|
||||
Get URLs for each theme variant when filename contains %(theme)s.
|
||||
|
||||
Args:
|
||||
name: File path potentially containing %(theme)s
|
||||
request: Optional Django HttpRequest for URL building
|
||||
|
||||
Returns:
|
||||
Dict mapping theme to URL if %(theme)s present, None otherwise
|
||||
"""
|
||||
if not has_theme_variable(name):
|
||||
return None
|
||||
|
||||
return {
|
||||
theme: self.file_url(substitute_theme(name, theme), request, use_cache=True)
|
||||
for theme in get_valid_themes()
|
||||
}
|
||||
|
||||
|
||||
class ManageableBackend(Backend):
|
||||
"""
|
||||
|
||||
@@ -46,25 +46,3 @@ class PassthroughBackend(Backend):
|
||||
) -> str:
|
||||
"""Return the URL as-is for passthrough files."""
|
||||
return name
|
||||
|
||||
def themed_urls(
|
||||
self,
|
||||
name: str,
|
||||
request: HttpRequest | None = None,
|
||||
) -> dict[str, str] | None:
|
||||
"""Support themed URLs for external URLs with %(theme)s placeholder.
|
||||
|
||||
If the external URL contains %(theme)s, substitute it for each theme.
|
||||
We can't verify that themed variants exist at the external location,
|
||||
but we trust the user to provide valid URLs.
|
||||
"""
|
||||
from authentik.admin.files.backends.base import (
|
||||
get_valid_themes,
|
||||
has_theme_variable,
|
||||
substitute_theme,
|
||||
)
|
||||
|
||||
if not has_theme_variable(name):
|
||||
return None
|
||||
|
||||
return {theme: substitute_theme(name, theme) for theme in get_valid_themes()}
|
||||
|
||||
@@ -9,7 +9,7 @@ from botocore.exceptions import ClientError
|
||||
from django.db import connection
|
||||
from django.http.request import HttpRequest
|
||||
|
||||
from authentik.admin.files.backends.base import ManageableBackend, get_content_type
|
||||
from authentik.admin.files.backends.base import ManageableBackend
|
||||
from authentik.admin.files.usage import FileUsage
|
||||
from authentik.lib.config import CONFIG
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
@@ -204,7 +204,6 @@ class S3Backend(ManageableBackend):
|
||||
Key=f"{self.base_path}/{name}",
|
||||
Body=content,
|
||||
ACL="private",
|
||||
ContentType=get_content_type(name),
|
||||
)
|
||||
|
||||
@contextmanager
|
||||
@@ -220,7 +219,6 @@ class S3Backend(ManageableBackend):
|
||||
Key=f"{self.base_path}/{name}",
|
||||
ExtraArgs={
|
||||
"ACL": "private",
|
||||
"ContentType": get_content_type(name),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -165,31 +165,3 @@ class TestFileBackend(FileTestFileBackendMixin, TestCase):
|
||||
def test_file_exists_false(self):
|
||||
"""Test file_exists returns False for nonexistent file"""
|
||||
self.assertFalse(self.backend.file_exists("does_not_exist.txt"))
|
||||
|
||||
def test_themed_urls_without_theme_variable(self):
|
||||
"""Test themed_urls returns None when filename has no %(theme)s"""
|
||||
file_name = "logo.png"
|
||||
result = self.backend.themed_urls(file_name)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_themed_urls_with_theme_variable(self):
|
||||
"""Test themed_urls returns dict of URLs for each theme"""
|
||||
file_name = "logo-%(theme)s.png"
|
||||
result = self.backend.themed_urls(file_name)
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn("light", result)
|
||||
self.assertIn("dark", result)
|
||||
|
||||
# Check URLs contain the substituted theme
|
||||
self.assertIn("logo-light.png", result["light"])
|
||||
self.assertIn("logo-dark.png", result["dark"])
|
||||
|
||||
def test_themed_urls_multiple_theme_variables(self):
|
||||
"""Test themed_urls with multiple %(theme)s in path"""
|
||||
file_name = "%(theme)s/logo-%(theme)s.svg"
|
||||
result = self.backend.themed_urls(file_name)
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn("light/logo-light.svg", result["light"])
|
||||
self.assertIn("dark/logo-dark.svg", result["dark"])
|
||||
|
||||
@@ -145,71 +145,3 @@ class TestS3Backend(FileTestS3BackendMixin, TestCase):
|
||||
f"Bucket name '{bucket_name}' appears {bucket_occurrences} times in URL, expected 1. "
|
||||
f"URL: {url}",
|
||||
)
|
||||
|
||||
def test_themed_urls_without_theme_variable(self):
|
||||
"""Test themed_urls returns None when filename has no %(theme)s"""
|
||||
result = self.media_s3_backend.themed_urls("logo.png")
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_themed_urls_with_theme_variable(self):
|
||||
"""Test themed_urls returns dict of presigned URLs for each theme"""
|
||||
result = self.media_s3_backend.themed_urls("logo-%(theme)s.png")
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn("light", result)
|
||||
self.assertIn("dark", result)
|
||||
|
||||
# Check URLs are valid presigned URLs with correct file paths
|
||||
self.assertIn("logo-light.png", result["light"])
|
||||
self.assertIn("logo-dark.png", result["dark"])
|
||||
self.assertIn("X-Amz-Signature=", result["light"])
|
||||
self.assertIn("X-Amz-Signature=", result["dark"])
|
||||
|
||||
def test_themed_urls_multiple_theme_variables(self):
|
||||
"""Test themed_urls with multiple %(theme)s in path"""
|
||||
result = self.media_s3_backend.themed_urls("%(theme)s/logo-%(theme)s.svg")
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn("light/logo-light.svg", result["light"])
|
||||
self.assertIn("dark/logo-dark.svg", result["dark"])
|
||||
|
||||
def test_save_file_sets_content_type_svg(self):
|
||||
"""Test save_file sets correct ContentType for SVG files"""
|
||||
self.media_s3_backend.save_file("test.svg", b"<svg></svg>")
|
||||
|
||||
response = self.media_s3_backend.client.head_object(
|
||||
Bucket=self.media_s3_bucket_name,
|
||||
Key="media/public/test.svg",
|
||||
)
|
||||
self.assertEqual(response["ContentType"], "image/svg+xml")
|
||||
|
||||
def test_save_file_sets_content_type_png(self):
|
||||
"""Test save_file sets correct ContentType for PNG files"""
|
||||
self.media_s3_backend.save_file("test.png", b"\x89PNG\r\n\x1a\n")
|
||||
|
||||
response = self.media_s3_backend.client.head_object(
|
||||
Bucket=self.media_s3_bucket_name,
|
||||
Key="media/public/test.png",
|
||||
)
|
||||
self.assertEqual(response["ContentType"], "image/png")
|
||||
|
||||
def test_save_file_stream_sets_content_type(self):
|
||||
"""Test save_file_stream sets correct ContentType"""
|
||||
with self.media_s3_backend.save_file_stream("test.css") as f:
|
||||
f.write(b"body { color: red; }")
|
||||
|
||||
response = self.media_s3_backend.client.head_object(
|
||||
Bucket=self.media_s3_bucket_name,
|
||||
Key="media/public/test.css",
|
||||
)
|
||||
self.assertEqual(response["ContentType"], "text/css")
|
||||
|
||||
def test_save_file_unknown_extension_octet_stream(self):
|
||||
"""Test save_file sets octet-stream for unknown extensions"""
|
||||
self.media_s3_backend.save_file("test.unknownext123", b"data")
|
||||
|
||||
response = self.media_s3_backend.client.head_object(
|
||||
Bucket=self.media_s3_bucket_name,
|
||||
Key="media/public/test.unknownext123",
|
||||
)
|
||||
self.assertEqual(response["ContentType"], "application/octet-stream")
|
||||
|
||||
@@ -88,28 +88,6 @@ class FileManager:
|
||||
LOGGER.warning(f"Could not find file backend for file: {name}")
|
||||
return ""
|
||||
|
||||
def themed_urls(
|
||||
self,
|
||||
name: str | None,
|
||||
request: HttpRequest | Request | None = None,
|
||||
) -> dict[str, str] | None:
|
||||
"""
|
||||
Get URLs for each theme variant when filename contains %(theme)s.
|
||||
|
||||
Returns dict mapping theme to URL if %(theme)s present, None otherwise.
|
||||
"""
|
||||
if not name:
|
||||
return None
|
||||
|
||||
if isinstance(request, Request):
|
||||
request = request._request
|
||||
|
||||
for backend in self.backends:
|
||||
if backend.supports_file(name):
|
||||
return backend.themed_urls(name, request)
|
||||
|
||||
return None
|
||||
|
||||
def _check_manageable(self) -> None:
|
||||
if not self.manageable:
|
||||
raise ImproperlyConfigured("No file management backend configured.")
|
||||
|
||||
@@ -5,6 +5,7 @@ from io import BytesIO
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
|
||||
from authentik.admin.files.api import get_mime_from_filename
|
||||
from authentik.admin.files.manager import FileManager
|
||||
from authentik.admin.files.tests.utils import FileTestFileBackendMixin
|
||||
from authentik.admin.files.usage import FileUsage
|
||||
@@ -93,9 +94,8 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
|
||||
self.assertIn(
|
||||
{
|
||||
"name": "/static/authentik/sources/ldap.png",
|
||||
"url": "http://testserver/static/authentik/sources/ldap.png",
|
||||
"url": "/static/authentik/sources/ldap.png",
|
||||
"mime_type": "image/png",
|
||||
"themed_urls": None,
|
||||
},
|
||||
response.data,
|
||||
)
|
||||
@@ -129,9 +129,8 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
|
||||
self.assertIn(
|
||||
{
|
||||
"name": "/static/authentik/sources/ldap.png",
|
||||
"url": "http://testserver/static/authentik/sources/ldap.png",
|
||||
"url": "/static/authentik/sources/ldap.png",
|
||||
"mime_type": "image/png",
|
||||
"themed_urls": None,
|
||||
},
|
||||
response.data,
|
||||
)
|
||||
@@ -201,64 +200,30 @@ class TestFileAPI(FileTestFileBackendMixin, TestCase):
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("field is required", str(response.data))
|
||||
|
||||
def test_list_files_includes_themed_urls_none(self):
|
||||
"""Test listing files includes themed_urls as None for non-themed files"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
file_name = "test-no-theme.png"
|
||||
manager.save_file(file_name, b"test content")
|
||||
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:files", query={"search": file_name, "manageableOnly": "true"})
|
||||
)
|
||||
class TestGetMimeFromFilename(TestCase):
|
||||
"""Test get_mime_from_filename function"""
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
file_entry = next((f for f in response.data if f["name"] == file_name), None)
|
||||
self.assertIsNotNone(file_entry)
|
||||
self.assertIn("themed_urls", file_entry)
|
||||
self.assertIsNone(file_entry["themed_urls"])
|
||||
def test_image_png(self):
|
||||
"""Test PNG image MIME type"""
|
||||
self.assertEqual(get_mime_from_filename("test.png"), "image/png")
|
||||
|
||||
manager.delete_file(file_name)
|
||||
def test_image_jpeg(self):
|
||||
"""Test JPEG image MIME type"""
|
||||
self.assertEqual(get_mime_from_filename("test.jpg"), "image/jpeg")
|
||||
|
||||
def test_list_files_includes_themed_urls_dict(self):
|
||||
"""Test listing files includes themed_urls as dict for themed files"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
file_name = "logo-%(theme)s.svg"
|
||||
manager.save_file("logo-light.svg", b"<svg>light</svg>")
|
||||
manager.save_file("logo-dark.svg", b"<svg>dark</svg>")
|
||||
manager.save_file(file_name, b"<svg>placeholder</svg>")
|
||||
def test_image_svg(self):
|
||||
"""Test SVG image MIME type"""
|
||||
self.assertEqual(get_mime_from_filename("test.svg"), "image/svg+xml")
|
||||
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:files", query={"search": "%(theme)s", "manageableOnly": "true"})
|
||||
)
|
||||
def test_text_plain(self):
|
||||
"""Test text file MIME type"""
|
||||
self.assertEqual(get_mime_from_filename("test.txt"), "text/plain")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
file_entry = next((f for f in response.data if f["name"] == file_name), None)
|
||||
self.assertIsNotNone(file_entry)
|
||||
self.assertIn("themed_urls", file_entry)
|
||||
self.assertIsInstance(file_entry["themed_urls"], dict)
|
||||
self.assertIn("light", file_entry["themed_urls"])
|
||||
self.assertIn("dark", file_entry["themed_urls"])
|
||||
def test_unknown_extension(self):
|
||||
"""Test unknown extension returns octet-stream"""
|
||||
self.assertEqual(get_mime_from_filename("test.unknown"), "application/octet-stream")
|
||||
|
||||
manager.delete_file(file_name)
|
||||
manager.delete_file("logo-light.svg")
|
||||
manager.delete_file("logo-dark.svg")
|
||||
|
||||
def test_upload_file_with_theme_variable(self):
|
||||
"""Test uploading file with %(theme)s in name"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
file_name = "brand-logo-%(theme)s.svg"
|
||||
file_content = b"<svg></svg>"
|
||||
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:files"),
|
||||
{
|
||||
"file": BytesIO(file_content),
|
||||
"name": file_name,
|
||||
"usage": FileUsage.MEDIA.value,
|
||||
},
|
||||
format="multipart",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertTrue(manager.file_exists(file_name))
|
||||
manager.delete_file(file_name)
|
||||
def test_no_extension(self):
|
||||
"""Test no extension returns octet-stream"""
|
||||
self.assertEqual(get_mime_from_filename("test"), "application/octet-stream")
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Test file service layer"""
|
||||
|
||||
from unittest import skipUnless
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.test import TestCase
|
||||
@@ -105,71 +104,3 @@ class TestResolveFileUrlS3Backend(FileTestS3BackendMixin, TestCase):
|
||||
|
||||
# S3 URLs should be returned as-is (already absolute)
|
||||
self.assertTrue(result.startswith("http://s3.test:8080/test"))
|
||||
|
||||
|
||||
class TestThemedUrls(FileTestFileBackendMixin, TestCase):
|
||||
"""Test FileManager.themed_urls method"""
|
||||
|
||||
def test_themed_urls_none_path(self):
|
||||
"""Test themed_urls returns None for None path"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
result = manager.themed_urls(None)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_themed_urls_empty_path(self):
|
||||
"""Test themed_urls returns None for empty path"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
result = manager.themed_urls("")
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_themed_urls_no_theme_variable(self):
|
||||
"""Test themed_urls returns None when no %(theme)s in path"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
result = manager.themed_urls("logo.png")
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_themed_urls_with_theme_variable(self):
|
||||
"""Test themed_urls returns dict of URLs for each theme"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
result = manager.themed_urls("logo-%(theme)s.png")
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn("light", result)
|
||||
self.assertIn("dark", result)
|
||||
self.assertIn("logo-light.png", result["light"])
|
||||
self.assertIn("logo-dark.png", result["dark"])
|
||||
|
||||
def test_themed_urls_with_request(self):
|
||||
"""Test themed_urls builds absolute URLs with request"""
|
||||
mock_request = HttpRequest()
|
||||
mock_request.META = {
|
||||
"HTTP_HOST": "example.com",
|
||||
"SERVER_NAME": "example.com",
|
||||
}
|
||||
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
result = manager.themed_urls("logo-%(theme)s.svg", mock_request)
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
light_url = urlparse(result["light"])
|
||||
dark_url = urlparse(result["dark"])
|
||||
self.assertEqual(light_url.scheme, "http")
|
||||
self.assertEqual(light_url.netloc, "example.com")
|
||||
self.assertEqual(dark_url.scheme, "http")
|
||||
self.assertEqual(dark_url.netloc, "example.com")
|
||||
|
||||
def test_themed_urls_passthrough_with_theme_variable(self):
|
||||
"""Test themed_urls returns dict for passthrough URLs with %(theme)s"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
# External URLs with %(theme)s should return themed URLs
|
||||
result = manager.themed_urls("https://example.com/logo-%(theme)s.png")
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertEqual(result["light"], "https://example.com/logo-light.png")
|
||||
self.assertEqual(result["dark"], "https://example.com/logo-dark.png")
|
||||
|
||||
def test_themed_urls_passthrough_without_theme_variable(self):
|
||||
"""Test themed_urls returns None for passthrough URLs without %(theme)s"""
|
||||
manager = FileManager(FileUsage.MEDIA)
|
||||
# External URLs without %(theme)s should return None
|
||||
result = manager.themed_urls("https://example.com/logo.png")
|
||||
self.assertIsNone(result)
|
||||
|
||||
@@ -4,7 +4,6 @@ from pathlib import PurePosixPath
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from authentik.admin.files.backends.base import THEME_VARIABLE
|
||||
from authentik.admin.files.backends.passthrough import PassthroughBackend
|
||||
from authentik.admin.files.backends.static import StaticBackend
|
||||
from authentik.admin.files.usage import FileUsage
|
||||
@@ -13,6 +12,10 @@ from authentik.admin.files.usage import FileUsage
|
||||
MAX_FILE_NAME_LENGTH = 1024
|
||||
MAX_PATH_COMPONENT_LENGTH = 255
|
||||
|
||||
# Theme variable placeholder that can be used in file paths
|
||||
# This allows for theme-specific files like logo-%(theme)s.png
|
||||
THEME_VARIABLE = "%(theme)s"
|
||||
|
||||
|
||||
def validate_file_name(name: str) -> None:
|
||||
if PassthroughBackend(FileUsage.MEDIA).supports_file(name) or StaticBackend(
|
||||
@@ -41,16 +44,16 @@ def validate_upload_file_name(
|
||||
raise ValidationError(_("File name cannot be empty"))
|
||||
|
||||
# Allow %(theme)s placeholder for theme-specific files
|
||||
# Replace with placeholder for validation, then check the result
|
||||
# We temporarily replace it for validation, then check the result
|
||||
name_for_validation = name.replace(THEME_VARIABLE, "theme")
|
||||
|
||||
# Same regex is used in the frontend as well (with %(theme)s handling)
|
||||
# Same regex is used in the frontend as well (without %(theme)s handling there)
|
||||
if not re.match(r"^[a-zA-Z0-9._/-]+$", name_for_validation):
|
||||
raise ValidationError(
|
||||
_(
|
||||
"File name can only contain letters (a-z, A-Z), numbers (0-9), "
|
||||
"dots (.), hyphens (-), underscores (_), forward slashes (/), "
|
||||
"and the placeholder %(theme)s for theme-specific files"
|
||||
"and the special placeholder %(theme)s for theme-specific files"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -13,10 +13,10 @@ from rest_framework.exceptions import AuthenticationFailed
|
||||
from rest_framework.request import Request
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
|
||||
from authentik.core.middleware import CTX_AUTH_VIA
|
||||
from authentik.core.models import Token, TokenIntents, User, UserTypes
|
||||
from authentik.outposts.models import Outpost
|
||||
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
|
||||
|
||||
LOGGER = get_logger()
|
||||
_tmp = Path(gettempdir())
|
||||
|
||||
@@ -11,12 +11,12 @@ from rest_framework.exceptions import AuthenticationFailed
|
||||
|
||||
from authentik.api.authentication import IPCUser, TokenAuthentication
|
||||
from authentik.blueprints.tests import reconcile_app
|
||||
from authentik.common.oauth.constants import SCOPE_AUTHENTIK_API
|
||||
from authentik.core.models import Token, TokenIntents, UserTypes
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
|
||||
from authentik.lib.generators import generate_id
|
||||
from authentik.outposts.apps import MANAGED_OUTPOST
|
||||
from authentik.outposts.models import Outpost
|
||||
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
|
||||
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
"""Schema generation tests"""
|
||||
|
||||
from pathlib import Path
|
||||
from tempfile import gettempdir
|
||||
from uuid import uuid4
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.urls import reverse
|
||||
@@ -31,14 +29,15 @@ class TestSchemaGeneration(APITestCase):
|
||||
|
||||
def test_build_schema(self):
|
||||
"""Test schema build command"""
|
||||
tmp = Path(gettempdir())
|
||||
blueprint_file = tmp / f"{str(uuid4())}.json"
|
||||
api_file = tmp / f"{str(uuid4())}.yml"
|
||||
blueprint_file = Path("blueprints/schema.json")
|
||||
api_file = Path("schema.yml")
|
||||
blueprint_file.unlink()
|
||||
api_file.unlink()
|
||||
with (
|
||||
CONFIG.patch("debug", True),
|
||||
CONFIG.patch("tenants.enabled", True),
|
||||
CONFIG.patch("outposts.disable_embedded_outpost", True),
|
||||
):
|
||||
call_command("build_schema", blueprint_file=blueprint_file, api_file=api_file)
|
||||
call_command("build_schema")
|
||||
self.assertTrue(blueprint_file.exists())
|
||||
self.assertTrue(api_file.exists())
|
||||
|
||||
@@ -18,7 +18,7 @@ entries:
|
||||
name: foo
|
||||
title: foo
|
||||
permissions:
|
||||
- permission: authentik_flows.view_flow
|
||||
- permission: view_flow
|
||||
user: !KeyOf user
|
||||
- permission: authentik_flows.view_flow
|
||||
- permission: view_flow
|
||||
role: !KeyOf role
|
||||
|
||||
@@ -43,6 +43,8 @@ def get_attrs(obj: SerializerModel) -> dict[str, Any]:
|
||||
continue
|
||||
if _field.read_only:
|
||||
data.pop(field_name, None)
|
||||
if _field.get_initial() == data.get(field_name, None):
|
||||
data.pop(field_name, None)
|
||||
if field_name.endswith("_set"):
|
||||
data.pop(field_name, None)
|
||||
return data
|
||||
|
||||
@@ -15,7 +15,7 @@ from django.db.models import Model
|
||||
from django.db.models.query_utils import Q
|
||||
from django.db.transaction import atomic
|
||||
from django.db.utils import IntegrityError
|
||||
from guardian.models import RoleObjectPermission
|
||||
from guardian.models import RoleObjectPermission, UserObjectPermission
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.serializers import BaseSerializer, Serializer
|
||||
from structlog.stdlib import BoundLogger, get_logger
|
||||
@@ -41,6 +41,7 @@ from authentik.core.models import (
|
||||
UserSourceConnection,
|
||||
)
|
||||
from authentik.endpoints.models import Connector
|
||||
from authentik.enterprise.license import LicenseKey
|
||||
from authentik.events.logs import LogEvent, capture_logs
|
||||
from authentik.events.utils import cleanse_dict
|
||||
from authentik.flows.models import Stage
|
||||
@@ -70,6 +71,7 @@ def excluded_models() -> list[type[Model]]:
|
||||
ContentType,
|
||||
Permission,
|
||||
RoleObjectPermission,
|
||||
UserObjectPermission,
|
||||
# Base classes
|
||||
Provider,
|
||||
Source,
|
||||
@@ -139,19 +141,10 @@ class Importer:
|
||||
|
||||
def default_context(self):
|
||||
"""Default context"""
|
||||
context = {
|
||||
return {
|
||||
"goauthentik.io/enterprise/licensed": LicenseKey.get_total().status().is_valid,
|
||||
"goauthentik.io/rbac/models": rbac_models(),
|
||||
"goauthentik.io/enterprise/licensed": False,
|
||||
}
|
||||
try:
|
||||
from authentik.enterprise.license import LicenseKey
|
||||
|
||||
context["goauthentik.io/enterprise/licensed"] = (
|
||||
LicenseKey.get_total().status().is_valid,
|
||||
)
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
return context
|
||||
|
||||
@staticmethod
|
||||
def from_string(yaml_input: str, context: dict | None = None) -> Importer:
|
||||
|
||||
@@ -6,12 +6,7 @@ from django.db import models
|
||||
from drf_spectacular.utils import extend_schema, extend_schema_field
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.fields import (
|
||||
CharField,
|
||||
ChoiceField,
|
||||
ListField,
|
||||
SerializerMethodField,
|
||||
)
|
||||
from rest_framework.fields import CharField, ChoiceField, ListField, SerializerMethodField
|
||||
from rest_framework.filters import OrderingFilter, SearchFilter
|
||||
from rest_framework.permissions import AllowAny
|
||||
from rest_framework.request import Request
|
||||
@@ -21,7 +16,7 @@ from rest_framework.viewsets import ModelViewSet
|
||||
|
||||
from authentik.brands.models import Brand
|
||||
from authentik.core.api.used_by import UsedByMixin
|
||||
from authentik.core.api.utils import ModelSerializer, PassiveSerializer, ThemedUrlsSerializer
|
||||
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
|
||||
from authentik.rbac.filters import SecretKeyFilter
|
||||
from authentik.tenants.api.settings import FlagJSONField
|
||||
from authentik.tenants.flags import Flag
|
||||
@@ -95,9 +90,7 @@ class CurrentBrandSerializer(PassiveSerializer):
|
||||
matched_domain = CharField(source="domain")
|
||||
branding_title = CharField()
|
||||
branding_logo = CharField(source="branding_logo_url")
|
||||
branding_logo_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
|
||||
branding_favicon = CharField(source="branding_favicon_url")
|
||||
branding_favicon_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
|
||||
branding_custom_css = CharField()
|
||||
ui_footer_links = ListField(
|
||||
child=FooterLinkSerializer(),
|
||||
|
||||
@@ -89,26 +89,14 @@ class Brand(SerializerModel):
|
||||
"""Get branding_logo URL"""
|
||||
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_logo)
|
||||
|
||||
def branding_logo_themed_urls(self) -> dict[str, str] | None:
|
||||
"""Get themed URLs for branding_logo if it contains %(theme)s"""
|
||||
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_logo)
|
||||
|
||||
def branding_favicon_url(self) -> str:
|
||||
"""Get branding_favicon URL"""
|
||||
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_favicon)
|
||||
|
||||
def branding_favicon_themed_urls(self) -> dict[str, str] | None:
|
||||
"""Get themed URLs for branding_favicon if it contains %(theme)s"""
|
||||
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_favicon)
|
||||
|
||||
def branding_default_flow_background_url(self) -> str:
|
||||
"""Get branding_default_flow_background URL"""
|
||||
return get_file_manager(FileUsage.MEDIA).file_url(self.branding_default_flow_background)
|
||||
|
||||
def branding_default_flow_background_themed_urls(self) -> dict[str, str] | None:
|
||||
"""Get themed URLs for branding_default_flow_background if it contains %(theme)s"""
|
||||
return get_file_manager(FileUsage.MEDIA).themed_urls(self.branding_default_flow_background)
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[Serializer]:
|
||||
from authentik.brands.api import BrandSerializer
|
||||
|
||||
@@ -6,6 +6,7 @@ from django.urls import reverse
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from authentik.blueprints.tests import apply_blueprint
|
||||
from authentik.brands.api import Themes
|
||||
from authentik.brands.models import Brand
|
||||
from authentik.core.models import Application
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_brand
|
||||
@@ -32,14 +33,12 @@ class TestBrands(APITestCase):
|
||||
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "authentik",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": brand.domain,
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -54,14 +53,12 @@ class TestBrands(APITestCase):
|
||||
).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "custom",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "bar.baz",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -73,14 +70,12 @@ class TestBrands(APITestCase):
|
||||
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "authentik",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "fallback",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -97,14 +92,12 @@ class TestBrands(APITestCase):
|
||||
response,
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "authentik",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "authentik-default",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -122,14 +115,12 @@ class TestBrands(APITestCase):
|
||||
response,
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "authentik",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "authentik-default",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -140,14 +131,12 @@ class TestBrands(APITestCase):
|
||||
).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "custom",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "bar.baz",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -163,14 +152,12 @@ class TestBrands(APITestCase):
|
||||
).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "custom-strong",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "foo.bar.baz",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -186,14 +173,12 @@ class TestBrands(APITestCase):
|
||||
).content.decode(),
|
||||
{
|
||||
"branding_logo": "/static/dist/assets/icons/icon_left_brand.svg",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "/static/dist/assets/icons/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "custom-weak",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": "bar.baz",
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
@@ -269,14 +254,12 @@ class TestBrands(APITestCase):
|
||||
self.client.get(reverse("authentik_api:brand-current")).content.decode(),
|
||||
{
|
||||
"branding_logo": "https://goauthentik.io/img/icon.png",
|
||||
"branding_logo_themed_urls": None,
|
||||
"branding_favicon": "https://goauthentik.io/img/icon.png",
|
||||
"branding_favicon_themed_urls": None,
|
||||
"branding_title": "authentik",
|
||||
"branding_custom_css": "",
|
||||
"matched_domain": brand.domain,
|
||||
"ui_footer_links": [],
|
||||
"ui_theme": "automatic",
|
||||
"ui_theme": Themes.AUTOMATIC,
|
||||
"default_locale": "",
|
||||
"flags": self.default_flags,
|
||||
},
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from typing import Any
|
||||
|
||||
from django.db.models import Case, F, IntegerField, Q, Value, When
|
||||
from django.db.models.functions import Concat, Length
|
||||
from django.db.models.functions import Length
|
||||
from django.http.request import HttpRequest
|
||||
from django.utils.html import _json_script_escapes
|
||||
from django.utils.safestring import mark_safe
|
||||
@@ -26,8 +26,7 @@ def get_brand_for_request(request: HttpRequest) -> Brand:
|
||||
domain_length=Length("domain"),
|
||||
match_priority=Case(
|
||||
When(
|
||||
condition=Q(host_domain__iexact=F("domain"))
|
||||
| Q(host_domain__iendswith=Concat(Value("."), F("domain"))),
|
||||
condition=Q(host_domain__iendswith=F("domain")),
|
||||
then=F("domain_length"),
|
||||
),
|
||||
default=Value(-1),
|
||||
|
||||
@@ -24,7 +24,7 @@ from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
|
||||
from authentik.core.api.providers import ProviderSerializer
|
||||
from authentik.core.api.used_by import UsedByMixin
|
||||
from authentik.core.api.users import UserSerializer
|
||||
from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
|
||||
from authentik.core.api.utils import ModelSerializer
|
||||
from authentik.core.models import Application, User
|
||||
from authentik.events.logs import LogEventSerializer, capture_logs
|
||||
from authentik.policies.api.exec import PolicyTestResultSerializer
|
||||
@@ -53,9 +53,6 @@ class ApplicationSerializer(ModelSerializer):
|
||||
)
|
||||
|
||||
meta_icon_url = ReadOnlyField(source="get_meta_icon")
|
||||
meta_icon_themed_urls = ThemedUrlsSerializer(
|
||||
source="get_meta_icon_themed_urls", read_only=True, allow_null=True
|
||||
)
|
||||
|
||||
def get_launch_url(self, app: Application) -> str | None:
|
||||
"""Allow formatting of launch URL"""
|
||||
@@ -66,7 +63,7 @@ class ApplicationSerializer(ModelSerializer):
|
||||
user = self.context["request"].user
|
||||
|
||||
# Cache serialized user data to avoid N+1 when formatting launch URLs
|
||||
# for multiple applications. UserSerializer accesses user.groups which
|
||||
# for multiple applications. UserSerializer accesses user.ak_groups which
|
||||
# would otherwise trigger a query for each application.
|
||||
if user is not None:
|
||||
if "_cached_user_data" not in self.context:
|
||||
@@ -105,7 +102,6 @@ class ApplicationSerializer(ModelSerializer):
|
||||
"meta_launch_url",
|
||||
"meta_icon",
|
||||
"meta_icon_url",
|
||||
"meta_icon_themed_urls",
|
||||
"meta_description",
|
||||
"meta_publisher",
|
||||
"policy_engine_mode",
|
||||
|
||||
@@ -16,15 +16,11 @@ from rest_framework.viewsets import ViewSet
|
||||
from authentik.api.validation import validate
|
||||
from authentik.core.api.users import ParamUserSerializer
|
||||
from authentik.core.api.utils import MetaNameSerializer
|
||||
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import EndpointDevice
|
||||
from authentik.stages.authenticator import device_classes, devices_for_user
|
||||
from authentik.stages.authenticator.models import Device
|
||||
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
|
||||
|
||||
try:
|
||||
from authentik.enterprise.stages.authenticator_endpoint_gdtc.models import EndpointDevice
|
||||
except ModuleNotFoundError:
|
||||
EndpointDevice = None
|
||||
|
||||
|
||||
class DeviceSerializer(MetaNameSerializer):
|
||||
"""Serializer for authenticator devices"""
|
||||
@@ -47,7 +43,7 @@ class DeviceSerializer(MetaNameSerializer):
|
||||
"""Get extra description"""
|
||||
if isinstance(instance, WebAuthnDevice):
|
||||
return instance.device_type.description if instance.device_type else None
|
||||
if EndpointDevice and isinstance(instance, EndpointDevice):
|
||||
if isinstance(instance, EndpointDevice):
|
||||
return instance.data.get("deviceSignals", {}).get("deviceModel")
|
||||
return None
|
||||
|
||||
@@ -55,7 +51,7 @@ class DeviceSerializer(MetaNameSerializer):
|
||||
"""Get external Device ID"""
|
||||
if isinstance(instance, WebAuthnDevice):
|
||||
return instance.device_type.aaguid if instance.device_type else None
|
||||
if EndpointDevice and isinstance(instance, EndpointDevice):
|
||||
if isinstance(instance, EndpointDevice):
|
||||
return instance.data.get("deviceSignals", {}).get("deviceModel")
|
||||
return None
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from rest_framework.request import Request
|
||||
from rest_framework.response import Response
|
||||
|
||||
from authentik.core.api.utils import PassiveSerializer
|
||||
from authentik.enterprise.apps import EnterpriseConfig
|
||||
from authentik.lib.models import DeprecatedMixin
|
||||
from authentik.lib.utils.reflection import all_subclasses
|
||||
|
||||
@@ -60,25 +61,19 @@ class TypesMixin:
|
||||
continue
|
||||
instance = subclass()
|
||||
try:
|
||||
type_signature = {
|
||||
"name": subclass._meta.verbose_name,
|
||||
"description": subclass.__doc__,
|
||||
"component": instance.component,
|
||||
"model_name": subclass._meta.model_name,
|
||||
"icon_url": getattr(instance, "icon_url", None),
|
||||
"requires_enterprise": False,
|
||||
"deprecated": isinstance(instance, DeprecatedMixin),
|
||||
}
|
||||
try:
|
||||
from authentik.enterprise.apps import EnterpriseConfig
|
||||
|
||||
type_signature["requires_enterprise"] = isinstance(
|
||||
subclass._meta.app_config, EnterpriseConfig
|
||||
)
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
data.append(type_signature)
|
||||
data.append(
|
||||
{
|
||||
"name": subclass._meta.verbose_name,
|
||||
"description": subclass.__doc__,
|
||||
"component": instance.component,
|
||||
"model_name": subclass._meta.model_name,
|
||||
"icon_url": getattr(instance, "icon_url", None),
|
||||
"requires_enterprise": isinstance(
|
||||
subclass._meta.app_config, EnterpriseConfig
|
||||
),
|
||||
"deprecated": isinstance(instance, DeprecatedMixin),
|
||||
}
|
||||
)
|
||||
except NotImplementedError:
|
||||
continue
|
||||
if additional:
|
||||
|
||||
@@ -18,14 +18,10 @@ from authentik.core.models import Provider
|
||||
class ProviderSerializer(ModelSerializer, MetaNameSerializer):
|
||||
"""Provider Serializer"""
|
||||
|
||||
assigned_application_slug = ReadOnlyField(source="application.slug", allow_null=True)
|
||||
assigned_application_name = ReadOnlyField(source="application.name", allow_null=True)
|
||||
assigned_backchannel_application_slug = ReadOnlyField(
|
||||
source="backchannel_application.slug", allow_null=True
|
||||
)
|
||||
assigned_backchannel_application_name = ReadOnlyField(
|
||||
source="backchannel_application.name", allow_null=True
|
||||
)
|
||||
assigned_application_slug = ReadOnlyField(source="application.slug")
|
||||
assigned_application_name = ReadOnlyField(source="application.name")
|
||||
assigned_backchannel_application_slug = ReadOnlyField(source="backchannel_application.slug")
|
||||
assigned_backchannel_application_name = ReadOnlyField(source="backchannel_application.name")
|
||||
|
||||
component = SerializerMethodField()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.core.api.object_types import TypesMixin
|
||||
from authentik.core.api.used_by import UsedByMixin
|
||||
from authentik.core.api.utils import MetaNameSerializer, ModelSerializer, ThemedUrlsSerializer
|
||||
from authentik.core.api.utils import MetaNameSerializer, ModelSerializer
|
||||
from authentik.core.models import GroupSourceConnection, Source, UserSourceConnection
|
||||
from authentik.core.types import UserSettingSerializer
|
||||
from authentik.policies.engine import PolicyEngine
|
||||
@@ -28,7 +28,6 @@ class SourceSerializer(ModelSerializer, MetaNameSerializer):
|
||||
managed = ReadOnlyField()
|
||||
component = SerializerMethodField()
|
||||
icon_url = ReadOnlyField()
|
||||
icon_themed_urls = ThemedUrlsSerializer(read_only=True, allow_null=True)
|
||||
|
||||
def get_component(self, obj: Source) -> str:
|
||||
"""Get object component so that we know how to edit the object"""
|
||||
@@ -58,7 +57,6 @@ class SourceSerializer(ModelSerializer, MetaNameSerializer):
|
||||
"user_path_template",
|
||||
"icon",
|
||||
"icon_url",
|
||||
"icon_themed_urls",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -75,8 +75,7 @@ class TokenSerializer(ManagedSerializer, ModelSerializer):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
expires = attrs.get("expires")
|
||||
if expires is not None and expires > max_token_lifetime_dt:
|
||||
if "expires" in attrs and attrs.get("expires") > max_token_lifetime_dt:
|
||||
raise ValidationError(
|
||||
{
|
||||
"expires": (
|
||||
|
||||
@@ -30,6 +30,7 @@ from drf_spectacular.utils import (
|
||||
extend_schema_field,
|
||||
inline_serializer,
|
||||
)
|
||||
from guardian.shortcuts import get_objects_for_user
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import ValidationError
|
||||
@@ -41,7 +42,6 @@ from rest_framework.fields import (
|
||||
IntegerField,
|
||||
ListField,
|
||||
SerializerMethodField,
|
||||
UUIDField,
|
||||
)
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.request import Request
|
||||
@@ -72,14 +72,12 @@ from authentik.core.middleware import (
|
||||
from authentik.core.models import (
|
||||
USER_ATTRIBUTE_TOKEN_EXPIRING,
|
||||
USER_PATH_SERVICE_ACCOUNT,
|
||||
USERNAME_MAX_LENGTH,
|
||||
Group,
|
||||
Session,
|
||||
Token,
|
||||
TokenIntents,
|
||||
User,
|
||||
UserTypes,
|
||||
default_token_duration,
|
||||
)
|
||||
from authentik.endpoints.connectors.agent.auth import AgentAuth
|
||||
from authentik.events.models import Event, EventAction
|
||||
@@ -89,7 +87,6 @@ from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
|
||||
from authentik.flows.views.executor import QS_KEY_TOKEN
|
||||
from authentik.lib.avatars import get_avatar
|
||||
from authentik.lib.utils.reflection import ConditionalInheritance
|
||||
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
|
||||
from authentik.rbac.api.roles import RoleSerializer
|
||||
from authentik.rbac.decorators import permission_required
|
||||
from authentik.rbac.models import Role, get_permission_choices
|
||||
@@ -132,6 +129,7 @@ class UserSerializer(ModelSerializer):
|
||||
groups = PrimaryKeyRelatedField(
|
||||
allow_empty=True,
|
||||
many=True,
|
||||
source="ak_groups",
|
||||
queryset=Group.objects.all().order_by("name"),
|
||||
default=list,
|
||||
)
|
||||
@@ -145,7 +143,7 @@ class UserSerializer(ModelSerializer):
|
||||
roles_obj = SerializerMethodField(allow_null=True)
|
||||
uid = CharField(read_only=True)
|
||||
username = CharField(
|
||||
max_length=USERNAME_MAX_LENGTH,
|
||||
max_length=150,
|
||||
validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
|
||||
)
|
||||
|
||||
@@ -167,7 +165,7 @@ class UserSerializer(ModelSerializer):
|
||||
def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
|
||||
if not self._should_include_groups:
|
||||
return None
|
||||
return PartialGroupSerializer(instance.groups, many=True).data
|
||||
return PartialGroupSerializer(instance.ak_groups, many=True).data
|
||||
|
||||
@extend_schema_field(RoleSerializer(many=True))
|
||||
def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
|
||||
@@ -241,14 +239,14 @@ class UserSerializer(ModelSerializer):
|
||||
and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
|
||||
and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
|
||||
):
|
||||
raise ValidationError(_("Can't change internal service account to other user type."))
|
||||
raise ValidationError("Can't change internal service account to other user type.")
|
||||
if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
|
||||
raise ValidationError(_("Setting a user to internal service account is not allowed."))
|
||||
raise ValidationError("Setting a user to internal service account is not allowed.")
|
||||
return user_type
|
||||
|
||||
def validate(self, attrs: dict) -> dict:
|
||||
if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
|
||||
raise ValidationError(_("Can't modify internal service account users"))
|
||||
raise ValidationError("Can't modify internal service account users")
|
||||
return super().validate(attrs)
|
||||
|
||||
class Meta:
|
||||
@@ -400,18 +398,6 @@ class UserServiceAccountSerializer(PassiveSerializer):
|
||||
)
|
||||
|
||||
|
||||
class UserRecoveryLinkSerializer(PassiveSerializer):
|
||||
"""Payload to create a recovery link"""
|
||||
|
||||
token_duration = CharField(required=False)
|
||||
|
||||
|
||||
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
|
||||
"""Payload to create and email a recovery link"""
|
||||
|
||||
email_stage = UUIDField()
|
||||
|
||||
|
||||
class UsersFilter(FilterSet):
|
||||
"""Filter for users"""
|
||||
|
||||
@@ -435,7 +421,7 @@ class UsersFilter(FilterSet):
|
||||
last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
|
||||
last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")
|
||||
|
||||
is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
|
||||
is_superuser = BooleanFilter(field_name="ak_groups", method="filter_is_superuser")
|
||||
uuid = UUIDFilter(field_name="uuid")
|
||||
|
||||
path = CharFilter(field_name="path")
|
||||
@@ -444,12 +430,12 @@ class UsersFilter(FilterSet):
|
||||
type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")
|
||||
|
||||
groups_by_name = ModelMultipleChoiceFilter(
|
||||
field_name="groups__name",
|
||||
field_name="ak_groups__name",
|
||||
to_field_name="name",
|
||||
queryset=Group.objects.all().order_by("name"),
|
||||
)
|
||||
groups_by_pk = ModelMultipleChoiceFilter(
|
||||
field_name="groups",
|
||||
field_name="ak_groups",
|
||||
queryset=Group.objects.all().order_by("name"),
|
||||
)
|
||||
|
||||
@@ -465,22 +451,22 @@ class UsersFilter(FilterSet):
|
||||
|
||||
def filter_is_superuser(self, queryset, name, value):
|
||||
if value:
|
||||
return queryset.filter(groups__is_superuser=True).distinct()
|
||||
return queryset.exclude(groups__is_superuser=True).distinct()
|
||||
return queryset.filter(ak_groups__is_superuser=True).distinct()
|
||||
return queryset.exclude(ak_groups__is_superuser=True).distinct()
|
||||
|
||||
def filter_attributes(self, queryset, name, value):
|
||||
"""Filter attributes by query args"""
|
||||
try:
|
||||
value = loads(value)
|
||||
except ValueError:
|
||||
raise ValidationError(_("filter: failed to parse JSON")) from None
|
||||
raise ValidationError(detail="filter: failed to parse JSON") from None
|
||||
if not isinstance(value, dict):
|
||||
raise ValidationError(_("filter: value must be key:value mapping"))
|
||||
raise ValidationError(detail="filter: value must be key:value mapping")
|
||||
qs = {}
|
||||
for key, _value in value.items():
|
||||
qs[f"attributes__{key}"] = _value
|
||||
try:
|
||||
__ = len(queryset.filter(**qs))
|
||||
_ = len(queryset.filter(**qs))
|
||||
return queryset.filter(**qs)
|
||||
except ValueError:
|
||||
return queryset
|
||||
@@ -544,7 +530,7 @@ class UserViewSet(
|
||||
def get_queryset(self):
|
||||
base_qs = User.objects.all().exclude_anonymous()
|
||||
if self.serializer_class(context={"request": self.request})._should_include_groups:
|
||||
base_qs = base_qs.prefetch_related("groups")
|
||||
base_qs = base_qs.prefetch_related("ak_groups")
|
||||
if self.serializer_class(context={"request": self.request})._should_include_roles:
|
||||
base_qs = base_qs.prefetch_related("roles")
|
||||
return base_qs
|
||||
@@ -558,16 +544,14 @@ class UserViewSet(
|
||||
def list(self, request, *args, **kwargs):
|
||||
return super().list(request, *args, **kwargs)
|
||||
|
||||
def _create_recovery_link(
|
||||
self, token_duration: str | None, for_email=False
|
||||
) -> tuple[str, Token]:
|
||||
def _create_recovery_link(self, for_email=False) -> tuple[str, Token]:
|
||||
"""Create a recovery link (when the current brand has a recovery flow set),
|
||||
that can either be shown to an admin or sent to the user directly"""
|
||||
brand: Brand = self.request.brand
|
||||
brand: Brand = self.request._request.brand
|
||||
# Check that there is a recovery flow, if not return an error
|
||||
flow = brand.flow_recovery
|
||||
if not flow:
|
||||
raise ValidationError({"non_field_errors": _("No recovery flow set.")})
|
||||
raise ValidationError({"non_field_errors": "No recovery flow set."})
|
||||
user: User = self.get_object()
|
||||
planner = FlowPlanner(flow)
|
||||
planner.allow_empty_flows = True
|
||||
@@ -581,15 +565,11 @@ class UserViewSet(
|
||||
)
|
||||
except FlowNonApplicableException:
|
||||
raise ValidationError(
|
||||
{"non_field_errors": _("Recovery flow not applicable to user")}
|
||||
{"non_field_errors": "Recovery flow not applicable to user"}
|
||||
) from None
|
||||
_plan = FlowToken.pickle(plan)
|
||||
if for_email:
|
||||
_plan = pickle_flow_token_for_email(plan)
|
||||
expires = default_token_duration()
|
||||
if token_duration:
|
||||
timedelta_string_validator(token_duration)
|
||||
expires = now() + timedelta_from_string(token_duration)
|
||||
token, __ = FlowToken.objects.update_or_create(
|
||||
identifier=f"{user.uid}-password-reset",
|
||||
defaults={
|
||||
@@ -597,7 +577,6 @@ class UserViewSet(
|
||||
"flow": flow,
|
||||
"_plan": _plan,
|
||||
"revoke_on_execution": not for_email,
|
||||
"expires": expires,
|
||||
},
|
||||
)
|
||||
querystring = urlencode({QS_KEY_TOKEN: token.key})
|
||||
@@ -745,60 +724,60 @@ class UserViewSet(
|
||||
|
||||
@permission_required("authentik_core.reset_user_password")
|
||||
@extend_schema(
|
||||
request=UserRecoveryLinkSerializer,
|
||||
responses={
|
||||
"200": LinkSerializer(many=False),
|
||||
},
|
||||
request=None,
|
||||
)
|
||||
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
|
||||
@validate(UserRecoveryLinkSerializer)
|
||||
def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
|
||||
def recovery(self, request: Request, pk: int) -> Response:
|
||||
"""Create a temporary link that a user can use to recover their account"""
|
||||
link, _ = self._create_recovery_link(
|
||||
token_duration=body.validated_data.get("token_duration")
|
||||
)
|
||||
link, _ = self._create_recovery_link()
|
||||
return Response({"link": link})
|
||||
|
||||
@permission_required("authentik_core.reset_user_password")
|
||||
@extend_schema(
|
||||
request=UserRecoveryEmailSerializer,
|
||||
parameters=[
|
||||
OpenApiParameter(
|
||||
name="email_stage",
|
||||
location=OpenApiParameter.QUERY,
|
||||
type=OpenApiTypes.STR,
|
||||
required=True,
|
||||
)
|
||||
],
|
||||
responses={
|
||||
"204": OpenApiResponse(description="Successfully sent recover email"),
|
||||
},
|
||||
request=None,
|
||||
)
|
||||
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
|
||||
@validate(UserRecoveryEmailSerializer)
|
||||
def recovery_email(
|
||||
self, request: Request, pk: int, body: UserRecoveryEmailSerializer
|
||||
) -> Response:
|
||||
def recovery_email(self, request: Request, pk: int) -> Response:
|
||||
"""Send an email with a temporary link that a user can use to recover their account"""
|
||||
email_error_message = _("User does not have an email address set.")
|
||||
stage_error_message = _("Email stage not found.")
|
||||
user: User = self.get_object()
|
||||
if not user.email:
|
||||
for_user: User = self.get_object()
|
||||
if for_user.email == "":
|
||||
LOGGER.debug("User doesn't have an email address")
|
||||
raise ValidationError({"non_field_errors": email_error_message})
|
||||
if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
|
||||
LOGGER.debug("Email stage does not exist")
|
||||
raise ValidationError({"non_field_errors": stage_error_message})
|
||||
if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
|
||||
LOGGER.debug("User has no view access to email stage")
|
||||
raise ValidationError({"non_field_errors": stage_error_message})
|
||||
link, token = self._create_recovery_link(
|
||||
token_duration=body.validated_data.get("token_duration"), for_email=True
|
||||
)
|
||||
raise ValidationError({"non_field_errors": "User does not have an email address set."})
|
||||
link, token = self._create_recovery_link(for_email=True)
|
||||
# Lookup the email stage to assure the current user can access it
|
||||
stages = get_objects_for_user(
|
||||
request.user, "authentik_stages_email.view_emailstage"
|
||||
).filter(pk=request.query_params.get("email_stage"))
|
||||
if not stages.exists():
|
||||
LOGGER.debug("Email stage does not exist/user has no permissions")
|
||||
raise ValidationError({"non_field_errors": "Email stage does not exist."})
|
||||
email_stage: EmailStage = stages.first()
|
||||
message = TemplateEmailMessage(
|
||||
subject=_(stage.subject),
|
||||
to=[(user.name, user.email)],
|
||||
template_name=stage.template,
|
||||
language=user.locale(request),
|
||||
subject=_(email_stage.subject),
|
||||
to=[(for_user.name, for_user.email)],
|
||||
template_name=email_stage.template,
|
||||
language=for_user.locale(request),
|
||||
template_context={
|
||||
"url": link,
|
||||
"user": user,
|
||||
"user": for_user,
|
||||
"expires": token.expires,
|
||||
},
|
||||
)
|
||||
send_mails(stage, message)
|
||||
send_mails(email_stage, message)
|
||||
return Response(status=204)
|
||||
|
||||
@permission_required("authentik_core.impersonate")
|
||||
|
||||
@@ -127,10 +127,3 @@ class LinkSerializer(PassiveSerializer):
|
||||
"""Returns a single link"""
|
||||
|
||||
link = CharField()
|
||||
|
||||
|
||||
class ThemedUrlsSerializer(PassiveSerializer):
|
||||
"""Themed URLs - maps theme names to URLs for light and dark themes"""
|
||||
|
||||
light = CharField(required=False, allow_null=True)
|
||||
dark = CharField(required=False, allow_null=True)
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
# Generated by Django 5.2.10 on 2026-01-19 21:46
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("authentik_core", "0056_user_roles"),
|
||||
("authentik_rbac", "0010_remove_role_group_alter_role_name"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name="user",
|
||||
name="user_permissions",
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name="group",
|
||||
name="roles",
|
||||
field=models.ManyToManyField(
|
||||
blank=True, related_name="groups", to="authentik_rbac.role"
|
||||
),
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name="user",
|
||||
name="groups",
|
||||
),
|
||||
migrations.RenameField(
|
||||
model_name="user",
|
||||
old_name="ak_groups",
|
||||
new_name="groups",
|
||||
),
|
||||
migrations.AlterModelOptions(
|
||||
name="user",
|
||||
options={
|
||||
"permissions": [
|
||||
("reset_user_password", "Reset Password"),
|
||||
("impersonate", "Can impersonate other users"),
|
||||
("preview_user", "Can preview user data sent to providers"),
|
||||
("view_user_applications", "View applications the user has access to"),
|
||||
],
|
||||
"verbose_name": "User",
|
||||
"verbose_name_plural": "Users",
|
||||
},
|
||||
),
|
||||
]
|
||||
@@ -1,8 +1,6 @@
|
||||
"""authentik core models"""
|
||||
|
||||
import re
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from hashlib import sha256
|
||||
from typing import Any, Self
|
||||
@@ -17,6 +15,7 @@ from django.contrib.sessions.base_session import AbstractBaseSession
|
||||
from django.core.validators import validate_slug
|
||||
from django.db import models
|
||||
from django.db.models import Q, QuerySet, options
|
||||
from django.db.models.constants import LOOKUP_SEP
|
||||
from django.http import HttpRequest
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.timezone import now
|
||||
@@ -44,7 +43,6 @@ from authentik.lib.models import (
|
||||
DomainlessFormattedURLValidator,
|
||||
SerializerModel,
|
||||
)
|
||||
from authentik.lib.utils.inheritance import get_deepest_child
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
from authentik.policies.models import PolicyBindingModel
|
||||
from authentik.rbac.models import Role
|
||||
@@ -52,7 +50,6 @@ from authentik.tenants.models import DEFAULT_TOKEN_DURATION, DEFAULT_TOKEN_LENGT
|
||||
from authentik.tenants.utils import get_current_tenant, get_unique_identifier
|
||||
|
||||
LOGGER = get_logger()
|
||||
USERNAME_MAX_LENGTH = 150
|
||||
USER_PATH_SYSTEM_PREFIX = "goauthentik.io"
|
||||
_USER_ATTR_PREFIX = f"{USER_PATH_SYSTEM_PREFIX}/user"
|
||||
USER_ATTRIBUTE_DEBUG = f"{_USER_ATTR_PREFIX}/debug"
|
||||
@@ -186,7 +183,7 @@ class Group(SerializerModel, AttributesMixin):
|
||||
default=False, help_text=_("Users added to this group will be superusers.")
|
||||
)
|
||||
|
||||
roles = models.ManyToManyField("authentik_rbac.Role", related_name="groups", blank=True)
|
||||
roles = models.ManyToManyField("authentik_rbac.Role", related_name="ak_groups", blank=True)
|
||||
|
||||
parents = models.ManyToManyField(
|
||||
"Group",
|
||||
@@ -235,7 +232,7 @@ class Group(SerializerModel, AttributesMixin):
|
||||
def all_roles(self) -> QuerySet[Role]:
|
||||
"""Get all roles of this group and all of its ancestors."""
|
||||
return Role.objects.filter(
|
||||
groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
|
||||
ak_groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
|
||||
).distinct()
|
||||
|
||||
def get_managed_role(self, create=False):
|
||||
@@ -243,7 +240,7 @@ class Group(SerializerModel, AttributesMixin):
|
||||
name = managed_role_name(self)
|
||||
role, created = Role.objects.get_or_create(name=name, managed=name)
|
||||
if created:
|
||||
role.groups.add(self)
|
||||
role.ak_groups.add(self)
|
||||
return role
|
||||
else:
|
||||
return Role.objects.filter(name=managed_role_name(self)).first()
|
||||
@@ -358,17 +355,13 @@ class UserManager(DjangoUserManager):
|
||||
class User(SerializerModel, AttributesMixin, AbstractUser):
|
||||
"""authentik User model, based on django's contrib auth user model."""
|
||||
|
||||
# Overwriting PermissionsMixin: permissions are handled by roles.
|
||||
# (This knowingly violates the Liskov substitution principle. It is better to fail loudly.)
|
||||
user_permissions = None
|
||||
|
||||
uuid = models.UUIDField(default=uuid4, editable=False, unique=True)
|
||||
name = models.TextField(help_text=_("User's display name."))
|
||||
path = models.TextField(default="users")
|
||||
type = models.TextField(choices=UserTypes.choices, default=UserTypes.INTERNAL)
|
||||
|
||||
sources = models.ManyToManyField("Source", through="UserSourceConnection")
|
||||
groups = models.ManyToManyField("Group", related_name="users")
|
||||
ak_groups = models.ManyToManyField("Group", related_name="users")
|
||||
roles = models.ManyToManyField("authentik_rbac.Role", related_name="users", blank=True)
|
||||
password_change_date = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
@@ -382,6 +375,8 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
|
||||
permissions = [
|
||||
("reset_user_password", _("Reset Password")),
|
||||
("impersonate", _("Can impersonate other users")),
|
||||
("assign_user_permissions", _("Can assign permissions to users")),
|
||||
("unassign_user_permissions", _("Can unassign permissions from users")),
|
||||
("preview_user", _("Can preview user data sent to providers")),
|
||||
("view_user_applications", _("View applications the user has access to")),
|
||||
]
|
||||
@@ -405,11 +400,11 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
|
||||
|
||||
def all_groups(self) -> QuerySet[Group]:
|
||||
"""Recursively get all groups this user is a member of."""
|
||||
return self.groups.all().with_ancestors()
|
||||
return self.ak_groups.all().with_ancestors()
|
||||
|
||||
def all_roles(self) -> QuerySet[Role]:
|
||||
"""Get all roles of this user and all of its groups (recursively)."""
|
||||
return Role.objects.filter(Q(users=self) | Q(groups__in=self.all_groups())).distinct()
|
||||
return Role.objects.filter(Q(users=self) | Q(ak_groups__in=self.all_groups())).distinct()
|
||||
|
||||
def get_managed_role(self, create=False):
|
||||
if create:
|
||||
@@ -513,54 +508,6 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
|
||||
"""superuser == staff user"""
|
||||
return self.is_superuser # type: ignore
|
||||
|
||||
# TODO: remove this after 2026.
|
||||
@property
|
||||
def ak_groups(self):
|
||||
"""This is a proxy for a renamed, deprecated field."""
|
||||
from authentik.events.models import Event, EventAction
|
||||
|
||||
deprecation = "authentik.core.models.User.ak_groups"
|
||||
replacement = "authentik.core.models.User.groups"
|
||||
message_logger = (
|
||||
f"{deprecation} is deprecated and will be removed in a future version of "
|
||||
f"authentik. Please use {replacement} instead."
|
||||
)
|
||||
message_event = (
|
||||
f"{message_logger} This event will not be repeated until it expires (by "
|
||||
"default: in 30 days). See authentik logs for every will invocation of this "
|
||||
"deprecation."
|
||||
)
|
||||
stacktrace = traceback.format_stack()
|
||||
# The last line is this function, the next-to-last line is its caller
|
||||
cause = stacktrace[-2] if len(stacktrace) > 1 else "Unknown, see stacktrace in logs"
|
||||
if search := re.search(r'"(.*?)"', cause):
|
||||
cause = f"Property mapping or Expression policy named {search.group(1)}"
|
||||
|
||||
LOGGER.warning(
|
||||
"deprecation used",
|
||||
message=message_logger,
|
||||
deprecation=deprecation,
|
||||
replacement=replacement,
|
||||
cause=cause,
|
||||
stacktrace=stacktrace,
|
||||
)
|
||||
if not Event.filter_not_expired(
|
||||
action=EventAction.CONFIGURATION_WARNING,
|
||||
context__deprecation=deprecation,
|
||||
context__cause=cause,
|
||||
).exists():
|
||||
event = Event.new(
|
||||
EventAction.CONFIGURATION_WARNING,
|
||||
deprecation=deprecation,
|
||||
replacement=replacement,
|
||||
message=message_event,
|
||||
cause=cause,
|
||||
)
|
||||
event.expires = datetime.now() + timedelta(days=30)
|
||||
event.save()
|
||||
|
||||
return self.groups
|
||||
|
||||
def set_password(self, raw_password, signal=True, sender=None, request=None):
|
||||
if self.pk and signal:
|
||||
from authentik.core.signals import password_changed
|
||||
@@ -766,14 +713,6 @@ class Application(SerializerModel, PolicyBindingModel):
|
||||
|
||||
return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon)
|
||||
|
||||
@property
|
||||
def get_meta_icon_themed_urls(self) -> dict[str, str] | None:
|
||||
"""Get themed URLs for meta_icon if it contains %(theme)s"""
|
||||
if not self.meta_icon:
|
||||
return None
|
||||
|
||||
return get_file_manager(FileUsage.MEDIA).themed_urls(self.meta_icon)
|
||||
|
||||
def get_launch_url(self, user: User | None = None, user_data: dict | None = None) -> str | None:
|
||||
"""Get launch URL if set, otherwise attempt to get launch URL based on provider.
|
||||
|
||||
@@ -803,7 +742,25 @@ class Application(SerializerModel, PolicyBindingModel):
|
||||
"""Get casted provider instance. Needs Application queryset with_provider"""
|
||||
if not self.provider:
|
||||
return None
|
||||
return get_deepest_child(self.provider)
|
||||
|
||||
candidates = []
|
||||
base_class = Provider
|
||||
for subclass in base_class.objects.get_queryset()._get_subclasses_recurse(base_class):
|
||||
parent = self.provider
|
||||
for level in subclass.split(LOOKUP_SEP):
|
||||
try:
|
||||
parent = getattr(parent, level)
|
||||
except AttributeError:
|
||||
break
|
||||
if parent in candidates:
|
||||
continue
|
||||
idx = subclass.count(LOOKUP_SEP)
|
||||
if type(parent) is not base_class:
|
||||
idx += 1
|
||||
candidates.insert(idx, parent)
|
||||
if not candidates:
|
||||
return None
|
||||
return candidates[-1]
|
||||
|
||||
def backchannel_provider_for[T: Provider](self, provider_type: type[T], **kwargs) -> T | None:
|
||||
"""Get Backchannel provider for a specific type"""
|
||||
@@ -970,14 +927,6 @@ class Source(ManagedModel, SerializerModel, PolicyBindingModel):
|
||||
|
||||
return get_file_manager(FileUsage.MEDIA).file_url(self.icon)
|
||||
|
||||
@property
|
||||
def icon_themed_urls(self) -> dict[str, str] | None:
|
||||
"""Get themed URLs for icon if it contains %(theme)s"""
|
||||
if not self.icon:
|
||||
return None
|
||||
|
||||
return get_file_manager(FileUsage.MEDIA).themed_urls(self.icon)
|
||||
|
||||
def get_user_path(self) -> str:
|
||||
"""Get user path, fallback to default for formatting errors"""
|
||||
try:
|
||||
|
||||
@@ -392,10 +392,10 @@ class GroupUpdateStage(StageView):
|
||||
groups.append(group)
|
||||
|
||||
with transaction.atomic():
|
||||
self.user.groups.remove(
|
||||
*self.user.groups.filter(groupsourceconnection__source=self.source)
|
||||
self.user.ak_groups.remove(
|
||||
*self.user.ak_groups.filter(groupsourceconnection__source=self.source)
|
||||
)
|
||||
self.user.groups.add(*groups)
|
||||
self.user.ak_groups.add(*groups)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -44,24 +44,19 @@
|
||||
{% endblock %}
|
||||
</div>
|
||||
</main>
|
||||
<footer
|
||||
name="site-footer"
|
||||
aria-label="{% trans 'Site footer' %}"
|
||||
class="pf-c-login__footer pf-m-dark">
|
||||
<div name="flow-links" aria-label="{% trans 'Flow links' %}">
|
||||
<ul class="pf-c-list pf-m-inline" part="list">
|
||||
{% for link in footer_links %}
|
||||
<li part="list-item">
|
||||
<a part="list-item-link" href="{{ link.href }}">{{ link.name }}</a>
|
||||
</li>
|
||||
{% endfor %}
|
||||
<li part="list-item">
|
||||
<span>
|
||||
{% trans 'Powered by authentik' %}
|
||||
</span>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
<footer aria-label="Site footer" class="pf-c-login__footer pf-m-dark">
|
||||
<ul class="pf-c-list pf-m-inline">
|
||||
{% for link in footer_links %}
|
||||
<li>
|
||||
<a href="{{ link.href }}">{{ link.name }}</a>
|
||||
</li>
|
||||
{% endfor %}
|
||||
<li>
|
||||
<span>
|
||||
{% trans 'Powered by authentik' %}
|
||||
</span>
|
||||
</li>
|
||||
</ul>
|
||||
</footer>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -38,7 +38,7 @@ class TestApplicationEntitlements(APITestCase):
|
||||
def test_group(self):
|
||||
"""Test direct group"""
|
||||
group = Group.objects.create(name=generate_id())
|
||||
self.user.groups.add(group)
|
||||
self.user.ak_groups.add(group)
|
||||
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
|
||||
PolicyBinding.objects.create(target=ent, group=group, order=0)
|
||||
ents = self.user.app_entitlements(self.app)
|
||||
@@ -50,7 +50,7 @@ class TestApplicationEntitlements(APITestCase):
|
||||
parent = Group.objects.create(name=generate_id())
|
||||
group = Group.objects.create(name=generate_id())
|
||||
group.parents.add(parent)
|
||||
self.user.groups.add(group)
|
||||
self.user.ak_groups.add(group)
|
||||
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
|
||||
PolicyBinding.objects.create(target=ent, group=parent, order=0)
|
||||
ents = self.user.app_entitlements(self.app)
|
||||
|
||||
@@ -107,8 +107,6 @@ class TestApplicationsAPI(APITestCase):
|
||||
"provider_obj": {
|
||||
"assigned_application_name": "allowed",
|
||||
"assigned_application_slug": "allowed",
|
||||
"assigned_backchannel_application_name": None,
|
||||
"assigned_backchannel_application_slug": None,
|
||||
"authentication_flow": None,
|
||||
"invalidation_flow": None,
|
||||
"authorization_flow": str(self.provider.authorization_flow.pk),
|
||||
@@ -127,7 +125,6 @@ class TestApplicationsAPI(APITestCase):
|
||||
"open_in_new_tab": True,
|
||||
"meta_icon": "",
|
||||
"meta_icon_url": None,
|
||||
"meta_icon_themed_urls": None,
|
||||
"meta_description": "",
|
||||
"meta_publisher": "",
|
||||
"policy_engine_mode": "any",
|
||||
@@ -165,8 +162,6 @@ class TestApplicationsAPI(APITestCase):
|
||||
"provider_obj": {
|
||||
"assigned_application_name": "allowed",
|
||||
"assigned_application_slug": "allowed",
|
||||
"assigned_backchannel_application_name": None,
|
||||
"assigned_backchannel_application_slug": None,
|
||||
"authentication_flow": None,
|
||||
"invalidation_flow": None,
|
||||
"authorization_flow": str(self.provider.authorization_flow.pk),
|
||||
@@ -185,7 +180,6 @@ class TestApplicationsAPI(APITestCase):
|
||||
"open_in_new_tab": True,
|
||||
"meta_icon": "",
|
||||
"meta_icon_url": None,
|
||||
"meta_icon_themed_urls": None,
|
||||
"meta_description": "",
|
||||
"meta_publisher": "",
|
||||
"policy_engine_mode": "any",
|
||||
@@ -195,7 +189,6 @@ class TestApplicationsAPI(APITestCase):
|
||||
"meta_description": "",
|
||||
"meta_icon": "",
|
||||
"meta_icon_url": None,
|
||||
"meta_icon_themed_urls": None,
|
||||
"meta_launch_url": "",
|
||||
"open_in_new_tab": False,
|
||||
"meta_publisher": "",
|
||||
|
||||
@@ -122,8 +122,8 @@ class TestGroupsAPI(APITestCase):
|
||||
def test_superuser_update_no_perm(self):
|
||||
"""Test updating a superuser group without permission"""
|
||||
group = Group.objects.create(name=generate_id(), is_superuser=True)
|
||||
self.login_user.assign_perms_to_managed_role("authentik_core.view_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("authentik_core.change_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("view_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("change_group", group)
|
||||
self.client.force_login(self.login_user)
|
||||
res = self.client.patch(
|
||||
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
|
||||
@@ -139,8 +139,8 @@ class TestGroupsAPI(APITestCase):
|
||||
"""Test updating a superuser group without permission
|
||||
and without changing the superuser status"""
|
||||
group = Group.objects.create(name=generate_id(), is_superuser=True)
|
||||
self.login_user.assign_perms_to_managed_role("authentik_core.view_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("authentik_core.change_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("view_group", group)
|
||||
self.login_user.assign_perms_to_managed_role("change_group", group)
|
||||
self.client.force_login(self.login_user)
|
||||
res = self.client.patch(
|
||||
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
|
||||
|
||||
@@ -54,7 +54,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
)
|
||||
self.assertTrue(stage.handle_groups())
|
||||
self.assertTrue(Group.objects.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
|
||||
self.assertTrue(
|
||||
GroupOAuthSourceConnection.objects.filter(
|
||||
group=Group.objects.get(name="group 1"), source=self.source
|
||||
@@ -88,7 +88,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
)
|
||||
self.assertTrue(stage.handle_groups())
|
||||
self.assertTrue(Group.objects.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
|
||||
self.assertTrue(
|
||||
GroupOAuthSourceConnection.objects.filter(
|
||||
group=Group.objects.get(name="group 1"), source=self.source
|
||||
@@ -123,7 +123,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
)
|
||||
self.assertTrue(stage.handle_groups())
|
||||
self.assertTrue(Group.objects.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
|
||||
self.assertTrue(
|
||||
GroupOAuthSourceConnection.objects.filter(group=group, source=self.source).exists()
|
||||
)
|
||||
@@ -155,7 +155,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
)
|
||||
self.assertTrue(stage.handle_groups())
|
||||
self.assertTrue(Group.objects.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="group 1").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="group 1").exists())
|
||||
self.assertTrue(
|
||||
GroupOAuthSourceConnection.objects.filter(
|
||||
group=Group.objects.get(name="group 1"), source=self.source
|
||||
@@ -189,7 +189,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
request=request,
|
||||
)
|
||||
self.assertFalse(stage.handle_groups())
|
||||
self.assertFalse(self.user.groups.filter(name="group 1").exists())
|
||||
self.assertFalse(self.user.ak_groups.filter(name="group 1").exists())
|
||||
self.assertFalse(
|
||||
GroupOAuthSourceConnection.objects.filter(group=group, source=self.source).exists()
|
||||
)
|
||||
@@ -201,7 +201,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
other_group = Group.objects.create(name="other group")
|
||||
old_group = Group.objects.create(name="old group")
|
||||
new_group = Group.objects.create(name="new group")
|
||||
self.user.groups.set([other_group, old_group])
|
||||
self.user.ak_groups.set([other_group, old_group])
|
||||
GroupOAuthSourceConnection.objects.create(
|
||||
group=old_group, source=self.source, identifier=old_group.name
|
||||
)
|
||||
@@ -231,7 +231,7 @@ class TestSourceFlowManager(FlowTestCase):
|
||||
request=request,
|
||||
)
|
||||
self.assertTrue(stage.handle_groups())
|
||||
self.assertFalse(self.user.groups.filter(name="old group").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="other group").exists())
|
||||
self.assertTrue(self.user.groups.filter(name="new group").exists())
|
||||
self.assertEqual(self.user.groups.count(), 2)
|
||||
self.assertFalse(self.user.ak_groups.filter(name="old group").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="other group").exists())
|
||||
self.assertTrue(self.user.ak_groups.filter(name="new group").exists())
|
||||
self.assertEqual(self.user.ak_groups.count(), 2)
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from django.test.testcases import TestCase
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.events.models import Event
|
||||
from authentik.lib.generators import generate_id
|
||||
|
||||
|
||||
@@ -19,17 +18,3 @@ class TestUsers(TestCase):
|
||||
self.assertTrue(user.has_perm(perm))
|
||||
user.remove_perms_from_managed_role(perm)
|
||||
self.assertFalse(user.has_perm(perm))
|
||||
|
||||
def test_user_ak_groups(self):
|
||||
"""Test user.ak_groups is a proxy for user.groups"""
|
||||
user = User.objects.create(username=generate_id())
|
||||
self.assertEqual(user.ak_groups, user.groups)
|
||||
|
||||
def test_user_ak_groups_event(self):
|
||||
"""Test user.ak_groups creates exactly one event"""
|
||||
user = User.objects.create(username=generate_id())
|
||||
self.assertEqual(Event.objects.count(), 0)
|
||||
user.ak_groups.all()
|
||||
self.assertEqual(Event.objects.count(), 1)
|
||||
user.ak_groups.all()
|
||||
self.assertEqual(Event.objects.count(), 1)
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
"""Test Users API"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from json import loads
|
||||
|
||||
from django.urls.base import reverse
|
||||
from django.utils.timezone import now
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from authentik.brands.models import Brand
|
||||
@@ -128,62 +127,13 @@ class TestUsersAPI(APITestCase):
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_recovery_duration(self):
|
||||
"""Test user recovery token duration"""
|
||||
Token.objects.all().delete()
|
||||
flow = create_test_flow(
|
||||
FlowDesignation.RECOVERY,
|
||||
authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
|
||||
)
|
||||
brand: Brand = create_test_brand()
|
||||
brand.flow_recovery = flow
|
||||
brand.save()
|
||||
self.client.force_login(self.admin)
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
|
||||
data={"token_duration": "days=33"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
expires = Token.objects.first().expires
|
||||
expected_expires = now() + timedelta(days=33)
|
||||
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
|
||||
|
||||
def test_recovery_duration_update(self):
|
||||
"""Test user recovery token duration update"""
|
||||
Token.objects.all().delete()
|
||||
flow = create_test_flow(
|
||||
FlowDesignation.RECOVERY,
|
||||
authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
|
||||
)
|
||||
brand: Brand = create_test_brand()
|
||||
brand.flow_recovery = flow
|
||||
brand.save()
|
||||
self.client.force_login(self.admin)
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
|
||||
data={"token_duration": "days=33"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
expires = Token.objects.first().expires
|
||||
expected_expires = now() + timedelta(days=33)
|
||||
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
|
||||
data={"token_duration": "days=66"},
|
||||
)
|
||||
expires = Token.objects.first().expires
|
||||
expected_expires = now() + timedelta(days=66)
|
||||
self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
|
||||
|
||||
def test_recovery_email_no_flow(self):
|
||||
"""Test user recovery link (no recovery flow set)"""
|
||||
self.client.force_login(self.admin)
|
||||
self.user.email = ""
|
||||
self.user.save()
|
||||
stage = EmailStage.objects.create(name="email")
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
|
||||
data={"email_stage": stage.pk},
|
||||
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
@@ -192,8 +142,7 @@ class TestUsersAPI(APITestCase):
|
||||
self.user.email = "foo@bar.baz"
|
||||
self.user.save()
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
|
||||
data={"email_stage": stage.pk},
|
||||
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
|
||||
@@ -211,7 +160,7 @@ class TestUsersAPI(APITestCase):
|
||||
reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(response.content, {"email_stage": ["This field is required."]})
|
||||
self.assertJSONEqual(response.content, {"non_field_errors": "Email stage does not exist."})
|
||||
|
||||
def test_recovery_email(self):
|
||||
"""Test user recovery link"""
|
||||
@@ -229,8 +178,8 @@ class TestUsersAPI(APITestCase):
|
||||
reverse(
|
||||
"authentik_api:user-recovery-email",
|
||||
kwargs={"pk": self.user.pk},
|
||||
),
|
||||
data={"email_stage": stage.pk},
|
||||
)
|
||||
+ f"?email_stage={stage.pk}"
|
||||
)
|
||||
self.assertEqual(response.status_code, 204)
|
||||
|
||||
|
||||
@@ -7,8 +7,6 @@ from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, rsa
|
||||
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
|
||||
from cryptography.x509.oid import NameOID
|
||||
from django.db import models
|
||||
@@ -23,8 +21,6 @@ class PrivateKeyAlg(models.TextChoices):
|
||||
|
||||
RSA = "rsa", _("rsa")
|
||||
ECDSA = "ecdsa", _("ecdsa")
|
||||
ED25519 = "ed25519", _("Ed25519")
|
||||
ED448 = "ed448", _("Ed448")
|
||||
|
||||
|
||||
class CertificateBuilder:
|
||||
@@ -60,10 +56,6 @@ class CertificateBuilder:
|
||||
return rsa.generate_private_key(
|
||||
public_exponent=65537, key_size=4096, backend=default_backend()
|
||||
)
|
||||
if self.alg == PrivateKeyAlg.ED25519:
|
||||
return Ed25519PrivateKey.generate()
|
||||
if self.alg == PrivateKeyAlg.ED448:
|
||||
return Ed448PrivateKey.generate()
|
||||
raise ValueError(f"Invalid alg: {self.alg}")
|
||||
|
||||
def build(
|
||||
@@ -106,25 +98,18 @@ class CertificateBuilder:
|
||||
self.__builder = self.__builder.add_extension(
|
||||
x509.SubjectAlternativeName(alt_names), critical=True
|
||||
)
|
||||
algo = hashes.SHA256()
|
||||
# EdDSA doesn't take a hash algorithm
|
||||
if isinstance(self.__private_key, (Ed25519PrivateKey | Ed448PrivateKey)):
|
||||
algo = None
|
||||
self.__certificate = self.__builder.sign(
|
||||
private_key=self.__private_key,
|
||||
algorithm=algo,
|
||||
algorithm=hashes.SHA256(),
|
||||
backend=default_backend(),
|
||||
)
|
||||
|
||||
@property
|
||||
def private_key(self):
|
||||
"""Return private key in PEM format"""
|
||||
format = serialization.PrivateFormat.TraditionalOpenSSL
|
||||
if isinstance(self.__private_key, (Ed25519PrivateKey | Ed448PrivateKey)):
|
||||
format = serialization.PrivateFormat.PKCS8
|
||||
return self.__private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=format,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
).decode("utf-8")
|
||||
|
||||
|
||||
@@ -78,7 +78,7 @@ def generate_key_id_legacy(key_data: str) -> str:
|
||||
"""Generate Key ID using MD5 (legacy format for backwards compatibility)."""
|
||||
if not key_data:
|
||||
return ""
|
||||
return md5(key_data.encode("utf-8"), usedforsecurity=False).hexdigest() # nosec
|
||||
return md5(key_data.encode("utf-8")).hexdigest() # nosec
|
||||
|
||||
|
||||
class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
|
||||
|
||||
@@ -196,10 +196,8 @@ class TestCrypto(APITestCase):
|
||||
"""Test certificate export (download)"""
|
||||
keypair = create_test_cert()
|
||||
user = create_test_user()
|
||||
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
|
||||
user.assign_perms_to_managed_role(
|
||||
"authentik_crypto.view_certificatekeypair_certificate", keypair
|
||||
)
|
||||
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
|
||||
user.assign_perms_to_managed_role("view_certificatekeypair_certificate", keypair)
|
||||
self.client.force_login(user)
|
||||
response = self.client.get(
|
||||
reverse(
|
||||
@@ -222,8 +220,8 @@ class TestCrypto(APITestCase):
|
||||
"""Test private_key export (download)"""
|
||||
keypair = create_test_cert()
|
||||
user = create_test_user()
|
||||
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
|
||||
user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
|
||||
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
|
||||
user.assign_perms_to_managed_role("view_certificatekeypair_key", keypair)
|
||||
self.client.force_login(user)
|
||||
response = self.client.get(
|
||||
reverse(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from hashlib import sha256
|
||||
from json import loads
|
||||
from unittest.mock import PropertyMock, patch
|
||||
|
||||
from django.urls import reverse
|
||||
from jwt import encode
|
||||
@@ -233,43 +232,3 @@ class TestEndpointStage(FlowTestCase):
|
||||
plan = plan()
|
||||
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
|
||||
self.assertEqual(plan.context[PLAN_CONTEXT_DEVICE], self.device)
|
||||
|
||||
def test_endpoint_stage_connector_no_stage_optional(self):
|
||||
flow = create_test_flow()
|
||||
stage = EndpointStage.objects.create(connector=self.connector, mode=StageMode.OPTIONAL)
|
||||
FlowStageBinding.objects.create(stage=stage, target=flow, order=0)
|
||||
|
||||
with patch(
|
||||
"authentik.endpoints.connectors.agent.models.AgentConnector.stage",
|
||||
PropertyMock(return_value=None),
|
||||
):
|
||||
with self.assertFlowFinishes() as plan:
|
||||
res = self.client.get(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||
)
|
||||
self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
|
||||
plan = plan()
|
||||
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
|
||||
self.assertNotIn(PLAN_CONTEXT_DEVICE, plan.context)
|
||||
|
||||
def test_endpoint_stage_connector_no_stage_required(self):
|
||||
flow = create_test_flow()
|
||||
stage = EndpointStage.objects.create(connector=self.connector, mode=StageMode.REQUIRED)
|
||||
FlowStageBinding.objects.create(stage=stage, target=flow, order=0)
|
||||
|
||||
with patch(
|
||||
"authentik.endpoints.connectors.agent.models.AgentConnector.stage",
|
||||
PropertyMock(return_value=None),
|
||||
):
|
||||
with self.assertFlowFinishes() as plan:
|
||||
res = self.client.get(
|
||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||
)
|
||||
self.assertStageResponse(
|
||||
res,
|
||||
component="ak-stage-access-denied",
|
||||
error_message="Invalid stage configuration",
|
||||
)
|
||||
plan = plan()
|
||||
self.assertNotIn(PLAN_CONTEXT_AGENT_ENDPOINT_CHALLENGE, plan.context)
|
||||
self.assertNotIn(PLAN_CONTEXT_DEVICE, plan.context)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from authentik.endpoints.models import EndpointStage, StageMode
|
||||
from authentik.endpoints.models import EndpointStage
|
||||
from authentik.flows.stage import StageView
|
||||
|
||||
PLAN_CONTEXT_ENDPOINT_CONNECTOR = "endpoint_connector"
|
||||
@@ -6,24 +6,15 @@ PLAN_CONTEXT_ENDPOINT_CONNECTOR = "endpoint_connector"
|
||||
|
||||
class EndpointStageView(StageView):
|
||||
|
||||
def _get_inner(self) -> StageView | None:
|
||||
def _get_inner(self):
|
||||
stage: EndpointStage = self.executor.current_stage
|
||||
inner_stage: type[StageView] | None = stage.connector.stage
|
||||
if not inner_stage:
|
||||
return None
|
||||
return self.executor.stage_ok()
|
||||
return inner_stage(self.executor, request=self.request)
|
||||
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
inner = self._get_inner()
|
||||
if inner is None:
|
||||
stage: EndpointStage = self.executor.current_stage
|
||||
if stage.mode == StageMode.OPTIONAL:
|
||||
return self.executor.stage_ok()
|
||||
else:
|
||||
return self.executor.stage_invalid("Invalid stage configuration")
|
||||
return inner.dispatch(request, *args, **kwargs)
|
||||
return self._get_inner().dispatch(request, *args, **kwargs)
|
||||
|
||||
def cleanup(self):
|
||||
inner = self._get_inner()
|
||||
if inner is not None:
|
||||
return inner.cleanup()
|
||||
return self._get_inner().cleanup()
|
||||
|
||||
@@ -60,18 +60,20 @@ class TestEndpointFacts(APITestCase):
|
||||
]
|
||||
}
|
||||
)
|
||||
self.assertCountEqual(
|
||||
device.cached_facts.data["software"],
|
||||
[
|
||||
{
|
||||
"name": "software-a",
|
||||
"version": "1.2.3.4",
|
||||
"source": "package",
|
||||
},
|
||||
{
|
||||
"name": "software-b",
|
||||
"version": "5.6.7.8",
|
||||
"source": "package",
|
||||
},
|
||||
],
|
||||
self.assertEqual(
|
||||
device.cached_facts.data,
|
||||
{
|
||||
"software": [
|
||||
{
|
||||
"name": "software-a",
|
||||
"version": "1.2.3.4",
|
||||
"source": "package",
|
||||
},
|
||||
{
|
||||
"name": "software-b",
|
||||
"version": "5.6.7.8",
|
||||
"source": "package",
|
||||
},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
@@ -10,7 +10,6 @@ from jwt import PyJWTError, decode, encode, get_unverified_header
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.common.oauth.constants import TOKEN_TYPE
|
||||
from authentik.core.models import AuthenticatedSession, Session, User
|
||||
from authentik.core.sessions import SessionStore
|
||||
from authentik.crypto.apps import MANAGED_KEY
|
||||
@@ -27,6 +26,7 @@ from authentik.events.models import Event, EventAction
|
||||
from authentik.events.signals import SESSION_LOGIN_EVENT
|
||||
from authentik.flows.planner import PLAN_CONTEXT_DEVICE
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
from authentik.providers.oauth2.constants import TOKEN_TYPE
|
||||
from authentik.providers.oauth2.id_token import IDToken
|
||||
from authentik.providers.oauth2.models import JWTAlgorithms
|
||||
from authentik.root.middleware import SessionMiddleware
|
||||
|
||||
@@ -15,7 +15,6 @@ from django.core.cache import cache
|
||||
from django.db.models.query import QuerySet
|
||||
from django.utils.timezone import now
|
||||
from jwt import PyJWTError, decode, get_unverified_header
|
||||
from jwt.algorithms import ECAlgorithm
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.fields import (
|
||||
ChoiceField,
|
||||
@@ -110,20 +109,13 @@ class LicenseKey:
|
||||
intermediate.verify_directly_issued_by(get_licensing_key())
|
||||
except InvalidSignature, TypeError, ValueError, Error:
|
||||
raise ValidationError("Unable to verify license") from None
|
||||
_validate_curve_original = ECAlgorithm._validate_curve
|
||||
try:
|
||||
# authentik's license are generated with `algorithm="ES512"` and signed with
|
||||
# a key of curve `secp384r1`. Starting with version 2.11.0, pyjwt enforces the spec, see
|
||||
# https://github.com/jpadilla/pyjwt/commit/5b8622773358e56d3d3c0a9acf404809ff34433a
|
||||
# authentik will change its license generation to `algorithm="ES384"` in 2026.
|
||||
# TODO: remove this when the last incompatible license runs out.
|
||||
ECAlgorithm._validate_curve = lambda *_: True
|
||||
body = from_dict(
|
||||
LicenseKey,
|
||||
decode(
|
||||
jwt,
|
||||
our_cert.public_key(),
|
||||
algorithms=["ES384", "ES512"],
|
||||
algorithms=["ES512"],
|
||||
audience=get_license_aud(),
|
||||
options={"verify_exp": check_expiry, "verify_signature": check_expiry},
|
||||
),
|
||||
@@ -133,8 +125,6 @@ class LicenseKey:
|
||||
if unverified["aud"] != get_license_aud():
|
||||
raise ValidationError("Invalid Install ID in license") from None
|
||||
raise ValidationError("Unable to verify license") from None
|
||||
finally:
|
||||
ECAlgorithm._validate_curve = _validate_curve_original
|
||||
return body
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from django.db.models import BooleanField as ModelBooleanField
|
||||
from django.db.models import Case, Q, Value, When
|
||||
from django_filters.rest_framework import BooleanFilter, FilterSet
|
||||
from drf_spectacular.utils import extend_schema
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.fields import IntegerField, SerializerMethodField
|
||||
from rest_framework.mixins import CreateModelMixin
|
||||
from rest_framework.request import Request
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.viewsets import GenericViewSet
|
||||
|
||||
from authentik.core.api.utils import ModelSerializer
|
||||
from authentik.enterprise.api import EnterpriseRequiredMixin
|
||||
from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer
|
||||
from authentik.enterprise.lifecycle.models import LifecycleIteration, ReviewState
|
||||
from authentik.enterprise.lifecycle.utils import (
|
||||
ContentTypeField,
|
||||
ReviewerGroupSerializer,
|
||||
ReviewerUserSerializer,
|
||||
admin_link_for_model,
|
||||
parse_content_type,
|
||||
start_of_day,
|
||||
)
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
|
||||
|
||||
class LifecycleIterationSerializer(EnterpriseRequiredMixin, ModelSerializer):
|
||||
content_type = ContentTypeField()
|
||||
object_verbose = SerializerMethodField()
|
||||
object_admin_url = SerializerMethodField(read_only=True)
|
||||
grace_period_end = SerializerMethodField(read_only=True)
|
||||
reviews = ReviewSerializer(many=True, read_only=True, source="review_set.all")
|
||||
user_can_review = SerializerMethodField(read_only=True)
|
||||
|
||||
reviewer_groups = ReviewerGroupSerializer(
|
||||
many=True, read_only=True, source="rule.reviewer_groups"
|
||||
)
|
||||
min_reviewers = IntegerField(read_only=True, source="rule.min_reviewers")
|
||||
reviewers = ReviewerUserSerializer(many=True, read_only=True, source="rule.reviewers")
|
||||
|
||||
next_review_date = SerializerMethodField(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = LifecycleIteration
|
||||
fields = [
|
||||
"id",
|
||||
"content_type",
|
||||
"object_id",
|
||||
"object_verbose",
|
||||
"object_admin_url",
|
||||
"state",
|
||||
"opened_on",
|
||||
"grace_period_end",
|
||||
"next_review_date",
|
||||
"reviews",
|
||||
"user_can_review",
|
||||
"reviewer_groups",
|
||||
"min_reviewers",
|
||||
"reviewers",
|
||||
]
|
||||
read_only_fields = fields
|
||||
|
||||
def get_object_verbose(self, iteration: LifecycleIteration) -> str:
|
||||
return str(iteration.object)
|
||||
|
||||
def get_object_admin_url(self, iteration: LifecycleIteration) -> str:
|
||||
return admin_link_for_model(iteration.object)
|
||||
|
||||
def get_grace_period_end(self, iteration: LifecycleIteration) -> datetime:
|
||||
return start_of_day(
|
||||
iteration.opened_on + timedelta_from_string(iteration.rule.grace_period)
|
||||
)
|
||||
|
||||
def get_next_review_date(self, iteration: LifecycleIteration) -> datetime:
|
||||
return start_of_day(iteration.opened_on + timedelta_from_string(iteration.rule.interval))
|
||||
|
||||
def get_user_can_review(self, iteration: LifecycleIteration) -> bool:
|
||||
return iteration.user_can_review(self.context["request"].user)
|
||||
|
||||
|
||||
class LifecycleIterationFilterSet(FilterSet):
|
||||
user_is_reviewer = BooleanFilter(field_name="user_is_reviewer", lookup_expr="exact")
|
||||
|
||||
|
||||
class IterationViewSet(EnterpriseRequiredMixin, CreateModelMixin, GenericViewSet):
|
||||
queryset = LifecycleIteration.objects.all()
|
||||
serializer_class = LifecycleIterationSerializer
|
||||
ordering = ["-opened_on"]
|
||||
ordering_fields = ["state", "content_type__model", "opened_on", "grace_period_end"]
|
||||
filterset_class = LifecycleIterationFilterSet
|
||||
|
||||
def get_queryset(self):
|
||||
user = self.request.user
|
||||
return self.queryset.annotate(
|
||||
user_is_reviewer=Case(
|
||||
When(
|
||||
Q(rule__reviewers=user)
|
||||
| Q(rule__reviewer_groups__in=user.groups.all().with_ancestors()),
|
||||
then=Value(True),
|
||||
),
|
||||
default=Value(False),
|
||||
output_field=ModelBooleanField(),
|
||||
)
|
||||
).distinct()
|
||||
|
||||
@action(
|
||||
detail=False,
|
||||
methods=["get"],
|
||||
url_path=r"latest/(?P<content_type>[^/]+)/(?P<object_id>[^/]+)",
|
||||
)
|
||||
def latest_iteration(self, request: Request, content_type: str, object_id: str) -> Response:
|
||||
ct = parse_content_type(content_type)
|
||||
try:
|
||||
obj = (
|
||||
self.get_queryset()
|
||||
.filter(
|
||||
content_type__app_label=ct["app_label"],
|
||||
content_type__model=ct["model"],
|
||||
object_id=object_id,
|
||||
)
|
||||
.latest("opened_on")
|
||||
)
|
||||
except LifecycleIteration.DoesNotExist:
|
||||
return Response(status=404)
|
||||
serializer = self.get_serializer(obj)
|
||||
return Response(serializer.data)
|
||||
|
||||
@extend_schema(
|
||||
operation_id="lifecycle_iterations_list_open",
|
||||
responses={200: LifecycleIterationSerializer(many=True)},
|
||||
)
|
||||
@action(
|
||||
detail=False,
|
||||
methods=["get"],
|
||||
url_path=r"open",
|
||||
)
|
||||
def open_iterations(self, request: Request):
|
||||
iterations = self.get_queryset().filter(
|
||||
Q(state=ReviewState.PENDING) | Q(state=ReviewState.OVERDUE)
|
||||
)
|
||||
iterations = self.filter_queryset(iterations)
|
||||
page = self.paginate_queryset(iterations)
|
||||
if page is not None:
|
||||
serializer = self.get_serializer(page, many=True)
|
||||
return self.get_paginated_response(serializer.data)
|
||||
|
||||
serializer = self.get_serializer(iterations, many=True)
|
||||
return Response(serializer.data)
|
||||
@@ -1,33 +0,0 @@
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.mixins import CreateModelMixin
|
||||
from rest_framework.viewsets import GenericViewSet
|
||||
|
||||
from authentik.core.api.utils import ModelSerializer
|
||||
from authentik.enterprise.api import EnterpriseRequiredMixin
|
||||
from authentik.enterprise.lifecycle.models import LifecycleIteration, Review
|
||||
from authentik.enterprise.lifecycle.utils import ReviewerUserSerializer
|
||||
|
||||
|
||||
class ReviewSerializer(EnterpriseRequiredMixin, ModelSerializer):
|
||||
reviewer = ReviewerUserSerializer(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = Review
|
||||
fields = ["id", "iteration", "reviewer", "timestamp", "note"]
|
||||
read_only_fields = ["id", "timestamp", "reviewer"]
|
||||
|
||||
def validate_iteration(self, iteration: LifecycleIteration) -> LifecycleIteration:
|
||||
user = self.context["request"].user
|
||||
if not iteration.user_can_review(user):
|
||||
raise ValidationError(_("You are not allowed to submit a review for this object."))
|
||||
return iteration
|
||||
|
||||
|
||||
class ReviewViewSet(EnterpriseRequiredMixin, CreateModelMixin, GenericViewSet):
|
||||
queryset = Review.objects.all()
|
||||
serializer_class = ReviewSerializer
|
||||
|
||||
def perform_create(self, serializer: ReviewSerializer) -> None:
|
||||
review = serializer.save(reviewer=self.request.user)
|
||||
review.iteration.on_review(self.request)
|
||||
@@ -1,113 +0,0 @@
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.fields import SerializerMethodField
|
||||
from rest_framework.relations import SlugRelatedField
|
||||
from rest_framework.viewsets import ModelViewSet
|
||||
|
||||
from authentik.core.api.utils import ModelSerializer
|
||||
from authentik.core.models import User
|
||||
from authentik.enterprise.api import EnterpriseRequiredMixin
|
||||
from authentik.enterprise.lifecycle.models import LifecycleRule
|
||||
from authentik.enterprise.lifecycle.utils import (
|
||||
ContentTypeField,
|
||||
ReviewerGroupSerializer,
|
||||
ReviewerUserSerializer,
|
||||
)
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
|
||||
|
||||
class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
|
||||
content_type = ContentTypeField()
|
||||
target_verbose = SerializerMethodField()
|
||||
reviewer_groups_obj = ReviewerGroupSerializer(
|
||||
many=True, read_only=True, source="reviewer_groups"
|
||||
)
|
||||
reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
|
||||
reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")
|
||||
|
||||
class Meta:
|
||||
model = LifecycleRule
|
||||
fields = [
|
||||
"id",
|
||||
"name",
|
||||
"content_type",
|
||||
"object_id",
|
||||
"interval",
|
||||
"grace_period",
|
||||
"reviewer_groups",
|
||||
"reviewer_groups_obj",
|
||||
"min_reviewers",
|
||||
"min_reviewers_is_per_group",
|
||||
"reviewers",
|
||||
"reviewers_obj",
|
||||
"notification_transports",
|
||||
"target_verbose",
|
||||
]
|
||||
read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
|
||||
|
||||
def get_target_verbose(self, rule: LifecycleRule) -> str:
|
||||
if rule.object_id is None:
|
||||
return rule.content_type.model_class()._meta.verbose_name_plural
|
||||
else:
|
||||
return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
|
||||
|
||||
def validate_object_id(self, value: str) -> str | None:
|
||||
if value == "":
|
||||
return None
|
||||
return value
|
||||
|
||||
def validate(self, attrs: dict) -> dict:
|
||||
if (
|
||||
attrs.get("object_id") is not None
|
||||
and not attrs["content_type"]
|
||||
.get_all_objects_for_this_type(pk=attrs["object_id"])
|
||||
.exists()
|
||||
):
|
||||
raise ValidationError({"object_id": _("Object does not exist")})
|
||||
if "reviewer_groups" in attrs or "reviewers" in attrs:
|
||||
reviewer_groups = attrs.get(
|
||||
"reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
|
||||
)
|
||||
reviewers = attrs.get(
|
||||
"reviewers", self.instance.reviewers.all() if self.instance else []
|
||||
)
|
||||
if len(reviewer_groups) == 0 and len(reviewers) == 0:
|
||||
raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
|
||||
if "grace_period" in attrs or "interval" in attrs:
|
||||
grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
|
||||
interval = attrs.get("interval", getattr(self.instance, "interval", None))
|
||||
if (
|
||||
grace_period is not None
|
||||
and interval is not None
|
||||
and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
|
||||
):
|
||||
raise ValidationError(
|
||||
{"grace_period": _("Grace period must be shorter than the interval.")}
|
||||
)
|
||||
if "content_type" in attrs or "object_id" in attrs:
|
||||
content_type = attrs.get("content_type", getattr(self.instance, "content_type", None))
|
||||
object_id = attrs.get("object_id", getattr(self.instance, "object_id", None))
|
||||
if content_type is not None and object_id is None:
|
||||
existing = LifecycleRule.objects.filter(
|
||||
content_type=content_type, object_id__isnull=True
|
||||
)
|
||||
if self.instance:
|
||||
existing = existing.exclude(pk=self.instance.pk)
|
||||
if existing.exists():
|
||||
raise ValidationError(
|
||||
{
|
||||
"content_type": _(
|
||||
"Only one type-wide rule for each object type is allowed."
|
||||
)
|
||||
}
|
||||
)
|
||||
return attrs
|
||||
|
||||
|
||||
class LifecycleRuleViewSet(ModelViewSet):
|
||||
queryset = LifecycleRule.objects.all()
|
||||
serializer_class = LifecycleRuleSerializer
|
||||
search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"]
|
||||
ordering = ["name"]
|
||||
ordering_fields = ["name", "content_type__model"]
|
||||
filterset_fields = ["content_type__model"]
|
||||
@@ -1,22 +0,0 @@
|
||||
from authentik.enterprise.apps import EnterpriseConfig
|
||||
from authentik.lib.utils.time import fqdn_rand
|
||||
from authentik.tasks.schedules.common import ScheduleSpec
|
||||
|
||||
|
||||
class ReportsConfig(EnterpriseConfig):
|
||||
name = "authentik.enterprise.lifecycle"
|
||||
label = "authentik_lifecycle"
|
||||
verbose_name = "authentik Enterprise.Lifecycle"
|
||||
default = True
|
||||
|
||||
@property
|
||||
def tenant_schedule_specs(self) -> list[ScheduleSpec]:
|
||||
from authentik.enterprise.lifecycle.tasks import apply_lifecycle_rules
|
||||
|
||||
return [
|
||||
ScheduleSpec(
|
||||
actor=apply_lifecycle_rules,
|
||||
crontab=f"{fqdn_rand('lifecycle_apply_lifecycle_rules')} "
|
||||
f"{fqdn_rand('lifecycle_apply_lifecycle_rules', 24)} * * *",
|
||||
)
|
||||
]
|
||||
@@ -1,154 +0,0 @@
|
||||
# Generated by Django 5.2.11 on 2026-02-09 15:57
|
||||
|
||||
import authentik.lib.utils.time
|
||||
import django.db.models.deletion
|
||||
import uuid
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("authentik_core", "0057_remove_user_groups_remove_user_user_permissions_and_more"),
|
||||
("authentik_events", "0016_alter_event_action"),
|
||||
("contenttypes", "0002_remove_content_type_name"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="LifecycleRule",
|
||||
fields=[
|
||||
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
|
||||
("name", models.TextField(unique=True)),
|
||||
("object_id", models.TextField(default=None, null=True)),
|
||||
(
|
||||
"interval",
|
||||
models.TextField(
|
||||
default="days=60",
|
||||
validators=[authentik.lib.utils.time.timedelta_string_validator],
|
||||
),
|
||||
),
|
||||
(
|
||||
"grace_period",
|
||||
models.TextField(
|
||||
default="days=30",
|
||||
validators=[authentik.lib.utils.time.timedelta_string_validator],
|
||||
),
|
||||
),
|
||||
("min_reviewers", models.PositiveSmallIntegerField(default=1)),
|
||||
("min_reviewers_is_per_group", models.BooleanField(default=False)),
|
||||
(
|
||||
"content_type",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE, to="contenttypes.contenttype"
|
||||
),
|
||||
),
|
||||
(
|
||||
"notification_transports",
|
||||
models.ManyToManyField(
|
||||
blank=True,
|
||||
help_text="Select which transports should be used to notify the reviewers. If none are selected, the notification will only be shown in the authentik UI.",
|
||||
to="authentik_events.notificationtransport",
|
||||
),
|
||||
),
|
||||
("reviewer_groups", models.ManyToManyField(blank=True, to="authentik_core.group")),
|
||||
("reviewers", models.ManyToManyField(blank=True, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="LifecycleIteration",
|
||||
fields=[
|
||||
(
|
||||
"managed",
|
||||
models.TextField(
|
||||
default=None,
|
||||
help_text="Objects that are managed by authentik. These objects are created and updated automatically. This flag only indicates that an object can be overwritten by migrations. You can still modify the objects via the API, but expect changes to be overwritten in a later update.",
|
||||
null=True,
|
||||
unique=True,
|
||||
verbose_name="Managed by authentik",
|
||||
),
|
||||
),
|
||||
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
|
||||
("object_id", models.TextField()),
|
||||
(
|
||||
"state",
|
||||
models.CharField(
|
||||
choices=[
|
||||
("REVIEWED", "Reviewed"),
|
||||
("PENDING", "Pending"),
|
||||
("OVERDUE", "Overdue"),
|
||||
("CANCELED", "Canceled"),
|
||||
],
|
||||
default="PENDING",
|
||||
max_length=10,
|
||||
),
|
||||
),
|
||||
("opened_on", models.DateField(auto_now_add=True)),
|
||||
(
|
||||
"content_type",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE, to="contenttypes.contenttype"
|
||||
),
|
||||
),
|
||||
(
|
||||
"rule",
|
||||
models.ForeignKey(
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="authentik_lifecycle.lifecyclerule",
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="Review",
|
||||
fields=[
|
||||
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
|
||||
("timestamp", models.DateTimeField(auto_now_add=True)),
|
||||
("note", models.TextField(null=True)),
|
||||
(
|
||||
"iteration",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to="authentik_lifecycle.lifecycleiteration",
|
||||
),
|
||||
),
|
||||
(
|
||||
"reviewer",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name="lifecyclerule",
|
||||
index=models.Index(fields=["content_type"], name="authentik_l_content_4e3a6a_idx"),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name="lifecyclerule",
|
||||
constraint=models.UniqueConstraint(
|
||||
condition=models.Q(("object_id__isnull", True)),
|
||||
fields=("content_type",),
|
||||
name="uniq_lifecycle_rule_ct_null_object",
|
||||
),
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name="lifecyclerule",
|
||||
unique_together={("content_type", "object_id")},
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name="lifecycleiteration",
|
||||
index=models.Index(
|
||||
fields=["content_type", "opened_on"], name="authentik_l_content_09c32a_idx"
|
||||
),
|
||||
),
|
||||
migrations.AlterUniqueTogether(
|
||||
name="review",
|
||||
unique_together={("iteration", "reviewer")},
|
||||
),
|
||||
]
|
||||
@@ -1,18 +0,0 @@
|
||||
# Generated by Django 5.2.11 on 2026-02-13 09:33
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("authentik_lifecycle", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="lifecycleiteration",
|
||||
name="opened_on",
|
||||
field=models.DateTimeField(auto_now_add=True),
|
||||
),
|
||||
]
|
||||
@@ -1,292 +0,0 @@
|
||||
from datetime import timedelta
|
||||
from uuid import uuid4
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.db.models import Q, QuerySet
|
||||
from django.db.models.fields import Field
|
||||
from django.db.models.functions import Cast
|
||||
from django.http import HttpRequest
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework.serializers import BaseSerializer
|
||||
|
||||
from authentik.blueprints.models import ManagedModel
|
||||
from authentik.core.models import Group, User
|
||||
from authentik.enterprise.lifecycle.utils import link_for_model, start_of_day
|
||||
from authentik.events.models import Event, EventAction, NotificationSeverity, NotificationTransport
|
||||
from authentik.lib.models import SerializerModel
|
||||
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
|
||||
|
||||
|
||||
class LifecycleRule(SerializerModel):
|
||||
id = models.UUIDField(primary_key=True, default=uuid4)
|
||||
name = models.TextField(unique=True)
|
||||
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
|
||||
object_id = models.TextField(null=True, default=None)
|
||||
object = GenericForeignKey("content_type", "object_id")
|
||||
|
||||
interval = models.TextField(
|
||||
default="days=60",
|
||||
validators=[timedelta_string_validator],
|
||||
)
|
||||
# Grace period starts after a review is due
|
||||
grace_period = models.TextField(
|
||||
default="days=30",
|
||||
validators=[timedelta_string_validator],
|
||||
)
|
||||
|
||||
# The review has to be conducted by `min_reviewers` members of `reviewer_groups`
|
||||
# (total or per group depending on `min_reviewers_is_per_group` flag) as well
|
||||
# as all of `reviewers`
|
||||
reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
|
||||
min_reviewers = models.PositiveSmallIntegerField(default=1)
|
||||
min_reviewers_is_per_group = models.BooleanField(default=False)
|
||||
reviewers = models.ManyToManyField("authentik_core.User", blank=True)
|
||||
|
||||
notification_transports = models.ManyToManyField(
|
||||
NotificationTransport,
|
||||
help_text=_(
|
||||
"Select which transports should be used to notify the reviewers. If none are "
|
||||
"selected, the notification will only be shown in the authentik UI."
|
||||
),
|
||||
blank=True,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
indexes = [models.Index(fields=["content_type"])]
|
||||
unique_together = [["content_type", "object_id"]]
|
||||
constraints = [
|
||||
models.UniqueConstraint(
|
||||
fields=["content_type"],
|
||||
condition=Q(object_id__isnull=True),
|
||||
name="uniq_lifecycle_rule_ct_null_object",
|
||||
)
|
||||
]
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[BaseSerializer]:
|
||||
from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
|
||||
|
||||
return LifecycleRuleSerializer
|
||||
|
||||
def _get_pk_field(self) -> Field:
|
||||
model = self.content_type.model_class()
|
||||
pk = model._meta.pk
|
||||
while hasattr(pk, "target_field"):
|
||||
pk = pk.target_field
|
||||
return pk.__class__()
|
||||
|
||||
def get_objects(self) -> QuerySet:
|
||||
qs = self.content_type.get_all_objects_for_this_type()
|
||||
if self.object_id:
|
||||
qs = qs.filter(pk=self.object_id)
|
||||
else:
|
||||
qs = qs.exclude(
|
||||
pk__in=LifecycleRule.objects.filter(
|
||||
content_type=self.content_type, object_id__isnull=False
|
||||
).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
|
||||
)
|
||||
return qs
|
||||
|
||||
def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
|
||||
filter = ~Q(content_type=self.content_type)
|
||||
if self.object_id:
|
||||
filter = filter | ~Q(object_id=self.object_id)
|
||||
filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
|
||||
return self.lifecycleiteration_set.filter(filter)
|
||||
|
||||
def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
|
||||
return self.lifecycleiteration_set.filter(
|
||||
opened_on__lt=start_of_day(
|
||||
timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
|
||||
),
|
||||
state=ReviewState.PENDING,
|
||||
)
|
||||
|
||||
def _get_newly_due_objects(self) -> QuerySet:
|
||||
recent_iteration_ids = LifecycleIteration.objects.filter(
|
||||
content_type=self.content_type,
|
||||
object_id__isnull=False,
|
||||
opened_on__gte=start_of_day(
|
||||
timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
|
||||
),
|
||||
).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
|
||||
|
||||
return self.get_objects().exclude(pk__in=recent_iteration_ids)
|
||||
|
||||
def apply(self):
|
||||
self._get_stale_iterations().update(state=ReviewState.CANCELED)
|
||||
|
||||
for iteration in self._get_newly_overdue_iterations():
|
||||
iteration.make_overdue()
|
||||
|
||||
for obj in self._get_newly_due_objects():
|
||||
LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
|
||||
|
||||
def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
|
||||
reviewers = self.reviewers.all()
|
||||
if (
|
||||
iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
|
||||
< reviewers.count()
|
||||
):
|
||||
return False
|
||||
if self.reviewer_groups.count() == 0:
|
||||
return True
|
||||
if self.min_reviewers_is_per_group:
|
||||
for g in self.reviewer_groups.all():
|
||||
if (
|
||||
iteration.review_set.filter(
|
||||
reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
|
||||
)
|
||||
.distinct()
|
||||
.count()
|
||||
< self.min_reviewers
|
||||
):
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
return (
|
||||
iteration.review_set.filter(
|
||||
reviewer__groups__in=self.reviewer_groups.all().with_descendants()
|
||||
)
|
||||
.distinct()
|
||||
.count()
|
||||
>= self.min_reviewers
|
||||
)
|
||||
|
||||
def get_reviewers(self) -> QuerySet[User]:
|
||||
return User.objects.filter(
|
||||
Q(id__in=self.reviewers.all().values_list("pk", flat=True))
|
||||
| Q(groups__in=self.reviewer_groups.all().with_descendants())
|
||||
).distinct()
|
||||
|
||||
def notify_reviewers(self, event: Event, severity: str):
|
||||
from authentik.enterprise.lifecycle.tasks import send_notification
|
||||
|
||||
for transport in self.notification_transports.all():
|
||||
for user in self.get_reviewers():
|
||||
send_notification.send_with_options(
|
||||
args=(transport.pk, event.pk, user.pk, severity),
|
||||
rel_obj=transport,
|
||||
)
|
||||
if transport.send_once:
|
||||
break
|
||||
|
||||
|
||||
class ReviewState(models.TextChoices):
|
||||
REVIEWED = "REVIEWED", _("Reviewed")
|
||||
PENDING = "PENDING", _("Pending")
|
||||
OVERDUE = "OVERDUE", _("Overdue")
|
||||
CANCELED = "CANCELED", _("Canceled")
|
||||
|
||||
|
||||
class LifecycleIteration(SerializerModel, ManagedModel):
|
||||
id = models.UUIDField(primary_key=True, default=uuid4)
|
||||
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
|
||||
object_id = models.TextField(null=False)
|
||||
object = GenericForeignKey("content_type", "object_id")
|
||||
|
||||
rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)
|
||||
|
||||
state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
|
||||
opened_on = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
indexes = [models.Index(fields=["content_type", "opened_on"])]
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[BaseSerializer]:
|
||||
from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer
|
||||
|
||||
return LifecycleIterationSerializer
|
||||
|
||||
def _get_model_name(self) -> str:
|
||||
return self.content_type.name.lower()
|
||||
|
||||
def _get_event_args(self) -> dict:
|
||||
return {
|
||||
"target": self.object,
|
||||
"hyperlink": link_for_model(self.object),
|
||||
"hyperlink_label": _(f"Go to {self._get_model_name()}"),
|
||||
"lifecycle_iteration": self.id,
|
||||
}
|
||||
|
||||
def initialize(self):
|
||||
event = Event.new(
|
||||
EventAction.REVIEW_INITIATED,
|
||||
message=_(f"Access review is due for {self.content_type.name} {str(self.object)}"),
|
||||
**self._get_event_args(),
|
||||
)
|
||||
event.save()
|
||||
self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
|
||||
|
||||
def make_overdue(self):
|
||||
self.state = ReviewState.OVERDUE
|
||||
|
||||
event = Event.new(
|
||||
EventAction.REVIEW_OVERDUE,
|
||||
message=_(f"Access review is overdue for {self.content_type.name} {str(self.object)}"),
|
||||
**self._get_event_args(),
|
||||
)
|
||||
event.save()
|
||||
self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
|
||||
self.save()
|
||||
|
||||
@staticmethod
|
||||
def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
|
||||
iteration = LifecycleIteration.objects.create(
|
||||
content_type=content_type, object_id=object_id, rule=rule
|
||||
)
|
||||
iteration.initialize()
|
||||
return iteration
|
||||
|
||||
def make_reviewed(self, request: HttpRequest):
|
||||
self.state = ReviewState.REVIEWED
|
||||
event = Event.new(
|
||||
EventAction.REVIEW_COMPLETED,
|
||||
message=_(f"Access review completed for {self.content_type.name} {str(self.object)}"),
|
||||
**self._get_event_args(),
|
||||
).from_http(request)
|
||||
event.save()
|
||||
self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
|
||||
self.save()
|
||||
|
||||
def on_review(self, request: HttpRequest):
|
||||
if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
|
||||
raise AssertionError("Review is not pending or overdue")
|
||||
if self.rule.is_satisfied_for_iteration(self):
|
||||
self.make_reviewed(request)
|
||||
|
||||
def user_can_review(self, user: User) -> bool:
|
||||
if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
|
||||
return False
|
||||
if self.review_set.filter(reviewer=user).exists():
|
||||
return False
|
||||
groups = self.rule.reviewer_groups.all()
|
||||
if groups:
|
||||
for group in groups:
|
||||
if group.is_member(user):
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
return user in self.rule.get_reviewers()
|
||||
|
||||
|
||||
class Review(SerializerModel):
|
||||
id = models.UUIDField(primary_key=True, default=uuid4)
|
||||
iteration = models.ForeignKey(LifecycleIteration, on_delete=models.CASCADE)
|
||||
|
||||
reviewer = models.ForeignKey("authentik_core.User", on_delete=models.CASCADE)
|
||||
timestamp = models.DateTimeField(auto_now_add=True)
|
||||
note = models.TextField(null=True)
|
||||
|
||||
class Meta:
|
||||
unique_together = [["iteration", "reviewer"]]
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[BaseSerializer]:
|
||||
from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer
|
||||
|
||||
return ReviewSerializer
|
||||
@@ -1,22 +0,0 @@
|
||||
from django.db.models import Q
|
||||
from django.db.models.signals import post_save, pre_delete
|
||||
from django.dispatch import receiver
|
||||
|
||||
from authentik.enterprise.lifecycle.models import LifecycleRule, ReviewState
|
||||
|
||||
|
||||
@receiver(post_save, sender=LifecycleRule)
|
||||
def post_rule_save(sender, instance: LifecycleRule, created: bool, **_):
|
||||
from authentik.enterprise.lifecycle.tasks import apply_lifecycle_rule
|
||||
|
||||
apply_lifecycle_rule.send_with_options(
|
||||
args=(instance.id,),
|
||||
rel_obj=instance,
|
||||
)
|
||||
|
||||
|
||||
@receiver(pre_delete, sender=LifecycleRule)
|
||||
def pre_rule_delete(sender, instance: LifecycleRule, **_):
|
||||
instance.lifecycleiteration_set.filter(
|
||||
Q(state=ReviewState.PENDING) | Q(state=ReviewState.OVERDUE)
|
||||
).update(state=ReviewState.CANCELED)
|
||||
@@ -1,45 +0,0 @@
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from dramatiq import actor
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.enterprise.lifecycle.models import LifecycleRule
|
||||
from authentik.events.models import Event, Notification, NotificationTransport
|
||||
|
||||
|
||||
@actor(description=_("Dispatch tasks to validate lifecycle rules."))
|
||||
def apply_lifecycle_rules():
|
||||
for rule in LifecycleRule.objects.all():
|
||||
apply_lifecycle_rule.send_with_options(
|
||||
args=(rule.id,),
|
||||
rel_obj=rule,
|
||||
)
|
||||
|
||||
|
||||
@actor(description=_("Apply lifecycle rule."))
|
||||
def apply_lifecycle_rule(rule_id: str):
|
||||
rule = LifecycleRule.objects.filter(pk=rule_id).first()
|
||||
if rule:
|
||||
rule.apply()
|
||||
|
||||
|
||||
@actor(description=_("Send lifecycle rule notification."))
|
||||
def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
|
||||
event = Event.objects.filter(pk=event_pk).first()
|
||||
if not event:
|
||||
return
|
||||
user = User.objects.filter(pk=user_pk).first()
|
||||
if not user:
|
||||
return
|
||||
|
||||
notification = Notification(
|
||||
severity=severity,
|
||||
body=event.summary,
|
||||
event=event,
|
||||
user=user,
|
||||
hyperlink=event.hyperlink,
|
||||
hyperlink_label=event.hyperlink_label,
|
||||
)
|
||||
transport = NotificationTransport.objects.filter(pk=transport_pk).first()
|
||||
if not transport:
|
||||
return
|
||||
transport.send(notification)
|
||||
@@ -1,425 +0,0 @@
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.urls import reverse
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from authentik.core.models import Application, Group
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_user
|
||||
from authentik.enterprise.lifecycle.models import LifecycleIteration, LifecycleRule, ReviewState
|
||||
from authentik.enterprise.reports.tests.utils import patch_license
|
||||
from authentik.lib.generators import generate_id
|
||||
|
||||
|
||||
@patch_license
|
||||
class TestLifecycleRuleAPI(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.user = create_test_admin_user()
|
||||
self.client.force_login(self.user)
|
||||
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
self.content_type = ContentType.objects.get_for_model(Application)
|
||||
self.reviewer_group = Group.objects.create(name=generate_id())
|
||||
|
||||
def test_list_rules(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(reverse("authentik_api:lifecyclerule-list"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertGreaterEqual(len(response.data["results"]), 1)
|
||||
|
||||
def test_create_rule_with_reviewer_group(self):
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": str(self.app.pk),
|
||||
"interval": "days=30",
|
||||
"grace_period": "days=10",
|
||||
"reviewer_groups": [str(self.reviewer_group.pk)],
|
||||
"reviewers": [],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertEqual(response.data["object_id"], str(self.app.pk))
|
||||
self.assertEqual(response.data["interval"], "days=30")
|
||||
|
||||
def test_create_rule_with_explicit_reviewer(self):
|
||||
reviewer = create_test_user()
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": str(self.app.pk),
|
||||
"interval": "days=60",
|
||||
"grace_period": "days=15",
|
||||
"reviewer_groups": [],
|
||||
"reviewers": [str(reviewer.uuid)],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertIn(reviewer.uuid, response.data["reviewers"])
|
||||
|
||||
def test_create_rule_type_level(self):
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": None,
|
||||
"interval": "days=90",
|
||||
"grace_period": "days=30",
|
||||
"reviewer_groups": [str(self.reviewer_group.pk)],
|
||||
"reviewers": [],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertIsNone(response.data["object_id"])
|
||||
|
||||
def test_create_rule_fails_without_reviewers(self):
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": str(self.app.pk),
|
||||
"interval": "days=30",
|
||||
"grace_period": "days=10",
|
||||
"reviewer_groups": [],
|
||||
"reviewers": [],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_create_rule_fails_grace_period_longer_than_interval(self):
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": str(self.app.pk),
|
||||
"interval": "days=10",
|
||||
"grace_period": "days=30",
|
||||
"reviewer_groups": [str(self.reviewer_group.pk)],
|
||||
"reviewers": [],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("grace_period", response.data)
|
||||
|
||||
def test_create_rule_fails_invalid_object_id(self):
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:lifecyclerule-list"),
|
||||
{
|
||||
"name": generate_id(),
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": "00000000-0000-0000-0000-000000000000",
|
||||
"interval": "days=30",
|
||||
"grace_period": "days=10",
|
||||
"reviewer_groups": [str(self.reviewer_group.pk)],
|
||||
"reviewers": [],
|
||||
"min_reviewers": 1,
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("object_id", response.data)
|
||||
|
||||
def test_retrieve_rule(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk})
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.data["id"], str(rule.pk))
|
||||
|
||||
def test_update_rule(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
interval="days=30",
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.patch(
|
||||
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk}),
|
||||
{"interval": "days=60"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.data["interval"], "days=60")
|
||||
|
||||
def test_delete_rule(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.delete(
|
||||
reverse("authentik_api:lifecyclerule-detail", kwargs={"pk": rule.pk})
|
||||
)
|
||||
self.assertEqual(response.status_code, 204)
|
||||
self.assertFalse(LifecycleRule.objects.filter(pk=rule.pk).exists())
|
||||
|
||||
|
||||
@patch_license
|
||||
class TestIterationAPI(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.user = create_test_admin_user()
|
||||
self.client.force_login(self.user)
|
||||
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
self.content_type = ContentType.objects.get_for_model(Application)
|
||||
self.reviewer_group = Group.objects.create(name=generate_id())
|
||||
self.reviewer_group.users.add(self.user)
|
||||
|
||||
def test_open_iterations(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(reverse("authentik_api:lifecycleiteration-open-iterations"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertGreaterEqual(len(response.data["results"]), 1)
|
||||
|
||||
for iteration in response.data["results"]:
|
||||
self.assertEqual(iteration["state"], ReviewState.PENDING)
|
||||
|
||||
def test_open_iterations_filter_user_is_reviewer(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(
|
||||
reverse("authentik_api:lifecycleiteration-open-iterations"),
|
||||
{"user_is_reviewer": "true"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
# User is in reviewer_group, so should see the iteration
|
||||
self.assertGreaterEqual(len(response.data["results"]), 1)
|
||||
|
||||
def test_latest_iteration(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(
|
||||
reverse(
|
||||
"authentik_api:lifecycleiteration-latest-iteration",
|
||||
kwargs={
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": str(self.app.pk),
|
||||
},
|
||||
)
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.data["object_id"], str(self.app.pk))
|
||||
|
||||
def test_latest_iteration_not_found(self):
|
||||
response = self.client.get(
|
||||
reverse(
|
||||
"authentik_api:lifecycleiteration-latest-iteration",
|
||||
kwargs={
|
||||
"content_type": f"{self.content_type.app_label}.{self.content_type.model}",
|
||||
"object_id": "00000000-0000-0000-0000-000000000000",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_iteration_includes_user_can_review(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
response = self.client.get(reverse("authentik_api:lifecycleiteration-open-iterations"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertGreaterEqual(len(response.data["results"]), 1)
|
||||
# user_can_review should be present
|
||||
self.assertIn("user_can_review", response.data["results"][0])
|
||||
|
||||
|
||||
@patch_license
|
||||
class TestReviewAPI(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.user = create_test_admin_user()
|
||||
self.client.force_login(self.user)
|
||||
self.app = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
self.content_type = ContentType.objects.get_for_model(Application)
|
||||
self.reviewer_group = Group.objects.create(name=generate_id())
|
||||
self.reviewer_group.users.add(self.user)
|
||||
|
||||
def test_create_review(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=1,
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
# Get the auto-created iteration
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{
|
||||
"iteration": str(iteration.pk),
|
||||
"note": "Reviewed and approved",
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertEqual(response.data["iteration"], iteration.pk)
|
||||
self.assertEqual(response.data["note"], "Reviewed and approved")
|
||||
self.assertEqual(response.data["reviewer"]["pk"], self.user.pk)
|
||||
|
||||
def test_create_review_completes_iteration(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=1,
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{
|
||||
"iteration": str(iteration.pk),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_create_review_sets_reviewer_from_request(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=1,
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{
|
||||
"iteration": str(iteration.pk),
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
# Reviewer should be the logged-in user
|
||||
self.assertEqual(response.data["reviewer"]["pk"], self.user.pk)
|
||||
|
||||
def test_non_reviewer_cannot_review(self):
|
||||
other_group = Group.objects.create(name=generate_id())
|
||||
other_user = create_test_user()
|
||||
other_group.users.add(other_user)
|
||||
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=1,
|
||||
)
|
||||
rule.reviewer_groups.add(other_group)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
|
||||
# Current user is not in the reviewer group
|
||||
self.assertFalse(iteration.user_can_review(self.user))
|
||||
|
||||
def test_non_reviewer_review_via_api_rejected(self):
|
||||
other_group = Group.objects.create(name=generate_id())
|
||||
other_user = create_test_user()
|
||||
other_group.users.add(other_user)
|
||||
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=1,
|
||||
)
|
||||
rule.reviewer_groups.add(other_group)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
|
||||
# Current user (self.user) is NOT in the reviewer group
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{"iteration": str(iteration.pk)},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_duplicate_review_via_api_rejected(self):
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=self.content_type,
|
||||
object_id=str(self.app.pk),
|
||||
min_reviewers=2,
|
||||
)
|
||||
rule.reviewer_groups.add(self.reviewer_group)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=self.content_type, object_id=str(self.app.pk), rule=rule
|
||||
)
|
||||
|
||||
# First review should succeed
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{"iteration": str(iteration.pk)},
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
|
||||
# Second review by same user should be rejected
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:review-list"),
|
||||
{"iteration": str(iteration.pk)},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
@@ -1,845 +0,0 @@
|
||||
import datetime as dt
|
||||
from datetime import timedelta
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.utils import timezone
|
||||
|
||||
from authentik.core.models import Application, Group
|
||||
from authentik.core.tests.utils import create_test_user
|
||||
from authentik.enterprise.lifecycle.models import (
|
||||
LifecycleIteration,
|
||||
LifecycleRule,
|
||||
Review,
|
||||
ReviewState,
|
||||
)
|
||||
from authentik.events.models import (
|
||||
Event,
|
||||
EventAction,
|
||||
NotificationSeverity,
|
||||
NotificationTransport,
|
||||
)
|
||||
from authentik.lib.generators import generate_id
|
||||
from authentik.rbac.models import Role
|
||||
|
||||
|
||||
class TestLifecycleModels(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.factory = RequestFactory()
|
||||
|
||||
def _get_request(self):
|
||||
return self.factory.get("/")
|
||||
|
||||
def _create_object(self, model):
|
||||
if model is Application:
|
||||
return Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
if model is Role:
|
||||
return Role.objects.create(name=generate_id())
|
||||
if model is Group:
|
||||
return Group.objects.create(name=generate_id())
|
||||
raise AssertionError(f"Unsupported model {model}")
|
||||
|
||||
def _create_rule_for_object(self, obj, **kwargs) -> LifecycleRule:
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
return LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(obj.pk),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def _create_rule_for_type(self, model, **kwargs) -> LifecycleRule:
|
||||
content_type = ContentType.objects.get_for_model(model)
|
||||
return LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=None,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def test_iteration_start_supported_objects(self):
|
||||
"""Ensure iterations are automatically started for applications, roles, and groups."""
|
||||
for model in (Application, Role, Group):
|
||||
with self.subTest(model=model.__name__):
|
||||
obj = self._create_object(model)
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
before_events = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
|
||||
|
||||
rule = self._create_rule_for_object(obj)
|
||||
|
||||
# Verify iteration was created automatically
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
self.assertEqual(iteration.object, obj)
|
||||
self.assertEqual(iteration.rule, rule)
|
||||
self.assertEqual(
|
||||
Event.objects.filter(action=EventAction.REVIEW_INITIATED).count(),
|
||||
before_events + 1,
|
||||
)
|
||||
|
||||
def test_review_requires_all_explicit_reviewers(self):
|
||||
obj = Group.objects.create(name=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
reviewer_one = create_test_user()
|
||||
reviewer_two = create_test_user()
|
||||
rule.reviewers.add(reviewer_one, reviewer_two)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=reviewer_one)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=reviewer_two)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
self.assertTrue(Event.objects.filter(action=EventAction.REVIEW_COMPLETED).exists())
|
||||
|
||||
def test_review_min_reviewers_from_groups(self):
|
||||
"""Group-based reviews complete once the minimum number of reviewers review."""
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj, min_reviewers=2)
|
||||
|
||||
reviewer_group = Group.objects.create(name=generate_id())
|
||||
reviewer_one = create_test_user()
|
||||
reviewer_two = create_test_user()
|
||||
reviewer_group.users.add(reviewer_one, reviewer_two)
|
||||
rule.reviewer_groups.add(reviewer_group)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=reviewer_one)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=reviewer_two)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_review_explicit_and_group_reviewers(self):
|
||||
"""Reviews require both explicit reviewers AND min_reviewers from groups."""
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj, min_reviewers=1)
|
||||
|
||||
reviewer_group = Group.objects.create(name=generate_id())
|
||||
group_member = create_test_user()
|
||||
reviewer_group.users.add(group_member)
|
||||
rule.reviewer_groups.add(reviewer_group)
|
||||
|
||||
explicit_reviewer = create_test_user()
|
||||
rule.reviewers.add(explicit_reviewer)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
# Only group member reviews - not satisfied (explicit reviewer missing)
|
||||
Review.objects.create(iteration=iteration, reviewer=group_member)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
# Explicit reviewer reviews - now satisfied
|
||||
Review.objects.create(iteration=iteration, reviewer=explicit_reviewer)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_review_min_reviewers_per_group(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj, min_reviewers=1, min_reviewers_is_per_group=True)
|
||||
|
||||
group_one = Group.objects.create(name=generate_id())
|
||||
group_two = Group.objects.create(name=generate_id())
|
||||
member_group_one = create_test_user()
|
||||
member_group_two = create_test_user()
|
||||
group_one.users.add(member_group_one)
|
||||
group_two.users.add(member_group_two)
|
||||
rule.reviewer_groups.add(group_one, group_two)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
# Only member from group_one reviews - not satisfied (need member from each group)
|
||||
Review.objects.create(iteration=iteration, reviewer=member_group_one)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
# Member from group_two reviews - now satisfied
|
||||
Review.objects.create(iteration=iteration, reviewer=member_group_two)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_review_reviewers_from_child_groups(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj, min_reviewers=1)
|
||||
|
||||
parent_group = Group.objects.create(name=generate_id())
|
||||
child_group = Group.objects.create(name=generate_id())
|
||||
child_group.parents.add(parent_group)
|
||||
|
||||
child_member = create_test_user()
|
||||
child_group.users.add(child_member)
|
||||
|
||||
rule.reviewer_groups.add(parent_group)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
# Child group member should be able to review
|
||||
self.assertTrue(iteration.user_can_review(child_member))
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=child_member)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_review_reviewers_from_nested_child_groups(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj, min_reviewers=2)
|
||||
|
||||
grandparent = Group.objects.create(name=generate_id())
|
||||
parent = Group.objects.create(name=generate_id())
|
||||
child = Group.objects.create(name=generate_id())
|
||||
parent.parents.add(grandparent)
|
||||
child.parents.add(parent)
|
||||
|
||||
parent_member = create_test_user()
|
||||
child_member = create_test_user()
|
||||
parent.users.add(parent_member)
|
||||
child.users.add(child_member)
|
||||
|
||||
rule.reviewer_groups.add(grandparent)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
request = self._get_request()
|
||||
|
||||
# Both nested members should be able to review
|
||||
self.assertTrue(iteration.user_can_review(parent_member))
|
||||
self.assertTrue(iteration.user_can_review(child_member))
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=parent_member)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=child_member)
|
||||
iteration.on_review(request)
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.REVIEWED)
|
||||
|
||||
def test_notify_reviewers_send_once(self):
|
||||
obj = Group.objects.create(name=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
|
||||
reviewer_one = create_test_user()
|
||||
reviewer_two = create_test_user()
|
||||
rule.reviewers.add(reviewer_one, reviewer_two)
|
||||
|
||||
transport_once = NotificationTransport.objects.create(
|
||||
name=generate_id(),
|
||||
send_once=True,
|
||||
)
|
||||
transport_all = NotificationTransport.objects.create(
|
||||
name=generate_id(),
|
||||
send_once=False,
|
||||
)
|
||||
rule.notification_transports.add(transport_once, transport_all)
|
||||
|
||||
event = Event.new(EventAction.REVIEW_INITIATED, target=obj)
|
||||
event.save()
|
||||
|
||||
with patch(
|
||||
"authentik.enterprise.lifecycle.tasks.send_notification.send_with_options"
|
||||
) as send_with_options:
|
||||
rule.notify_reviewers(event, NotificationSeverity.NOTICE)
|
||||
|
||||
reviewer_pks = {reviewer_one.pk, reviewer_two.pk}
|
||||
self.assertEqual(send_with_options.call_count, len(reviewer_pks) + 1)
|
||||
|
||||
calls = [call.kwargs["args"] for call in send_with_options.call_args_list]
|
||||
once_calls = [args for args in calls if args[0] == transport_once.pk]
|
||||
all_calls = [args for args in calls if args[0] == transport_all.pk]
|
||||
|
||||
self.assertEqual(len(once_calls), 1)
|
||||
self.assertEqual(len(all_calls), len(reviewer_pks))
|
||||
self.assertIn(once_calls[0][2], reviewer_pks)
|
||||
self.assertEqual({args[2] for args in all_calls}, reviewer_pks)
|
||||
|
||||
def test_apply_marks_overdue_and_opens_due_reviews(self):
|
||||
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
rule_overdue = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_one.pk),
|
||||
interval="days=365",
|
||||
grace_period="days=10",
|
||||
)
|
||||
|
||||
# Get the automatically created iteration and backdate it past the grace period
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app_one.pk), rule=rule_overdue
|
||||
)
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=(timezone.now() - timedelta(days=20))
|
||||
)
|
||||
|
||||
# Apply again to trigger overdue logic
|
||||
rule_overdue.apply()
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.OVERDUE)
|
||||
self.assertEqual(
|
||||
LifecycleIteration.objects.filter(
|
||||
content_type=content_type, object_id=str(app_one.pk)
|
||||
).count(),
|
||||
1,
|
||||
)
|
||||
|
||||
LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_two.pk),
|
||||
interval="days=30",
|
||||
grace_period="days=10",
|
||||
)
|
||||
self.assertEqual(
|
||||
LifecycleIteration.objects.filter(
|
||||
content_type=content_type, object_id=str(app_two.pk)
|
||||
).count(),
|
||||
1,
|
||||
)
|
||||
new_iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app_two.pk)
|
||||
)
|
||||
self.assertEqual(new_iteration.state, ReviewState.PENDING)
|
||||
|
||||
def test_apply_idempotent(self):
|
||||
app_due = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_overdue = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
initiated_before = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
|
||||
overdue_before = Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count()
|
||||
|
||||
rule_due = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_due.pk),
|
||||
interval="days=30",
|
||||
grace_period="days=30",
|
||||
)
|
||||
reviewer = create_test_user()
|
||||
rule_due.reviewers.add(reviewer)
|
||||
transport = NotificationTransport.objects.create(name=generate_id())
|
||||
rule_due.notification_transports.add(transport)
|
||||
|
||||
rule_overdue = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_overdue.pk),
|
||||
interval="days=365",
|
||||
grace_period="days=10",
|
||||
)
|
||||
|
||||
overdue_iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app_overdue.pk), rule=rule_overdue
|
||||
)
|
||||
LifecycleIteration.objects.filter(pk=overdue_iteration.pk).update(
|
||||
opened_on=(timezone.now() - timedelta(days=20))
|
||||
)
|
||||
|
||||
# Apply overdue rule to mark iteration as overdue
|
||||
rule_overdue.apply()
|
||||
|
||||
due_iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app_due.pk)
|
||||
)
|
||||
overdue_iteration.refresh_from_db()
|
||||
self.assertEqual(due_iteration.state, ReviewState.PENDING)
|
||||
self.assertEqual(overdue_iteration.state, ReviewState.OVERDUE)
|
||||
|
||||
initiated_after_first = Event.objects.filter(action=EventAction.REVIEW_INITIATED).count()
|
||||
overdue_after_first = Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count()
|
||||
# Both rules created iterations on save
|
||||
self.assertEqual(initiated_after_first, initiated_before + 2)
|
||||
self.assertEqual(overdue_after_first, overdue_before + 1)
|
||||
|
||||
# Apply again - should be idempotent
|
||||
rule_due.apply()
|
||||
rule_overdue.apply()
|
||||
|
||||
due_iteration.refresh_from_db()
|
||||
overdue_iteration.refresh_from_db()
|
||||
self.assertEqual(due_iteration.state, ReviewState.PENDING)
|
||||
self.assertEqual(overdue_iteration.state, ReviewState.OVERDUE)
|
||||
self.assertEqual(
|
||||
Event.objects.filter(action=EventAction.REVIEW_INITIATED).count(),
|
||||
initiated_after_first,
|
||||
)
|
||||
self.assertEqual(
|
||||
Event.objects.filter(action=EventAction.REVIEW_OVERDUE).count(),
|
||||
overdue_after_first,
|
||||
)
|
||||
|
||||
def test_rule_matches_entire_type(self):
|
||||
"""A rule with object_id=None matches all objects of that type."""
|
||||
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=None,
|
||||
interval="days=30",
|
||||
grace_period="days=10",
|
||||
)
|
||||
|
||||
objects = list(rule.get_objects())
|
||||
self.assertIn(app_one, objects)
|
||||
self.assertIn(app_two, objects)
|
||||
|
||||
def test_rule_type_excludes_objects_with_specific_rules(self):
|
||||
app_with_rule = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_without_rule = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
# Create a specific rule for app_with_rule
|
||||
LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_with_rule.pk),
|
||||
interval="days=30",
|
||||
)
|
||||
|
||||
# Create a type-level rule
|
||||
type_rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=None,
|
||||
interval="days=60",
|
||||
)
|
||||
|
||||
objects = list(type_rule.get_objects())
|
||||
self.assertNotIn(app_with_rule, objects)
|
||||
self.assertIn(app_without_rule, objects)
|
||||
|
||||
def test_rule_type_apply_creates_iterations_for_all_objects(self):
|
||||
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=None,
|
||||
interval="days=30",
|
||||
grace_period="days=10",
|
||||
)
|
||||
|
||||
self.assertTrue(
|
||||
LifecycleIteration.objects.filter(
|
||||
content_type=content_type, object_id=str(app_one.pk)
|
||||
).exists()
|
||||
)
|
||||
self.assertTrue(
|
||||
LifecycleIteration.objects.filter(
|
||||
content_type=content_type, object_id=str(app_two.pk)
|
||||
).exists()
|
||||
)
|
||||
|
||||
def test_delete_rule_cancels_open_iterations(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
|
||||
rule = self._create_rule_for_object(obj)
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
pending_iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
self.assertEqual(pending_iteration.state, ReviewState.PENDING)
|
||||
|
||||
overdue_iteration = LifecycleIteration.objects.create(
|
||||
content_type=content_type,
|
||||
object_id=str(obj.pk),
|
||||
rule=rule,
|
||||
state=ReviewState.OVERDUE,
|
||||
)
|
||||
reviewed_iteration = LifecycleIteration.objects.create(
|
||||
content_type=content_type,
|
||||
object_id=str(obj.pk),
|
||||
rule=rule,
|
||||
state=ReviewState.REVIEWED,
|
||||
)
|
||||
|
||||
rule.delete()
|
||||
|
||||
pending_iteration.refresh_from_db()
|
||||
overdue_iteration.refresh_from_db()
|
||||
reviewed_iteration.refresh_from_db()
|
||||
|
||||
self.assertEqual(pending_iteration.state, ReviewState.CANCELED)
|
||||
self.assertEqual(overdue_iteration.state, ReviewState.CANCELED)
|
||||
self.assertEqual(reviewed_iteration.state, ReviewState.REVIEWED) # Not affected
|
||||
|
||||
def test_update_rule_target_cancels_stale_iterations(self):
|
||||
app_one = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
app_two = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app_one.pk),
|
||||
interval="days=30",
|
||||
)
|
||||
|
||||
iteration_for_app_one = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app_one.pk), rule=rule
|
||||
)
|
||||
self.assertEqual(iteration_for_app_one.state, ReviewState.PENDING)
|
||||
|
||||
# Change rule target to app_two - save() triggers apply() which cancels stale iterations
|
||||
rule.object_id = str(app_two.pk)
|
||||
rule.save()
|
||||
|
||||
iteration_for_app_one.refresh_from_db()
|
||||
self.assertEqual(iteration_for_app_one.state, ReviewState.CANCELED)
|
||||
|
||||
def test_update_rule_content_type_cancels_stale_iterations(self):
|
||||
app = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
group = Group.objects.create(name=generate_id())
|
||||
app_content_type = ContentType.objects.get_for_model(Application)
|
||||
group_content_type = ContentType.objects.get_for_model(Group)
|
||||
|
||||
# Creating rule triggers automatic apply() which creates a iteration for app
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=app_content_type,
|
||||
object_id=str(app.pk),
|
||||
interval="days=30",
|
||||
)
|
||||
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=app_content_type, object_id=str(app.pk), rule=rule
|
||||
)
|
||||
self.assertEqual(iteration.state, ReviewState.PENDING)
|
||||
|
||||
# Change content type to Group - save() triggers apply() which cancels stale iterations
|
||||
rule.content_type = group_content_type
|
||||
rule.object_id = str(group.pk)
|
||||
rule.save()
|
||||
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.CANCELED)
|
||||
|
||||
def test_user_can_review_checks_group_hierarchy(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
|
||||
parent_group = Group.objects.create(name=generate_id())
|
||||
child_group = Group.objects.create(name=generate_id())
|
||||
child_group.parents.add(parent_group)
|
||||
|
||||
parent_member = create_test_user()
|
||||
child_member = create_test_user()
|
||||
non_member = create_test_user()
|
||||
parent_group.users.add(parent_member)
|
||||
child_group.users.add(child_member)
|
||||
|
||||
rule.reviewer_groups.add(parent_group)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
# iteration is created automatically when rule is saved
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
|
||||
self.assertTrue(iteration.user_can_review(parent_member))
|
||||
self.assertTrue(iteration.user_can_review(child_member))
|
||||
self.assertFalse(iteration.user_can_review(non_member))
|
||||
|
||||
def test_user_cannot_review_twice(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
reviewer = create_test_user()
|
||||
rule.reviewers.add(reviewer)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
# iteration is created automatically when rule is saved
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
|
||||
self.assertTrue(iteration.user_can_review(reviewer))
|
||||
|
||||
Review.objects.create(iteration=iteration, reviewer=reviewer)
|
||||
|
||||
self.assertFalse(iteration.user_can_review(reviewer))
|
||||
|
||||
def test_user_cannot_review_completed_iteration(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
reviewer = create_test_user()
|
||||
rule.reviewers.add(reviewer)
|
||||
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
# Get the automatically created pending iteration and test with different states
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(obj.pk), rule=rule
|
||||
)
|
||||
|
||||
for state in (ReviewState.REVIEWED, ReviewState.CANCELED):
|
||||
iteration.state = state
|
||||
iteration.save()
|
||||
self.assertFalse(iteration.user_can_review(reviewer))
|
||||
|
||||
def test_get_reviewers_includes_child_group_members(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
|
||||
parent_group = Group.objects.create(name=generate_id())
|
||||
child_group = Group.objects.create(name=generate_id())
|
||||
child_group.parents.add(parent_group)
|
||||
|
||||
parent_member = create_test_user()
|
||||
child_member = create_test_user()
|
||||
parent_group.users.add(parent_member)
|
||||
child_group.users.add(child_member)
|
||||
|
||||
rule.reviewer_groups.add(parent_group)
|
||||
|
||||
reviewers = list(rule.get_reviewers())
|
||||
self.assertIn(parent_member, reviewers)
|
||||
self.assertIn(child_member, reviewers)
|
||||
|
||||
def test_get_reviewers_includes_explicit_reviewers(self):
|
||||
obj = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
rule = self._create_rule_for_object(obj)
|
||||
|
||||
explicit_reviewer = create_test_user()
|
||||
rule.reviewers.add(explicit_reviewer)
|
||||
|
||||
group = Group.objects.create(name=generate_id())
|
||||
group_member = create_test_user()
|
||||
group.users.add(group_member)
|
||||
rule.reviewer_groups.add(group)
|
||||
|
||||
reviewers = list(rule.get_reviewers())
|
||||
self.assertIn(explicit_reviewer, reviewers)
|
||||
self.assertIn(group_member, reviewers)
|
||||
|
||||
|
||||
class TestLifecycleDateBoundaries(TestCase):
|
||||
"""Verify that start_of_day normalization ensures correct overdue/due
|
||||
detection regardless of exact task execution time within a day.
|
||||
|
||||
The daily task may run at any point during the day. The start_of_day
|
||||
normalization in _get_newly_overdue_iterations and _get_newly_due_objects
|
||||
ensures that the boundary is always at midnight, so millisecond variations
|
||||
in task execution time do not affect results."""
|
||||
|
||||
def _create_rule_and_iteration(self, grace_period="days=1", interval="days=365"):
|
||||
app = Application.objects.create(name=generate_id(), slug=generate_id())
|
||||
content_type = ContentType.objects.get_for_model(Application)
|
||||
rule = LifecycleRule.objects.create(
|
||||
name=generate_id(),
|
||||
content_type=content_type,
|
||||
object_id=str(app.pk),
|
||||
interval=interval,
|
||||
grace_period=grace_period,
|
||||
)
|
||||
iteration = LifecycleIteration.objects.get(
|
||||
content_type=content_type, object_id=str(app.pk), rule=rule
|
||||
)
|
||||
return app, rule, iteration
|
||||
|
||||
def test_overdue_iteration_opened_yesterday(self):
|
||||
"""grace_period=1 day: iteration opened yesterday at any time is overdue today."""
|
||||
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
for opened_on in [
|
||||
dt.datetime(2025, 6, 14, 0, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 14, 12, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 14, 23, 59, 59, 999999, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(opened_on=opened_on):
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=opened_on, state=ReviewState.PENDING
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
|
||||
|
||||
def test_not_overdue_iteration_opened_today(self):
|
||||
"""grace_period=1 day: iteration opened today at any time is NOT overdue."""
|
||||
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
for opened_on in [
|
||||
dt.datetime(2025, 6, 15, 0, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 23, 59, 59, 999999, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(opened_on=opened_on):
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=opened_on, state=ReviewState.PENDING
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertNotIn(iteration, list(rule._get_newly_overdue_iterations()))
|
||||
|
||||
def test_overdue_independent_of_task_execution_time(self):
|
||||
"""Overdue detection gives the same result whether the task runs at 00:00:01 or 23:59:59."""
|
||||
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=1")
|
||||
opened_on = dt.datetime(2025, 6, 14, 18, 0, 0, tzinfo=dt.UTC)
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=opened_on, state=ReviewState.PENDING
|
||||
)
|
||||
for task_time in [
|
||||
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 12, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(task_time=task_time):
|
||||
with patch("django.utils.timezone.now", return_value=task_time):
|
||||
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
|
||||
|
||||
def test_overdue_boundary_multi_day_grace_period(self):
|
||||
"""grace_period=30 days: overdue after 30 full days, not after 29."""
|
||||
_, rule, iteration = self._create_rule_and_iteration(grace_period="days=30")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
|
||||
# Opened 30 days ago (May 16), should go overdue
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=dt.datetime(2025, 5, 16, 12, 0, 0, tzinfo=dt.UTC),
|
||||
state=ReviewState.PENDING,
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertIn(iteration, list(rule._get_newly_overdue_iterations()))
|
||||
|
||||
# Opened 29 days ago (May 17), should NOT go overdue
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=dt.datetime(2025, 5, 17, 12, 0, 0, tzinfo=dt.UTC),
|
||||
state=ReviewState.PENDING,
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertNotIn(iteration, list(rule._get_newly_overdue_iterations()))
|
||||
|
||||
def test_due_object_iteration_opened_yesterday(self):
|
||||
"""interval=1 day: object with iteration opened yesterday is due for a new review."""
|
||||
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
for opened_on in [
|
||||
dt.datetime(2025, 6, 14, 0, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 14, 12, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 14, 23, 59, 59, 999999, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(opened_on=opened_on):
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertIn(app, list(rule._get_newly_due_objects()))
|
||||
|
||||
def test_not_due_object_iteration_opened_today(self):
|
||||
"""interval=1 day: object with iteration opened today is NOT due."""
|
||||
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
for opened_on in [
|
||||
dt.datetime(2025, 6, 15, 0, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 23, 59, 59, 999999, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(opened_on=opened_on):
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertNotIn(app, list(rule._get_newly_due_objects()))
|
||||
|
||||
def test_due_independent_of_task_execution_time(self):
|
||||
"""Due detection gives the same result whether the task runs at 00:00:01 or 23:59:59."""
|
||||
app, rule, iteration = self._create_rule_and_iteration(interval="days=1")
|
||||
opened_on = dt.datetime(2025, 6, 14, 18, 0, 0, tzinfo=dt.UTC)
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(opened_on=opened_on)
|
||||
for task_time in [
|
||||
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 12, 0, 0, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(task_time=task_time):
|
||||
with patch("django.utils.timezone.now", return_value=task_time):
|
||||
self.assertIn(app, list(rule._get_newly_due_objects()))
|
||||
|
||||
def test_due_boundary_multi_day_interval(self):
|
||||
"""interval=30 days: due after 30 full days, not after 29."""
|
||||
app, rule, iteration = self._create_rule_and_iteration(interval="days=30")
|
||||
fixed_now = dt.datetime(2025, 6, 15, 14, 30, 0, tzinfo=dt.UTC)
|
||||
|
||||
# Previous review opened 30 days ago (May 16), review is due for the object
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=dt.datetime(2025, 5, 16, 12, 0, 0, tzinfo=dt.UTC)
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertIn(app, list(rule._get_newly_due_objects()))
|
||||
|
||||
# Previous review opened 29 days ago (May 17), new review is NOT due
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=dt.datetime(2025, 5, 17, 12, 0, 0, tzinfo=dt.UTC)
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=fixed_now):
|
||||
self.assertNotIn(app, list(rule._get_newly_due_objects()))
|
||||
|
||||
def test_apply_overdue_at_boundary(self):
|
||||
"""apply() marks iteration overdue when grace period just expired,
|
||||
regardless of what time the daily task runs."""
|
||||
_, rule, iteration = self._create_rule_and_iteration(
|
||||
grace_period="days=1", interval="days=365"
|
||||
)
|
||||
opened_on = dt.datetime(2025, 6, 14, 20, 0, 0, tzinfo=dt.UTC)
|
||||
for task_time in [
|
||||
dt.datetime(2025, 6, 15, 0, 0, 1, tzinfo=dt.UTC),
|
||||
dt.datetime(2025, 6, 15, 23, 59, 59, tzinfo=dt.UTC),
|
||||
]:
|
||||
with self.subTest(task_time=task_time):
|
||||
LifecycleIteration.objects.filter(pk=iteration.pk).update(
|
||||
opened_on=opened_on, state=ReviewState.PENDING
|
||||
)
|
||||
with patch("django.utils.timezone.now", return_value=task_time):
|
||||
rule.apply()
|
||||
iteration.refresh_from_db()
|
||||
self.assertEqual(iteration.state, ReviewState.OVERDUE)
|
||||
@@ -1,11 +0,0 @@
|
||||
"""API URLs"""
|
||||
|
||||
from authentik.enterprise.lifecycle.api.iterations import IterationViewSet
|
||||
from authentik.enterprise.lifecycle.api.reviews import ReviewViewSet
|
||||
from authentik.enterprise.lifecycle.api.rules import LifecycleRuleViewSet
|
||||
|
||||
api_urlpatterns = [
|
||||
("lifecycle/iterations", IterationViewSet),
|
||||
("lifecycle/reviews", ReviewViewSet),
|
||||
("lifecycle/rules", LifecycleRuleViewSet),
|
||||
]
|
||||
@@ -1,75 +0,0 @@
|
||||
from datetime import datetime
|
||||
from urllib import parse
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db.models import Model
|
||||
from django.urls import reverse
|
||||
from rest_framework.serializers import ChoiceField, Serializer, UUIDField
|
||||
|
||||
from authentik.core.api.utils import ModelSerializer
|
||||
from authentik.core.models import Application, Group, User
|
||||
from authentik.rbac.models import Role
|
||||
|
||||
|
||||
def parse_content_type(value: str) -> dict:
|
||||
app_label, model = value.split(".")
|
||||
return {"app_label": app_label, "model": model}
|
||||
|
||||
|
||||
def model_choices() -> list[tuple[str, str]]:
|
||||
return [
|
||||
("authentik_core.application", "Application"),
|
||||
("authentik_core.group", "Group"),
|
||||
("authentik_rbac.role", "Role"),
|
||||
]
|
||||
|
||||
|
||||
def admin_link_for_model(model: Model) -> str:
|
||||
if isinstance(model, Application):
|
||||
url = f"/core/applications/{model.slug}"
|
||||
elif isinstance(model, Group):
|
||||
url = f"/identity/groups/{model.pk}"
|
||||
elif isinstance(model, Role):
|
||||
url = f"/identity/roles/{model.pk}"
|
||||
else:
|
||||
raise TypeError("Unsupported model")
|
||||
return url + ";" + parse.quote('{"page":"page-lifecycle"}')
|
||||
|
||||
|
||||
def link_for_model(model: Model) -> str:
|
||||
return f"{reverse("authentik_core:if-admin")}#{admin_link_for_model(model)}"
|
||||
|
||||
|
||||
def start_of_day(dt: datetime) -> datetime:
|
||||
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
|
||||
class ContentTypeField(ChoiceField):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(choices=model_choices(), **kwargs)
|
||||
|
||||
def to_representation(self, content_type: ContentType) -> str:
|
||||
return f"{content_type.app_label}.{content_type.model}"
|
||||
|
||||
def to_internal_value(self, data: str) -> ContentType:
|
||||
return ContentType.objects.get(**parse_content_type(data))
|
||||
|
||||
|
||||
class GenericForeignKeySerializer(Serializer):
|
||||
content_type = ContentTypeField()
|
||||
object_id = UUIDField()
|
||||
|
||||
|
||||
class ReviewerGroupSerializer(ModelSerializer):
|
||||
class Meta:
|
||||
model = Group
|
||||
fields = [
|
||||
"pk",
|
||||
"name",
|
||||
]
|
||||
|
||||
|
||||
class ReviewerUserSerializer(ModelSerializer):
|
||||
class Meta:
|
||||
model = User
|
||||
fields = ["pk", "uuid", "username", "name"]
|
||||
@@ -135,21 +135,21 @@ class GoogleWorkspaceProvider(OutgoingSyncProvider, BackchannelProvider):
|
||||
return GoogleWorkspaceGroupClient(self)
|
||||
raise ValueError(f"Invalid model {model}")
|
||||
|
||||
def get_object_qs(self, type: type[User | Group], **kwargs) -> QuerySet[User | Group]:
|
||||
def get_object_qs(self, type: type[User | Group]) -> QuerySet[User | Group]:
|
||||
if type == User:
|
||||
# Get queryset of all users with consistent ordering
|
||||
# according to the provider's settings
|
||||
base = User.objects.all().exclude_anonymous().filter(**kwargs)
|
||||
base = User.objects.all().exclude_anonymous()
|
||||
if self.exclude_users_service_account:
|
||||
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
|
||||
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
|
||||
)
|
||||
if self.filter_group:
|
||||
base = base.filter(groups__in=[self.filter_group])
|
||||
base = base.filter(ak_groups__in=[self.filter_group])
|
||||
return base.order_by("pk")
|
||||
if type == Group:
|
||||
# Get queryset of all groups with consistent ordering
|
||||
return Group.objects.all().filter(**kwargs).order_by("pk")
|
||||
return Group.objects.all().order_by("pk")
|
||||
raise ValueError(f"Invalid type {type}")
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -78,8 +78,7 @@ class MicrosoftEntraUserClient(MicrosoftEntraSyncClient[User, MicrosoftEntraProv
|
||||
def create(self, user: User):
|
||||
"""Create user from scratch and create a connection object"""
|
||||
microsoft_user = self.to_schema(user, None)
|
||||
if microsoft_user.user_principal_name:
|
||||
self.check_email_valid(microsoft_user.user_principal_name)
|
||||
self.check_email_valid(microsoft_user.user_principal_name)
|
||||
with transaction.atomic():
|
||||
try:
|
||||
response = self._request(self.client.users.post(microsoft_user))
|
||||
@@ -119,8 +118,7 @@ class MicrosoftEntraUserClient(MicrosoftEntraSyncClient[User, MicrosoftEntraProv
|
||||
def update(self, user: User, connection: MicrosoftEntraProviderUser):
|
||||
"""Update existing user"""
|
||||
microsoft_user = self.to_schema(user, connection)
|
||||
if microsoft_user.user_principal_name:
|
||||
self.check_email_valid(microsoft_user.user_principal_name)
|
||||
self.check_email_valid(microsoft_user.user_principal_name)
|
||||
response = self._request(
|
||||
self.client.users.by_user_id(connection.microsoft_id).patch(microsoft_user)
|
||||
)
|
||||
|
||||
@@ -124,21 +124,21 @@ class MicrosoftEntraProvider(OutgoingSyncProvider, BackchannelProvider):
|
||||
return MicrosoftEntraGroupClient(self)
|
||||
raise ValueError(f"Invalid model {model}")
|
||||
|
||||
def get_object_qs(self, type: type[User | Group], **kwargs) -> QuerySet[User | Group]:
|
||||
def get_object_qs(self, type: type[User | Group]) -> QuerySet[User | Group]:
|
||||
if type == User:
|
||||
# Get queryset of all users with consistent ordering
|
||||
# according to the provider's settings
|
||||
base = User.objects.all().exclude_anonymous().filter(**kwargs)
|
||||
base = User.objects.all().exclude_anonymous()
|
||||
if self.exclude_users_service_account:
|
||||
base = base.exclude(type=UserTypes.SERVICE_ACCOUNT).exclude(
|
||||
type=UserTypes.INTERNAL_SERVICE_ACCOUNT
|
||||
)
|
||||
if self.filter_group:
|
||||
base = base.filter(groups__in=[self.filter_group])
|
||||
base = base.filter(ak_groups__in=[self.filter_group])
|
||||
return base.order_by("pk")
|
||||
if type == Group:
|
||||
# Get queryset of all groups with consistent ordering
|
||||
return Group.objects.all().filter(**kwargs).order_by("pk")
|
||||
return Group.objects.all().order_by("pk")
|
||||
raise ValueError(f"Invalid type {type}")
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -43,7 +43,7 @@ def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created:
|
||||
"path": USER_PATH_PROVIDERS_SSF,
|
||||
},
|
||||
)
|
||||
user.assign_perms_to_managed_role("authentik_providers_ssf.add_stream", instance)
|
||||
user.assign_perms_to_managed_role("add_stream", instance)
|
||||
token, token_created = Token.objects.update_or_create(
|
||||
identifier=identifier,
|
||||
defaults={
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
"""WSFederationProvider API Views"""
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.urls import reverse
|
||||
from rest_framework.fields import CharField, SerializerMethodField, URLField
|
||||
|
||||
from authentik.core.api.providers import ProviderSerializer
|
||||
from authentik.core.models import Provider
|
||||
from authentik.enterprise.api import EnterpriseRequiredMixin
|
||||
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
|
||||
from authentik.enterprise.providers.ws_federation.processors.metadata import MetadataProcessor
|
||||
from authentik.providers.saml.api.providers import SAMLProviderSerializer, SAMLProviderViewSet
|
||||
|
||||
|
||||
class WSFederationProviderSerializer(EnterpriseRequiredMixin, SAMLProviderSerializer):
|
||||
"""WSFederationProvider Serializer"""
|
||||
|
||||
reply_url = URLField(source="acs_url")
|
||||
wtrealm = CharField(source="audience")
|
||||
url_wsfed = SerializerMethodField()
|
||||
|
||||
def get_url_download_metadata(self, instance: WSFederationProvider) -> str:
|
||||
"""Get metadata download URL"""
|
||||
if "request" not in self._context:
|
||||
return ""
|
||||
request: HttpRequest = self._context["request"]._request
|
||||
try:
|
||||
return request.build_absolute_uri(
|
||||
reverse(
|
||||
"authentik_providers_ws_federation:metadata-download",
|
||||
kwargs={"application_slug": instance.application.slug},
|
||||
)
|
||||
)
|
||||
except Provider.application.RelatedObjectDoesNotExist:
|
||||
return request.build_absolute_uri(
|
||||
reverse(
|
||||
"authentik_api:wsfederationprovider-metadata",
|
||||
kwargs={
|
||||
"pk": instance.pk,
|
||||
},
|
||||
)
|
||||
+ "?download"
|
||||
)
|
||||
|
||||
def get_url_wsfed(self, instance: WSFederationProvider) -> str:
|
||||
"""Get WS-Fed url"""
|
||||
if "request" not in self._context:
|
||||
return ""
|
||||
request: HttpRequest = self._context["request"]._request
|
||||
return request.build_absolute_uri(reverse("authentik_providers_ws_federation:wsfed"))
|
||||
|
||||
class Meta(SAMLProviderSerializer.Meta):
|
||||
model = WSFederationProvider
|
||||
fields = ProviderSerializer.Meta.fields + [
|
||||
"reply_url",
|
||||
"wtrealm",
|
||||
"assertion_valid_not_before",
|
||||
"assertion_valid_not_on_or_after",
|
||||
"session_valid_not_on_or_after",
|
||||
"property_mappings",
|
||||
"name_id_mapping",
|
||||
"authn_context_class_ref_mapping",
|
||||
"digest_algorithm",
|
||||
"signature_algorithm",
|
||||
"signing_kp",
|
||||
"encryption_kp",
|
||||
"sign_assertion",
|
||||
"sign_logout_request",
|
||||
"default_name_id_policy",
|
||||
"url_download_metadata",
|
||||
"url_wsfed",
|
||||
]
|
||||
extra_kwargs = ProviderSerializer.Meta.extra_kwargs
|
||||
|
||||
|
||||
class WSFederationProviderViewSet(SAMLProviderViewSet):
|
||||
"""WSFederationProvider Viewset"""
|
||||
|
||||
queryset = WSFederationProvider.objects.all()
|
||||
serializer_class = WSFederationProviderSerializer
|
||||
filterset_fields = "__all__"
|
||||
ordering = ["name"]
|
||||
search_fields = ["name"]
|
||||
|
||||
metadata_generator_class = MetadataProcessor
|
||||
import_metadata = None
|
||||
@@ -1,13 +0,0 @@
|
||||
"""WSFed app config"""
|
||||
|
||||
from authentik.enterprise.apps import EnterpriseConfig
|
||||
|
||||
|
||||
class AuthentikEnterpriseProviderWSFederatopm(EnterpriseConfig):
|
||||
"""authentik enterprise ws federation app config"""
|
||||
|
||||
name = "authentik.enterprise.providers.ws_federation"
|
||||
label = "authentik_providers_ws_federation"
|
||||
verbose_name = "authentik Enterprise.Providers.WS-Federation"
|
||||
default = True
|
||||
mountpoint = "application/wsfed/"
|
||||
@@ -1,37 +0,0 @@
|
||||
# Generated by Django 5.2.10 on 2026-01-18 23:25
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("authentik_providers_saml", "0020_samlprovider_logout_method_and_more"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="WSFederationProvider",
|
||||
fields=[
|
||||
(
|
||||
"samlprovider_ptr",
|
||||
models.OneToOneField(
|
||||
auto_created=True,
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
parent_link=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
to="authentik_providers_saml.samlprovider",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"verbose_name": "WS-Federation Provider",
|
||||
"verbose_name_plural": "WS-Federation Providers",
|
||||
},
|
||||
bases=("authentik_providers_saml.samlprovider",),
|
||||
),
|
||||
]
|
||||
@@ -1,36 +0,0 @@
|
||||
from django.templatetags.static import static
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from rest_framework.serializers import Serializer
|
||||
|
||||
from authentik.providers.saml.models import SAMLProvider
|
||||
|
||||
|
||||
class WSFederationProvider(SAMLProvider):
|
||||
"""WS-Federation for applications which support WS-Fed."""
|
||||
|
||||
# Alias'd fields:
|
||||
# - acs_url -> reply_url
|
||||
# - audience -> realm / wtrealm
|
||||
|
||||
@property
|
||||
def serializer(self) -> type[Serializer]:
|
||||
from authentik.enterprise.providers.ws_federation.api.providers import (
|
||||
WSFederationProviderSerializer,
|
||||
)
|
||||
|
||||
return WSFederationProviderSerializer
|
||||
|
||||
@property
|
||||
def icon_url(self) -> str | None:
|
||||
return static("authentik/sources/wsfed.svg")
|
||||
|
||||
@property
|
||||
def component(self) -> str:
|
||||
return "ak-provider-wsfed-form"
|
||||
|
||||
def __str__(self):
|
||||
return f"WS-Federation Provider {self.name}"
|
||||
|
||||
class Meta:
|
||||
verbose_name = _("WS-Federation Provider")
|
||||
verbose_name_plural = _("WS-Federation Providers")
|
||||
@@ -1,39 +0,0 @@
|
||||
from authentik.common.saml.constants import NS_MAP as _map
|
||||
|
||||
WS_FED_ACTION_SIGN_IN = "wsignin1.0"
|
||||
WS_FED_ACTION_SIGN_OUT = "wsignout1.0"
|
||||
WS_FED_ACTION_SIGN_OUT_CLEANUP = "wsignoutcleanup1.0"
|
||||
|
||||
WS_FED_POST_KEY_ACTION = "wa"
|
||||
WS_FED_POST_KEY_RESULT = "wresult"
|
||||
WS_FED_POST_KEY_CONTEXT = "wctx"
|
||||
|
||||
WSS_TOKEN_TYPE_SAML2 = (
|
||||
"http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLV2.0" # nosec
|
||||
)
|
||||
WSS_KEY_IDENTIFIER_SAML_ID = (
|
||||
"http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLID"
|
||||
)
|
||||
|
||||
NS_WS_FED_PROTOCOL = "http://docs.oasis-open.org/wsfed/federation/200706"
|
||||
NS_WS_FED_TRUST = "http://schemas.xmlsoap.org/ws/2005/02/trust"
|
||||
NS_WSI = "http://www.w3.org/2001/XMLSchema-instance"
|
||||
NS_ADDRESSING = "http://www.w3.org/2005/08/addressing"
|
||||
NS_POLICY = "http://schemas.xmlsoap.org/ws/2004/09/policy"
|
||||
NS_WSS_SEC = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
|
||||
NS_WSS_UTILITY = (
|
||||
"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
|
||||
)
|
||||
NS_WSS_D3P1 = "http://docs.oasis-open.org/wss/oasis-wss-wssecurity-secext-1.1.xsd"
|
||||
|
||||
NS_MAP = {
|
||||
**_map,
|
||||
"fed": NS_WS_FED_PROTOCOL,
|
||||
"xsi": NS_WSI,
|
||||
"wsa": NS_ADDRESSING,
|
||||
"t": NS_WS_FED_TRUST,
|
||||
"wsu": NS_WSS_UTILITY,
|
||||
"wsp": NS_POLICY,
|
||||
"wssec": NS_WSS_SEC,
|
||||
"d3p1": NS_WSS_D3P1,
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
from django.urls import reverse
|
||||
from lxml.etree import SubElement, _Element # nosec
|
||||
|
||||
from authentik.common.saml.constants import NS_SAML_METADATA
|
||||
from authentik.enterprise.providers.ws_federation.processors.constants import (
|
||||
NS_ADDRESSING,
|
||||
NS_MAP,
|
||||
NS_WS_FED_PROTOCOL,
|
||||
NS_WSI,
|
||||
)
|
||||
from authentik.providers.saml.processors.metadata import MetadataProcessor as BaseMetadataProcessor
|
||||
|
||||
|
||||
class MetadataProcessor(BaseMetadataProcessor):
|
||||
def add_children(self, entity_descriptor: _Element):
|
||||
self.add_role_descriptor_sts(entity_descriptor)
|
||||
super().add_children(entity_descriptor)
|
||||
|
||||
def add_endpoint(self, parent: _Element, name: str):
|
||||
endpoint = SubElement(parent, f"{{{NS_WS_FED_PROTOCOL}}}{name}", nsmap=NS_MAP)
|
||||
endpoint_ref = SubElement(endpoint, f"{{{NS_ADDRESSING}}}EndpointReference", nsmap=NS_MAP)
|
||||
|
||||
address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address", nsmap=NS_MAP)
|
||||
address.text = self.http_request.build_absolute_uri(
|
||||
reverse("authentik_providers_ws_federation:wsfed")
|
||||
)
|
||||
|
||||
def add_role_descriptor_sts(self, entity_descriptor: _Element):
|
||||
role_descriptor = SubElement(
|
||||
entity_descriptor, f"{{{NS_SAML_METADATA}}}RoleDescriptor", nsmap=NS_MAP
|
||||
)
|
||||
role_descriptor.attrib[f"{{{NS_WSI}}}type"] = "fed:SecurityTokenServiceType"
|
||||
role_descriptor.attrib["protocolSupportEnumeration"] = NS_WS_FED_PROTOCOL
|
||||
|
||||
signing_descriptor = self.get_signing_key_descriptor()
|
||||
if signing_descriptor is not None:
|
||||
role_descriptor.append(signing_descriptor)
|
||||
|
||||
self.add_endpoint(role_descriptor, "SecurityTokenServiceEndpoint")
|
||||
self.add_endpoint(role_descriptor, "PassiveRequestorEndpoint")
|
||||
@@ -1,162 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.shortcuts import get_object_or_404
|
||||
from lxml import etree # nosec
|
||||
from lxml.etree import Element, SubElement, _Element # nosec
|
||||
|
||||
from authentik.core.models import Application
|
||||
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
|
||||
from authentik.enterprise.providers.ws_federation.processors.constants import (
|
||||
NS_ADDRESSING,
|
||||
NS_MAP,
|
||||
NS_POLICY,
|
||||
NS_WS_FED_TRUST,
|
||||
NS_WSS_D3P1,
|
||||
NS_WSS_SEC,
|
||||
NS_WSS_UTILITY,
|
||||
WS_FED_ACTION_SIGN_IN,
|
||||
WS_FED_POST_KEY_ACTION,
|
||||
WS_FED_POST_KEY_CONTEXT,
|
||||
WS_FED_POST_KEY_RESULT,
|
||||
WSS_KEY_IDENTIFIER_SAML_ID,
|
||||
WSS_TOKEN_TYPE_SAML2,
|
||||
)
|
||||
from authentik.lib.utils.time import timedelta_from_string
|
||||
from authentik.policies.utils import delete_none_values
|
||||
from authentik.providers.saml.processors.assertion import AssertionProcessor
|
||||
from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
|
||||
from authentik.providers.saml.utils.time import get_time_string
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SignInRequest:
|
||||
wa: str
|
||||
wtrealm: str
|
||||
wreply: str
|
||||
wctx: str | None
|
||||
|
||||
@staticmethod
|
||||
def parse(request: HttpRequest) -> SignInRequest:
|
||||
action = request.GET.get("wa")
|
||||
if action != WS_FED_ACTION_SIGN_IN:
|
||||
raise ValueError("Invalid action")
|
||||
realm = request.GET.get("wtrealm")
|
||||
if not realm:
|
||||
raise ValueError("Missing Realm")
|
||||
|
||||
req = SignInRequest(
|
||||
wa=action,
|
||||
wtrealm=realm,
|
||||
wreply=request.GET.get("wreply"),
|
||||
wctx=request.GET.get("wctx", ""),
|
||||
)
|
||||
|
||||
_, provider = req.get_app_provider()
|
||||
if not req.wreply:
|
||||
req.wreply = provider.acs_url
|
||||
if not req.wreply.startswith(provider.acs_url):
|
||||
raise ValueError("Invalid wreply")
|
||||
return req
|
||||
|
||||
def get_app_provider(self):
|
||||
provider: WSFederationProvider = get_object_or_404(
|
||||
WSFederationProvider, audience=self.wtrealm
|
||||
)
|
||||
application = get_object_or_404(Application, provider=provider)
|
||||
return application, provider
|
||||
|
||||
|
||||
class SignInProcessor:
|
||||
provider: WSFederationProvider
|
||||
request: HttpRequest
|
||||
sign_in_request: SignInRequest
|
||||
saml_processor: AssertionProcessor
|
||||
|
||||
def __init__(
|
||||
self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
|
||||
):
|
||||
self.provider = provider
|
||||
self.request = request
|
||||
self.sign_in_request = sign_in_request
|
||||
self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
|
||||
self.saml_processor.provider.audience = self.sign_in_request.wtrealm
|
||||
if self.provider.signing_kp:
|
||||
self.saml_processor.provider.sign_assertion = True
|
||||
|
||||
def create_response_token(self):
|
||||
root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
|
||||
|
||||
root.append(self.response_add_lifetime())
|
||||
root.append(self.response_add_applies_to())
|
||||
root.append(self.response_add_requested_security_token())
|
||||
root.append(
|
||||
self.response_add_attached_reference(
|
||||
"RequestedAttachedReference", self.saml_processor._assertion_id
|
||||
)
|
||||
)
|
||||
root.append(
|
||||
self.response_add_attached_reference(
|
||||
"RequestedUnattachedReference", self.saml_processor._assertion_id
|
||||
)
|
||||
)
|
||||
|
||||
token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
|
||||
token_type.text = WSS_TOKEN_TYPE_SAML2
|
||||
|
||||
request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
|
||||
request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
|
||||
|
||||
key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
|
||||
key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
|
||||
|
||||
return root
|
||||
|
||||
def response_add_lifetime(self) -> _Element:
|
||||
"""Add Lifetime element"""
|
||||
lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
|
||||
created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
|
||||
created.text = get_time_string()
|
||||
expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
|
||||
expires.text = get_time_string(
|
||||
timedelta_from_string(self.provider.session_valid_not_on_or_after)
|
||||
)
|
||||
return lifetime
|
||||
|
||||
def response_add_applies_to(self) -> _Element:
|
||||
"""Add AppliesTo element"""
|
||||
applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
|
||||
endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
|
||||
address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
|
||||
address.text = self.sign_in_request.wtrealm
|
||||
return applies_to
|
||||
|
||||
def response_add_requested_security_token(self) -> _Element:
|
||||
"""Add RequestedSecurityToken and child assertion"""
|
||||
token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
|
||||
token.append(self.saml_processor.get_assertion())
|
||||
return token
|
||||
|
||||
def response_add_attached_reference(self, tag: str, value: str) -> _Element:
|
||||
ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
|
||||
sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
|
||||
sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
|
||||
|
||||
key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
|
||||
key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
|
||||
key_identifier.text = value
|
||||
return ref
|
||||
|
||||
def response(self) -> dict[str, str]:
|
||||
root = self.create_response_token()
|
||||
assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
|
||||
if self.provider.signing_kp:
|
||||
self.saml_processor._sign(assertion)
|
||||
str_token = etree.tostring(root).decode("utf-8") # nosec
|
||||
return delete_none_values(
|
||||
{
|
||||
WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
|
||||
WS_FED_POST_KEY_RESULT: str_token,
|
||||
WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
|
||||
}
|
||||
)
|
||||
@@ -1,44 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.shortcuts import get_object_or_404
|
||||
|
||||
from authentik.core.models import Application
|
||||
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
|
||||
from authentik.enterprise.providers.ws_federation.processors.constants import WS_FED_ACTION_SIGN_OUT
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SignOutRequest:
|
||||
wa: str
|
||||
wtrealm: str
|
||||
wreply: str
|
||||
|
||||
@staticmethod
|
||||
def parse(request: HttpRequest) -> SignOutRequest:
|
||||
action = request.GET.get("wa")
|
||||
if action != WS_FED_ACTION_SIGN_OUT:
|
||||
raise ValueError("Invalid action")
|
||||
realm = request.GET.get("wtrealm")
|
||||
if not realm:
|
||||
raise ValueError("Missing Realm")
|
||||
|
||||
req = SignOutRequest(
|
||||
wa=action,
|
||||
wtrealm=realm,
|
||||
wreply=request.GET.get("wreply"),
|
||||
)
|
||||
|
||||
_, provider = req.get_app_provider()
|
||||
if not req.wreply:
|
||||
req.wreply = provider.acs_url
|
||||
if not req.wreply.startswith(provider.acs_url):
|
||||
raise ValueError("Invalid wreply")
|
||||
return req
|
||||
|
||||
def get_app_provider(self):
|
||||
provider: WSFederationProvider = get_object_or_404(
|
||||
WSFederationProvider, audience=self.wtrealm
|
||||
)
|
||||
application = get_object_or_404(Application, provider=provider)
|
||||
return application, provider
|
||||
@@ -1,93 +0,0 @@
|
||||
"""WS-Fed Provider signals"""
|
||||
|
||||
from urllib.parse import urlencode, urlparse, urlunparse
|
||||
|
||||
from django.dispatch import receiver
|
||||
from django.http import HttpRequest
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.core.models import AuthenticatedSession, User
|
||||
from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
|
||||
from authentik.enterprise.providers.ws_federation.processors.constants import (
|
||||
WS_FED_ACTION_SIGN_OUT_CLEANUP,
|
||||
WS_FED_POST_KEY_ACTION,
|
||||
)
|
||||
from authentik.flows.models import in_memory_stage
|
||||
from authentik.flows.views.executor import FlowExecutorView
|
||||
from authentik.providers.iframe_logout import IframeLogoutStageView
|
||||
from authentik.providers.saml.models import SAMLBindings, SAMLSession
|
||||
from authentik.providers.saml.views.flows import (
|
||||
PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS,
|
||||
PLAN_CONTEXT_SAML_RELAY_STATE,
|
||||
)
|
||||
from authentik.stages.user_logout.models import UserLogoutStage
|
||||
from authentik.stages.user_logout.stage import flow_pre_user_logout
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
@receiver(flow_pre_user_logout)
|
||||
def handle_ws_fed_iframe_pre_user_logout(
|
||||
sender, request: HttpRequest, user: User, executor: FlowExecutorView, **kwargs
|
||||
):
|
||||
"""Handle WS-Fed iframe logout when user logs out via flow"""
|
||||
|
||||
# Only proceed if this is actually a UserLogoutStage
|
||||
if not isinstance(executor.current_stage, UserLogoutStage):
|
||||
return
|
||||
|
||||
if not user.is_authenticated:
|
||||
return
|
||||
|
||||
auth_session = AuthenticatedSession.from_request(request, user)
|
||||
if not auth_session:
|
||||
return
|
||||
|
||||
wsfed_sessions = SAMLSession.objects.filter(
|
||||
session=auth_session,
|
||||
user=user,
|
||||
expires__gt=timezone.now(),
|
||||
expiring=True,
|
||||
# Only get WS-Federation provider sessions
|
||||
provider__wsfederationprovider__isnull=False,
|
||||
).select_related("provider__wsfederationprovider")
|
||||
|
||||
if not wsfed_sessions.exists():
|
||||
LOGGER.debug("No sessions requiring IFrame frontchannel logout")
|
||||
return
|
||||
|
||||
saml_sessions = []
|
||||
|
||||
relay_state = request.build_absolute_uri(
|
||||
reverse("authentik_core:if-flow", kwargs={"flow_slug": executor.flow.slug})
|
||||
)
|
||||
|
||||
# Store return URL in plan context as fallback if SP doesn't echo relay_state
|
||||
executor.plan.context[PLAN_CONTEXT_SAML_RELAY_STATE] = relay_state
|
||||
|
||||
for session in wsfed_sessions:
|
||||
provider: WSFederationProvider = session.provider.wsfederationprovider
|
||||
parts = urlparse(str(provider.acs_url))
|
||||
parts = parts._replace(
|
||||
query=urlencode({WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_OUT_CLEANUP})
|
||||
)
|
||||
logout_data = {
|
||||
"url": urlunparse(parts),
|
||||
"provider_name": provider.name,
|
||||
"binding": SAMLBindings.REDIRECT,
|
||||
}
|
||||
|
||||
saml_sessions.append(logout_data)
|
||||
|
||||
if saml_sessions:
|
||||
executor.plan.context[PLAN_CONTEXT_SAML_LOGOUT_IFRAME_SESSIONS] = saml_sessions
|
||||
# Stage already exists, don't reinject it
|
||||
if not any(
|
||||
binding.stage.view == IframeLogoutStageView for binding in executor.plan.bindings
|
||||
):
|
||||
iframe_stage = in_memory_stage(IframeLogoutStageView)
|
||||
executor.plan.insert_stage(iframe_stage, index=1)
|
||||
|
||||
LOGGER.debug("WSFed iframe sessions gathered")
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user