Compare commits

...

32 Commits

Author SHA1 Message Date
Marc 'risson' Schmitt
7ff008d6d6 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-24 16:47:38 +02:00
Marc 'risson' Schmitt
5ad0150fe4 fix page size
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-24 15:19:32 +02:00
Marc 'risson' Schmitt
4f52a79c6a application refresh
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-24 15:18:50 +02:00
Marc 'risson' Schmitt
a8b8a81375 Merge branch 'main' into rust-proxy 2026-04-24 13:54:38 +02:00
Marc 'risson' Schmitt
31e7b1dc4b Merge branch 'rust-worker-2' into rust-proxy
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-23 15:46:53 +02:00
Marc 'risson' Schmitt
8bf7efecfd fix lint
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-23 15:33:30 +02:00
Marc 'risson' Schmitt
b1ceb28f71 Merge branch 'main' into rust-worker-2
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-23 15:26:14 +02:00
Marc 'risson' Schmitt
2cb3df2a60 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 19:00:42 +02:00
Marc 'risson' Schmitt
5426881797 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 19:00:26 +02:00
Marc 'risson' Schmitt
3f703bb21b wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 18:23:54 +02:00
Marc 'risson' Schmitt
b3c0a50f91 metrics and logging
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 16:46:54 +02:00
Marc 'risson' Schmitt
1fec16b8e0 run -> start
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 14:04:40 +02:00
Marc 'risson' Schmitt
8657d74dc9 root: init rust worker
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 14:00:52 +02:00
Marc 'risson' Schmitt
347df15f50 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 14:00:28 +02:00
Marc 'risson' Schmitt
cf2ed15ced Merge branch 'rust-worker-2' into rust-proxy
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 13:42:43 +02:00
Marc 'risson' Schmitt
b220e80a0d run -> start
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 13:40:30 +02:00
Marc 'risson' Schmitt
54f6b5c73c root: init rust worker
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 13:40:30 +02:00
Marc 'risson' Schmitt
9fad68bdad packages/ak-common/tracing: get sentry config from API for outposts
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-16 13:40:29 +02:00
Marc 'risson' Schmitt
dc1d99288f wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:51:28 +02:00
Marc 'risson' Schmitt
8fb795ec89 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:41:40 +02:00
Marc 'risson' Schmitt
f8f84f5f0b fixup
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:41:33 +02:00
Marc 'risson' Schmitt
5812558463 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:38:06 +02:00
Marc 'risson' Schmitt
513462f78d fixup
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:38:02 +02:00
Marc 'risson' Schmitt
833912b712 Merge branch 'rust-worker-2' into rust-proxy
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:32:31 +02:00
Marc 'risson' Schmitt
78a4b06ab3 root: init rust worker
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:21:35 +02:00
Marc 'risson' Schmitt
c38e3cbbcf packages/ak-common/tracing: get sentry config from API for outposts
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:21:35 +02:00
Marc 'risson' Schmitt
9fba928666 Merge branch 'main' into rust-proxy
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 17:16:50 +02:00
Marc 'risson' Schmitt
ce8f33416e ws
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-15 16:41:26 +02:00
Marc 'risson' Schmitt
6308ec3360 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-14 15:04:03 +02:00
Marc 'risson' Schmitt
915bf6942e wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-10 17:16:32 +02:00
Marc 'risson' Schmitt
e63d2afb29 wip
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-10 14:10:05 +02:00
Marc 'risson' Schmitt
d103cea26a root: init rust worker
Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
2026-04-10 12:56:24 +02:00
36 changed files with 2146 additions and 513 deletions

View File

@@ -1,5 +1,5 @@
[alias]
t = ["nextest", "run"]
t = ["nextest", "run", "--workspace"]
[build]
rustflags = ["--cfg", "tokio_unstable"]

View File

