"""test OAuth2 OpenID Provider flow""" from json import dumps from time import sleep from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from authentik.blueprints.tests import apply_blueprint, reconcile_app from authentik.common.oauth.constants import ( SCOPE_OFFLINE_ACCESS, SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE, ) from authentik.core.models import Application from authentik.core.tests.utils import create_test_cert from authentik.flows.models import Flow from authentik.lib.generators import generate_id, generate_key from authentik.policies.expression.models import ExpressionPolicy from authentik.policies.models import PolicyBinding from authentik.providers.oauth2.models import ( ClientTypes, OAuth2Provider, RedirectURI, RedirectURIMatchingMode, ScopeMapping, ) from tests.decorators import retry from tests.selenium import SeleniumTestCase class TestProviderOAuth2OIDC(SeleniumTestCase): """test OAuth with OpenID Provider flow""" def setUp(self): self.client_id = generate_id() self.client_secret = generate_key() self.application_slug = generate_id() super().setUp() def setup_client(self): """Setup client oidc-test-client container which we test OIDC against""" sleep(1) self.run_container( image="ghcr.io/beryju/oidc-test-client:2.1", ports={ "9009": "9009", }, environment={ "OIDC_CLIENT_ID": self.client_id, "OIDC_CLIENT_SECRET": self.client_secret, "OIDC_PROVIDER": f"{self.live_server_url}/application/o/{self.application_slug}/", }, ) @retry() @apply_blueprint( "default/flow-default-authentication-flow.yaml", "default/flow-default-invalidation-flow.yaml", ) @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml") @reconcile_app("authentik_crypto") def test_redirect_uri_error(self): """test OpenID Provider flow (invalid redirect URI, check error message)""" sleep(1) # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-implicit-consent" ) provider = OAuth2Provider.objects.create( name=self.application_slug, client_type=ClientTypes.CONFIDENTIAL, client_id=self.client_id, client_secret=self.client_secret, signing_key=create_test_cert(), redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/")], authorization_flow=authorization_flow, ) provider.property_mappings.set( ScopeMapping.objects.filter( scope_name__in=[ SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE, SCOPE_OFFLINE_ACCESS, ] ) ) Application.objects.create( name=self.application_slug, slug=self.application_slug, provider=provider, ) self.setup_client() self.driver.get("http://localhost:9009") sleep(2) self.assertEqual( self.driver.find_element(By.CLASS_NAME, "pf-c-title").text, "Redirect URI Error", ) @retry() @apply_blueprint( "default/flow-default-authentication-flow.yaml", "default/flow-default-invalidation-flow.yaml", ) @apply_blueprint("default/flow-default-provider-authorization-implicit-consent.yaml") @apply_blueprint("system/providers-oauth2.yaml") @reconcile_app("authentik_crypto") def test_authorization_consent_implied(self): """test OpenID Provider flow (default authorization flow with implied consent) (due to offline_access a consent will still be triggered)""" sleep(1) # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-implicit-consent" ) provider = OAuth2Provider.objects.create( name=self.application_slug, client_type=ClientTypes.CONFIDENTIAL, client_id=self.client_id, client_secret=self.client_secret, signing_key=create_test_cert(), redirect_uris=[ RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback") ], authorization_flow=authorization_flow, ) provider.property_mappings.set( ScopeMapping.objects.filter( scope_name__in=[ SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE, SCOPE_OFFLINE_ACCESS, ] ) ) app = Application.objects.create( name=self.application_slug, slug=self.application_slug, provider=provider, ) self.setup_client() self.driver.get("http://localhost:9009") self.login() self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor"))) flow_executor = self.get_shadow_root("ak-flow-executor") consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) self.assertIn( app.name, consent_stage.find_element(By.CSS_SELECTOR, "[data-test-id='stage-heading']").text, ) current_url = self.driver.current_url consent_stage.find_element( By.CSS_SELECTOR, "[type=submit]", ).click() self.wait.until(ec.url_changes(current_url)) body = self.parse_json_content() snippet = dumps(body, indent=2)[:500].replace("\n", " ") self.assertEqual( body.get("IDTokenClaims", {}).get("nickname"), self.user.username, f"IDTokenClaims.nickname mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( body.get("IDTokenClaims", {}).get("amr"), ["pwd"], f"IDTokenClaims.amr mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( body.get("IDTokenClaims", {}).get("name"), self.user.name, f"IDTokenClaims.name mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( body.get("IDTokenClaims", {}).get("email"), self.user.email, f"IDTokenClaims.email mismatch at {self.driver.current_url}: {snippet}", ) # UserInfo assertions self.assertEqual( body.get("UserInfo", {}).get("nickname"), self.user.username, f"UserInfo.nickname mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( body.get("UserInfo", {}).get("name"), self.user.name, f"UserInfo.name mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( body.get("UserInfo", {}).get("email"), self.user.email, f"UserInfo.email mismatch at {self.driver.current_url}: {snippet}", ) @retry() @apply_blueprint( "default/flow-default-authentication-flow.yaml", "default/flow-default-invalidation-flow.yaml", ) @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml") @apply_blueprint("system/providers-oauth2.yaml") @reconcile_app("authentik_crypto") def test_authorization_consent_explicit(self): """test OpenID Provider flow (default authorization flow with explicit consent)""" sleep(1) # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-explicit-consent" ) provider = OAuth2Provider.objects.create( name=self.application_slug, authorization_flow=authorization_flow, client_type=ClientTypes.CONFIDENTIAL, client_id=self.client_id, client_secret=self.client_secret, signing_key=create_test_cert(), redirect_uris=[ RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback") ], ) provider.property_mappings.set( ScopeMapping.objects.filter( scope_name__in=[ SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE, SCOPE_OFFLINE_ACCESS, ] ) ) app = Application.objects.create( name=self.application_slug, slug=self.application_slug, provider=provider, ) self.setup_client() self.driver.get("http://localhost:9009") self.login() self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor"))) flow_executor = self.get_shadow_root("ak-flow-executor") consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) self.assertIn( app.name, consent_stage.find_element(By.CSS_SELECTOR, "[data-test-id='stage-heading']").text, ) current_url = self.driver.current_url consent_stage.find_element( By.CSS_SELECTOR, "[type=submit]", ).click() self.wait.until(ec.url_changes(current_url)) body = self.parse_json_content() snippet = dumps(body, indent=2)[:500].replace("\n", " ") id_token_claims = body.get("IDTokenClaims", {}) user_info = body.get("UserInfo", {}) self.assertEqual( id_token_claims.get("nickname"), self.user.username, f"IDTokenClaims.nickname mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( user_info.get("nickname"), self.user.username, f"UserInfo.nickname mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( id_token_claims.get("name"), self.user.name, f"IDTokenClaims.name mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( user_info.get("name"), self.user.name, f"UserInfo.name mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( id_token_claims.get("email"), self.user.email, f"IDTokenClaims.email mismatch at {self.driver.current_url}: {snippet}", ) self.assertEqual( user_info.get("email"), self.user.email, f"UserInfo.email mismatch at {self.driver.current_url}: {snippet}", ) @retry() @apply_blueprint( "default/flow-default-authentication-flow.yaml", "default/flow-default-invalidation-flow.yaml", ) @apply_blueprint("default/flow-default-provider-authorization-explicit-consent.yaml") @apply_blueprint("system/providers-oauth2.yaml") @reconcile_app("authentik_crypto") def test_authorization_denied(self): """test OpenID Provider flow (default authorization with access deny)""" sleep(1) # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-explicit-consent" ) provider = OAuth2Provider.objects.create( name=self.application_slug, authorization_flow=authorization_flow, client_type=ClientTypes.CONFIDENTIAL, client_id=self.client_id, client_secret=self.client_secret, signing_key=create_test_cert(), redirect_uris=[ RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost:9009/auth/callback") ], ) provider.property_mappings.set( ScopeMapping.objects.filter( scope_name__in=[ SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE, SCOPE_OFFLINE_ACCESS, ] ) ) app = Application.objects.create( name=self.application_slug, slug=self.application_slug, provider=provider, ) negative_policy = ExpressionPolicy.objects.create( name=generate_id(), expression="return False" ) PolicyBinding.objects.create(target=app, policy=negative_policy, order=0) self.setup_client() self.driver.get("http://localhost:9009") self.login() self.wait.until( ec.presence_of_element_located((By.CSS_SELECTOR, "[data-test-id='card-title']")) ) self.assertEqual( self.driver.find_element(By.CSS_SELECTOR, "[data-test-id='card-title']").text, "Permission denied", )