Compare commits

..

12 Commits

Author SHA1 Message Date
Teffen Ellis
dc794d927e Fix import path. 2025-12-06 18:11:31 +01:00
Teffen Ellis
01b91e9dd1 Add ESLint rule. 2025-12-06 06:44:52 +01:00
Teffen Ellis
6efb3f5fb0 web: Enforce code organization. 2025-12-06 06:44:41 +01:00
Teffen Ellis
4b24a7ddc1 Replace esbuild-copy-plugin with fs module. 2025-12-06 06:40:58 +01:00
Teffen Ellis
b6b423bee9 Remove unused. 2025-12-06 06:40:58 +01:00
Teffen Ellis
860a43e56a Fix define before use. 2025-12-06 06:40:57 +01:00
Teffen Ellis
c1cbd0623c Fix unused parameters. 2025-12-06 06:40:57 +01:00
Teffen Ellis
0893031cab Fix unnamed functions. 2025-12-06 05:48:42 +01:00
Teffen Ellis
89868276da Fix empty functions 2025-12-06 05:48:42 +01:00
Teffen Ellis
dd40f60c88 Fix ts ignore comments. 2025-12-06 05:48:41 +01:00
Teffen Ellis
73be48c179 Fix linter. 2025-12-06 05:48:41 +01:00
Teffen Ellis
fc27dfeeb7 Fix config. 2025-12-06 05:48:29 +01:00
582 changed files with 12352 additions and 154466 deletions

View File

@@ -21,7 +21,7 @@ runs:
sudo apt-get install --no-install-recommends -y libpq-dev openssl libxmlsec1-dev pkg-config gettext libkrb5-dev krb5-kdc krb5-user krb5-admin-server
- name: Install uv
if: ${{ contains(inputs.dependencies, 'python') }}
uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v5
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v5
with:
enable-cache: true
- name: Setup python

View File

@@ -8,7 +8,7 @@ inputs:
runs:
using: "composite"
steps:
- uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5
- uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5
with:
flags: ${{ inputs.flags }}
use_oidc: true

View File

@@ -2,10 +2,6 @@
👋 Hi there! Welcome.
Please check the Contributing guidelines: https://docs.goauthentik.io/docs/developer-docs/#how-can-i-contribute
⚠️ IMPORTANT: Make sure you are opening this PR from a FEATURE BRANCH, not from your main branch!
If you opened this PR from your main branch, please close it and create a new feature branch instead.
For more information, see: https://docs.goauthentik.io/developer-docs/contributing/#always-use-feature-branches
-->
## Details

View File

@@ -73,12 +73,14 @@ jobs:
mkdir -p ./gen-ts-api
mkdir -p ./gen-go-api
- name: Setup node
if: ${{ !inputs.release }}
uses: actions/setup-node@395ad3262231945c25e8478fd5baf05154b1d79f # v5
with:
node-version-file: web/package.json
cache: "npm"
cache-dependency-path: web/package-lock.json
- name: generate ts client
if: ${{ !inputs.release }}
run: make gen-client-ts
- name: Build Docker Image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # v6

View File

@@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -46,7 +46,7 @@ jobs:
run: |
export VERSION=`node -e 'console.log(require("../gen-ts-api/package.json").version)'`
npm i @goauthentik/api@$VERSION
- uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
- uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
id: cpr
with:
token: ${{ steps.generate_token.outputs.token }}

View File

@@ -41,7 +41,7 @@ jobs:
- working-directory: website/
name: Install Dependencies
run: npm ci
- uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v4
- uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4
with:
path: |
${{ github.workspace }}/website/api/.docusaurus
@@ -55,7 +55,7 @@ jobs:
env:
NODE_ENV: production
run: npm run build -w api
- uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v4
- uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v4
with:
name: api-docs
path: website/api/build
@@ -67,7 +67,7 @@ jobs:
- build
steps:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v5
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v5
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v5
with:
name: api-docs
path: website/api/build

View File

@@ -201,7 +201,7 @@ jobs:
run: |
docker compose -f tests/e2e/docker-compose.yml up -d --quiet-pull
- id: cache-web
uses: actions/cache@9255dc7a253b0ccc959486e2bca901246202afeb # v4
uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4
with:
path: web/dist
key: ${{ runner.os }}-web-${{ hashFiles('web/package-lock.json', 'package-lock.json', 'web/src/**', 'web/packages/sfe/src/**') }}-b

View File

@@ -29,7 +29,7 @@ jobs:
github.event.pull_request.head.repo.full_name == github.repository)
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -42,7 +42,7 @@ jobs:
with:
GITHUB_TOKEN: ${{ steps.generate_token.outputs.token }}
compressOnly: ${{ github.event_name != 'pull_request' }}
- uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
- uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
if: "${{ github.event_name != 'pull_request' && steps.compress.outputs.markdown != '' }}"
id: cpr
with:

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -26,7 +26,7 @@ jobs:
- name: Setup authentik env
uses: ./.github/actions/setup
- run: uv run ak update_webauthn_mds
- uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
- uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
id: cpr
with:
token: ${{ steps.generate_token.outputs.token }}

View File

@@ -10,7 +10,7 @@ jobs:
steps:
- id: app-token
name: Generate app token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
if: ${{ env.GH_APP_ID != '' }}
with:
app-id: ${{ secrets.GH_APP_ID }}

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}

View File

@@ -40,7 +40,7 @@ jobs:
registry-url: "https://registry.npmjs.org"
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # 24d32ffd492484c1d75e0c0b894501ddb9d30d62
with:
files: |
${{ matrix.package }}/package.json

View File

@@ -29,7 +29,7 @@ jobs:
steps:
- id: app-token
name: Generate app token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -57,7 +57,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -73,7 +73,7 @@ jobs:
- name: Bump version
run: "make bump version=${{ inputs.next_version }}.0-rc1"
- name: Create pull request
uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
with:
token: ${{ steps.generate_token.outputs.token }}
branch: release-bump-${{ inputs.next_version }}

View File

@@ -61,7 +61,7 @@ jobs:
steps:
- id: app-token
name: Generate app token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -108,7 +108,7 @@ jobs:
steps:
- id: app-token
name: Generate app token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -130,7 +130,7 @@ jobs:
sed -E -i 's/[0-9]{4}\.[0-9]{1,2}\.[0-9]+$/${{ inputs.version }}/' charts/authentik/Chart.yaml
./scripts/helm-docs.sh
- name: Create pull request
uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
with:
token: "${{ steps.app-token.outputs.token }}"
branch: bump-${{ inputs.version }}
@@ -150,7 +150,7 @@ jobs:
steps:
- id: app-token
name: Generate app token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -185,7 +185,7 @@ jobs:
'.stable.version = $version | .stable.changelog = $changelog | .stable.changelog_url = $changelog_url' version.json > version.new.json
mv version.new.json version.json
- name: Create pull request
uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
with:
token: "${{ steps.app-token.outputs.token }}"
branch: bump-${{ inputs.version }}

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- id: generate_token
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- id: generate_token
if: ${{ github.event_name != 'pull_request' }}
uses: actions/create-github-app-token@29824e69f54612133e76f7eaac726eef6c875baf # v2
uses: actions/create-github-app-token@7e473efe3cb98aa54f8d4bac15400b15fad77d94 # v2
with:
app-id: ${{ secrets.GH_APP_ID }}
private-key: ${{ secrets.GH_APP_PRIVATE_KEY }}
@@ -44,7 +44,7 @@ jobs:
make web-check-compile
- name: Create Pull Request
if: ${{ github.event_name != 'pull_request' }}
uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v7
uses: peter-evans/create-pull-request@84ae59a2cdc2258d6fa0732dd66352dddae2a412 # v7
with:
token: ${{ steps.generate_token.outputs.token }}
branch: extract-compile-backend-translation

View File

@@ -28,10 +28,8 @@ packages/django-channels-postgres @goauthentik/backend
packages/django-postgres-cache @goauthentik/backend
packages/django-dramatiq-postgres @goauthentik/backend
# Web packages
package.json @goauthentik/frontend
package-lock.json @goauthentik/frontend
packages/package.json @goauthentik/frontend
packages/package-lock.json @goauthentik/frontend
packages/package.json @goauthentik/backend @goauthentik/frontend
packages/package-lock.json @goauthentik/backend @goauthentik/frontend
packages/docusaurus-config @goauthentik/frontend
packages/esbuild-plugin-live-reload @goauthentik/frontend
packages/eslint-config @goauthentik/frontend

View File

@@ -26,7 +26,7 @@ RUN npm run build && \
npm run build:sfe
# Stage 2: Build go proxy
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.25.5-trixie@sha256:5d35fb8d28b9095d123b7d96095bbf3750ff18be0a87e5a21c9cffc4351fbf96 AS go-builder
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.25.5-trixie@sha256:4f9d98ebaa759f776496d850e0439c48948d587b191fc3949b5f5e4667abef90 AS go-builder
ARG TARGETOS
ARG TARGETARCH
@@ -76,7 +76,7 @@ RUN --mount=type=secret,id=GEOIPUPDATE_ACCOUNT_ID \
/bin/sh -c "GEOIPUPDATE_LICENSE_KEY_FILE=/run/secrets/GEOIPUPDATE_LICENSE_KEY /usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
# Stage 4: Download uv
FROM ghcr.io/astral-sh/uv:0.9.17@sha256:5cb6b54d2bc3fe2eb9a8483db958a0b9eebf9edff68adedb369df8e7b98711a2 AS uv
FROM ghcr.io/astral-sh/uv:0.9.15@sha256:4c1ad814fe658851f50ff95ecd6948673fffddb0d7994bdb019dcb58227abd52 AS uv
# Stage 5: Base python image
FROM ghcr.io/goauthentik/fips-python:3.13.9-slim-trixie-fips@sha256:700fc8c1e290bd14e5eaca50b1d8e8c748c820010559cbfb4c4f8dfbe2c4c9ff AS python-base

View File

@@ -9,13 +9,6 @@ NPM_VERSION = $(shell python -m scripts.generate_semver)
PY_SOURCES = authentik packages tests scripts lifecycle .github
DOCKER_IMAGE ?= "authentik:test"
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
SED_INPLACE = sed -i ''
else
SED_INPLACE = sed -i
endif
GEN_API_TS = gen-ts-api
GEN_API_PY = gen-py-api
GEN_API_GO = gen-go-api
@@ -126,8 +119,8 @@ bump: ## Bump authentik version. Usage: make bump version=20xx.xx.xx
ifndef version
$(error Usage: make bump version=20xx.xx.xx )
endif
$(SED_INPLACE) 's/^version = ".*"/version = "$(version)"/' pyproject.toml
$(SED_INPLACE) 's/^VERSION = ".*"/VERSION = "$(version)"/' authentik/__init__.py
sed -i 's/^version = ".*"/version = "$(version)"/' pyproject.toml
sed -i 's/^VERSION = ".*"/VERSION = "$(version)"/' authentik/__init__.py
$(MAKE) gen-build gen-compose aws-cfn
npm version --no-git-tag-version --allow-same-version $(version)
cd ${PWD}/web && npm version --no-git-tag-version --allow-same-version $(version)
@@ -162,8 +155,8 @@ gen-diff: ## (Release) generate the changelog diff between the current schema a
/local/schema-old.yml \
/local/schema.yml
rm schema-old.yml
$(SED_INPLACE) 's/{/{/g' diff.md
$(SED_INPLACE) 's/}/}/g' diff.md
sed -i 's/{/{/g' diff.md
sed -i 's/}/}/g' diff.md
npx prettier --write diff.md
gen-clean-ts: ## Remove generated API client for TypeScript

View File

@@ -3,7 +3,7 @@
from functools import lru_cache
from os import environ
VERSION = "2026.2.0-rc1"
VERSION = "2025.12.0-rc1"
ENV_GIT_HASH_KEY = "GIT_BUILD_HASH"

View File

@@ -240,9 +240,7 @@ class FileUsedByView(APIView):
for field in fields:
q |= Q(**{field: params.get("name")})
objs = get_objects_for_user(
request.user, f"{app}.view_{model_name}", model.objects.all()
)
objs = get_objects_for_user(request.user, f"{app}.view_{model_name}", model)
objs = objs.filter(q)
for obj in objs:
serializer = UsedBySerializer(

View File

@@ -1,13 +1,10 @@
from collections.abc import Callable, Generator, Iterator
from typing import cast
from collections.abc import Generator, Iterator
from django.core.cache import cache
from django.http.request import HttpRequest
from structlog.stdlib import get_logger
from authentik.admin.files.usage import FileUsage
CACHE_PREFIX = "goauthentik.io/admin/files"
LOGGER = get_logger()
@@ -56,19 +53,13 @@ class Backend:
"""
raise NotImplementedError
def file_url(
self,
name: str,
request: HttpRequest | None = None,
use_cache: bool = True,
) -> str:
def file_url(self, name: str, request: HttpRequest | None = None) -> str:
"""
Get URL for accessing the file.
Args:
file_path: Relative file path
request: Optional Django HttpRequest for fully qualifed URL building
use_cache: whether to retrieve the URL from cache
Returns:
URL to access the file (may be relative or absolute depending on backend)
@@ -141,22 +132,3 @@ class ManageableBackend(Backend):
True if file exists, False otherwise
"""
raise NotImplementedError
def _cache_get_or_set(
self,
name: str,
request: HttpRequest | None,
default: Callable[[str, HttpRequest | None], str],
timeout: int,
) -> str:
timeout_ignore = 60
timeout = int(timeout * 0.67)
if timeout < timeout_ignore:
timeout = 0
request_key = "None"
if request is not None:
request_key = f"{request.build_absolute_uri('/')}"
cache_key = f"{CACHE_PREFIX}/{self.name}/{self.usage}/{request_key}/{name}"
return cast(str, cache.get_or_set(cache_key, lambda: default(name, request), timeout))

View File

@@ -63,12 +63,7 @@ class FileBackend(ManageableBackend):
rel_path = full_path.relative_to(self.base_path)
yield str(rel_path)
def file_url(
self,
name: str,
request: HttpRequest | None = None,
use_cache: bool = True,
) -> str:
def file_url(self, name: str, request: HttpRequest | None = None) -> str:
"""Get URL for accessing the file."""
expires_in = timedelta_from_string(
CONFIG.get(
@@ -77,28 +72,21 @@ class FileBackend(ManageableBackend):
)
)
def _file_url(name: str, request: HttpRequest | None) -> str:
prefix = CONFIG.get("web.path", "/")[:-1]
path = f"{self.usage.value}/{connection.schema_name}/{name}"
token = jwt.encode(
payload={
"path": path,
"exp": now() + expires_in,
"nbf": now() - timedelta(seconds=15),
},
key=sha256(f"{settings.SECRET_KEY}:{self.usage}".encode()).hexdigest(),
algorithm="HS256",
)
url = f"{prefix}/files/{path}?token={token}"
if request is None:
return url
return request.build_absolute_uri(url)
if use_cache:
timeout = int(expires_in.total_seconds())
return self._cache_get_or_set(name, request, _file_url, timeout)
else:
return _file_url(name, request)
prefix = CONFIG.get("web.path", "/")[:-1]
path = f"{self.usage.value}/{connection.schema_name}/{name}"
token = jwt.encode(
payload={
"path": path,
"exp": now() + expires_in,
"nbf": now() - timedelta(seconds=15),
},
key=sha256(f"{settings.SECRET_KEY}:{self.usage}".encode()).hexdigest(),
algorithm="HS256",
)
url = f"{prefix}/files/{path}?token={token}"
if request is None:
return url
return request.build_absolute_uri(url)
def save_file(self, name: str, content: bytes) -> None:
"""Save file to local filesystem."""

View File

@@ -38,11 +38,6 @@ class PassthroughBackend(Backend):
"""External files cannot be listed."""
yield from []
def file_url(
self,
name: str,
request: HttpRequest | None = None,
use_cache: bool = True,
) -> str:
def file_url(self, name: str, request: HttpRequest | None = None) -> str:
"""Return the URL as-is for passthrough files."""
return name

View File

@@ -130,57 +130,44 @@ class S3Backend(ManageableBackend):
if rel_path: # Skip if it's just the directory itself
yield rel_path
def file_url(
self,
name: str,
request: HttpRequest | None = None,
use_cache: bool = True,
) -> str:
def file_url(self, name: str, request: HttpRequest | None = None) -> str:
"""Generate presigned URL for file access."""
use_https = CONFIG.get_bool(
f"storage.{self.usage.value}.{self.name}.secure_urls",
CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
)
expires_in = int(
timedelta_from_string(
CONFIG.get(
f"storage.{self.usage.value}.{self.name}.url_expiry",
CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
)
).total_seconds()
params = {
"Bucket": self.bucket_name,
"Key": f"{self.base_path}/{name}",
}
expires_in = timedelta_from_string(
CONFIG.get(
f"storage.{self.usage.value}.{self.name}.url_expiry",
CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
)
)
def _file_url(name: str, request: HttpRequest | None) -> str:
params = {
"Bucket": self.bucket_name,
"Key": f"{self.base_path}/{name}",
}
url = self.client.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=expires_in.total_seconds(),
HttpMethod="GET",
)
url = self.client.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=expires_in,
HttpMethod="GET",
)
# Support custom domain for S3-compatible storage (so not AWS)
# Well, can't you do custom domains on AWS as well?
custom_domain = CONFIG.get(
f"storage.{self.usage.value}.{self.name}.custom_domain",
CONFIG.get(f"storage.{self.name}.custom_domain", None),
)
if custom_domain:
parsed = urlsplit(url)
scheme = "https" if use_https else "http"
url = f"{scheme}://{custom_domain}{parsed.path}?{parsed.query}"
# Support custom domain for S3-compatible storage (so not AWS)
# Well, can't you do custom domains on AWS as well?
custom_domain = CONFIG.get(
f"storage.{self.usage.value}.{self.name}.custom_domain",
CONFIG.get(f"storage.{self.name}.custom_domain", None),
)
if custom_domain:
parsed = urlsplit(url)
scheme = "https" if use_https else "http"
url = f"{scheme}://{custom_domain}{parsed.path}?{parsed.query}"
return url
if use_cache:
return self._cache_get_or_set(name, request, _file_url, expires_in)
else:
return _file_url(name, request)
return url
def save_file(self, name: str, content: bytes) -> None:
"""Save file to S3."""

View File

@@ -44,12 +44,7 @@ class StaticBackend(Backend):
if file_path.is_file() and (file_path.suffix in STATIC_FILE_EXTENSIONS):
yield f"{STATIC_PATH_PREFIX}/dist/{dir}/{file_path.name}"
def file_url(
self,
name: str,
request: HttpRequest | None = None,
use_cache: bool = True,
) -> str:
def file_url(self, name: str, request: HttpRequest | None = None) -> str:
"""Get URL for static file."""
prefix = CONFIG.get("web.path", "/")[:-1]
url = f"{prefix}{name}"

View File

@@ -1,13 +1,10 @@
from unittest import skipUnless
from django.test import TestCase
from authentik.admin.files.tests.utils import FileTestS3BackendMixin, s3_test_server_available
from authentik.admin.files.tests.utils import FileTestS3BackendMixin
from authentik.admin.files.usage import FileUsage
from authentik.lib.config import CONFIG
@skipUnless(s3_test_server_available(), "S3 test server not available")
class TestS3Backend(FileTestS3BackendMixin, TestCase):
"""Test S3 backend functionality"""

View File

@@ -70,7 +70,6 @@ class FileManager:
self,
name: str | None,
request: HttpRequest | Request | None = None,
use_cache: bool = True,
) -> str:
"""
Get URL for accessing the file.

View File

@@ -1,26 +1,11 @@
import shutil
import socket
from tempfile import mkdtemp
from urllib.parse import urlparse
from authentik.admin.files.backends.s3 import S3Backend
from authentik.admin.files.usage import FileUsage
from authentik.lib.config import CONFIG, UNSET
from authentik.lib.generators import generate_id
S3_TEST_ENDPOINT = "http://localhost:8020"
def s3_test_server_available() -> bool:
"""Check if the S3 test server is reachable."""
parsed = urlparse(S3_TEST_ENDPOINT)
try:
with socket.create_connection((parsed.hostname, parsed.port), timeout=2):
return True
except OSError:
return False
class FileTestFileBackendMixin:
def setUp(self):
@@ -72,7 +57,7 @@ class FileTestS3BackendMixin:
for key in s3_config_keys:
self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET)
self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT)
CONFIG.set("storage.media.s3.endpoint", "http://localhost:8020")
CONFIG.set("storage.media.s3.access_key", "accessKey1")
CONFIG.set("storage.media.s3.secret_key", "secretKey1")
CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name)
@@ -85,7 +70,7 @@ class FileTestS3BackendMixin:
for key in s3_config_keys:
self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET)
self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT)
CONFIG.set("storage.reports.s3.endpoint", "http://localhost:8020")
CONFIG.set("storage.reports.s3.access_key", "accessKey1")
CONFIG.set("storage.reports.s3.secret_key", "secretKey1")
CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name)

