(entitlements) add DeployCenter backend for syncing maildomain admins (#572)

Entitlements system with pluggable backends and automatic mail-domain admin sync on login.
Provisioning API to create/update mail domains (bulk input, custom attributes)
Feature flags to enable/disable mail-domain creation and access management; UI and public config expose these flags
This commit is contained in:
Sylvain Zimmer
2026-03-03 15:06:55 +01:00
committed by GitHub
parent 14aff373cf
commit d153f5ad69
29 changed files with 1518 additions and 4 deletions

140
docs/entitlements.md Normal file
View File

@@ -0,0 +1,140 @@
# Entitlements System
The entitlements system provides a pluggable backend architecture for checking user access rights and synchronizing mail domain admin permissions. It integrates with the DeployCenter (Espace Operateur) API in production and uses a local backend for development.
## Architecture
```text
┌─────────────────────────────────────────────┐
│ OIDC Authentication Backend │
│ _sync_entitlements() on every login │
└──────────────┬──────────────────────────────┘
┌──────────────▼──────────────────────────────┐
│ Service Layer │
│ get_user_entitlements() │
└──────────────┬──────────────────────────────┘
┌──────────────▼──────────────────────────────┐
│ Backend Factory (singleton) │
│ get_entitlements_backend() │
└──────────────┬──────────────────────────────┘
┌───────┴───────┐
│ │
┌──────▼─────┐ ┌──────▼───────────────┐
│ Local │ │ DeployCenter │
│ Backend │ │ Backend │
│ (dev/test) │ │ (production, cached) │
└────────────┘ └──────────────────────┘
```
### Components
- **Service layer** (`core/entitlements/__init__.py`): Public `get_user_entitlements()` function and `EntitlementsUnavailableError` exception.
- **Backend factory** (`core/entitlements/factory.py`): `@functools.cache` singleton that imports and instantiates the configured backend class.
- **Abstract base** (`core/entitlements/backends/base.py`): Defines the `EntitlementsBackend` interface.
- **Local backend** (`core/entitlements/backends/local.py`): Always grants access, returns `None` for `can_admin_maildomains` (disabling sync).
- **DeployCenter backend** (`core/entitlements/backends/deploycenter.py`): Calls the DeployCenter API with internal Django cache.
- **OIDC sync** (`core/authentication/backends.py`): Syncs `MailDomainAccess` ADMIN records on every login based on `can_admin_maildomains`.
### Error Handling
- **Login sync is fail-open**: if the entitlements service is unavailable during OIDC login, existing `MailDomainAccess` records are preserved and the user is allowed in.
- The DeployCenter backend falls back to stale cached data when the API is unavailable.
- `EntitlementsUnavailableError` is only raised when the API fails and no cache exists.
## Configuration
### Environment Variables
| Variable | Default | Description |
|---|---|---|
| `ENTITLEMENTS_BACKEND` | `core.entitlements.backends.local.LocalEntitlementsBackend` | Python import path of the backend class |
| `ENTITLEMENTS_BACKEND_PARAMETERS` | `{}` | JSON object of parameters passed to the backend constructor |
| `ENTITLEMENTS_CACHE_TIMEOUT` | `300` | Cache TTL in seconds |
### DeployCenter Backend Parameters
When using `core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend`, provide these in `ENTITLEMENTS_BACKEND_PARAMETERS`:
```json
{
"base_url": "https://deploycenter.example.com/api/v1.0/entitlements/",
"service_id": "42",
"api_key": "your-api-key",
"timeout": 10,
"oidc_claims": ["siret"]
}
```
| Parameter | Required | Description |
|---|---|---|
| `base_url` | Yes | Full URL of the DeployCenter entitlements endpoint |
| `service_id` | Yes | Service identifier in DeployCenter |
| `api_key` | Yes | API key for `X-Service-Auth` header |
| `timeout` | No | HTTP timeout in seconds (default: 10) |
| `oidc_claims` | No | List of OIDC claim names to extract from user_info and forward as query params (e.g. `["siret"]`) |
### Example Production Configuration
```bash
ENTITLEMENTS_BACKEND=core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend
ENTITLEMENTS_BACKEND_PARAMETERS='{"base_url":"https://deploycenter.example.com/api/v1.0/entitlements/","service_id":"42","api_key":"secret-key","timeout":10,"oidc_claims":["siret"]}'
ENTITLEMENTS_CACHE_TIMEOUT=300
```
## Backend Interface
Custom backends must extend `EntitlementsBackend` and implement:
```python
class MyBackend(EntitlementsBackend):
def __init__(self, **kwargs):
# Receive ENTITLEMENTS_BACKEND_PARAMETERS as kwargs
pass
def get_user_entitlements(self, user_sub, user_email, user_info=None, force_refresh=False):
# Return: {"can_access": bool, "can_admin_maildomains": list[str] | None}
# Return None for can_admin_maildomains to skip domain admin sync.
# Raise EntitlementsUnavailableError on failure.
pass
```
## DeployCenter API
The DeployCenter backend calls:
```text
GET {base_url}?service_id=X&account_type=user&account_email=X&siret=X
```
Headers:
- `X-Service-Auth: Bearer {api_key}`
Query parameters include any configured `oidc_claims` extracted from the OIDC user_info response (e.g. `siret`).
Response: `{"entitlements": {"can_access": bool, "can_admin_maildomains": [str], ...}}`
## OIDC Login Integration
During OIDC login (`post_get_or_create_user`), the system:
1. Calls `get_user_entitlements` with `force_refresh=True` (resets cache)
2. Syncs `MailDomainAccess` ADMIN records based on `can_admin_maildomains`:
- Compares entitled domains with existing records (optimistic early return if in sync)
- Creates missing admin accesses for entitled domains (using `update_or_create`)
- Removes admin accesses for domains not in the entitled list
3. If `can_admin_maildomains` is `None` (e.g. local backend), sync is skipped entirely
4. If the entitlements service is unavailable, existing accesses are preserved (fail-open)
### Caching Behavior
- The DeployCenter backend caches entitlements in Django's cache framework (TTL: `ENTITLEMENTS_CACHE_TIMEOUT`).
- On login, `force_refresh=True` bypasses the cache to fetch fresh data.
- If the API fails during a forced refresh, stale cached data is returned as fallback.
- Logging out and back in triggers a fresh fetch, effectively resetting the cache for that user.
### Deployment Consideration
Before enabling the DeployCenter backend in production, ensure that existing domain admin assignments are present in DeployCenter. The entitlements sync will **remove** admin accesses that are not in the DeployCenter response.

View File

@@ -245,6 +245,14 @@
"description": "Whether external images should be proxied",
"readOnly": true
},
"FEATURE_MAILDOMAIN_CREATE": {
"type": "boolean",
"readOnly": true
},
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES": {
"type": "boolean",
"readOnly": true
},
"MESSAGES_MANUAL_RETRY_MAX_AGE": {
"type": "integer",
"description": "Maximum age in seconds for a message to be eligible for manual retry of failed deliveries",
@@ -267,6 +275,8 @@
"MAX_RECIPIENTS_PER_MESSAGE",
"MAX_TEMPLATE_IMAGE_SIZE",
"IMAGE_PROXY_ENABLED",
"FEATURE_MAILDOMAIN_CREATE",
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES",
"MESSAGES_MANUAL_RETRY_MAX_AGE"
]
}

