mirror of
https://github.com/goauthentik/authentik
synced 2026-04-28 10:28:22 +02:00
Compare commits
2 Commits
dependabot
...
rust-proxy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94a6646c78 | ||
|
|
464c1f32aa |
59
Cargo.lock
generated
59
Cargo.lock
generated
@@ -188,19 +188,29 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"argh",
|
||||
"authentik-axum",
|
||||
"authentik-client",
|
||||
"authentik-common",
|
||||
"axum",
|
||||
"color-eyre",
|
||||
"eyre",
|
||||
"futures",
|
||||
"hyper-unix-socket",
|
||||
"hyper-util",
|
||||
"metrics",
|
||||
"metrics-exporter-prometheus",
|
||||
"nix 0.31.2",
|
||||
"pyo3",
|
||||
"rand 0.10.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_repr",
|
||||
"sqlx",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-retry2",
|
||||
"tokio-tungstenite",
|
||||
"tracing",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -551,6 +561,17 @@ version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "chacha20"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures 0.3.0",
|
||||
"rand_core 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.44"
|
||||
@@ -788,6 +809,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crc"
|
||||
version = "3.4.0"
|
||||
@@ -1274,6 +1304,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"r-efi 6.0.0",
|
||||
"rand_core 0.10.1",
|
||||
"wasip2",
|
||||
"wasip3",
|
||||
]
|
||||
@@ -2764,6 +2795,17 @@ dependencies = [
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
|
||||
dependencies = [
|
||||
"chacha20",
|
||||
"getrandom 0.4.2",
|
||||
"rand_core 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.3.1"
|
||||
@@ -2802,6 +2844,12 @@ dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
|
||||
|
||||
[[package]]
|
||||
name = "rand_xoshiro"
|
||||
version = "0.7.0"
|
||||
@@ -3356,7 +3404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
"digest",
|
||||
]
|
||||
|
||||
@@ -3367,7 +3415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"cpufeatures 0.2.17",
|
||||
"digest",
|
||||
]
|
||||
|
||||
@@ -3937,8 +3985,12 @@ checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tungstenite",
|
||||
"webpki-roots 0.26.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4152,8 +4204,11 @@ dependencies = [
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.9.2",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"sha1",
|
||||
"thiserror 2.0.18",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
19
Cargo.toml
19
Cargo.toml
@@ -49,6 +49,7 @@ nix = { version = "= 0.31.2", features = ["hostname", "signal"] }
|
||||
notify = "= 8.2.0"
|
||||
pin-project-lite = "= 0.2.17"
|
||||
pyo3 = "= 0.28.3"
|
||||
rand = "= 0.10.1"
|
||||
regex = "= 1.12.3"
|
||||
reqwest = { version = "= 0.13.2", features = [
|
||||
"form",
|
||||
@@ -99,6 +100,10 @@ time = { version = "= 0.3.47", features = ["macros"] }
|
||||
tokio = { version = "= 1.52.1", features = ["full", "tracing"] }
|
||||
tokio-retry2 = "= 0.9.1"
|
||||
tokio-rustls = "= 0.26.4"
|
||||
tokio-tungstenite = { version = "= 0.29.0", features = [
|
||||
"rustls-tls-webpki-roots",
|
||||
"url",
|
||||
] }
|
||||
tokio-util = { version = "= 0.7.18", features = ["full"] }
|
||||
tower = "= 0.5.3"
|
||||
tower-http = { version = "= 0.6.8", features = ["timeout"] }
|
||||
@@ -258,25 +263,35 @@ publish.workspace = true
|
||||
[features]
|
||||
default = ["core", "proxy"]
|
||||
core = ["ak-common/core", "dep:pyo3", "dep:sqlx"]
|
||||
proxy = ["ak-common/proxy"]
|
||||
proxy = ["ak-common/proxy", "dep:ak-client"]
|
||||
|
||||
[dependencies]
|
||||
ak-axum.workspace = true
|
||||
ak-client = { workspace = true, optional = true }
|
||||
ak-common.workspace = true
|
||||
arc-swap.workspace = true
|
||||
argh.workspace = true
|
||||
axum.workspace = true
|
||||
color-eyre.workspace = true
|
||||
eyre.workspace = true
|
||||
futures.workspace = true
|
||||
hyper-unix-socket.workspace = true
|
||||
hyper-util.workspace = true
|
||||
metrics.workspace = true
|
||||
metrics-exporter-prometheus.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
pyo3 = { workspace = true, optional = true }
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_repr.workspace = true
|
||||
sqlx = { workspace = true, optional = true }
|
||||
time.workspace = true
|
||||
tokio-retry2.workspace = true
|
||||
tokio-tungstenite.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
[lints]
|
||||
|
||||
@@ -101,8 +101,6 @@ RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
|
||||
rustc --version && \
|
||||
cargo --version
|
||||
|
||||
RUN cat /root/.rustup/settings.toml
|
||||
|
||||
# Stage: Download uv
|
||||
FROM ghcr.io/astral-sh/uv:0.11.5@sha256:555ac94f9a22e656fc5f2ce5dfee13b04e94d099e46bb8dd3a73ec7263f2e484 AS uv
|
||||
# Stage: Base python image
|
||||
|
||||
@@ -21,33 +21,45 @@ COPY web .
|
||||
RUN npm run build-proxy
|
||||
|
||||
# Stage 2: Build
|
||||
FROM --platform=${BUILDPLATFORM} docker.io/library/golang:1.26.2-trixie@sha256:4a7137ea573f79c86ae451ff05817ed762ef5597fcf732259e97abeb3108d873 AS builder
|
||||
FROM ghcr.io/goauthentik/fips-debian:trixie-slim-fips@sha256:7726387c78b5787d2146868c2ccc8948a3591d0a5a6436f7780c8c28acc76341 AS builder
|
||||
|
||||
ARG TARGETOS
|
||||
ARG TARGETARCH
|
||||
ARG TARGETVARIANT
|
||||
|
||||
ARG GOOS=$TARGETOS
|
||||
ARG GOARCH=$TARGETARCH
|
||||
|
||||
WORKDIR /go/src/goauthentik.io
|
||||
|
||||
ENV PATH="/root/.cargo/bin:$PATH"
|
||||
SHELL ["/bin/sh", "-o", "pipefail", "-c"]
|
||||
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
|
||||
RUN --mount=type=cache,id=apt-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/var/cache/apt \
|
||||
dpkg --add-architecture arm64 && \
|
||||
--mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
|
||||
apt-get update && \
|
||||
apt-get install -y --no-install-recommends crossbuild-essential-arm64 gcc-aarch64-linux-gnu
|
||||
# Required for installing pip packages
|
||||
apt-get install -y --no-install-recommends \
|
||||
# Build essentials
|
||||
build-essential \
|
||||
# aws-lc deps
|
||||
cmake clang golang && \
|
||||
curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none && \
|
||||
rustup install && \
|
||||
rustup default "$(sed -n 's/channel = "\(.*\)"/\1/p' rust-toolchain.toml)" && \
|
||||
rustc --version && \
|
||||
cargo --version
|
||||
# See https://github.com/aws/aws-lc-rs/issues/569
|
||||
ENV AWS_LC_FIPS_SYS_CC=clang
|
||||
|
||||
RUN --mount=type=bind,target=/go/src/goauthentik.io/go.mod,src=./go.mod \
|
||||
--mount=type=bind,target=/go/src/goauthentik.io/go.sum,src=./go.sum \
|
||||
--mount=type=cache,target=/go/pkg/mod \
|
||||
go mod download
|
||||
|
||||
COPY . .
|
||||
RUN --mount=type=cache,sharing=locked,target=/go/pkg/mod \
|
||||
--mount=type=cache,id=go-build-$TARGETARCH$TARGETVARIANT,sharing=locked,target=/root/.cache/go-build \
|
||||
if [ "$TARGETARCH" = "arm64" ]; then export CC=aarch64-linux-gnu-gcc && export CC_FOR_TARGET=gcc-aarch64-linux-gnu; fi && \
|
||||
CGO_ENABLED=1 GOFIPS140=latest GOARM="${TARGETVARIANT#v}" \
|
||||
go build -o /go/proxy ./cmd/proxy
|
||||
RUN --mount=type=bind,target=rust-toolchain.toml,src=rust-toolchain.toml \
|
||||
--mount=type=bind,target=Cargo.toml,src=Cargo.toml \
|
||||
--mount=type=bind,target=Cargo.lock,src=Cargo.lock \
|
||||
--mount=type=bind,target=.cargo/,src=.cargo/ \
|
||||
--mount=type=bind,target=src/,src=src/ \
|
||||
--mount=type=bind,target=packages/,src=packages/ \
|
||||
--mount=type=bind,target=authentik/lib/default.yml,src=authentik/lib/default.yml \
|
||||
# Required otherwise workspace discovery fails
|
||||
--mount=type=bind,target=website/scripts/docsmg/,src=website/scripts/docsmg/ \
|
||||
--mount=type=cache,id=cargo-git-db-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/git/db/ \
|
||||
--mount=type=cache,id=cargo-registry-$TARGETARCH$TARGETVARIANT,target=/root/.cargo/registry/ \
|
||||
--mount=type=cache,id=rust-target-$TARGETARCH$TARGETVARIANT,target=/build/target/ \
|
||||
cargo build --package authentik --no-default-features --features proxy --locked --release && \
|
||||
cp ./target/release/authentik /bin/authentik
|
||||
|
||||
# Stage 3: Run
|
||||
FROM ghcr.io/goauthentik/fips-debian:trixie-slim-fips@sha256:7726387c78b5787d2146868c2ccc8948a3591d0a5a6436f7780c8c28acc76341
|
||||
@@ -72,13 +84,13 @@ RUN apt-get update && \
|
||||
apt-get clean && \
|
||||
rm -rf /tmp/* /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /go/proxy /
|
||||
COPY --from=builder /bin/authentik /
|
||||
COPY --from=web-builder /static/robots.txt /web/robots.txt
|
||||
COPY --from=web-builder /static/security.txt /web/security.txt
|
||||
COPY --from=web-builder /static/dist/ /web/dist/
|
||||
COPY --from=web-builder /static/authentik/ /web/authentik/
|
||||
|
||||
HEALTHCHECK --interval=5s --retries=20 --start-period=3s CMD [ "/proxy", "healthcheck" ]
|
||||
HEALTHCHECK --interval=5s --retries=20 --start-period=3s CMD [ "/authentik", "healthcheck" ]
|
||||
|
||||
EXPOSE 9000 9300 9443
|
||||
|
||||
@@ -87,4 +99,4 @@ USER 1000
|
||||
ENV TMPDIR=/dev/shm/ \
|
||||
GOFIPS=1
|
||||
|
||||
ENTRYPOINT ["/proxy"]
|
||||
ENTRYPOINT ["/authentik", "proxy"]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Utilities for working with the authentik API client.
|
||||
|
||||
use ak_client::apis::configuration::Configuration;
|
||||
use ak_client::models::Pagination;
|
||||
use eyre::{Result, eyre};
|
||||
use url::Url;
|
||||
|
||||
@@ -60,6 +61,42 @@ pub fn make_config() -> Result<Configuration> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch all pages from a paginated API endpoint, returning all results combined.
|
||||
///
|
||||
/// - `fetch`: a function that takes a page number and returns a future resolving to a paginated
|
||||
/// response.
|
||||
/// - `get_pagination`: a function that extracts the [`Pagination`] metadata from a response.
|
||||
/// - `get_results`: a function that extracts the result items from a response.
|
||||
pub async fn fetch_all<T, R, E, F, Fut>(
|
||||
fetch: F,
|
||||
get_pagination: impl Fn(&R) -> &Pagination,
|
||||
get_results: impl Fn(R) -> Vec<T>,
|
||||
) -> std::result::Result<Vec<T>, E>
|
||||
where
|
||||
F: Fn(i32) -> Fut,
|
||||
Fut: Future<Output = std::result::Result<R, E>>,
|
||||
{
|
||||
let mut page = 1;
|
||||
let mut results = Vec::with_capacity(0);
|
||||
|
||||
loop {
|
||||
let response = fetch(page).await?;
|
||||
let next = get_pagination(&response).next;
|
||||
if page == 1 {
|
||||
let count = get_pagination(&response).count as usize;
|
||||
results.reserve(count);
|
||||
}
|
||||
results.extend(get_results(response));
|
||||
if next > 0.0 {
|
||||
page += 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serde_json::json;
|
||||
|
||||
@@ -30,12 +30,12 @@ pub fn install() -> Result<()> {
|
||||
}
|
||||
|
||||
if config.debug {
|
||||
let console_layer = console_subscriber::ConsoleLayer::builder()
|
||||
.server_addr(config.listen.debug_tokio)
|
||||
.spawn();
|
||||
// let console_layer = console_subscriber::ConsoleLayer::builder()
|
||||
// .server_addr(config.listen.debug_tokio)
|
||||
// .spawn();
|
||||
tracing_subscriber::registry()
|
||||
.with(ErrorLayer::default())
|
||||
.with(console_layer)
|
||||
// .with(console_layer)
|
||||
.with(
|
||||
fmt::layer()
|
||||
.compact()
|
||||
@@ -180,12 +180,9 @@ pub mod sentry {
|
||||
sentry_dsn: Some(config.sentry_dsn),
|
||||
environment: config.environment,
|
||||
send_pii: config.send_pii,
|
||||
#[expect(
|
||||
clippy::cast_possible_truncation,
|
||||
reason = "This is fine, we'll never get big values here."
|
||||
)]
|
||||
#[expect(
|
||||
clippy::as_conversions,
|
||||
clippy::cast_possible_truncation,
|
||||
reason = "This is fine, we'll never get big values here."
|
||||
)]
|
||||
sample_rate: config.traces_sample_rate as f32,
|
||||
|
||||
10
src/main.rs
10
src/main.rs
@@ -8,6 +8,8 @@ use eyre::{Result, eyre};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
mod metrics;
|
||||
#[cfg(feature = "proxy")]
|
||||
mod outpost;
|
||||
#[cfg(feature = "core")]
|
||||
mod server;
|
||||
#[cfg(feature = "core")]
|
||||
@@ -25,6 +27,8 @@ struct Cli {
|
||||
enum Command {
|
||||
#[cfg(feature = "core")]
|
||||
Worker(worker::Cli),
|
||||
#[cfg(feature = "proxy")]
|
||||
Proxy(outpost::proxy::Cli),
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -36,6 +40,8 @@ fn main() -> Result<()> {
|
||||
match &cli.command {
|
||||
#[cfg(feature = "core")]
|
||||
Command::Worker(_) => Mode::set(Mode::Worker)?,
|
||||
#[cfg(feature = "proxy")]
|
||||
Command::Proxy(_) => Mode::set(Mode::Proxy)?,
|
||||
}
|
||||
|
||||
trace!("installing error formatting");
|
||||
@@ -81,6 +87,10 @@ fn main() -> Result<()> {
|
||||
let workers = worker::start(args, &mut tasks)?;
|
||||
metrics.workers.store(Some(workers));
|
||||
}
|
||||
#[cfg(feature = "proxy")]
|
||||
Command::Proxy(args) => {
|
||||
outpost::start::<outpost::proxy::ProxyOutpost>(args, &mut tasks).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let errors = tasks.run().await;
|
||||
|
||||
312
src/outpost/event.rs
Normal file
312
src/outpost/event.rs
Normal file
@@ -0,0 +1,312 @@
|
||||
use std::{fmt::Display, sync::Arc};
|
||||
|
||||
use ak_common::{Arbiter, Tasks, VERSION, api, arbiter, authentik_build_hash};
|
||||
use axum::http::{HeaderValue, header::AUTHORIZATION};
|
||||
use eyre::{Result, eyre};
|
||||
use futures::{SinkExt as _, StreamExt as _};
|
||||
use nix::unistd::gethostname;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
use time::UtcDateTime;
|
||||
use tokio::{
|
||||
signal::unix::SignalKind,
|
||||
time::{Duration, interval, sleep},
|
||||
};
|
||||
use tokio_tungstenite::tungstenite::{Message, client::IntoClientRequest as _};
|
||||
use tracing::{debug, info, instrument, trace, warn};
|
||||
use url::Url;
|
||||
|
||||
use crate::outpost::{Outpost, OutpostController};
|
||||
|
||||
#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone, Copy, Eq)]
|
||||
#[repr(u8)]
|
||||
enum EventKind {
|
||||
/// Code used to acknowledge a previous message.
|
||||
Ack = 0,
|
||||
/// Code used to send a healthcheck keepalive.
|
||||
Hello = 1,
|
||||
/// Code received to trigger a config update.
|
||||
TriggerUpdate = 2,
|
||||
/// Code received to trigger some provider specific function.
|
||||
ProviderSpecific = 3,
|
||||
/// Code received to identify the end of a session.
|
||||
SessionEnd = 4,
|
||||
}
|
||||
|
||||
impl Display for EventKind {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Ack => write!(f, "Ack"),
|
||||
Self::Hello => write!(f, "Hello"),
|
||||
Self::TriggerUpdate => write!(f, "TriggerUpdate"),
|
||||
Self::ProviderSpecific => write!(f, "ProviderSpecific"),
|
||||
Self::SessionEnd => write!(f, "SessionEnd"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct Event {
|
||||
instruction: EventKind,
|
||||
args: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub(crate) struct EventSessionEnd {
|
||||
session_id: String,
|
||||
}
|
||||
|
||||
fn build_ws_url(mut url: Url, outpost_pk: &str, instance_uuid: &str, attempt: u32) -> Result<Url> {
|
||||
let ws_scheme = match url.scheme() {
|
||||
"https" => "wss",
|
||||
"http" => "ws",
|
||||
other => return Err(eyre!("Unsupported scheme for WebSocket URL: {other}")),
|
||||
};
|
||||
|
||||
url.set_scheme(ws_scheme)
|
||||
.map_err(|()| eyre!("Failed to set URL scheme to {ws_scheme}"))?;
|
||||
url.set_path(&format!("{}ws/outpost/{outpost_pk}/", url.path()));
|
||||
url.query_pairs_mut()
|
||||
.append_pair("instance_uuid", instance_uuid)
|
||||
.append_pair("attempt", &attempt.to_string());
|
||||
|
||||
Ok(url)
|
||||
}
|
||||
|
||||
fn hello_args(instance_uuid: &str) -> serde_json::Value {
|
||||
let raw_hostname = gethostname().unwrap_or_default();
|
||||
let hostname = raw_hostname.to_string_lossy();
|
||||
|
||||
serde_json::json!({
|
||||
"version": VERSION,
|
||||
"buildHash": authentik_build_hash(None),
|
||||
"uuid": instance_uuid,
|
||||
// TODO: rust version and AWS-LC versions
|
||||
"hostname": hostname,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_event<O: Outpost>(
|
||||
controller: Arc<OutpostController>,
|
||||
outpost: Arc<O>,
|
||||
event: Event,
|
||||
) -> Result<()> {
|
||||
match event.instruction {
|
||||
EventKind::Ack | EventKind::Hello => {}
|
||||
EventKind::TriggerUpdate => {
|
||||
info!("received update trigger, refreshing outpost");
|
||||
sleep(controller.reload_offset).await;
|
||||
controller.refresh().await?;
|
||||
debug!("outpost controller has been refreshed");
|
||||
outpost.refresh().await?;
|
||||
debug!("outpost has been refreshed");
|
||||
#[expect(
|
||||
clippy::as_conversions,
|
||||
clippy::cast_precision_loss,
|
||||
reason = "This is fine, we'll never get big values here."
|
||||
)]
|
||||
controller
|
||||
.m_last_update
|
||||
.set(UtcDateTime::now().unix_timestamp() as f64);
|
||||
}
|
||||
EventKind::SessionEnd => {
|
||||
let event: EventSessionEnd = serde_json::from_value(event.args)?;
|
||||
outpost.end_session(event).await?;
|
||||
}
|
||||
#[expect(
|
||||
clippy::unimplemented,
|
||||
reason = "this is only relevant for the RAC provider"
|
||||
)]
|
||||
EventKind::ProviderSpecific => unimplemented!(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_events_inner<O: Outpost>(
|
||||
arbiter: Arbiter,
|
||||
controller: Arc<OutpostController>,
|
||||
outpost: Arc<O>,
|
||||
attempt: u32,
|
||||
) -> Result<()> {
|
||||
let server_config = api::ServerConfig::new()?;
|
||||
let ws_url = build_ws_url(
|
||||
server_config.host,
|
||||
&controller.outpost.load().pk.to_string(),
|
||||
&controller.instance_uuid.to_string(),
|
||||
attempt,
|
||||
)?;
|
||||
|
||||
debug!(url = %ws_url, "connecting to websocket");
|
||||
let mut request = ws_url.into_client_request()?;
|
||||
let token = controller
|
||||
.api_config
|
||||
.bearer_access_token
|
||||
.as_deref()
|
||||
.unwrap_or("");
|
||||
request.headers_mut().insert(
|
||||
AUTHORIZATION,
|
||||
HeaderValue::from_str(&format!("Bearer {token}"))?,
|
||||
);
|
||||
|
||||
let (ws_stream, _response) = tokio_tungstenite::connect_async(request).await?;
|
||||
let (mut ws_write, mut ws_read) = ws_stream.split();
|
||||
|
||||
info!(
|
||||
outpost = %controller.outpost.load().pk,
|
||||
"connected to websocket"
|
||||
);
|
||||
controller.m_connection.set(1_u8);
|
||||
|
||||
let get_refresh_interval = || {
|
||||
let mut interval = controller.outpost.load().refresh_interval_s;
|
||||
// Ensure timer interval is not negative or 0.
|
||||
// If it is, we default to 5 minutes.
|
||||
if interval <= 0_i32 {
|
||||
interval = 60_i32 * 5_i32;
|
||||
}
|
||||
// Clamp interval to be at least 30 seconds.
|
||||
if interval < 30_i32 {
|
||||
interval = 30_i32;
|
||||
}
|
||||
// infallible because we bound it to be positive above
|
||||
Duration::from_secs(interval.try_into().expect("infallible"))
|
||||
};
|
||||
let mut refresh_interval = interval(get_refresh_interval());
|
||||
let mut heartbeat_interval = interval(Duration::from_secs(10));
|
||||
|
||||
let mut events_rx = arbiter.events_subscribe();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = refresh_interval.tick() => {
|
||||
info!("refreshing outpost on interval");
|
||||
if let Err(err) = handle_event(
|
||||
Arc::clone(&controller),
|
||||
Arc::clone(&outpost),
|
||||
Event {
|
||||
instruction: EventKind::TriggerUpdate,
|
||||
args: serde_json::Value::Null
|
||||
}
|
||||
).await {
|
||||
warn!(?err, "failed to refresh");
|
||||
}
|
||||
refresh_interval = interval(get_refresh_interval());
|
||||
// Since we re-create the interval, we need to make it tick instantly to avoid
|
||||
// ending up in a never-ending tick-loop.
|
||||
refresh_interval.tick().await;
|
||||
},
|
||||
_ = heartbeat_interval.tick() => {
|
||||
let ping = Event {
|
||||
instruction: EventKind::Hello,
|
||||
args: hello_args(&controller.instance_uuid.to_string()),
|
||||
};
|
||||
ws_write.send(Message::text(serde_json::to_string(&ping)?)).await?;
|
||||
trace!("sent websocket hello (heartbeat)");
|
||||
},
|
||||
Ok(arbiter::Event::Signal(signal)) = events_rx.recv() => {
|
||||
if signal == SignalKind::user_defined1() {
|
||||
info!("refreshing outpost on signal");
|
||||
if let Err(err) = handle_event(
|
||||
Arc::clone(&controller),
|
||||
Arc::clone(&outpost),
|
||||
Event {
|
||||
instruction: EventKind::TriggerUpdate,
|
||||
args: serde_json::Value::Null
|
||||
}
|
||||
).await {
|
||||
warn!(?err, "failed to refresh");
|
||||
}
|
||||
}
|
||||
},
|
||||
msg = ws_read.next() => {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
let msg = msg?;
|
||||
match msg {
|
||||
Message::Text(text) => {
|
||||
let Ok(event): Result<Event, _> = serde_json::from_str(&text) else {
|
||||
warn!(data = text.as_str(), "failed to parse event");
|
||||
continue;
|
||||
};
|
||||
trace!(event = %event.instruction, "received websocket event");
|
||||
if let Err(err) = handle_event(
|
||||
Arc::clone(&controller),
|
||||
Arc::clone(&outpost),
|
||||
event,
|
||||
).await {
|
||||
warn!(?err, "failed to handle event");
|
||||
}
|
||||
},
|
||||
Message::Ping(data) => {
|
||||
ws_write.send(Message::Pong(data)).await?;
|
||||
},
|
||||
Message::Close(_) => {
|
||||
break;
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
},
|
||||
() = arbiter.shutdown() => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_events<O: Outpost>(
|
||||
arbiter: Arbiter,
|
||||
controller: Arc<OutpostController>,
|
||||
outpost: Arc<O>,
|
||||
) -> Result<()> {
|
||||
const MAX_BACKOFF: Duration = Duration::from_mins(5);
|
||||
let mut backoff = Duration::from_secs(1);
|
||||
let mut attempt: u32 = 0;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
() = arbiter.shutdown() => break,
|
||||
res = watch_events_inner(
|
||||
arbiter.clone(),
|
||||
Arc::clone(&controller),
|
||||
Arc::clone(&outpost),
|
||||
attempt
|
||||
) => {
|
||||
controller.m_connection.set(0_u8);
|
||||
match res {
|
||||
Ok(()) => debug!("websocket disconnected cleanly"),
|
||||
Err(err) => warn!(?err, attempt, "websocket error"),
|
||||
}
|
||||
|
||||
info!(attempt, delay = backoff.as_secs(), "reconnecting websocket in {}s...", backoff.as_secs());
|
||||
|
||||
tokio::select! {
|
||||
() = arbiter.shutdown() => break,
|
||||
() = sleep(backoff) => {}
|
||||
}
|
||||
|
||||
backoff = (backoff * 2).min(MAX_BACKOFF);
|
||||
attempt += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("stopping event watcher");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn start<O: Outpost + 'static>(
|
||||
tasks: &mut Tasks,
|
||||
controller: Arc<OutpostController>,
|
||||
outpost: Arc<O>,
|
||||
) -> Result<()> {
|
||||
let arbiter = tasks.arbiter();
|
||||
tasks
|
||||
.build_task()
|
||||
.name(&format!("{}::watch_events", module_path!()))
|
||||
.spawn(watch_events(arbiter, controller, outpost))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
123
src/outpost/mod.rs
Normal file
123
src/outpost/mod.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use ak_client::{
|
||||
apis::{configuration::Configuration, outposts_api::outposts_instances_list},
|
||||
models::Outpost as OutpostModel,
|
||||
};
|
||||
use ak_common::{Tasks, VERSION, api, authentik_build_hash};
|
||||
use arc_swap::ArcSwap;
|
||||
use eyre::{Result, eyre};
|
||||
use tracing::{debug, info, instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub(crate) mod event;
|
||||
#[cfg(feature = "proxy")]
|
||||
pub(crate) mod proxy;
|
||||
|
||||
pub(crate) trait Outpost: Send + Sync + Sized {
|
||||
const OUTPOST_TYPE: &'static str;
|
||||
type Cli: Send + Sync;
|
||||
|
||||
async fn new(controller: Arc<OutpostController>) -> Result<Self>;
|
||||
|
||||
fn start(&self, tasks: &mut Tasks) -> Result<()>;
|
||||
fn refresh(&self) -> impl Future<Output = Result<()>> + Send;
|
||||
|
||||
fn end_session(&self, event: event::EventSessionEnd)
|
||||
-> impl Future<Output = Result<()>> + Send;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct OutpostController {
|
||||
api_config: Configuration,
|
||||
outpost: ArcSwap<OutpostModel>,
|
||||
instance_uuid: Uuid,
|
||||
reload_offset: Duration,
|
||||
m_info: metrics::Gauge,
|
||||
m_last_update: metrics::Gauge,
|
||||
m_connection: metrics::Gauge,
|
||||
}
|
||||
|
||||
impl OutpostController {
|
||||
#[instrument(skip_all)]
|
||||
async fn get_outpost(api_config: &Configuration) -> Result<OutpostModel> {
|
||||
let outposts = outposts_instances_list(
|
||||
api_config, None, None, None, None, None, None, None, None, None, None, None, None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(outpost) = outposts.results.into_iter().next() else {
|
||||
return Err(eyre!(
|
||||
"No outposts found with given token, ensure the given token corresponds to an \
|
||||
authentik Outpost"
|
||||
));
|
||||
};
|
||||
debug!(name = outpost.name, "fetched outpost configuration");
|
||||
|
||||
Ok(outpost)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn new<O: Outpost>() -> Result<Self> {
|
||||
let api_config = api::make_config()?;
|
||||
let outpost = Self::get_outpost(&api_config).await?;
|
||||
let instance_uuid = Uuid::new_v4();
|
||||
|
||||
let m_labels = [
|
||||
("outpost_name", outpost.name.clone()),
|
||||
("outpost_type", O::OUTPOST_TYPE.to_owned()),
|
||||
("uuid", instance_uuid.to_string()),
|
||||
("version", VERSION.to_owned()),
|
||||
("build", authentik_build_hash(None)),
|
||||
];
|
||||
metrics::describe_gauge!("authentik_outpost_info", "Outpost info");
|
||||
let m_info = metrics::gauge!("authentik_outpost_info", &m_labels);
|
||||
metrics::describe_gauge!("authentik_outpost_last_update", "Time of last update");
|
||||
let m_last_update = metrics::gauge!("authentik_outpost_last_update", &m_labels);
|
||||
metrics::describe_gauge!("authentik_outpost_connection", "Connection status");
|
||||
let m_connection = metrics::gauge!("authentik_outpost_connection", &m_labels);
|
||||
|
||||
let reload_offset = Duration::from_secs(rand::random_range(0..10));
|
||||
let controller = Self {
|
||||
api_config,
|
||||
outpost: ArcSwap::from_pointee(outpost),
|
||||
instance_uuid,
|
||||
reload_offset,
|
||||
m_info,
|
||||
m_last_update,
|
||||
m_connection,
|
||||
};
|
||||
|
||||
info!(embedded = controller.is_embedded(), "outpost mode");
|
||||
debug!(?reload_offset, "HA Reload offset");
|
||||
|
||||
Ok(controller)
|
||||
}
|
||||
|
||||
fn is_embedded(&self) -> bool {
|
||||
self.outpost
|
||||
.load()
|
||||
.managed
|
||||
.as_ref()
|
||||
.and_then(|m| m.as_deref())
|
||||
.is_some_and(|m| m == "goauthentik.io/outposts/embedded")
|
||||
}
|
||||
|
||||
async fn refresh(&self) -> Result<()> {
|
||||
let outpost = Self::get_outpost(&self.api_config).await?;
|
||||
self.outpost.swap(Arc::new(outpost));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn start<O: Outpost + 'static>(_cli: O::Cli, tasks: &mut Tasks) -> Result<()> {
|
||||
let controller = Arc::new(OutpostController::new::<O>().await?);
|
||||
let outpost = Arc::new(O::new(Arc::clone(&controller)).await?);
|
||||
|
||||
event::start(tasks, Arc::clone(&controller), Arc::clone(&outpost))?;
|
||||
outpost.start(tasks)?;
|
||||
controller.m_info.set(1_u8);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
38
src/outpost/proxy/application.rs
Normal file
38
src/outpost/proxy/application.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
use url::Url;
|
||||
|
||||
use ak_client::models::ProxyOutpostConfig;
|
||||
use eyre::{Result, eyre};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::outpost::proxy::ProxyOutpost;
|
||||
|
||||
const REDIRECT_PARAM: &str = "rd";
|
||||
const CALLBACK_SIGNATURE: &str = "X-authentik-auth-callback";
|
||||
const LOGOUT_SIGNATURE: &str = "X-authentik-logout";
|
||||
|
||||
pub(super) struct Application {
|
||||
pub(super) host: String,
|
||||
}
|
||||
|
||||
impl Application {
|
||||
#[instrument(skip_all)]
|
||||
pub(super) fn new(
|
||||
_existing_apps: &ProxyOutpost,
|
||||
provider: &ProxyOutpostConfig,
|
||||
) -> Result<Self> {
|
||||
let external_url = Url::parse(&provider.external_host)?;
|
||||
let external_host = external_url
|
||||
.host_str()
|
||||
.ok_or_else(|| eyre!("no host in external host"))?;
|
||||
|
||||
let _redirect_url = {
|
||||
let mut redirect_url = external_url.join("outpost.goauthentik.io/callback")?;
|
||||
redirect_url.set_query(Some(&format!("{CALLBACK_SIGNATURE}=true")));
|
||||
redirect_url
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
host: external_host.to_owned(),
|
||||
})
|
||||
}
|
||||
}
|
||||
151
src/outpost/proxy/mod.rs
Normal file
151
src/outpost/proxy/mod.rs
Normal file
@@ -0,0 +1,151 @@
|
||||
use ak_axum::extract::host::Host;
|
||||
use axum::extract::State;
|
||||
use axum::http::Method;
|
||||
use axum::routing::any;
|
||||
use metrics::{Histogram, histogram};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use ak_axum::router::wrap_router;
|
||||
use ak_client::apis::outposts_api::outposts_proxy_list;
|
||||
use ak_common::{Tasks, api::fetch_all};
|
||||
use arc_swap::ArcSwap;
|
||||
use argh::FromArgs;
|
||||
use axum::Router;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use eyre::Result;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
use crate::outpost::proxy::application::Application;
|
||||
use crate::outpost::{Outpost, OutpostController};
|
||||
|
||||
mod application;
|
||||
|
||||
#[derive(Debug, Default, FromArgs, PartialEq, Eq)]
|
||||
/// Run the authentik proxy outpost.
|
||||
#[argh(subcommand, name = "proxy")]
|
||||
#[expect(
|
||||
clippy::empty_structs_with_brackets,
|
||||
reason = "argh doesn't support unit structs"
|
||||
)]
|
||||
pub(crate) struct Cli {}
|
||||
|
||||
pub(crate) struct ProxyOutpost {
|
||||
controller: Arc<OutpostController>,
|
||||
applications: ArcSwap<HashMap<String, Application>>,
|
||||
}
|
||||
|
||||
impl Outpost for ProxyOutpost {
|
||||
type Cli = Cli;
|
||||
|
||||
const OUTPOST_TYPE: &'static str = "proxy";
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn new(controller: Arc<OutpostController>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
controller,
|
||||
applications: ArcSwap::from_pointee(HashMap::with_capacity(0)),
|
||||
})
|
||||
}
|
||||
|
||||
fn start(&self, _tasks: &mut Tasks) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn refresh(&self) -> Result<()> {
|
||||
debug!(
|
||||
outpost_pk = %self.controller.outpost.load().pk,
|
||||
"requesting providers for outpost"
|
||||
);
|
||||
|
||||
let providers = fetch_all(
|
||||
|page| {
|
||||
outposts_proxy_list(
|
||||
&self.controller.api_config,
|
||||
None,
|
||||
None,
|
||||
Some(page),
|
||||
Some(100_i32),
|
||||
None,
|
||||
)
|
||||
},
|
||||
|r| &r.pagination,
|
||||
|r| r.results,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| error!(?err, "failed to fetch providers"))?;
|
||||
debug!(count = providers.len(), "fetched providers");
|
||||
|
||||
if providers.is_empty() && !self.controller.is_embedded() {
|
||||
warn!(
|
||||
"no providers assigned to this outpost, check outpost configuration in authentik"
|
||||
);
|
||||
}
|
||||
|
||||
for (i, provider) in providers.iter().enumerate() {
|
||||
debug!(
|
||||
index = i,
|
||||
name = provider.name,
|
||||
external_host = provider.external_host,
|
||||
assigned_to_app = provider.assigned_application_name,
|
||||
"provider details"
|
||||
);
|
||||
}
|
||||
|
||||
let mut apps = HashMap::with_capacity(providers.len());
|
||||
|
||||
for provider in providers {
|
||||
let Ok(application) = Application::new(self, &provider)
|
||||
.inspect_err(|err| warn!(?err, "failed to setup application, skipping provider"))
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
info!(
|
||||
name = provider.name,
|
||||
host = application.host,
|
||||
"loaded application"
|
||||
);
|
||||
|
||||
apps.insert(application.host.clone(), application);
|
||||
}
|
||||
|
||||
self.applications.store(Arc::new(apps));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn end_session(&self, _event: super::event::EventSessionEnd) -> Result<()> {
|
||||
// todo!()
|
||||
warn!(?_event, "removing session");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ping(
|
||||
method: Method,
|
||||
Host(host): Host,
|
||||
State(outpost): State<Arc<ProxyOutpost>>,
|
||||
) -> impl IntoResponse {
|
||||
let start = Instant::now();
|
||||
histogram!(
|
||||
"authentik_outpost_proxy_request_duration_seconds",
|
||||
"outpost_name" => outpost.controller.outpost.load().name.clone(),
|
||||
"method" => method.to_string(),
|
||||
"host" => host,
|
||||
"type" => "ping",
|
||||
)
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
StatusCode::NO_CONTENT
|
||||
}
|
||||
|
||||
fn build_router(outpost: Arc<ProxyOutpost>) -> Router {
|
||||
// TODO: static files
|
||||
wrap_router(
|
||||
Router::new()
|
||||
.route("outpost.goauthentik.io/ping", any(handle_ping))
|
||||
.with_state(outpost),
|
||||
true,
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user