View File

@@ -70,9 +70,6 @@ class IPCUser(AnonymousUser):
def is_authenticated(self):
return True
def all_roles(self):
return []
class TokenAuthentication(BaseAuthentication):
"""Token-based authentication using HTTP Bearer authentication"""

View File

@@ -13,11 +13,6 @@ class Pagination(pagination.PageNumberPagination):
page_query_param = "page"
page_size_query_param = "page_size"
def get_page_size(self, request):
if self.page_size_query_param in request.query_params:
return min(super().get_page_size(request), request.tenant.pagination_max_page_size)
return request.tenant.pagination_default_page_size
def get_paginated_response(self, data):
previous_page_number = 0
if self.page.has_previous():

View File

@@ -1,12 +1,10 @@
"""authentik Blueprints app"""
import traceback
from collections.abc import Callable
from importlib import import_module
from inspect import ismethod
from django.apps import AppConfig
from django.conf import settings
from django.db import DatabaseError, InternalError, ProgrammingError
from dramatiq.broker import get_broker
from structlog.stdlib import BoundLogger, get_logger
@@ -46,21 +44,8 @@ class ManagedAppConfig(AppConfig):
module_name = f"{self.name}.{rel_module}"
import_module(module_name)
self.logger.info("Imported related module", module=module_name)
except ModuleNotFoundError as exc:
if settings.DEBUG:
# This is a heuristic for determining whether the exception was caused
# "directly" by the `import_module` call or whether the initial import
# succeeded and a later import (within the existing module) failed.
# 1. <the calling function>
# 2. importlib.import_module
# 3. importlib._bootstrap._gcd_import
# 4. importlib._bootstrap._find_and_load
# 5. importlib._bootstrap._find_and_load_unlocked
STACK_LENGTH_HEURISTIC = 5
stack_length = len(traceback.extract_tb(exc.__traceback__))
if stack_length > STACK_LENGTH_HEURISTIC:
raise
except ModuleNotFoundError:
pass
import_relative("checks")
import_relative("tasks")

View File

@@ -5,7 +5,6 @@ from typing import Any
from django.core.management.base import BaseCommand, no_translations
from django.db.models import Model, fields
from django.db.models.fields.related import OneToOneField
from drf_jsonschema_serializer.convert import converter, field_to_converter
from rest_framework.fields import Field, JSONField, UUIDField
from rest_framework.relations import PrimaryKeyRelatedField
@@ -33,8 +32,6 @@ class PrimaryKeyRelatedFieldConverter:
def convert(self, field: PrimaryKeyRelatedField):
model: Model = field.queryset.model
pk_field = model._meta.pk
if isinstance(pk_field, OneToOneField):
pk_field = pk_field.related_fields[0][1]
if isinstance(pk_field, fields.UUIDField):
return {"type": "string", "format": "uuid"}
return {"type": "integer"}

View File

@@ -36,7 +36,10 @@ class TestBlueprintsV1RBAC(TransactionTestCase):
self.assertTrue(importer.apply())
role = Role.objects.filter(name=uid).first()
self.assertIsNotNone(role)
self.assertEqual(get_perms(role), {"authentik_blueprints.view_blueprintinstance"})
self.assertEqual(
list(role.group.permissions.all().values_list("codename", flat=True)),
["view_blueprintinstance"],
)
def test_object_permission(self):
"""Test permissions"""
@@ -50,5 +53,5 @@ class TestBlueprintsV1RBAC(TransactionTestCase):
user = User.objects.filter(username=uid).first()
role = Role.objects.filter(name=uid).first()
self.assertIsNotNone(flow)
self.assertEqual(get_perms(user, flow), {"authentik_flows.view_flow"})
self.assertEqual(get_perms(role, flow), {"authentik_flows.view_flow"})
self.assertEqual(get_perms(user, flow), ["view_flow"])
self.assertEqual(get_perms(role.group, flow), ["view_flow"])

View File

@@ -16,7 +16,8 @@ from django.db.models.query_utils import Q
from django.db.transaction import atomic
from django.db.utils import IntegrityError
from django_channels_postgres.models import GroupChannel, Message
from guardian.models import RoleObjectPermission, UserObjectPermission
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import BaseSerializer, Serializer
from structlog.stdlib import BoundLogger, get_logger
@@ -109,7 +110,6 @@ def excluded_models() -> list[type[Model]]:
DjangoGroup,
ContentType,
Permission,
RoleObjectPermission,
UserObjectPermission,
# Base classes
Provider,
@@ -394,12 +394,10 @@ class Importer:
"""Apply object-level permissions for an entry"""
for perm in entry.get_permissions(self._import):
if perm.user is not None:
User.objects.get(pk=perm.user).assign_perms_to_managed_role(
perm.permission, instance
)
assign_perm(perm.permission, User.objects.get(pk=perm.user), instance)
if perm.role is not None:
role = Role.objects.get(pk=perm.role)
role.assign_perms(perm.permission, obj=instance)
role.assign_permission(perm.permission, obj=instance)
def apply(self) -> bool:
"""Apply (create/update) models yaml, in database transaction"""

View File

@@ -4,8 +4,7 @@ from collections.abc import Iterator
from copy import copy
from django.core.cache import cache
from django.db.models import Case, QuerySet
from django.db.models.expressions import When
from django.db.models import QuerySet
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext as _
from drf_spectacular.types import OpenApiTypes
@@ -23,7 +22,6 @@ from authentik.api.pagination import Pagination
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import Application, User
from authentik.events.logs import LogEventSerializer, capture_logs
@@ -57,21 +55,9 @@ class ApplicationSerializer(ModelSerializer):
def get_launch_url(self, app: Application) -> str | None:
"""Allow formatting of launch URL"""
user = None
user_data = None
if "request" in self.context:
user = self.context["request"].user
# Cache serialized user data to avoid N+1 when formatting launch URLs
# for multiple applications. UserSerializer accesses user.ak_groups which
# would otherwise trigger a query for each application.
if user is not None:
if "_cached_user_data" not in self.context:
# Prefetch groups to avoid N+1
self.context["_cached_user_data"] = UserSerializer(instance=user).data
user_data = self.context["_cached_user_data"]
return app.get_launch_url(user, user_data=user_data)
return app.get_launch_url(user)
def validate_slug(self, slug: str) -> str:
if slug in Application.reserved_slugs:
@@ -164,23 +150,8 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
applications.append(application)
return applications
def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
"""
Re-fetch with proper prefetching for serialization
Cached applications don't have prefetched relationships, causing N+1 queries
during serialization when get_provider() is called
"""
if not applications:
return self.get_queryset().none()
pks = [app.pk for app in applications]
return (
self.get_queryset()
.filter(pk__in=pks)
.order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
)
def _filter_applications_with_launch_url(
self, paginated_apps: QuerySet[Application]
self, paginated_apps: Iterator[Application]
) -> list[Application]:
applications = []
for app in paginated_apps:
@@ -283,8 +254,6 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
except ValueError as exc:
raise ValidationError from exc
allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
allowed_applications = self._expand_applications(allowed_applications)
serializer = self.get_serializer(allowed_applications, many=True)
return self.get_paginated_response(serializer.data)
@@ -303,7 +272,6 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
allowed_applications,
timeout=86400,
)
allowed_applications = self._expand_applications(allowed_applications)
if only_with_launch_url == "true":
allowed_applications = self._filter_applications_with_launch_url(allowed_applications)

View File

@@ -78,7 +78,7 @@ class AdminDeviceViewSet(ViewSet):
"""Get all devices in all child classes"""
for model in device_classes():
device_set = get_objects_for_user(
self.request.user, f"{model._meta.app_label}.view_{model._meta.model_name}"
self.request.user, f"{model._meta.app_label}.view_{model._meta.model_name}", model
).filter(**kwargs)
yield from device_set

View File

@@ -18,10 +18,10 @@ from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.fields import CharField, IntegerField, SerializerMethodField
from rest_framework.permissions import IsAuthenticated
from rest_framework.relations import PrimaryKeyRelatedField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ListSerializer, ValidationError
from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from authentik.api.authentication import TokenAuthentication
@@ -54,8 +54,8 @@ class PartialUserSerializer(ModelSerializer):
]
class RelatedGroupSerializer(ModelSerializer):
"""Stripped down group serializer to show relevant children/parents for groups"""
class GroupChildSerializer(ModelSerializer):
"""Stripped down group serializer to show relevant children for groups"""
attributes = JSONDictField(required=False)
@@ -74,16 +74,15 @@ class GroupSerializer(ModelSerializer):
"""Group Serializer"""
attributes = JSONDictField(required=False)
parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False)
parents_obj = SerializerMethodField(allow_null=True)
children_obj = SerializerMethodField(allow_null=True)
users_obj = SerializerMethodField(allow_null=True)
children_obj = SerializerMethodField(allow_null=True)
roles_obj = ListSerializer(
child=RoleSerializer(),
read_only=True,
source="roles",
required=False,
)
parent_name = CharField(source="parent.name", read_only=True, allow_null=True)
num_pk = IntegerField(read_only=True)
@property
@@ -100,30 +99,25 @@ class GroupSerializer(ModelSerializer):
return True
return str(request.query_params.get("include_children", "false")).lower() == "true"
@property
def _should_include_parents(self) -> bool:
request: Request = self.context.get("request", None)
if not request:
return True
return str(request.query_params.get("include_parents", "false")).lower() == "true"
@extend_schema_field(PartialUserSerializer(many=True))
def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
if not self._should_include_users:
return None
return PartialUserSerializer(instance.users, many=True).data
@extend_schema_field(RelatedGroupSerializer(many=True))
def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
@extend_schema_field(GroupChildSerializer(many=True))
def get_children_obj(self, instance: Group) -> list[GroupChildSerializer] | None:
if not self._should_include_children:
return None
return RelatedGroupSerializer(instance.children, many=True).data
return GroupChildSerializer(instance.children, many=True).data
@extend_schema_field(RelatedGroupSerializer(many=True))
def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
if not self._should_include_parents:
return None
return RelatedGroupSerializer(instance.parents, many=True).data
def validate_parent(self, parent: Group | None):
"""Validate group parent (if set), ensuring the parent isn't itself"""
if not self.instance or not parent:
return parent
if str(parent.group_uuid) == str(self.instance.group_uuid):
raise ValidationError(_("Cannot set group as parent of itself."))
return parent
def validate_is_superuser(self, superuser: bool):
"""Ensure that the user creating this group has permissions to set the superuser flag"""
@@ -159,8 +153,8 @@ class GroupSerializer(ModelSerializer):
"num_pk",
"name",
"is_superuser",
"parents",
"parents_obj",
"parent",
"parent_name",
"users",
"users_obj",
"attributes",
@@ -177,10 +171,9 @@ class GroupSerializer(ModelSerializer):
"required": False,
"default": list,
},
"parents": {
"required": False,
"default": list,
},
# TODO: This field isn't unique on the database which is hard to backport
# hence we just validate the uniqueness here
"name": {"validators": [UniqueValidator(Group.objects.all())]},
}
@@ -246,7 +239,11 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
]
def get_ql_fields(self):
from akql.schema import BoolField, JSONSearchField, StrField
from djangoql.schema import BoolField, StrField
from authentik.enterprise.search.fields import (
JSONSearchField,
)
return [
StrField(Group, "name"),
@@ -255,7 +252,7 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
]
def get_queryset(self):
base_qs = Group.objects.all().prefetch_related("roles")
base_qs = Group.objects.all().select_related("parent").prefetch_related("roles")
if self.serializer_class(context={"request": self.request})._should_include_users:
base_qs = base_qs.prefetch_related("users")
@@ -267,16 +264,12 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
if self.serializer_class(context={"request": self.request})._should_include_children:
base_qs = base_qs.prefetch_related("children")
if self.serializer_class(context={"request": self.request})._should_include_parents:
base_qs = base_qs.prefetch_related("parents")
return base_qs
@extend_schema(
parameters=[
OpenApiParameter("include_users", bool, default=True),
OpenApiParameter("include_children", bool, default=False),
OpenApiParameter("include_parents", bool, default=False),
]
)
def list(self, request, *args, **kwargs):
@@ -286,7 +279,6 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
parameters=[
OpenApiParameter("include_users", bool, default=True),
OpenApiParameter("include_children", bool, default=False),
OpenApiParameter("include_parents", bool, default=False),
]
)
def retrieve(self, request, *args, **kwargs):

View File

@@ -4,6 +4,7 @@ from typing import Any
from django.utils.timezone import now
from drf_spectacular.utils import OpenApiResponse, extend_schema
from guardian.shortcuts import assign_perm, get_anonymous_user
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField
@@ -144,15 +145,19 @@ class TokenViewSet(UsedByMixin, ModelViewSet):
owner_field = "user"
rbac_allow_create_without_perm = True
def get_queryset(self):
user = self.request.user if self.request else get_anonymous_user()
if user.is_superuser:
return super().get_queryset()
return super().get_queryset().filter(user=user.pk)
def perform_create(self, serializer: TokenSerializer):
if not self.request.user.is_superuser:
instance = serializer.save(
user=self.request.user,
expiring=self.request.user.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
)
self.request.user.assign_perms_to_managed_role(
"authentik_core.view_token_key", instance
)
assign_perm("authentik_core.view_token_key", self.request.user, instance)
return instance
return super().perform_create(serializer)

View File

@@ -81,7 +81,7 @@ class UsedByMixin:
# query and check if there is a difference between modes the user can see
# and can't see and add a warning
for obj in get_objects_for_user(
request.user, f"{app}.view_{model_name}", manager.all()
request.user, f"{app}.view_{model_name}", manager
).all():
# Only merge shadows on first object
if first_object:

