mirror of
https://github.com/goauthentik/authentik
synced 2026-04-25 17:15:26 +02:00
core: trim hash password command tests
This commit is contained in:
@@ -1,383 +1,28 @@
|
||||
"""Tests for hash_password management command and password_hash serializer functionality"""
|
||||
"""Tests for hash_password management command."""
|
||||
|
||||
from io import StringIO
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.auth.hashers import check_password, identify_hasher, make_password
|
||||
from django.contrib.auth.hashers import check_password
|
||||
from django.core.management import call_command
|
||||
from django.core.management.base import CommandError
|
||||
from django.test import TestCase
|
||||
from rest_framework.exceptions import ValidationError
|
||||
|
||||
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
|
||||
from authentik.core.api.users import UserSerializer
|
||||
from authentik.core.models import User
|
||||
from authentik.core.signals import password_changed
|
||||
from authentik.lib.generators import generate_id
|
||||
|
||||
|
||||
class TestHashPasswordCommand(TestCase):
|
||||
"""Test hash_password management command"""
|
||||
"""Test hash_password management command."""
|
||||
|
||||
def test_hash_password_basic(self):
|
||||
"""Test basic password hashing"""
|
||||
def test_hash_password(self):
|
||||
"""Test hashing a password."""
|
||||
out = StringIO()
|
||||
call_command("hash_password", "test123", stdout=out)
|
||||
hashed = out.getvalue().strip()
|
||||
|
||||
# Verify it's a valid hash
|
||||
self.assertTrue(hashed.startswith("pbkdf2_sha256$"))
|
||||
# Verify the hash can be validated
|
||||
self.assertTrue(check_password("test123", hashed))
|
||||
|
||||
def test_hash_password_special_chars(self):
|
||||
"""Test hashing password with special characters"""
|
||||
out = StringIO()
|
||||
password = "P@ssw0rd!#$%^&*(){}[]" # nosec
|
||||
call_command("hash_password", password, stdout=out)
|
||||
hashed = out.getvalue().strip()
|
||||
|
||||
self.assertTrue(hashed.startswith("pbkdf2_sha256$"))
|
||||
self.assertTrue(check_password(password, hashed))
|
||||
|
||||
def test_hash_password_unicode(self):
|
||||
"""Test hashing password with unicode characters"""
|
||||
out = StringIO()
|
||||
password = "пароль123" # nosec
|
||||
call_command("hash_password", password, stdout=out)
|
||||
hashed = out.getvalue().strip()
|
||||
|
||||
self.assertTrue(hashed.startswith("pbkdf2_sha256$"))
|
||||
self.assertTrue(check_password(password, hashed))
|
||||
|
||||
def test_hash_password_long(self):
|
||||
"""Test hashing a very long password"""
|
||||
out = StringIO()
|
||||
password = "a" * 1000
|
||||
call_command("hash_password", password, stdout=out)
|
||||
hashed = out.getvalue().strip()
|
||||
|
||||
self.assertTrue(hashed.startswith("pbkdf2_sha256$"))
|
||||
self.assertTrue(check_password(password, hashed))
|
||||
|
||||
def test_hash_password_spaces(self):
|
||||
"""Test hashing password with spaces"""
|
||||
out = StringIO()
|
||||
password = "my super secret password" # nosec
|
||||
call_command("hash_password", password, stdout=out)
|
||||
hashed = out.getvalue().strip()
|
||||
|
||||
self.assertTrue(hashed.startswith("pbkdf2_sha256$"))
|
||||
self.assertTrue(check_password(password, hashed))
|
||||
|
||||
def test_hash_password_empty_fails(self):
|
||||
"""Test that empty password raises error"""
|
||||
"""Test that empty password raises error."""
|
||||
with self.assertRaises(CommandError) as ctx:
|
||||
call_command("hash_password", "")
|
||||
|
||||
self.assertIn("Password cannot be empty", str(ctx.exception))
|
||||
|
||||
def test_hash_different_passwords_different_hashes(self):
|
||||
"""Test that different passwords produce different hashes"""
|
||||
out1 = StringIO()
|
||||
out2 = StringIO()
|
||||
|
||||
call_command("hash_password", "password1", stdout=out1)
|
||||
call_command("hash_password", "password2", stdout=out2)
|
||||
|
||||
hash1 = out1.getvalue().strip()
|
||||
hash2 = out2.getvalue().strip()
|
||||
|
||||
self.assertNotEqual(hash1, hash2)
|
||||
self.assertTrue(check_password("password1", hash1))
|
||||
self.assertTrue(check_password("password2", hash2))
|
||||
self.assertFalse(check_password("password1", hash2))
|
||||
self.assertFalse(check_password("password2", hash1))
|
||||
|
||||
def test_hash_same_password_different_hashes(self):
|
||||
"""Test that same password produces different hashes (due to salt)"""
|
||||
out1 = StringIO()
|
||||
out2 = StringIO()
|
||||
|
||||
call_command("hash_password", "samepassword", stdout=out1)
|
||||
call_command("hash_password", "samepassword", stdout=out2)
|
||||
|
||||
hash1 = out1.getvalue().strip()
|
||||
hash2 = out2.getvalue().strip()
|
||||
|
||||
# Hashes should be different due to random salt
|
||||
self.assertNotEqual(hash1, hash2)
|
||||
# But both should validate the same password
|
||||
self.assertTrue(check_password("samepassword", hash1))
|
||||
self.assertTrue(check_password("samepassword", hash2))
|
||||
|
||||
|
||||
class TestUserSerializerPasswordHash(TestCase):
|
||||
"""Test UserSerializer password_hash functionality in blueprint context"""
|
||||
|
||||
def test_password_hash_sets_password_correctly(self):
|
||||
"""Test that a valid password_hash sets the password directly without re-hashing"""
|
||||
password = "test-password-123" # nosec
|
||||
password_hash = make_password(password)
|
||||
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": password_hash,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
# Verify password was set correctly
|
||||
self.assertTrue(user.check_password(password))
|
||||
# Verify the hash was set directly (not re-hashed)
|
||||
self.assertEqual(user.password, password_hash)
|
||||
|
||||
def test_password_hash_invalid_format_raises_error(self):
|
||||
"""Test that an invalid password_hash raises ValidationError"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": "not-a-valid-hash",
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
serializer.save()
|
||||
|
||||
self.assertIn("Invalid password hash format", str(ctx.exception))
|
||||
|
||||
def test_password_and_password_hash_both_set_raises_error(self):
|
||||
"""Test that setting both password and password_hash raises an error"""
|
||||
plaintext_password = "plaintext-password" # nosec
|
||||
password_hash = make_password("hash-password") # nosec
|
||||
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password": plaintext_password,
|
||||
"password_hash": password_hash,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
serializer.save()
|
||||
|
||||
self.assertIn("Cannot set both password and password_hash", str(ctx.exception))
|
||||
|
||||
def test_password_change_date_updated_with_password_hash(self):
|
||||
"""Test that password_change_date is updated when using password_hash"""
|
||||
password_hash = make_password("test-password") # nosec
|
||||
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": password_hash,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
# Verify password_change_date is set
|
||||
self.assertIsNotNone(user.password_change_date)
|
||||
|
||||
def test_password_hash_update_existing_user(self):
|
||||
"""Test that password_hash works when updating an existing user"""
|
||||
# Create user first
|
||||
user = User.objects.create(username=generate_id(), name="Test User")
|
||||
user.set_password("old-password") # nosec
|
||||
user.save()
|
||||
|
||||
new_password = "new-password-123" # nosec
|
||||
password_hash = make_password(new_password)
|
||||
|
||||
data = {
|
||||
"password_hash": password_hash,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(
|
||||
instance=user,
|
||||
data=data,
|
||||
context={SERIALIZER_CONTEXT_BLUEPRINT: True},
|
||||
partial=True,
|
||||
)
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
updated_user = serializer.save()
|
||||
|
||||
# Verify password was updated
|
||||
self.assertTrue(updated_user.check_password(new_password))
|
||||
self.assertFalse(updated_user.check_password("old-password"))
|
||||
|
||||
def test_password_hash_whitespace_only_rejected(self):
|
||||
"""Test that whitespace-only password_hash is rejected by serializer validation"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": " ", # whitespace only
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
# Serializer should reject blank values
|
||||
self.assertFalse(serializer.is_valid())
|
||||
self.assertIn("password_hash", serializer.errors)
|
||||
|
||||
def test_password_hash_empty_string_rejected(self):
|
||||
"""Test that empty string password_hash is rejected by serializer validation"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": "",
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
# Serializer should reject blank values
|
||||
self.assertFalse(serializer.is_valid())
|
||||
self.assertIn("password_hash", serializer.errors)
|
||||
|
||||
def test_password_hash_null_creates_unusable_password(self):
|
||||
"""Test that null password_hash results in unusable password"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": None,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
# User should have an unusable password since no valid password was provided
|
||||
self.assertFalse(user.has_usable_password())
|
||||
|
||||
def test_both_password_and_password_hash_null_creates_unusable_password(self):
|
||||
"""Test that explicitly null password and password_hash results in unusable password"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password": None,
|
||||
"password_hash": None,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
# User should have an unusable password since no valid password was provided
|
||||
self.assertFalse(user.has_usable_password())
|
||||
|
||||
def test_password_hash_not_available_outside_blueprint_context(self):
|
||||
"""Test that password_hash field is not available outside blueprint context"""
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": make_password("test"), # nosec
|
||||
}
|
||||
|
||||
# Without blueprint context
|
||||
serializer = UserSerializer(data=data)
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
|
||||
# password_hash should not be in validated_data
|
||||
self.assertNotIn("password_hash", serializer.validated_data)
|
||||
|
||||
def test_password_hash_identifies_various_hashers(self):
|
||||
"""Test that valid Django password hashes are accepted"""
|
||||
password = "test-password" # nosec
|
||||
password_hash = make_password(password)
|
||||
|
||||
# Verify the hash is a valid Django hash format
|
||||
hasher = identify_hasher(password_hash)
|
||||
self.assertIsNotNone(hasher)
|
||||
|
||||
username = generate_id()
|
||||
data = {
|
||||
"username": username,
|
||||
"name": "Test User",
|
||||
"password_hash": password_hash,
|
||||
}
|
||||
|
||||
serializer = UserSerializer(data=data, context={SERIALIZER_CONTEXT_BLUEPRINT: True})
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
self.assertTrue(user.check_password(password))
|
||||
|
||||
def test_set_password_from_hash_signal_does_not_expose_raw_password(self):
|
||||
"""Test password_changed signal payload when password hash is set directly."""
|
||||
user = User.objects.create(username=generate_id(), name="Test User")
|
||||
user.set_password("old-password") # nosec
|
||||
user.save()
|
||||
captured = []
|
||||
dispatch_uid = generate_id()
|
||||
|
||||
def receiver(sender, **kwargs):
|
||||
captured.append(kwargs)
|
||||
|
||||
password_changed.connect(receiver, dispatch_uid=dispatch_uid)
|
||||
try:
|
||||
user.set_password_from_hash(make_password("new-password")) # nosec
|
||||
user.save()
|
||||
finally:
|
||||
password_changed.disconnect(dispatch_uid=dispatch_uid)
|
||||
|
||||
self.assertEqual(len(captured), 1)
|
||||
self.assertIsNone(captured[0]["password"])
|
||||
self.assertEqual(captured[0]["password_source"], "hash")
|
||||
|
||||
def test_set_password_signal_marks_non_hash_password(self):
|
||||
"""Test password_changed signal payload for a regular password set."""
|
||||
user = User.objects.create(username=generate_id(), name="Test User")
|
||||
captured = []
|
||||
dispatch_uid = generate_id()
|
||||
|
||||
def receiver(sender, **kwargs):
|
||||
captured.append(kwargs)
|
||||
|
||||
password_changed.connect(receiver, dispatch_uid=dispatch_uid)
|
||||
try:
|
||||
user.set_password("new-password") # nosec
|
||||
user.save()
|
||||
finally:
|
||||
password_changed.disconnect(dispatch_uid=dispatch_uid)
|
||||
|
||||
self.assertEqual(len(captured), 1)
|
||||
self.assertEqual(captured[0]["password"], "new-password")
|
||||
self.assertNotIn("password_source", captured[0])
|
||||
|
||||
def test_set_password_from_hash_skips_ldap_and_kerberos_sync_receivers(self):
|
||||
"""Hash-based password sets should not hit LDAP/Kerberos sync paths."""
|
||||
user = User.objects.create(
|
||||
username=generate_id(),
|
||||
name="Test User",
|
||||
attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"authentik.sources.ldap.signals.LDAPSource.objects.filter"
|
||||
) as ldap_sources_filter,
|
||||
patch(
|
||||
"authentik.sources.kerberos.signals."
|
||||
"UserKerberosSourceConnection.objects.select_related"
|
||||
) as kerberos_connections_select,
|
||||
):
|
||||
user.set_password_from_hash(make_password("new-password")) # nosec
|
||||
user.save()
|
||||
|
||||
ldap_sources_filter.assert_not_called()
|
||||
kerberos_connections_select.assert_not_called()
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
"""user tests"""
|
||||
|
||||
from django.test.testcases import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.test.testcases import TestCase
|
||||
from rest_framework.exceptions import ValidationError
|
||||
|
||||
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
|
||||
from authentik.core.api.users import UserSerializer
|
||||
from authentik.core.models import User
|
||||
from authentik.core.signals import password_changed
|
||||
from authentik.events.models import Event
|
||||
from authentik.lib.generators import generate_id
|
||||
|
||||
@@ -33,3 +40,91 @@ class TestUsers(TestCase):
|
||||
self.assertEqual(Event.objects.count(), 1)
|
||||
user.ak_groups.all()
|
||||
self.assertEqual(Event.objects.count(), 1)
|
||||
|
||||
def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
|
||||
"""Test hash password updates do not expose a raw password to sync receivers."""
|
||||
user = User.objects.create(
|
||||
username=generate_id(),
|
||||
attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
|
||||
)
|
||||
captured = []
|
||||
dispatch_uid = generate_id()
|
||||
|
||||
def receiver(sender, **kwargs):
|
||||
captured.append(kwargs)
|
||||
|
||||
password_changed.connect(receiver, dispatch_uid=dispatch_uid)
|
||||
try:
|
||||
with (
|
||||
patch(
|
||||
"authentik.sources.ldap.signals.LDAPSource.objects.filter"
|
||||
) as ldap_sources_filter,
|
||||
patch(
|
||||
"authentik.sources.kerberos.signals."
|
||||
"UserKerberosSourceConnection.objects.select_related"
|
||||
) as kerberos_connections_select,
|
||||
):
|
||||
user.set_password_from_hash(make_password("new-password")) # nosec
|
||||
user.save()
|
||||
finally:
|
||||
password_changed.disconnect(dispatch_uid=dispatch_uid)
|
||||
|
||||
self.assertEqual(len(captured), 1)
|
||||
self.assertIsNone(captured[0]["password"])
|
||||
self.assertEqual(captured[0]["password_source"], "hash")
|
||||
ldap_sources_filter.assert_not_called()
|
||||
kerberos_connections_select.assert_not_called()
|
||||
|
||||
|
||||
class TestUserSerializerPasswordHash(TestCase):
|
||||
"""Test UserSerializer password_hash support in blueprint context."""
|
||||
|
||||
def test_password_hash_sets_password_directly(self):
|
||||
"""Test a valid password hash is stored without re-hashing."""
|
||||
password = "test-password-123" # nosec
|
||||
password_hash = make_password(password)
|
||||
serializer = UserSerializer(
|
||||
data={
|
||||
"username": generate_id(),
|
||||
"name": "Test User",
|
||||
"password_hash": password_hash,
|
||||
},
|
||||
context={SERIALIZER_CONTEXT_BLUEPRINT: True},
|
||||
)
|
||||
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
user = serializer.save()
|
||||
|
||||
self.assertEqual(user.password, password_hash)
|
||||
self.assertTrue(user.check_password(password))
|
||||
self.assertIsNotNone(user.password_change_date)
|
||||
|
||||
def test_password_hash_rejects_invalid_format(self):
|
||||
"""Test invalid password hash values are rejected."""
|
||||
serializer = UserSerializer(
|
||||
data={
|
||||
"username": generate_id(),
|
||||
"name": "Test User",
|
||||
"password_hash": "not-a-valid-hash",
|
||||
},
|
||||
context={SERIALIZER_CONTEXT_BLUEPRINT: True},
|
||||
)
|
||||
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
serializer.save()
|
||||
|
||||
self.assertIn("Invalid password hash format", str(ctx.exception))
|
||||
|
||||
def test_password_hash_ignored_outside_blueprint_context(self):
|
||||
"""Test password_hash is not accepted by the regular serializer."""
|
||||
serializer = UserSerializer(
|
||||
data={
|
||||
"username": generate_id(),
|
||||
"name": "Test User",
|
||||
"password_hash": make_password("test"), # nosec
|
||||
}
|
||||
)
|
||||
|
||||
self.assertTrue(serializer.is_valid(), serializer.errors)
|
||||
self.assertNotIn("password_hash", serializer.validated_data)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from datetime import datetime, timedelta
|
||||
from json import loads
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.urls.base import reverse
|
||||
from django.utils.timezone import now
|
||||
from rest_framework.test import APITestCase
|
||||
@@ -113,6 +114,39 @@ class TestUsersAPI(APITestCase):
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(response.content, {"password": ["This field may not be blank."]})
|
||||
|
||||
def test_set_password_hash(self):
|
||||
"""Test setting a user's password from a hash."""
|
||||
self.client.force_login(self.admin)
|
||||
password = generate_key()
|
||||
password_hash = make_password(password)
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-set-password-hash", kwargs={"pk": self.user.pk}),
|
||||
data={"password_hash": password_hash},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 204)
|
||||
self.user.refresh_from_db()
|
||||
self.assertEqual(self.user.password, password_hash)
|
||||
self.assertTrue(self.user.check_password(password))
|
||||
|
||||
def test_set_password_hash_invalid(self):
|
||||
"""Test invalid password hashes are rejected."""
|
||||
self.client.force_login(self.admin)
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:user-set-password-hash", kwargs={"pk": self.user.pk}),
|
||||
data={"password_hash": "not-a-valid-hash"},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertJSONEqual(
|
||||
response.content,
|
||||
{
|
||||
"password_hash": [
|
||||
"Invalid password hash format. Must be a valid Django password hash."
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
def test_recovery(self):
|
||||
"""Test user recovery link"""
|
||||
flow = create_test_flow(
|
||||
|
||||
Reference in New Issue
Block a user