View File

@@ -15,6 +15,13 @@ ACTION_FOR_METHOD_TO_PERMISSION = {
}
class DenyAll(permissions.BasePermission):
"""Always denies access. Used to disable endpoints via feature flags."""
def has_permission(self, request, view):
return False
class IsAuthenticated(permissions.BasePermission):
"""
Allows access only to authenticated users. Alternative method checking the presence
@@ -423,6 +430,18 @@ class HasMetricsApiKey(permissions.BasePermission):
)
class HasProvisioningApiKey(permissions.BasePermission):
"""Allows access only to requests bearing the provisioning API key."""
def has_permission(self, request, view):
if not settings.PROVISIONING_API_KEY:
return False
return compare_digest(
request.headers.get("Authorization") or "",
f"Bearer {settings.PROVISIONING_API_KEY}",
)
class HasAccessToMailbox(IsAuthenticated):
"""Allows access only to users with the access to the mailbox."""

View File

@@ -1759,3 +1759,34 @@ class PartialDriveItemSerializer(serializers.Serializer):
def update(self, instance, validated_data):
"""This serializer is only used to validate the data, not to create or update."""
class DomainsField(serializers.Field):
"""Accepts either a JSON list of strings or a comma-separated string."""
def to_internal_value(self, data):
if isinstance(data, str):
data = [d.strip() for d in data.split(",") if d.strip()]
if not isinstance(data, list):
raise serializers.ValidationError(
"Expected a list of domains or a comma-separated string."
)
if not data:
raise serializers.ValidationError("At least one domain is required.")
return data
def to_representation(self, value):
return value
class ProvisioningMailDomainSerializer(serializers.Serializer):
"""Serializer for the provisioning endpoint that creates mail domains."""
domains = DomainsField()
custom_attributes = serializers.JSONField(required=False, default=dict)
def create(self, validated_data):
"""This serializer is only used to validate the data, not to create or update."""
def update(self, instance, validated_data):
"""This serializer is only used to validate the data, not to create or update."""

View File

@@ -111,6 +111,14 @@ class ConfigView(drf.views.APIView):
"description": "Whether external images should be proxied",
"readOnly": True,
},
"FEATURE_MAILDOMAIN_CREATE": {
"type": "boolean",
"readOnly": True,
},
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES": {
"type": "boolean",
"readOnly": True,
},
"MESSAGES_MANUAL_RETRY_MAX_AGE": {
"type": "integer",
"description": (
@@ -136,6 +144,8 @@ class ConfigView(drf.views.APIView):
"MAX_RECIPIENTS_PER_MESSAGE",
"MAX_TEMPLATE_IMAGE_SIZE",
"IMAGE_PROXY_ENABLED",
"FEATURE_MAILDOMAIN_CREATE",
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES",
"MESSAGES_MANUAL_RETRY_MAX_AGE",
],
},
@@ -158,6 +168,8 @@ class ConfigView(drf.views.APIView):
"IMAGE_PROXY_ENABLED",
"MESSAGES_MANUAL_RETRY_MAX_AGE",
"FEATURE_MAILBOX_ADMIN_CHANNELS",
"FEATURE_MAILDOMAIN_CREATE",
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES",
"MAX_OUTGOING_ATTACHMENT_SIZE",
"MAX_OUTGOING_BODY_SIZE",
"MAX_INCOMING_EMAIL_SIZE",

View File

@@ -56,6 +56,8 @@ class AdminMailDomainViewSet(
def get_permissions(self):
if self.action == "create":
if not settings.FEATURE_MAILDOMAIN_CREATE:
return [core_permissions.DenyAll()]
return [core_permissions.IsSuperUser()]
return super().get_permissions()

View File

@@ -1,5 +1,6 @@
"""API ViewSet for MaildomainAccess model."""
from django.conf import settings
from django.shortcuts import get_object_or_404
from drf_spectacular.utils import extend_schema
@@ -31,6 +32,12 @@ class MaildomainAccessViewSet(
lookup_field = "pk"
pagination_class = None
def get_permissions(self):
if self.action in ("create", "destroy"):
if not settings.FEATURE_MAILDOMAIN_MANAGE_ACCESSES:
return [core_permissions.DenyAll()]
return super().get_permissions()
def get_serializer_class(self):
"""Select serializer based on action."""
if self.action in ["create", "update", "partial_update"]:

View File

@@ -0,0 +1,74 @@
"""API view for provisioning mail domains from DeployCenter."""
import logging
from django.core.exceptions import ValidationError
from django.db import IntegrityError
from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from sentry_sdk import capture_exception
from core.api.permissions import HasProvisioningApiKey
from core.api.serializers import ProvisioningMailDomainSerializer
from core.models import MailDomain
logger = logging.getLogger(__name__)
class ProvisioningMailDomainView(APIView):
"""Provision mail domains from DeployCenter webhooks."""
permission_classes = [HasProvisioningApiKey]
authentication_classes = []
@extend_schema(exclude=True)
def post(self, request):
"""Provision mail domains from a list of domain names."""
serializer = ProvisioningMailDomainSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
domains = serializer.validated_data["domains"]
custom_attributes = serializer.validated_data.get("custom_attributes", {})
created = []
existing = []
errors = []
for domain_name in domains:
try:
domain, was_created = MailDomain.objects.get_or_create(
name=domain_name,
defaults={"custom_attributes": custom_attributes},
)
if was_created:
created.append(domain_name)
else:
if domain.custom_attributes != custom_attributes:
domain.custom_attributes = custom_attributes
domain.save()
existing.append(domain_name)
except ValidationError as e:
errors.append({"domain": domain_name, "error": str(e)})
except IntegrityError as exc:
capture_exception(exc)
logger.exception(
"IntegrityError while provisioning domain %s", domain_name
)
errors.append(
{
"domain": domain_name,
"error": "Failed to provision domain.",
}
)
return Response(
{
"created": created,
"existing": existing,
"errors": errors,
},
status=status.HTTP_200_OK,
)

View File

@@ -10,13 +10,15 @@ from lasuite.oidc_login.backends import (
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
)
from core.enums import MailboxRoleChoices
from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
from core.enums import MailboxRoleChoices, MailDomainAccessRoleChoices
from core.models import (
Contact,
DuplicateEmailError,
Mailbox,
MailboxAccess,
MailDomain,
MailDomainAccess,
User,
)
@@ -48,6 +50,7 @@ class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
"""
_user_created = False
user_info = self.get_userinfo(access_token, id_token, payload)
self._user_info = user_info # pylint: disable=attribute-defined-outside-init
if not self.verify_claims(user_info):
msg = "Claims verification failed"
@@ -87,6 +90,69 @@ class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
"""Post-get or create user."""
if user:
self.autojoin_mailbox(user)
self._sync_entitlements(user)
def _sync_entitlements(self, user):
"""Fetch user entitlements and sync MailDomainAccess ADMIN records.
Called on every login. Uses force_refresh=True to reset the cache.
If the entitlements backend is unavailable, existing accesses are preserved.
"""
user_info = getattr(self, "_user_info", None)
try:
entitlements = get_user_entitlements(
user.sub, user.email, user_info=user_info, force_refresh=True
)
except EntitlementsUnavailableError:
logger.warning("Entitlements service unavailable during login")
return
admin_domains = entitlements.get("can_admin_maildomains")
if admin_domains is None:
# Backend doesn't support this field (e.g. dummy), skip sync
return
if not isinstance(admin_domains, (list, tuple, set)):
logger.warning(
"Invalid type for can_admin_maildomains: %s, skipping sync",
type(admin_domains).__name__,
)
return
# Resolve domain names to MailDomain objects that exist in DB
entitled_domains = list(MailDomain.objects.filter(name__in=admin_domains))
entitled_domain_ids = {d.id for d in entitled_domains}
# Get current ADMIN accesses for this user
existing_accesses = MailDomainAccess.objects.filter(
user=user, role=MailDomainAccessRoleChoices.ADMIN
)
existing_domain_ids = set(
existing_accesses.values_list("maildomain_id", flat=True)
)
# Optimistic path: if already in sync, skip all writes
if entitled_domain_ids == existing_domain_ids:
return
# Add new accesses
for domain in entitled_domains:
if domain.id not in existing_domain_ids:
MailDomainAccess.objects.update_or_create(
user=user,
maildomain=domain,
defaults={"role": MailDomainAccessRoleChoices.ADMIN},
)
# Remove stale accesses (domains not in the entitled list)
stale_domain_ids = existing_domain_ids - entitled_domain_ids
if stale_domain_ids:
MailDomainAccess.objects.filter(
user=user,
maildomain_id__in=stale_domain_ids,
role=MailDomainAccessRoleChoices.ADMIN,
).delete()
def get_extra_claims(self, user_info):
"""Get extra claims."""