View File

@@ -86,10 +86,8 @@ from authentik.flows.models import FlowToken
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
from authentik.flows.views.executor import QS_KEY_TOKEN
from authentik.lib.avatars import get_avatar
from authentik.lib.utils.reflection import ConditionalInheritance
from authentik.rbac.api.roles import RoleSerializer
from authentik.rbac.decorators import permission_required
from authentik.rbac.models import Role, get_permission_choices
from authentik.rbac.models import get_permission_choices
from authentik.stages.email.flow import pickle_flow_token_for_email
from authentik.stages.email.models import EmailStage
from authentik.stages.email.tasks import send_mails
@@ -108,6 +106,7 @@ class PartialGroupSerializer(ModelSerializer):
"""Partial Group Serializer, does not include child relations."""
attributes = JSONDictField(required=False)
parent_name = CharField(source="parent.name", read_only=True, allow_null=True)
class Meta:
model = Group
@@ -116,6 +115,8 @@ class PartialGroupSerializer(ModelSerializer):
"num_pk",
"name",
"is_superuser",
"parent",
"parent_name",
"attributes",
]
@@ -134,13 +135,6 @@ class UserSerializer(ModelSerializer):
default=list,
)
groups_obj = SerializerMethodField(allow_null=True)
roles = PrimaryKeyRelatedField(
allow_empty=True,
many=True,
queryset=Role.objects.all().order_by("name"),
default=list,
)
roles_obj = SerializerMethodField(allow_null=True)
uid = CharField(read_only=True)
username = CharField(
max_length=150,
@@ -154,25 +148,12 @@ class UserSerializer(ModelSerializer):
return True
return str(request.query_params.get("include_groups", "true")).lower() == "true"
@property
def _should_include_roles(self) -> bool:
request: Request = self.context.get("request", None)
if not request:
return True
return str(request.query_params.get("include_roles", "true")).lower() == "true"
@extend_schema_field(PartialGroupSerializer(many=True))
def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
if not self._should_include_groups:
return None
return PartialGroupSerializer(instance.ak_groups, many=True).data
@extend_schema_field(RoleSerializer(many=True))
def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
if not self._should_include_roles:
return None
return RoleSerializer(instance.roles, many=True).data
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
@@ -187,26 +168,24 @@ class UserSerializer(ModelSerializer):
directly setting a password. However should be done via the `set_password`
method instead of directly setting it like rest_framework."""
password = validated_data.pop("password", None)
perms_qs = Permission.objects.filter(
permissions = Permission.objects.filter(
codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
).values_list("content_type__app_label", "codename")
perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
)
validated_data["user_permissions"] = permissions
instance: User = super().create(validated_data)
self._set_password(instance, password)
instance.assign_perms_to_managed_role(perms_list)
return instance
def update(self, instance: User, validated_data: dict) -> User:
"""Same as `create` above, set the password directly if we're in a blueprint
context"""
password = validated_data.pop("password", None)
perms_qs = Permission.objects.filter(
permissions = Permission.objects.filter(
codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
).values_list("content_type__app_label", "codename")
perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
)
validated_data["user_permissions"] = permissions
instance = super().update(instance, validated_data)
self._set_password(instance, password)
instance.assign_perms_to_managed_role(perms_list)
return instance
def _set_password(self, instance: User, password: str | None):
@@ -261,8 +240,6 @@ class UserSerializer(ModelSerializer):
"is_superuser",
"groups",
"groups_obj",
"roles",
"roles_obj",
"email",
"avatar",
"attributes",
@@ -286,7 +263,6 @@ class UserSelfSerializer(ModelSerializer):
is_superuser = BooleanField(read_only=True)
avatar = SerializerMethodField()
groups = SerializerMethodField()
roles = SerializerMethodField()
uid = CharField(read_only=True)
settings = SerializerMethodField()
system_permissions = SerializerMethodField()
@@ -314,25 +290,6 @@ class UserSelfSerializer(ModelSerializer):
"pk": group.pk,
}
@extend_schema_field(
ListSerializer(
child=inline_serializer(
"UserSelfRoles",
{
"name": CharField(read_only=True),
"pk": CharField(read_only=True),
},
)
)
)
def get_roles(self, _: User):
"""Return only the roles a user is member of"""
for role in self.instance.all_roles().order_by("name"):
yield {
"name": role.name,
"pk": role.pk,
}
def get_settings(self, user: User) -> dict[str, Any]:
"""Get user settings with brand and group settings applied"""
return user.group_attributes(self._context["request"]).get("settings", {})
@@ -354,7 +311,6 @@ class UserSelfSerializer(ModelSerializer):
"is_active",
"is_superuser",
"groups",
"roles",
"email",
"avatar",
"uid",
@@ -434,16 +390,6 @@ class UsersFilter(FilterSet):
queryset=Group.objects.all().order_by("name"),
)
roles_by_name = ModelMultipleChoiceFilter(
field_name="roles__name",
to_field_name="name",
queryset=Role.objects.all().order_by("name"),
)
roles_by_pk = ModelMultipleChoiceFilter(
field_name="roles",
queryset=Role.objects.all().order_by("name"),
)
def filter_is_superuser(self, queryset, name, value):
if value:
return queryset.filter(ak_groups__is_superuser=True).distinct()
@@ -479,17 +425,11 @@ class UsersFilter(FilterSet):
"attributes",
"groups_by_name",
"groups_by_pk",
"roles_by_name",
"roles_by_pk",
"type",
]
class UserViewSet(
ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
UsedByMixin,
ModelViewSet,
):
class UserViewSet(UsedByMixin, ModelViewSet):
"""User Viewset"""
queryset = User.objects.none()
@@ -504,7 +444,12 @@ class UserViewSet(
]
def get_ql_fields(self):
from akql.schema import BoolField, ChoiceSearchField, JSONSearchField, StrField
from djangoql.schema import BoolField, StrField
from authentik.enterprise.search.fields import (
ChoiceSearchField,
JSONSearchField,
)
return [
StrField(User, "username"),
@@ -520,14 +465,11 @@ class UserViewSet(
base_qs = User.objects.all().exclude_anonymous()
if self.serializer_class(context={"request": self.request})._should_include_groups:
base_qs = base_qs.prefetch_related("ak_groups")
if self.serializer_class(context={"request": self.request})._should_include_roles:
base_qs = base_qs.prefetch_related("roles")
return base_qs
@extend_schema(
parameters=[
OpenApiParameter("include_groups", bool, default=True),
OpenApiParameter("include_roles", bool, default=True),
]
)
def list(self, request, *args, **kwargs):

View File

@@ -12,27 +12,7 @@ from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
class ModelBackendNoAuthz(ModelBackend):
def get_user_permissions(self, user_obj, obj=None):
return set()
def get_group_permissions(self, user_obj, obj=None):
return set()
def get_all_permissions(self, user_obj, obj=None):
return set()
def has_perm(self, user_obj, perm, obj=None):
return False
def has_module_perms(self, user_obj, app_label):
return False
def with_perm(self, perm, is_active=True, include_superusers=True, obj=None):
return User.objects.none()
class InbuiltBackend(ModelBackendNoAuthz):
class InbuiltBackend(ModelBackend):
"""Inbuilt backend"""
def authenticate(

View File

@@ -6,6 +6,7 @@ import django.contrib.auth.models
import django.contrib.auth.validators
import django.db.models.deletion
import django.utils.timezone
import guardian.mixins
from django.conf import settings
from django.db import migrations, models
@@ -110,7 +111,7 @@ class Migration(migrations.Migration):
options={
"permissions": (("reset_user_password", "Reset Password"),),
},
bases=(models.Model,),
bases=(guardian.mixins.GuardianUserMixin, models.Model),
managers=[
("objects", django.contrib.auth.models.UserManager()),
],

View File

@@ -1,155 +0,0 @@
# Generated by Django 5.1.12 on 2025-09-12 08:38
import django.db.models.deletion
import pgtrigger.compiler
import pgtrigger.migrations
import psqlextra.backend.migrations.operations.apply_state
import psqlextra.backend.migrations.operations.create_materialized_view_model
import psqlextra.indexes.unique_index
import psqlextra.manager.manager
import psqlextra.models.view
import uuid
from django.apps.registry import Apps
from django.db import migrations, models
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
def migrate_parents(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
Group = apps.get_model("authentik_core", "Group")
db_alias = schema_editor.connection.alias
for group in Group.objects.using(db_alias).all():
if not group.parent:
continue
group.parents.add(group.parent)
group.save()
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0054_alter_application_meta_icon_alter_source_icon"),
]
operations = [
migrations.CreateModel(
name="GroupParentageNode",
fields=[
(
"uuid",
models.UUIDField(
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
],
options={
"verbose_name": "Group Parentage Node",
"verbose_name_plural": "Group Parentage Nodes",
"db_table": "authentik_core_groupparentage",
},
),
migrations.AddField(
model_name="groupparentagenode",
name="child",
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="parent_nodes",
to="authentik_core.group",
),
),
migrations.AddField(
model_name="groupparentagenode",
name="parent",
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="child_nodes",
to="authentik_core.group",
),
),
psqlextra.backend.migrations.operations.create_materialized_view_model.PostgresCreateMaterializedViewModel(
name="GroupAncestryNode",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
],
options={
"db_table": "authentik_core_groupancestry",
},
view_options={
"query": (
"\n WITH RECURSIVE accumulator AS (\n SELECT\n child_id::text || '-' || parent_id::text as id,\n child_id AS descendant_id,\n parent_id AS ancestor_id\n FROM authentik_core_groupparentage\n\n UNION\n\n SELECT\n accumulator.descendant_id::text || '-' || current.parent_id::text as id,\n accumulator.descendant_id,\n current.parent_id AS ancestor_id\n FROM accumulator\n JOIN authentik_core_groupparentage current\n ON accumulator.ancestor_id = current.child_id\n )\n SELECT * FROM accumulator\n ",
(),
),
},
bases=(psqlextra.models.view.PostgresMaterializedViewModel,),
managers=[
("objects", psqlextra.manager.manager.PostgresManager()),
],
),
psqlextra.backend.migrations.operations.apply_state.ApplyState(
state_operation=migrations.AddField(
model_name="groupancestrynode",
name="ancestor",
field=models.ForeignKey(
on_delete=django.db.models.deletion.DO_NOTHING,
related_name="descendant_nodes",
to="authentik_core.group",
),
),
),
psqlextra.backend.migrations.operations.apply_state.ApplyState(
state_operation=migrations.AddField(
model_name="groupancestrynode",
name="descendant",
field=models.ForeignKey(
on_delete=django.db.models.deletion.DO_NOTHING,
related_name="ancestor_nodes",
to="authentik_core.group",
),
),
),
migrations.AddIndex(
model_name="groupancestrynode",
index=models.Index(fields=["descendant"], name="authentik_c_descend_f83a71_idx"),
),
migrations.AddIndex(
model_name="groupancestrynode",
index=models.Index(fields=["ancestor"], name="authentik_c_ancesto_974845_idx"),
),
migrations.AddIndex(
model_name="groupancestrynode",
index=psqlextra.indexes.unique_index.UniqueIndex(
fields=["id"], name="authentik_c_id_5d0bb4_idx"
),
),
pgtrigger.migrations.AddTrigger(
model_name="groupparentagenode",
trigger=pgtrigger.compiler.Trigger(
name="refresh_groupancestry",
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n REFRESH MATERIALIZED VIEW CONCURRENTLY authentik_core_groupancestry;\n RETURN NULL;\n ",
hash="a987621714359aa0389e03fd2d52f86b118e7d24",
operation="INSERT OR UPDATE OR DELETE",
pgid="pgtrigger_refresh_groupancestry_62450",
table="authentik_core_groupparentage",
when="AFTER",
),
),
),
migrations.AddField(
model_name="group",
name="parents",
field=models.ManyToManyField(
blank=True,
related_name="children",
through="authentik_core.GroupParentageNode",
to="authentik_core.group",
),
),
migrations.RunPython(migrate_parents, migrations.RunPython.noop),
]

View File

@@ -1,180 +0,0 @@
# Generated by Django 5.1.12 on 2025-09-30 12:29
from django.db import migrations, models
from django.apps.registry import Apps
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
def migrate_object_permissions(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
db_alias = schema_editor.connection.alias
User = apps.get_model("authentik_core", "User")
Group = apps.get_model("auth", "Group")
Role = apps.get_model("authentik_rbac", "Role")
UserObjectPermission = apps.get_model("guardian", "UserObjectPermission")
GroupObjectPermission = apps.get_model("guardian", "GroupObjectPermission")
RoleObjectPermission = apps.get_model("guardian", "RoleObjectPermission")
RoleModelPermission = apps.get_model("guardian", "RoleModelPermission")
def get_role_for_user_id(user_id: int) -> Role:
name = f"ak-managed-role--user-{user_id}"
role, created = Role.objects.using(db_alias).get_or_create(
name=name,
managed=name,
)
if created:
role.users.add(user_id)
return role
def get_role_for_group_id(group_id: int) -> Role:
role = Role.objects.using(db_alias).filter(group_id=group_id).first()
if not role:
# Every django group should already have a role, so this should never happen.
# But let's be nice.
name = f"ak-managed-role--group-{group_id}"
role, created = Role.objects.using(db_alias).get_or_create(
group_id=group_id,
name=name,
managed=name,
)
if created:
role.group_id = group_id
role.save()
return role
# Below are 4 very similar pieces of code, for (user, group) x (model, object).
# Since this is a one-off migration, I won't attempt DRYing them.
# User model permissions
user_ids_with_model_permissions = (
User.user_permissions.through.objects.using(db_alias)
.values_list("user", flat=True)
.distinct()
)
for user_id in user_ids_with_model_permissions:
role = get_role_for_user_id(user_id)
user_model_permissions = User.user_permissions.through.objects.using(db_alias).filter(
user_id=user_id
)
role_model_permissions = []
for user_model_permission in user_model_permissions:
role_model_permissions.append(
RoleModelPermission(
permission=user_model_permission.permission,
content_type=user_model_permission.permission.content_type,
role=role,
)
)
RoleModelPermission.objects.using(db_alias).bulk_create(role_model_permissions)
# Group model permissions
group_ids_with_model_permissions = (
Group.permissions.through.objects.using(db_alias).values_list("group", flat=True).distinct()
)
for group_id in group_ids_with_model_permissions:
role = get_role_for_group_id(group_id)
group_model_permissions = Group.permissions.through.objects.using(db_alias).filter(
group_id=group_id
)
role_model_permissions = []
for group_model_permission in group_model_permissions:
role_model_permissions.append(
RoleModelPermission(
permission=group_model_permission.permission,
content_type=group_model_permission.permission.content_type,
role=role,
)
)
RoleModelPermission.objects.using(db_alias).bulk_create(role_model_permissions)
# User object permissions
user_ids_with_object_permissions = (
UserObjectPermission.objects.using(db_alias).values_list("user", flat=True).distinct()
)
for user_id in user_ids_with_object_permissions:
role = get_role_for_user_id(user_id)
user_object_permissions = UserObjectPermission.objects.using(db_alias).filter(user=user_id)
role_object_permissions = []
for user_object_permission in user_object_permissions:
role_object_permissions.append(
RoleObjectPermission(
permission=user_object_permission.permission,
content_type=user_object_permission.content_type,
object_pk=user_object_permission.object_pk,
role=role,
)
)
RoleObjectPermission.objects.using(db_alias).bulk_create(role_object_permissions)
# Group object permissions
group_ids_with_object_permissions = (
GroupObjectPermission.objects.using(db_alias).values_list("group", flat=True).distinct()
)
for group_id in group_ids_with_object_permissions:
role = get_role_for_group_id(group_id)
group_object_permissions = GroupObjectPermission.objects.using(db_alias).filter(
group=group_id
)
role_object_permissions = []
for group_object_permission in group_object_permissions:
role_object_permissions.append(
RoleObjectPermission(
permission=group_object_permission.permission,
content_type=group_object_permission.content_type,
object_pk=group_object_permission.object_pk,
role=role,
)
)
RoleObjectPermission.objects.using(db_alias).bulk_create(role_object_permissions)
class Migration(migrations.Migration):
dependencies = [
("guardian", "0004_role_permissions"),
("authentik_core", "0055_groupancestor_groupparentagenode_group_parents"),
("authentik_rbac", "0008_alter_role_group"),
]
operations = [
migrations.AddField(
model_name="user",
name="roles",
field=models.ManyToManyField(
blank=True, related_name="users", to="authentik_rbac.role"
),
),
migrations.RunPython(migrate_object_permissions),
migrations.AlterUniqueTogether(
name="group",
unique_together=set(),
),
migrations.AlterField(
model_name="group",
name="parents",
field=models.ManyToManyField(
blank=True,
related_name="children",
through="authentik_core.GroupParentageNode",
to="authentik_core.group",
),
),
migrations.RemoveField(
model_name="group",
name="parent",
),
migrations.AlterField(
model_name="group",
name="name",
field=models.TextField(unique=True, verbose_name="name"),
),
]

View File

@@ -6,10 +6,9 @@ from hashlib import sha256
from typing import Any, Optional, Self
from uuid import uuid4
import pgtrigger
from deepmerge import always_merger
from django.contrib.auth.hashers import check_password
from django.contrib.auth.models import AbstractUser, Permission
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import UserManager as DjangoUserManager
from django.contrib.sessions.base_session import AbstractBaseSession
from django.core.validators import validate_slug
@@ -20,11 +19,10 @@ from django.http import HttpRequest
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django_cte import CTE, with_cte
from guardian.conf import settings
from guardian.models import RoleModelPermission, RoleObjectPermission
from guardian.mixins import GuardianUserMixin
from model_utils.managers import InheritanceManager
from psqlextra.indexes import UniqueIndex
from psqlextra.models import PostgresMaterializedViewModel
from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger
@@ -45,7 +43,6 @@ from authentik.lib.models import (
)
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.models import PolicyBindingModel
from authentik.rbac.models import Role
from authentik.tenants.models import DEFAULT_TOKEN_DURATION, DEFAULT_TOKEN_LENGTH
from authentik.tenants.utils import get_current_tenant, get_unique_identifier
@@ -72,17 +69,6 @@ options.DEFAULT_NAMES = options.DEFAULT_NAMES + (
GROUP_RECURSION_LIMIT = 20
MANAGED_ROLE_PREFIX_USER = "ak-managed-role--user"
MANAGED_ROLE_PREFIX_GROUP = "ak-managed-role--group"
def managed_role_name(user_or_group: models.Model):
if isinstance(user_or_group, User):
return f"{MANAGED_ROLE_PREFIX_USER}-{user_or_group.pk}"
if isinstance(user_or_group, Group):
return f"{MANAGED_ROLE_PREFIX_GROUP}-{user_or_group.pk}"
raise TypeError("Managed roles are only available for User or Group.")
def default_token_duration() -> datetime:
"""Default duration a Token is valid"""
@@ -152,7 +138,7 @@ class AttributesMixin(models.Model):
@classmethod
def update_or_create_attributes(
cls, query: dict[str, Any], properties: dict[str, Any]
) -> tuple[Self, bool]:
) -> tuple[models.Model, bool]:
"""Same as django's update_or_create but correctly updates attributes by merging dicts"""
instance = cls.objects.filter(**query).first()
if not instance:
@@ -162,40 +148,69 @@ class AttributesMixin(models.Model):
class GroupQuerySet(QuerySet):
def with_descendants(self):
pks = self.values_list("pk", flat=True)
return Group.objects.filter(Q(pk__in=pks) | Q(ancestor_nodes__ancestor__in=pks)).distinct()
def with_children_recursive(self):
"""Recursively get all groups that have the current queryset as parents
or are indirectly related."""
def with_ancestors(self):
pks = self.values_list("pk", flat=True)
return Group.objects.filter(
Q(pk__in=pks) | Q(descendant_nodes__descendant__in=pks)
).distinct()
def make_cte(cte):
"""Build the query that ends up in WITH RECURSIVE"""
# Start from self, aka the current query
# Add a depth attribute to limit the recursion
return self.annotate(
relative_depth=models.Value(0, output_field=models.IntegerField())
).union(
# Here is the recursive part of the query. cte refers to the previous iteration
# Only select groups for which the parent is part of the previous iteration
# and increase the depth
# Finally, limit the depth
cte.join(Group, group_uuid=cte.col.parent_id)
.annotate(
relative_depth=models.ExpressionWrapper(
cte.col.relative_depth
+ models.Value(1, output_field=models.IntegerField()),
output_field=models.IntegerField(),
)
)
.filter(relative_depth__lt=GROUP_RECURSION_LIMIT),
all=True,
)
# Build the recursive query, see above
cte = CTE.recursive(make_cte)
# Return the result, as a usable queryset for Group.
return with_cte(cte, select=cte.join(Group, group_uuid=cte.col.group_uuid))
class Group(SerializerModel, AttributesMixin):
"""Group model which supports a hierarchy and has attributes"""
"""Group model which supports a basic hierarchy and has attributes"""
group_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField(verbose_name=_("name"), unique=True)
name = models.TextField(_("name"))
is_superuser = models.BooleanField(
default=False, help_text=_("Users added to this group will be superusers.")
)
roles = models.ManyToManyField("authentik_rbac.Role", related_name="ak_groups", blank=True)
parents = models.ManyToManyField(
parent = models.ForeignKey(
"Group",
blank=True,
symmetrical=False,
through="GroupParentageNode",
null=True,
default=None,
on_delete=models.SET_NULL,
related_name="children",
)
objects = GroupQuerySet.as_manager()
class Meta:
unique_together = (
(
"name",
"parent",
),
)
indexes = (
models.Index(fields=["name"]),
models.Index(fields=["is_superuser"]),
@@ -229,103 +244,12 @@ class Group(SerializerModel, AttributesMixin):
"""Recursively check if `user` is member of us, or any parent."""
return user.all_groups().filter(group_uuid=self.group_uuid).exists()
def all_roles(self) -> QuerySet[Role]:
"""Get all roles of this group and all of its ancestors."""
return Role.objects.filter(
ak_groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
).distinct()
def get_managed_role(self, create=False):
if create:
name = managed_role_name(self)
role, created = Role.objects.get_or_create(name=name, managed=name)
if created:
role.ak_groups.add(self)
return role
else:
return Role.objects.filter(name=managed_role_name(self)).first()
def assign_perms_to_managed_role(
self,
perms: str | list[str] | Permission | list[Permission],
obj: models.Model | None = None,
):
if not perms:
return
role = self.get_managed_role(create=True)
role.assign_perms(perms, obj)
class GroupParentageNode(models.Model):
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
child = models.ForeignKey(Group, related_name="parent_nodes", on_delete=models.CASCADE)
parent = models.ForeignKey(Group, related_name="child_nodes", on_delete=models.CASCADE)
class Meta:
verbose_name = _("Group Parentage Node")
verbose_name_plural = _("Group Parentage Nodes")
db_table = "authentik_core_groupparentage"
triggers = [
pgtrigger.Trigger(
name="refresh_groupancestry",
operation=pgtrigger.Insert | pgtrigger.Update | pgtrigger.Delete,
when=pgtrigger.After,
func="""
REFRESH MATERIALIZED VIEW CONCURRENTLY authentik_core_groupancestry;
RETURN NULL;
""",
),
]
def __str__(self) -> str:
return f"Group Parentage Node from #{self.child_id} to {self.parent_id}"
class GroupAncestryNode(PostgresMaterializedViewModel):
descendant = models.ForeignKey(
Group, related_name="ancestor_nodes", on_delete=models.DO_NOTHING
)
ancestor = models.ForeignKey(
Group, related_name="descendant_nodes", on_delete=models.DO_NOTHING
)
class Meta:
# This is a transitive closure of authentik_core_groupparentage
# See https://en.wikipedia.org/wiki/Transitive_closure#In_graph_theory
db_table = "authentik_core_groupancestry"
indexes = [
models.Index(fields=["descendant"]),
models.Index(fields=["ancestor"]),
UniqueIndex(fields=["id"]),
]
class ViewMeta:
query = """
WITH RECURSIVE accumulator AS (
SELECT
child_id::text || '-' || parent_id::text as id,
child_id AS descendant_id,
parent_id AS ancestor_id
FROM authentik_core_groupparentage
UNION
SELECT
accumulator.descendant_id::text || '-' || current.parent_id::text as id,
accumulator.descendant_id,
current.parent_id AS ancestor_id
FROM accumulator
JOIN authentik_core_groupparentage current
ON accumulator.ancestor_id = current.child_id
)
SELECT * FROM accumulator
"""
def __str__(self) -> str:
return f"Group Ancestry Node from {self.descendant_id} to {self.ancestor_id}"
def children_recursive(self: Self | QuerySet["Group"]) -> QuerySet["Group"]:
"""Compatibility layer for Group.objects.with_children_recursive()"""
qs = self
if not isinstance(self, QuerySet):
qs = Group.objects.filter(group_uuid=self.group_uuid)
return qs.with_children_recursive()
class UserQuerySet(models.QuerySet):
@@ -352,7 +276,7 @@ class UserManager(DjangoUserManager):
return self.get_queryset().exclude_anonymous()
class User(SerializerModel, AttributesMixin, AbstractUser):
class User(SerializerModel, GuardianUserMixin, AttributesMixin, AbstractUser):
"""authentik User model, based on django's contrib auth user model."""
uuid = models.UUIDField(default=uuid4, editable=False, unique=True)
@@ -362,7 +286,6 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
sources = models.ManyToManyField("Source", through="UserSourceConnection")
ak_groups = models.ManyToManyField("Group", related_name="users")
roles = models.ManyToManyField("authentik_rbac.Role", related_name="users", blank=True)
password_change_date = models.DateTimeField(auto_now_add=True)
last_updated = models.DateTimeField(auto_now=True)
@@ -400,60 +323,7 @@ class User(SerializerModel, AttributesMixin, AbstractUser):
def all_groups(self) -> QuerySet[Group]:
"""Recursively get all groups this user is a member of."""
return self.ak_groups.all().with_ancestors()
def all_roles(self) -> QuerySet[Role]:
"""Get all roles of this user and all of its groups (recursively)."""
return Role.objects.filter(Q(users=self) | Q(ak_groups__in=self.all_groups())).distinct()
def get_managed_role(self, create=False):
if create:
name = managed_role_name(self)
role, created = Role.objects.get_or_create(name=name, managed=name)
if created:
role.users.add(self)
return role
else:
return Role.objects.filter(name=managed_role_name(self)).first()
def get_all_model_perms_on_managed_role(self) -> QuerySet[RoleModelPermission]:
role = self.get_managed_role()
if not role:
return RoleModelPermission.objects.none()
return RoleModelPermission.objects.filter(role=role)
def get_all_obj_perms_on_managed_role(self) -> QuerySet[RoleObjectPermission]:
role = self.get_managed_role()
if not role:
return RoleObjectPermission.objects.none()
return RoleObjectPermission.objects.filter(role=role)
def assign_perms_to_managed_role(
self,
perms: str | list[str] | Permission | list[Permission],
obj: models.Model | None = None,
):
if not perms:
return
role = self.get_managed_role(create=True)
role.assign_perms(perms, obj)
def remove_perms_from_managed_role(
self,
perms: str | list[str] | Permission | list[Permission],
obj: models.Model | None = None,
):
role = self.get_managed_role()
if not role:
return None
role.remove_perms(perms, obj)
def remove_all_perms_from_managed_role(self):
role = self.get_managed_role()
if not role:
return None
RoleModelPermission.objects.filter(role=role).delete()
RoleObjectPermission.objects.filter(role=role).delete()
return self.ak_groups.all().with_children_recursive()
def group_attributes(self, request: HttpRequest | None = None) -> dict[str, Any]:
"""Get a dictionary containing the attributes from all groups the user belongs to,
@@ -658,10 +528,6 @@ class ApplicationQuerySet(QuerySet):
qs = self.select_related("provider")
for subclass in Provider.objects.get_queryset()._get_subclasses_recurse(Provider):
qs = qs.select_related(f"provider__{subclass}")
# Also prefetch/select through each subclass path to ensure casted instances have access
qs = qs.prefetch_related(f"provider__{subclass}__property_mappings")
qs = qs.select_related(f"provider__{subclass}__application")
qs = qs.select_related(f"provider__{subclass}__backchannel_application")
return qs
@@ -713,15 +579,8 @@ class Application(SerializerModel, PolicyBindingModel):
return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon)
def get_launch_url(
self, user: Optional["User"] = None, user_data: dict | None = None
) -> str | None:
"""Get launch URL if set, otherwise attempt to get launch URL based on provider.
Args:
user: User instance for formatting the URL
user_data: Pre-serialized user data to avoid re-serialization (performance optimization)
"""
def get_launch_url(self, user: Optional["User"] = None) -> str | None:
"""Get launch URL if set, otherwise attempt to get launch URL based on provider."""
from authentik.core.api.users import UserSerializer
url = None
@@ -731,10 +590,7 @@ class Application(SerializerModel, PolicyBindingModel):
url = provider.launch_url
if user and url:
try:
# Use pre-serialized data if available, otherwise serialize now
if user_data is None:
user_data = UserSerializer(instance=user).data
return url % user_data
return url % UserSerializer(instance=user).data
except Exception as exc: # noqa
LOGGER.warning("Failed to format launch url", exc=exc)
return url

View File

@@ -34,12 +34,19 @@ class SessionStore(SessionBase):
def _get_session_from_db(self):
try:
return self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
).get(
session_key=self.session_key,
expires__gt=timezone.now(),
return (
self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
)
.prefetch_related(
"authenticatedsession__user__groups",
"authenticatedsession__user__user_permissions",
)
.get(
session_key=self.session_key,
expires__gt=timezone.now(),
)
)
except (self.model.DoesNotExist, SuspiciousOperation) as exc:
if isinstance(exc, SuspiciousOperation):
@@ -48,12 +55,19 @@ class SessionStore(SessionBase):
async def _aget_session_from_db(self):
try:
return await self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
).aget(
session_key=self.session_key,
expires__gt=timezone.now(),
return (
await self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
)
.prefetch_related(
"authenticatedsession__user__groups",
"authenticatedsession__user__user_permissions",
)
.aget(
session_key=self.session_key,
expires__gt=timezone.now(),
)
)
except (self.model.DoesNotExist, SuspiciousOperation) as exc:
if isinstance(exc, SuspiciousOperation):

View File

@@ -1,6 +1,7 @@
"""Test Application Entitlements API"""
from django.urls import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.models import Application, ApplicationEntitlement, Group
@@ -48,8 +49,7 @@ class TestApplicationEntitlements(APITestCase):
def test_group_indirect(self):
"""Test indirect group"""
parent = Group.objects.create(name=generate_id())
group = Group.objects.create(name=generate_id())
group.parents.add(parent)
group = Group.objects.create(name=generate_id(), parent=parent)
self.user.ak_groups.add(group)
ent = ApplicationEntitlement.objects.create(app=self.app, name=generate_id())
PolicyBinding.objects.create(target=ent, group=parent, order=0)
@@ -76,8 +76,8 @@ class TestApplicationEntitlements(APITestCase):
def test_api_perms_global(self):
"""Test API creation with global permissions"""
self.user.assign_perms_to_managed_role("authentik_core.add_applicationentitlement")
self.user.assign_perms_to_managed_role("authentik_core.view_application")
assign_perm("authentik_core.add_applicationentitlement", self.user)
assign_perm("authentik_core.view_application", self.user)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),
@@ -90,8 +90,8 @@ class TestApplicationEntitlements(APITestCase):
def test_api_perms_scoped(self):
"""Test API creation with scoped permissions"""
self.user.assign_perms_to_managed_role("authentik_core.add_applicationentitlement")
self.user.assign_perms_to_managed_role("authentik_core.view_application", self.app)
assign_perm("authentik_core.add_applicationentitlement", self.user)
assign_perm("authentik_core.view_application", self.user, self.app)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),
@@ -104,7 +104,7 @@ class TestApplicationEntitlements(APITestCase):
def test_api_perms_missing(self):
"""Test API creation with no permissions"""
self.user.assign_perms_to_managed_role("authentik_core.add_applicationentitlement")
assign_perm("authentik_core.add_applicationentitlement", self.user)
self.client.force_login(self.user)
res = self.client.post(
reverse("authentik_api:applicationentitlement-list"),

View File

@@ -25,8 +25,7 @@ class TestGroups(TestCase):
user = User.objects.create(username=generate_id())
user2 = User.objects.create(username=generate_id())
parent = Group.objects.create(name=generate_id())
child = Group.objects.create(name=generate_id())
child.parents.add(parent)
child = Group.objects.create(name=generate_id(), parent=parent)
child.users.add(user)
self.assertTrue(child.is_member(user))
self.assertTrue(parent.is_member(user))
@@ -38,10 +37,8 @@ class TestGroups(TestCase):
user = User.objects.create(username=generate_id())
user2 = User.objects.create(username=generate_id())
parent = Group.objects.create(name=generate_id())
second = Group.objects.create(name=generate_id())
second.parents.add(parent)
third = Group.objects.create(name=generate_id())
third.parents.add(second)
second = Group.objects.create(name=generate_id(), parent=parent)
third = Group.objects.create(name=generate_id(), parent=second)
second.users.add(user)
self.assertTrue(parent.is_member(user))
self.assertFalse(parent.is_member(user2))
@@ -54,21 +51,9 @@ class TestGroups(TestCase):
"""Test group membership (recursive)"""
user = User.objects.create(username=generate_id())
group = Group.objects.create(name=generate_id())
group2 = Group.objects.create(name=generate_id())
group.parents.add(group2)
group2.parents.add(group)
group2 = Group.objects.create(name=generate_id(), parent=group)
group.users.add(user)
group.parent = group2
group.save()
self.assertTrue(group.is_member(user))
self.assertTrue(group2.is_member(user))
def test_group_managed_role(self):
"""Test group managed role"""
perm = "authentik_core.view_user"
user = User.objects.create(username=generate_id())
group = Group.objects.create(name=generate_id())
group.users.add(user)
group.assign_perms_to_managed_role(perm)
self.assertEqual(group.roles.count(), 1)
self.assertEqual(user.roles.count(), 0)
self.assertTrue(user.has_perm(perm))

View File

@@ -1,6 +1,7 @@
"""Test Groups API"""
from django.urls.base import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.models import Group
@@ -36,8 +37,8 @@ class TestGroupsAPI(APITestCase):
def test_add_user(self):
"""Test add_user"""
group = Group.objects.create(name=generate_id())
self.login_user.assign_perms_to_managed_role("authentik_core.add_user_to_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.view_user")
assign_perm("authentik_core.add_user_to_group", self.login_user, group)
assign_perm("authentik_core.view_user", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-add-user", kwargs={"pk": group.pk}),
@@ -52,8 +53,8 @@ class TestGroupsAPI(APITestCase):
def test_add_user_404(self):
"""Test add_user"""
group = Group.objects.create(name=generate_id())
self.login_user.assign_perms_to_managed_role("authentik_core.add_user_to_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.view_user")
assign_perm("authentik_core.add_user_to_group", self.login_user, group)
assign_perm("authentik_core.view_user", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-add-user", kwargs={"pk": group.pk}),
@@ -66,8 +67,8 @@ class TestGroupsAPI(APITestCase):
def test_remove_user(self):
"""Test remove_user"""
group = Group.objects.create(name=generate_id())
self.login_user.assign_perms_to_managed_role("authentik_core.remove_user_from_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.view_user")
assign_perm("authentik_core.remove_user_from_group", self.login_user, group)
assign_perm("authentik_core.view_user", self.login_user)
group.users.add(self.user)
self.client.force_login(self.login_user)
res = self.client.post(
@@ -83,8 +84,8 @@ class TestGroupsAPI(APITestCase):
def test_remove_user_404(self):
"""Test remove_user"""
group = Group.objects.create(name=generate_id())
self.login_user.assign_perms_to_managed_role("authentik_core.remove_user_from_group", group)
self.login_user.assign_perms_to_managed_role("authentik_core.view_user")
assign_perm("authentik_core.remove_user_from_group", self.login_user, group)
assign_perm("authentik_core.view_user", self.login_user)
group.users.add(self.user)
self.client.force_login(self.login_user)
res = self.client.post(
@@ -95,9 +96,23 @@ class TestGroupsAPI(APITestCase):
)
self.assertEqual(res.status_code, 404)
def test_parent_self(self):
"""Test parent"""
group = Group.objects.create(name=generate_id())
assign_perm("view_group", self.login_user, group)
assign_perm("change_group", self.login_user, group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
data={
"parent": group.pk,
},
)
self.assertEqual(res.status_code, 400)
def test_superuser_no_perm(self):
"""Test creating a superuser group without permission"""
self.login_user.assign_perms_to_managed_role("authentik_core.add_group")
assign_perm("authentik_core.add_group", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),
@@ -111,7 +126,7 @@ class TestGroupsAPI(APITestCase):
def test_superuser_no_perm_no_superuser(self):
"""Test creating a group without permission and without superuser flag"""
self.login_user.assign_perms_to_managed_role("authentik_core.add_group")
assign_perm("authentik_core.add_group", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),
@@ -122,8 +137,8 @@ class TestGroupsAPI(APITestCase):
def test_superuser_update_no_perm(self):
"""Test updating a superuser group without permission"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
self.login_user.assign_perms_to_managed_role("view_group", group)
self.login_user.assign_perms_to_managed_role("change_group", group)
assign_perm("view_group", self.login_user, group)
assign_perm("change_group", self.login_user, group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
@@ -139,8 +154,8 @@ class TestGroupsAPI(APITestCase):
"""Test updating a superuser group without permission
and without changing the superuser status"""
group = Group.objects.create(name=generate_id(), is_superuser=True)
self.login_user.assign_perms_to_managed_role("view_group", group)
self.login_user.assign_perms_to_managed_role("change_group", group)
assign_perm("view_group", self.login_user, group)
assign_perm("change_group", self.login_user, group)
self.client.force_login(self.login_user)
res = self.client.patch(
reverse("authentik_api:group-detail", kwargs={"pk": group.pk}),
@@ -150,8 +165,8 @@ class TestGroupsAPI(APITestCase):
def test_superuser_create(self):
"""Test creating a superuser group with permission"""
self.login_user.assign_perms_to_managed_role("authentik_core.add_group")
self.login_user.assign_perms_to_managed_role("authentik_core.enable_group_superuser")
assign_perm("authentik_core.add_group", self.login_user)
assign_perm("authentik_core.enable_group_superuser", self.login_user)
self.client.force_login(self.login_user)
res = self.client.post(
reverse("authentik_api:group-list"),

View File

@@ -3,6 +3,7 @@
from json import loads
from django.urls import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.tests.utils import create_test_admin_user, create_test_user
@@ -47,8 +48,8 @@ class TestImpersonation(APITestCase):
def test_impersonate_global(self):
"""Test impersonation with global permissions"""
new_user = create_test_user()
new_user.assign_perms_to_managed_role("authentik_core.impersonate")
new_user.assign_perms_to_managed_role("authentik_core.view_user")
assign_perm("authentik_core.impersonate", new_user)
assign_perm("authentik_core.view_user", new_user)
self.client.force_login(new_user)
response = self.client.post(
@@ -68,8 +69,8 @@ class TestImpersonation(APITestCase):
def test_impersonate_scoped(self):
"""Test impersonation with scoped permissions"""
new_user = create_test_user()
new_user.assign_perms_to_managed_role("authentik_core.impersonate", self.other_user)
new_user.assign_perms_to_managed_role("authentik_core.view_user", self.other_user)
assign_perm("authentik_core.impersonate", new_user, self.other_user)
assign_perm("authentik_core.view_user", new_user, self.other_user)
self.client.force_login(new_user)
response = self.client.post(

View File

@@ -39,7 +39,7 @@ def source_tester_factory(test_model: type[Source]) -> Callable:
def tester(self: TestModels):
model_class = None
if test_model._meta.abstract:
return
model_class = [x for x in test_model.__bases__ if issubclass(x, Source)][0]()
else:
model_class = test_model()
model_class.slug = "test"

View File

@@ -3,7 +3,7 @@
from django.contrib.auth.models import AnonymousUser
from django.test import TestCase
from django.urls import reverse
from guardian.shortcuts import get_anonymous_user
from guardian.utils import get_anonymous_user
from authentik.core.models import SourceUserMatchingModes, User
from authentik.core.sources.flow_manager import Action

View File

@@ -183,16 +183,16 @@ class TestTokenAPI(APITestCase):
self.assertEqual(len(body["results"]), 1)
self.assertEqual(body["results"][0]["identifier"], token_should.identifier)
def test_list_with_permission(self):
"""Test Token List (Test with `view_token` permission)"""
def test_list_admin(self):
"""Test Token List (Test with admin auth)"""
Token.objects.all().delete()
self.client.force_login(self.admin)
token_should: Token = Token.objects.create(
identifier="test", expiring=False, user=self.user
)
token_should_not: Token = Token.objects.create(
identifier="test-2", expiring=False, user=get_anonymous_user()
)
self.user.assign_perms_to_managed_role("authentik_core.view_token")
response = self.client.get(reverse("authentik_api:token-list"))
body = loads(response.content)
self.assertEqual(len(body["results"]), 2)

View File

@@ -1,6 +1,7 @@
"""Test Transactional API"""
from django.urls import reverse
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.models import Application, Group
@@ -15,8 +16,8 @@ class TestTransactionalApplicationsAPI(APITestCase):
def setUp(self) -> None:
self.user = create_test_user()
self.user.assign_perms_to_managed_role("authentik_core.add_application")
self.user.assign_perms_to_managed_role("authentik_providers_oauth2.add_oauth2provider")
assign_perm("authentik_core.add_application", self.user)
assign_perm("authentik_providers_oauth2.add_oauth2provider", self.user)
def test_create_transactional(self):
"""Test transactional Application + provider creation"""
@@ -72,7 +73,7 @@ class TestTransactionalApplicationsAPI(APITestCase):
def test_create_transactional_bindings(self):
"""Test transactional Application + provider creation"""
self.user.assign_perms_to_managed_role("authentik_policies.add_policybinding")
assign_perm("authentik_policies.add_policybinding", self.user)
self.client.force_login(self.user)
uid = generate_id()
group = Group.objects.create(name=generate_id())

View File

@@ -1,20 +0,0 @@
"""user tests"""
from django.test.testcases import TestCase
from authentik.core.models import User
from authentik.lib.generators import generate_id
class TestUsers(TestCase):
"""Test user"""
def test_user_managed_role(self):
"""Test user managed role"""
perm = "authentik_core.view_user"
user = User.objects.create(username=generate_id())
user.assign_perms_to_managed_role(perm)
self.assertEqual(user.roles.count(), 1)
self.assertTrue(user.has_perm(perm))
user.remove_perms_from_managed_role(perm)
self.assertFalse(user.has_perm(perm))

View File

@@ -1,5 +1,7 @@
"""Crypto API Views"""
from datetime import datetime
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate
@@ -13,12 +15,14 @@ from drf_spectacular.utils import (
OpenApiParameter,
OpenApiResponse,
extend_schema,
extend_schema_field,
)
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import (
CharField,
ChoiceField,
DateTimeField,
IntegerField,
SerializerMethodField,
)
@@ -47,15 +51,59 @@ LOGGER = get_logger()
class CertificateKeyPairSerializer(ModelSerializer):
"""CertificateKeyPair Serializer"""
fingerprint_sha256 = SerializerMethodField()
fingerprint_sha1 = SerializerMethodField()
cert_expiry = SerializerMethodField()
cert_subject = SerializerMethodField()
private_key_available = SerializerMethodField()
key_type = SerializerMethodField()
certificate_download_url = SerializerMethodField()
private_key_download_url = SerializerMethodField()
@property
def _should_include_details(self) -> bool:
request: Request = self.context.get("request", None)
if not request:
return True
return str(request.query_params.get("include_details", "true")).lower() == "true"
def get_fingerprint_sha256(self, instance: CertificateKeyPair) -> str | None:
"Get certificate Hash (SHA256)"
if not self._should_include_details:
return None
return instance.fingerprint_sha256
def get_fingerprint_sha1(self, instance: CertificateKeyPair) -> str | None:
"Get certificate Hash (SHA1)"
if not self._should_include_details:
return None
return instance.fingerprint_sha1
def get_cert_expiry(self, instance: CertificateKeyPair) -> datetime | None:
"Get certificate expiry"
if not self._should_include_details:
return None
return DateTimeField().to_representation(instance.certificate.not_valid_after_utc)
def get_cert_subject(self, instance: CertificateKeyPair) -> str | None:
"""Get certificate subject as full rfc4514"""
if not self._should_include_details:
return None
return instance.certificate.subject.rfc4514_string()
def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
"""Show if this keypair has a private key configured or not"""
return instance.key_data != "" and instance.key_data is not None
@extend_schema_field(ChoiceField(choices=KeyType.choices, allow_null=True))
def get_key_type(self, instance: CertificateKeyPair) -> str | None:
"""Get the key algorithm type from the certificate's public key"""
if not self._should_include_details:
return None
return instance.key_type
def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
"""Get URL to download certificate"""
return (
@@ -127,11 +175,6 @@ class CertificateKeyPairSerializer(ModelSerializer):
"managed": {"read_only": True},
"key_data": {"write_only": True},
"certificate_data": {"write_only": True},
"fingerprint_sha256": {"read_only": True},
"fingerprint_sha1": {"read_only": True},
"cert_expiry": {"read_only": True},
"cert_subject": {"read_only": True},
"key_type": {"read_only": True},
}
@@ -173,12 +216,17 @@ class CertificateKeyPairFilter(FilterSet):
return queryset.exclude(key_data__exact="")
def filter_key_type(self, queryset, name, value): # pragma: no cover
"""Filter certificates by key type using the stored database field"""
"""Filter certificates by key type using the public key from the certificate"""
if not value:
return queryset
# value is a list of KeyType enum values from MultipleChoiceFilter
return queryset.filter(key_type__in=value)
filtered_pks = []
for cert in queryset:
if cert.key_type in value:
filtered_pks.append(cert.pk)
return queryset.filter(pk__in=filtered_pks)
class Meta:
model = CertificateKeyPair
@@ -215,6 +263,7 @@ class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
"Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
),
),
OpenApiParameter("include_details", bool, default=True),
]
)
def list(self, request, *args, **kwargs):

View File

@@ -1,117 +0,0 @@
# Generated by Django 5.2.9 on 2025-12-09 06:22
from hashlib import md5
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from django.db import migrations, models
from authentik.crypto.signals import extract_certificate_metadata
def backfill_certificate_metadata(apps, schema_editor): # noqa: ARG001
"""Backfill certificate metadata and kid for existing records."""
CertificateKeyPair = apps.get_model("authentik_crypto", "CertificateKeyPair")
for cert in CertificateKeyPair.objects.all():
updated_fields = []
if cert.certificate_data:
try:
certificate = load_pem_x509_certificate(
cert.certificate_data.encode("utf-8"), default_backend()
)
metadata = extract_certificate_metadata(certificate)
cert.key_type = metadata["key_type"]
cert.cert_expiry = metadata["cert_expiry"]
cert.cert_subject = metadata["cert_subject"]
cert.fingerprint_sha256 = metadata["fingerprint_sha256"]
cert.fingerprint_sha1 = metadata["fingerprint_sha1"]
updated_fields.extend(
[
"key_type",
"cert_expiry",
"cert_subject",
"fingerprint_sha256",
"fingerprint_sha1",
]
)
except (ValueError, TypeError, AttributeError):
pass
# Backfill kid with MD5 for backwards compatibility
if cert.key_data:
cert.kid = md5(cert.key_data.encode("utf-8"), usedforsecurity=False).hexdigest()
updated_fields.append("kid")
if updated_fields:
cert.save(update_fields=updated_fields)
class Migration(migrations.Migration):
dependencies = [
("authentik_crypto", "0005_alter_certificatekeypair_options"),
]
operations = [
migrations.AddField(
model_name="certificatekeypair",
name="cert_expiry",
field=models.DateTimeField(blank=True, help_text="Certificate expiry date", null=True),
),
migrations.AddField(
model_name="certificatekeypair",
name="cert_subject",
field=models.TextField(
blank=True, help_text="Certificate subject as RFC4514 string", null=True
),
),
migrations.AddField(
model_name="certificatekeypair",
name="fingerprint_sha1",
field=models.CharField(
blank=True,
help_text="SHA1 fingerprint of the certificate",
max_length=59,
null=True,
),
),
migrations.AddField(
model_name="certificatekeypair",
name="fingerprint_sha256",
field=models.CharField(
blank=True,
help_text="SHA256 fingerprint of the certificate",
max_length=95,
null=True,
),
),
migrations.AddField(
model_name="certificatekeypair",
name="key_type",
field=models.CharField(
blank=True,
choices=[
("rsa", "RSA"),
("ec", "Elliptic Curve"),
("dsa", "DSA"),
("ed25519", "Ed25519"),
("ed448", "Ed448"),
],
help_text="Key algorithm type detected from the certificate's public key",
max_length=16,
null=True,
),
),
migrations.AddField(
model_name="certificatekeypair",
name="kid",
field=models.CharField(
blank=True, help_text="Key ID generated from private key", max_length=128, null=True
),
),
migrations.RunPython(backfill_certificate_metadata, migrations.RunPython.noop),
]

View File

@@ -1,8 +1,7 @@
"""authentik crypto models"""
from base64 import urlsafe_b64encode
from binascii import hexlify
from hashlib import md5, sha512
from hashlib import md5
from ssl import PEM_FOOTER, PEM_HEADER
from textwrap import wrap
from uuid import uuid4
@@ -48,39 +47,6 @@ def fingerprint_sha256(cert: Certificate) -> str:
return hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8")
def detect_key_type(certificate: Certificate) -> str | None:
"""Detect the key algorithm type by parsing the certificate's public key"""
try:
public_key = certificate.public_key()
if isinstance(public_key, RSAPublicKey):
return KeyType.RSA
if isinstance(public_key, EllipticCurvePublicKey):
return KeyType.EC
if isinstance(public_key, DSAPublicKey):
return KeyType.DSA
if isinstance(public_key, Ed25519PublicKey):
return KeyType.ED25519
if isinstance(public_key, Ed448PublicKey):
return KeyType.ED448
except (ValueError, TypeError, AttributeError) as exc:
LOGGER.warning("Failed to detect key type", exc=exc)
return None
def generate_key_id(key_data: str) -> str:
"""Generate Key ID using SHA512 + urlsafe_b64encode."""
if not key_data:
return ""
return urlsafe_b64encode(sha512(key_data.encode("utf-8")).digest()).decode("utf-8").rstrip("=")
def generate_key_id_legacy(key_data: str) -> str:
"""Generate Key ID using MD5 (legacy format for backwards compatibility)."""
if not key_data:
return ""
return md5(key_data.encode("utf-8")).hexdigest() # nosec
class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
"""CertificateKeyPair that can be used for signing or encrypting if `key_data`
is set, otherwise it can be used to verify remote data."""
@@ -96,41 +62,6 @@ class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
blank=True,
default="",
)
key_type = models.CharField(
max_length=16,
choices=KeyType.choices,
null=True,
blank=True,
help_text=_("Key algorithm type detected from the certificate's public key"),
)
cert_expiry = models.DateTimeField(
null=True,
blank=True,
help_text=_("Certificate expiry date"),
)
cert_subject = models.TextField(
null=True,
blank=True,
help_text=_("Certificate subject as RFC4514 string"),
)
fingerprint_sha256 = models.CharField(
max_length=95,
null=True,
blank=True,
help_text=_("SHA256 fingerprint of the certificate"),
)
fingerprint_sha1 = models.CharField(
max_length=59,
null=True,
blank=True,
help_text=_("SHA1 fingerprint of the certificate"),
)
kid = models.CharField(
max_length=128,
null=True,
blank=True,
help_text=_("Key ID generated from private key"),
)
_cert: Certificate | None = None
_private_key: PrivateKeyTypes | None = None
@@ -175,6 +106,41 @@ class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
return None
return self._private_key
@property
def fingerprint_sha256(self) -> str:
"""Get SHA256 Fingerprint of certificate_data"""
return fingerprint_sha256(self.certificate)
@property
def fingerprint_sha1(self) -> str:
"""Get SHA1 Fingerprint of certificate_data"""
return hexlify(self.certificate.fingerprint(hashes.SHA1()), ":").decode("utf-8") # nosec
@property
def kid(self):
"""Get Key ID used for JWKS"""
return (
md5(self.key_data.encode("utf-8"), usedforsecurity=False).hexdigest()
if self.key_data
else ""
) # nosec
@property
def key_type(self) -> str | None:
"""Get the key algorithm type from the certificate's public key"""
public_key = self.certificate.public_key()
if isinstance(public_key, RSAPublicKey):
return KeyType.RSA
if isinstance(public_key, EllipticCurvePublicKey):
return KeyType.EC
if isinstance(public_key, DSAPublicKey):
return KeyType.DSA
if isinstance(public_key, Ed25519PublicKey):
return KeyType.ED25519
if isinstance(public_key, Ed448PublicKey):
return KeyType.ED448
return None
def __str__(self) -> str:
return f"Certificate-Key Pair {self.name}"

View File

@@ -1,70 +0,0 @@
"""authentik crypto signals"""
from binascii import hexlify
from datetime import datetime
from ssl import CertificateError
from cryptography.hazmat.primitives import hashes
from cryptography.x509 import Certificate
from django.db.models.signals import pre_save
from django.dispatch import receiver
from structlog.stdlib import get_logger
from authentik.crypto.models import (
CertificateKeyPair,
detect_key_type,
fingerprint_sha256,
generate_key_id,
generate_key_id_legacy,
)
LOGGER = get_logger()
def extract_certificate_metadata(certificate: Certificate) -> dict[str, str | datetime]:
"""Extract all metadata fields from a certificate."""
metadata = {}
try:
metadata["key_type"] = detect_key_type(certificate)
metadata["cert_expiry"] = certificate.not_valid_after_utc
metadata["cert_subject"] = certificate.subject.rfc4514_string()
metadata["fingerprint_sha256"] = fingerprint_sha256(certificate)
metadata["fingerprint_sha1"] = hexlify(
certificate.fingerprint(hashes.SHA1()), ":" # nosec
).decode("utf-8")
except (ValueError, TypeError, AttributeError) as exc:
raise CertificateError(f"Invalid certificate metadata: {exc}") from exc
return metadata
@receiver(pre_save, sender="authentik_crypto.CertificateKeyPair")
def certificate_key_pair_pre_save(
sender: type[CertificateKeyPair], instance: CertificateKeyPair, **_
):
"""Automatically populate certificate metadata fields before saving"""
# Only extract metadata if certificate_data is present
if not instance.certificate_data:
return
try:
metadata = extract_certificate_metadata(instance.certificate)
except (CertificateError, ValueError, TypeError, AttributeError) as exc:
LOGGER.warning("Failed to extract certificate metadata", exc=exc)
return
instance.key_type = metadata["key_type"]
instance.cert_expiry = metadata["cert_expiry"]
instance.cert_subject = metadata["cert_subject"]
instance.fingerprint_sha256 = metadata["fingerprint_sha256"]
instance.fingerprint_sha1 = metadata["fingerprint_sha1"]
# Generate kid if not set, or regenerate if key_data has changed
# Preserve existing kid (MD5 or SHA512) if it matches the current key_data
if instance.key_data:
new_kid = generate_key_id(instance.key_data)
legacy_kid = generate_key_id_legacy(instance.key_data)
if instance.kid not in (new_kid, legacy_kid):
instance.kid = new_kid

View File

@@ -9,6 +9,7 @@ from cryptography.x509.extensions import SubjectAlternativeName
from cryptography.x509.general_name import DNSName
from django.urls import reverse
from django.utils.timezone import now
from guardian.shortcuts import assign_perm
from rest_framework.test import APITestCase
from authentik.core.api.used_by import DeleteAction
@@ -20,7 +21,7 @@ from authentik.core.tests.utils import (
)
from authentik.crypto.api import CertificateKeyPairSerializer
from authentik.crypto.builder import CertificateBuilder
from authentik.crypto.models import CertificateKeyPair, generate_key_id, generate_key_id_legacy
from authentik.crypto.models import CertificateKeyPair
from authentik.crypto.tasks import MANAGED_DISCOVERED, certificate_discovery
from authentik.lib.config import CONFIG
from authentik.lib.generators import generate_id, generate_key
@@ -173,31 +174,28 @@ class TestCrypto(APITestCase):
self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
def test_list_always_includes_details(self):
"""Test API List always includes certificate details"""
def test_list_without_details(self):
"""Test API List (no details)"""
cert = create_test_cert()
self.client.force_login(create_test_admin_user())
response = self.client.get(
reverse(
"authentik_api:certificatekeypair-list",
),
data={"name": cert.name},
data={"name": cert.name, "include_details": False},
)
self.assertEqual(response.status_code, 200)
body = loads(response.content.decode())
api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
# All details should now always be included
self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
self.assertIsNotNone(api_cert["cert_expiry"])
self.assertIsNotNone(api_cert["cert_subject"])
self.assertEqual(api_cert["fingerprint_sha1"], None)
self.assertEqual(api_cert["fingerprint_sha256"], None)
def test_certificate_download(self):
"""Test certificate export (download)"""
keypair = create_test_cert()
user = create_test_user()
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
user.assign_perms_to_managed_role("view_certificatekeypair_certificate", keypair)
assign_perm("view_certificatekeypair", user, keypair)
assign_perm("view_certificatekeypair_certificate", user, keypair)
self.client.force_login(user)
response = self.client.get(
reverse(
@@ -220,8 +218,8 @@ class TestCrypto(APITestCase):
"""Test private_key export (download)"""
keypair = create_test_cert()
user = create_test_user()
user.assign_perms_to_managed_role("view_certificatekeypair", keypair)
user.assign_perms_to_managed_role("view_certificatekeypair_key", keypair)
assign_perm("view_certificatekeypair", user, keypair)
assign_perm("view_certificatekeypair_key", user, keypair)
self.client.force_login(user)
response = self.client.get(
reverse(
@@ -429,114 +427,3 @@ class TestCrypto(APITestCase):
self.assertEqual(
1, final_count, "Should not create duplicate cert for same private key"
)
def test_metadata_extraction_with_cert_and_key(self):
"""Test that metadata is extracted when creating keypair with certificate and key"""
cert = create_test_cert()
# Verify all metadata fields are populated
self.assertIsNotNone(cert.key_type)
self.assertIsNotNone(cert.cert_expiry)
self.assertIsNotNone(cert.cert_subject)
self.assertIsNotNone(cert.fingerprint_sha256)
self.assertIsNotNone(cert.fingerprint_sha1)
# Verify kid is generated using SHA512 for new records
self.assertIsNotNone(cert.kid)
self.assertEqual(cert.kid, generate_key_id(cert.key_data))
def test_metadata_extraction_without_key(self):
"""Test that metadata is extracted when creating keypair without private key"""
builder = CertificateBuilder(generate_id())
builder.build(subject_alt_names=[], validity_days=3)
# Create keypair with only certificate, no key
cert = CertificateKeyPair.objects.create(
name=generate_id(),
certificate_data=builder.certificate,
key_data="",
)
# Verify certificate metadata fields are populated
self.assertIsNotNone(cert.key_type)
self.assertIsNotNone(cert.cert_expiry)
self.assertIsNotNone(cert.cert_subject)
self.assertIsNotNone(cert.fingerprint_sha256)
self.assertIsNotNone(cert.fingerprint_sha1)
# Verify kid is empty when no key_data
self.assertEqual(cert.kid, None)
def test_metadata_extraction_invalid_cert(self):
"""Test that invalid certificate data doesn't crash, just skips metadata"""
cert = CertificateKeyPair.objects.create(
name=generate_id(),
certificate_data="invalid certificate data",
key_data="",
)
# Verify metadata fields are None for invalid cert
self.assertIsNone(cert.key_type)
self.assertIsNone(cert.cert_expiry)
self.assertIsNone(cert.cert_subject)
self.assertIsNone(cert.fingerprint_sha256)
self.assertIsNone(cert.fingerprint_sha1)
self.assertIsNone(cert.kid)
def test_kid_legacy_preservation(self):
"""Test that legacy MD5 kid is preserved when key_data hasn't changed"""
cert = create_test_cert()
# Simulate a legacy MD5 kid (as if backfilled from old system)
legacy_kid = generate_key_id_legacy(cert.key_data)
CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
cert.refresh_from_db()
self.assertEqual(cert.kid, legacy_kid)
# Save the cert again (e.g., name change) - kid should be preserved
cert.name = generate_id()
cert.save()
cert.refresh_from_db()
self.assertEqual(cert.kid, legacy_kid)
def test_kid_regenerated_on_key_change(self):
"""Test that kid is regenerated when key_data changes"""
cert = create_test_cert()
original_kid = cert.kid
# Generate a new key and update the keypair
builder = CertificateBuilder(generate_id())
builder.build(subject_alt_names=[], validity_days=3)
cert.key_data = builder.private_key
cert.certificate_data = builder.certificate
cert.save()
cert.refresh_from_db()
# Kid should be regenerated for the new key
self.assertNotEqual(cert.kid, original_kid)
self.assertEqual(cert.kid, generate_key_id(cert.key_data))
def test_kid_regenerated_on_key_change_from_legacy(self):
"""Test that kid is regenerated from legacy MD5 when key_data changes"""
cert = create_test_cert()
# Simulate a legacy MD5 kid
legacy_kid = generate_key_id_legacy(cert.key_data)
CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
cert.refresh_from_db()
self.assertEqual(cert.kid, legacy_kid)
# Generate a new key and update the keypair
builder = CertificateBuilder(generate_id())
builder.build(subject_alt_names=[], validity_days=3)
cert.key_data = builder.private_key
cert.certificate_data = builder.certificate
cert.save()
cert.refresh_from_db()
# Kid should now be SHA512 for the new key
self.assertNotEqual(cert.kid, legacy_kid)
self.assertEqual(cert.kid, generate_key_id(cert.key_data))

View File

@@ -20,7 +20,6 @@ class DeviceUserBindingSerializer(PolicyBindingSerializer):
class DeviceUserBindingViewSet(PolicyBindingViewSet):
"""PolicyBinding Viewset"""
serializer_class = DeviceUserBindingSerializer
queryset = (
DeviceUserBinding.objects.all()
.select_related("target", "group", "user")

View File

@@ -5,7 +5,10 @@ from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.fields import ChoiceField
from rest_framework.fields import (
CharField,
ChoiceField,
)
from rest_framework.relations import PrimaryKeyRelatedField
from rest_framework.request import Request
from rest_framework.response import Response
@@ -23,7 +26,6 @@ from authentik.endpoints.connectors.agent.auth import (
AgentAuth,
AgentEnrollmentAuth,
)
from authentik.endpoints.connectors.agent.controller import MDMConfigResponseSerializer
from authentik.endpoints.connectors.agent.models import (
AgentConnector,
AgentDeviceConnection,
@@ -72,6 +74,11 @@ class MDMConfigSerializer(PassiveSerializer):
return token
class MDMConfigResponseSerializer(PassiveSerializer):
config = CharField(required=True)
class AgentConnectorViewSet(
ConditionalInheritance(
"authentik.enterprise.endpoints.connectors.agent.api.connectors.AgentConnectorViewSetMixin"
@@ -101,7 +108,7 @@ class AgentConnectorViewSet(
raise PermissionDenied()
ctrl = connector.controller(connector)
payload = ctrl.generate_mdm_config(data.validated_data["platform"], request, token)
return Response(payload.validated_data)
return Response({"config": payload})
@extend_schema(
request=EnrollSerializer(),

View File

@@ -4,9 +4,7 @@ from xml.etree.ElementTree import Element, SubElement, tostring # nosec
from django.http import HttpRequest
from django.urls import reverse
from rest_framework.fields import CharField
from authentik.core.api.utils import PassiveSerializer
from authentik.endpoints.connectors.agent.models import AgentConnector, EnrollmentToken
from authentik.endpoints.controller import BaseController
from authentik.endpoints.facts import OSFamily
@@ -35,13 +33,6 @@ def csp_create_replace_item(loc_uri, data_value) -> Element:
return replace
class MDMConfigResponseSerializer(PassiveSerializer):
config = CharField(required=True)
mime_type = CharField(required=True)
filename = CharField(required=True)
class AgentConnectorController(BaseController[AgentConnector]):
def supported_enrollment_methods(self):
@@ -49,20 +40,14 @@ class AgentConnectorController(BaseController[AgentConnector]):
def generate_mdm_config(
self, target_platform: OSFamily, request: HttpRequest, token: EnrollmentToken
) -> MDMConfigResponseSerializer:
response = None
) -> str:
if target_platform == OSFamily.windows:
response = self._generate_mdm_config_windows(request, token)
return self._generate_mdm_config_windows(request, token)
if target_platform in [OSFamily.iOS, OSFamily.macOS]:
response = self._generate_mdm_config_macos(request, token)
if not response:
raise ValueError(f"Unsupported platform for MDM Configuration: {target_platform}")
response.is_valid(raise_exception=True)
return response
return self._generate_mdm_config_macos(request, token)
raise ValueError(f"Unsupported platform for MDM Configuration: {target_platform}")
def _generate_mdm_config_windows(
self, request: HttpRequest, token: EnrollmentToken
) -> MDMConfigResponseSerializer:
def _generate_mdm_config_windows(self, request: HttpRequest, token: EnrollmentToken) -> str:
base_uri = (
"./Vendor/MSFT/Registry/HKLM/SOFTWARE/authentik Security Inc./Platform/ManagedConfig"
)
@@ -76,17 +61,9 @@ class AgentConnectorController(BaseController[AgentConnector]):
)
payload = tostring(token_item, encoding="unicode") + tostring(url_item, encoding="unicode")
return MDMConfigResponseSerializer(
data={
"config": payload,
"mime_type": "application/xml",
"filename": f"{self.connector.name}_config.csp.xml",
}
)
return payload
def _generate_mdm_config_macos(
self, request: HttpRequest, token: EnrollmentToken
) -> MDMConfigResponseSerializer:
def _generate_mdm_config_macos(self, request: HttpRequest, token: EnrollmentToken) -> str:
token_uuid = str(token.pk).upper()
payload = dumps(
{
@@ -153,10 +130,4 @@ class AgentConnectorController(BaseController[AgentConnector]):
},
fmt=PlistFormat.FMT_XML,
).decode()
return MDMConfigResponseSerializer(
data={
"config": payload,
"mime_type": "application/xml",
"filename": f"{self.connector.name}_config.mobileconfig",
}
)
return payload

View File

@@ -23,8 +23,8 @@ class TestAgentConnector(APITestCase):
res = self.connector.controller(self.connector).generate_mdm_config(
OSFamily.macOS, request, self.token
)
self.assertIsNotNone(res.validated_data)
data = loads(res.validated_data["config"], fmt=PlistFormat.FMT_XML)
self.assertIsNotNone(res)
data = loads(res, fmt=PlistFormat.FMT_XML)
self.assertEqual(data["PayloadContent"][0]["RegistrationToken"], self.token.key)
self.assertEqual(data["PayloadContent"][0]["URL"], "http://testserver/")
@@ -33,8 +33,7 @@ class TestAgentConnector(APITestCase):
res = self.connector.controller(self.connector).generate_mdm_config(
OSFamily.windows, request, self.token
)
self.assertIsNotNone(res.validated_data)
config = res.validated_data["config"]
fromstring(f"<root>{config}</root>")
self.assertIn(self.token.key, config)
self.assertIn("http://testserver/", config)
self.assertIsNotNone(res)
fromstring(f"<root>{res}</root>")
self.assertIn(self.token.key, res)
self.assertIn("http://testserver/", res)

View File

@@ -175,7 +175,7 @@ class Connector(ScheduledModel, SerializerModel):
]
class DeviceAccessGroup(SerializerModel, PolicyBindingModel):
class DeviceAccessGroup(PolicyBindingModel):
name = models.TextField(unique=True)

View File

@@ -16,7 +16,6 @@ from authentik.endpoints.connectors.agent.models import (
EnrollmentToken,
)
from authentik.endpoints.models import Device
from authentik.events.models import Event, EventAction
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.models import JWTAlgorithms
@@ -107,9 +106,3 @@ class TestAppleToken(TestCase):
)
self.assertEqual(res.status_code, 200)
event = Event.objects.filter(
action=EventAction.LOGIN,
app="authentik.endpoints.connectors.agent",
).first()
self.assertIsNotNone(event)
self.assertEqual(event.context["device"]["name"], self.device.name)

View File

@@ -24,7 +24,6 @@ from authentik.endpoints.connectors.agent.models import (
from authentik.enterprise.endpoints.connectors.agent.http import JWEResponse
from authentik.events.models import Event, EventAction
from authentik.events.signals import SESSION_LOGIN_EVENT
from authentik.flows.planner import PLAN_CONTEXT_DEVICE
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.constants import TOKEN_TYPE
from authentik.providers.oauth2.id_token import IDToken
@@ -126,13 +125,7 @@ class TokenView(View):
return device_user, decoded
def create_auth_session(self, user: User):
event = Event.new(
EventAction.LOGIN,
app="authentik.endpoints.connectors.agent",
**{
PLAN_CONTEXT_DEVICE: self.device_connection.device,
},
).from_http(self.request, user=user)
event = Event.new(EventAction.LOGIN).from_http(self.request, user=user)
store = SessionStore()
store[SESSION_LOGIN_EVENT] = event
store.save()

View File

@@ -60,8 +60,6 @@ class AgentInteractiveAuth(EnterprisePolicyAccessView):
device_token_hash, sha256(self.auth_token.device_token.key.encode()).hexdigest()
):
return HttpResponseBadRequest("Invalid device token")
if not self.connector.authorization_flow:
return HttpResponseBadRequest("No authorization flow configured")
planner = FlowPlanner(self.connector.authorization_flow)
planner.allow_empty_flows = True

View File

@@ -3,6 +3,7 @@ from hashlib import sha256
from django.db.models import Model
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from guardian.shortcuts import assign_perm
from authentik.core.models import (
USER_PATH_SYSTEM_PREFIX,
@@ -43,7 +44,7 @@ def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created:
"path": USER_PATH_PROVIDERS_SSF,
},
)
user.assign_perms_to_managed_role("add_stream", instance)
assign_perm("add_stream", user, instance)
token, token_created = Token.objects.update_or_create(
identifier=identifier,
defaults={

View File

@@ -1,129 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import QuerySet
from django.urls import reverse
from drf_spectacular.utils import extend_schema
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.fields import CharField
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.core.models import User
from authentik.enterprise.api import EnterpriseRequiredMixin
from authentik.enterprise.reports.models import DataExport
from authentik.enterprise.reports.tasks import generate_export
from authentik.rbac.permissions import HasPermission
class RequestedBySerializer(ModelSerializer):
class Meta:
model = User
fields = ("pk", "username")
class ContentTypeSerializer(ModelSerializer):
app_label = CharField(read_only=True)
model = CharField(read_only=True)
class Meta:
model = ContentType
fields = ("id", "app_label", "model")
class DataExportSerializer(EnterpriseRequiredMixin, ModelSerializer):
requested_by = RequestedBySerializer(read_only=True)
content_type = ContentTypeSerializer(read_only=True)
class Meta:
model = DataExport
fields = (
"id",
"requested_by",
"requested_on",
"content_type",
"query_params",
"file_url",
"completed",
)
read_only_fields = (
"id",
"requested_by",
"requested_on",
"content_type",
"file_url",
"completed",
)
class DataExportViewSet(
mixins.RetrieveModelMixin, mixins.DestroyModelMixin, mixins.ListModelMixin, GenericViewSet
):
queryset = DataExport.objects.all()
serializer_class = DataExportSerializer
owner_field = "requested_by"
ordering_fields = ["completed", "requested_by", "requested_on", "content_type__model"]
ordering = ["-requested_on"]
search_fields = ["requested_by__username", "content_type__model"]
def get_queryset(self) -> QuerySet[DataExport]:
"""Limit to exports of content types the user has view permission on"""
qs = super().get_queryset()
permitted_cts = []
for ct in ContentType.objects.filter(
id__in=qs.values_list("content_type_id", flat=True).distinct()
):
model = ct.model_class()
if model is None:
continue
perm = f"{ct.app_label}.view_{ct.model}"
if self.request.user.has_perm(perm):
permitted_cts.append(ct)
return qs.filter(content_type__in=permitted_cts)
class ExportMixin:
@extend_schema(
request=None,
parameters=[],
responses={201: DataExportSerializer},
filters=True,
)
@action(
detail=False,
methods=["POST"],
permission_classes=[HasPermission("authentik_reports.add_dataexport")],
)
def export(self: GenericViewSet, request: Request) -> Response:
"""
Create a data export for this data type. Note that the export is generated asynchronously:
this method returns a `DataExport` object that will initially have `completed=false` as well
as the permanent URL to that object in the `Location` header.
You can poll that URL until `completed=true`, at which point the `file_url` property will
contain a URL to download
"""
s = DataExportSerializer(data={"query_params": request.query_params.dict()})
s.is_valid(raise_exception=True)
export = s.save(
requested_by=request.user,
content_type=ContentType.objects.get_for_model(self.queryset.model),
)
generate_export.send(export.id)
set = export.serializer(instance=export)
return Response(
set.data,
status=201,
headers={"Location": reverse("authentik_api:dataexport-detail", args=[export.id])},
)
def get_permissions(self: GenericViewSet) -> list[BasePermission]:
perms = super().get_permissions()
if self.action == "export":
model = self.get_queryset().model
perms.append(HasPermission(f"{model._meta.app_label}.view_{model._meta.model_name}")())
return perms

View File

@@ -1,8 +0,0 @@
from authentik.enterprise.apps import EnterpriseConfig
class ReportsConfig(EnterpriseConfig):
name = "authentik.enterprise.reports"
label = "authentik_reports"
verbose_name = "authentik Enterprise.Reports"
default = True

View File

@@ -1,48 +0,0 @@
# Generated by Django 5.2.8 on 2025-12-02 17:19
import authentik.admin.files.fields
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("contenttypes", "0002_remove_content_type_name"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="DataExport",
fields=[
("id", models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
("requested_on", models.DateTimeField(auto_now_add=True)),
("query_params", models.JSONField()),
("file", authentik.admin.files.fields.FileField(blank=True)),
("completed", models.BooleanField(default=False)),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to="contenttypes.contenttype"
),
),
(
"requested_by",
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"verbose_name": "Data Export",
"verbose_name_plural": "Data Exports",
},
),
]

View File

@@ -1,123 +0,0 @@
import csv
import io
from uuid import uuid4
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.utils.translation import gettext as _
from rest_framework.serializers import Serializer
from rest_framework.viewsets import ModelViewSet
from authentik.admin.files.fields import FileField
from authentik.admin.files.manager import get_file_manager
from authentik.admin.files.usage import FileUsage
from authentik.core.models import User
from authentik.enterprise.reports.utils import MockRequest
from authentik.events.models import Event, EventAction, Notification, NotificationSeverity
from authentik.lib.models import SerializerModel
from authentik.lib.utils.db import chunked_queryset
from authentik.tenants.utils import get_current_tenant
class DataExport(SerializerModel):
id = models.UUIDField(primary_key=True, default=uuid4)
requested_by = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
requested_on = models.DateTimeField(auto_now_add=True)
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
query_params = models.JSONField()
file = FileField(blank=True)
completed = models.BooleanField(default=False)
class Meta:
verbose_name = _("Data Export")
verbose_name_plural = _("Data Exports")
@property
def serializer(self) -> type[Serializer]:
"""Get serializer for this model"""
from authentik.enterprise.reports.api.reports import DataExportSerializer
return DataExportSerializer
def generate(self) -> None:
if self.completed:
raise AssertionError("Data export must only be generated once")
model_class = self.content_type.model_class()
model_verbose_name = model_class._meta.verbose_name
model_verbose_name_plural = model_class._meta.verbose_name_plural
queryset = chunked_queryset(self.get_queryset())
serializer = self.get_serializer_class()(
context={"request": self._get_request()}, instance=queryset, many=True
)
self.file = f"{model_verbose_name_plural.lower()}_{self.id}.csv"
with get_file_manager(FileUsage.REPORTS).save_file_stream(self.file) as f:
with io.TextIOWrapper(f, encoding="utf-8", newline="") as text:
writer = csv.writer(text)
fields = [field.label for field in serializer.child.fields.values()]
writer.writerow(fields)
for record in queryset:
data = serializer.child.to_representation(record).values()
writer.writerow(data)
self.completed = True
self.save()
message = _(f"{model_verbose_name} export generated successfully")
e = Event.new(
EventAction.EXPORT_READY,
message=message,
export=self,
).set_user(self.requested_by)
e.save()
Notification.objects.create(
event=e,
severity=NotificationSeverity.NOTICE,
body=message,
hyperlink=self.file_url,
hyperlink_label=_("Download"),
user=self.requested_by,
)
@property
def file_url(self) -> str:
return get_file_manager(FileUsage.REPORTS).file_url(self.file)
def _get_request(self) -> MockRequest:
return MockRequest(
user=self.requested_by, query_params=self.query_params, tenant=get_current_tenant()
)
def get_queryset(self) -> models.QuerySet:
request = self._get_request()
viewset = self.get_viewset()
viewset.request = request
queryset = viewset.get_queryset()
queryset = viewset.filter_queryset(queryset)
return queryset
def get_viewset(self) -> ModelViewSet:
from authentik.core.api.users import UserViewSet
from authentik.events.api.events import EventViewSet
model = (self.content_type.app_label, self.content_type.model)
if model == ("authentik_core", "user"):
return UserViewSet()
elif model == ("authentik_events", "event"):
return EventViewSet()
raise NotImplementedError(f"Unsupported data export type {self.content_type.model}")
def get_serializer_class(self) -> type[Serializer]:
from authentik.enterprise.reports.serializers import (
ExportEventSerializer,
ExportUserSerializer,
)
if self.content_type.model == "user":
return ExportUserSerializer
elif self.content_type.model == "event":
return ExportEventSerializer
return self.get_viewset().get_serializer_class()

View File

@@ -1,32 +0,0 @@
from rest_framework.fields import CharField, IntegerField, SerializerMethodField
from authentik.core.api.users import UserSerializer
from authentik.core.models import User
from authentik.events.api.events import EventSerializer
class ExportUserSerializer(UserSerializer):
"""Serializer for exporting users"""
groups = SerializerMethodField(source="get_groups")
def get_groups(self, instance: User) -> str:
return ",".join([group.name for group in instance.ak_groups.all()])
class Meta(UserSerializer.Meta):
fields = [f for f in UserSerializer.Meta.fields if f != "groups_obj"] + ["groups"]
class ExportEventSerializer(EventSerializer):
"""Serializer for exporting events"""
user_pk = IntegerField(source="user.pk", read_only=True)
username = CharField(source="user.username", read_only=True)
email = CharField(source="user.email", read_only=True)
class Meta(EventSerializer.Meta):
fields = [f for f in EventSerializer.Meta.fields if f != "user"] + [
"user_pk",
"username",
"email",
]

View File

@@ -1,10 +0,0 @@
from django.utils.translation import gettext_lazy as _
from dramatiq import actor
from authentik.enterprise.reports.models import DataExport
@actor(description=_("Generate data export."))
def generate_export(export_id: int):
export = DataExport.objects.get(id=export_id)
export.generate()

View File

@@ -1,53 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.enterprise.reports.tests.utils import patch_license
from authentik.events.models import Event
@patch_license
class TestExportAPI(APITestCase):
def setUp(self) -> None:
self.user = create_test_admin_user()
self.client.force_login(self.user)
def test_create_user_export(self):
"""Test User export endpoint"""
response = self.client.post(
reverse("authentik_api:user-export"),
)
self.assertEqual(response.status_code, 201)
self.assertEqual(
response.headers["Location"],
reverse("authentik_api:dataexport-detail", kwargs={"pk": response.data["id"]}),
)
self.assertEqual(response.data["requested_by"]["pk"], self.user.pk)
self.assertEqual(response.data["completed"], False)
self.assertEqual(response.data["file_url"], "")
self.assertEqual(response.data["query_params"], {})
self.assertEqual(
response.data["content_type"]["id"],
ContentType.objects.get_for_model(User).id,
)
def test_create_event_export(self):
"""Test Event export endpoint"""
response = self.client.post(
reverse("authentik_api:event-export"),
)
self.assertEqual(response.status_code, 201)
self.assertEqual(
response.headers["Location"],
reverse("authentik_api:dataexport-detail", kwargs={"pk": response.data["id"]}),
)
self.assertEqual(response.data["requested_by"]["pk"], self.user.pk)
self.assertEqual(response.data["completed"], False)
self.assertEqual(response.data["file_url"], "")
self.assertEqual(response.data["query_params"], {})
self.assertEqual(
response.data["content_type"]["id"],
ContentType.objects.get_for_model(Event).id,
)

View File

@@ -1,29 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from django.test.testcases import TestCase
from authentik.core.tests.utils import create_test_user
from authentik.enterprise.reports.models import DataExport
from authentik.enterprise.reports.tests.utils import patch_license
from authentik.events.models import Event, EventAction
@patch_license
class TestEventExport(TestCase):
def setUp(self) -> None:
self.user = create_test_user()
self.user.assign_perms_to_managed_role("authentik_events.view_event")
self.e1 = Event.new(EventAction.LOGIN, user=self.user)
self.e1.save()
self.e2 = Event.new(EventAction.LOGIN_FAILED, user=self.user)
self.e2.save()
def test_type_filter(self):
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(Event),
requested_by=self.user,
query_params={"actions": [EventAction.LOGIN]},
)
records = list(export.get_queryset())
self.assertEqual(len(records), 1)
self.assertEqual(records[0], self.e1)

View File

@@ -1,80 +0,0 @@
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.tests.utils import create_test_user
from authentik.enterprise.reports.tests.utils import patch_license
@patch_license
class TestExportPermissions(APITestCase):
def setUp(self) -> None:
self.user = create_test_user()
self.client.force_login(self.user)
def test_export_without_permission(self):
"""Test User export endpoint without permission"""
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 403)
def test_export_only_user_permission(self):
"""Test User export endpoint with only view_user permission"""
self.user.assign_perms_to_managed_role("authentik_core.view_user")
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 403)
def test_export_with_permission(self):
"""Test User export endpoint with view_user and add_dataexport permission"""
self.user.assign_perms_to_managed_role("authentik_core.view_user")
self.user.assign_perms_to_managed_role("authentik_reports.add_dataexport")
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 201)
def test_export_access(self):
"""Test that data export access is restricted to the user who created it"""
self.user.assign_perms_to_managed_role("authentik_core.view_user")
self.user.assign_perms_to_managed_role("authentik_reports.add_dataexport")
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 201)
export_url = reverse("authentik_api:dataexport-detail", kwargs={"pk": response.data["id"]})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200)
other_user = create_test_user()
other_user.assign_perms_to_managed_role("authentik_core.view_user")
other_user.assign_perms_to_managed_role("authentik_reports.add_dataexport")
self.client.logout()
self.client.force_login(other_user)
response = self.client.get(export_url)
self.assertEqual(response.status_code, 404)
def test_export_access_no_datatype_permission(self):
"""Test that data export access requires view permission on the data type"""
self.user.assign_perms_to_managed_role("authentik_core.view_user")
self.user.assign_perms_to_managed_role("authentik_reports.add_dataexport")
self.user.assign_perms_to_managed_role("authentik_reports.view_dataexport")
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 201)
export_url = reverse("authentik_api:dataexport-detail", kwargs={"pk": response.data["id"]})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200)
self.user.remove_perms_from_managed_role("authentik_core.view_user")
response = self.client.get(export_url)
self.assertEqual(response.status_code, 404)
response = self.client.get(reverse("authentik_api:dataexport-list"))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data["results"]), 0)
def test_export_access_owner(self):
self.user.assign_perms_to_managed_role("authentik_core.view_user")
self.user.assign_perms_to_managed_role("authentik_reports.add_dataexport")
response = self.client.post(reverse("authentik_api:user-export"))
self.assertEqual(response.status_code, 201)
export_url = reverse("authentik_api:dataexport-detail", kwargs={"pk": response.data["id"]})
response = self.client.get(export_url)
self.assertEqual(response.status_code, 200)
self.user.remove_perms_from_managed_role("authentik_core.view_user")
response = self.client.get(export_url)
self.assertEqual(response.status_code, 404)

