root: init rust worker

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2026-04-02 13:22:10 +02:00
parent 1b53426e2c
commit 8657d74dc9
30 changed files with 1397 additions and 502 deletions

View File

@@ -1,5 +1,5 @@
[alias]
t = ["nextest", "run"]
t = ["nextest", "run", "--workspace"]
[build]
rustflags = ["--cfg", "tokio_unstable"]

View File

@@ -1,5 +1,6 @@
[licenses]
allow = [
"Apache-2.0 WITH LLVM-exception",
"Apache-2.0",
"BSD-3-Clause",
"CC0-1.0",

293
Cargo.lock generated
View File

@@ -17,6 +17,18 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -106,6 +118,37 @@ dependencies = [
"rustversion",
]
[[package]]
name = "argh"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "211818e820cda9ca6f167a64a5c808837366a6dfd807157c64c1304c486cd033"
dependencies = [
"argh_derive",
"argh_shared",
]
[[package]]
name = "argh_derive"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c442a9d18cef5dde467405d27d461d080d68972d6d0dfd0408265b6749ec427d"
dependencies = [
"argh_shared",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "argh_shared"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ade012bac4db278517a0132c8c10c6427025868dca16c801087c28d5a411f1"
dependencies = [
"serde",
]
[[package]]
name = "arraydeque"
version = "0.5.1"
@@ -138,6 +181,29 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "authentik"
version = "2026.5.0-rc1"
dependencies = [
"arc-swap",
"argh",
"authentik-axum",
"authentik-common",
"axum",
"color-eyre",
"eyre",
"hyper-unix-socket",
"hyper-util",
"metrics",
"metrics-exporter-prometheus",
"nix 0.31.2",
"pyo3",
"sqlx",
"tokio",
"tracing",
"uuid",
]
[[package]]
name = "authentik-axum"
version = "2026.5.0-rc1"
@@ -567,6 +633,33 @@ dependencies = [
"cc",
]
[[package]]
name = "color-eyre"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5920befb47832a6d61ee3a3a846565cfa39b331331e68a3b1d1116630f2f26d"
dependencies = [
"backtrace",
"color-spantrace",
"eyre",
"indenter",
"once_cell",
"owo-colors",
"tracing-error",
]
[[package]]
name = "color-spantrace"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8b88ea9df13354b55bc7234ebcce36e6ef896aca2e42a15de9e10edce01b427"
dependencies = [
"once_cell",
"owo-colors",
"tracing-core",
"tracing-error",
]
[[package]]
name = "colorchoice"
version = "1.0.5"
@@ -728,6 +821,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@@ -977,6 +1079,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@@ -1209,7 +1317,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
"foldhash 0.1.5",
]
[[package]]
@@ -1217,6 +1325,9 @@ name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"foldhash 0.2.0",
]
[[package]]
name = "hashlink"
@@ -1343,9 +1454,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "1.8.1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca"
dependencies = [
"atomic-waker",
"bytes",
@@ -1358,7 +1469,6 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"pin-utils",
"smallvec",
"tokio",
"want",
@@ -1393,6 +1503,20 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-unix-socket"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c255628da188a9d9ee218bae99da33a4b684ed63abe140a94d0f6e4b5af9a090"
dependencies = [
"bytes",
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
@@ -1850,6 +1974,46 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "metrics"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8"
dependencies = [
"ahash",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda"
dependencies = [
"base64 0.22.1",
"indexmap",
"metrics",
"metrics-util",
"quanta",
"thiserror 2.0.18",
]
[[package]]
name = "metrics-util"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.16.1",
"metrics",
"quanta",
"rand 0.9.2",
"rand_xoshiro",
"sketches-ddsketch",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -2233,6 +2397,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "owo-colors"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d"
[[package]]
name = "parking"
version = "2.2.1"
@@ -2309,12 +2479,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.7.5"
@@ -2348,6 +2512,12 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "portable-atomic"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
[[package]]
name = "potential_utf"
version = "0.1.4"
@@ -2423,6 +2593,79 @@ dependencies = [
"prost",
]
[[package]]
name = "pyo3"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91fd8e38a3b50ed1167fb981cd6fd60147e091784c427b8f7183a7ee32c31c12"
dependencies = [
"libc",
"once_cell",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
]
[[package]]
name = "pyo3-build-config"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e368e7ddfdeb98c9bca7f8383be1648fd84ab466bf2bc015e94008db6d35611e"
dependencies = [
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f29e10af80b1f7ccaf7f69eace800a03ecd13e883acfacc1e5d0988605f651e"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-macros"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df6e520eff47c45997d2fc7dd8214b25dd1310918bbb2642156ef66a67f29813"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4cdc218d835738f81c2338f822078af45b4afdf8b2e33cbb5916f108b813acb"
dependencies = [
"heck",
"proc-macro2",
"pyo3-build-config",
"quote",
"syn",
]
[[package]]
name = "quanta"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -2559,6 +2802,24 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rand_xoshiro"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41"
dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "raw-cpuid"
version = "11.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
dependencies = [
"bitflags 2.11.0",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@@ -3151,6 +3412,12 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214"
[[package]]
name = "sketches-ddsketch"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b"
[[package]]
name = "slab"
version = "0.4.12"
@@ -3476,6 +3743,12 @@ dependencies = [
"libc",
]
[[package]]
name = "target-lexicon"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb6935a6f5c20170eeceb1a3835a49e12e19d792f6dd344ccc76a985ca5a6ca"
[[package]]
name = "tempfile"
version = "3.27.0"

View File

@@ -20,11 +20,13 @@ publish = false
[workspace.dependencies]
arc-swap = "= 1.9.1"
argh = "= 0.1.19"
axum-server = { version = "= 0.8.0", features = ["tls-rustls-no-provider"] }
aws-lc-rs = { version = "= 1.16.2", features = ["fips"] }
axum = { version = "= 0.8.8", features = ["http2", "macros", "ws"] }
clap = { version = "= 4.6.0", features = ["derive", "env"] }
client-ip = { version = "0.2.1", features = ["forwarded-header"] }
color-eyre = "= 0.6.5"
colored = "= 3.1.1"
config-rs = { package = "config", version = "= 0.15.22", default-features = false, features = [
"json",
@@ -37,11 +39,16 @@ eyre = "= 0.6.12"
forwarded-header-value = "= 0.1.1"
futures = "= 0.3.32"
glob = "= 0.3.3"
hyper-unix-socket = "= 0.3.0"
hyper-util = "= 0.1.20"
ipnet = { version = "= 2.12.0", features = ["serde"] }
json-subscriber = "= 0.2.8"
nix = { version = "= 0.31.2", features = ["signal"] }
metrics = "= 0.24.3"
metrics-exporter-prometheus = { version = "= 0.18.1", default-features = false }
nix = { version = "= 0.31.2", features = ["hostname", "signal"] }
notify = "= 8.2.0"
pin-project-lite = "= 0.2.17"
pyo3 = "= 0.28.3"
regex = "= 1.12.3"
reqwest = { version = "= 0.13.2", features = [
"form",
@@ -106,16 +113,10 @@ tracing-subscriber = { version = "= 0.3.23", features = [
url = "= 2.5.8"
uuid = { version = "= 1.23.0", features = ["serde", "v4"] }
ak-axum = { package = "authentik-axum", version = "2026.5.0-rc1", path = "./packages/ak-axum" }
ak-client = { package = "authentik-client", version = "2026.5.0-rc1", path = "./packages/client-rust" }
ak-common = { package = "authentik-common", version = "2026.5.0-rc1", path = "./packages/ak-common", default-features = false }
[profile.dev.package.backtrace]
opt-level = 3
[profile.release]
lto = true
debug = 2
[workspace.lints.rust]
ambiguous_negative_literals = "warn"
closure_returning_async_block = "warn"
@@ -229,3 +230,54 @@ unused_trait_names = "warn"
unwrap_in_result = "warn"
unwrap_used = "warn"
verbose_file_reads = "warn"
[profile.dev.package.backtrace]
opt-level = 3
[profile.dev]
panic = "abort"
[profile.release]
debug = 2
lto = "fat"
# Because of the async runtime, we want to die straightaway if we panic.
panic = "abort"
strip = true
[package]
name = "authentik"
version.workspace = true
authors.workspace = true
edition.workspace = true
readme.workspace = true
homepage.workspace = true
repository.workspace = true
license-file.workspace = true
publish.workspace = true
[features]
default = ["core", "proxy"]
core = ["ak-common/core", "dep:pyo3", "dep:sqlx"]
proxy = ["ak-common/proxy"]
[dependencies]
ak-axum.workspace = true
ak-common.workspace = true
arc-swap.workspace = true
argh.workspace = true
axum.workspace = true
color-eyre.workspace = true
eyre.workspace = true
hyper-unix-socket.workspace = true
hyper-util.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
nix.workspace = true
pyo3 = { workspace = true, optional = true }
sqlx = { workspace = true, optional = true }
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
[lints]
workspace = true

View File

@@ -115,6 +115,9 @@ run-server: ## Run the main authentik server process
run-worker: ## Run the main authentik worker process
$(UV) run ak worker
run-worker-watch: ## Run the authentik worker, with auto reloading
watchexec --on-busy-update=restart --stop-signal=SIGINT --exts py,rs --no-meta --notify -- $(UV) run ak worker
core-i18n-extract:
$(UV) run ak makemessages \
--add-location file \

View File

@@ -1,7 +1,5 @@
"""Apply blueprint from commandline"""
from sys import exit as sys_exit
from django.core.management.base import BaseCommand, no_translations
from structlog.stdlib import get_logger
@@ -28,7 +26,7 @@ class Command(BaseCommand):
self.stderr.write("Blueprint invalid")
for log in logs:
self.stderr.write(f"\t{log.logger}: {log.event}: {log.attributes}")
sys_exit(1)
raise RuntimeError("Blueprint invalid")
importer.apply()
def add_arguments(self, parser):

View File

@@ -446,8 +446,6 @@ DRAMATIQ = {
("authentik.tasks.middleware.TaskLogMiddleware", {}),
("authentik.tasks.middleware.LoggingMiddleware", {}),
("authentik.tasks.middleware.DescriptionMiddleware", {}),
("authentik.tasks.middleware.WorkerHealthcheckMiddleware", {}),
("authentik.tasks.middleware.WorkerStatusMiddleware", {}),
(
"authentik.tasks.middleware.MetricsMiddleware",
{

View File

@@ -1,4 +1,3 @@
import pglock
from django.utils.timezone import now, timedelta
from drf_spectacular.utils import extend_schema, inline_serializer
from packaging.version import parse
@@ -31,18 +30,13 @@ class WorkerView(APIView):
def get(self, request: Request) -> Response:
response = []
our_version = parse(authentik_full_version())
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
# The worker doesn't hold the lock, it isn't running
if acquired:
continue
version_matching = parse(status.version) == our_version
response.append(
{
"worker_id": f"{status.pk}@{status.hostname}",
"version": status.version,
"version_matching": version_matching,
}
)
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(seconds=45)):
version_matching = parse(status.version) == our_version
response.append(
{
"worker_id": f"{status.pk}@{status.hostname}",
"version": status.version,
"version_matching": version_matching,
}
)
return Response(response)

View File

@@ -1,42 +1,25 @@
import socket
from collections.abc import Callable
from http.server import BaseHTTPRequestHandler
from threading import Event as TEvent
from threading import Thread, current_thread
from typing import Any, cast
import pglock
from django.db import OperationalError, connections, transaction
from django.utils.timezone import now
from django.conf import settings
from django.db import OperationalError
from django_dramatiq_postgres.middleware import (
CurrentTask as BaseCurrentTask,
)
from django_dramatiq_postgres.middleware import (
HTTPServer,
HTTPServerThread,
)
from django_dramatiq_postgres.middleware import (
MetricsMiddleware as BaseMetricsMiddleware,
)
from django_dramatiq_postgres.middleware import (
_MetricsHandler as BaseMetricsHandler,
)
from dramatiq import Worker
from dramatiq.broker import Broker
from dramatiq.message import Message
from dramatiq.middleware import Middleware
from psycopg.errors import Error
from setproctitle import setthreadtitle
from structlog.stdlib import get_logger
from authentik import authentik_full_version
from authentik.events.models import Event, EventAction
from authentik.lib.config import CONFIG
from authentik.lib.sentry import should_ignore_exception
from authentik.lib.utils.reflection import class_to_path
from authentik.root.monitoring import monitoring_set
from authentik.root.signals import post_startup, pre_startup, startup
from authentik.tasks.models import Task, TaskLog, TaskStatus, WorkerStatus
from authentik.tasks.models import Task, TaskLog, TaskStatus
from authentik.tenants.models import Tenant
from authentik.tenants.utils import get_current_tenant
@@ -193,154 +176,26 @@ class DescriptionMiddleware(Middleware):
return {"description"}
class _healthcheck_handler(BaseHTTPRequestHandler):
def log_request(self, code="-", size="-"):
HEALTHCHECK_LOGGER.info(
self.path,
method=self.command,
status=code,
)
def log_error(self, format, *args):
HEALTHCHECK_LOGGER.warning(format, *args)
def do_HEAD(self):
try:
for db_conn in connections.all():
# Force connection reload
db_conn.connect()
_ = db_conn.cursor()
self.send_response(200)
except DB_ERRORS: # pragma: no cover
self.send_response(503)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", "0")
self.end_headers()
do_GET = do_HEAD
class WorkerHealthcheckMiddleware(Middleware):
thread: HTTPServerThread | None
def __init__(self):
listen = CONFIG.get("listen.http", ["[::]:9000"])
if isinstance(listen, str):
listen = listen.split(",")
host, _, port = listen[0].rpartition(":")
try:
port = int(port)
except ValueError:
LOGGER.error(f"Invalid port entered: {port}")
self.host, self.port = host, port
def after_worker_boot(self, broker: Broker, worker: Worker):
self.thread = HTTPServerThread(
target=WorkerHealthcheckMiddleware.run, args=(self.host, self.port)
)
self.thread.start()
def before_worker_shutdown(self, broker: Broker, worker: Worker):
server = self.thread.server
if server:
server.shutdown()
LOGGER.debug("Stopping WorkerHealthcheckMiddleware")
self.thread.join()
@staticmethod
def run(addr: str, port: int):
setthreadtitle("authentik Worker Healthcheck server")
try:
server = HTTPServer((addr, port), _healthcheck_handler)
thread = cast(HTTPServerThread, current_thread())
thread.server = server
server.serve_forever()
except OSError as exc:
get_logger(__name__, type(WorkerHealthcheckMiddleware)).warning(
"Port is already in use, not starting healthcheck server",
exc=exc,
)
class WorkerStatusMiddleware(Middleware):
thread: Thread | None
thread_event: TEvent | None
def after_worker_boot(self, broker: Broker, worker: Worker):
self.thread_event = TEvent()
self.thread = Thread(target=WorkerStatusMiddleware.run, args=(self.thread_event,))
self.thread.start()
def before_worker_shutdown(self, broker: Broker, worker: Worker):
self.thread_event.set()
LOGGER.debug("Stopping WorkerStatusMiddleware")
self.thread.join()
@staticmethod
def run(event: TEvent):
setthreadtitle("authentik Worker status")
with transaction.atomic():
hostname = socket.gethostname()
WorkerStatus.objects.filter(hostname=hostname).delete()
status, _ = WorkerStatus.objects.update_or_create(
hostname=hostname,
version=authentik_full_version(),
)
while not event.is_set():
try:
WorkerStatusMiddleware.keep(event, status)
except DB_ERRORS: # pragma: no cover
event.wait(10)
try:
connections.close_all()
except DB_ERRORS:
pass
@staticmethod
def keep(event: TEvent, status: WorkerStatus):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, side_effect=pglock.Raise):
while not event.is_set():
status.refresh_from_db()
old_last_seen = status.last_seen
status.last_seen = now()
if old_last_seen != status.last_seen:
status.save(update_fields=("last_seen",))
event.wait(30)
class _MetricsHandler(BaseMetricsHandler):
def do_GET(self) -> None:
monitoring_set.send_robust(self)
return super().do_GET()
class MetricsMiddleware(BaseMetricsMiddleware):
thread: HTTPServerThread | None
handler_class = _MetricsHandler
@property
def forks(self) -> list[Callable[[], None]]:
return []
def after_worker_boot(self, broker: Broker, worker: Worker):
listen = CONFIG.get("listen.metrics", ["[::]:9300"])
if isinstance(listen, str):
listen = listen.split(",")
addr, _, port = listen[0].rpartition(":")
def before_worker_boot(self, broker: Broker, worker: Any) -> None:
if settings.TEST:
return super().before_worker_boot(broker, worker)
try:
port = int(port)
except ValueError:
LOGGER.error(f"Invalid port entered: {port}")
self.thread = HTTPServerThread(target=MetricsMiddleware.run, args=(addr, port))
self.thread.start()
from prometheus_client import values
from prometheus_client.values import MultiProcessValue
def before_worker_shutdown(self, broker: Broker, worker: Worker):
server = self.thread.server
if server:
server.shutdown()
LOGGER.debug("Stopping MetricsMiddleware")
self.thread.join()
values.ValueClass = MultiProcessValue(lambda: worker.worker_id)
return super().before_worker_boot(broker, worker)
def after_worker_shutdown(self, broker: Broker, worker: Any) -> None:
if settings.TEST:
return
from prometheus_client import multiprocess
multiprocess.mark_process_dead(worker.worker_id)

View File

@@ -2,7 +2,6 @@
from datetime import timedelta
import pglock
from django.db.models import Count
from django.dispatch import receiver
from django.utils.timezone import now
@@ -31,24 +30,15 @@ GAUGE_TASKS_QUEUED = Gauge(
)
_version = parse(authentik_full_version())
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
worker_version_count = {}
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
# The worker doesn't hold the lock, it isn't running
if acquired:
continue
version_matching = parse(status.version) == _version
worker_version_count.setdefault(
status.version, {"count": 0, "matching": version_matching}
)
worker_version_count[status.version]["count"] += 1
our_version = parse(authentik_full_version())
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(seconds=45)):
version_matching = parse(status.version) == our_version
worker_version_count.setdefault(status.version, {"count": 0, "matching": version_matching})
worker_version_count[status.version]["count"] += 1
for version, stats in worker_version_count.items():
OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])

View File

@@ -10,7 +10,6 @@ from dramatiq.results.middleware import Results
from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread
from authentik.tasks.broker import PostgresBroker
from authentik.tasks.middleware import WorkerHealthcheckMiddleware
TESTING_QUEUE = "testing"
@@ -18,6 +17,7 @@ TESTING_QUEUE = "testing"
class TestWorker(Worker):
def __init__(self, broker: Broker):
super().__init__(broker=broker)
self.worker_id = 1000
self.work_queue = PriorityQueue()
self.consumers = {
TESTING_QUEUE: _ConsumerThread(
@@ -82,8 +82,6 @@ def use_test_broker():
middleware: Middleware = import_string(middleware_class)(
**middleware_kwargs,
)
if isinstance(middleware, WorkerHealthcheckMiddleware):
middleware.port = 9102
if isinstance(middleware, Retries):
middleware.max_retries = 0
if isinstance(middleware, Results):

View File

@@ -7,9 +7,7 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"
"syscall"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -18,8 +16,6 @@ import (
"goauthentik.io/internal/web"
)
var workerPidFile = path.Join(os.TempDir(), "authentik-worker.pid")
var healthcheckCmd = &cobra.Command{
Use: "healthcheck",
Run: func(cmd *cobra.Command, args []string) {
@@ -31,9 +27,9 @@ var healthcheckCmd = &cobra.Command{
log.WithField("mode", mode).Debug("checking health")
switch strings.ToLower(mode) {
case "server":
exitCode = checkServer()
exitCode = check(fmt.Sprintf("http://localhost%s-/health/live/", config.Get().Web.Path))
case "worker":
exitCode = checkWorker()
exitCode = check("http://localhost/-/health/live/")
default:
log.Warn("Invalid mode")
}
@@ -45,7 +41,7 @@ func init() {
rootCmd.AddCommand(healthcheckCmd)
}
func checkServer() int {
func check(url string) int {
h := &http.Client{
Transport: utils.NewUserAgentTransport("goauthentik.io/healthcheck",
&http.Transport{
@@ -55,7 +51,6 @@ func checkServer() int {
},
),
}
url := fmt.Sprintf("http://localhost%s-/health/live/", config.Get().Web.Path)
res, err := h.Head(url)
if err != nil {
log.WithError(err).Warning("failed to send healthcheck request")
@@ -68,29 +63,3 @@ func checkServer() int {
log.Debug("successfully checked health")
return 0
}
func checkWorker() int {
pidB, err := os.ReadFile(workerPidFile)
if err != nil {
log.WithError(err).Warning("failed to check worker PID file")
return 1
}
pidS := strings.TrimSpace(string(pidB[:]))
pid, err := strconv.Atoi(pidS)
if err != nil {
log.WithError(err).Warning("failed to find worker process PID")
return 1
}
process, err := os.FindProcess(pid)
if err != nil {
log.WithError(err).Warning("failed to find worker process")
return 1
}
err = process.Signal(syscall.Signal(0))
if err != nil {
log.WithError(err).Warning("failed to signal worker process")
return 1
}
log.Info("successfully checked health")
return 0
}

View File

@@ -1,10 +1,8 @@
#!/usr/bin/env -S bash
set -e -o pipefail
MODE_FILE="${TMPDIR}/authentik-mode"
#!/usr/bin/env bash
if [[ -z "${PROMETHEUS_MULTIPROC_DIR}" ]]; then
export PROMETHEUS_MULTIPROC_DIR="${TMPDIR:-/tmp}/authentik_prometheus_tmp"
fi
set -e -o pipefail
MODE_FILE="$TMPDIR/authentik-mode"
function log {
printf '{"event": "%s", "level": "info", "logger": "bootstrap"}\n' "$@" >&2
@@ -15,10 +13,41 @@ function wait_for_db {
log "Bootstrap completed"
}
function check_if_root {
function run_authentik {
case "$1" in
server)
shift 1
echo -n server >"$MODE_FILE"
if [[ -x "$(command -v authentik-server)" ]]; then
echo authentik-server "$@"
else
echo go run ./cmd/server "$@"
fi
;;
healthcheck)
if [[ -x "$(command -v authentik-server)" ]]; then
echo authentik-server "$@"
else
echo go run ./cmd/server "$@"
fi
;;
worker)
if [[ -x "$(command -v authentik)" ]]; then
echo authentik "$@"
else
echo cargo run -- "$@"
fi
;;
*)
echo "$@"
;;
esac
}
function check_if_root_and_run {
if [[ $EUID -ne 0 ]]; then
log "Not running as root, disabling permission fixes"
exec $1
exec $(run_authentik "$@")
return
fi
SOCKET="/var/run/docker.sock"
@@ -26,36 +55,19 @@ function check_if_root {
if [[ -e "$SOCKET" ]]; then
# Get group ID of the docker socket, so we can create a matching group and
# add ourselves to it
DOCKER_GID=$(stat -c '%g' $SOCKET)
DOCKER_GID="$(stat -c "%g" "${SOCKET}")"
# Ensure group for the id exists
getent group $DOCKER_GID || groupadd -f -g $DOCKER_GID docker
usermod -a -G $DOCKER_GID authentik
getent group "${DOCKER_GID}" || groupadd -f -g "${DOCKER_GID}" docker
usermod -a -G "${DOCKER_GID}" authentik
# since the name of the group might not be docker, we need to lookup the group id
GROUP_NAME=$(getent group $DOCKER_GID | sed 's/:/\n/g' | head -1)
GROUP_NAME=$(getent group "${DOCKER_GID}" | sed 's/:/\n/g' | head -1)
GROUP="authentik:${GROUP_NAME}"
fi
# Fix permissions of certs and media
chown -R authentik:authentik /data /certs "${PROMETHEUS_MULTIPROC_DIR}"
chmod ug+rwx /data
chmod ug+rx /certs
exec chpst -u authentik:$GROUP env HOME=/authentik $1
}
function run_authentik {
if [[ -x "$(command -v authentik)" ]]; then
exec authentik $@
else
exec go run -v ./cmd/server/ $@
fi
}
function set_mode {
echo $1 >$MODE_FILE
trap cleanup EXIT
}
function cleanup {
rm -f ${MODE_FILE}
exec chpst -u authentik:"${GROUP}" env HOME=/authentik $(run_authentik "$@")
}
function prepare_debug {
@@ -72,38 +84,33 @@ function prepare_debug {
chown authentik:authentik /unittest.xml
}
if [[ -z "${PROMETHEUS_MULTIPROC_DIR}" ]]; then
export PROMETHEUS_MULTIPROC_DIR="${TMPDIR:-/tmp}/authentik_prometheus_tmp"
fi
mkdir -p "${PROMETHEUS_MULTIPROC_DIR}"
if [[ "$(python -m authentik.lib.config debugger 2>/dev/null)" == "True" ]]; then
prepare_debug
fi
if [[ "$1" == "server" ]]; then
set_mode "server"
run_authentik
elif [[ "$1" == "worker" ]]; then
set_mode "worker"
shift
# If we have bootstrap credentials set, run bootstrap tasks outside of main server
# sync, so that we can sure the first start actually has working bootstrap
# credentials
if [[ -n "${AUTHENTIK_BOOTSTRAP_PASSWORD}" || -n "${AUTHENTIK_BOOTSTRAP_TOKEN}" ]]; then
python -m manage apply_blueprint system/bootstrap.yaml || true
fi
check_if_root "python -m manage worker --pid-file ${TMPDIR}/authentik-worker.pid $@"
elif [[ "$1" == "bash" ]]; then
/bin/bash
elif [[ "$1" == "test-all" ]]; then
prepare_debug
chmod 777 /root
check_if_root "python -m manage test authentik"
elif [[ "$1" == "healthcheck" ]]; then
run_authentik healthcheck $(cat $MODE_FILE)
if [[ "$1" == "bash" ]]; then
exec /usr/bin/env -S bash "$@"
elif [[ "$1" == "dump_config" ]]; then
shift
exec python -m authentik.lib.config $@
shift 1
exec python -m authentik.lib.config "$@"
elif [[ "$1" == "debug" ]]; then
exec sleep infinity
elif [[ "$1" == "test-all" ]]; then
wait_for_db
prepare_debug
chmod 777 /root
check_if_root_and_run manage test authentik
elif [[ "$1" == "server" ]] || [[ "$1" == "worker" ]]; then
wait_for_db
check_if_root_and_run "$@"
elif [[ "$1" == "healthcheck" ]]; then
check_if_root_and_run "$@" "$(cat "$MODE_FILE")"
else
wait_for_db
exec python -m manage "$@"
fi

View File

@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1
# Stage 1: Build webui
# Stage: Build webui
FROM --platform=${BUILDPLATFORM} docker.io/library/node:24-trixie-slim@sha256:9707cd4542f400df5078df04f9652a272429112f15202d22b5b8bdd148df494f AS node-builder
ARG GIT_BUILD_HASH
@@ -28,23 +28,14 @@ COPY ./website /work/website/
RUN npm run build && \
npm run build:sfe
# Stage 2: Build go proxy
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.26.2-trixie@sha256:c0074c718b473f3827043f86532c4c0ff537e3fe7a81b8219b0d1ccfcc2c9a09 AS go-builder
# Stage: Build go proxy
FROM docker.io/library/golang:1.26.2-trixie@sha256:c0074c718b473f3827043f86532c4c0ff537e3fe7a81b8219b0d1ccfcc2c9a09 AS go-builder
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT
ARG GOOS=$TARGETOS
ARG GOARCH=$TARGETARCH
WORKDIR /go/src/goauthentik.io
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
dpkg --add-architecture arm64 && \
apt-get update && \
apt-get install -y --no-install-recommends crossbuild-essential-arm64 gcc-aarch64-linux-gnu
RUN --mount=type=bind,target=/go/src/goauthentik.io/go.mod,src=./go.mod \
--mount=type=bind,target=/go/src/goauthentik.io/go.sum,src=./go.sum \
--mount=type=cache,target=/go/pkg/mod \
@@ -62,11 +53,9 @@ COPY ./packages/client-go /go/src/goauthentik.io/packages/client-go
RUN --mount=type=cache,sharing=locked,target=/go/pkg/mod \
--mount=type=cache,id=go-build-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/root/.cache/go-build \
if [ "$TARGETARCH" = "arm64" ]; then export CC=aarch64-linux-gnu-gcc && export CC_FOR_TARGET=gcc-aarch64-linux-gnu; fi && \
CGO_ENABLED=1 GOFIPS140=latest GOARM="${TARGETVARIANT#v}" \
go build -o /go/authentik ./cmd/server
CGO_ENABLED=1 GOFIPS140=latest go build -o /go/authentik-server ./cmd/server
# Stage 3: MaxMind GeoIP
# Stage: MaxMind GeoIP
FROM --platform=${BUILDPLATFORM} ghcr.io/maxmind/geoipupdate:v7.1.1@sha256:faecdca22579730ab0b7dea5aa9af350bb3c93cb9d39845c173639ead30346d2 AS geoip
ENV GEOIPUPDATE_EDITION_IDS="GeoLite2-City GeoLite2-ASN"
@@ -79,9 +68,31 @@ RUN --mount=type=secret,id=GEOIPUPDATE_ACCOUNT_ID \
mkdir -p /usr/share/GeoIP && \
/bin/sh -c "GEOIPUPDATE_LICENSE_KEY_FILE=/run/secrets/GEOIPUPDATE_LICENSE_KEY /usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
# Stage 4: Download uv
# Stage: download Rust toolchain
FROM ghcr.io/goauthentik/fips-debian:trixie-slim-fips@sha256:7726387c78b5787d2146868c2ccc8948a3591d0a5a6436f7780c8c28acc76341 AS rust-toolchain
ARG TARGETARCH
ARG TARGETVARIANT
ENV PATH="/root/.cargo/bin:$PATH"
SHELL ["/bin/sh", "-o", "pipefail", "-c"]
RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
# Required for installing pip packages
apt-get install -y --no-install-recommends \
# Build essentials
build-essential && \
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none && \
rustup install && \
rustup default "$(sed -n 's/channel = "\(.*\)"/\1/p' rust-toolchain.toml)" && \
rustc --version && \
cargo --version
RUN cat /root/.rustup/settings.toml
# Stage: Download uv
FROM ghcr.io/astral-sh/uv:0.11.5@sha256:555ac94f9a22e656fc5f2ce5dfee13b04e94d099e46bb8dd3a73ec7263f2e484 AS uv
# Stage 5: Base python image
# Stage: Base python image
FROM ghcr.io/goauthentik/fips-python:3.14.3-slim-trixie-fips@sha256:bf45eb77a010d76fe6abd7ae137d1b0c44b6227cd984945042135fdf05ebf8d9 AS python-base
ENV VENV_PATH="/ak-root/.venv" \
@@ -95,16 +106,53 @@ WORKDIR /ak-root/
COPY --from=uv /uv /uvx /bin/
# Stage 6: Python dependencies
# Stage: build rust binary
FROM python-base AS rust-builder
ARG TARGETARCH
ARG TARGETVARIANT
WORKDIR /build
ENV PATH="/root/.cargo/bin:$PATH"
COPY --from=rust-toolchain /root/.rustup /root/.rustup
COPY --from=rust-toolchain /root/.cargo /root/.cargo
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
apt-get install -y --no-install-recommends \
# common dependencies
build-essential \
# aws-lc deps
cmake clang golang
# See https://github.com/aws/aws-lc-rs/issues/569
ENV AWS_LC_FIPS_SYS_CC=clang
RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
--mount=type=bind,target=Cargo.toml,src=Cargo.toml \
--mount=type=bind,target=Cargo.lock,src=Cargo.lock \
--mount=type=bind,target=.cargo/,src=.cargo/ \
--mount=type=bind,target=src/,src=src/ \
--mount=type=bind,target=packages/,src=packages/ \
--mount=type=bind,target=authentik/lib/default.yml,src=authentik/lib/default.yml \
# Required otherwise workspace discovery fails
--mount=type=bind,target=website/scripts/docsmg/,src=website/scripts/docsmg/ \
--mount=type=cache,id=cargo-git-db-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/git/db/ \
--mount=type=cache,id=cargo-registry-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/registry/ \
--mount=type=cache,id=rust-target-$TARGETARCH$TARGETVARIANT,target=/build/target/ \
cargo build --package authentik --no-default-features --features core --locked --release && \
cp ./target/release/authentik /bin/authentik
# Stage: Python dependencies
FROM python-base AS python-deps
ARG TARGETARCH
ARG TARGETVARIANT
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
ENV PATH="/root/.cargo/bin:$PATH"
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
@@ -121,28 +169,21 @@ RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/v
# python-kadmin-rs
krb5-multidev libkrb5-dev heimdal-multidev libclang-dev \
# xmlsec
libltdl-dev && \
export RUST_TOOLCHAIN="$(awk -F'\"' '/^[[:space:]]*channel[[:space:]]*=/{print $2; exit}' rust-toolchain.toml)" && \
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain "${RUST_TOOLCHAIN}" && \
rustup default "${RUST_TOOLCHAIN}" && \
rustc --version && \
cargo --version
libltdl-dev
ENV UV_NO_BINARY_PACKAGE="cryptography lxml python-kadmin-rs xmlsec" \
# https://github.com/rust-lang/rustup/issues/2949
# Fixes issues where the rust version in the build cache is older than latest
# and rustup tries to update it, which fails
RUSTUP_PERMIT_COPY_RENAME="1"
ENV PATH="/root/.cargo/bin:$PATH"
COPY --from=rust-toolchain /root/.rustup /root/.rustup
COPY --from=rust-toolchain /root/.cargo /root/.cargo
ENV UV_NO_BINARY_PACKAGE="cryptography lxml python-kadmin-rs xmlsec"
RUN --mount=type=bind,target=pyproject.toml,src=pyproject.toml \
--mount=type=bind,target=uv.lock,src=uv.lock \
--mount=type=bind,target=packages,src=packages \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
--mount=type=cache,id=uv-python-deps-$TARGETARCH$TARGETVARIANT,target=/root/.cache/uv \
RUSTUP_TOOLCHAIN="$(awk -F'\"' '/^[[:space:]]*channel[[:space:]]*=/{print $2; exit}' rust-toolchain.toml)" \
uv sync --frozen --no-install-project --no-dev
# Stage 7: Run
# Stage: Run
FROM python-base AS final-image
ARG VERSION
@@ -193,7 +234,8 @@ COPY ./manage.py /
COPY ./blueprints /blueprints
COPY ./lifecycle/ /lifecycle
COPY ./authentik/sources/kerberos/krb5.conf /etc/krb5.conf
COPY --from=go-builder /go/authentik /bin/authentik
COPY --from=rust-builder /bin/authentik /bin/authentik
COPY --from=go-builder /go/authentik-server /bin/authentik-server
COPY ./packages/ /ak-root/packages
RUN ln -s /ak-root/packages /packages
COPY --from=python-deps /ak-root/.venv /ak-root/.venv

147
lifecycle/worker_process.py Executable file
View File

@@ -0,0 +1,147 @@
#!/usr/bin/env python3
import os
import random
import signal
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from socket import AF_UNIX
from threading import Event, Thread
from typing import Any
from dramatiq import Worker, get_broker
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
LOGGER = get_logger()
INITIAL_WORKER_ID = 1000
class HttpHandler(BaseHTTPRequestHandler):
def check_db(self):
from django.db import connections
for db_conn in connections.all():
# Force connection reload
db_conn.connect()
_ = db_conn.cursor()
def do_GET(self):
if self.path == "/-/metrics/":
from authentik.root.monitoring import monitoring_set
monitoring_set.send_robust(self)
self.send_response(200)
self.end_headers()
elif self.path == "/-/health/ready/":
from django.db.utils import OperationalError
try:
self.check_db()
except OperationalError:
self.send_response(503)
self.send_response(200)
self.end_headers()
else:
self.send_response(200)
self.end_headers()
def log_message(self, format: str, *args: Any) -> None:
pass
class UnixSocketServer(HTTPServer):
address_family = AF_UNIX
def main(worker_id: int, socket_path: str):
shutdown = Event()
try:
os.remove(socket_path)
except OSError, FileNotFoundError:
pass
srv = UnixSocketServer(socket_path, HttpHandler)
def immediate_shutdown(signum, frame):
nonlocal srv
srv.shutdown()
sys.exit(0)
def graceful_shutdown(signum, frame):
nonlocal shutdown
shutdown.set()
signal.signal(signal.SIGHUP, immediate_shutdown)
signal.signal(signal.SIGINT, immediate_shutdown)
signal.signal(signal.SIGQUIT, immediate_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
random.seed()
logger = LOGGER.bind(worker_id=worker_id)
logger.debug("Loading broker...")
broker = get_broker()
broker.emit_after("process_boot")
logger.debug("Starting worker threads...")
queues = None # all queues
worker = Worker(broker, queues=queues, worker_threads=CONFIG.get_int("worker.threads"))
worker.worker_id = worker_id
worker.start()
logger.info("Worker process is ready for action.")
Thread(target=srv.serve_forever).start()
# Notify rust process that we are ready
os.kill(os.getppid(), signal.SIGUSR2)
shutdown.wait()
logger.info("Shutting down worker...")
# 5 secs if debug, 5 mins otherwise
worker.stop(timeout=5_000 if CONFIG.get_bool("debug") else 600_000)
srv.shutdown()
broker.close()
logger.info("Worker shut down.")
if __name__ == "__main__":
if len(sys.argv) != 3: # noqa: PLR2004
print("USAGE: worker_process <worker_id> <socket_path>")
sys.exit(1)
worker_id = int(sys.argv[1])
socket_path = sys.argv[2]
from authentik.root.setup import setup
setup()
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "authentik.root.settings")
import django
django.setup()
from django.core.management import execute_from_command_line
if worker_id == INITIAL_WORKER_ID:
from lifecycle.migrate import run_migrations
run_migrations()
if (
"AUTHENTIK_BOOTSTRAP_PASSWORD" in os.environ
or "AUTHENTIK_BOOTSTRAP_TOKEN" in os.environ
):
try:
execute_from_command_line(["", "apply_blueprint", "system/bootstrap.yaml"])
except Exception as exc: # noqa: BLE001
sys.stderr.write(f"Failed to apply bootstrap blueprint: {exc}")
main(worker_id, socket_path)

View File

@@ -17,11 +17,13 @@ if __name__ == "__main__":
if (
len(sys.argv) > 1
# Explicitly only run migrate for server and worker
and sys.argv[1] in ["dev_server", "worker"]
and sys.argv[1] in ["dev_server"]
# and don't run if this is the child process of a dev_server
and os.environ.get(DJANGO_AUTORELOAD_ENV, None) is None
):
run_migrations()
if len(sys.argv) > 1 and sys.argv[1] in ["worker"]:
raise RuntimeError(f"{sys.argv[1]} command not allowed.")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:

View File

@@ -84,7 +84,7 @@ pub(crate) async fn run_unix(
.handle(handle)
.serve(router.into_make_service())
.await;
if let Some(path) = addr.as_pathname() {
if !allow_failure && let Some(path) = addr.as_pathname() {
trace!(?addr, "removing socket");
if let Err(err) = std::fs::remove_file(path) {
trace!(?err, "failed to remove socket, ignoring");

View File

@@ -1,102 +0,0 @@
import platform
import sys
from argparse import Namespace
from multiprocessing import set_start_method
from typing import Any
from django.apps.registry import apps
from django.core.management.base import BaseCommand, CommandParser
from django.db import connections
from django.utils.module_loading import import_string, module_has_submodule
from dramatiq.cli import main
from django_dramatiq_postgres.conf import Conf
class Command(BaseCommand):
"""Run worker"""
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--pid-file",
action="store",
default=None,
dest="pid_file",
help="PID file",
)
parser.add_argument(
"--watch",
action="store_true",
default=False,
dest="watch",
help="Watch for file changes",
)
def handle(
self,
pid_file: str,
watch: bool,
verbosity: int,
**options: Any,
) -> None:
worker = Conf().worker
setup, modules = self._discover_tasks_modules()
args = Namespace(
broker=setup,
modules=modules,
path=["."],
queues=None,
log_file=None,
skip_logging=True,
use_spawn=False,
forks=[],
worker_shutdown_timeout=600000,
watch=None,
watch_use_polling=False,
include_patterns=["**.py"],
exclude_patterns=None,
verbose=0,
)
if watch:
args.watch = worker["watch_folder"]
if worker["watch_use_polling"]:
args.watch_use_polling = True
if processes := worker["processes"]:
args.processes = processes
if threads := worker["threads"]:
args.threads = threads
if pid_file is not None:
args.pid_file = pid_file
args.verbose = verbosity - 1
# > On macOS [...] the fork start method should be considered unsafe
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
if not platform.system() == "Darwin":
set_start_method("fork")
connections.close_all()
sys.exit(main(args)) # type: ignore[no-untyped-call]
def _discover_tasks_modules(self) -> tuple[str, list[str]]:
# Does not support a tasks directory
autodiscovery = Conf().autodiscovery
modules = []
if autodiscovery["enabled"]:
for app in apps.get_app_configs():
if autodiscovery["apps_prefix"] and not app.name.startswith(
autodiscovery["apps_prefix"]
):
continue
if module_has_submodule(app.module, autodiscovery["actors_module_name"]):
modules.append(f"{app.name}.{autodiscovery['actors_module_name']}")
else:
modules_callback = autodiscovery["modules_callback"]
callback = (
modules_callback
if not isinstance(modules_callback, str)
else import_string(modules_callback)
)
modules.extend(callback())
return autodiscovery["setup_module"], modules

View File

@@ -36,7 +36,7 @@ dependencies = [
"django >=4.2,<6.0",
"django-pglock >=1.7,<2",
"django-pgtrigger >=4,<5",
"dramatiq[watch] >=1.17,<1.18",
"dramatiq >=1.17,<1.18",
"tenacity >=9,<10",
"structlog >=25,<26",
]

98
src/main.rs Normal file
View File

@@ -0,0 +1,98 @@
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "core")]
use ak_common::db;
use ak_common::{Mode, Tasks, authentik_full_version, config, tls, tracing as ak_tracing};
use argh::FromArgs;
use eyre::{Result, eyre};
use tracing::{error, info, trace};
mod metrics;
#[cfg(feature = "core")]
mod server;
#[cfg(feature = "core")]
mod worker;
#[derive(Debug, FromArgs, PartialEq)]
/// The authentication glue you need.
struct Cli {
#[argh(subcommand)]
command: Command,
}
#[derive(Debug, FromArgs, PartialEq)]
#[argh(subcommand)]
enum Command {
#[cfg(feature = "core")]
Worker(worker::Cli),
}
fn main() -> Result<()> {
let tracing_crude = ak_tracing::install_crude();
info!(version = authentik_full_version(), "authentik is starting");
let cli: Cli = argh::from_env();
match &cli.command {
#[cfg(feature = "core")]
Command::Worker(_) => Mode::set(Mode::Worker)?,
}
trace!("installing error formatting");
color_eyre::install()?;
#[cfg(feature = "core")]
if Mode::is_core() {
trace!("initializing Python");
pyo3::Python::initialize();
trace!("Python initialized");
}
config::init()?;
tls::init()?;
let _sentry = ak_tracing::sentry::install()?;
ak_tracing::install()?;
drop(tracing_crude);
tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-{id}")
})
.enable_all()
.build()?
.block_on(async {
let mut tasks = Tasks::new()?;
config::run(&mut tasks)?;
let metrics = metrics::run(&mut tasks)?;
#[cfg(feature = "core")]
if Mode::get() == Mode::AllInOne || Mode::get() == Mode::Worker {
db::init(&mut tasks).await?;
}
match cli.command {
#[cfg(feature = "core")]
Command::Worker(args) => {
let workers = worker::run(args, &mut tasks)?;
metrics.workers.store(Some(workers));
}
}
let errors = tasks.run().await;
Mode::cleanup();
if errors.is_empty() {
info!("authentik exiting");
Ok(())
} else {
error!(err = ?errors, "authentik encountered errors");
Err(eyre!("Errors encountered: {:?}", errors))
}
})
}

73
src/metrics/handlers.rs Normal file
View File

@@ -0,0 +1,73 @@
use std::sync::Arc;
use ak_axum::error::Result;
use ak_common::mode::Mode;
use axum::{body::Body, extract::State, http::StatusCode, response::Response};
use tokio::task::spawn_blocking;
use super::Metrics;
pub(super) async fn metrics_handler(State(state): State<Arc<Metrics>>) -> Result<Response> {
let mut metrics = Vec::new();
state.prometheus.render_to_write(&mut metrics)?;
#[cfg(feature = "core")]
if Mode::is_core() {
if Mode::get() == Mode::Worker
&& let Some(workers) = state.workers.load_full()
{
workers.notify_metrics().await?;
}
metrics.extend(spawn_blocking(python::get_python_metrics).await??);
}
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; version=1.0.0; charset=utf-8")
.body(Body::from(metrics))?)
}
#[cfg(feature = "core")]
mod python {
use eyre::{Report, Result};
use pyo3::{
IntoPyObjectExt as _,
ffi::c_str,
prelude::*,
types::{PyBytes, PyDict},
};
pub(super) fn get_python_metrics() -> Result<Vec<u8>> {
let metrics = Python::attach(|py| {
let locals = PyDict::new(py);
Python::run(
py,
c_str!(
r#"
from prometheus_client import (
CollectorRegistry,
generate_latest,
multiprocess,
)
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
output = generate_latest(registry)
"#
),
None,
Some(&locals),
)?;
let metrics = locals
.get_item("output")?
.unwrap_or(PyBytes::new(py, &[]).into_bound_py_any(py)?)
.cast::<PyBytes>()
.map_or_else(|_| PyBytes::new(py, &[]), |v| v.to_owned())
.as_bytes()
.to_owned();
Ok::<_, Report>(metrics)
})?;
Ok::<_, Report>(metrics)
}
}

99
src/metrics/mod.rs Normal file
View File

@@ -0,0 +1,99 @@
use std::{env::temp_dir, os::unix, path::PathBuf, sync::Arc};
use ak_axum::{router::wrap_router, server};
use ak_common::{
arbiter::{Arbiter, Tasks},
config,
};
use arc_swap::ArcSwapOption;
use axum::{Router, routing::any};
use eyre::Result;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use tokio::{
task::spawn_blocking,
time::{Duration, interval},
};
use tracing::info;
#[cfg(feature = "core")]
use crate::worker::Workers;
mod handlers;
fn socket_path() -> PathBuf {
temp_dir().join("authentik-metrics.sock")
}
pub(crate) struct Metrics {
prometheus: PrometheusHandle,
#[cfg(feature = "core")]
pub(crate) workers: ArcSwapOption<Workers>,
}
impl Metrics {
fn new() -> Result<Self> {
info!("installing Prometheus recorder");
let prometheus = PrometheusBuilder::new()
.with_recommended_naming(true)
.install_recorder()?;
Ok(Self {
prometheus,
#[cfg(feature = "core")]
workers: ArcSwapOption::empty(),
})
}
}
async fn run_upkeep(arbiter: Arbiter, state: Arc<Metrics>) -> Result<()> {
info!("starting metrics upkeep runner");
let mut upkeep_interval = interval(Duration::from_secs(5));
loop {
tokio::select! {
_ = upkeep_interval.tick() => {
let state_clone = Arc::clone(&state);
spawn_blocking(move || state_clone.prometheus.run_upkeep()).await?;
},
() = arbiter.shutdown() => return Ok(())
}
}
}
fn build_router(state: Arc<Metrics>) -> Router {
wrap_router(
Router::new()
.fallback(any(handlers::metrics_handler))
.with_state(state),
true,
)
}
pub(crate) fn run(tasks: &mut Tasks) -> Result<Arc<Metrics>> {
let arbiter = tasks.arbiter();
let metrics = Arc::new(Metrics::new()?);
let router = build_router(Arc::clone(&metrics));
tasks
.build_task()
.name(&format!("{}::run_upkeep", module_path!(),))
.spawn(run_upkeep(arbiter, Arc::clone(&metrics)))?;
for addr in config::get().listen.metrics.iter().copied() {
server::start_plain(
tasks,
"metrics",
router.clone(),
addr,
true, // Allow failure in case the server is running on the same machine, like in dev
)?;
}
server::start_unix(
tasks,
"metrics",
router,
unix::net::SocketAddr::from_pathname(socket_path())?,
true, // Allow failure in case the server is running on the same machine, like in dev
)?;
Ok(metrics)
}

5
src/server/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
use std::{env::temp_dir, path::PathBuf};
pub(crate) fn socket_path() -> PathBuf {
temp_dir().join("authentik.sock")
}

42
src/worker/healthcheck.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::sync::Arc;
use ak_axum::{error::Result, router::wrap_router};
use ak_common::db;
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::any};
use super::Workers;
async fn health_ready(State(workers): State<Arc<Workers>>) -> Result<StatusCode> {
if !workers.are_alive().await || sqlx::query("SELECT 1").execute(db::get()).await.is_err() {
Ok(StatusCode::SERVICE_UNAVAILABLE)
} else if workers.health_ready().await? {
Ok(StatusCode::OK)
} else {
Ok(StatusCode::SERVICE_UNAVAILABLE)
}
}
async fn health_live(State(workers): State<Arc<Workers>>) -> Result<StatusCode> {
if !workers.are_alive().await || sqlx::query("SELECT 1").execute(db::get()).await.is_err() {
Ok(StatusCode::SERVICE_UNAVAILABLE)
} else if workers.health_live().await? {
Ok(StatusCode::OK)
} else {
Ok(StatusCode::SERVICE_UNAVAILABLE)
}
}
async fn fallback() -> impl IntoResponse {
StatusCode::OK
}
pub(super) fn build_router(workers: Arc<Workers>) -> Router {
wrap_router(
Router::new()
.route("/-/heath/ready/", any(health_ready))
.route("/-/heath/live/", any(health_live))
.fallback(fallback)
.with_state(workers),
true,
)
}

338
src/worker/mod.rs Normal file
View File

@@ -0,0 +1,338 @@
use std::{
env::temp_dir,
os::unix,
path::PathBuf,
process::Stdio,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use ak_common::{
Event,
arbiter::{Arbiter, Tasks},
config,
mode::Mode,
};
use argh::FromArgs;
use axum::{
body::Body,
http::{Request, header::HOST},
};
use eyre::{Result, eyre};
use hyper_unix_socket::UnixSocketConnector;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use nix::{
sys::signal::{Signal, kill},
unistd::Pid,
};
use tokio::{
net::UnixStream,
process::{Child, Command},
signal::unix::SignalKind,
sync::Mutex,
time::{Duration, interval},
};
use tracing::{info, trace, warn};
use crate::server::socket_path;
mod healthcheck;
mod worker_status;
#[derive(Debug, Default, FromArgs, PartialEq, Eq)]
/// Run the authentik worker.
#[argh(subcommand, name = "worker")]
#[expect(
clippy::empty_structs_with_brackets,
reason = "argh doesn't support unit structs"
)]
pub(crate) struct Cli {}
const INITIAL_WORKER_ID: usize = 1000;
static INITIAL_WORKER_READY: AtomicBool = AtomicBool::new(false);
pub(crate) struct Worker {
worker: Child,
client: Client<UnixSocketConnector<PathBuf>, Body>,
socket_path: PathBuf,
}
impl Worker {
fn new(worker_id: usize, socket_path: PathBuf) -> Result<Self> {
info!(worker_id, "starting worker");
let mut cmd = Command::new("python");
cmd.arg("-m");
cmd.arg("lifecycle.worker_process");
cmd.arg(worker_id.to_string());
cmd.arg(&socket_path);
let client = Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(60))
.set_host(false)
.build(UnixSocketConnector::new(socket_path.clone()));
Ok(Self {
worker: cmd
.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?,
client,
socket_path,
})
}
async fn shutdown(&mut self, signal: Signal) -> Result<()> {
trace!(
signal = signal.as_str(),
"sending shutdown signal to worker"
);
if let Some(id) = self.worker.id() {
kill(Pid::from_raw(id.cast_signed()), signal)?;
}
self.worker.wait().await?;
Ok(())
}
async fn graceful_shutdown(&mut self) -> Result<()> {
info!("gracefully shutting down worker");
self.shutdown(Signal::SIGTERM).await
}
async fn fast_shutdown(&mut self) -> Result<()> {
info!("immediately shutting down worker");
self.shutdown(Signal::SIGINT).await
}
fn is_alive(&mut self) -> bool {
let try_wait = self.worker.try_wait();
match try_wait {
Ok(Some(code)) => {
warn!(?code, "worker has exited");
false
}
Ok(None) => true,
Err(err) => {
warn!(
?err,
"failed to check the status of worker process, ignoring"
);
true
}
}
}
async fn is_socket_ready(&self) -> bool {
let result = UnixStream::connect(&self.socket_path).await;
trace!(?result, "checking if worker socket is ready");
result.is_ok()
}
async fn health_live(&self) -> Result<bool> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/health/live/")
.header(HOST, "localhost")
.body(Body::from(""))?;
Ok(self.client.request(req).await?.status().is_success())
}
async fn health_ready(&self) -> Result<bool> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/health/ready/")
.header(HOST, "localhost")
.body(Body::from(""))?;
Ok(self.client.request(req).await?.status().is_success())
}
async fn notify_metrics(&self) -> Result<()> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/metrics/")
.header(HOST, "localhost")
.body(Body::from(""))?;
self.client.request(req).await?;
Ok(())
}
}
impl Drop for Worker {
fn drop(&mut self) {
if let Err(err) = std::fs::remove_file(&self.socket_path) {
trace!(?err, "failed to remove socket, ignoring");
}
}
}
pub(crate) struct Workers(Mutex<Vec<Worker>>);
impl Workers {
fn new() -> Result<Self> {
let mut workers = Vec::with_capacity(config::get().worker.processes.get());
workers.push(Worker::new(
INITIAL_WORKER_ID,
temp_dir().join(format!("authentik-worker-{INITIAL_WORKER_ID}.sock")),
)?);
Ok(Self(Mutex::new(workers)))
}
async fn start_other_workers(&self) -> Result<()> {
let mut workers = self.0.lock().await;
while workers.len() != config::get().worker.processes.get() {
let worker_id = INITIAL_WORKER_ID + workers.len();
workers.push(Worker::new(
worker_id,
temp_dir().join(format!("authentik-worker-{worker_id}.sock")),
)?);
}
Ok(())
}
async fn graceful_shutdown(&self) -> Result<()> {
let mut results = Vec::with_capacity(self.0.lock().await.capacity());
for worker in self.0.lock().await.iter_mut() {
results.push(worker.graceful_shutdown().await);
}
results.into_iter().find(Result::is_err).unwrap_or(Ok(()))
}
async fn fast_shutdown(&self) -> Result<()> {
let mut results = Vec::with_capacity(self.0.lock().await.capacity());
for worker in self.0.lock().await.iter_mut() {
results.push(worker.fast_shutdown().await);
}
results.into_iter().find(Result::is_err).unwrap_or(Ok(()))
}
async fn are_alive(&self) -> bool {
for worker in self.0.lock().await.iter_mut() {
if !worker.is_alive() {
return false;
}
}
true
}
async fn is_socket_ready(&self) -> bool {
if let Some(initial_worker) = self.0.lock().await.iter_mut().next() {
return initial_worker.is_socket_ready().await;
}
false
}
async fn health_live(&self) -> Result<bool> {
for worker in self.0.lock().await.iter() {
if !worker.health_live().await? {
return Ok(false);
}
}
Ok(true)
}
async fn health_ready(&self) -> Result<bool> {
for worker in self.0.lock().await.iter() {
if !worker.health_ready().await? {
return Ok(false);
}
}
Ok(true)
}
pub(crate) async fn notify_metrics(&self) -> Result<()> {
if let Some(worker) = self.0.lock().await.iter().next() {
worker.notify_metrics().await?;
}
Ok(())
}
}
async fn watch_workers(arbiter: Arbiter, workers: Arc<Workers>) -> Result<()> {
info!("starting worker watcher");
let mut events_rx = arbiter.events_subscribe();
let mut check_interval = interval(Duration::from_secs(5));
let mut start_interval = interval(Duration::from_secs(1));
loop {
tokio::select! {
Ok(Event::Signal(signal)) = events_rx.recv() => {
if signal == SignalKind::user_defined2() && !INITIAL_WORKER_READY.load(Ordering::Relaxed) {
info!("worker notified us ready, marked ready for operation");
INITIAL_WORKER_READY.store(true, Ordering::Relaxed);
workers.start_other_workers().await?;
}
},
_ = start_interval.tick(), if !INITIAL_WORKER_READY.load(Ordering::Relaxed) => {
// On some platforms the SIGUSR1 can be missed.
// Fall back to probing the worker unix socket and mark ready once it accepts connections.
if workers.is_socket_ready().await {
info!("worker socket is accepting connections, marked ready for operation");
INITIAL_WORKER_READY.store(true, Ordering::Relaxed);
workers.start_other_workers().await?;
}
},
_ = check_interval.tick() => {
if !workers.are_alive().await {
return Err(eyre!("one or more workers have exited unexpectedly"));
}
},
() = arbiter.fast_shutdown() => {
workers.fast_shutdown().await?;
return Ok(());
},
() = arbiter.graceful_shutdown() => {
workers.graceful_shutdown().await?;
return Ok(());
},
}
}
}
pub(crate) fn run(_cli: Cli, tasks: &mut Tasks) -> Result<Arc<Workers>> {
let arbiter = tasks.arbiter();
let workers = Arc::new(Workers::new()?);
tasks
.build_task()
.name(&format!("{}::watch_workers", module_path!()))
.spawn(watch_workers(arbiter.clone(), Arc::clone(&workers)))?;
tasks
.build_task()
.name(&format!("{}::worker_status::run", module_path!()))
.spawn(worker_status::run(arbiter))?;
// Only run HTTP server in worker mode, in allinone mode, they're handled by the server.
if Mode::get() == Mode::Worker {
let router = healthcheck::build_router(Arc::clone(&workers));
for addr in config::get().listen.http.iter().copied() {
ak_axum::server::start_plain(
tasks,
"worker",
router.clone(),
addr,
true, /* Allow failure in case the server is running on the same machine, like
* in dev. */
)?;
}
ak_axum::server::start_unix(
tasks,
"worker",
router,
unix::net::SocketAddr::from_pathname(socket_path())?,
true, // Allow failure in case the server is running on the same machine, like in dev.
)?;
}
Ok(workers)
}

View File

@@ -0,0 +1,48 @@
use ak_common::{arbiter::Arbiter, authentik_full_version, db};
use eyre::Result;
use nix::unistd::gethostname;
use tokio::time::{Duration, interval, sleep};
use tracing::warn;
use uuid::Uuid;
async fn keep(arbiter: Arbiter, id: Uuid, hostname: &str, version: &str) -> Result<()> {
let query = "
INSERT INTO authentik_tasks_workerstatus (id, hostname, version, last_seen)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (id) DO UPDATE SET last_seen = NOW()
";
let mut keep_interval = interval(Duration::from_secs(30));
loop {
tokio::select! {
_ = keep_interval.tick() => {
sqlx::query(query)
.bind(id)
.bind(hostname)
.bind(version)
.execute(db::get())
.await?;
},
() = arbiter.shutdown() => return Ok(()),
}
}
}
pub(super) async fn run(arbiter: Arbiter) -> Result<()> {
let id = Uuid::new_v4();
let raw_hostname = gethostname()?;
let hostname = raw_hostname.to_string_lossy();
let version = authentik_full_version();
loop {
if let Err(err) = keep(arbiter.clone(), id, hostname.as_ref(), &version).await {
warn!(?err, "failed to update worker status in database");
}
// `keep` returned. It's either an error in which case we wait 10s before
// retrying.
// Or we actually need to exit, which will happen here.
tokio::select! {
() = sleep(Duration::from_secs(10)) => {},
() = arbiter.shutdown() => return Ok(()),
}
}
}

54
uv.lock generated
View File

@@ -1116,7 +1116,7 @@ dependencies = [
{ name = "django" },
{ name = "django-pglock" },
{ name = "django-pgtrigger" },
{ name = "dramatiq", extra = ["watch"] },
{ name = "dramatiq" },
{ name = "structlog" },
{ name = "tenacity" },
]
@@ -1127,7 +1127,7 @@ requires-dist = [
{ name = "django", specifier = ">=4.2,<6.0" },
{ name = "django-pglock", specifier = ">=1.7,<2" },
{ name = "django-pgtrigger", specifier = ">=4,<5" },
{ name = "dramatiq", extras = ["watch"], specifier = ">=1.17,<1.18" },
{ name = "dramatiq", specifier = ">=1.17,<1.18" },
{ name = "structlog", specifier = ">=25,<26" },
{ name = "tenacity", specifier = ">=9,<10" },
]
@@ -1375,12 +1375,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/36/925c7afd5db4f1a3f00676b9c3c58f31ff7ae29a347282d86c8d429280a5/dramatiq-1.17.1-py3-none-any.whl", hash = "sha256:951cdc334478dff8e5150bb02a6f7a947d215ee24b5aedaf738eff20e17913df", size = 120382, upload-time = "2024-10-26T05:09:26.436Z" },
]
[package.optional-dependencies]
watch = [
{ name = "watchdog" },
{ name = "watchdog-gevent" },
]
[[package]]
name = "drf-jsonschema-serializer"
version = "3.0.0"
@@ -1572,28 +1566,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/15/cf2a69ade4b194aa524ac75112d5caac37414b20a3a03e6865dfe0bd1539/geopy-2.4.1-py3-none-any.whl", hash = "sha256:ae8b4bc5c1131820f4d75fce9d4aaaca0c85189b3aa5d64c3dcaf5e3b7b882a7", size = 125437, upload-time = "2023-11-23T21:49:30.421Z" },
]
[[package]]
name = "gevent"
version = "25.9.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cffi", marker = "platform_python_implementation == 'CPython' and sys_platform == 'win32'" },
{ name = "greenlet", marker = "platform_python_implementation == 'CPython'" },
{ name = "zope-event" },
{ name = "zope-interface" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9e/48/b3ef2673ffb940f980966694e40d6d32560f3ffa284ecaeb5ea3a90a6d3f/gevent-25.9.1.tar.gz", hash = "sha256:adf9cd552de44a4e6754c51ff2e78d9193b7fa6eab123db9578a210e657235dd", size = 5059025, upload-time = "2025-09-17T16:15:34.528Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/15/1a/948f8167b2cdce573cf01cec07afc64d0456dc134b07900b26ac7018b37e/gevent-25.9.1-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:1a3fe4ea1c312dbf6b375b416925036fe79a40054e6bf6248ee46526ea628be1", size = 2982934, upload-time = "2025-09-17T14:54:11.302Z" },
{ url = "https://files.pythonhosted.org/packages/9b/ec/726b146d1d3aad82e03d2e1e1507048ab6072f906e83f97f40667866e582/gevent-25.9.1-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:0adb937f13e5fb90cca2edf66d8d7e99d62a299687400ce2edee3f3504009356", size = 1813982, upload-time = "2025-09-17T15:41:28.506Z" },
{ url = "https://files.pythonhosted.org/packages/35/5d/5f83f17162301662bd1ce702f8a736a8a8cac7b7a35e1d8b9866938d1f9d/gevent-25.9.1-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:427f869a2050a4202d93cf7fd6ab5cffb06d3e9113c10c967b6e2a0d45237cb8", size = 1894902, upload-time = "2025-09-17T15:49:03.702Z" },
{ url = "https://files.pythonhosted.org/packages/83/cd/cf5e74e353f60dab357829069ffc300a7bb414c761f52cf8c0c6e9728b8d/gevent-25.9.1-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c049880175e8c93124188f9d926af0a62826a3b81aa6d3074928345f8238279e", size = 1861792, upload-time = "2025-09-17T15:49:23.279Z" },
{ url = "https://files.pythonhosted.org/packages/dd/65/b9a4526d4a4edce26fe4b3b993914ec9dc64baabad625a3101e51adb17f3/gevent-25.9.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:b5a67a0974ad9f24721034d1e008856111e0535f1541499f72a733a73d658d1c", size = 2113215, upload-time = "2025-09-17T15:15:16.34Z" },
{ url = "https://files.pythonhosted.org/packages/e5/be/7d35731dfaf8370795b606e515d964a0967e129db76ea7873f552045dd39/gevent-25.9.1-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:1d0f5d8d73f97e24ea8d24d8be0f51e0cf7c54b8021c1fddb580bf239474690f", size = 1833449, upload-time = "2025-09-17T15:52:43.75Z" },
{ url = "https://files.pythonhosted.org/packages/65/58/7bc52544ea5e63af88c4a26c90776feb42551b7555a1c89c20069c168a3f/gevent-25.9.1-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:ddd3ff26e5c4240d3fbf5516c2d9d5f2a998ef87cfb73e1429cfaeaaec860fa6", size = 2176034, upload-time = "2025-09-17T15:24:15.676Z" },
{ url = "https://files.pythonhosted.org/packages/c2/69/a7c4ba2ffbc7c7dbf6d8b4f5d0f0a421f7815d229f4909854266c445a3d4/gevent-25.9.1-cp314-cp314-win_amd64.whl", hash = "sha256:bb63c0d6cb9950cc94036a4995b9cc4667b8915366613449236970f4394f94d7", size = 1703019, upload-time = "2025-09-17T19:30:55.272Z" },
]
[[package]]
name = "google-api-core"
version = "2.29.0"
@@ -3886,19 +3858,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067, upload-time = "2024-11-01T14:07:11.845Z" },
]
[[package]]
name = "watchdog-gevent"
version = "0.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gevent" },
{ name = "watchdog" },
]
sdist = { url = "https://files.pythonhosted.org/packages/97/69/91cfca7c21c382e3a8aca4251dcd7d4315228d9346381feb2dde36d14061/watchdog_gevent-0.2.1.tar.gz", hash = "sha256:ae6b94d0f8c8ce1c5956cd865f612b61f456cf19801744bba25a349fe8e8c337", size = 4296, upload-time = "2024-10-19T05:29:12.987Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/00/a9/54b88e150b77791958957e2188312477d09fc84820fc03f8b3a7569d10b0/watchdog_gevent-0.2.1-py3-none-any.whl", hash = "sha256:e8114658104a018f626ee54052335407c1438369febc776c4b4c4308ed002350", size = 3462, upload-time = "2024-10-19T05:29:11.421Z" },
]
[[package]]
name = "watchfiles"
version = "1.1.1"
@@ -4080,15 +4039,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" },
]
[[package]]
name = "zope-event"
version = "6.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/46/33/d3eeac228fc14de76615612ee208be2d8a5b5b0fada36bf9b62d6b40600c/zope_event-6.1.tar.gz", hash = "sha256:6052a3e0cb8565d3d4ef1a3a7809336ac519bc4fe38398cb8d466db09adef4f0", size = 18739, upload-time = "2025-11-07T08:05:49.934Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c2/b0/956902e5e1302f8c5d124e219c6bf214e2649f92ad5fce85b05c039a04c9/zope_event-6.1-py3-none-any.whl", hash = "sha256:0ca78b6391b694272b23ec1335c0294cc471065ed10f7f606858fc54566c25a0", size = 6414, upload-time = "2025-11-07T08:05:48.874Z" },
]
[[package]]
name = "zope-interface"
version = "8.2"