View File

@@ -0,0 +1,28 @@
"""Entitlements service layer."""
from core.entitlements.factory import get_entitlements_backend
class EntitlementsUnavailableError(Exception):
"""Raised when the entitlements backend cannot be reached or returns an error."""
def get_user_entitlements(user_sub, user_email, user_info=None, force_refresh=False):
"""Get user entitlements, delegating to the configured backend.
Args:
user_sub: The user's OIDC subject identifier.
user_email: The user's email address.
user_info: The full OIDC user_info dict (forwarded to backend).
force_refresh: If True, bypass backend cache and fetch fresh data.
Returns:
dict: {"can_access": bool, "can_admin_maildomains": list[str] | None}
Raises:
EntitlementsUnavailableError: If the backend cannot be reached and no cache exists.
"""
backend = get_entitlements_backend()
return backend.get_user_entitlements(
user_sub, user_email, user_info=user_info, force_refresh=force_refresh
)

View File

@@ -0,0 +1,29 @@
"""Abstract base class for entitlements backends."""
from abc import ABC, abstractmethod
class EntitlementsBackend(ABC):
"""Abstract base class that defines the interface for entitlements backends."""
@abstractmethod
def get_user_entitlements(
self, user_sub, user_email, user_info=None, force_refresh=False
):
"""Fetch user entitlements.
Args:
user_sub: The user's OIDC subject identifier.
user_email: The user's email address.
user_info: The full OIDC user_info dict (backends may extract claims from it).
force_refresh: If True, bypass any cache and fetch fresh data.
Returns:
dict: {
"can_access": bool,
"can_admin_maildomains": list[str] | None,
}
Raises:
EntitlementsUnavailableError: If the backend cannot be reached.
"""

View File

@@ -0,0 +1,113 @@
"""DeployCenter (Espace Operateur) entitlements backend."""
import logging
from django.conf import settings
from django.core.cache import cache
import requests
from core.entitlements import EntitlementsUnavailableError
from core.entitlements.backends.base import EntitlementsBackend
logger = logging.getLogger(__name__)
class DeployCenterEntitlementsBackend(EntitlementsBackend):
"""Backend that fetches entitlements from the DeployCenter API.
Args:
base_url: Full URL of the entitlements endpoint
(e.g. "https://dc.example.com/api/v1.0/entitlements/").
service_id: The service identifier in DeployCenter.
api_key: API key for X-Service-Auth header.
timeout: HTTP request timeout in seconds.
oidc_claims: List of OIDC claim names to extract from user_info
and forward as query params (e.g. ["siret"]).
"""
def __init__(
self, base_url, service_id, api_key, timeout=10, oidc_claims=None, **kwargs
):
super().__init__(**kwargs)
self.base_url = base_url
self.service_id = service_id
self.api_key = api_key
self.timeout = timeout
self.oidc_claims = oidc_claims or []
def _cache_key(self, user_sub):
return f"entitlements:user:{user_sub}"
def _make_request(self, user_email, user_info=None):
"""Make a request to the DeployCenter entitlements API.
Returns:
dict | None: The response data, or None on failure.
"""
params = {
"service_id": self.service_id,
"account_type": "user",
"account_email": user_email,
}
# Forward configured OIDC claims as query params
if user_info:
for claim in self.oidc_claims:
if claim in user_info:
params[claim] = user_info[claim]
headers = {
"X-Service-Auth": f"Bearer {self.api_key}",
}
try:
response = requests.get(
self.base_url, params=params, headers=headers, timeout=self.timeout
)
response.raise_for_status()
return response.json()
except (requests.RequestException, ValueError):
email_domain = user_email.split("@")[-1] if "@" in user_email else "?"
logger.warning(
"DeployCenter entitlements request failed for user@%s",
email_domain,
exc_info=True,
)
return None
def get_user_entitlements(
self, user_sub, user_email, user_info=None, force_refresh=False
):
"""Fetch user entitlements from DeployCenter with caching.
On cache miss or force_refresh: fetches from the API.
On API failure: falls back to stale cache if available,
otherwise raises EntitlementsUnavailableError.
"""
cache_key = self._cache_key(user_sub)
if not force_refresh:
cached = cache.get(cache_key)
if cached is not None:
return cached
data = self._make_request(user_email, user_info=user_info)
if data is None:
# API failed — try stale cache as fallback
cached = cache.get(cache_key)
if cached is not None:
return cached
raise EntitlementsUnavailableError(
"Failed to fetch user entitlements from DeployCenter"
)
entitlements = data.get("entitlements", {})
result = {
"can_access": entitlements.get("can_access", False),
"can_admin_maildomains": entitlements.get("can_admin_maildomains"),
}
cache.set(cache_key, result, settings.ENTITLEMENTS_CACHE_TIMEOUT)
return result