View File

@@ -1,48 +0,0 @@
from django.test.testcases import TestCase
from drf_spectacular.generators import SchemaGenerator
from authentik.enterprise.reports.tests.utils import patch_license
@patch_license
class TestSchemaMatch(TestCase):
def setUp(self) -> None:
generator = SchemaGenerator()
self.schema = generator.get_schema(request=None, public=True)
def _index_params_by_name(self, parameters):
result = {}
for p in parameters or []:
if p.get("in") != "query":
continue
schema = p.get("schema", {})
result[p["name"]] = {
"required": p.get("required", False),
"type": schema.get("type"),
"format": schema.get("format"),
"enum": tuple(schema.get("enum", [])),
}
return result
def _find_operation_by_operation_id(self, operation_id):
for path_item in self.schema.get("paths", {}).values():
for operation in path_item.values():
if isinstance(operation, dict) and operation.get("operationId") == operation_id:
return operation
raise AssertionError(f"operationId '{operation_id}' not found in schema")
def _get_op_params(self, operation_id):
operation = self._find_operation_by_operation_id(operation_id)
return self._index_params_by_name(operation.get("parameters", []))
def test_user_export_action_query_params_match_list(self):
list_params = self._get_op_params("core_users_list")
del list_params["include_groups"] # Not applicable for export
del list_params["include_roles"] # Not applicable for export
export_params = self._get_op_params("core_users_export_create")
self.assertDictEqual(list_params, export_params)
def test_event_export_action_query_params_match_list(self):
list_params = self._get_op_params("events_events_list")
export_params = self._get_op_params("events_events_export_create")
self.assertDictEqual(list_params, export_params)