View File

@@ -92,6 +92,15 @@ For other distributions (Red Hat, SUSE, Arch), adjust the package names as neede
Install `golangci-lint` by following the [official installation instructions](https://golangci-lint.run/welcome/install/#other-ci).
:::warning
[aws-lc-rs](https://github.com/aws/aws-lc-rs) currently has an [issue](https://github.com/aws/aws-lc-rs/issues/569) building its FIPS module with GCC >= 14. If you encounter this issue, you have two options:
- Use an older version of GCC.
- Install [Clang](https://clang.llvm.org) and `export AWS_LC_FIPS_SYS_CC=clang`.
:::
</TabItem>
<TabItem value="Windows">
@@ -154,7 +163,13 @@ Both processes need to run to get a fully functioning authentik development envi
### Hot-reloading
When `AUTHENTIK_DEBUG` is set to `true` (the default for the development environment), the authentik server automatically reloads whenever changes are made to the code. However, due to instabilities in the reloading process of the worker, that behavior is turned off for the worker. You can enable code reloading in the worker by manually running `uv run ak worker --watch`.
When `AUTHENTIK_DEBUG` is set to `true` (the default for the development environment), the authentik server automatically reloads whenever changes are made to the code.
For the authentik worker, install [watchexec](https://github.com/watchexec/watchexec), and run:
```shell
make run-worker-watch
```
## 6. Build the frontend