View File

@@ -0,0 +1,16 @@
"""Local entitlements backend for development and testing."""
from core.entitlements.backends.base import EntitlementsBackend
class LocalEntitlementsBackend(EntitlementsBackend):
"""Local backend that always grants access. Returns None for
can_admin_maildomains to signal that domain admin sync is not supported."""
def get_user_entitlements(
self, user_sub, user_email, user_info=None, force_refresh=False
):
return {
"can_access": True,
"can_admin_maildomains": None,
}

View File

@@ -0,0 +1,13 @@
"""Factory for creating entitlements backend instances."""
import functools
from django.conf import settings
from django.utils.module_loading import import_string
@functools.cache
def get_entitlements_backend():
"""Return a singleton instance of the configured entitlements backend."""
backend_class = import_string(settings.ENTITLEMENTS_BACKEND)
return backend_class(**settings.ENTITLEMENTS_BACKEND_PARAMETERS)

View File

@@ -1,6 +1,7 @@
"""Tests for the MailDomain Admin API endpoints."""
# pylint: disable=redefined-outer-name, unused-argument
from django.test import override_settings
from django.urls import reverse
import pytest
@@ -92,3 +93,14 @@ class TestAdminMailDomainsCreate:
self.CREATE_DOMAIN_URL, {"name": "dup.com"}, format="json"
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@override_settings(FEATURE_MAILDOMAIN_CREATE=False)
def test_create_mail_domain_feature_flag_disabled(
self, api_client, domain_superuser_user
):
"""Superuser should get 403 when FEATURE_MAILDOMAIN_CREATE is False."""
api_client.force_authenticate(user=domain_superuser_user)
data = {"name": "blocked-domain.com"}
response = api_client.post(self.CREATE_DOMAIN_URL, data, format="json")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert not models.MailDomain.objects.filter(name="blocked-domain.com").exists()

View File

@@ -24,6 +24,8 @@ pytestmark = pytest.mark.django_db
FEATURE_AI_SUMMARY=False,
FEATURE_AI_AUTOLABELS=False,
FEATURE_MAILBOX_ADMIN_CHANNELS=[],
FEATURE_MAILDOMAIN_CREATE=True,
FEATURE_MAILDOMAIN_MANAGE_ACCESSES=True,
DRIVE_CONFIG={"base_url": None, "app_name": "Drive"},
MAX_OUTGOING_ATTACHMENT_SIZE=20971520, # 20MB
MAX_OUTGOING_BODY_SIZE=5242880, # 5MB
@@ -52,6 +54,8 @@ def test_api_config(is_authenticated):
"FEATURE_AI_SUMMARY": False,
"FEATURE_AI_AUTOLABELS": False,
"FEATURE_MAILBOX_ADMIN_CHANNELS": [],
"FEATURE_MAILDOMAIN_CREATE": True,
"FEATURE_MAILDOMAIN_MANAGE_ACCESSES": True,
"SCHEMA_CUSTOM_ATTRIBUTES_USER": {},
"SCHEMA_CUSTOM_ATTRIBUTES_MAILDOMAIN": {},
"MAX_INCOMING_EMAIL_SIZE": 10485760,

View File