View File

@@ -1,75 +0,0 @@
import csv
from django.contrib.contenttypes.models import ContentType
from django.test.testcases import TestCase
from authentik.admin.files.tests.utils import FileTestFileBackendMixin
from authentik.core.models import User
from authentik.core.tests.utils import create_test_user
from authentik.enterprise.reports.models import DataExport
from authentik.enterprise.reports.tests.utils import patch_license
@patch_license
class TestUserExport(FileTestFileBackendMixin, TestCase):
def setUp(self) -> None:
super().setUp()
self.u1 = create_test_user(username="a")
self.u1.assign_perms_to_managed_role("authentik_core.view_user")
self.u2 = create_test_user(username="b", path="abcd")
self.u1.assign_perms_to_managed_role("authentik_core.view_user")
def _read_export(self, filename):
with open(f"{self.reports_backend_path}/reports/public/{filename}") as f:
reader = csv.DictReader(f)
return list(reader)
def test_generate_user_export(self):
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(User),
requested_by=self.u1,
query_params={"email": str(self.u1.email)},
)
export.generate()
self.assertEqual(export.completed, True)
data = self._read_export(export.file)
self.assertEqual(len(data), 1)
self.assertEqual(data[0]["Username"], self.u1.username)
def test_path_filter(self):
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(User),
requested_by=self.u1,
query_params={"path": str(self.u2.path)},
)
records = list(export.get_queryset())
self.assertEqual(len(records), 1)
self.assertEqual(records[0], self.u2)
def test_search_filter(self):
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(User),
requested_by=self.u1,
query_params={"search": f'username = "{self.u2.username}"'},
)
records = list(export.get_queryset())
self.assertEqual(len(records), 1)
self.assertEqual(records[0], self.u2)
def test_ordering(self):
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(User),
requested_by=self.u1,
query_params={"ordering": "-username"},
)
records = list(export.get_queryset())
self.assertGreaterEqual(records[0].username, records[-1].username)
export = DataExport.objects.create(
content_type=ContentType.objects.get_for_model(User),
requested_by=self.u1,
query_params={"ordering": "username"},
)
records = list(export.get_queryset())
self.assertLess(records[0].username, records[-1].username)