@@ -1,5 +1,6 @@
[licenses]
allow = [
"Apache-2.0 WITH LLVM-exception",
"Apache-2.0",
"BSD-3-Clause",
"CC0-1.0",

352
Cargo.lock generated
View File

@@ -17,6 +17,18 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -106,6 +118,37 @@ dependencies = [
"rustversion",
]
[[package]]
name = "argh"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "211818e820cda9ca6f167a64a5c808837366a6dfd807157c64c1304c486cd033"
dependencies = [
"argh_derive",
"argh_shared",
]
[[package]]
name = "argh_derive"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c442a9d18cef5dde467405d27d461d080d68972d6d0dfd0408265b6749ec427d"
dependencies = [
"argh_shared",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "argh_shared"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ade012bac4db278517a0132c8c10c6427025868dca16c801087c28d5a411f1"
dependencies = [
"serde",
]
[[package]]
name = "arraydeque"
version = "0.5.1"
@@ -138,6 +181,39 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "authentik"
version = "2026.5.0-rc1"
dependencies = [
"arc-swap",
"argh",
"authentik-axum",
"authentik-client",
"authentik-common",
"axum",
"color-eyre",
"eyre",
"futures",
"hyper-unix-socket",
"hyper-util",
"metrics",
"metrics-exporter-prometheus",
"nix 0.31.2",
"pyo3",
"rand 0.10.1",
"serde",
"serde_json",
"serde_repr",
"sqlx",
"time",
"tokio",
"tokio-retry2",
"tokio-tungstenite",
"tracing",
"url",
"uuid",
]
[[package]]
name = "authentik-axum"
version = "2026.5.0-rc1"
@@ -485,6 +561,17 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chacha20"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
dependencies = [
"cfg-if",
"cpufeatures 0.3.0",
"rand_core 0.10.1",
]
[[package]]
name = "chrono"
version = "0.4.44"
@@ -567,6 +654,33 @@ dependencies = [
"cc",
]
[[package]]
name = "color-eyre"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5920befb47832a6d61ee3a3a846565cfa39b331331e68a3b1d1116630f2f26d"
dependencies = [
"backtrace",
"color-spantrace",
"eyre",
"indenter",
"once_cell",
"owo-colors",
"tracing-error",
]
[[package]]
name = "color-spantrace"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8b88ea9df13354b55bc7234ebcce36e6ef896aca2e42a15de9e10edce01b427"
dependencies = [
"once_cell",
"owo-colors",
"tracing-core",
"tracing-error",
]
[[package]]
name = "colorchoice"
version = "1.0.5"
@@ -695,6 +809,15 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.4.0"
@@ -728,6 +851,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@@ -977,6 +1109,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@@ -1166,6 +1304,7 @@ dependencies = [
"cfg-if",
"libc",
"r-efi 6.0.0",
"rand_core 0.10.1",
"wasip2",
"wasip3",
]
@@ -1209,7 +1348,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
"foldhash 0.1.5",
]
[[package]]
@@ -1217,6 +1356,9 @@ name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"foldhash 0.2.0",
]
[[package]]
name = "hashlink"
@@ -1343,9 +1485,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "1.8.1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca"
dependencies = [
"atomic-waker",
"bytes",
@@ -1358,7 +1500,6 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"pin-utils",
"smallvec",
"tokio",
"want",
@@ -1393,6 +1534,20 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-unix-socket"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c255628da188a9d9ee218bae99da33a4b684ed63abe140a94d0f6e4b5af9a090"
dependencies = [
"bytes",
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
@@ -1850,6 +2005,46 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "metrics"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8"
dependencies = [
"ahash",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda"
dependencies = [
"base64 0.22.1",
"indexmap",
"metrics",
"metrics-util",
"quanta",
"thiserror 2.0.18",
]
[[package]]
name = "metrics-util"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.16.1",
"metrics",
"quanta",
"rand 0.9.2",
"rand_xoshiro",
"sketches-ddsketch",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -2233,6 +2428,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "owo-colors"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d"
[[package]]
name = "parking"
version = "2.2.1"
@@ -2309,12 +2510,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.7.5"
@@ -2348,6 +2543,12 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "portable-atomic"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
[[package]]
name = "potential_utf"
version = "0.1.4"
@@ -2423,6 +2624,79 @@ dependencies = [
"prost",
]
[[package]]
name = "pyo3"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91fd8e38a3b50ed1167fb981cd6fd60147e091784c427b8f7183a7ee32c31c12"
dependencies = [
"libc",
"once_cell",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
]
[[package]]
name = "pyo3-build-config"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e368e7ddfdeb98c9bca7f8383be1648fd84ab466bf2bc015e94008db6d35611e"
dependencies = [
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f29e10af80b1f7ccaf7f69eace800a03ecd13e883acfacc1e5d0988605f651e"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-macros"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df6e520eff47c45997d2fc7dd8214b25dd1310918bbb2642156ef66a67f29813"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4cdc218d835738f81c2338f822078af45b4afdf8b2e33cbb5916f108b813acb"
dependencies = [
"heck",
"proc-macro2",
"pyo3-build-config",
"quote",
"syn",
]
[[package]]
name = "quanta"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi",
"web-sys",
"winapi",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -2521,6 +2795,17 @@ dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
dependencies = [
"chacha20",
"getrandom 0.4.2",
"rand_core 0.10.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@@ -2559,6 +2844,30 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rand_core"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
[[package]]
name = "rand_xoshiro"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41"
dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "raw-cpuid"
version = "11.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
dependencies = [
"bitflags 2.11.0",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@@ -3095,7 +3404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"cpufeatures 0.2.17",
"digest",
]
@@ -3106,7 +3415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"cpufeatures 0.2.17",
"digest",
]
@@ -3151,6 +3460,12 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214"
[[package]]
name = "sketches-ddsketch"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b"
[[package]]
name = "slab"
version = "0.4.12"
@@ -3476,6 +3791,12 @@ dependencies = [
"libc",
]
[[package]]
name = "target-lexicon"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb6935a6f5c20170eeceb1a3835a49e12e19d792f6dd344ccc76a985ca5a6ca"
[[package]]
name = "tempfile"
version = "3.27.0"
@@ -3664,8 +3985,12 @@ checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c"
dependencies = [
"futures-util",
"log",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tungstenite",
"webpki-roots 0.26.11",
]
[[package]]
@@ -3879,8 +4204,11 @@ dependencies = [
"httparse",
"log",
"rand 0.9.2",
"rustls",
"rustls-pki-types",
"sha1",
"thiserror 2.0.18",
"url",
]
[[package]]

View File

@@ -20,11 +20,13 @@ publish = false
[workspace.dependencies]
arc-swap = "= 1.9.1"
argh = "= 0.1.19"
axum-server = { version = "= 0.8.0", features = ["tls-rustls-no-provider"] }
aws-lc-rs = { version = "= 1.16.3", features = ["fips"] }
axum = { version = "= 0.8.9", features = ["http2", "macros", "ws"] }
clap = { version = "= 4.6.1", features = ["derive", "env"] }
client-ip = { version = "0.2.1", features = ["forwarded-header"] }
color-eyre = "= 0.6.5"
colored = "= 3.1.1"
config-rs = { package = "config", version = "= 0.15.22", default-features = false, features = [
"json",
@@ -37,11 +39,17 @@ eyre = "= 0.6.12"
forwarded-header-value = "= 0.1.1"
futures = "= 0.3.32"
glob = "= 0.3.3"
hyper-unix-socket = "= 0.3.0"
hyper-util = "= 0.1.20"
ipnet = { version = "= 2.12.0", features = ["serde"] }
json-subscriber = "= 0.2.8"
nix = { version = "= 0.31.2", features = ["signal"] }
metrics = "= 0.24.3"
metrics-exporter-prometheus = { version = "= 0.18.1", default-features = false }
nix = { version = "= 0.31.2", features = ["hostname", "signal"] }
notify = "= 8.2.0"
pin-project-lite = "= 0.2.17"
pyo3 = "= 0.28.3"
rand = "= 0.10.1"
regex = "= 1.12.3"
reqwest = { version = "= 0.13.2", features = [
"form",
@@ -92,6 +100,10 @@ time = { version = "= 0.3.47", features = ["macros"] }
tokio = { version = "= 1.52.1", features = ["full", "tracing"] }
tokio-retry2 = "= 0.9.1"
tokio-rustls = "= 0.26.4"
tokio-tungstenite = { version = "= 0.29.0", features = [
"rustls-tls-webpki-roots",
"url",
] }
tokio-util = { version = "= 0.7.18", features = ["full"] }
tower = "= 0.5.3"
tower-http = { version = "= 0.6.8", features = ["timeout"] }
@@ -106,16 +118,10 @@ tracing-subscriber = { version = "= 0.3.23", features = [
url = "= 2.5.8"
uuid = { version = "= 1.23.1", features = ["serde", "v4"] }
ak-axum = { package = "authentik-axum", version = "2026.5.0-rc1", path = "./packages/ak-axum" }
ak-client = { package = "authentik-client", version = "2026.5.0-rc1", path = "./packages/client-rust" }
ak-common = { package = "authentik-common", version = "2026.5.0-rc1", path = "./packages/ak-common", default-features = false }
[profile.dev.package.backtrace]
opt-level = 3
[profile.release]
lto = true
debug = 2
[workspace.lints.rust]
ambiguous_negative_literals = "warn"
closure_returning_async_block = "warn"
@@ -229,3 +235,64 @@ unused_trait_names = "warn"
unwrap_in_result = "warn"
unwrap_used = "warn"
verbose_file_reads = "warn"
[profile.dev.package.backtrace]
opt-level = 3
[profile.dev]
panic = "abort"
[profile.release]
debug = 2
lto = "fat"
# Because of the async runtime, we want to die straightaway if we panic.
panic = "abort"
strip = true
[package]
name = "authentik"
version.workspace = true
authors.workspace = true
edition.workspace = true
readme.workspace = true
homepage.workspace = true
repository.workspace = true
license-file.workspace = true
publish.workspace = true
[features]
default = ["core", "proxy"]
core = ["ak-common/core", "dep:pyo3", "dep:sqlx"]
proxy = ["ak-common/proxy", "dep:ak-client"]
[dependencies]
ak-axum.workspace = true
ak-client = { workspace = true, optional = true }
ak-common.workspace = true
arc-swap.workspace = true
argh.workspace = true
axum.workspace = true
color-eyre.workspace = true
eyre.workspace = true
futures.workspace = true
hyper-unix-socket.workspace = true
hyper-util.workspace = true
metrics-exporter-prometheus.workspace = true
metrics.workspace = true
nix.workspace = true
pyo3 = { workspace = true, optional = true }
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_repr.workspace = true
sqlx = { workspace = true, optional = true }
time.workspace = true
tokio-retry2.workspace = true
tokio-tungstenite.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true
uuid.workspace = true
[lints]
workspace = true

View File

@@ -115,6 +115,9 @@ run-server: ## Run the main authentik server process
run-worker: ## Run the main authentik worker process
$(UV) run ak worker
run-worker-watch: ## Run the authentik worker, with auto reloading
watchexec --on-busy-update=restart --stop-signal=SIGINT --exts py,rs --no-meta --notify -- $(UV) run ak worker
core-i18n-extract:
$(UV) run ak makemessages \
--add-location file \

View File

@@ -1,7 +1,5 @@
"""Apply blueprint from commandline"""
from sys import exit as sys_exit
from django.core.management.base import BaseCommand, no_translations
from structlog.stdlib import get_logger
@@ -28,7 +26,7 @@ class Command(BaseCommand):
self.stderr.write("Blueprint invalid")
for log in logs:
self.stderr.write(f"\t{log.logger}: {log.event}: {log.attributes}")
sys_exit(1)
raise RuntimeError("Blueprint invalid")
importer.apply()
def add_arguments(self, parser):

View File

@@ -446,8 +446,6 @@ DRAMATIQ = {
("authentik.tasks.middleware.TaskLogMiddleware", {}),
("authentik.tasks.middleware.LoggingMiddleware", {}),
("authentik.tasks.middleware.DescriptionMiddleware", {}),
("authentik.tasks.middleware.WorkerHealthcheckMiddleware", {}),
("authentik.tasks.middleware.WorkerStatusMiddleware", {}),
(
"authentik.tasks.middleware.MetricsMiddleware",
{

View File

@@ -1,4 +1,3 @@
import pglock
from django.utils.timezone import now, timedelta
from drf_spectacular.utils import extend_schema, inline_serializer
from packaging.version import parse
@@ -31,18 +30,13 @@ class WorkerView(APIView):
def get(self, request: Request) -> Response:
response = []
our_version = parse(authentik_full_version())
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
# The worker doesn't hold the lock, it isn't running
if acquired:
continue
version_matching = parse(status.version) == our_version
response.append(
{
"worker_id": f"{status.pk}@{status.hostname}",
"version": status.version,
"version_matching": version_matching,
}
)
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(seconds=45)):
version_matching = parse(status.version) == our_version
response.append(
{
"worker_id": f"{status.pk}@{status.hostname}",
"version": status.version,
"version_matching": version_matching,
}
)
return Response(response)

View File

@@ -1,42 +1,25 @@
import socket
from collections.abc import Callable
from http.server import BaseHTTPRequestHandler
from threading import Event as TEvent
from threading import Thread, current_thread
from typing import Any, cast
import pglock
from django.db import OperationalError, connections, transaction
from django.utils.timezone import now
from django.conf import settings
from django.db import OperationalError
from django_dramatiq_postgres.middleware import (
CurrentTask as BaseCurrentTask,
)
from django_dramatiq_postgres.middleware import (
HTTPServer,
HTTPServerThread,
)
from django_dramatiq_postgres.middleware import (
MetricsMiddleware as BaseMetricsMiddleware,
)
from django_dramatiq_postgres.middleware import (
_MetricsHandler as BaseMetricsHandler,
)
from dramatiq import Worker
from dramatiq.broker import Broker
from dramatiq.message import Message
from dramatiq.middleware import Middleware
from psycopg.errors import Error
from setproctitle import setthreadtitle
from structlog.stdlib import get_logger
from authentik import authentik_full_version
from authentik.events.models import Event, EventAction
from authentik.lib.config import CONFIG
from authentik.lib.sentry import should_ignore_exception
from authentik.lib.utils.reflection import class_to_path
from authentik.root.monitoring import monitoring_set
from authentik.root.signals import post_startup, pre_startup, startup
from authentik.tasks.models import Task, TaskLog, TaskStatus, WorkerStatus
from authentik.tasks.models import Task, TaskLog, TaskStatus
from authentik.tenants.models import Tenant
from authentik.tenants.utils import get_current_tenant
@@ -193,154 +176,26 @@ class DescriptionMiddleware(Middleware):
return {"description"}
class _healthcheck_handler(BaseHTTPRequestHandler):
def log_request(self, code="-", size="-"):
HEALTHCHECK_LOGGER.info(
self.path,
method=self.command,
status=code,
)
def log_error(self, format, *args):
HEALTHCHECK_LOGGER.warning(format, *args)
def do_HEAD(self):
try:
for db_conn in connections.all():
# Force connection reload
db_conn.connect()
_ = db_conn.cursor()
self.send_response(200)
except DB_ERRORS: # pragma: no cover
self.send_response(503)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", "0")
self.end_headers()
do_GET = do_HEAD
class WorkerHealthcheckMiddleware(Middleware):
thread: HTTPServerThread | None
def __init__(self):
listen = CONFIG.get("listen.http", ["[::]:9000"])
if isinstance(listen, str):
listen = listen.split(",")
host, _, port = listen[0].rpartition(":")
try:
port = int(port)
except ValueError:
LOGGER.error(f"Invalid port entered: {port}")
self.host, self.port = host, port
def after_worker_boot(self, broker: Broker, worker: Worker):
self.thread = HTTPServerThread(
target=WorkerHealthcheckMiddleware.run, args=(self.host, self.port)
)
self.thread.start()
def before_worker_shutdown(self, broker: Broker, worker: Worker):
server = self.thread.server
if server:
server.shutdown()
LOGGER.debug("Stopping WorkerHealthcheckMiddleware")
self.thread.join()
@staticmethod
def run(addr: str, port: int):
setthreadtitle("authentik Worker Healthcheck server")
try:
server = HTTPServer((addr, port), _healthcheck_handler)
thread = cast(HTTPServerThread, current_thread())
thread.server = server
server.serve_forever()
except OSError as exc:
get_logger(__name__, type(WorkerHealthcheckMiddleware)).warning(
"Port is already in use, not starting healthcheck server",
exc=exc,
)
class WorkerStatusMiddleware(Middleware):
thread: Thread | None
thread_event: TEvent | None
def after_worker_boot(self, broker: Broker, worker: Worker):
self.thread_event = TEvent()
self.thread = Thread(target=WorkerStatusMiddleware.run, args=(self.thread_event,))
self.thread.start()
def before_worker_shutdown(self, broker: Broker, worker: Worker):
self.thread_event.set()
LOGGER.debug("Stopping WorkerStatusMiddleware")
self.thread.join()
@staticmethod
def run(event: TEvent):
setthreadtitle("authentik Worker status")
with transaction.atomic():
hostname = socket.gethostname()
WorkerStatus.objects.filter(hostname=hostname).delete()
status, _ = WorkerStatus.objects.update_or_create(
hostname=hostname,
version=authentik_full_version(),
)
while not event.is_set():
try:
WorkerStatusMiddleware.keep(event, status)
except DB_ERRORS: # pragma: no cover
event.wait(10)
try:
connections.close_all()
except DB_ERRORS:
pass
@staticmethod
def keep(event: TEvent, status: WorkerStatus):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, side_effect=pglock.Raise):
while not event.is_set():
status.refresh_from_db()
old_last_seen = status.last_seen
status.last_seen = now()
if old_last_seen != status.last_seen:
status.save(update_fields=("last_seen",))
event.wait(30)
class _MetricsHandler(BaseMetricsHandler):
def do_GET(self) -> None:
monitoring_set.send_robust(self)
return super().do_GET()
class MetricsMiddleware(BaseMetricsMiddleware):
thread: HTTPServerThread | None
handler_class = _MetricsHandler
@property
def forks(self) -> list[Callable[[], None]]:
return []
def after_worker_boot(self, broker: Broker, worker: Worker):
listen = CONFIG.get("listen.metrics", ["[::]:9300"])
if isinstance(listen, str):
listen = listen.split(",")
addr, _, port = listen[0].rpartition(":")
def before_worker_boot(self, broker: Broker, worker: Any) -> None:
if settings.TEST:
return super().before_worker_boot(broker, worker)
try:
port = int(port)
except ValueError:
LOGGER.error(f"Invalid port entered: {port}")
self.thread = HTTPServerThread(target=MetricsMiddleware.run, args=(addr, port))
self.thread.start()
from prometheus_client import values
from prometheus_client.values import MultiProcessValue
def before_worker_shutdown(self, broker: Broker, worker: Worker):
server = self.thread.server
if server:
server.shutdown()
LOGGER.debug("Stopping MetricsMiddleware")
self.thread.join()
values.ValueClass = MultiProcessValue(lambda: worker.worker_id)
return super().before_worker_boot(broker, worker)
def after_worker_shutdown(self, broker: Broker, worker: Any) -> None:
if settings.TEST:
return
from prometheus_client import multiprocess
multiprocess.mark_process_dead(worker.worker_id)

View File

@@ -2,7 +2,6 @@
from datetime import timedelta
import pglock
from django.db.models import Count
from django.dispatch import receiver
from django.utils.timezone import now
@@ -31,24 +30,15 @@ GAUGE_TASKS_QUEUED = Gauge(
)
_version = parse(authentik_full_version())
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
worker_version_count = {}
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
lock_id = f"goauthentik.io/worker/status/{status.pk}"
with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
# The worker doesn't hold the lock, it isn't running
if acquired:
continue
version_matching = parse(status.version) == _version
worker_version_count.setdefault(
status.version, {"count": 0, "matching": version_matching}
)
worker_version_count[status.version]["count"] += 1
our_version = parse(authentik_full_version())
for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(seconds=45)):
version_matching = parse(status.version) == our_version
worker_version_count.setdefault(status.version, {"count": 0, "matching": version_matching})
worker_version_count[status.version]["count"] += 1
for version, stats in worker_version_count.items():
OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])

View File

@@ -10,7 +10,6 @@ from dramatiq.results.middleware import Results
from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread
from authentik.tasks.broker import PostgresBroker
from authentik.tasks.middleware import WorkerHealthcheckMiddleware
TESTING_QUEUE = "testing"
@@ -18,6 +17,7 @@ TESTING_QUEUE = "testing"
class TestWorker(Worker):
def __init__(self, broker: Broker):
super().__init__(broker=broker)
self.worker_id = 1000
self.work_queue = PriorityQueue()
self.consumers = {
TESTING_QUEUE: _ConsumerThread(
@@ -82,8 +82,6 @@ def use_test_broker():
middleware: Middleware = import_string(middleware_class)(
**middleware_kwargs,
)
if isinstance(middleware, WorkerHealthcheckMiddleware):
middleware.port = 9102
if isinstance(middleware, Retries):
middleware.max_retries = 0
if isinstance(middleware, Results):

View File

@@ -7,9 +7,7 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"
"syscall"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -18,8 +16,6 @@ import (
"goauthentik.io/internal/web"
)
var workerPidFile = path.Join(os.TempDir(), "authentik-worker.pid")
var healthcheckCmd = &cobra.Command{
Use: "healthcheck",
Run: func(cmd *cobra.Command, args []string) {
@@ -31,9 +27,9 @@ var healthcheckCmd = &cobra.Command{
log.WithField("mode", mode).Debug("checking health")
switch strings.ToLower(mode) {
case "server":
exitCode = checkServer()
exitCode = check(fmt.Sprintf("http://localhost%s-/health/live/", config.Get().Web.Path))
case "worker":
exitCode = checkWorker()
exitCode = check("http://localhost/-/health/live/")
default:
log.Warn("Invalid mode")
}
@@ -45,7 +41,7 @@ func init() {
rootCmd.AddCommand(healthcheckCmd)
}
func checkServer() int {
func check(url string) int {
h := &http.Client{
Transport: utils.NewUserAgentTransport("goauthentik.io/healthcheck",
&http.Transport{
@@ -55,7 +51,6 @@ func checkServer() int {
},
),
}
url := fmt.Sprintf("http://localhost%s-/health/live/", config.Get().Web.Path)
res, err := h.Head(url)
if err != nil {
log.WithError(err).Warning("failed to send healthcheck request")
@@ -68,29 +63,3 @@ func checkServer() int {
log.Debug("successfully checked health")
return 0
}
func checkWorker() int {
pidB, err := os.ReadFile(workerPidFile)
if err != nil {
log.WithError(err).Warning("failed to check worker PID file")
return 1
}
pidS := strings.TrimSpace(string(pidB[:]))
pid, err := strconv.Atoi(pidS)
if err != nil {
log.WithError(err).Warning("failed to find worker process PID")
return 1
}
process, err := os.FindProcess(pid)
if err != nil {
log.WithError(err).Warning("failed to find worker process")
return 1
}
err = process.Signal(syscall.Signal(0))
if err != nil {
log.WithError(err).Warning("failed to signal worker process")
return 1
}
log.Info("successfully checked health")
return 0
}

View File

@@ -1,10 +1,8 @@
#!/usr/bin/env -S bash
set -e -o pipefail
MODE_FILE="${TMPDIR}/authentik-mode"
#!/usr/bin/env bash
if [[ -z "${PROMETHEUS_MULTIPROC_DIR}" ]]; then
export PROMETHEUS_MULTIPROC_DIR="${TMPDIR:-/tmp}/authentik_prometheus_tmp"
fi
set -e -o pipefail
MODE_FILE="$TMPDIR/authentik-mode"
function log {
printf '{"event": "%s", "level": "info", "logger": "bootstrap"}\n' "$@" >&2
@@ -15,10 +13,41 @@ function wait_for_db {
log "Bootstrap completed"
}
function check_if_root {
function run_authentik {
case "$1" in
server)
shift 1
echo -n server >"$MODE_FILE"
if [[ -x "$(command -v authentik-server)" ]]; then
echo authentik-server "$@"
else
echo go run ./cmd/server "$@"
fi
;;
healthcheck)
if [[ -x "$(command -v authentik-server)" ]]; then
echo authentik-server "$@"
else
echo go run ./cmd/server "$@"
fi
;;
worker)
if [[ -x "$(command -v authentik)" ]]; then
echo authentik "$@"
else
echo cargo run -- "$@"
fi
;;
*)
echo "$@"
;;
esac
}
function check_if_root_and_run {
if [[ $EUID -ne 0 ]]; then
log "Not running as root, disabling permission fixes"
exec $1
exec $(run_authentik "$@")
return
fi
SOCKET="/var/run/docker.sock"
@@ -26,36 +55,19 @@ function check_if_root {
if [[ -e "$SOCKET" ]]; then
# Get group ID of the docker socket, so we can create a matching group and
# add ourselves to it
DOCKER_GID=$(stat -c '%g' $SOCKET)
DOCKER_GID="$(stat -c "%g" "${SOCKET}")"
# Ensure group for the id exists
getent group $DOCKER_GID || groupadd -f -g $DOCKER_GID docker
usermod -a -G $DOCKER_GID authentik
getent group "${DOCKER_GID}" || groupadd -f -g "${DOCKER_GID}" docker
usermod -a -G "${DOCKER_GID}" authentik
# since the name of the group might not be docker, we need to lookup the group id
GROUP_NAME=$(getent group $DOCKER_GID | sed 's/:/\n/g' | head -1)
GROUP_NAME=$(getent group "${DOCKER_GID}" | sed 's/:/\n/g' | head -1)
GROUP="authentik:${GROUP_NAME}"
fi
# Fix permissions of certs and media
chown -R authentik:authentik /data /certs "${PROMETHEUS_MULTIPROC_DIR}"
chmod ug+rwx /data
chmod ug+rx /certs
exec chpst -u authentik:$GROUP env HOME=/authentik $1
}
function run_authentik {
if [[ -x "$(command -v authentik)" ]]; then
exec authentik $@
else
exec go run -v ./cmd/server/ $@
fi
}
function set_mode {
echo $1 >$MODE_FILE
trap cleanup EXIT
}
function cleanup {
rm -f ${MODE_FILE}
exec chpst -u authentik:"${GROUP}" env HOME=/authentik $(run_authentik "$@")
}
function prepare_debug {
@@ -72,38 +84,33 @@ function prepare_debug {
chown authentik:authentik /unittest.xml
}
if [[ -z "${PROMETHEUS_MULTIPROC_DIR}" ]]; then
export PROMETHEUS_MULTIPROC_DIR="${TMPDIR:-/tmp}/authentik_prometheus_tmp"
fi
mkdir -p "${PROMETHEUS_MULTIPROC_DIR}"
if [[ "$(python -m authentik.lib.config debugger 2>/dev/null)" == "True" ]]; then
prepare_debug
fi
if [[ "$1" == "server" ]]; then
set_mode "server"
run_authentik
elif [[ "$1" == "worker" ]]; then
set_mode "worker"
shift
# If we have bootstrap credentials set, run bootstrap tasks outside of main server
# sync, so that we can sure the first start actually has working bootstrap
# credentials
if [[ -n "${AUTHENTIK_BOOTSTRAP_PASSWORD}" || -n "${AUTHENTIK_BOOTSTRAP_TOKEN}" ]]; then
python -m manage apply_blueprint system/bootstrap.yaml || true
fi
check_if_root "python -m manage worker --pid-file ${TMPDIR}/authentik-worker.pid $@"
elif [[ "$1" == "bash" ]]; then
/bin/bash
elif [[ "$1" == "test-all" ]]; then
prepare_debug
chmod 777 /root
check_if_root "python -m manage test authentik"
elif [[ "$1" == "healthcheck" ]]; then
run_authentik healthcheck $(cat $MODE_FILE)
if [[ "$1" == "bash" ]]; then
exec /usr/bin/env -S bash "$@"
elif [[ "$1" == "dump_config" ]]; then
shift
exec python -m authentik.lib.config $@
shift 1
exec python -m authentik.lib.config "$@"
elif [[ "$1" == "debug" ]]; then
exec sleep infinity
elif [[ "$1" == "test-all" ]]; then
wait_for_db
prepare_debug
chmod 777 /root
check_if_root_and_run manage test authentik
elif [[ "$1" == "server" ]] || [[ "$1" == "worker" ]]; then
wait_for_db
check_if_root_and_run "$@"
elif [[ "$1" == "healthcheck" ]]; then
check_if_root_and_run "$@" "$(cat "$MODE_FILE")"
else
wait_for_db
exec python -m manage "$@"
fi

View File

@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1
# Stage 1: Build webui
# Stage: Build webui
FROM --platform=${BUILDPLATFORM} docker.io/library/node:24-trixie-slim@sha256:735dd688da64d22ebd9dd374b3e7e5a874635668fd2a6ec20ca1f99264294086 AS node-builder
ARG GIT_BUILD_HASH
@@ -28,23 +28,14 @@ COPY ./website /work/website/
RUN npm run build && \
npm run build:sfe
# Stage 2: Build go proxy
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.26.2-trixie@sha256:982ae929f9a74083a242c6e25d19d7d9ed78c6e97fab639a119e90707ba819e2 AS go-builder
# Stage: Build go proxy
FROM docker.io/library/golang:1.26.2-trixie@sha256:982ae929f9a74083a242c6e25d19d7d9ed78c6e97fab639a119e90707ba819e2 AS go-builder
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT
ARG GOOS=$TARGETOS
ARG GOARCH=$TARGETARCH
WORKDIR /go/src/goauthentik.io
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
dpkg --add-architecture arm64 && \
apt-get update && \
apt-get install -y --no-install-recommends crossbuild-essential-arm64 gcc-aarch64-linux-gnu
RUN --mount=type=bind,target=/go/src/goauthentik.io/go.mod,src=./go.mod \
--mount=type=bind,target=/go/src/goauthentik.io/go.sum,src=./go.sum \
--mount=type=cache,target=/go/pkg/mod \
@@ -62,11 +53,9 @@ COPY ./packages/client-go /go/src/goauthentik.io/packages/client-go
RUN --mount=type=cache,sharing=locked,target=/go/pkg/mod \
--mount=type=cache,id=go-build-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/root/.cache/go-build \
if [ "$TARGETARCH" = "arm64" ]; then export CC=aarch64-linux-gnu-gcc && export CC_FOR_TARGET=gcc-aarch64-linux-gnu; fi && \
CGO_ENABLED=1 GOFIPS140=latest GOARM="${TARGETVARIANT#v}" \
go build -o /go/authentik ./cmd/server
CGO_ENABLED=1 GOFIPS140=latest go build -o /go/authentik-server ./cmd/server
# Stage 3: MaxMind GeoIP
# Stage: MaxMind GeoIP
FROM --platform=${BUILDPLATFORM} ghcr.io/maxmind/geoipupdate:v7.1.1@sha256:faecdca22579730ab0b7dea5aa9af350bb3c93cb9d39845c173639ead30346d2 AS geoip
ENV GEOIPUPDATE_EDITION_IDS="GeoLite2-City GeoLite2-ASN"
@@ -79,9 +68,31 @@ RUN --mount=type=secret,id=GEOIPUPDATE_ACCOUNT_ID \
mkdir -p /usr/share/GeoIP && \
/bin/sh -c "GEOIPUPDATE_LICENSE_KEY_FILE=/run/secrets/GEOIPUPDATE_LICENSE_KEY /usr/bin/entry.sh || echo 'Failed to get GeoIP database, disabling'; exit 0"
# Stage 4: Download uv
# Stage: download Rust toolchain
FROM ghcr.io/goauthentik/fips-debian:trixie-slim-fips@sha256:7726387c78b5787d2146868c2ccc8948a3591d0a5a6436f7780c8c28acc76341 AS rust-toolchain
ARG TARGETARCH
ARG TARGETVARIANT
ENV PATH="/root/.cargo/bin:$PATH"
SHELL ["/bin/sh", "-o", "pipefail", "-c"]
RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
# Required for installing pip packages
apt-get install -y --no-install-recommends \
# Build essentials
build-essential && \
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none && \
rustup install && \
rustup default "$(sed -n 's/channel = "\(.*\)"/\1/p' rust-toolchain.toml)" && \
rustc --version && \
cargo --version
RUN cat /root/.rustup/settings.toml
# Stage: Download uv
FROM ghcr.io/astral-sh/uv:0.11.5@sha256:555ac94f9a22e656fc5f2ce5dfee13b04e94d099e46bb8dd3a73ec7263f2e484 AS uv
# Stage 5: Base python image
# Stage: Base python image
FROM ghcr.io/goauthentik/fips-python:3.14.3-slim-trixie-fips@sha256:bf45eb77a010d76fe6abd7ae137d1b0c44b6227cd984945042135fdf05ebf8d9 AS python-base
ENV VENV_PATH="/ak-root/.venv" \
@@ -95,16 +106,53 @@ WORKDIR /ak-root/
COPY --from=uv /uv /uvx /bin/
# Stage 6: Python dependencies
# Stage: build rust binary
FROM python-base AS rust-builder
ARG TARGETARCH
ARG TARGETVARIANT
WORKDIR /build
ENV PATH="/root/.cargo/bin:$PATH"
COPY --from=rust-toolchain /root/.rustup /root/.rustup
COPY --from=rust-toolchain /root/.cargo /root/.cargo
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
apt-get install -y --no-install-recommends \
# common dependencies
build-essential \
# aws-lc deps
cmake clang golang
# See https://github.com/aws/aws-lc-rs/issues/569
ENV AWS_LC_FIPS_SYS_CC=clang
RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
--mount=type=bind,target=Cargo.toml,src=Cargo.toml \
--mount=type=bind,target=Cargo.lock,src=Cargo.lock \
--mount=type=bind,target=.cargo/,src=.cargo/ \
--mount=type=bind,target=src/,src=src/ \
--mount=type=bind,target=packages/,src=packages/ \
--mount=type=bind,target=authentik/lib/default.yml,src=authentik/lib/default.yml \
# Required otherwise workspace discovery fails
--mount=type=bind,target=website/scripts/docsmg/,src=website/scripts/docsmg/ \
--mount=type=cache,id=cargo-git-db-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/git/db/ \
--mount=type=cache,id=cargo-registry-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/registry/ \
--mount=type=cache,id=rust-target-$TARGETARCH$TARGETVARIANT,target=/build/target/ \
cargo build --package authentik --no-default-features --features core --locked --release && \
cp ./target/release/authentik /bin/authentik
# Stage: Python dependencies
FROM python-base AS python-deps
ARG TARGETARCH
ARG TARGETVARIANT
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
ENV PATH="/root/.cargo/bin:$PATH"
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
apt-get update && \
@@ -121,28 +169,21 @@ RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/v
# python-kadmin-rs
krb5-multidev libkrb5-dev heimdal-multidev libclang-dev \
# xmlsec
libltdl-dev && \
export RUST_TOOLCHAIN="$(awk -F'\"' '/^[[:space:]]*channel[[:space:]]*=/{print $2; exit}' rust-toolchain.toml)" && \
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain "${RUST_TOOLCHAIN}" && \
rustup default "${RUST_TOOLCHAIN}" && \
rustc --version && \
cargo --version
libltdl-dev
ENV UV_NO_BINARY_PACKAGE="cryptography lxml python-kadmin-rs xmlsec" \
# https://github.com/rust-lang/rustup/issues/2949
# Fixes issues where the rust version in the build cache is older than latest
# and rustup tries to update it, which fails
RUSTUP_PERMIT_COPY_RENAME="1"
ENV PATH="/root/.cargo/bin:$PATH"
COPY --from=rust-toolchain /root/.rustup /root/.rustup
COPY --from=rust-toolchain /root/.cargo /root/.cargo
ENV UV_NO_BINARY_PACKAGE="cryptography lxml python-kadmin-rs xmlsec"
RUN --mount=type=bind,target=pyproject.toml,src=pyproject.toml \
--mount=type=bind,target=uv.lock,src=uv.lock \
--mount=type=bind,target=packages,src=packages \
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
--mount=type=cache,id=uv-python-deps-$TARGETARCH$TARGETVARIANT,target=/root/.cache/uv \
RUSTUP_TOOLCHAIN="$(awk -F'\"' '/^[[:space:]]*channel[[:space:]]*=/{print $2; exit}' rust-toolchain.toml)" \
uv sync --frozen --no-install-project --no-dev
# Stage 7: Run
# Stage: Run
FROM python-base AS final-image
ARG VERSION
@@ -193,7 +234,8 @@ COPY ./manage.py /
COPY ./blueprints /blueprints
COPY ./lifecycle/ /lifecycle
COPY ./authentik/sources/kerberos/krb5.conf /etc/krb5.conf
COPY --from=go-builder /go/authentik /bin/authentik
COPY --from=rust-builder /bin/authentik /bin/authentik
COPY --from=go-builder /go/authentik-server /bin/authentik-server
COPY ./packages/ /ak-root/packages
RUN ln -s /ak-root/packages /packages
COPY --from=python-deps /ak-root/.venv /ak-root/.venv

147
lifecycle/worker_process.py Executable file
View File

@@ -0,0 +1,147 @@
#!/usr/bin/env python3
import os
import random
import signal
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from socket import AF_UNIX
from threading import Event, Thread
from typing import Any
from dramatiq import Worker, get_broker
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
LOGGER = get_logger()
INITIAL_WORKER_ID = 1000
class HttpHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler exposing worker health and metrics endpoints.

    Served over a unix socket (see ``UnixSocketServer``); scraped by the
    supervising process for metrics and liveness/readiness probes.
    """

    def check_db(self):
        """Force-reconnect every configured database and open a cursor.

        Raises ``django.db.utils.OperationalError`` when a database is
        unreachable; the readiness endpoint translates that into a 503.
        """
        from django.db import connections

        for db_conn in connections.all():
            # Force connection reload
            db_conn.connect()
            _ = db_conn.cursor()

    def do_GET(self):
        if self.path == "/-/metrics/":
            from authentik.root.monitoring import monitoring_set

            # Let registered receivers refresh their gauges before the
            # multiprocess registry is collected by the parent process.
            monitoring_set.send_robust(self)
            self.send_response(200)
            self.end_headers()
        elif self.path == "/-/health/ready/":
            from django.db.utils import OperationalError

            try:
                self.check_db()
            except OperationalError:
                # DB unreachable: report unhealthy and stop. The original
                # fell through and emitted a second 200 status line after
                # the 503, producing a malformed HTTP response.
                self.send_response(503)
                self.end_headers()
                return
            self.send_response(200)
            self.end_headers()
        else:
            # Any other path (e.g. /-/health/live/) is a liveness check.
            self.send_response(200)
            self.end_headers()

    def log_message(self, format: str, *args: Any) -> None:
        # Silence per-request access logging; probes would flood the logs.
        pass
class UnixSocketServer(HTTPServer):
    # HTTPServer bound to a unix domain socket instead of TCP.
    # NOTE(review): HTTPServer.server_bind assumes a (host, port) address
    # tuple; with AF_UNIX the address is a path string. This appears to work
    # with the stdlib implementation, but confirm against the target Python
    # version.
    address_family = AF_UNIX
def main(worker_id: int, socket_path: str):
    """Run a single dramatiq worker process.

    Binds a health/metrics HTTP server on ``socket_path`` (unix socket),
    starts the worker threads, notifies the parent process via SIGUSR2 once
    ready, then blocks until a graceful-shutdown signal arrives.
    """
    shutdown = Event()
    try:
        # Remove a stale socket left over from a previous run.
        # The original `except OSError, FileNotFoundError:` is Python 2
        # syntax and a SyntaxError in Python 3; FileNotFoundError is a
        # subclass of OSError, so catching OSError covers both.
        os.remove(socket_path)
    except OSError:
        pass
    srv = UnixSocketServer(socket_path, HttpHandler)

    def immediate_shutdown(signum, frame):
        nonlocal srv
        srv.shutdown()
        sys.exit(0)

    def graceful_shutdown(signum, frame):
        nonlocal shutdown
        shutdown.set()

    # SIGTERM drains gracefully; everything else tears down immediately.
    signal.signal(signal.SIGHUP, immediate_shutdown)
    signal.signal(signal.SIGINT, immediate_shutdown)
    signal.signal(signal.SIGQUIT, immediate_shutdown)
    signal.signal(signal.SIGTERM, graceful_shutdown)
    # Re-seed the RNG; forked children would otherwise share parent state.
    random.seed()
    logger = LOGGER.bind(worker_id=worker_id)
    logger.debug("Loading broker...")
    broker = get_broker()
    broker.emit_after("process_boot")
    logger.debug("Starting worker threads...")
    queues = None  # all queues
    worker = Worker(broker, queues=queues, worker_threads=CONFIG.get_int("worker.threads"))
    worker.worker_id = worker_id
    worker.start()
    logger.info("Worker process is ready for action.")
    Thread(target=srv.serve_forever).start()
    # Notify rust process that we are ready
    os.kill(os.getppid(), signal.SIGUSR2)
    shutdown.wait()
    logger.info("Shutting down worker...")
    # 5 secs if debug, 5 mins otherwise
    worker.stop(timeout=5_000 if CONFIG.get_bool("debug") else 600_000)
    srv.shutdown()
    broker.close()
    logger.info("Worker shut down.")
if __name__ == "__main__":
    if len(sys.argv) != 3:  # noqa: PLR2004
        print("USAGE: worker_process <worker_id> <socket_path>")
        sys.exit(1)
    worker_id = int(sys.argv[1])
    socket_path = sys.argv[2]
    from authentik.root.setup import setup

    setup()
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "authentik.root.settings")
    import django

    django.setup()
    from django.core.management import execute_from_command_line

    # Only the first worker process (id 1000) performs one-time startup work:
    # schema migrations and, when bootstrap credentials are configured,
    # applying the bootstrap blueprint.
    # NOTE(review): nesting of the bootstrap step under INITIAL_WORKER_ID is
    # reconstructed from flattened source — confirm against the original file.
    if worker_id == INITIAL_WORKER_ID:
        from lifecycle.migrate import run_migrations

        run_migrations()
        if (
            "AUTHENTIK_BOOTSTRAP_PASSWORD" in os.environ
            or "AUTHENTIK_BOOTSTRAP_TOKEN" in os.environ
        ):
            try:
                execute_from_command_line(["", "apply_blueprint", "system/bootstrap.yaml"])
            except Exception as exc:  # noqa: BLE001
                # Best-effort: a bootstrap failure must not keep the worker down.
                sys.stderr.write(f"Failed to apply bootstrap blueprint: {exc}")
    main(worker_id, socket_path)

View File

@@ -17,11 +17,13 @@ if __name__ == "__main__":
if (
len(sys.argv) > 1
# Explicitly only run migrate for server and worker
and sys.argv[1] in ["dev_server", "worker"]
and sys.argv[1] in ["dev_server"]
# and don't run if this is the child process of a dev_server
and os.environ.get(DJANGO_AUTORELOAD_ENV, None) is None
):
run_migrations()
if len(sys.argv) > 1 and sys.argv[1] in ["worker"]:
raise RuntimeError(f"{sys.argv[1]} command not allowed.")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:

View File

@@ -1,6 +1,7 @@
//! Utilities for working with the authentik API client.
use ak_client::apis::configuration::Configuration;
use ak_client::models::Pagination;
use eyre::{Result, eyre};
use url::Url;
@@ -60,6 +61,42 @@ pub fn make_config() -> Result<Configuration> {
})
}
/// Fetch all pages from a paginated API endpoint, returning all results combined.
///
/// - `fetch`: a function that takes a page number and returns a future resolving to a paginated
///   response.
/// - `get_pagination`: a function that extracts the [`Pagination`] metadata from a response.
/// - `get_results`: a function that extracts the result items from a response.
///
/// Errors from any page fetch are propagated unchanged.
pub async fn fetch_all<T, R, E, F, Fut>(
    fetch: F,
    get_pagination: impl Fn(&R) -> &Pagination,
    get_results: impl Fn(R) -> Vec<T>,
) -> std::result::Result<Vec<T>, E>
where
    F: Fn(i32) -> Fut,
    Fut: Future<Output = std::result::Result<R, E>>,
{
    let mut page = 1;
    // Start empty; capacity is reserved once the first response reveals the
    // total count. (`Vec::with_capacity(0)` was a confusing no-op.)
    let mut results = Vec::new();
    loop {
        let response = fetch(page).await?;
        let next = get_pagination(&response).next;
        if page == 1 {
            // Pre-allocate for the server-reported total to avoid repeated
            // reallocation while extending.
            let count = get_pagination(&response).count as usize;
            results.reserve(count);
        }
        results.extend(get_results(response));
        // Pagination reports the next page number; a non-positive value
        // means there are no further pages.
        if next > 0.0 {
            page += 1;
        } else {
            break;
        }
    }
    Ok(results)
}
#[cfg(test)]
mod tests {
use serde_json::json;

View File

@@ -263,7 +263,7 @@ async fn watch_config(arbiter: Arbiter) -> Result<()> {
/// Start the configuration watcher.
///
/// [`init`] must be called before this is used.
pub fn run(tasks: &mut Tasks) -> Result<()> {
pub fn start(tasks: &mut Tasks) -> Result<()> {
info!("starting config file watcher");
let arbiter = tasks.arbiter();
tasks
@@ -400,7 +400,7 @@ mod tests {
let arbiter = tasks.arbiter();
let mut events_rx = arbiter.events_subscribe();
super::run(&mut tasks).expect("failed to start watcher");
super::start(&mut tasks).expect("failed to start watcher");
assert_eq!(super::get().secret_key, "my_secret_key");
assert_eq!(super::get().postgresql.password, "my_postgres_pass");

View File

@@ -30,12 +30,12 @@ pub fn install() -> Result<()> {
}
if config.debug {
let console_layer = console_subscriber::ConsoleLayer::builder()
.server_addr(config.listen.debug_tokio)
.spawn();
// let console_layer = console_subscriber::ConsoleLayer::builder()
// .server_addr(config.listen.debug_tokio)
// .spawn();
tracing_subscriber::registry()
.with(ErrorLayer::default())
.with(console_layer)
// .with(console_layer)
.with(
fmt::layer()
.compact()
@@ -180,12 +180,9 @@ pub mod sentry {
sentry_dsn: Some(config.sentry_dsn),
environment: config.environment,
send_pii: config.send_pii,
#[expect(
clippy::cast_possible_truncation,
reason = "This is fine, we'll never get big values here."
)]
#[expect(
clippy::as_conversions,
clippy::cast_possible_truncation,
reason = "This is fine, we'll never get big values here."
)]
sample_rate: config.traces_sample_rate as f32,

View File

@@ -1,102 +0,0 @@
import platform
import sys
from argparse import Namespace
from multiprocessing import set_start_method
from typing import Any
from django.apps.registry import apps
from django.core.management.base import BaseCommand, CommandParser
from django.db import connections
from django.utils.module_loading import import_string, module_has_submodule
from dramatiq.cli import main
from django_dramatiq_postgres.conf import Conf
class Command(BaseCommand):
"""Run worker"""
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--pid-file",
action="store",
default=None,
dest="pid_file",
help="PID file",
)
parser.add_argument(
"--watch",
action="store_true",
default=False,
dest="watch",
help="Watch for file changes",
)
def handle(
self,
pid_file: str,
watch: bool,
verbosity: int,
**options: Any,
) -> None:
worker = Conf().worker
setup, modules = self._discover_tasks_modules()
args = Namespace(
broker=setup,
modules=modules,
path=["."],
queues=None,
log_file=None,
skip_logging=True,
use_spawn=False,
forks=[],
worker_shutdown_timeout=600000,
watch=None,
watch_use_polling=False,
include_patterns=["**.py"],
exclude_patterns=None,
verbose=0,
)
if watch:
args.watch = worker["watch_folder"]
if worker["watch_use_polling"]:
args.watch_use_polling = True
if processes := worker["processes"]:
args.processes = processes
if threads := worker["threads"]:
args.threads = threads
if pid_file is not None:
args.pid_file = pid_file
args.verbose = verbosity - 1
# > On macOS [...] the fork start method should be considered unsafe
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
if not platform.system() == "Darwin":
set_start_method("fork")
connections.close_all()
sys.exit(main(args)) # type: ignore[no-untyped-call]
def _discover_tasks_modules(self) -> tuple[str, list[str]]:
# Does not support a tasks directory
autodiscovery = Conf().autodiscovery
modules = []
if autodiscovery["enabled"]:
for app in apps.get_app_configs():
if autodiscovery["apps_prefix"] and not app.name.startswith(
autodiscovery["apps_prefix"]
):
continue
if module_has_submodule(app.module, autodiscovery["actors_module_name"]):
modules.append(f"{app.name}.{autodiscovery['actors_module_name']}")
else:
modules_callback = autodiscovery["modules_callback"]
callback = (
modules_callback
if not isinstance(modules_callback, str)
else import_string(modules_callback)
)
modules.extend(callback())
return autodiscovery["setup_module"], modules

View File

@@ -36,7 +36,7 @@ dependencies = [
"django >=4.2,<6.0",
"django-pglock >=1.7,<2",
"django-pgtrigger >=4,<5",
"dramatiq[watch] >=1.17,<1.18",
"dramatiq >=1.17,<1.18",
"tenacity >=9,<10",
"structlog >=25,<26",
]

108
src/main.rs Normal file
View File

@@ -0,0 +1,108 @@
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "core")]
use ak_common::db;
use ak_common::{Mode, Tasks, authentik_full_version, config, tls, tracing as ak_tracing};
use argh::FromArgs;
use eyre::{Result, eyre};
use tracing::{error, info, trace};
mod metrics;
#[cfg(feature = "proxy")]
mod outpost;
#[cfg(feature = "core")]
mod server;
#[cfg(feature = "core")]
mod worker;
#[derive(Debug, FromArgs, PartialEq)]
/// The authentication glue you need.
struct Cli {
    // Which process role to run. (Plain `//` comment on purpose: a `///`
    // doc comment would change the argh-generated help text.)
    #[argh(subcommand)]
    command: Command,
}

// Available process roles; variants are gated on cargo features so a
// proxy-only build does not pull in the core (Python) machinery.
#[derive(Debug, FromArgs, PartialEq)]
#[argh(subcommand)]
enum Command {
    #[cfg(feature = "core")]
    Worker(worker::Cli),
    #[cfg(feature = "proxy")]
    Proxy(outpost::proxy::Cli),
}
/// Process entry point: parse CLI, initialize global state, then run all
/// registered background tasks on a multi-threaded tokio runtime.
fn main() -> Result<()> {
    // Minimal logging until the full tracing stack is configured below.
    let tracing_crude = ak_tracing::install_crude();
    info!(version = authentik_full_version(), "authentik is starting");
    let cli: Cli = argh::from_env();
    // The process mode must be fixed before config/tracing initialization.
    match &cli.command {
        #[cfg(feature = "core")]
        Command::Worker(_) => Mode::set(Mode::Worker)?,
        #[cfg(feature = "proxy")]
        Command::Proxy(_) => Mode::set(Mode::Proxy)?,
    }
    trace!("installing error formatting");
    color_eyre::install()?;
    #[cfg(feature = "core")]
    if Mode::is_core() {
        // The embedded Python interpreter is only needed for core workloads.
        trace!("initializing Python");
        pyo3::Python::initialize();
        trace!("Python initialized");
    }
    config::init()?;
    tls::init()?;
    let _sentry = ak_tracing::sentry::install()?;
    ak_tracing::install()?;
    // Full tracing is live; drop the bootstrap logger.
    drop(tracing_crude);
    tokio::runtime::Builder::new_multi_thread()
        .thread_name_fn(|| {
            // Give each runtime thread a stable, unique name for debugging.
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
            format!("tokio-{id}")
        })
        .enable_all()
        .build()?
        .block_on(async {
            let mut tasks = Tasks::new()?;
            config::start(&mut tasks)?;
            let metrics = metrics::start(&mut tasks)?;
            #[cfg(feature = "core")]
            if Mode::get() == Mode::AllInOne || Mode::get() == Mode::Worker {
                db::init(&mut tasks).await?;
            }
            match cli.command {
                #[cfg(feature = "core")]
                Command::Worker(args) => {
                    let workers = worker::start(args, &mut tasks)?;
                    // Expose the worker handles to the metrics endpoint.
                    metrics.workers.store(Some(workers));
                }
                #[cfg(feature = "proxy")]
                Command::Proxy(args) => {
                    outpost::start::<outpost::proxy::ProxyOutpost>(args, &mut tasks).await?;
                }
            }
            // Block until all registered tasks finish (shutdown requested).
            let errors = tasks.run().await;
            Mode::cleanup();
            if errors.is_empty() {
                info!("authentik exiting");
                Ok(())
            } else {
                error!(err = ?errors, "authentik encountered errors");
                Err(eyre!("Errors encountered: {:?}", errors))
            }
        })
}

73
src/metrics/handlers.rs Normal file
View File

@@ -0,0 +1,73 @@
use std::sync::Arc;
use ak_axum::error::Result;
use ak_common::mode::Mode;
use axum::{body::Body, extract::State, http::StatusCode, response::Response};
use tokio::task::spawn_blocking;
use super::Metrics;
/// Render all metrics (native Rust recorder, plus — in core builds — the
/// Python multiprocess registry) as a Prometheus text response.
pub(super) async fn metrics_handler(State(state): State<Arc<Metrics>>) -> Result<Response> {
    // Native (Rust) Prometheus metrics first.
    let mut metrics = Vec::new();
    state.prometheus.render_to_write(&mut metrics)?;
    #[cfg(feature = "core")]
    if Mode::is_core() {
        // In worker mode, notify the worker processes before collecting —
        // presumably so they refresh their metric state first; see
        // `Workers::notify_metrics` for the exact semantics.
        if Mode::get() == Mode::Worker
            && let Some(workers) = state.workers.load_full()
        {
            workers.notify_metrics().await?;
        }
        // Collecting Python metrics attaches to the interpreter (blocking);
        // keep it off the async executor threads.
        metrics.extend(spawn_blocking(python::get_python_metrics).await??);
    }
    Ok(Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "text/plain; version=1.0.0; charset=utf-8")
        .body(Body::from(metrics))?)
}
#[cfg(feature = "core")]
mod python {
    use eyre::{Report, Result};
    use pyo3::{
        IntoPyObjectExt as _,
        ffi::c_str,
        prelude::*,
        types::{PyBytes, PyDict},
    };

    /// Render the Python multiprocess Prometheus registry to text format.
    ///
    /// Blocking: attaches to the Python interpreter; callers should run it
    /// via `spawn_blocking`.
    pub(super) fn get_python_metrics() -> Result<Vec<u8>> {
        let metrics = Python::attach(|py| {
            let locals = PyDict::new(py);
            Python::run(
                py,
                c_str!(
                    r#"
from prometheus_client import (
    CollectorRegistry,
    generate_latest,
    multiprocess,
)

registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
output = generate_latest(registry)
"#
                ),
                None,
                Some(&locals),
            )?;
            // `output` should always be set by the snippet above; fall back
            // to an empty byte string defensively if it is missing or is not
            // a bytes object.
            let metrics = locals
                .get_item("output")?
                .unwrap_or(PyBytes::new(py, &[]).into_bound_py_any(py)?)
                .cast::<PyBytes>()
                .map_or_else(|_| PyBytes::new(py, &[]), |v| v.to_owned())
                .as_bytes()
                .to_owned();
            Ok::<_, Report>(metrics)
        })?;
        Ok::<_, Report>(metrics)
    }
}

99
src/metrics/mod.rs Normal file
View File

@@ -0,0 +1,99 @@
use std::{env::temp_dir, os::unix, path::PathBuf, sync::Arc};
use ak_axum::{router::wrap_router, server};
use ak_common::{
arbiter::{Arbiter, Tasks},
config,
};
use arc_swap::ArcSwapOption;
use axum::{Router, routing::any};
use eyre::Result;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use tokio::{
task::spawn_blocking,
time::{Duration, interval},
};
use tracing::info;
#[cfg(feature = "core")]
use crate::worker::Workers;
mod handlers;
/// Path of the unix socket the metrics server listens on.
fn socket_path() -> PathBuf {
    let mut path = temp_dir();
    path.push("authentik-metrics.sock");
    path
}
/// Shared state behind the metrics endpoint.
pub(crate) struct Metrics {
    // Handle used to render the process-wide Prometheus recorder.
    prometheus: PrometheusHandle,
    // Set after worker startup; lets the metrics handler reach the Python
    // worker processes before scraping (core builds only).
    #[cfg(feature = "core")]
    pub(crate) workers: ArcSwapOption<Workers>,
}

impl Metrics {
    /// Install the global Prometheus recorder and keep its render handle.
    fn new() -> Result<Self> {
        info!("installing Prometheus recorder");
        let prometheus = PrometheusBuilder::new()
            .with_recommended_naming(true)
            .install_recorder()?;
        Ok(Self {
            prometheus,
            #[cfg(feature = "core")]
            workers: ArcSwapOption::empty(),
        })
    }
}
/// Periodic maintenance for the Prometheus recorder.
///
/// Ticks every 5 seconds until the arbiter signals shutdown. `run_upkeep`
/// on the handle is blocking, so it is pushed to the blocking thread pool.
async fn run_upkeep(arbiter: Arbiter, state: Arc<Metrics>) -> Result<()> {
    info!("starting metrics upkeep runner");
    let mut upkeep_interval = interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = upkeep_interval.tick() => {
                let state_clone = Arc::clone(&state);
                spawn_blocking(move || state_clone.prometheus.run_upkeep()).await?;
            },
            () = arbiter.shutdown() => return Ok(())
        }
    }
}
/// Build the metrics router: every request path serves the metrics handler.
fn build_router(state: Arc<Metrics>) -> Router {
    let router = Router::new()
        .fallback(any(handlers::metrics_handler))
        .with_state(state);
    wrap_router(router, true)
}
/// Start the metrics subsystem: recorder upkeep plus HTTP listeners on every
/// configured TCP address and on the local unix socket.
pub(crate) fn start(tasks: &mut Tasks) -> Result<Arc<Metrics>> {
    let arbiter = tasks.arbiter();
    let metrics = Arc::new(Metrics::new()?);
    let router = build_router(Arc::clone(&metrics));
    // Background task compacting the recorder state on an interval.
    tasks
        .build_task()
        .name(&format!("{}::run_upkeep", module_path!()))
        .spawn(run_upkeep(arbiter, Arc::clone(&metrics)))?;
    // Serve on every configured TCP metrics listener...
    for addr in config::get().listen.metrics.iter().copied() {
        server::start_plain(
            tasks,
            "metrics",
            router.clone(),
            addr,
            true, // Allow failure in case the server is running on the same machine, like in dev
        )?;
    }
    // ...and on the local unix socket (see `socket_path`).
    server::start_unix(
        tasks,
        "metrics",
        router,
        unix::net::SocketAddr::from_pathname(socket_path())?,
        true, // Allow failure in case the server is running on the same machine, like in dev
    )?;
    Ok(metrics)
}

312
src/outpost/event.rs Normal file
View File

@@ -0,0 +1,312 @@
use std::{fmt::Display, sync::Arc};
use ak_common::{Arbiter, Tasks, VERSION, api, arbiter, authentik_build_hash};
use axum::http::{HeaderValue, header::AUTHORIZATION};
use eyre::{Result, eyre};
use futures::{SinkExt as _, StreamExt as _};
use nix::unistd::gethostname;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use time::UtcDateTime;
use tokio::{
signal::unix::SignalKind,
time::{Duration, interval, sleep},
};
use tokio_tungstenite::tungstenite::{Message, client::IntoClientRequest as _};
use tracing::{debug, info, instrument, trace, warn};
use url::Url;
use crate::outpost::{Outpost, OutpostController};
/// Instruction codes exchanged with authentik core over the outpost
/// websocket. The numeric discriminants are part of the wire protocol
/// (serialized via `serde_repr`) — do not renumber.
#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone, Copy, Eq)]
#[repr(u8)]
enum EventKind {
    /// Code used to acknowledge a previous message.
    Ack = 0,
    /// Code used to send a healthcheck keepalive.
    Hello = 1,
    /// Code received to trigger a config update.
    TriggerUpdate = 2,
    /// Code received to trigger some provider specific function.
    ProviderSpecific = 3,
    /// Code received to identify the end of a session.
    SessionEnd = 4,
}
/// Human-readable names for log output; matches the variant identifiers.
impl Display for EventKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Ack => "Ack",
            Self::Hello => "Hello",
            Self::TriggerUpdate => "TriggerUpdate",
            Self::ProviderSpecific => "ProviderSpecific",
            Self::SessionEnd => "SessionEnd",
        };
        f.write_str(name)
    }
}
/// A single websocket message: an instruction code plus free-form JSON
/// arguments whose shape depends on the instruction.
#[derive(Serialize, Deserialize)]
struct Event {
    instruction: EventKind,
    args: serde_json::Value,
}

/// Arguments carried by an [`EventKind::SessionEnd`] event.
#[derive(Debug, Deserialize)]
pub(crate) struct EventSessionEnd {
    // ID of the session the outpost should terminate.
    session_id: String,
}
/// Build the outpost websocket URL from the API base URL.
///
/// Maps `http(s)` to `ws(s)`, appends `ws/outpost/<pk>/` to the existing
/// path, and adds `instance_uuid` / `attempt` query parameters.
///
/// NOTE(review): assumes `url.path()` ends with a trailing slash; otherwise
/// the joined path is missing a separator — confirm against the URL returned
/// by `ServerConfig::new()`.
fn build_ws_url(mut url: Url, outpost_pk: &str, instance_uuid: &str, attempt: u32) -> Result<Url> {
    let ws_scheme = match url.scheme() {
        "https" => "wss",
        "http" => "ws",
        other => return Err(eyre!("Unsupported scheme for WebSocket URL: {other}")),
    };
    url.set_scheme(ws_scheme)
        .map_err(|()| eyre!("Failed to set URL scheme to {ws_scheme}"))?;
    url.set_path(&format!("{}ws/outpost/{outpost_pk}/", url.path()));
    url.query_pairs_mut()
        .append_pair("instance_uuid", instance_uuid)
        .append_pair("attempt", &attempt.to_string());
    Ok(url)
}
/// Assembles the JSON payload for a `Hello` (heartbeat) message: build
/// identity plus this instance's UUID and hostname.
fn hello_args(instance_uuid: &str) -> serde_json::Value {
    // Hostname lookup failure degrades to an empty string rather than
    // failing the heartbeat.
    let host_os = gethostname().unwrap_or_default();
    serde_json::json!({
        "version": VERSION,
        "buildHash": authentik_build_hash(None),
        "uuid": instance_uuid,
        // TODO: rust version and AWS-LC versions
        "hostname": host_os.to_string_lossy(),
    })
}
#[instrument(skip_all)]
/// Dispatches a single websocket event.
///
/// `Ack`/`Hello` are no-ops; `TriggerUpdate` refreshes the controller's
/// outpost config and then the outpost's providers (in that order);
/// `SessionEnd` forwards the parsed session to the outpost.
///
/// # Errors
/// Propagates refresh/session failures and `SessionEnd` argument parse
/// errors to the caller.
async fn handle_event<O: Outpost>(
    controller: Arc<OutpostController>,
    outpost: Arc<O>,
    event: Event,
) -> Result<()> {
    match event.instruction {
        // Nothing to do: acknowledgements and heartbeats carry no work.
        EventKind::Ack | EventKind::Hello => {}
        EventKind::TriggerUpdate => {
            info!("received update trigger, refreshing outpost");
            // Random per-instance offset so replicas don't all hit the API
            // at the same instant (drawn once in `OutpostController::new`).
            sleep(controller.reload_offset).await;
            controller.refresh().await?;
            debug!("outpost controller has been refreshed");
            outpost.refresh().await?;
            debug!("outpost has been refreshed");
            #[expect(
                clippy::as_conversions,
                clippy::cast_precision_loss,
                reason = "This is fine, we'll never get big values here."
            )]
            controller
                .m_last_update
                .set(UtcDateTime::now().unix_timestamp() as f64);
        }
        EventKind::SessionEnd => {
            let event: EventSessionEnd = serde_json::from_value(event.args)?;
            outpost.end_session(event).await?;
        }
        #[expect(
            clippy::unimplemented,
            reason = "this is only relevant for the RAC provider"
        )]
        EventKind::ProviderSpecific => unimplemented!(),
    }
    Ok(())
}
/// Runs one websocket session: connects, authenticates, then multiplexes
/// periodic refreshes, heartbeats, process signals, and server-sent events
/// until the socket closes or shutdown is requested.
///
/// Returns `Ok(())` when the peer closed the stream (or it ended); any
/// transport/protocol failure surfaces as an error so the caller can
/// reconnect with backoff.
async fn watch_events_inner<O: Outpost>(
    arbiter: Arbiter,
    controller: Arc<OutpostController>,
    outpost: Arc<O>,
    attempt: u32,
) -> Result<()> {
    let server_config = api::ServerConfig::new()?;
    let ws_url = build_ws_url(
        server_config.host,
        &controller.outpost.load().pk.to_string(),
        &controller.instance_uuid.to_string(),
        attempt,
    )?;
    debug!(url = %ws_url, "connecting to websocket");
    let mut request = ws_url.into_client_request()?;
    // Authenticate with the same bearer token the REST client uses.
    let token = controller
        .api_config
        .bearer_access_token
        .as_deref()
        .unwrap_or("");
    request.headers_mut().insert(
        AUTHORIZATION,
        HeaderValue::from_str(&format!("Bearer {token}"))?,
    );
    let (ws_stream, _response) = tokio_tungstenite::connect_async(request).await?;
    let (mut ws_write, mut ws_read) = ws_stream.split();
    info!(
        outpost = %controller.outpost.load().pk,
        "connected to websocket"
    );
    controller.m_connection.set(1_u8);
    // The refresh interval comes from the (hot-swappable) outpost config, so
    // it is re-read through this closure after every refresh.
    let get_refresh_interval = || {
        let mut interval = controller.outpost.load().refresh_interval_s;
        // Ensure timer interval is not negative or 0.
        // If it is, we default to 5 minutes.
        if interval <= 0_i32 {
            interval = 60_i32 * 5_i32;
        }
        // Clamp interval to be at least 30 seconds.
        if interval < 30_i32 {
            interval = 30_i32;
        }
        // infallible because we bound it to be positive above
        Duration::from_secs(interval.try_into().expect("infallible"))
    };
    let mut refresh_interval = interval(get_refresh_interval());
    let mut heartbeat_interval = interval(Duration::from_secs(10));
    let mut events_rx = arbiter.events_subscribe();
    loop {
        tokio::select! {
            // Periodic full refresh, independent of server-sent triggers.
            _ = refresh_interval.tick() => {
                info!("refreshing outpost on interval");
                if let Err(err) = handle_event(
                    Arc::clone(&controller),
                    Arc::clone(&outpost),
                    Event {
                        instruction: EventKind::TriggerUpdate,
                        args: serde_json::Value::Null
                    }
                ).await {
                    warn!(?err, "failed to refresh");
                }
                // Re-create the interval in case the configured refresh
                // period changed with the refresh we just performed.
                refresh_interval = interval(get_refresh_interval());
                // Since we re-create the interval, we need to make it tick instantly to avoid
                // ending up in a never-ending tick-loop.
                refresh_interval.tick().await;
            },
            // Keepalive so the server knows this instance is healthy.
            _ = heartbeat_interval.tick() => {
                let ping = Event {
                    instruction: EventKind::Hello,
                    args: hello_args(&controller.instance_uuid.to_string()),
                };
                ws_write.send(Message::text(serde_json::to_string(&ping)?)).await?;
                trace!("sent websocket hello (heartbeat)");
            },
            // SIGUSR1 forces an immediate refresh, bypassing the interval.
            Ok(arbiter::Event::Signal(signal)) = events_rx.recv() => {
                if signal == SignalKind::user_defined1() {
                    info!("refreshing outpost on signal");
                    if let Err(err) = handle_event(
                        Arc::clone(&controller),
                        Arc::clone(&outpost),
                        Event {
                            instruction: EventKind::TriggerUpdate,
                            args: serde_json::Value::Null
                        }
                    ).await {
                        warn!(?err, "failed to refresh");
                    }
                }
            },
            // Server-sent events; `None` means the stream has ended.
            msg = ws_read.next() => {
                let Some(msg) = msg else {
                    break;
                };
                let msg = msg?;
                match msg {
                    Message::Text(text) => {
                        // Unparseable events are logged and skipped rather
                        // than tearing down the connection.
                        let Ok(event): Result<Event, _> = serde_json::from_str(&text) else {
                            warn!(data = text.as_str(), "failed to parse event");
                            continue;
                        };
                        trace!(event = %event.instruction, "received websocket event");
                        if let Err(err) = handle_event(
                            Arc::clone(&controller),
                            Arc::clone(&outpost),
                            event,
                        ).await {
                            warn!(?err, "failed to handle event");
                        }
                    },
                    Message::Ping(data) => {
                        ws_write.send(Message::Pong(data)).await?;
                    },
                    Message::Close(_) => {
                        break;
                    },
                    _ => {},
                }
            },
            () = arbiter.shutdown() => break,
        }
    }
    Ok(())
}
/// Supervises the outpost websocket: runs one session at a time and
/// reconnects with exponential backoff after failures.
///
/// The delay starts at 1 second, doubles per consecutive failure, and is
/// capped at 5 minutes. A clean disconnect (the session was established and
/// then closed) resets the backoff, so a later incident starts from the
/// short delay again instead of a previously inflated one — without the
/// reset, a few transient failures would permanently pin every future
/// reconnect at the maximum delay.
async fn watch_events<O: Outpost>(
    arbiter: Arbiter,
    controller: Arc<OutpostController>,
    outpost: Arc<O>,
) -> Result<()> {
    const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
    const MAX_BACKOFF: Duration = Duration::from_mins(5);
    let mut backoff = INITIAL_BACKOFF;
    let mut attempt: u32 = 0;
    loop {
        tokio::select! {
            () = arbiter.shutdown() => break,
            res = watch_events_inner(
                arbiter.clone(),
                Arc::clone(&controller),
                Arc::clone(&outpost),
                attempt
            ) => {
                controller.m_connection.set(0_u8);
                let failed = res.is_err();
                match res {
                    Ok(()) => {
                        debug!("websocket disconnected cleanly");
                        // The session connected successfully before ending:
                        // forget any accumulated backoff.
                        backoff = INITIAL_BACKOFF;
                        attempt = 0;
                    }
                    Err(err) => warn!(?err, attempt, "websocket error"),
                }
                info!(attempt, delay = backoff.as_secs(), "reconnecting websocket in {}s...", backoff.as_secs());
                // Still honor shutdown while waiting out the backoff.
                tokio::select! {
                    () = arbiter.shutdown() => break,
                    () = sleep(backoff) => {}
                }
                // Only escalate the delay after a failed session.
                if failed {
                    backoff = (backoff * 2).min(MAX_BACKOFF);
                    attempt += 1;
                }
            }
        }
    }
    info!("stopping event watcher");
    Ok(())
}
/// Spawns the websocket event watcher for the given outpost on the task set.
///
/// # Errors
/// Fails only if the task cannot be spawned.
pub(crate) fn start<O: Outpost + 'static>(
    tasks: &mut Tasks,
    controller: Arc<OutpostController>,
    outpost: Arc<O>,
) -> Result<()> {
    let arbiter = tasks.arbiter();
    tasks
        .build_task()
        .name(&format!("{}::watch_events", module_path!()))
        .spawn(watch_events(arbiter, controller, outpost))?;
    Ok(())
}

123
src/outpost/mod.rs Normal file
View File

@@ -0,0 +1,123 @@
use std::{sync::Arc, time::Duration};
use ak_client::{
apis::{configuration::Configuration, outposts_api::outposts_instances_list},
models::Outpost as OutpostModel,
};
use ak_common::{Tasks, VERSION, api, authentik_build_hash};
use arc_swap::ArcSwap;
use eyre::{Result, eyre};
use tracing::{debug, info, instrument};
use uuid::Uuid;
pub(crate) mod event;
#[cfg(feature = "proxy")]
pub(crate) mod proxy;
/// Behavior a concrete outpost type (e.g. proxy) must implement so the
/// generic controller/event machinery can drive it.
pub(crate) trait Outpost: Send + Sync + Sized {
    /// Type label reported in metrics (e.g. `"proxy"`).
    const OUTPOST_TYPE: &'static str;
    /// CLI arguments specific to this outpost type.
    type Cli: Send + Sync;
    /// Builds the outpost from an already-initialized controller.
    async fn new(controller: Arc<OutpostController>) -> Result<Self>;
    /// Spawns the outpost's own long-running tasks.
    fn start(&self, tasks: &mut Tasks) -> Result<()>;
    /// Re-fetches provider configuration after an update trigger.
    fn refresh(&self) -> impl Future<Output = Result<()>> + Send;
    /// Terminates the session identified by the websocket event.
    fn end_session(&self, event: event::EventSessionEnd)
        -> impl Future<Output = Result<()>> + Send;
}
/// Shared state and plumbing common to all outpost types.
#[derive(Debug)]
pub(crate) struct OutpostController {
    // REST client configuration (base URL + bearer token).
    api_config: Configuration,
    // Current outpost config; swapped atomically on refresh.
    outpost: ArcSwap<OutpostModel>,
    // Random per-process identity, reported to the server and in metrics.
    instance_uuid: Uuid,
    // Random 0-10s delay applied before reloads to spread replicas out.
    reload_offset: Duration,
    // Gauges: static info marker, last-update timestamp, ws connection state.
    m_info: metrics::Gauge,
    m_last_update: metrics::Gauge,
    m_connection: metrics::Gauge,
}
impl OutpostController {
    /// Fetches this token's outpost from the API.
    ///
    /// The token is expected to be scoped to one outpost, so the first
    /// result is taken; an empty result set means the token doesn't belong
    /// to any outpost.
    #[instrument(skip_all)]
    async fn get_outpost(api_config: &Configuration) -> Result<OutpostModel> {
        let outposts = outposts_instances_list(
            api_config, None, None, None, None, None, None, None, None, None, None, None, None,
        )
        .await?;
        let Some(outpost) = outposts.results.into_iter().next() else {
            return Err(eyre!(
                "No outposts found with given token, ensure the given token corresponds to an \
                authentik Outpost"
            ));
        };
        debug!(name = outpost.name, "fetched outpost configuration");
        Ok(outpost)
    }
    /// Initializes the controller: fetches the outpost, registers metrics
    /// labelled with this instance's identity, and draws the random reload
    /// offset used to de-synchronize refreshes across replicas.
    #[instrument(skip_all)]
    async fn new<O: Outpost>() -> Result<Self> {
        let api_config = api::make_config()?;
        let outpost = Self::get_outpost(&api_config).await?;
        // Fresh identity per process start.
        let instance_uuid = Uuid::new_v4();
        let m_labels = [
            ("outpost_name", outpost.name.clone()),
            ("outpost_type", O::OUTPOST_TYPE.to_owned()),
            ("uuid", instance_uuid.to_string()),
            ("version", VERSION.to_owned()),
            ("build", authentik_build_hash(None)),
        ];
        metrics::describe_gauge!("authentik_outpost_info", "Outpost info");
        let m_info = metrics::gauge!("authentik_outpost_info", &m_labels);
        metrics::describe_gauge!("authentik_outpost_last_update", "Time of last update");
        let m_last_update = metrics::gauge!("authentik_outpost_last_update", &m_labels);
        metrics::describe_gauge!("authentik_outpost_connection", "Connection status");
        let m_connection = metrics::gauge!("authentik_outpost_connection", &m_labels);
        let reload_offset = Duration::from_secs(rand::random_range(0..10));
        let controller = Self {
            api_config,
            outpost: ArcSwap::from_pointee(outpost),
            instance_uuid,
            reload_offset,
            m_info,
            m_last_update,
            m_connection,
        };
        info!(embedded = controller.is_embedded(), "outpost mode");
        debug!(?reload_offset, "HA Reload offset");
        Ok(controller)
    }
    /// Whether this is the embedded outpost managed by authentik itself,
    /// identified by its `managed` marker string.
    fn is_embedded(&self) -> bool {
        self.outpost
            .load()
            .managed
            .as_ref()
            .and_then(|m| m.as_deref())
            .is_some_and(|m| m == "goauthentik.io/outposts/embedded")
    }
    /// Re-fetches the outpost configuration and swaps it in atomically.
    async fn refresh(&self) -> Result<()> {
        let outpost = Self::get_outpost(&self.api_config).await?;
        self.outpost.swap(Arc::new(outpost));
        Ok(())
    }
}
#[instrument(skip_all)]
/// Entry point for running an outpost of type `O`: builds the controller,
/// constructs the outpost, wires the websocket event watcher, starts the
/// outpost's own tasks, then marks the info gauge as up.
pub(crate) async fn start<O: Outpost + 'static>(_cli: O::Cli, tasks: &mut Tasks) -> Result<()> {
    let controller = Arc::new(OutpostController::new::<O>().await?);
    let outpost = Arc::new(O::new(Arc::clone(&controller)).await?);
    event::start(tasks, Arc::clone(&controller), Arc::clone(&outpost))?;
    outpost.start(tasks)?;
    controller.m_info.set(1_u8);
    Ok(())
}

View File

@@ -0,0 +1,38 @@
use url::Url;
use ak_client::models::ProxyOutpostConfig;
use eyre::{Result, eyre};
use tracing::instrument;
use crate::outpost::proxy::ProxyOutpost;
const REDIRECT_PARAM: &str = "rd";
const CALLBACK_SIGNATURE: &str = "X-authentik-auth-callback";
const LOGOUT_SIGNATURE: &str = "X-authentik-logout";
/// One proxied application derived from a proxy provider.
pub(super) struct Application {
    // External hostname used as the routing key for incoming requests.
    pub(super) host: String,
}
impl Application {
    /// Builds the application entry for one proxy provider.
    ///
    /// Currently only extracts the external hostname used as the routing
    /// key; the callback redirect URL is computed but not yet consumed
    /// (work in progress).
    ///
    /// # Errors
    /// Fails when the provider's external host is not a valid URL or has
    /// no host component.
    #[instrument(skip_all)]
    pub(super) fn new(
        _existing_apps: &ProxyOutpost,
        provider: &ProxyOutpostConfig,
    ) -> Result<Self> {
        let external_url = Url::parse(&provider.external_host)?;
        let external_host = external_url
            .host_str()
            .ok_or_else(|| eyre!("no host in external host"))?;
        // NOTE(review): `Url::join` with a relative path replaces the last
        // path segment when the base path doesn't end in `/` — confirm
        // `external_host` is always a bare origin before relying on this.
        let _redirect_url = {
            let mut redirect_url = external_url.join("outpost.goauthentik.io/callback")?;
            redirect_url.set_query(Some(&format!("{CALLBACK_SIGNATURE}=true")));
            redirect_url
        };
        Ok(Self {
            host: external_host.to_owned(),
        })
    }
}

151
src/outpost/proxy/mod.rs Normal file
View File

@@ -0,0 +1,151 @@
use ak_axum::extract::host::Host;
use axum::extract::State;
use axum::http::Method;
use axum::routing::any;
use metrics::{Histogram, histogram};
use std::{collections::HashMap, sync::Arc};
use tokio::time::Instant;
use ak_axum::router::wrap_router;
use ak_client::apis::outposts_api::outposts_proxy_list;
use ak_common::{Tasks, api::fetch_all};
use arc_swap::ArcSwap;
use argh::FromArgs;
use axum::Router;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use eyre::Result;
use tracing::{debug, error, info, instrument, warn};
use crate::outpost::proxy::application::Application;
use crate::outpost::{Outpost, OutpostController};
mod application;
#[derive(Debug, Default, FromArgs, PartialEq, Eq)]
/// Run the authentik proxy outpost.
#[argh(subcommand, name = "proxy")]
#[expect(
    clippy::empty_structs_with_brackets,
    reason = "argh doesn't support unit structs"
)]
// No proxy-specific flags yet; configuration comes from the authentik API.
pub(crate) struct Cli {}
/// Proxy outpost state: the shared controller plus the current set of
/// applications, keyed by external hostname and swapped wholesale on refresh.
pub(crate) struct ProxyOutpost {
    controller: Arc<OutpostController>,
    // host -> application; rebuilt atomically by `refresh`.
    applications: ArcSwap<HashMap<String, Application>>,
}
impl Outpost for ProxyOutpost {
    type Cli = Cli;
    const OUTPOST_TYPE: &'static str = "proxy";
    /// Starts with an empty application map; `refresh` populates it.
    #[instrument(skip_all)]
    async fn new(controller: Arc<OutpostController>) -> Result<Self> {
        Ok(Self {
            controller,
            applications: ArcSwap::from_pointee(HashMap::with_capacity(0)),
        })
    }
    /// No standalone tasks yet.
    fn start(&self, _tasks: &mut Tasks) -> Result<()> {
        Ok(())
    }
    /// Re-fetches all proxy providers assigned to this outpost and rebuilds
    /// the application map. Providers whose application setup fails are
    /// skipped with a warning rather than aborting the whole refresh.
    #[instrument(skip_all)]
    async fn refresh(&self) -> Result<()> {
        debug!(
            outpost_pk = %self.controller.outpost.load().pk,
            "requesting providers for outpost"
        );
        // Page through all providers, 100 per page.
        let providers = fetch_all(
            |page| {
                outposts_proxy_list(
                    &self.controller.api_config,
                    None,
                    None,
                    Some(page),
                    Some(100_i32),
                    None,
                )
            },
            |r| &r.pagination,
            |r| r.results,
        )
        .await
        .inspect_err(|err| error!(?err, "failed to fetch providers"))?;
        debug!(count = providers.len(), "fetched providers");
        // The embedded outpost legitimately serves zero providers; for any
        // other outpost this is almost certainly a misconfiguration.
        if providers.is_empty() && !self.controller.is_embedded() {
            warn!(
                "no providers assigned to this outpost, check outpost configuration in authentik"
            );
        }
        for (i, provider) in providers.iter().enumerate() {
            debug!(
                index = i,
                name = provider.name,
                external_host = provider.external_host,
                assigned_to_app = provider.assigned_application_name,
                "provider details"
            );
        }
        let mut apps = HashMap::with_capacity(providers.len());
        for provider in providers {
            let Ok(application) = Application::new(self, &provider)
                .inspect_err(|err| warn!(?err, "failed to setup application, skipping provider"))
            else {
                continue;
            };
            info!(
                name = provider.name,
                host = application.host,
                "loaded application"
            );
            apps.insert(application.host.clone(), application);
        }
        self.applications.store(Arc::new(apps));
        Ok(())
    }
    /// Session termination is not implemented yet; the event is only logged.
    async fn end_session(&self, _event: super::event::EventSessionEnd) -> Result<()> {
        // todo!()
        warn!(?_event, "removing session");
        Ok(())
    }
}
/// Responds to the outpost-internal ping endpoint with 204 and records the
/// handler duration in the proxy request histogram.
async fn handle_ping(
    method: Method,
    Host(host): Host,
    State(outpost): State<Arc<ProxyOutpost>>,
) -> impl IntoResponse {
    let started = Instant::now();
    let request_histogram = histogram!(
        "authentik_outpost_proxy_request_duration_seconds",
        "outpost_name" => outpost.controller.outpost.load().name.clone(),
        "method" => method.to_string(),
        "host" => host,
        "type" => "ping",
    );
    request_histogram.record(started.elapsed().as_secs_f64());
    StatusCode::NO_CONTENT
}
/// Builds the proxy outpost's router.
fn build_router(outpost: Arc<ProxyOutpost>) -> Router {
    // TODO: static files
    wrap_router(
        Router::new()
            // axum route paths must start with `/`; `Router::route` panics
            // otherwise, so the previous bare `outpost.goauthentik.io/ping`
            // path would abort at startup.
            .route("/outpost.goauthentik.io/ping", any(handle_ping))
            .with_state(outpost),
        true,
    )
}

5
src/server/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
use std::{env::temp_dir, path::PathBuf};
/// Path of the unix socket the server listens on, located inside the
/// system temporary directory (e.g. `/tmp/authentik.sock`).
pub(crate) fn socket_path() -> PathBuf {
    let mut path = temp_dir();
    path.push("authentik.sock");
    path
}

42
src/worker/healthcheck.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::sync::Arc;
use ak_axum::{error::Result, router::wrap_router};
use ak_common::db;
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::any};
use super::Workers;
/// Readiness probe: 200 only when every worker process is alive, the
/// database answers a trivial query, and every worker reports ready.
async fn health_ready(State(workers): State<Arc<Workers>>) -> Result<StatusCode> {
    // `||` short-circuits: the DB is only queried when all workers are alive.
    if !workers.are_alive().await || sqlx::query("SELECT 1").execute(db::get()).await.is_err() {
        Ok(StatusCode::SERVICE_UNAVAILABLE)
    } else if workers.health_ready().await? {
        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::SERVICE_UNAVAILABLE)
    }
}
/// Liveness probe: 200 only when every worker process is alive, the
/// database answers a trivial query, and every worker reports live.
async fn health_live(State(workers): State<Arc<Workers>>) -> Result<StatusCode> {
    // `||` short-circuits: the DB is only queried when all workers are alive.
    if !workers.are_alive().await || sqlx::query("SELECT 1").execute(db::get()).await.is_err() {
        Ok(StatusCode::SERVICE_UNAVAILABLE)
    } else if workers.health_live().await? {
        Ok(StatusCode::OK)
    } else {
        Ok(StatusCode::SERVICE_UNAVAILABLE)
    }
}
/// Catch-all for any other path; always answers 200.
async fn fallback() -> impl IntoResponse {
    StatusCode::OK
}
/// Builds the worker healthcheck router exposing readiness/liveness probes.
pub(super) fn build_router(workers: Arc<Workers>) -> Router {
    wrap_router(
        Router::new()
            // Fixed "heath" typo: the canonical probe paths are
            // `/-/health/ready/` and `/-/health/live/` — the worker's own
            // internal checks query `/-/health/...` as well.
            .route("/-/health/ready/", any(health_ready))
            .route("/-/health/live/", any(health_live))
            .fallback(fallback)
            .with_state(workers),
        true,
    )
}

338
src/worker/mod.rs Normal file
View File

@@ -0,0 +1,338 @@
use std::{
env::temp_dir,
os::unix,
path::PathBuf,
process::Stdio,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use ak_common::{
Event,
arbiter::{Arbiter, Tasks},
config,
mode::Mode,
};
use argh::FromArgs;
use axum::{
body::Body,
http::{Request, header::HOST},
};
use eyre::{Result, eyre};
use hyper_unix_socket::UnixSocketConnector;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use nix::{
sys::signal::{Signal, kill},
unistd::Pid,
};
use tokio::{
net::UnixStream,
process::{Child, Command},
signal::unix::SignalKind,
sync::Mutex,
time::{Duration, interval},
};
use tracing::{info, trace, warn};
use crate::server::socket_path;
mod healthcheck;
mod worker_status;
#[derive(Debug, Default, FromArgs, PartialEq, Eq)]
/// Run the authentik worker.
#[argh(subcommand, name = "worker")]
#[expect(
    clippy::empty_structs_with_brackets,
    reason = "argh doesn't support unit structs"
)]
// No worker-specific flags yet; process count etc. come from the config.
pub(crate) struct Cli {}
// Worker IDs handed to `lifecycle.worker_process`; the first worker gets
// 1000 and subsequent ones increment from there.
const INITIAL_WORKER_ID: usize = 1000;
// Set once the initial worker signals readiness (via signal or an
// accepting unix socket); gates the launch of the remaining workers.
static INITIAL_WORKER_READY: AtomicBool = AtomicBool::new(false);
/// One managed Python worker process plus the HTTP client used to talk to
/// it over its unix socket.
pub(crate) struct Worker {
    // Child process handle (`kill_on_drop`, so dropping also kills it).
    worker: Child,
    // hyper client connecting through the worker's unix socket.
    client: Client<UnixSocketConnector<PathBuf>, Body>,
    // Socket the worker serves on; removed when this struct is dropped.
    socket_path: PathBuf,
}
impl Worker {
fn new(worker_id: usize, socket_path: PathBuf) -> Result<Self> {
info!(worker_id, "starting worker");
let mut cmd = Command::new("python");
cmd.arg("-m");
cmd.arg("lifecycle.worker_process");
cmd.arg(worker_id.to_string());
cmd.arg(&socket_path);
let client = Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_mins(1))
.set_host(false)
.build(UnixSocketConnector::new(socket_path.clone()));
Ok(Self {
worker: cmd
.kill_on_drop(true)
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?,
client,
socket_path,
})
}
async fn shutdown(&mut self, signal: Signal) -> Result<()> {
trace!(
signal = signal.as_str(),
"sending shutdown signal to worker"
);
if let Some(id) = self.worker.id() {
kill(Pid::from_raw(id.cast_signed()), signal)?;
}
self.worker.wait().await?;
Ok(())
}
async fn graceful_shutdown(&mut self) -> Result<()> {
info!("gracefully shutting down worker");
self.shutdown(Signal::SIGTERM).await
}
async fn fast_shutdown(&mut self) -> Result<()> {
info!("immediately shutting down worker");
self.shutdown(Signal::SIGINT).await
}
fn is_alive(&mut self) -> bool {
let try_wait = self.worker.try_wait();
match try_wait {
Ok(Some(code)) => {
warn!(?code, "worker has exited");
false
}
Ok(None) => true,
Err(err) => {
warn!(
?err,
"failed to check the status of worker process, ignoring"
);
true
}
}
}
async fn is_socket_ready(&self) -> bool {
let result = UnixStream::connect(&self.socket_path).await;
trace!(?result, "checking if worker socket is ready");
result.is_ok()
}
async fn health_live(&self) -> Result<bool> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/health/live/")
.header(HOST, "localhost")
.body(Body::from(""))?;
Ok(self.client.request(req).await?.status().is_success())
}
async fn health_ready(&self) -> Result<bool> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/health/ready/")
.header(HOST, "localhost")
.body(Body::from(""))?;
Ok(self.client.request(req).await?.status().is_success())
}
async fn notify_metrics(&self) -> Result<()> {
let req = Request::builder()
.method("GET")
.uri("http://localhost:8000/-/metrics/")
.header(HOST, "localhost")
.body(Body::from(""))?;
self.client.request(req).await?;
Ok(())
}
}
impl Drop for Worker {
    /// Best-effort removal of the worker's unix socket file; failure to
    /// remove it (e.g. never created) is only traced.
    fn drop(&mut self) {
        if let Err(err) = std::fs::remove_file(&self.socket_path) {
            trace!(?err, "failed to remove socket, ignoring");
        }
    }
}
/// All managed worker processes, guarded by a single async mutex.
pub(crate) struct Workers(Mutex<Vec<Worker>>);
impl Workers {
    /// Creates the pool with only the initial worker started; the rest are
    /// launched by `start_other_workers` once the first one reports ready.
    fn new() -> Result<Self> {
        let mut workers = Vec::with_capacity(config::get().worker.processes.get());
        workers.push(Worker::new(
            INITIAL_WORKER_ID,
            temp_dir().join(format!("authentik-worker-{INITIAL_WORKER_ID}.sock")),
        )?);
        Ok(Self(Mutex::new(workers)))
    }
    /// Starts workers until the configured process count is reached.
    /// Idempotent: already-started workers are kept.
    async fn start_other_workers(&self) -> Result<()> {
        let mut workers = self.0.lock().await;
        while workers.len() != config::get().worker.processes.get() {
            let worker_id = INITIAL_WORKER_ID + workers.len();
            workers.push(Worker::new(
                worker_id,
                temp_dir().join(format!("authentik-worker-{worker_id}.sock")),
            )?);
        }
        Ok(())
    }
    /// Gracefully shuts down every worker sequentially, returning the first
    /// error but still attempting all workers.
    async fn graceful_shutdown(&self) -> Result<()> {
        // Lock once for the whole pass: the previous version locked twice
        // (first to size a buffer from `capacity()`), leaving a window for
        // the worker set to change between the two locks.
        let mut workers = self.0.lock().await;
        let mut first_err = Ok(());
        for worker in workers.iter_mut() {
            let res = worker.graceful_shutdown().await;
            if first_err.is_ok() {
                first_err = res;
            }
        }
        first_err
    }
    /// Immediately shuts down every worker sequentially, returning the
    /// first error but still attempting all workers.
    async fn fast_shutdown(&self) -> Result<()> {
        let mut workers = self.0.lock().await;
        let mut first_err = Ok(());
        for worker in workers.iter_mut() {
            let res = worker.fast_shutdown().await;
            if first_err.is_ok() {
                first_err = res;
            }
        }
        first_err
    }
    /// True only while every worker process is still running.
    async fn are_alive(&self) -> bool {
        for worker in self.0.lock().await.iter_mut() {
            if !worker.is_alive() {
                return false;
            }
        }
        true
    }
    /// True once the initial worker's unix socket accepts connections.
    async fn is_socket_ready(&self) -> bool {
        if let Some(initial_worker) = self.0.lock().await.first() {
            return initial_worker.is_socket_ready().await;
        }
        false
    }
    /// True only when every worker reports live.
    async fn health_live(&self) -> Result<bool> {
        for worker in self.0.lock().await.iter() {
            if !worker.health_live().await? {
                return Ok(false);
            }
        }
        Ok(true)
    }
    /// True only when every worker reports ready.
    async fn health_ready(&self) -> Result<bool> {
        for worker in self.0.lock().await.iter() {
            if !worker.health_ready().await? {
                return Ok(false);
            }
        }
        Ok(true)
    }
    /// Pokes the initial worker's metrics endpoint, if any worker exists.
    pub(crate) async fn notify_metrics(&self) -> Result<()> {
        if let Some(worker) = self.0.lock().await.first() {
            worker.notify_metrics().await?;
        }
        Ok(())
    }
}
/// Supervises the worker pool: waits for the initial worker to become
/// ready (signal or socket probe), then starts the remaining workers,
/// monitors liveness, and performs shutdowns on request.
///
/// # Errors
/// Returns an error when any worker exits unexpectedly or a worker cannot
/// be started, which propagates to the task arbiter.
async fn watch_workers(arbiter: Arbiter, workers: Arc<Workers>) -> Result<()> {
    info!("starting worker watcher");
    let mut events_rx = arbiter.events_subscribe();
    let mut check_interval = interval(Duration::from_secs(5));
    let mut start_interval = interval(Duration::from_secs(1));
    loop {
        tokio::select! {
            // Primary readiness path: the worker signals us with SIGUSR2.
            Ok(Event::Signal(signal)) = events_rx.recv() => {
                if signal == SignalKind::user_defined2() && !INITIAL_WORKER_READY.load(Ordering::Relaxed) {
                    info!("worker notified us ready, marked ready for operation");
                    INITIAL_WORKER_READY.store(true, Ordering::Relaxed);
                    workers.start_other_workers().await?;
                }
            },
            _ = start_interval.tick(), if !INITIAL_WORKER_READY.load(Ordering::Relaxed) => {
                // On some platforms the readiness signal (SIGUSR2) can be missed.
                // Fall back to probing the worker unix socket and mark ready once it accepts connections.
                if workers.is_socket_ready().await {
                    info!("worker socket is accepting connections, marked ready for operation");
                    INITIAL_WORKER_READY.store(true, Ordering::Relaxed);
                    workers.start_other_workers().await?;
                }
            },
            // Periodic liveness sweep over all worker processes.
            _ = check_interval.tick() => {
                if !workers.are_alive().await {
                    return Err(eyre!("one or more workers have exited unexpectedly"));
                }
            },
            () = arbiter.fast_shutdown() => {
                workers.fast_shutdown().await?;
                return Ok(());
            },
            () = arbiter.graceful_shutdown() => {
                workers.graceful_shutdown().await?;
                return Ok(());
            },
        }
    }
}
/// Boots the worker subsystem: spawns the pool watcher and the worker
/// status heartbeat, and — in standalone worker mode — the healthcheck
/// HTTP/unix servers.
///
/// Returns the shared [`Workers`] handle so other components (e.g. metrics
/// collection) can reach the pool.
pub(crate) fn start(_cli: Cli, tasks: &mut Tasks) -> Result<Arc<Workers>> {
    let arbiter = tasks.arbiter();
    let workers = Arc::new(Workers::new()?);
    tasks
        .build_task()
        .name(&format!("{}::watch_workers", module_path!()))
        .spawn(watch_workers(arbiter.clone(), Arc::clone(&workers)))?;
    tasks
        .build_task()
        .name(&format!("{}::worker_status::run", module_path!()))
        .spawn(worker_status::run(arbiter))?;
    // Only run HTTP server in worker mode, in allinone mode, they're handled by the server.
    if Mode::get() == Mode::Worker {
        let router = healthcheck::build_router(Arc::clone(&workers));
        for addr in config::get().listen.http.iter().copied() {
            ak_axum::server::start_plain(
                tasks,
                "worker",
                router.clone(),
                addr,
                true, /* Allow failure in case the server is running on the same machine, like
                       * in dev. */
            )?;
        }
        ak_axum::server::start_unix(
            tasks,
            "worker",
            router,
            unix::net::SocketAddr::from_pathname(socket_path())?,
            true, // Allow failure in case the server is running on the same machine, like in dev.
        )?;
    }
    Ok(workers)
}

View File

@@ -0,0 +1,48 @@
use ak_common::{arbiter::Arbiter, authentik_full_version, db};
use eyre::Result;
use nix::unistd::gethostname;
use tokio::time::{Duration, interval, sleep};
use tracing::warn;
use uuid::Uuid;
/// Upserts this worker's status row every 30 seconds until shutdown.
///
/// The first `interval` tick completes immediately, so the row is written
/// as soon as the task starts.
///
/// # Errors
/// Returns the first database error; the caller retries with a delay.
async fn keep(arbiter: Arbiter, id: Uuid, hostname: &str, version: &str) -> Result<()> {
    // Insert on first run; afterwards only bump `last_seen` on conflict.
    let query = "
INSERT INTO authentik_tasks_workerstatus (id, hostname, version, last_seen)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (id) DO UPDATE SET last_seen = NOW()
";
    let mut keep_interval = interval(Duration::from_secs(30));
    loop {
        tokio::select! {
            _ = keep_interval.tick() => {
                sqlx::query(query)
                    .bind(id)
                    .bind(hostname)
                    .bind(version)
                    .execute(db::get())
                    .await?;
            },
            () = arbiter.shutdown() => return Ok(()),
        }
    }
}
/// Maintains this process's row in the worker status table for the
/// lifetime of the process, retrying after database failures.
pub(super) async fn run(arbiter: Arbiter) -> Result<()> {
    // Fresh identity per process start; rows from previous runs simply age out.
    let id = Uuid::new_v4();
    let raw_hostname = gethostname()?;
    let hostname = raw_hostname.to_string_lossy();
    let version = authentik_full_version();
    loop {
        if let Err(err) = keep(arbiter.clone(), id, hostname.as_ref(), &version).await {
            warn!(?err, "failed to update worker status in database");
        }
        // `keep` returned: either it failed (wait 10s, then retry) or
        // shutdown was requested (the select below exits immediately).
        tokio::select! {
            () = sleep(Duration::from_secs(10)) => {},
            () = arbiter.shutdown() => return Ok(()),
        }
    }
}

54
uv.lock generated
View File

@@ -1116,7 +1116,7 @@ dependencies = [
{ name = "django" },
{ name = "django-pglock" },
{ name = "django-pgtrigger" },
{ name = "dramatiq", extra = ["watch"] },
{ name = "dramatiq" },
{ name = "structlog" },
{ name = "tenacity" },
]
@@ -1127,7 +1127,7 @@ requires-dist = [
{ name = "django", specifier = ">=4.2,<6.0" },
{ name = "django-pglock", specifier = ">=1.7,<2" },
{ name = "django-pgtrigger", specifier = ">=4,<5" },
{ name = "dramatiq", extras = ["watch"], specifier = ">=1.17,<1.18" },
{ name = "dramatiq", specifier = ">=1.17,<1.18" },
{ name = "structlog", specifier = ">=25,<26" },
{ name = "tenacity", specifier = ">=9,<10" },
]
@@ -1375,12 +1375,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/36/925c7afd5db4f1a3f00676b9c3c58f31ff7ae29a347282d86c8d429280a5/dramatiq-1.17.1-py3-none-any.whl", hash = "sha256:951cdc334478dff8e5150bb02a6f7a947d215ee24b5aedaf738eff20e17913df", size = 120382, upload-time = "2024-10-26T05:09:26.436Z" },
]
[package.optional-dependencies]
watch = [
{ name = "watchdog" },
{ name = "watchdog-gevent" },
]
[[package]]
name = "drf-jsonschema-serializer"
version = "3.0.0"
@@ -1572,28 +1566,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/15/cf2a69ade4b194aa524ac75112d5caac37414b20a3a03e6865dfe0bd1539/geopy-2.4.1-py3-none-any.whl", hash = "sha256:ae8b4bc5c1131820f4d75fce9d4aaaca0c85189b3aa5d64c3dcaf5e3b7b882a7", size = 125437, upload-time = "2023-11-23T21:49:30.421Z" },
]
[[package]]
name = "gevent"
version = "25.9.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cffi", marker = "platform_python_implementation == 'CPython' and sys_platform == 'win32'" },
{ name = "greenlet", marker = "platform_python_implementation == 'CPython'" },
{ name = "zope-event" },
{ name = "zope-interface" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9e/48/b3ef2673ffb940f980966694e40d6d32560f3ffa284ecaeb5ea3a90a6d3f/gevent-25.9.1.tar.gz", hash = "sha256:adf9cd552de44a4e6754c51ff2e78d9193b7fa6eab123db9578a210e657235dd", size = 5059025, upload-time = "2025-09-17T16:15:34.528Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/15/1a/948f8167b2cdce573cf01cec07afc64d0456dc134b07900b26ac7018b37e/gevent-25.9.1-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:1a3fe4ea1c312dbf6b375b416925036fe79a40054e6bf6248ee46526ea628be1", size = 2982934, upload-time = "2025-09-17T14:54:11.302Z" },
{ url = "https://files.pythonhosted.org/packages/9b/ec/726b146d1d3aad82e03d2e1e1507048ab6072f906e83f97f40667866e582/gevent-25.9.1-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:0adb937f13e5fb90cca2edf66d8d7e99d62a299687400ce2edee3f3504009356", size = 1813982, upload-time = "2025-09-17T15:41:28.506Z" },
{ url = "https://files.pythonhosted.org/packages/35/5d/5f83f17162301662bd1ce702f8a736a8a8cac7b7a35e1d8b9866938d1f9d/gevent-25.9.1-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:427f869a2050a4202d93cf7fd6ab5cffb06d3e9113c10c967b6e2a0d45237cb8", size = 1894902, upload-time = "2025-09-17T15:49:03.702Z" },
{ url = "https://files.pythonhosted.org/packages/83/cd/cf5e74e353f60dab357829069ffc300a7bb414c761f52cf8c0c6e9728b8d/gevent-25.9.1-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c049880175e8c93124188f9d926af0a62826a3b81aa6d3074928345f8238279e", size = 1861792, upload-time = "2025-09-17T15:49:23.279Z" },
{ url = "https://files.pythonhosted.org/packages/dd/65/b9a4526d4a4edce26fe4b3b993914ec9dc64baabad625a3101e51adb17f3/gevent-25.9.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:b5a67a0974ad9f24721034d1e008856111e0535f1541499f72a733a73d658d1c", size = 2113215, upload-time = "2025-09-17T15:15:16.34Z" },
{ url = "https://files.pythonhosted.org/packages/e5/be/7d35731dfaf8370795b606e515d964a0967e129db76ea7873f552045dd39/gevent-25.9.1-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:1d0f5d8d73f97e24ea8d24d8be0f51e0cf7c54b8021c1fddb580bf239474690f", size = 1833449, upload-time = "2025-09-17T15:52:43.75Z" },
{ url = "https://files.pythonhosted.org/packages/65/58/7bc52544ea5e63af88c4a26c90776feb42551b7555a1c89c20069c168a3f/gevent-25.9.1-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:ddd3ff26e5c4240d3fbf5516c2d9d5f2a998ef87cfb73e1429cfaeaaec860fa6", size = 2176034, upload-time = "2025-09-17T15:24:15.676Z" },
{ url = "https://files.pythonhosted.org/packages/c2/69/a7c4ba2ffbc7c7dbf6d8b4f5d0f0a421f7815d229f4909854266c445a3d4/gevent-25.9.1-cp314-cp314-win_amd64.whl", hash = "sha256:bb63c0d6cb9950cc94036a4995b9cc4667b8915366613449236970f4394f94d7", size = 1703019, upload-time = "2025-09-17T19:30:55.272Z" },
]
[[package]]
name = "google-api-core"
version = "2.29.0"
@@ -3888,19 +3860,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/33/e8/e40370e6d74ddba47f002a32919d91310d6074130fe4e17dabcafc15cbf1/watchdog-6.0.0-py3-none-win_ia64.whl", hash = "sha256:a1914259fa9e1454315171103c6a30961236f508b9b623eae470268bbcc6a22f", size = 79067, upload-time = "2024-11-01T14:07:11.845Z" },
]
[[package]]
name = "watchdog-gevent"
version = "0.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gevent" },
{ name = "watchdog" },
]
sdist = { url = "https://files.pythonhosted.org/packages/97/69/91cfca7c21c382e3a8aca4251dcd7d4315228d9346381feb2dde36d14061/watchdog_gevent-0.2.1.tar.gz", hash = "sha256:ae6b94d0f8c8ce1c5956cd865f612b61f456cf19801744bba25a349fe8e8c337", size = 4296, upload-time = "2024-10-19T05:29:12.987Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/00/a9/54b88e150b77791958957e2188312477d09fc84820fc03f8b3a7569d10b0/watchdog_gevent-0.2.1-py3-none-any.whl", hash = "sha256:e8114658104a018f626ee54052335407c1438369febc776c4b4c4308ed002350", size = 3462, upload-time = "2024-10-19T05:29:11.421Z" },
]
[[package]]
name = "watchfiles"
version = "1.1.1"
@@ -4082,15 +4041,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" },
]
[[package]]
name = "zope-event"
version = "6.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/46/33/d3eeac228fc14de76615612ee208be2d8a5b5b0fada36bf9b62d6b40600c/zope_event-6.1.tar.gz", hash = "sha256:6052a3e0cb8565d3d4ef1a3a7809336ac519bc4fe38398cb8d466db09adef4f0", size = 18739, upload-time = "2025-11-07T08:05:49.934Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c2/b0/956902e5e1302f8c5d124e219c6bf214e2649f92ad5fce85b05c039a04c9/zope_event-6.1-py3-none-any.whl", hash = "sha256:0ca78b6391b694272b23ec1335c0294cc471065ed10f7f606858fc54566c25a0", size = 6414, upload-time = "2025-11-07T08:05:48.874Z" },
]
[[package]]
name = "zope-interface"
version = "8.2"

View File

@@ -92,6 +92,15 @@ For other distributions (Red Hat, SUSE, Arch), adjust the package names as neede
Install `golangci-lint` by following the [official installation instructions](https://golangci-lint.run/welcome/install/#other-ci).
:::warning
[aws-lc-rs](https://github.com/aws/aws-lc-rs) currently has an [issue](https://github.com/aws/aws-lc-rs/issues/569) building its FIPS module with GCC >= 14. If you encounter this issue, you have two options:
- Use an older version of GCC.
- Install [Clang](https://clang.llvm.org) and `export AWS_LC_FIPS_SYS_CC=clang`.
:::
</TabItem>
<TabItem value="Windows">
@@ -154,7 +163,13 @@ Both processes need to run to get a fully functioning authentik development envi
### Hot-reloading
When `AUTHENTIK_DEBUG` is set to `true` (the default for the development environment), the authentik server automatically reloads whenever changes are made to the code. However, due to instabilities in the reloading process of the worker, that behavior is turned off for the worker. You can enable code reloading in the worker by manually running `uv run ak worker --watch`.
When `AUTHENTIK_DEBUG` is set to `true` (the default for the development environment), the authentik server automatically reloads whenever changes are made to the code.
For the authentik worker, install [watchexec](https://github.com/watchexec/watchexec), and run:
```shell
make run-worker-watch
```
## 6. Build the frontend