@@ -2,6 +2,7 @@
# pylint: disable=unused-argument
from django.contrib.auth.models import AnonymousUser
from django.test import override_settings
from django.urls import reverse
import pytest
@@ -409,3 +410,50 @@ class TestMaildomainAccessViewSet:
assert "id" in user_details
assert "email" in user_details
assert "full_name" in user_details
# --- FEATURE FLAG Tests ---
@override_settings(FEATURE_MAILDOMAIN_MANAGE_ACCESSES=False)
def test_admin_api_maildomain_accesses_create_feature_flag_disabled(
self,
api_client,
super_user,
maildomain_1,
md1_access,
md2_admin_user,
):
"""Creating access should return 403 when FEATURE_MAILDOMAIN_MANAGE_ACCESSES is False."""
api_client.force_authenticate(user=super_user)
data = {"user": str(md2_admin_user.pk), "role": "admin"}
response = api_client.post(
self.list_create_url(maildomain_pk=maildomain_1.pk), data
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@override_settings(FEATURE_MAILDOMAIN_MANAGE_ACCESSES=False)
def test_admin_api_maildomain_accesses_delete_feature_flag_disabled(
self,
api_client,
super_user,
maildomain_1,
md1_access,
):
"""Deleting access should return 403 when FEATURE_MAILDOMAIN_MANAGE_ACCESSES is False."""
api_client.force_authenticate(user=super_user)
response = api_client.delete(
self.detail_url(maildomain_pk=maildomain_1.pk, pk=md1_access.pk)
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert models.MailDomainAccess.objects.filter(pk=md1_access.pk).exists()
@override_settings(FEATURE_MAILDOMAIN_MANAGE_ACCESSES=False)
def test_admin_api_maildomain_accesses_list_feature_flag_disabled(
self,
api_client,
super_user,
maildomain_1,
md1_access,
):
"""Listing accesses should still return 200 when feature flag is disabled (read-only is fine)."""
api_client.force_authenticate(user=super_user)
response = api_client.get(self.list_create_url(maildomain_pk=maildomain_1.pk))
assert response.status_code == status.HTTP_200_OK

View File

@@ -0,0 +1,218 @@
"""Tests for the provisioning maildomains endpoint."""
# pylint: disable=redefined-outer-name
from django.urls import reverse
import pytest
from core.factories import MailDomainFactory
from core.models import MailDomain
@pytest.fixture
def url():
"""Returns the URL for the provisioning maildomains endpoint."""
return reverse("provisioning-maildomains")
@pytest.fixture
def auth_header(settings):
"""Returns the authentication header for the provisioning endpoint."""
settings.PROVISIONING_API_KEY = "test-provisioning-key"
return {"HTTP_AUTHORIZATION": "Bearer test-provisioning-key"}
# -- Authentication tests --
@pytest.mark.django_db
def test_provisioning_no_auth_returns_403(client, url):
"""Request without Authorization header returns 403."""
response = client.post(
url, data={"domains": ["test.fr"]}, content_type="application/json"
)
assert response.status_code == 403
@pytest.mark.django_db
def test_provisioning_wrong_token_returns_403(client, url, settings):
"""Request with wrong token returns 403."""
settings.PROVISIONING_API_KEY = "correct-key"
response = client.post(
url,
data={"domains": ["test.fr"]},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer wrong-key",
)
assert response.status_code == 403
@pytest.mark.django_db
def test_provisioning_no_key_configured_returns_403(client, url, settings):
"""When PROVISIONING_API_KEY is not configured, returns 403."""
settings.PROVISIONING_API_KEY = None
response = client.post(
url,
data={"domains": ["test.fr"]},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer some-key",
)
assert response.status_code == 403
# -- Create tests --
@pytest.mark.django_db
def test_provisioning_creates_domains(client, url, auth_header):
"""New domains are created with correct custom_attributes."""
response = client.post(
url,
data={
"domains": ["domaine.fr", "autre.fr"],
"custom_attributes": {"siret": "12345678901234"},
},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert sorted(data["created"]) == ["autre.fr", "domaine.fr"]
assert data["existing"] == []
assert data["errors"] == []
for name in ["domaine.fr", "autre.fr"]:
domain = MailDomain.objects.get(name=name)
assert domain.custom_attributes == {"siret": "12345678901234"}
@pytest.mark.django_db
def test_provisioning_creates_domains_without_custom_attributes(
client, url, auth_header
):
"""Domains can be created without custom_attributes."""
response = client.post(
url,
data={"domains": ["simple.fr"]},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert data["created"] == ["simple.fr"]
domain = MailDomain.objects.get(name="simple.fr")
assert domain.custom_attributes == {}
# -- Idempotency tests --
@pytest.mark.django_db
def test_provisioning_idempotent_existing_domains(client, url, auth_header):
"""Existing domains are not duplicated."""
MailDomainFactory(name="existing.fr", custom_attributes={"siret": "111"})
response = client.post(
url,
data={
"domains": ["existing.fr", "new.fr"],
"custom_attributes": {"siret": "111"},
},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert data["created"] == ["new.fr"]
assert data["existing"] == ["existing.fr"]
assert data["errors"] == []
assert MailDomain.objects.filter(name="existing.fr").count() == 1
@pytest.mark.django_db
def test_provisioning_updates_custom_attributes_on_existing(client, url, auth_header):
"""custom_attributes are updated on existing domains when they differ."""
MailDomainFactory(name="existing.fr", custom_attributes={"siret": "old"})
response = client.post(
url,
data={
"domains": ["existing.fr"],
"custom_attributes": {"siret": "new"},
},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert data["existing"] == ["existing.fr"]
domain = MailDomain.objects.get(name="existing.fr")
assert domain.custom_attributes == {"siret": "new"}
# -- Comma-separated string format --
@pytest.mark.django_db
def test_provisioning_accepts_comma_separated_string(client, url, auth_header):
"""The endpoint accepts a comma-separated string of domains."""
response = client.post(
url,
data={
"domains": "alpha.fr,beta.fr",
"custom_attributes": {"siret": "123"},
},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert sorted(data["created"]) == ["alpha.fr", "beta.fr"]
# -- Validation tests --
@pytest.mark.django_db
def test_provisioning_invalid_domain_returns_error(client, url, auth_header):
"""Invalid domain names return structured errors, not 500."""
response = client.post(
url,
data={
"domains": ["valid.fr", "INVALID DOMAIN!"],
"custom_attributes": {"siret": "123"},
},
content_type="application/json",
**auth_header,
)
assert response.status_code == 200
data = response.json()
assert data["created"] == ["valid.fr"]
assert len(data["errors"]) == 1
assert data["errors"][0]["domain"] == "INVALID DOMAIN!"
@pytest.mark.django_db
def test_provisioning_empty_domains_returns_400(client, url, auth_header):
"""Empty domains list returns 400."""
response = client.post(
url,
data={"domains": [], "custom_attributes": {"siret": "123"}},
content_type="application/json",
**auth_header,
)
assert response.status_code == 400
@pytest.mark.django_db
def test_provisioning_missing_domains_returns_400(client, url, auth_header):
"""Missing domains field returns 400."""
response = client.post(
url,
data={"custom_attributes": {"siret": "123"}},
content_type="application/json",
**auth_header,
)
assert response.status_code == 400

View File

@@ -0,0 +1,334 @@
"""Unit tests for entitlements backends."""
from django.core.cache import cache
from django.test import override_settings
import pytest
import requests
import responses
from core.entitlements import EntitlementsUnavailableError
from core.entitlements.backends.deploycenter import DeployCenterEntitlementsBackend
from core.entitlements.backends.local import LocalEntitlementsBackend
@pytest.fixture(autouse=True)
def _clear_cache():
cache.clear()
yield
cache.clear()
class TestLocalBackend:
"""Tests for the LocalEntitlementsBackend."""
def test_get_user_entitlements(self):
"""Local backend grants access and returns None for admin domains."""
backend = LocalEntitlementsBackend()
result = backend.get_user_entitlements("user-sub", "user@example.com")
assert result == {
"can_access": True,
"can_admin_maildomains": None,
}
def test_force_refresh_has_no_effect(self):
"""force_refresh is accepted but has no effect on local backend."""
backend = LocalEntitlementsBackend()
result = backend.get_user_entitlements(
"user-sub", "user@example.com", force_refresh=True
)
assert result["can_access"] is True
BASE_URL = "https://deploycenter.example.com/api/v1.0/entitlements"
class TestDeployCenterBackend:
"""Tests for the DeployCenterEntitlementsBackend."""
def _get_backend(self, **kwargs):
defaults = {
"base_url": BASE_URL,
"service_id": "test-service",
"api_key": "test-api-key",
"timeout": 5,
}
defaults.update(kwargs)
return DeployCenterEntitlementsBackend(**defaults)
@responses.activate
def test_get_user_entitlements_success(self):
"""Successful API call returns parsed entitlements."""
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": True,
"can_admin_maildomains": ["example.com", "test.org"],
},
},
status=200,
)
backend = self._get_backend()
result = backend.get_user_entitlements("user-sub", "user@example.com")
assert result == {
"can_access": True,
"can_admin_maildomains": ["example.com", "test.org"],
}
@responses.activate
def test_request_params_and_headers(self):
"""Request includes correct query params and auth header."""
responses.add(
responses.GET,
BASE_URL,
json={"entitlements": {"can_access": True}},
status=200,
)
backend = self._get_backend()
backend.get_user_entitlements("user-sub", "user@example.com")
request = responses.calls[0].request
assert "service_id=test-service" in request.url
assert "account_type=user" in request.url
assert "account_email=user%40example.com" in request.url
assert request.headers["X-Service-Auth"] == "Bearer test-api-key"
@responses.activate
def test_forwards_oidc_claims_as_query_params(self):
"""Configured OIDC claims from user_info are sent as query params."""
responses.add(
responses.GET,
BASE_URL,
json={"entitlements": {"can_access": True}},
status=200,
)
backend = self._get_backend(oidc_claims=["siret", "other_claim"])
user_info = {"siret": "12345678901234", "other_claim": "value", "ignored": "x"}
backend.get_user_entitlements(
"user-sub", "user@example.com", user_info=user_info
)
request = responses.calls[0].request
assert "siret=12345678901234" in request.url
assert "other_claim=value" in request.url
assert "ignored" not in request.url
@responses.activate
def test_missing_oidc_claim_not_sent(self):
"""If a configured claim is absent from user_info, it's just not sent."""
responses.add(
responses.GET,
BASE_URL,
json={"entitlements": {"can_access": True}},
status=200,
)
backend = self._get_backend(oidc_claims=["siret"])
backend.get_user_entitlements(
"user-sub", "user@example.com", user_info={"email": "x@y.com"}
)
request = responses.calls[0].request
assert "siret" not in request.url
@responses.activate
def test_server_error_raises(self):
"""Server error raises EntitlementsUnavailableError."""
responses.add(responses.GET, BASE_URL, status=500)
backend = self._get_backend()
with pytest.raises(EntitlementsUnavailableError):
backend.get_user_entitlements("user-sub", "user@example.com")
@responses.activate
def test_connection_timeout_raises(self):
"""Connection timeout raises EntitlementsUnavailableError."""
responses.add(
responses.GET,
BASE_URL,
body=requests.exceptions.ConnectionError("Connection timed out"),
)
backend = self._get_backend()
with pytest.raises(EntitlementsUnavailableError):
backend.get_user_entitlements("user-sub", "user@example.com")
@responses.activate
@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
def test_cache_hit_returns_cached_without_http(self):
"""Second call uses cache, no HTTP request made."""
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": True,
"can_admin_maildomains": ["example.com"],
},
},
status=200,
)
backend = self._get_backend()
# First call populates cache
result1 = backend.get_user_entitlements("user-sub", "user@example.com")
# Second call should use cache — no HTTP
result2 = backend.get_user_entitlements("user-sub", "user@example.com")
assert result1 == result2
assert len(responses.calls) == 1
@responses.activate
@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
def test_force_refresh_bypasses_cache(self):
"""force_refresh=True makes a new HTTP call even with cached data."""
responses.add(
responses.GET,
BASE_URL,
json={"entitlements": {"can_access": True, "can_admin_maildomains": []}},
status=200,
)
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": True,
"can_admin_maildomains": ["new.com"],
},
},
status=200,
)
backend = self._get_backend()
result1 = backend.get_user_entitlements("user-sub", "user@example.com")
result2 = backend.get_user_entitlements(
"user-sub", "user@example.com", force_refresh=True
)
assert len(responses.calls) == 2
assert result1["can_admin_maildomains"] == []
assert result2["can_admin_maildomains"] == ["new.com"]
@responses.activate
@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
def test_failure_with_stale_cache_returns_cached(self):
"""When force_refresh fails but stale cache exists, return cached data."""
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": True,
"can_admin_maildomains": ["cached.com"],
},
},
status=200,
)
# Second call will fail
responses.add(responses.GET, BASE_URL, status=500)
backend = self._get_backend()
# Populate cache
backend.get_user_entitlements("user-sub", "user@example.com")
# Force refresh fails, should fall back to cache
result = backend.get_user_entitlements(
"user-sub", "user@example.com", force_refresh=True
)
assert result["can_admin_maildomains"] == ["cached.com"]
assert len(responses.calls) == 2
@responses.activate
def test_failure_without_cache_raises(self):
"""API failure with no cache raises EntitlementsUnavailableError."""
responses.add(responses.GET, BASE_URL, status=500)
backend = self._get_backend()
with pytest.raises(EntitlementsUnavailableError):
backend.get_user_entitlements("user-sub", "user@example.com")
@responses.activate
def test_missing_fields_defaults(self):
"""Backend should provide sensible defaults for missing response fields."""
responses.add(
responses.GET,
BASE_URL,
json={"entitlements": {}},
status=200,
)
backend = self._get_backend()
result = backend.get_user_entitlements("user-sub", "user@example.com")
assert result == {
"can_access": False,
"can_admin_maildomains": None,
}
@responses.activate
def test_missing_entitlements_key(self):
"""Response with no entitlements key returns safe defaults."""
responses.add(responses.GET, BASE_URL, json={}, status=200)
backend = self._get_backend()
result = backend.get_user_entitlements("user-sub", "user@example.com")
assert result == {
"can_access": False,
"can_admin_maildomains": None,
}
@responses.activate
@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
def test_different_users_have_different_cache_keys(self):
"""Each user_sub gets its own cache entry."""
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": True,
"can_admin_maildomains": ["a.com"],
},
},
status=200,
)
responses.add(
responses.GET,
BASE_URL,
json={
"entitlements": {
"can_access": False,
"can_admin_maildomains": ["b.com"],
},
},
status=200,
)
backend = self._get_backend()
result1 = backend.get_user_entitlements("user1", "user1@example.com")
result2 = backend.get_user_entitlements("user2", "user2@example.com")
assert result1["can_admin_maildomains"] == ["a.com"]
assert result2["can_admin_maildomains"] == ["b.com"]
assert len(responses.calls) == 2
@responses.activate
def test_invalid_json_response_raises(self):
"""ValueError from response.json() should be handled."""
responses.add(
responses.GET,
BASE_URL,
body="not json",
status=200,
content_type="text/plain",
)
backend = self._get_backend()
with pytest.raises(EntitlementsUnavailableError):
backend.get_user_entitlements("user-sub", "user@example.com")