View File

@@ -1,6 +0,0 @@
from unittest.mock import MagicMock, patch
patch_license = patch(
"authentik.enterprise.models.LicenseUsageStatus.is_valid",
MagicMock(return_value=True),
)

View File

@@ -1,7 +0,0 @@
"""API URLs"""
from authentik.enterprise.reports.api.reports import DataExportViewSet
api_urlpatterns = [
("reports/exports", DataExportViewSet),
]

View File

@@ -1,11 +0,0 @@
from dataclasses import dataclass
from authentik.core.models import User
from authentik.tenants.models import Tenant
@dataclass
class MockRequest:
user: User
query_params: dict[str, str]
tenant: Tenant

View File

@@ -0,0 +1,128 @@
"""DjangoQL search"""
from collections import OrderedDict, defaultdict
from collections.abc import Generator
from django.db import connection
from django.db.models import Model, Q
from djangoql.compat import text_type
from djangoql.schema import StrField
class JSONSearchField(StrField):
"""JSON field for DjangoQL"""
model: Model
def __init__(self, model=None, name=None, nullable=None, suggest_nested=True):
# Set this in the constructor to not clobber the type variable
self.type = "relation"
self.suggest_nested = suggest_nested
super().__init__(model, name, nullable)
def get_lookup(self, path, operator, value):
search = "__".join(path)
op, invert = self.get_operator(operator)
q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
return ~q if invert else q
def json_field_keys(self) -> Generator[tuple[str]]:
with connection.cursor() as cursor:
cursor.execute(
f"""
WITH RECURSIVE "{self.name}_keys" AS (
SELECT
ARRAY[jsonb_object_keys("{self.name}")] AS key_path_array,
"{self.name}" -> jsonb_object_keys("{self.name}") AS value
FROM {self.model._meta.db_table}
WHERE "{self.name}" IS NOT NULL
AND jsonb_typeof("{self.name}") = 'object'
UNION ALL
SELECT
ck.key_path_array || jsonb_object_keys(ck.value),
ck.value -> jsonb_object_keys(ck.value) AS value
FROM "{self.name}_keys" ck
WHERE jsonb_typeof(ck.value) = 'object'
),
unique_paths AS (
SELECT DISTINCT key_path_array
FROM "{self.name}_keys"
)
SELECT key_path_array FROM unique_paths;
""" # nosec
)
return (x[0] for x in cursor.fetchall())
def get_nested_options(self) -> OrderedDict:
"""Get keys of all nested objects to show autocomplete"""
if not self.suggest_nested:
return OrderedDict()
base_model_name = f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"
def recursive_function(parts: list[str], parent_parts: list[str] | None = None):
if not parent_parts:
parent_parts = []
path = parts.pop(0)
parent_parts.append(path)
relation_key = "_".join(parent_parts)
if len(parts) > 1:
out_dict = {
relation_key: {
parts[0]: {
"type": "relation",
"relation": f"{relation_key}_{parts[0]}",
}
}
}
child_paths = recursive_function(parts.copy(), parent_parts.copy())
child_paths.update(out_dict)
return child_paths
else:
return {relation_key: {parts[0]: {}}}
relation_structure = defaultdict(dict)
for relations in self.json_field_keys():
result = recursive_function([base_model_name] + relations)
for relation_key, value in result.items():
for sub_relation_key, sub_value in value.items():
if not relation_structure[relation_key].get(sub_relation_key, None):
relation_structure[relation_key][sub_relation_key] = sub_value
else:
relation_structure[relation_key][sub_relation_key].update(sub_value)
final_dict = defaultdict(dict)
for key, value in relation_structure.items():
for sub_key, sub_value in value.items():
if not sub_value:
final_dict[key][sub_key] = {
"type": "str",
"nullable": True,
}
else:
final_dict[key][sub_key] = sub_value
return OrderedDict(final_dict)
def relation(self) -> str:
return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"
class ChoiceSearchField(StrField):
def __init__(self, model=None, name=None, nullable=None):
super().__init__(model, name, nullable, suggest_options=True)
def get_options(self, search):
result = []
choices = self._field_choices()
if choices:
search = search.lower()
for c in choices:
choice = text_type(c[0])
if search in choice.lower():
result.append(choice)
return result

