Files
authentik/tests/e2e/test_provider_ldap.py
Simonyi Gergő f7e23295ed core: add digraph group hierarchy (#17050)
* move imports

* core: add digraph group hierarchy

* move to permissions from Group or User to Role

* set group parents on frontend

* do not serialize `GroupParentageNode` directly

* core: enforce unique group name on database level

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* use group parents in LDAP provider

* add user-role relationship control to frontend

* move materialized view to be more discoverable

* add guardian to mypy exceptions

* make `Role` a `ManagedModel`

* fixup! make `Role` a `ManagedModel`

* simplify `get_objects_for_user`

* fix flaky unit test

* rename `django-guardian` fork to `ak-guardian`

* add tests around users/groups/roles

* remove unused guardian config variable

* simplify guardian file structure

* clean up frontend

* initial docs

* remove `mode` from `InitialPermissions`

This is no longer needed, since users no longer directly have permissions.

* fixup! Merge branch 'main' into core/add-digraph-group-hierarchy

* clean up docs for managing permissions

* addendums from docs review

* fixup! Merge branch 'main' into core/add-digraph-group-hierarchy

* tweaks

* dewi and tana edits to docs

* tweak

* truly final tweaks, for now

* relabel Role Permissions table

* clarify button label

* fixup! Merge branch 'main' into core/add-digraph-group-hierarchy

* fixup! Merge branch 'main' into core/add-digraph-group-hierarchy

* merge migrations

* fixup! Merge branch 'main' into core/add-digraph-group-hierarchy

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: Tana M Berry <tana@goauthentik.io>
2025-12-08 12:04:04 +01:00

502 lines
19 KiB
Python

"""LDAP and Outpost e2e tests"""
from dataclasses import asdict
from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server
from ldap3.core.exceptions import LDAPInvalidCredentialsResult
from authentik.blueprints.tests import apply_blueprint, reconcile_app
from authentik.core.models import Application, User
from authentik.core.tests.utils import create_test_user
from authentik.events.models import Event, EventAction
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
from authentik.providers.ldap.models import APIAccessMode, LDAPProvider
from tests.e2e.utils import SeleniumTestCase, retry
class TestProviderLDAP(SeleniumTestCase):
"""LDAP and Outpost e2e tests"""
def start_ldap(self, outpost: Outpost):
"""Start ldap container based on outpost created"""
self.run_container(
image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
ports={
"3389": "3389",
"6636": "6636",
},
environment={
"AUTHENTIK_TOKEN": outpost.token.key,
},
)
def _prepare(self) -> User:
"""prepare user, provider, app and container"""
self.user.attributes["extraAttribute"] = "bar"
self.user.save()
ldap: LDAPProvider = LDAPProvider.objects.create(
name=generate_id(),
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
search_mode=APIAccessMode.CACHED,
)
self.user.assign_perms_to_managed_role("search_full_directory", ldap)
# we need to create an application to actually access the ldap
Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
outpost: Outpost = Outpost.objects.create(
name=generate_id(),
type=OutpostType.LDAP,
_config=asdict(OutpostConfig(log_level="debug")),
)
outpost.providers.add(ldap)
self.start_ldap(outpost)
return outpost
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
def test_ldap_bind_success(self):
"""Test simple bind"""
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=self.user.username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
)
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
def test_ldap_bind_success_ssl(self):
"""Test simple bind with ssl"""
self._prepare()
server = Server("ldaps://localhost:6636", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=self.user.username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
)
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
def test_ldap_bind_success_starttls(self):
"""Test simple bind with ssl"""
self._prepare()
server = Server("ldap://localhost:3389")
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=self.user.username,
)
_connection.start_tls()
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
)
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
def test_ldap_bind_fail(self):
"""Test simple bind (failed)"""
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=self.user.username + "fqwerwqer",
)
with self.assertRaises(LDAPInvalidCredentialsResult):
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN_FAILED,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
).exists(),
)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@reconcile_app("authentik_tenants")
@reconcile_app("authentik_outposts")
def test_ldap_bind_search(self):
"""Test simple bind + search"""
# Remove akadmin to ensure list is correct
# Remove user before starting container so it's not cached
User.objects.filter(username="akadmin").delete()
outpost = self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
password=self.user.username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
)
)
embedded_account = Outpost.objects.filter(managed=MANAGED_OUTPOST).first().user
_connection.search(
"ou=Users,DC=ldaP,dc=goauthentik,dc=io",
"(objectClass=user)",
search_scope=SUBTREE,
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES],
)
response: list = _connection.response
# Remove raw_attributes to make checking easier
for obj in response:
del obj["raw_attributes"]
del obj["raw_dn"]
obj["attributes"] = dict(obj["attributes"])
o_user = outpost.user
expected = [
{
"dn": f"cn={o_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": o_user.username,
"sAMAccountName": o_user.username,
"uid": o_user.uid,
"name": o_user.name,
"displayName": o_user.name,
"sn": o_user.name,
"mail": "",
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"user",
"posixAccount",
"goauthentik.io/ldap/user",
],
"uidNumber": 2000 + o_user.pk,
"gidNumber": 2000 + o_user.pk,
"memberOf": [],
"homeDirectory": f"/home/{o_user.username}",
"ak-active": True,
"ak-superuser": False,
"pwdChangedTime": o_user.password_change_date.replace(microsecond=0),
"createTimestamp": o_user.date_joined.replace(microsecond=0),
"modifyTimestamp": o_user.last_updated.replace(microsecond=0),
},
"type": "searchResEntry",
},
{
"dn": f"cn={embedded_account.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": embedded_account.username,
"sAMAccountName": embedded_account.username,
"uid": embedded_account.uid,
"name": embedded_account.name,
"displayName": embedded_account.name,
"sn": embedded_account.name,
"mail": "",
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"user",
"posixAccount",
"goauthentik.io/ldap/user",
],
"uidNumber": 2000 + embedded_account.pk,
"gidNumber": 2000 + embedded_account.pk,
"memberOf": [],
"homeDirectory": f"/home/{embedded_account.username}",
"ak-active": True,
"ak-superuser": False,
"pwdChangedTime": embedded_account.password_change_date.replace(microsecond=0),
"createTimestamp": embedded_account.date_joined.replace(microsecond=0),
"modifyTimestamp": embedded_account.last_updated.replace(microsecond=0),
},
"type": "searchResEntry",
},
{
"dn": f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": self.user.username,
"sAMAccountName": self.user.username,
"uid": self.user.uid,
"name": self.user.name,
"displayName": self.user.name,
"sn": self.user.name,
"mail": self.user.email,
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"user",
"posixAccount",
"goauthentik.io/ldap/user",
],
"uidNumber": 2000 + self.user.pk,
"gidNumber": 2000 + self.user.pk,
"memberOf": [
f"cn={group.name},ou=groups,dc=ldap,dc=goauthentik,dc=io"
for group in self.user.ak_groups.all()
],
"homeDirectory": f"/home/{self.user.username}",
"ak-active": True,
"ak-superuser": True,
"extraAttribute": ["bar"],
"pwdChangedTime": self.user.password_change_date.replace(microsecond=0),
"createTimestamp": self.user.date_joined.replace(microsecond=0),
"modifyTimestamp": self.user.last_updated.replace(microsecond=0),
},
"type": "searchResEntry",
},
]
self.assert_list_dict_equal(expected, response)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@reconcile_app("authentik_tenants")
@reconcile_app("authentik_outposts")
def test_ldap_bind_search_no_perms(self):
"""Test simple bind + search"""
user = create_test_user()
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
password=user.username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": user.pk,
"email": user.email,
"username": user.username,
},
)
)
_connection.search(
"ou=Users,DC=ldaP,dc=goauthentik,dc=io",
"(objectClass=user)",
search_scope=SUBTREE,
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES],
)
response: list = _connection.response
# Remove raw_attributes to make checking easier
for obj in response:
del obj["raw_attributes"]
del obj["raw_dn"]
obj["attributes"] = dict(obj["attributes"])
expected = [
{
"dn": f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": user.username,
"sAMAccountName": user.username,
"uid": user.uid,
"name": user.name,
"displayName": user.name,
"sn": user.name,
"mail": user.email,
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"user",
"posixAccount",
"goauthentik.io/ldap/user",
],
"uidNumber": 2000 + user.pk,
"gidNumber": 2000 + user.pk,
"memberOf": [
f"cn={group.name},ou=groups,dc=ldap,dc=goauthentik,dc=io"
for group in user.ak_groups.all()
],
"homeDirectory": f"/home/{user.username}",
"ak-active": True,
"ak-superuser": False,
"pwdChangedTime": user.password_change_date.replace(microsecond=0),
"createTimestamp": user.date_joined.replace(microsecond=0),
"modifyTimestamp": user.last_updated.replace(microsecond=0),
},
"type": "searchResEntry",
},
]
self.assert_list_dict_equal(expected, response)
def assert_list_dict_equal(self, expected: list[dict], actual: list[dict], match_key="dn"):
"""Assert a list of dictionaries is identical, ignoring the ordering of items"""
self.assertEqual(len(expected), len(actual))
for res_item in actual:
all_matching = [x for x in expected if x[match_key] == res_item[match_key]]
self.assertEqual(len(all_matching), 1)
matching = all_matching[0]
self.assertDictEqual(res_item, matching)
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@reconcile_app("authentik_tenants")
@reconcile_app("authentik_outposts")
def test_ldap_schema(self):
"""Test LDAP Schema"""
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
password=self.user.username,
)
_connection.bind()
self.assertIsNotNone(server.schema)
self.assertTrue(server.schema.is_valid())
self.assertIsNotNone(server.schema.object_classes["goauthentik.io/ldap/user"])
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@reconcile_app("authentik_tenants")
@reconcile_app("authentik_outposts")
def test_ldap_search_attrs_filter(self):
"""Test search with attributes filtering"""
# Remove akadmin to ensure list is correct
# Remove user before starting container so it's not cached
User.objects.filter(username="akadmin").delete()
outpost = self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
password=self.user.username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": self.user.pk,
"email": self.user.email,
"username": self.user.username,
},
)
)
embedded_account = Outpost.objects.filter(managed=MANAGED_OUTPOST).first().user
_connection.search(
"ou=Users,DC=ldaP,dc=goauthentik,dc=io",
"(objectClass=user)",
search_scope=SUBTREE,
attributes=["cn"],
)
response: list = _connection.response
# Remove raw_attributes to make checking easier
for obj in response:
del obj["raw_attributes"]
del obj["raw_dn"]
o_user = outpost.user
self.assert_list_dict_equal(
[
{
"dn": f"cn={o_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": o_user.username,
},
"type": "searchResEntry",
},
{
"dn": f"cn={embedded_account.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": embedded_account.username,
},
"type": "searchResEntry",
},
{
"dn": f"cn={self.user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": self.user.username,
},
"type": "searchResEntry",
},
],
response,
)