View File

@@ -0,0 +1,290 @@
"""Tests for entitlements sync during OIDC login."""
from unittest import mock
from django.core.cache import cache
from django.db import connection
from django.test.utils import CaptureQueriesContext
import pytest
from core import factories
from core.authentication.backends import OIDCAuthenticationBackend
from core.entitlements import EntitlementsUnavailableError
from core.enums import MailDomainAccessRoleChoices
from core.models import MailDomainAccess
pytestmark = pytest.mark.django_db
@pytest.fixture(autouse=True)
def _clear_cache():
cache.clear()
yield
cache.clear()
class TestSyncEntitlements: # pylint: disable=protected-access
"""Tests for _sync_entitlements called during OIDC login."""
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_creates_admin_access_for_entitled_domains(self, mock_get):
"""Admin access is created for each entitled domain."""
user = factories.UserFactory()
domain1 = factories.MailDomainFactory(name="domain1.com")
domain2 = factories.MailDomainFactory(name="domain2.com")
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["domain1.com", "domain2.com"],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert MailDomainAccess.objects.filter(
user=user, maildomain=domain1, role=MailDomainAccessRoleChoices.ADMIN
).exists()
assert MailDomainAccess.objects.filter(
user=user, maildomain=domain2, role=MailDomainAccessRoleChoices.ADMIN
).exists()
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_removes_stale_admin_access(self, mock_get):
"""Admin access for domains no longer entitled is removed."""
user = factories.UserFactory()
domain1 = factories.MailDomainFactory(name="domain1.com")
domain2 = factories.MailDomainFactory(name="domain2.com")
# User currently has admin access to both domains
factories.MailDomainAccessFactory(
user=user, maildomain=domain1, role=MailDomainAccessRoleChoices.ADMIN
)
factories.MailDomainAccessFactory(
user=user, maildomain=domain2, role=MailDomainAccessRoleChoices.ADMIN
)
# Entitlements now only include domain1
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["domain1.com"],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert MailDomainAccess.objects.filter(user=user, maildomain=domain1).exists()
assert not MailDomainAccess.objects.filter(
user=user, maildomain=domain2
).exists()
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_skips_sync_when_can_admin_maildomains_is_none(self, mock_get):
"""If can_admin_maildomains is None (e.g. local backend), skip sync entirely."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": None,
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
# Existing access should still be there
assert MailDomainAccess.objects.filter(user=user, maildomain=domain).exists()
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_empty_list_removes_all_admin_accesses(self, mock_get):
"""An empty list means the user has no admin access to any domain."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": [],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert MailDomainAccess.objects.filter(user=user).count() == 0
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_handles_unavailable_error(self, mock_get):
"""On EntitlementsUnavailableError, existing accesses are preserved."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.side_effect = EntitlementsUnavailableError("Backend down")
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
# Existing access should NOT be removed
assert MailDomainAccess.objects.filter(user=user, maildomain=domain).exists()
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_handles_timeout(self, mock_get):
"""On timeout (also EntitlementsUnavailableError), existing accesses preserved."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.side_effect = EntitlementsUnavailableError("Connection timed out")
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert MailDomainAccess.objects.filter(user=user, maildomain=domain).exists()
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_skips_nonexistent_domains(self, mock_get):
"""Domains not present in the DB are silently skipped."""
user = factories.UserFactory()
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["nonexistent.com"],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert MailDomainAccess.objects.filter(user=user).count() == 0
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_does_not_duplicate_existing_access(self, mock_get):
"""Should not create duplicate MailDomainAccess records."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["domain.com"],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
assert (
MailDomainAccess.objects.filter(user=user, maildomain=domain).count() == 1
)
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_force_refresh_is_used(self, mock_get):
"""Should call get_user_entitlements with force_refresh=True."""
user = factories.UserFactory()
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": [],
}
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
mock_get.assert_called_once_with(
user.sub, user.email, user_info=None, force_refresh=True
)
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_passes_user_info_from_oidc(self, mock_get):
"""Should forward the stored OIDC user_info to get_user_entitlements."""
user = factories.UserFactory()
user_info = {"sub": user.sub, "email": user.email, "siret": "12345678901234"}
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": [],
}
backend = OIDCAuthenticationBackend()
backend._user_info = user_info
backend._sync_entitlements(user)
mock_get.assert_called_once_with(
user.sub, user.email, user_info=user_info, force_refresh=True
)
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_optimistic_path_no_db_writes_when_in_sync(self, mock_get):
"""When entitled domains match existing accesses, no DB writes should occur."""
user = factories.UserFactory()
domain = factories.MailDomainFactory(name="domain.com")
factories.MailDomainAccessFactory(
user=user, maildomain=domain, role=MailDomainAccessRoleChoices.ADMIN
)
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["domain.com"],
}
backend = OIDCAuthenticationBackend()
with CaptureQueriesContext(connection) as ctx:
backend._sync_entitlements(user)
# Only SELECT queries (no INSERT, UPDATE, DELETE)
write_queries = [
q
for q in ctx.captured_queries
if q["sql"].startswith(("INSERT", "UPDATE", "DELETE"))
]
assert write_queries == []
assert (
MailDomainAccess.objects.filter(user=user, maildomain=domain).count() == 1
)
@mock.patch("core.authentication.backends.get_user_entitlements")
def test_login_resets_cache(self, mock_get):
"""Logging in should call with force_refresh=True, resetting cached data."""
user = factories.UserFactory()
# Simulate first login with domains
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["first.com"],
}
factories.MailDomainFactory(name="first.com")
backend = OIDCAuthenticationBackend()
backend._sync_entitlements(user)
# Simulate second login with different domains
mock_get.return_value = {
"can_access": True,
"can_admin_maildomains": ["second.com"],
}
factories.MailDomainFactory(name="second.com")
backend._sync_entitlements(user)
# Both calls should have force_refresh=True
assert mock_get.call_count == 2
for call in mock_get.call_args_list:
assert call.kwargs.get("force_refresh") is True
# Only second.com should remain
assert not MailDomainAccess.objects.filter(
user=user, maildomain__name="first.com"
).exists()
assert MailDomainAccess.objects.filter(
user=user, maildomain__name="second.com"
).exists()

View File

@@ -37,6 +37,7 @@ from core.api.viewsets.metrics import (
MailDomainUsersMetricsApiView,
)
from core.api.viewsets.placeholder import DraftPlaceholderView, PlaceholderView
from core.api.viewsets.provisioning import ProvisioningMailDomainView
from core.api.viewsets.send import SendMessageView
from core.api.viewsets.task import TaskDetailView
from core.api.viewsets.thread import ThreadViewSet
@@ -241,6 +242,11 @@ urlpatterns = [
MailboxUsageMetricsApiView.as_view(),
name="mailbox-usage-metrics",
),
path(
f"api/{settings.API_VERSION}/provisioning/maildomains/",
ProvisioningMailDomainView.as_view(),
name="provisioning-maildomains",
),
# Alias for MTA check endpoint
path(
f"api/{settings.API_VERSION}/mta/check-recipients/",

View File

@@ -789,6 +789,10 @@ class Base(Configuration):
None, environ_name="METRICS_API_KEY", environ_prefix=None
)
PROVISIONING_API_KEY = values.Value(
None, environ_name="PROVISIONING_API_KEY", environ_prefix=None
)
METRICS_STORAGE_USED_OVERHEAD_BY_MESSAGE = values.PositiveIntegerValue(
1024,
environ_name="METRICS_STORAGE_USED_OVERHEAD_BY_MESSAGE",
@@ -800,6 +804,19 @@ class Base(Configuration):
AI_BASE_URL = values.Value(None, environ_name="AI_BASE_URL", environ_prefix=None)
AI_MODEL = values.Value(None, environ_name="AI_MODEL", environ_prefix=None)
# Entitlements
ENTITLEMENTS_BACKEND = values.Value(
"core.entitlements.backends.local.LocalEntitlementsBackend",
environ_name="ENTITLEMENTS_BACKEND",
environ_prefix=None,
)
ENTITLEMENTS_BACKEND_PARAMETERS = JSONValue(
{}, environ_name="ENTITLEMENTS_BACKEND_PARAMETERS", environ_prefix=None
)
ENTITLEMENTS_CACHE_TIMEOUT = values.PositiveIntegerValue(
300, environ_name="ENTITLEMENTS_CACHE_TIMEOUT", environ_prefix=None
)
# Feature flags
FEATURE_AI_SUMMARY = values.BooleanValue(
default=False, environ_name="FEATURE_AI_SUMMARY", environ_prefix=None
@@ -816,6 +833,14 @@ class Base(Configuration):
FEATURE_MAILBOX_ADMIN_CHANNELS = values.ListValue(
default=[], environ_name="FEATURE_MAILBOX_ADMIN_CHANNELS", environ_prefix=None
)
FEATURE_MAILDOMAIN_CREATE = values.BooleanValue(
default=True, environ_name="FEATURE_MAILDOMAIN_CREATE", environ_prefix=None
)
FEATURE_MAILDOMAIN_MANAGE_ACCESSES = values.BooleanValue(
default=True,
environ_name="FEATURE_MAILDOMAIN_MANAGE_ACCESSES",
environ_prefix=None,
)
# Logging
# We want to make it easy to log to console but by default we log production

View File

@@ -33,6 +33,8 @@ export type ConfigRetrieve200 = {
readonly MAX_TEMPLATE_IMAGE_SIZE: number;
/** Whether external images should be proxied */
readonly IMAGE_PROXY_ENABLED: boolean;
readonly FEATURE_MAILDOMAIN_CREATE: boolean;
readonly FEATURE_MAILDOMAIN_MANAGE_ACCESSES: boolean;
/** Maximum age in seconds for a message to be eligible for manual retry of failed deliveries */
readonly MESSAGES_MANUAL_RETRY_MAX_AGE: number;
};

View File

@@ -1,6 +1,7 @@
import { MailDomainAdminWrite } from "@/features/api/gen";
import { ModalCreateDomain } from "@/features/layouts/components/admin/modal-create-domain";
import useAbility, { Abilities } from "@/hooks/use-ability";
import { FEATURE_KEYS, useFeatureFlag } from "@/hooks/use-feature";
import { Button, useModal } from "@gouvfr-lasuite/cunningham-react";
import { useTranslation } from "react-i18next";
@@ -16,8 +17,9 @@ export const CreateDomainAction = ({ onCreate }: CreateDomainActionProps) => {
const modal = useModal();
const { t } = useTranslation();
const canCreateDomains = useAbility(Abilities.CAN_CREATE_MAILDOMAINS);
const isFeatureEnabled = useFeatureFlag(FEATURE_KEYS.MAILDOMAIN_CREATE);
if (!canCreateDomains) {
if (!canCreateDomains || !isFeatureEnabled) {
return null;
}

View File

@@ -27,6 +27,8 @@ const DEFAULT_CONFIG: AppConfig = {
MAX_RECIPIENTS_PER_MESSAGE: 0,
MAX_TEMPLATE_IMAGE_SIZE: 0,
IMAGE_PROXY_ENABLED: false,
FEATURE_MAILDOMAIN_CREATE: true,
FEATURE_MAILDOMAIN_MANAGE_ACCESSES: true,
DRIVE: DEFAULT_DRIVE_CONFIG,
MESSAGES_MANUAL_RETRY_MAX_AGE: 0,
}

View File

@@ -5,6 +5,8 @@ export enum FEATURE_KEYS {
AI_SUMMARY = 'ai_summary',
AI_AUTOLABELS = 'ai_autolabels',
MAILBOX_ADMIN_CHANNELS = 'mailbox_admin_channels',
MAILDOMAIN_CREATE = 'maildomain_create',
MAILDOMAIN_MANAGE_ACCESSES = 'maildomain_manage_accesses',
}
/**
@@ -26,6 +28,10 @@ export const useFeatureFlag = (featureKey: FEATURE_KEYS) => {
return config.AI_ENABLED === true && config.FEATURE_AI_AUTOLABELS === true;
case FEATURE_KEYS.MAILBOX_ADMIN_CHANNELS:
return Array.isArray(config.FEATURE_MAILBOX_ADMIN_CHANNELS) && config.FEATURE_MAILBOX_ADMIN_CHANNELS.length > 0;
case FEATURE_KEYS.MAILDOMAIN_CREATE:
return config.FEATURE_MAILDOMAIN_CREATE === true;
case FEATURE_KEYS.MAILDOMAIN_MANAGE_ACCESSES:
return config.FEATURE_MAILDOMAIN_MANAGE_ACCESSES === true;
default:
throw new Error(`Unknown feature key: ${featureKey}`);
}

View File

@@ -7,6 +7,7 @@ import { AdminLayout } from "@/features/layouts/components/admin/admin-layout";
import { getMaildomainsListQueryOptions, MailDomainAdmin, MailDomainAdminWrite } from "@/features/api/gen";
import { useAdminMailDomain } from "@/features/providers/admin-maildomain";
import useAbility, { Abilities } from "@/hooks/use-ability";
import { FEATURE_KEYS, useFeatureFlag } from "@/hooks/use-feature";
import { Banner } from "@/features/ui/components/banner";
import { CreateDomainAction } from "@/features/layouts/components/admin/domains-view/create-domain-action";
import { useQueryClient } from "@tanstack/react-query";
@@ -25,7 +26,9 @@ enum MailDomainEditAction {
function AdminDataGrid({ domains, pagination }: AdminDataGridProps) {
const router = useRouter();
const { t, i18n } = useTranslation();
const canManageMaildomainAccesses = useAbility(Abilities.CAN_MANAGE_SOME_MAILDOMAIN_ACCESSES);
const hasManageAbility = useAbility(Abilities.CAN_MANAGE_SOME_MAILDOMAIN_ACCESSES);
const isManageAccessesEnabled = useFeatureFlag(FEATURE_KEYS.MAILDOMAIN_MANAGE_ACCESSES);
const canManageMaildomainAccesses = hasManageAbility && isManageAccessesEnabled;
const [editedDomain, setEditedDomain] = useState<MailDomainAdmin | null>(null);
const [editAction, setEditAction] = useState<MailDomainEditAction | null>(null);
const columns = [
@@ -95,7 +98,9 @@ const AdminPageContent = () => {
const { t } = useTranslation();
const { mailDomains, isLoading, error, pagination } = useAdminMailDomain();
const canCreateMaildomain = useAbility(Abilities.CAN_CREATE_MAILDOMAINS);
const canManageMaildomainAccesses = useAbility(Abilities.CAN_MANAGE_SOME_MAILDOMAIN_ACCESSES);
const hasManageAbility = useAbility(Abilities.CAN_MANAGE_SOME_MAILDOMAIN_ACCESSES);
const isManageAccessesEnabled = useFeatureFlag(FEATURE_KEYS.MAILDOMAIN_MANAGE_ACCESSES);
const canManageMaildomainAccesses = hasManageAbility && isManageAccessesEnabled;
const shouldRedirect = !canCreateMaildomain && !canManageMaildomainAccesses && !isLoading && mailDomains.length === 1;
/**