View File

@@ -1,15 +1,18 @@
"""QL search"""
"""DjangoQL search"""
from akql.exceptions import AKQLError
from akql.queryset import apply_search
from akql.schema import AKQLSchema
from django.apps import apps
from django.db.models import QuerySet
from djangoql.ast import Name
from djangoql.exceptions import DjangoQLError
from djangoql.queryset import apply_search
from djangoql.schema import DjangoQLSchema
from drf_spectacular.plumbing import ResolvedComponent, build_object_type
from rest_framework.filters import SearchFilter
from rest_framework.request import Request
from structlog.stdlib import get_logger
from authentik.enterprise.search.fields import JSONSearchField
LOGGER = get_logger()
AUTOCOMPLETE_SCHEMA = ResolvedComponent(
name="Autocomplete",
@@ -19,8 +22,27 @@ AUTOCOMPLETE_SCHEMA = ResolvedComponent(
)
class BaseSchema(DjangoQLSchema):
"""Base Schema which deals with JSON Fields"""
def resolve_name(self, name: Name):
model = self.model_label(self.current_model)
root_field = name.parts[0]
field = self.models[model].get(root_field)
# If the query goes into a JSON field, return the root
# field as the JSON field will do the rest
if isinstance(field, JSONSearchField):
# This is a workaround; build_filter will remove the right-most
# entry in the path as that is intended to be the same as the field
# however for JSON that is not the case
if name.parts[-1] != root_field:
name.parts.append(root_field)
return field
return super().resolve_name(name)
class QLSearch(SearchFilter):
"""rest_framework search filter which uses AKQL"""
"""rest_framework search filter which uses DjangoQL"""
def __init__(self):
super().__init__()
@@ -37,30 +59,24 @@ class QLSearch(SearchFilter):
params = params.replace("\x00", "") # strip null characters
return params
def get_schema(self, request: Request, view) -> AKQLSchema:
def get_schema(self, request: Request, view) -> BaseSchema:
ql_fields = []
if hasattr(view, "get_ql_fields"):
ql_fields = view.get_ql_fields()
class InlineSchema(AKQLSchema):
class InlineSchema(BaseSchema):
def get_fields(self, model):
return ql_fields or []
return InlineSchema
def get_search_context(self, request: Request):
return {
"$ak_user": request.user.pk,
}
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
search_query = self.get_search_terms(request)
schema = self.get_schema(request, view)
if len(search_query) == 0 or not self.enabled:
return self._fallback.filter_queryset(request, queryset, view)
context = self.get_search_context(request)
try:
return apply_search(queryset, search_query, context=context, schema=schema)
except AKQLError as exc:
return apply_search(queryset, search_query, schema=schema)
except DjangoQLError as exc:
LOGGER.debug("Failed to parse search expression", exc=exc)
return self._fallback.filter_queryset(request, queryset, view)

View File

@@ -1,11 +1,11 @@
from akql.schema import JSONSearchField
from akql.serializers import AKQLSchemaSerializer
from djangoql.serializers import DjangoQLSchemaSerializer
from drf_spectacular.generators import SchemaGenerator
from authentik.enterprise.search.fields import JSONSearchField
from authentik.enterprise.search.ql import AUTOCOMPLETE_SCHEMA
class AKQLSchemaSerializer(AKQLSchemaSerializer):
class AKQLSchemaSerializer(DjangoQLSchemaSerializer):
def serialize(self, schema):
serialization = super().serialize(schema)
for _, fields in schema.models.items():
@@ -15,6 +15,12 @@ class AKQLSchemaSerializer(AKQLSchemaSerializer):
serialization["models"].update(field.get_nested_options())
return serialization
def serialize_field(self, field):
result = super().serialize_field(field)
if isinstance(field, JSONSearchField):
result["relation"] = field.relation()
return result
def postprocess_schema_search_autocomplete(result, generator: SchemaGenerator, **kwargs):
generator.registry.register_on_missing(AUTOCOMPLETE_SCHEMA)

View File

@@ -9,7 +9,6 @@ TENANT_APPS = [
"authentik.enterprise.providers.radius",
"authentik.enterprise.providers.scim",
"authentik.enterprise.providers.ssf",
"authentik.enterprise.reports",
"authentik.enterprise.search",
"authentik.enterprise.stages.authenticator_endpoint_gdtc",
"authentik.enterprise.stages.mtls",

View File

@@ -192,7 +192,6 @@ class MTLSStageView(ChallengeStageView):
self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
{"certificate": self._cert_to_dict(cert)}
)
self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def enroll_prepare_user(self, cert: Certificate):
self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})

View File

@@ -3,6 +3,7 @@ from unittest.mock import MagicMock, patch
from urllib.parse import quote_plus
from django.urls import reverse
from guardian.shortcuts import assign_perm
from authentik.core.models import User
from authentik.core.tests.utils import (
@@ -91,7 +92,7 @@ class MTLSStageTests(FlowTestCase):
def test_parse_outpost_object(self):
"""Test outposts's format"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
outpost.user.assign_perms_to_managed_role("pass_outpost_certificate", self.stage)
assign_perm("pass_outpost_certificate", outpost.user, self.stage)
with patch(
"authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
MagicMock(return_value=outpost.user),
@@ -108,7 +109,7 @@ class MTLSStageTests(FlowTestCase):
def test_parse_outpost_global(self):
"""Test outposts's format"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
assign_perm("authentik_stages_mtls.pass_outpost_certificate", outpost.user)
with patch(
"authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
MagicMock(return_value=outpost.user),

View File

@@ -21,7 +21,6 @@ from rest_framework.viewsets import ModelViewSet
from authentik.core.api.object_types import TypeCreateSerializer
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.events.models import Event, EventAction
from authentik.lib.utils.reflection import ConditionalInheritance
class EventVolumeSerializer(PassiveSerializer):
@@ -117,9 +116,7 @@ class EventsFilter(django_filters.FilterSet):
fields = ["action", "client_ip", "username"]
class EventViewSet(
ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"), ModelViewSet
):
class EventViewSet(ModelViewSet):
"""Event Read-Only Viewset"""
queryset = Event.objects.all()
@@ -136,7 +133,9 @@ class EventViewSet(
filterset_class = EventsFilter
def get_ql_fields(self):
from akql.schema import ChoiceSearchField, DateTimeField, JSONSearchField, StrField
from djangoql.schema import DateTimeField, StrField
from authentik.enterprise.search.fields import ChoiceSearchField, JSONSearchField
return [
ChoiceSearchField(Event, "action"),

View File

@@ -29,8 +29,6 @@ class NotificationSerializer(ModelSerializer):
"pk",
"severity",
"body",
"hyperlink",
"hyperlink_label",
"created",
"event",
"seen",

View File

@@ -1,59 +0,0 @@
# Generated by Django 5.2.9 on 2025-12-05 10:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_events", "0013_delete_systemtask"),
]
operations = [
migrations.AddField(
model_name="notification",
name="hyperlink",
field=models.TextField(blank=True, max_length=4096, null=True),
),
migrations.AddField(
model_name="notification",
name="hyperlink_label",
field=models.TextField(blank=True, null=True),
),
migrations.AlterField(
model_name="event",
name="action",
field=models.TextField(
choices=[
("login", "Login"),
("login_failed", "Login Failed"),
("logout", "Logout"),
("user_write", "User Write"),
("suspicious_request", "Suspicious Request"),
("password_set", "Password Set"),
("secret_view", "Secret View"),
("secret_rotate", "Secret Rotate"),
("invitation_used", "Invite Used"),
("authorize_application", "Authorize Application"),
("source_linked", "Source Linked"),
("impersonation_started", "Impersonation Started"),
("impersonation_ended", "Impersonation Ended"),
("flow_execution", "Flow Execution"),
("policy_execution", "Policy Execution"),
("policy_exception", "Policy Exception"),
("property_mapping_exception", "Property Mapping Exception"),
("system_task_execution", "System Task Execution"),
("system_task_exception", "System Task Exception"),
("system_exception", "System Exception"),
("configuration_error", "Configuration Error"),
("model_created", "Model Created"),
("model_updated", "Model Updated"),
("model_deleted", "Model Deleted"),
("email_sent", "Email Sent"),
("update_available", "Update Available"),
("export_ready", "Export Ready"),
("custom_", "Custom Prefix"),
]
),
),
]

View File

@@ -117,8 +117,6 @@ class EventAction(models.TextChoices):
UPDATE_AVAILABLE = "update_available"
EXPORT_READY = "export_ready"
CUSTOM_PREFIX = "custom_"
@@ -263,14 +261,6 @@ class Event(SerializerModel, ExpiringModel):
return self.context["message"]
return f"{self.action}: {self.context}"
@property
def hyperlink(self) -> str | None:
return self.context.get("hyperlink")
@property
def hyperlink_label(self) -> str | None:
return self.context.get("hyperlink_label")
def __str__(self) -> str:
return f"Event action={self.action} user={self.user} context={self.context}"
@@ -489,11 +479,6 @@ class NotificationTransport(TasksModel, SerializerModel):
context["key_value"]["event_user_username"] = notification.event.user.get(
"username", None
)
if notification.hyperlink:
context["link"] = {
"target": notification.hyperlink,
"label": notification.hyperlink_label,
}
if notification.event:
context["title"] += notification.event.action
for key, value in notification.event.context.items():
@@ -547,8 +532,6 @@ class Notification(SerializerModel):
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
severity = models.TextField(choices=NotificationSeverity.choices)
body = models.TextField()
hyperlink = models.TextField(blank=True, null=True, max_length=4096)
hyperlink_label = models.TextField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
seen = models.BooleanField(default=False)

View File

@@ -110,12 +110,7 @@ def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigg
if not trigger:
return
notification = Notification(
severity=trigger.severity,
body=event.summary,
event=event,
user=user,
hyperlink=event.hyperlink,
hyperlink_label=event.hyperlink_label,
severity=trigger.severity, body=event.summary, event=event, user=user
)
transport: NotificationTransport = NotificationTransport.objects.filter(pk=transport_pk).first()
if not transport:

View File

@@ -20,7 +20,7 @@ from django.utils import timezone
from django.views.debug import SafeExceptionReporterFilter
from geoip2.models import ASN, City
from guardian.conf import settings
from guardian.shortcuts import get_anonymous_user
from guardian.utils import get_anonymous_user
from authentik.blueprints.v1.common import YAMLTag
from authentik.core.models import User

Some files were not shown because too many files have changed in this diff Show More