(channels) add encryption, custom scopes and levels, auditing (#599)

- Scoped API-key channels with per-scope authorization, personal user channels (users/me/channels), encrypted channel secrets, API-key regeneration (API + admin UI), channel last-used tracking, and a raw email submission endpoint.
 - Provisioning mailbox lookup/listing API.
 - Stricter outbound MIME size checks, DKIM signing improvements, and hardened channel/permission validation.
 - Expanded end-to-end and unit coverage for channels, auth/scopes, provisioning, submission, metrics, and admin flows.
This commit is contained in:
Sylvain Zimmer
2026-04-09 00:53:03 +02:00
committed by GitHub
parent d72df6c77d
commit a3ccc7a57b
43 changed files with 6226 additions and 261 deletions

View File

@@ -42,7 +42,7 @@ jobs:
- name: Create msg-imports bucket
run: make import-bucket
- name: Run backend tests
run: make test-back
run: make test-back-parallel
test-front:
runs-on: ubuntu-latest

View File

@@ -21,9 +21,6 @@ LOGGING_LEVEL_LOGGERS_APP=INFO
ENABLE_PROMETHEUS=0
PROMETHEUS_API_KEY=ExamplePrometheusApiKey
# Metrics
METRICS_API_KEY=ExampleMetricsApiKey
# Python
PYTHONPATH=/app

View File

@@ -1,12 +1,14 @@
"""Admin classes and registrations for core app."""
# pylint: disable=too-many-lines
import json
import logging
from django import forms
from django.contrib import admin, messages
from django.contrib.auth import admin as auth_admin
from django.core.files.storage import storages
from django.db.models import Q
from django.db.models import JSONField, Q
from django.http import HttpResponseNotAllowed
from django.shortcuts import redirect
from django.template.response import TemplateResponse
@@ -29,6 +31,33 @@ from .enums import MessageDeliveryStatusChoices
from .forms import IMAPImportForm, MessageImportForm
class PrettyJSONWidget(forms.Textarea):
"""A textarea widget that pretty-prints JSON content."""
def __init__(self, attrs=None):
default_attrs = {"cols": 80, "rows": 20, "style": "font-family: monospace;"}
if attrs:
default_attrs.update(attrs)
super().__init__(attrs=default_attrs)
def format_value(self, value):
if isinstance(value, str):
try:
value = json.dumps(json.loads(value), indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
pass
return value
# Apply pretty JSON widget globally to every ModelAdmin (in-house and
# third-party). EncryptedJSONField inherits from TextField, not JSONField,
# so encrypted columns are unaffected.
admin.ModelAdmin.formfield_overrides = {
**admin.ModelAdmin.formfield_overrides,
JSONField: {"widget": PrettyJSONWidget},
}
class RecipientDeliveryStatusFilter(admin.SimpleListFilter):
"""Filter messages by their recipients' delivery status."""
@@ -383,27 +412,110 @@ class MailboxAdmin(admin.ModelAdmin):
class ChannelAdmin(admin.ModelAdmin):
"""Admin class for the Channel model"""
list_display = ("name", "type", "mailbox", "maildomain", "created_at")
list_filter = ("type", "created_at")
list_display = (
"name",
"type",
"scope_level",
"mailbox",
"maildomain",
"user",
"created_at",
)
list_filter = ("type", "scope_level", "created_at")
search_fields = ("name", "type")
readonly_fields = ("created_at", "updated_at")
autocomplete_fields = ("mailbox", "maildomain")
readonly_fields = ("created_at", "updated_at", "last_used_at")
autocomplete_fields = ("mailbox", "maildomain", "user")
change_form_template = "admin/core/channel/change_form.html"
fieldsets = (
(None, {"fields": ("name", "type", "settings")}),
(None, {"fields": ("name", "type", "scope_level", "settings")}),
(
"Target",
{
"fields": ("mailbox", "maildomain"),
"description": "Specify either a mailbox or maildomain, but not both.",
"fields": ("mailbox", "maildomain", "user"),
"description": (
"Bind the channel to exactly the target required by its "
"scope_level: 'global' → none; 'maildomain' → maildomain; "
"'mailbox' → mailbox; 'user' → user. On non-user scopes "
"the user FK is an optional creator audit."
),
},
),
(
"Timestamps",
{"fields": ("created_at", "updated_at"), "classes": ("collapse",)},
{
"fields": ("created_at", "updated_at", "last_used_at"),
"classes": ("collapse",),
},
),
)
def formfield_for_dbfield(self, db_field, request, **kwargs):
"""Constrain ``type`` to known ChannelTypes in the admin form.
The model field is intentionally a free-form CharField (see
``ChannelTypes`` docstring — adding a new type must not require a
migration). The choice constraint is therefore admin-only.
"""
if db_field.name == "type":
# pylint: disable=import-outside-toplevel
from core.enums import ChannelTypes
kwargs["widget"] = forms.Select(
choices=[(t.value, t.value) for t in ChannelTypes]
)
return db_field.formfield(**kwargs)
return super().formfield_for_dbfield(db_field, request, **kwargs)
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
"<path:object_id>/regenerate-api-key/",
self.admin_site.admin_view(self.regenerate_api_key_view),
name="core_channel_regenerate_api_key",
),
]
return custom_urls + urls
def regenerate_api_key_view(self, request, object_id):
"""Regenerate the api_key secret on an api_key channel.
Delegates the actual rotation to ``Channel.rotate_api_key`` (single
source of truth, shared with the DRF create + regenerate flows). The
plaintext is rendered ONCE in the response body and never stored in
cookies, session, or the messages framework — closing the window
where a credential could leak through signed-cookie message storage.
"""
# pylint: disable=import-outside-toplevel
from core.enums import ChannelTypes
if request.method != "POST":
return HttpResponseNotAllowed(["POST"])
channel = self.get_object(request, object_id)
if channel is None:
messages.error(request, "Channel not found.")
return redirect("..")
if channel.type != ChannelTypes.API_KEY:
messages.error(
request, "Only api_key channels can have their secret regenerated."
)
return redirect("..")
plaintext = channel.rotate_api_key()
context = {
**self.admin_site.each_context(request),
"opts": self.model._meta, # noqa: SLF001
"original": channel,
"title": "New api_key generated",
"api_key": plaintext,
}
return TemplateResponse(
request, "admin/core/channel/regenerated_api_key.html", context
)
@admin.register(models.MailboxAccess)
class MailboxAccessAdmin(admin.ModelAdmin):

View File

@@ -0,0 +1,85 @@
"""Authentication classes for service-to-service API calls.
Today this module ships a single scheme, ChannelApiKeyAuthentication, which
authenticates a request as an api_key Channel via the X-Channel-Id + X-API-Key
headers. New schemes (mTLS, signed JWT, OIDC client credentials, …) should be
added here as additional BaseAuthentication subclasses that set
``request.auth`` to a Channel instance the same way. The downstream permission
layer (``HasChannelScope``) is scheme-agnostic — it only inspects
``request.auth``.
"""
import hashlib
from secrets import compare_digest
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ValidationError as DjangoValidationError
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from core import models
from core.enums import ChannelTypes
class ChannelApiKeyAuthentication(BaseAuthentication):
"""Authenticate as an api_key Channel via X-Channel-Id + X-API-Key.
Client contract:
X-Channel-Id: <uuid> (public, identifies which channel)
X-API-Key: <raw secret> (the shared secret, hashed at rest)
On success ``request.user`` is set to ``AnonymousUser`` (there is no
associated user) and ``request.auth`` is set to the authenticated
``Channel`` instance. Views must read ``request.auth.scope_level``,
``request.auth.mailbox_id`` and ``request.auth.maildomain_id`` to
enforce resource-level bounds on the action they perform.
"""
def authenticate(self, request):
channel_id = request.headers.get("X-Channel-Id")
api_key = request.headers.get("X-API-Key")
# Missing either header → this auth scheme does not apply; let DRF
# try the next class in authentication_classes. Returning None here
# is the documented way to skip.
if not channel_id or not api_key:
return None
try:
channel = models.Channel.objects.select_related(
"mailbox", "maildomain", "user"
).get(pk=channel_id, type=ChannelTypes.API_KEY)
except (models.Channel.DoesNotExist, ValueError, DjangoValidationError) as exc:
# ValueError / ValidationError handle malformed UUIDs.
raise AuthenticationFailed("Invalid channel or API key.") from exc
provided_hash = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
stored_hashes = (channel.encrypted_settings or {}).get("api_key_hashes") or []
# Iterate every stored hash without early exit so the timing is
# constant with respect to *which* slot matched (the total number
# of slots is not secret — there is no hard cap on the array). Any
# match flips the boolean.
matched = False
for stored in stored_hashes:
if isinstance(stored, str) and compare_digest(stored, provided_hash):
matched = True
if not matched:
raise AuthenticationFailed("Invalid channel or API key.")
expires_at_raw = (channel.settings or {}).get("expires_at")
if expires_at_raw:
expires_at = parse_datetime(expires_at_raw)
if expires_at is not None and expires_at < timezone.now():
raise AuthenticationFailed("API key has expired.")
# Throttled update of last_used_at for monitoring (5 min window).
channel.mark_used()
return (AnonymousUser(), channel)
def authenticate_header(self, request):
# DRF uses this as the WWW-Authenticate header on 401 responses.
return "X-API-Key"

View File

@@ -2602,13 +2602,7 @@
}
}
},
"description": "Channel created successfully"
},
"400": {
"description": "Invalid input data"
},
"403": {
"description": "Permission denied"
"description": ""
}
}
}
@@ -2824,6 +2818,60 @@
}
}
},
"/api/v1.0/mailboxes/{mailbox_id}/channels/{id}/regenerate-api-key/": {
"post": {
"operationId": "mailboxes_channels_regenerate_api_key_create",
"description": "Manage integration channels for a mailbox",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
},
{
"in": "path",
"name": "mailbox_id",
"schema": {
"type": "string",
"format": "uuid"
},
"required": true
}
],
"tags": [
"channels"
],
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/RegeneratedApiKeyResponse"
}
}
},
"description": "Returns the freshly generated plaintext api_key. The previous secret is invalidated immediately. The plaintext is shown ONCE and cannot be retrieved later."
},
"400": {
"description": "Channel is not an api_key channel"
},
"403": {
"description": "Permission denied"
},
"404": {
"description": "Channel not found"
}
}
}
},
"/api/v1.0/mailboxes/{mailbox_id}/image-proxy/": {
"get": {
"operationId": "mailboxes_image_proxy_list",
@@ -6007,6 +6055,294 @@
}
}
}
},
"/api/v1.0/users/me/channels/": {
"get": {
"operationId": "users_me_channels_list",
"description": "Manage personal (scope_level=user) integration channels",
"tags": [
"channels"
],
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Channel"
}
}
}
},
"description": ""
}
}
},
"post": {
"operationId": "users_me_channels_create",
"description": "Manage personal (scope_level=user) integration channels",
"tags": [
"channels"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ChannelRequest"
}
},
"multipart/form-data": {
"schema": {
"$ref": "#/components/schemas/ChannelRequest"
}
}
},
"required": true
},
"security": [
{
"cookieAuth": []
}
],
"responses": {
"201": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Channel"
}
}
},
"description": ""
}
}
}
},
"/api/v1.0/users/me/channels/{id}/": {
"get": {
"operationId": "users_me_channels_retrieve",
"description": "Manage personal (scope_level=user) integration channels",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
}
],
"tags": [
"channels"
],
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Channel"
}
}
},
"description": ""
}
}
},
"put": {
"operationId": "users_me_channels_update",
"description": "Manage personal (scope_level=user) integration channels",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
}
],
"tags": [
"channels"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ChannelRequest"
}
},
"multipart/form-data": {
"schema": {
"$ref": "#/components/schemas/ChannelRequest"
}
}
},
"required": true
},
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Channel"
}
}
},
"description": "Channel updated successfully"
},
"400": {
"description": "Invalid input data"
},
"403": {
"description": "Permission denied"
},
"404": {
"description": "Channel not found"
}
}
},
"patch": {
"operationId": "users_me_channels_partial_update",
"description": "Manage personal (scope_level=user) integration channels",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
}
],
"tags": [
"channels"
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/PatchedChannelRequest"
}
},
"multipart/form-data": {
"schema": {
"$ref": "#/components/schemas/PatchedChannelRequest"
}
}
}
},
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Channel"
}
}
},
"description": ""
}
}
},
"delete": {
"operationId": "users_me_channels_destroy",
"description": "Manage personal (scope_level=user) integration channels",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
}
],
"tags": [
"channels"
],
"security": [
{
"cookieAuth": []
}
],
"responses": {
"204": {
"description": "Channel deleted successfully"
},
"403": {
"description": "Permission denied"
},
"404": {
"description": "Channel not found"
}
}
}
},
"/api/v1.0/users/me/channels/{id}/regenerate-api-key/": {
"post": {
"operationId": "users_me_channels_regenerate_api_key_create",
"description": "Manage personal (scope_level=user) integration channels",
"parameters": [
{
"in": "path",
"name": "id",
"schema": {
"type": "string"
},
"required": true
}
],
"tags": [
"channels"
],
"security": [
{
"cookieAuth": []
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/RegeneratedApiKeyResponse"
}
}
},
"description": "Returns the freshly generated plaintext api_key. The previous secret is invalidated immediately. The plaintext is shown ONCE and cannot be retrieved later."
},
"400": {
"description": "Channel is not an api_key channel"
},
"403": {
"description": "Permission denied"
},
"404": {
"description": "Channel not found"
}
}
}
}
},
"components": {
@@ -6131,6 +6467,14 @@
"description": "Type of channel",
"maxLength": 255
},
"scope_level": {
"allOf": [
{
"$ref": "#/components/schemas/ScopeLevelEnum"
}
],
"readOnly": true
},
"settings": {
"description": "Channel-specific configuration settings"
},
@@ -6148,6 +6492,19 @@
"readOnly": true,
"nullable": true
},
"user": {
"type": "string",
"format": "uuid",
"description": "primary key for the record as UUID",
"readOnly": true,
"nullable": true
},
"last_used_at": {
"type": "string",
"format": "date-time",
"readOnly": true,
"nullable": true
},
"created_at": {
"type": "string",
"format": "date-time",
@@ -6166,10 +6523,14 @@
"required": [
"created_at",
"id",
"last_used_at",
"mailbox",
"maildomain",
"name",
"updated_at"
"scope_level",
"type",
"updated_at",
"user"
]
},
"ChannelRequest": {
@@ -6193,7 +6554,8 @@
}
},
"required": [
"name"
"name",
"type"
]
},
"Contact": {
@@ -7490,6 +7852,11 @@
"type": "boolean",
"readOnly": true
},
"mime_id": {
"type": "string",
"readOnly": true,
"nullable": true
},
"signature": {
"allOf": [
{
@@ -7520,6 +7887,7 @@
"is_sender",
"is_trashed",
"is_unread",
"mime_id",
"parent_id",
"sender",
"sender_user",
@@ -8281,6 +8649,23 @@
"updated_at"
]
},
"RegeneratedApiKeyResponse": {
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Channel id (also the X-Channel-Id header value)."
},
"api_key": {
"type": "string",
"description": "Freshly generated plaintext api_key. Returned ONCE on regeneration and cannot be retrieved later."
}
},
"required": [
"api_key",
"id"
]
},
"ResetPasswordError": {
"type": "object",
"properties": {
@@ -8325,6 +8710,16 @@
"one_time_password"
]
},
"ScopeLevelEnum": {
"enum": [
"global",
"maildomain",
"mailbox",
"user"
],
"type": "string",
"description": "* `global` - Global\n* `maildomain` - Maildomain\n* `mailbox` - Mailbox\n* `user` - User"
},
"SendMessageRequest": {
"type": "object",
"description": "Serializer for sending messages.",

View File

@@ -1,11 +1,9 @@
"""Permission handlers for the messages core app."""
from secrets import compare_digest
from django.conf import settings
from django.core import exceptions
from rest_framework import permissions
from rest_framework.exceptions import PermissionDenied
from core import enums, models
@@ -458,26 +456,86 @@ class IsMailboxAdmin(permissions.BasePermission):
return is_domain_admin
class HasMetricsApiKey(permissions.BasePermission):
"""Allows access only to users with the metrics API key."""
class HasChannelScope(permissions.BasePermission):
"""Scope-based permission for service calls authenticated as a Channel.
Auth-scheme-agnostic: it only inspects ``request.auth`` (which must be a
``Channel`` set by an authentication class like ``ChannelApiKeyAuthentication``)
and checks that ``required_scope`` is listed in the channel's
``settings["scopes"]``. Additionally, if the required scope is in
``CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY``, the calling channel must itself
be ``scope_level=global`` — this is the second of two enforcement points
for global-only scopes (the first is the serializer at write time).
The view remains responsible for resource-level bounds via
``request.auth.api_key_covers(...)`` — this class does not know which
mailbox or domain the action is about to touch.
Subclasses set ``required_scope`` — use the ``channel_scope()`` factory
below to create one on the fly so ``permission_classes`` can reference it
declaratively.
"""
required_scope: str = ""
def has_permission(self, request, view):
return compare_digest(
request.headers.get("Authorization") or "",
f"Bearer {settings.METRICS_API_KEY}",
)
class HasProvisioningApiKey(permissions.BasePermission):
"""Allows access only to requests bearing the provisioning API key."""
def has_permission(self, request, view):
if not settings.PROVISIONING_API_KEY:
channel = request.auth
if not isinstance(channel, models.Channel):
return False
return compare_digest(
request.headers.get("Authorization") or "",
f"Bearer {settings.PROVISIONING_API_KEY}",
)
if not isinstance(channel.settings, dict):
return False
scopes = channel.settings.get("scopes")
if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
return False
if self.required_scope not in scopes:
return False
if (
self.required_scope in enums.CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY
and channel.scope_level != enums.ChannelScopeLevel.GLOBAL
):
return False
return True
def channel_scope(required_scope: str) -> type:
"""Return a ``HasChannelScope`` subclass with ``required_scope`` pre-bound.
DRF's ``permission_classes`` expects a list of classes, not instances, so
we synthesize a tiny subclass per scope. Usage:
permission_classes = [channel_scope(ChannelApiKeyScope.MESSAGES_SEND)]
"""
return type(
f"HasChannelScope_{required_scope}",
(HasChannelScope,),
{"required_scope": str(required_scope)},
)
class IsGlobalChannelMixin:
"""Mixin for APIViews that require ``request.auth`` to be a Channel
with ``scope_level=global``. Drop this onto a view *in addition to*
``ChannelApiKeyAuthentication`` + ``channel_scope(...)``.
Two-layer defense in depth: even when the required scope is in
``CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY`` (and ``HasChannelScope`` already
rejects non-global callers), the view itself re-asserts the requirement
so a future scope-set typo can't accidentally weaken the endpoint. The
mixin's ``initial()`` runs after authentication+permission, so by the
time it executes ``request.auth`` is guaranteed to be a Channel.
"""
def initial(self, request, *args, **kwargs):
"""Re-assert scope_level=global after authentication+permission run."""
super().initial(request, *args, **kwargs)
channel = getattr(request, "auth", None)
if (
not isinstance(channel, models.Channel)
or channel.scope_level != enums.ChannelScopeLevel.GLOBAL
):
# Generic message — do not leak the scope_level requirement
# to the caller.
raise PermissionDenied()
class HasThreadEditAccess(IsAuthenticated):

View File

@@ -944,6 +944,7 @@ class MessageSerializer(serializers.ModelSerializer):
"is_trashed",
"is_archived",
"has_attachments",
"mime_id",
"signature",
"stmsg_headers",
]
@@ -1247,9 +1248,7 @@ class MailboxAdminSerializer(serializers.ModelSerializer):
if metadata.get("type") == "personal":
local_part = attrs.get("local_part", "")
denylist = getattr(
settings, "MESSAGES_MAILBOX_LOCALPART_DENYLIST_PERSONAL", []
)
denylist = settings.MESSAGES_MAILBOX_LOCALPART_DENYLIST_PERSONAL
lower_value = local_part.lower()
if any(lower_value == prefix.lower() for prefix in denylist):
raise serializers.ValidationError(
@@ -1542,12 +1541,17 @@ class ImportIMAPSerializer(ImportBaseSerializer):
)
class ChannelSerializer(serializers.ModelSerializer):
class ChannelSerializer(CreateOnlyFieldsMixin, serializers.ModelSerializer):
"""Serialize Channel model."""
# Explicitly mark nullable fields to fix OpenAPI schema
mailbox = serializers.PrimaryKeyRelatedField(read_only=True, allow_null=True)
maildomain = serializers.PrimaryKeyRelatedField(read_only=True, allow_null=True)
user = serializers.PrimaryKeyRelatedField(read_only=True, allow_null=True)
scope_level = serializers.ChoiceField(
choices=enums.ChannelScopeLevel.choices, read_only=True
)
last_used_at = serializers.DateTimeField(read_only=True, allow_null=True)
class Meta:
model = models.Channel
@@ -1555,13 +1559,60 @@ class ChannelSerializer(serializers.ModelSerializer):
"id",
"name",
"type",
"scope_level",
"settings",
"mailbox",
"maildomain",
"user",
"last_used_at",
"created_at",
"updated_at",
]
read_only_fields = ["id", "mailbox", "maildomain", "created_at", "updated_at"]
read_only_fields = [
"id",
"mailbox",
"maildomain",
"user",
"scope_level",
"last_used_at",
"created_at",
"updated_at",
]
# ``type`` is writable on CREATE only — once a channel exists its
# type is frozen. PATCHing type=widget on an api_key channel
# would let a mailbox admin sneak around the create-time scope
# check (the new type wouldn't trip _validate_api_key_scopes) or
# strand encrypted_settings on a row whose type the auth class
# no longer recognizes. Lock it down via CreateOnlyFieldsMixin.
create_only_fields = ["type"]
# Per-type list of settings keys that are server-managed and must
# NEVER appear in caller-supplied ``settings``. Used by
# ``_reject_caller_supplied_encrypted_keys`` below — there is no
# automatic move-from-settings-to-encrypted shim, server-side
# generators write directly to ``encrypted_settings`` instead.
RESERVED_SETTINGS_KEYS = {
enums.ChannelTypes.API_KEY: ["api_key_hashes"],
}
def create(self, validated_data):
# For api_key channels, mint the secret on a transient instance so
# the resulting ``encrypted_settings`` rides through the normal
# ``super().create()`` save path. The plaintext is stashed on the
# saved row for ChannelViewSet.create() to surface exactly once,
# mirroring the ``_generated_password`` pattern at channel.py:68-74.
generated_api_key = None
if validated_data.get("type") == enums.ChannelTypes.API_KEY:
transient = models.Channel(**validated_data)
generated_api_key = transient.rotate_api_key(save=False)
validated_data["encrypted_settings"] = transient.encrypted_settings
instance = super().create(validated_data)
# pylint: disable=protected-access
if generated_api_key is not None:
instance._generated_api_key = generated_api_key # noqa: SLF001
return instance
def validate_settings(self, value):
"""Validate settings, including tags if present."""
@@ -1610,19 +1661,190 @@ class ChannelSerializer(serializers.ModelSerializer):
return value
def _reject_caller_supplied_encrypted_keys(self, attrs):
"""Refuse any request that puts ``RESERVED_SETTINGS_KEYS`` values
in ``settings``.
``encrypted_settings`` itself is not in ``ChannelSerializer.fields``,
so callers can't write it directly. Reserved settings keys (e.g.
``api_key_hashes``) are written by server-side generators and
must never originate from a request body. Without this check, a
mailbox admin could PATCH ``{"settings": {"scopes": [...],
"api_key_hashes": [<chosen>]}}`` and the row's settings JSON
would carry the attacker-chosen value alongside the legitimate
ones — confusing, even when not directly authenticatable.
"""
settings_data = attrs.get("settings")
if not isinstance(settings_data, dict):
return
# Use the resolved type so this works on both CREATE and PATCH.
channel_type = attrs.get("type") or (
self.instance.type if self.instance else None
)
reserved = self.RESERVED_SETTINGS_KEYS.get(channel_type, [])
if any(k in settings_data for k in reserved):
raise serializers.ValidationError({"settings": "Invalid settings."})
def _resolve_type_and_settings(self, attrs):
"""Compute (channel_type, settings_data, should_validate) for a
type-specific validator. The pair handles all three lifecycle paths:
- CREATE: ``type`` and ``settings`` come from ``attrs``.
- PATCH/PUT touching settings (or type): ``settings`` overrides
the instance's existing JSON; ``type`` falls back to the
instance.
- PATCH that doesn't touch ``type`` or ``settings`` (e.g. a
rename): the validator is skipped because the field being
validated isn't in play. ``should_validate`` is False.
This is the central fix for the PATCH-bypass escalation: the old
code returned early when ``attrs.get("type")`` was None, which is
the typical PATCH shape, allowing a mailbox admin to PATCH
``{"settings": {"scopes": ["maildomains:create"]}}`` and have it
slip through unvalidated.
"""
instance = self.instance
channel_type = attrs.get("type") or (instance.type if instance else None)
settings_in_attrs = "settings" in attrs
# If neither type nor settings are being introduced/changed, this
# validator has nothing to look at.
if not settings_in_attrs and "type" not in attrs:
return channel_type, None, False
if settings_in_attrs:
settings_data = attrs.get("settings") or {}
elif instance is not None:
# type was changed but settings weren't — fall back to the
# instance's existing settings so we validate the post-save
# combination.
settings_data = instance.settings or {}
else:
settings_data = {}
return channel_type, settings_data, True
def _validate_api_key_scopes(self, attrs):
"""Validate the ``settings["scopes"]`` list on api_key channels.
Runs on every CREATE and on every PATCH/PUT that touches ``type``
or ``settings``. The validator looks at the EFFECTIVE post-save
state — explicit attrs win, instance state is the fallback —
which is the only way to airtight-block a mailbox admin from
granting themselves a global-only scope via a settings-only PATCH.
- Every value must be a member of ``ChannelApiKeyScope``.
- Global-only scopes (e.g. ``maildomains:create``) can only be
requested when the channel itself has ``scope_level=global``.
Since DRF clients cannot set scope_level, this always rejects
global-only scopes on the mailbox-nested and user-nested paths.
"""
channel_type, settings_data, should_validate = self._resolve_type_and_settings(
attrs
)
if not should_validate or channel_type != enums.ChannelTypes.API_KEY:
return
raw_scopes = settings_data.get("scopes")
if raw_scopes is None:
raise serializers.ValidationError(
{
"settings": "api_key channels require settings.scopes (a list of strings)."
}
)
if not isinstance(raw_scopes, list) or not all(
isinstance(s, str) for s in raw_scopes
):
raise serializers.ValidationError(
{"settings": "settings.scopes must be a list of strings."}
)
valid_values = {choice.value for choice in enums.ChannelApiKeyScope}
unknown = [s for s in raw_scopes if s not in valid_values]
if unknown:
raise serializers.ValidationError(
{"settings": f"Unknown api_key scopes: {unknown}"}
)
# Any DRF path to this serializer is mailbox-nested or user-nested
# today, so the effective scope_level on save will be MAILBOX or
# USER. Global-only scopes are therefore never grantable via DRF.
# If that ever changes, the viewset must still pass scope_level
# explicitly on save.
global_only = enums.CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY
if any(s in global_only for s in raw_scopes):
raise serializers.ValidationError(
{"settings": "One or more requested scopes are not permitted."}
)
def _validate_webhook_settings(self, attrs):
"""Validate required fields on webhook channel settings.
Runs on CREATE and on every PATCH/PUT touching ``type`` or
``settings``. Same airtight rule as ``_validate_api_key_scopes``:
a settings-only PATCH on an existing webhook channel must hit the
URL/events validators, otherwise a mailbox admin could PATCH
``{"settings": {"url": "javascript:..."}}`` past the create-time
check.
"""
channel_type, settings_data, should_validate = self._resolve_type_and_settings(
attrs
)
if not should_validate or channel_type != enums.ChannelTypes.WEBHOOK:
return
url = settings_data.get("url")
if not url or not isinstance(url, str):
raise serializers.ValidationError(
{"settings": "webhook channels require settings.url (a string)."}
)
if not url.startswith(("http://", "https://")):
raise serializers.ValidationError(
{"settings": "webhook settings.url must be http:// or https://"}
)
events = settings_data.get("events")
if not events or not isinstance(events, list):
raise serializers.ValidationError(
{
"settings": "webhook channels require settings.events (a non-empty list)."
}
)
unknown = [e for e in events if e not in enums.WebhookEvents]
if unknown:
raise serializers.ValidationError(
{"settings": f"Unknown webhook events: {unknown}"}
)
def validate(self, attrs):
"""Validate channel data.
When used in the nested mailbox context (via ChannelViewSet),
the mailbox is set from context and doesn't need to be validated here.
When used in the nested mailbox context (via ChannelViewSet) or the
user context (via UserChannelViewSet), the target FK is set by the
viewset and doesn't need to be validated here. Otherwise this is
the Django-admin / management-command path which still requires an
explicit mailbox or maildomain.
"""
# If we have a mailbox in context (from ChannelViewSet), validate channel type
# and skip mailbox/maildomain validation.
# This allows Django admin to create any channel type.
if self.context.get("mailbox"):
if self.context.get("mailbox") or self.context.get("user_channel"):
# On CREATE, ``type`` MUST be supplied explicitly. The model
# field used to default to "mta", which let a caller omit
# ``type`` from the body and bypass FEATURE_MAILBOX_ADMIN_CHANNELS
# even when "mta" was not in the allowlist. On UPDATE, ``type``
# is made read-only by ``CreateOnlyFieldsMixin`` so it's never
# in ``attrs`` — fall through to the other validators which
# read the instance's existing type.
if self.instance is None and "type" not in attrs:
raise serializers.ValidationError(
{
"type": (
"Channel type is required for mailbox/user "
"channel creation."
)
}
)
channel_type = attrs.get("type")
if channel_type:
allowed_types = settings.FEATURE_MAILBOX_ADMIN_CHANNELS
allowed_types = list(settings.FEATURE_MAILBOX_ADMIN_CHANNELS)
if channel_type not in allowed_types:
raise serializers.ValidationError(
{
@@ -1630,6 +1852,9 @@ class ChannelSerializer(serializers.ModelSerializer):
f"Allowed types: {', '.join(allowed_types)}"
}
)
self._reject_caller_supplied_encrypted_keys(attrs)
self._validate_api_key_scopes(attrs)
self._validate_webhook_settings(attrs)
return attrs
mailbox = attrs.get("mailbox")
@@ -1646,6 +1871,9 @@ class ChannelSerializer(serializers.ModelSerializer):
"Cannot specify both mailbox and maildomain."
)
self._reject_caller_supplied_encrypted_keys(attrs)
self._validate_api_key_scopes(attrs)
self._validate_webhook_settings(attrs)
return attrs

View File

@@ -6,11 +6,16 @@ from django.utils.functional import cached_property
from drf_spectacular.utils import (
OpenApiResponse,
extend_schema,
inline_serializer,
)
from rest_framework import mixins, status, viewsets
from rest_framework import serializers as drf_serializers
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from core import models
from core.enums import ChannelScopeLevel, ChannelTypes
from .. import permissions, serializers
@@ -39,10 +44,17 @@ class ChannelViewSet(
return get_object_or_404(models.Mailbox, id=self.kwargs["mailbox_id"])
def get_queryset(self):
"""Get channels for the mailbox the user has admin access to."""
return models.Channel.objects.filter(mailbox=self.mailbox).order_by(
"-created_at"
)
"""Get channels for the mailbox the user has admin access to.
Defense-in-depth: filter explicitly on scope_level=MAILBOX even though
the mailbox FK filter already excludes global/maildomain rows. Any
accidentally force-inserted row with a non-null mailbox_id and a
non-mailbox scope_level would be excluded here.
"""
return models.Channel.objects.filter(
mailbox=self.mailbox,
scope_level=ChannelScopeLevel.MAILBOX,
).order_by("-created_at")
def get_serializer_context(self):
"""Add mailbox to serializer context."""
@@ -61,12 +73,50 @@ class ChannelViewSet(
403: OpenApiResponse(description="Permission denied"),
},
)
def get_save_kwargs(self):
"""Hook for subclasses to inject the scope-level + target FKs.
This base class is mailbox-nested, so it always saves with
scope_level=MAILBOX bound to the URL mailbox. ``user`` is stamped
as the creator audit (the user FK doubles as the target on
scope_level=user channels but is the audit creator everywhere
else). Subclasses (e.g. UserChannelViewSet) override this to bind
to a different scope.
"""
return {
"mailbox": self.mailbox,
"scope_level": ChannelScopeLevel.MAILBOX,
"user": self.request.user,
}
def create(self, request, *args, **kwargs):
"""Create a new channel for the mailbox."""
"""Create a new channel.
Always forces scope_level on save through ``get_save_kwargs``:
non-superadmins cannot create global channels through DRF even if a
validation bug slipped ``scope_level`` past the serializer.
The response includes the row's ``id`` (which is also the value of
the ``X-Channel-Id`` header on subsequent api_key calls), and on
creation only, the freshly generated plaintext secrets — these
cannot be retrieved later.
"""
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save(mailbox=self.mailbox)
return Response(serializer.data, status=status.HTTP_201_CREATED)
instance = serializer.save(**self.get_save_kwargs())
data = serializer.data
# Surface plaintext secrets exactly once on creation. Each generator
# lives on the instance under `_generated_*` (see ChannelSerializer).
# Subsequent GETs never return any of these.
for attr, response_key in (
("_generated_password", "password"),
("_generated_api_key", "api_key"),
):
value = getattr(instance, attr, None)
if value:
data[response_key] = value
return Response(data, status=status.HTTP_201_CREATED)
@extend_schema(
request=serializers.ChannelSerializer,
@@ -101,3 +151,102 @@ class ChannelViewSet(
instance = self.get_object()
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT)
@extend_schema(
request=None,
responses={
200: OpenApiResponse(
response=inline_serializer(
name="RegeneratedApiKeyResponse",
fields={
"id": drf_serializers.CharField(
help_text="Channel id (also the X-Channel-Id header value).",
),
"api_key": drf_serializers.CharField(
help_text=(
"Freshly generated plaintext api_key. Returned "
"ONCE on regeneration and cannot be retrieved later."
),
),
},
),
description=(
"Returns the freshly generated plaintext api_key. The "
"previous secret is invalidated immediately. The plaintext "
"is shown ONCE and cannot be retrieved later."
),
),
400: OpenApiResponse(description="Channel is not an api_key channel"),
403: OpenApiResponse(description="Permission denied"),
404: OpenApiResponse(description="Channel not found"),
},
)
@action(detail=True, methods=["post"], url_path="regenerate-api-key")
def regenerate_api_key(self, request, *args, **kwargs):
"""Regenerate the api_key on this channel.
Single-active rotation: the new secret REPLACES the old one
immediately, so any client still using the old secret will
start failing on the next call. This is the only rotation
flow exposed via DRF.
Smooth (dual-active) rotation — appending a new hash without
removing the old one so clients can migrate over a window — is
intentionally a superadmin-only feature available via Django admin.
"""
instance = self.get_object()
if instance.type != ChannelTypes.API_KEY:
raise ValidationError(
{"type": "Only api_key channels can have their secret regenerated."}
)
plaintext = instance.rotate_api_key()
return Response(
{"id": str(instance.id), "api_key": plaintext},
status=status.HTTP_200_OK,
)
@extend_schema(
tags=["channels"],
description="Manage personal (scope_level=user) integration channels",
)
# pylint: disable=too-many-ancestors
class UserChannelViewSet(ChannelViewSet):
"""Personal channels owned by the requesting user.
Mounted at ``/api/v1.0/users/me/channels/``. Reuses the base class's
create/update/destroy plumbing — the only differences are the queryset
filter, the permission class, and the save kwargs that bind the row
to ``scope_level=user``.
"""
permission_classes = [permissions.IsAuthenticated]
@cached_property
def mailbox(self):
"""No mailbox in this nesting — explicitly disable the parent's
cached property to make accidental access loud."""
return None
def get_queryset(self):
return models.Channel.objects.filter(
user=self.request.user,
scope_level=ChannelScopeLevel.USER,
).order_by("-created_at")
def get_serializer_context(self):
# Skip the mailbox-context branch in ChannelSerializer.validate so
# the serializer's validation falls back to the user_channel path.
# The user-scope serializer.save() below still hardcodes scope_level
# + user, so a body-supplied user= would be ignored regardless.
context = super(ChannelViewSet, self).get_serializer_context()
context["user_channel"] = True
return context
def get_save_kwargs(self):
return {
"user": self.request.user,
"scope_level": ChannelScopeLevel.USER,
}

View File

@@ -13,7 +13,9 @@ from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.views import APIView
from core.api.permissions import HasMetricsApiKey
from core.api.authentication import ChannelApiKeyAuthentication
from core.api.permissions import IsGlobalChannelMixin, channel_scope
from core.enums import ChannelApiKeyScope
from core.models import (
Attachment,
Blob,
@@ -33,13 +35,18 @@ ACTIVE_USER_METRICS = {
}
class MailDomainUsersMetricsApiView(APIView):
class MailDomainUsersMetricsApiView(IsGlobalChannelMixin, APIView):
"""
API view to expose MailDomain Users custom metrics
API view to expose MailDomain Users custom metrics. Global-only.
``METRICS_READ`` is in CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY (so the
permission layer rejects non-global api_key channels), and
``IsGlobalChannelMixin`` re-asserts the same invariant in the view as
a second defense-in-depth layer.
"""
permission_classes = [HasMetricsApiKey]
authentication_classes = [] # Disable any authentication
authentication_classes = [ChannelApiKeyAuthentication]
permission_classes = [channel_scope(ChannelApiKeyScope.METRICS_READ)]
@extend_schema(exclude=True)
def get(self, request):
@@ -54,7 +61,6 @@ class MailDomainUsersMetricsApiView(APIView):
metrics = defaultdict(lambda: {"metrics": {}})
for metric, threshold in ACTIVE_USER_METRICS.items():
# Build the base queryset
queryset = MailboxAccess.objects.select_related(
"mailbox", "mailbox__domain"
)
@@ -195,13 +201,14 @@ class MailDomainUsersMetricsApiView(APIView):
return Response({"count": len(metrics), "results": list(metrics.values())})
class MailboxUsageMetricsApiView(APIView):
class MailboxUsageMetricsApiView(IsGlobalChannelMixin, APIView):
"""
API view to expose per-mailbox storage usage metrics.
API view to expose per-mailbox storage usage metrics. Global-only —
see ``MailDomainUsersMetricsApiView`` for the rationale.
"""
permission_classes = [HasMetricsApiKey]
authentication_classes = [] # Disable any authentication
authentication_classes = [ChannelApiKeyAuthentication]
permission_classes = [channel_scope(ChannelApiKeyScope.METRICS_READ)]
@extend_schema(exclude=True)
def get(self, request):

View File

@@ -1,4 +1,4 @@
"""API view for provisioning mail domains from DeployCenter."""
"""Provisioning API views (service-to-service, API key auth)."""
import logging
@@ -11,18 +11,23 @@ from rest_framework.response import Response
from rest_framework.views import APIView
from sentry_sdk import capture_exception
from core.api.permissions import HasProvisioningApiKey
from core.api.serializers import ProvisioningMailDomainSerializer
from core.models import MailDomain
from core import models
from core.api.authentication import ChannelApiKeyAuthentication
from core.api.permissions import IsGlobalChannelMixin, channel_scope
from core.api.serializers import (
MailboxLightSerializer,
ProvisioningMailDomainSerializer,
)
from core.enums import ChannelApiKeyScope, MailboxRoleChoices
logger = logging.getLogger(__name__)
class ProvisioningMailDomainView(APIView):
"""Provision mail domains from DeployCenter webhooks."""
class ProvisioningMailDomainView(IsGlobalChannelMixin, APIView):
"""Provision mail domains from DeployCenter webhooks. Global-only."""
permission_classes = [HasProvisioningApiKey]
authentication_classes = []
authentication_classes = [ChannelApiKeyAuthentication]
permission_classes = [channel_scope(ChannelApiKeyScope.MAILDOMAINS_CREATE)]
@extend_schema(exclude=True)
def post(self, request):
@@ -41,7 +46,7 @@ class ProvisioningMailDomainView(APIView):
for domain_name in domains:
try:
domain, was_created = MailDomain.objects.get_or_create(
domain, was_created = models.MailDomain.objects.get_or_create(
name=domain_name,
defaults={
"custom_attributes": custom_attributes,
@@ -87,3 +92,105 @@ class ProvisioningMailDomainView(APIView):
},
status=status.HTTP_200_OK,
)
def _serialize_mailbox_with_users(
mailbox, role=None, maildomain_custom_attributes=None
):
"""Serialize a mailbox with all its users and their roles."""
data = MailboxLightSerializer(mailbox).data
if role is not None:
data["role"] = role
data["users"] = [
{
"email": access.user.email,
"role": MailboxRoleChoices(access.role).label,
}
for access in mailbox.accesses.select_related("user").all()
]
if maildomain_custom_attributes:
domain_attrs = mailbox.domain.custom_attributes or {}
data["maildomain_custom_attributes"] = {
key: domain_attrs.get(key) for key in maildomain_custom_attributes
}
return data
class ProvisioningMailboxView(IsGlobalChannelMixin, APIView):
"""List mailboxes for a user or look up a mailbox by email.
Each mailbox includes a ``users`` array with all users who have
access and their roles, so callers can sync shares in one request.
GET /api/v1.0/provisioning/mailboxes/?user_email=...
GET /api/v1.0/provisioning/mailboxes/?email=...
**Global-scope api_key channels only.** This endpoint is intentionally
not exposed to maildomain- or mailbox-scope keys, even though such a key
could only see a subset of the data — the threat model is that any
leak of a credential able to enumerate mailboxes is treated as a
privileged event, and only ops/CI keys should be able to do it. The
``IsGlobalChannelMixin`` enforces that.
"""
authentication_classes = [ChannelApiKeyAuthentication]
permission_classes = [channel_scope(ChannelApiKeyScope.MAILBOXES_READ)]
@extend_schema(exclude=True)
def get(self, request):
"""Return mailboxes filtered by user_email or email query parameter."""
user_email = request.query_params.get("user_email")
email = request.query_params.get("email")
# Optional: include specific keys from MailDomain.custom_attributes
# e.g. ?add_maildomain_custom_attributes=siret,org_name
raw = request.query_params.get("add_maildomain_custom_attributes", "")
maildomain_attrs = [k.strip() for k in raw.split(",") if k.strip()] or None
if user_email:
return self._list_by_user(user_email, maildomain_attrs)
if email:
return self._list_by_email(email, maildomain_attrs)
return Response(
{"detail": "Provide either 'user_email' or 'email' query parameter."},
status=status.HTTP_400_BAD_REQUEST,
)
def _list_by_user(self, user_email, maildomain_attrs=None):
accesses = (
models.MailboxAccess.objects.filter(user__email=user_email)
.select_related("mailbox__domain", "mailbox__contact")
.prefetch_related("mailbox__accesses__user")
)
results = [
_serialize_mailbox_with_users(
access.mailbox,
role=MailboxRoleChoices(access.role).label,
maildomain_custom_attributes=maildomain_attrs,
)
for access in accesses
]
return Response({"results": results})
def _list_by_email(self, email, maildomain_attrs=None):
if "@" not in email:
return Response({"results": []})
local_part, domain_name = email.rsplit("@", 1)
mailboxes = (
models.Mailbox.objects.filter(
local_part=local_part, domain__name=domain_name
)
.select_related("domain", "contact")
.prefetch_related("accesses__user")
)
results = [
_serialize_mailbox_with_users(
mailbox, maildomain_custom_attributes=maildomain_attrs
)
for mailbox in mailboxes
]
return Response({"results": results})

View File

@@ -0,0 +1,196 @@
"""Generic outbound email submission endpoint.
POST /api/v1.0/submit/
Accepts a raw RFC 5322 message and sends it from a mailbox.
Creates a Message via the inbound pipeline (with ``is_outbound=True``),
then runs ``prepare_outbound_message`` synchronously (DKIM signing, blob
creation) and dispatches SMTP delivery asynchronously via Celery.
"""
import logging
from django.core.exceptions import ValidationError as DjangoValidationError
from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.exceptions import PermissionDenied
from rest_framework.response import Response
from rest_framework.views import APIView
from core import models
from core.api.authentication import ChannelApiKeyAuthentication
from core.api.permissions import channel_scope
from core.enums import MAILBOX_ROLES_CAN_SEND, ChannelApiKeyScope
from core.mda.inbound_create import _create_message_from_inbound
from core.mda.outbound import prepare_outbound_message
from core.mda.outbound_tasks import send_message_task
from core.mda.rfc5322 import EmailParseError, parse_email_message
logger = logging.getLogger(__name__)
class SubmitRawEmailView(APIView):
"""Submit a pre-composed RFC 5322 email for delivery from a mailbox.
POST /api/v1.0/submit/
Content-Type: message/rfc822
Headers:
X-Channel-Id: <channel uuid> (api_key channel with messages:send scope)
X-API-Key: <raw secret>
X-Mail-From: <mailbox uuid> (UUID of the sending mailbox)
X-Rcpt-To: <addr>[,<addr>] (comma-separated recipient addresses)
The endpoint creates a Message record, DKIM-signs the raw MIME
synchronously, and dispatches SMTP delivery via Celery.
Returns: ``{"message_id": "<…>", "status": "accepted"}`` (HTTP 202).
"""
authentication_classes = [ChannelApiKeyAuthentication]
permission_classes = [channel_scope(ChannelApiKeyScope.MESSAGES_SEND)]
@extend_schema(exclude=True)
def post(self, request):
"""Accept a raw MIME message, create a Message, sign, and dispatch."""
mailbox_id = request.META.get("HTTP_X_MAIL_FROM")
rcpt_to_header = request.META.get("HTTP_X_RCPT_TO")
if not mailbox_id or not rcpt_to_header:
return Response(
{"detail": "Missing required headers: X-Mail-From, X-Rcpt-To."},
status=status.HTTP_400_BAD_REQUEST,
)
# Resolve mailbox
try:
mailbox = models.Mailbox.objects.select_related("domain").get(id=mailbox_id)
except (models.Mailbox.DoesNotExist, ValueError, DjangoValidationError):
return Response(
{"detail": "Mailbox not found."},
status=status.HTTP_404_NOT_FOUND,
)
# Enforce the api_key channel's resource scope. A scope_level=mailbox
# credential can only send as that mailbox; a scope_level=maildomain
# credential only within that domain; a global credential is
# unrestricted. For scope_level=user we additionally require the
# target user to have a SENDER-or-better role on the mailbox via
# MailboxAccess — without this, a viewer-only user could mint a
# personal api_key and submit messages.
if not request.auth.api_key_covers(
mailbox=mailbox, mailbox_roles=MAILBOX_ROLES_CAN_SEND
):
raise PermissionDenied("API key is not authorized to send as this mailbox.")
# Parse envelope recipients. _create_message_from_inbound creates
# MessageRecipient rows from MIME To/Cc headers; any address that
# appears only in X-Rcpt-To (not in MIME headers) is added as BCC
# after message creation — this is how SMTP BCC works.
recipient_emails = [
addr.strip() for addr in rcpt_to_header.split(",") if addr.strip()
]
if not recipient_emails:
return Response(
{"detail": "X-Rcpt-To header is empty."},
status=status.HTTP_400_BAD_REQUEST,
)
raw_mime = request.body
if not raw_mime:
return Response(
{"detail": "Empty request body."},
status=status.HTTP_400_BAD_REQUEST,
)
# Parse to validate structure
try:
parsed = parse_email_message(raw_mime)
except EmailParseError:
return Response(
{"detail": "Failed to parse email message."},
status=status.HTTP_400_BAD_REQUEST,
)
# Validate sender matches the mailbox
sender_email = (parsed.get("from") or {}).get("email", "")
mailbox_email = str(mailbox)
if sender_email.lower() != mailbox_email.lower():
return Response(
{
"detail": (
f"From header '{sender_email}' does not match"
f" mailbox '{mailbox_email}'."
)
},
status=status.HTTP_403_FORBIDDEN,
)
# Create thread, contacts, message, and recipients from the parsed email.
# is_outbound=True skips blob creation (handled by prepare_outbound_message
# with DKIM) and AI features.
message = _create_message_from_inbound(
recipient_email=mailbox_email,
parsed_email=parsed,
raw_data=raw_mime,
mailbox=mailbox,
is_outbound=True,
)
if not message:
return Response(
{"detail": "Failed to create message."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# Add envelope-only recipients as BCC. _create_message_from_inbound
# creates MessageRecipient rows from the MIME To/Cc/Bcc headers, but
# true BCC recipients appear only in the envelope (X-Rcpt-To), never
# in the MIME headers — that's how BCC works in SMTP.
mime_recipients = {
e.lower()
for e in message.recipients.values_list("contact__email", flat=True)
}
for addr in recipient_emails:
if addr.lower() not in mime_recipients:
try:
contact, _ = models.Contact.objects.get_or_create(
email=addr,
mailbox=mailbox,
defaults={"name": addr.split("@")[0]},
)
models.MessageRecipient.objects.get_or_create(
message=message,
contact=contact,
type=models.MessageRecipientTypeChoices.BCC,
)
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Failed to add BCC recipient (masked)")
# Synchronous: validate recipients, throttle, DKIM sign, create blob.
# This is a one-shot API — clean up on any failure so no orphan
# draft remains.
try:
prepared = prepare_outbound_message(
mailbox,
message,
"",
"",
raw_mime=raw_mime,
)
except Exception:
message.delete()
raise
if not prepared:
message.delete()
return Response(
{"detail": "Failed to prepare message for sending."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# Dispatch async SMTP delivery
send_message_task.delay(str(message.id))
return Response(
{"message_id": str(message.id), "status": "accepted"},
status=status.HTTP_202_ACCEPTED,
)

View File

@@ -1,7 +1,11 @@
"""Messages Core application"""
import logging
from django.apps import AppConfig
logger = logging.getLogger(__name__)
class CoreConfig(AppConfig):
"""Configuration class for the messages core app."""
@@ -26,3 +30,11 @@ class CoreConfig(AppConfig):
# Import signal handlers to register them
# pylint: disable=unused-import, import-outside-toplevel
import core.signals # noqa
# Deprecation warning for legacy static API keys.
for deprecated in ("METRICS_API_KEY", "PROVISIONING_API_KEY"):
if getattr(settings, deprecated, None):
logger.warning(
"%s is set but deprecated and ignored. Migrate to a channel.",
deprecated,
)

View File

@@ -2,6 +2,8 @@
Core application enums declaration
"""
from enum import StrEnum
from django.conf import global_settings
from django.db import models
@@ -141,6 +143,119 @@ class ThreadEventTypeChoices(models.TextChoices):
IM = "im", "Instant message"
class ChannelScopeLevel(models.TextChoices):
"""Scope level for a Channel: which resource the channel is bound to.
- GLOBAL: instance-wide, no target. Creatable only via Django admin or CLI.
- MAILDOMAIN: bound to one MailDomain; actions limited to that domain.
- MAILBOX: bound to one Mailbox; actions limited to that mailbox.
- USER: personal channel bound to a User; actions limited to mailboxes
the user has MailboxAccess to.
"""
GLOBAL = "global", "Global"
MAILDOMAIN = "maildomain", "Maildomain"
MAILBOX = "mailbox", "Mailbox"
USER = "user", "User"
class ChannelTypes(StrEnum):
"""Known Channel.type values.
``StrEnum`` (not a Django ``TextChoices``): Channel.type is intentionally
a free-form CharField so adding a new type never requires a migration.
Members ARE strings (``ChannelTypes.MTA == "mta"``) so comparisons,
dict keys and ORM filters work transparently.
"""
MTA = "mta"
WIDGET = "widget"
API_KEY = "api_key"
WEBHOOK = "webhook"
class WebhookEvents(StrEnum):
"""Known webhook event identifiers.
Stored as strings in ``Channel.settings["events"]``; validated by the
serializer at write time. Adding a new event is a Python-only change.
"""
MESSAGE_RECEIVED = "message.received"
MESSAGE_SENT = "message.sent"
class ChannelApiKeyScope(models.TextChoices):
"""Capability scopes granted to an api_key Channel.
Stored as a list of string values in Channel.settings["scopes"] and
enforced by the serializer + HasChannelScope permission at the API layer.
Adding a new scope is a Python-only change (no DB choices, no migration).
A credential's blast radius for any scope is automatically bounded by its
channel's scope_level + target FK: a scope_level=mailbox api_key can only
act on that mailbox, regardless of which scopes it holds.
WRITE vs CREATE distinction: ``*_WRITE`` scopes modify an object the
channel already has resource-scope access to (e.g. archiving a thread in
a mailbox-scope channel's mailbox). ``*_CREATE`` scopes mint a brand-new
top-level resource, which is an escalation — these are global-only and
listed in ``CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY``. Most resources only
need WRITE because their "create" never escalates; only mailboxes and
maildomains have a meaningful _CREATE counterpart.
**Only scopes wired to a real endpoint live in this enum.** Adding a new
scope is a one-line change here once the endpoint exists. Forward-looking
scopes are sketched in the comment block below for design reference but
intentionally not enabled — having them in the enum without an enforcing
endpoint is dead surface area an attacker could probe.
"""
METRICS_READ = "metrics:read", "Read usage metrics"
MAILBOXES_READ = "mailboxes:read", "Read mailboxes (and their users/roles)"
MESSAGES_SEND = "messages:send", "Send outbound messages"
MAILDOMAINS_CREATE = "maildomains:create", "Create new maildomains"
# Forward-looking scopes — DO NOT uncomment without a real endpoint
# enforcing them. Listed here so the planned vocabulary is visible at a
# glance and so the WRITE/CREATE convention is documented.
#
# Reads:
# MAILDOMAINS_READ = "maildomains:read", "Read maildomains"
# USERS_READ = "users:read", "Read users"
# LABELS_READ = "labels:read", "Read labels"
# CONTACTS_READ = "contacts:read", "Read contacts"
# THREADS_READ = "threads:read", "Read thread metadata"
# MESSAGES_READ = "messages:read", "Read message metadata"
# MESSAGES_READ_BODY = "messages:read.body", "Read message bodies"
# ATTACHMENTS_READ = "attachments:read", "Read attachments"
# BLOBS_READ = "blobs:read", "Read raw MIME blobs"
#
# Writes (update an object the channel already has access to):
# MESSAGES_WRITE = "messages:write", "Create/modify drafts"
# THREADS_WRITE = "threads:write", "Archive/star/label"
# LABELS_WRITE = "labels:write", "Create/modify labels"
# CONTACTS_WRITE = "contacts:write", "Create/modify contacts"
# MAILBOXES_WRITE = "mailboxes:write", "Update existing mailboxes"
# MAILDOMAINS_WRITE = "maildomains:write", "Update existing maildomains"
#
# Creates (mint a brand-new top-level object — global-only):
# MAILBOXES_CREATE = "mailboxes:create", "Create new mailboxes"
# Scopes that can only be granted to / used by a scope_level=global Channel.
# Two enforcement points use this set:
# - the serializer (write time) rejects non-global channels asking for these
# - HasChannelScope (request time) rejects requests where the calling
# channel is not global but a global-only scope is required
CHANNEL_API_KEY_SCOPES_GLOBAL_ONLY = frozenset(
{
ChannelApiKeyScope.METRICS_READ.value,
ChannelApiKeyScope.MAILDOMAINS_CREATE.value,
}
)
class MessageTemplateTypeChoices(models.IntegerChoices):
"""Defines the possible types of message templates."""

View File

@@ -3,6 +3,7 @@ Core application factories
"""
import json
import uuid
from django.conf import settings
from django.contrib.auth.hashers import make_password
@@ -258,7 +259,15 @@ class AttachmentFactory(factory.django.DjangoModelFactory):
class ChannelFactory(factory.django.DjangoModelFactory):
"""A factory to create channels for testing purposes."""
"""A factory to create channels for testing purposes.
``scope_level`` is derived from the FKs by default:
- mailbox set → "mailbox" (default)
- maildomain set → "maildomain"
- user set → "user"
- none of the above → "global"
Callers can always override ``scope_level=`` explicitly.
"""
class Meta:
model = models.Channel
@@ -267,6 +276,53 @@ class ChannelFactory(factory.django.DjangoModelFactory):
type = factory.fuzzy.FuzzyChoice(["widget", "mta"])
settings = factory.Dict({"config": {"enabled": True}})
mailbox = factory.SubFactory(MailboxFactory)
maildomain = None
user = None
scope_level = factory.LazyAttribute(
lambda o: (
"mailbox"
if o.mailbox is not None
else "maildomain"
if o.maildomain is not None
else "user"
if o.user is not None
else "global"
)
)
def make_api_key_channel(
*,
scope_level=enums.ChannelScopeLevel.GLOBAL,
scopes=(),
mailbox=None,
maildomain=None,
user=None,
name=None,
extra_settings=None,
):
"""Create an api_key Channel and return ``(channel, plaintext)``.
Single source of truth for tests that need a working api_key. Mints
the secret via ``Channel.rotate_api_key`` so the production helper is
exercised on the same code path the DRF/admin flows use, and so the
plaintext prefix stays consistent across production and tests.
Returned plaintext is the only chance to capture it — the row's
``encrypted_settings.api_key_hashes`` only stores the SHA-256 digest.
"""
channel = models.Channel(
name=name or f"test-{uuid.uuid4().hex[:6]}",
type=enums.ChannelTypes.API_KEY,
scope_level=scope_level,
mailbox=mailbox,
maildomain=maildomain,
user=user,
settings={"scopes": list(scopes), **(extra_settings or {})},
)
plaintext = channel.rotate_api_key(save=False)
channel.save()
return channel, plaintext
class BlobFactory(factory.django.DjangoModelFactory):

View File

@@ -208,7 +208,7 @@ def deliver_inbound_message(
try_send_autoreply(mailbox, parsed_email, result)
return result is not None
return bool(result)
# Regular messages: queue for spam processing
try:

View File

@@ -179,7 +179,7 @@ def _find_thread_by_message_ids(
return None
def _create_message_from_inbound(
def _create_message_from_inbound( # pylint: disable=too-many-arguments
recipient_email: str,
parsed_email: Dict[str, Any],
raw_data: bytes,
@@ -190,14 +190,21 @@ def _create_message_from_inbound(
imap_flags: Optional[List[str]] = None,
channel: Optional[models.Channel] = None,
is_spam: bool = False,
is_outbound: bool = False,
) -> Optional[models.Message]:
"""Create a message and thread from inbound message data.
"""Create a message and thread from parsed email data.
Used for inbound delivery, imports, and outbound submission.
Returns the created Message on success, or None on failure.
Callers that only need a boolean can check truthiness of the return value.
When ``is_outbound`` is True:
- ``is_sender`` is forced to True
- No blob is created (the caller handles DKIM signing + blob via prepare_outbound_message)
- AI features (summary, auto-labels) are skipped
- The message is created as a draft (finalized later by prepare_outbound_message)
Warning: messages imported here could be is_sender=True.
This method continues the logic of deliver_inbound_message, potentially asynchronously.
TODO: continue splitting this into smaller methods.
"""
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
message_flags = {}
@@ -329,18 +336,24 @@ def _create_message_from_inbound(
mime_id=parsed_email.get("in_reply_to"), thread=thread
).first()
blob = mailbox.create_blob(
content=raw_data,
content_type="message/rfc822",
)
# Outbound: no blob yet — prepare_outbound_message handles
# DKIM signing and blob creation later.
blob = None
if not is_outbound:
blob = mailbox.create_blob(
content=raw_data,
content_type="message/rfc822",
)
# Truncate subject to 255 characters if it exceeds max_length
subject = parsed_email.get("subject")
if subject and len(subject) > 255:
subject = subject[:255]
is_sender = (is_import and is_import_sender) or (
sender_email == recipient_email
is_sender = (
is_outbound
or (is_import and is_import_sender)
or (sender_email == recipient_email)
)
message = models.Message.objects.create(
@@ -351,8 +364,10 @@ def _create_message_from_inbound(
mime_id=parsed_email.get("messageId", parsed_email.get("message_id"))
or None,
parent=parent_message,
sent_at=parsed_email.get("date") or timezone.now(),
is_draft=False,
sent_at=(
None if is_outbound else (parsed_email.get("date") or timezone.now())
),
is_draft=is_outbound, # Outbound: draft until prepare_outbound_message finalizes
is_sender=is_sender,
is_trashed=False,
is_spam=is_spam,
@@ -511,8 +526,8 @@ def _create_message_from_inbound(
thread.snippet = new_snippet
thread.save(update_fields=["snippet"])
# Do not trigger AI features on import or spam
if not is_import and not is_spam:
# Do not trigger AI features on import, spam, or outbound
if not is_import and not is_spam and not is_outbound:
# Update summary if needed is ai is enabled
if is_ai_summary_enabled():
messages = get_messages_from_thread(thread)

View File

@@ -46,6 +46,38 @@ RETRY_INTERVALS = [
]
def validate_mime_size(mime_size: int, message_id: str) -> None:
"""Raise a ValidationError if *mime_size* exceeds the outgoing MIME limit."""
max_total_size = settings.MAX_OUTGOING_BODY_SIZE + (
settings.MAX_OUTGOING_ATTACHMENT_SIZE * 1.4
)
if mime_size > max_total_size:
mime_mb = mime_size / (1024 * 1024)
max_mb = max_total_size / (1024 * 1024)
logger.error(
"MIME for message %s exceeds size limit: %d bytes (%.1f MB) > %d bytes (%.0f MB)",
message_id,
mime_size,
mime_mb,
max_total_size,
max_mb,
)
raise drf.exceptions.ValidationError(
{
"message": (
"The email (%(mime_size)s MB) exceeds the maximum allowed "
"size of %(max_size)s MB."
)
% {
"mime_size": f"{mime_mb:.1f}",
"max_size": f"{max_mb:.0f}",
}
}
)
def validate_attachments_size(total_size: int, message_id: str) -> None:
"""Raise a ValidationError if *total_size* exceeds the outgoing limit."""
if total_size > settings.MAX_OUTGOING_ATTACHMENT_SIZE:
@@ -240,8 +272,14 @@ def prepare_outbound_message(
text_body: str,
html_body: str,
user: Optional[models.User] = None,
raw_mime: Optional[bytes] = None,
) -> bool:
"""Compose and sign an existing draft Message object before sending via SMTP.
"""Prepare a Message for outbound delivery: compose (or accept raw) MIME,
sign with DKIM, create a blob, and mark the message as non-draft.
When ``raw_mime`` is provided (e.g. from a raw MIME submission),
the MIME composition step is skipped and the raw bytes are used directly.
Validation, throttling, DKIM signing, and blob creation still apply.
This part is called synchronously from the API view.
"""
@@ -268,6 +306,16 @@ def prepare_outbound_message(
message=message,
)
if raw_mime is not None:
# Raw MIME path: the caller already composed the MIME.
validate_mime_size(len(raw_mime), message.id)
message.sender_user = user
message.blob = _sign_and_store_blob(mailbox_sender, raw_mime)
_finalize_sent_message(mailbox_sender, message)
return True
# --- Web/API path: compose MIME from text/html body --- #
# TODO: Fetch MIME IDs of "references" from the thread
# references = message.thread.messages.exclude(id=message.id).order_by("-created_at").all()
@@ -322,64 +370,22 @@ def prepare_outbound_message(
logger.error("Failed to compose MIME for message %s: %s", message.id, e)
return False
# Validate the composed MIME size
mime_size = message.blob.size
max_total_size = settings.MAX_OUTGOING_BODY_SIZE + (
settings.MAX_OUTGOING_ATTACHMENT_SIZE * 1.4
)
if mime_size > max_total_size:
mime_mb = mime_size / (1024 * 1024)
max_mb = max_total_size / (1024 * 1024)
logger.error(
"Composed MIME for message %s exceeds size limit: %d bytes (%.1f MB) > %d bytes (%.0f MB)",
message.id,
mime_size,
mime_mb,
max_total_size,
max_mb,
)
raise drf.exceptions.ValidationError(
{
"message": (
"The composed email (%(mime_size)s MB) exceeds the maximum allowed size of %(max_size)s MB. "
"Please reduce message content or attachments."
)
% {
"mime_size": f"{mime_mb:.1f}",
"max_size": f"{max_mb:.0f}",
}
}
)
# compose_and_store_mime already DKIM-signed and stored the blob.
# Validate the final size — clean up the blob if it's too large.
try:
validate_mime_size(message.blob.size, message.id)
except drf.exceptions.ValidationError:
message.blob.delete()
raise
draft_blob = message.draft_blob
message.is_draft = False
message.sender_user = user
message.draft_blob = None
message.created_at = timezone.now()
message.updated_at = timezone.now()
message.save(
update_fields=[
"updated_at",
"blob",
"mime_id",
"is_draft",
"sender_user",
"draft_blob",
"has_attachments",
"created_at",
]
# has_attachments is already set by compose_and_store_mime (includes
# inline signature images), so we do not overwrite it here.
_finalize_sent_message(
mailbox_sender, message, extra_update_fields=("mime_id", "has_attachments")
)
# Mark the thread as read for the sender — they've obviously seen
# their own message, so read_at must be >= messaged_at.
models.ThreadAccess.objects.filter(
thread=message.thread,
mailbox=mailbox_sender,
).update(read_at=message.created_at)
message.thread.update_stats()
# Clean up the draft blob and the attachment blobs
if draft_blob:
@@ -392,6 +398,56 @@ def prepare_outbound_message(
return True
def _sign_and_store_blob(
mailbox_sender: models.Mailbox, raw_mime: bytes
) -> models.Blob:
"""DKIM-sign raw MIME bytes and persist them as a blob on the mailbox."""
dkim_signature_header: Optional[bytes] = sign_message_dkim(
raw_mime_message=raw_mime, maildomain=mailbox_sender.domain
)
raw_mime_signed = raw_mime
if dkim_signature_header:
raw_mime_signed = dkim_signature_header + b"\r\n" + raw_mime
return mailbox_sender.create_blob(
content=raw_mime_signed,
content_type="message/rfc822",
)
def _finalize_sent_message(
mailbox_sender: models.Mailbox,
message: models.Message,
extra_update_fields: tuple = (),
) -> None:
"""Finalize an outbound message once its blob is attached: clear draft
state, stamp timestamps, save, mark the thread as read for the sender,
and refresh thread stats."""
message.is_draft = False
message.draft_blob = None
message.created_at = timezone.now()
message.updated_at = timezone.now()
update_fields = [
"updated_at",
"blob",
"is_draft",
"sender_user",
"draft_blob",
"created_at",
*extra_update_fields,
]
message.save(update_fields=update_fields)
models.ThreadAccess.objects.filter(
thread=message.thread,
mailbox=mailbox_sender,
).update(read_at=message.created_at)
message.thread.update_stats()
def send_message(message: models.Message, force_mta_out: bool = False):
"""Send an existing Message, internally or externally.

View File

@@ -0,0 +1,192 @@
"""Channel: encrypted_settings + scope_level + user + last_used_at.
Single schema change for this PR.
- Adds ``encrypted_settings`` (EncryptedJSONField) for per-type secrets.
- Adds ``user`` (FK to User, SET_NULL). Set on scope_level=user channels —
the User the channel is bound to. Otherwise an optional creator-audit
pointer (the user who created the row via DRF). May be NULL on any
scope. Personal channels are explicitly deleted by a pre_delete signal
in core.signals before the User is removed; the FK is SET_NULL rather
than CASCADE so a future constraint relaxation cannot silently sweep up
unrelated channels.
- Adds ``scope_level`` (global / maildomain / mailbox / user) with a backfill
from the existing mailbox/maildomain FKs.
- Adds ``last_used_at`` for operational metadata.
- Drops the ``type`` field's ``default="mta"``: every supported caller
(DRF serializer, factories, admin) now passes ``type`` explicitly, and
the implicit default was a bypass for FEATURE_MAILBOX_ADMIN_CHANNELS.
- Replaces the old ``channel_has_target`` XOR constraint with a
scope-level-driven ``channel_scope_level_targets`` check.
"""
import django.db.models.deletion
import encrypted_fields.fields
from django.conf import settings
from django.db import migrations, models
def backfill_scope_level(apps, schema_editor):
"""Set scope_level on every existing Channel row based on its FKs.
All existing rows have exactly one of mailbox/maildomain populated (the
old XOR constraint guaranteed that), so scope_level is derived
unambiguously.
"""
Channel = apps.get_model("core", "Channel")
Channel.objects.filter(mailbox__isnull=False).update(scope_level="mailbox")
Channel.objects.filter(maildomain__isnull=False).update(scope_level="maildomain")
def noop_reverse(apps, schema_editor):
"""No reverse — scope_level is dropped by AddField reversal."""
class Migration(migrations.Migration):
dependencies = [
("core", "0023_threadevent"),
]
operations = [
# -- drop the implicit "mta" default on Channel.type ------------------
# The default was a silent FEATURE_MAILBOX_ADMIN_CHANNELS bypass:
# a nested mailbox/user POST that omitted ``type`` would land an
# "mta" channel even when "mta" was not in the allowlist. Every
# supported caller (DRF serializer, factories, admin) now passes
# ``type`` explicitly; the field is therefore required at the DB
# layer too, so any future regression fails loudly on INSERT.
migrations.AlterField(
model_name="channel",
name="type",
field=models.CharField(
help_text="Type of channel",
max_length=255,
verbose_name="type",
),
),
# -- encrypted_settings + creator user --------------------------------
migrations.AddField(
model_name="channel",
name="encrypted_settings",
field=encrypted_fields.fields.EncryptedJSONField(
blank=True,
default=dict,
help_text="Encrypted channel settings (e.g., app-specific passwords)",
verbose_name="encrypted settings",
),
),
# Set on scope_level=user channels — the User the channel is bound
# to; NULL for any other scope level. SET_NULL rather than CASCADE
# so a future constraint relaxation can never silently sweep up
# unrelated channels; user-scope channels are explicitly deleted by
# the pre_delete signal in core.signals before the User row is
# removed.
migrations.AddField(
model_name="channel",
name="user",
field=models.ForeignKey(
blank=True,
help_text=(
"User who owns (scope_level=user) or created (audit) "
"this channel"
),
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="channels",
to=settings.AUTH_USER_MODEL,
),
),
# -- scope_level + last_used_at ---------------------------------------
migrations.AddField(
model_name="channel",
name="scope_level",
field=models.CharField(
choices=[
("global", "Global"),
("maildomain", "Maildomain"),
("mailbox", "Mailbox"),
("user", "User"),
],
db_index=True,
help_text=(
"Resource scope the channel is bound to: 'global' "
"(instance-wide, no target — admin/CLI only), 'maildomain', "
"'mailbox', or 'user' (personal channel bound to ``user``)."
),
max_length=16,
null=True,
verbose_name="scope level",
),
),
migrations.AddField(
model_name="channel",
name="last_used_at",
field=models.DateTimeField(
blank=True,
db_index=True,
help_text=(
"Operational timestamp updated (throttled) whenever the "
"channel is used."
),
null=True,
verbose_name="last used at",
),
),
migrations.RunPython(backfill_scope_level, noop_reverse),
migrations.AlterField(
model_name="channel",
name="scope_level",
field=models.CharField(
choices=[
("global", "Global"),
("maildomain", "Maildomain"),
("mailbox", "Mailbox"),
("user", "User"),
],
db_index=True,
help_text=(
"Resource scope the channel is bound to: 'global' "
"(instance-wide, no target — admin/CLI only), 'maildomain', "
"'mailbox', or 'user' (personal channel bound to ``user``)."
),
max_length=16,
verbose_name="scope level",
),
),
# -- swap XOR check constraint for scope-level-driven one -------------
migrations.RemoveConstraint(
model_name="channel",
name="channel_has_target",
),
migrations.AddConstraint(
model_name="channel",
constraint=models.CheckConstraint(
# ``user`` is permitted on any scope as a creator-audit FK.
# Only the user-scope clause requires it NOT NULL (target).
condition=(
(
models.Q(scope_level="global")
& models.Q(mailbox__isnull=True)
& models.Q(maildomain__isnull=True)
)
| (
models.Q(scope_level="maildomain")
& models.Q(mailbox__isnull=True)
& models.Q(maildomain__isnull=False)
)
| (
models.Q(scope_level="mailbox")
& models.Q(mailbox__isnull=False)
& models.Q(maildomain__isnull=True)
)
| (
models.Q(scope_level="user")
& models.Q(mailbox__isnull=True)
& models.Q(maildomain__isnull=True)
& models.Q(user__isnull=False)
)
),
name="channel_scope_level_targets",
),
),
]

View File

@@ -7,6 +7,7 @@ import base64
import hashlib
import json
import re
import secrets
import uuid
from datetime import datetime as dt
from datetime import time, timedelta
@@ -29,10 +30,11 @@ from django.utils.text import slugify
import jsonschema
import pyzstd
from encrypted_fields.fields import EncryptedTextField
from encrypted_fields.fields import EncryptedJSONField, EncryptedTextField
from timezone_field import TimeZoneField
from core.enums import (
ChannelScopeLevel,
CompressionTypeChoices,
CRUDAbilities,
DKIMAlgorithmChoices,
@@ -438,8 +440,18 @@ class Channel(BaseModel):
"name", max_length=255, help_text="Human-readable name for this channel"
)
type = models.CharField(
"type", max_length=255, help_text="Type of channel", default="mta"
type = models.CharField("type", max_length=255, help_text="Type of channel")
scope_level = models.CharField(
"scope level",
max_length=16,
choices=ChannelScopeLevel.choices,
db_index=True,
help_text=(
"Resource scope the channel is bound to: 'global' (instance-wide, "
"no target — admin/CLI only), 'maildomain', 'mailbox', or 'user' "
"(personal channel bound to ``user``)."
),
)
settings = models.JSONField(
@@ -458,6 +470,33 @@ class Channel(BaseModel):
help_text="Mailbox that receives messages from this channel",
)
encrypted_settings = EncryptedJSONField(
"encrypted settings",
default=dict,
blank=True,
help_text="Encrypted channel settings (e.g., app-specific passwords)",
)
# Dual-purpose FK:
# - For ``scope_level=user`` channels, this is the target user the
# channel is bound to. The check constraint forces it NOT NULL.
# - For every other scope level, this is the creator audit (the user
# who created the channel via DRF). May be NULL for channels created
# by the CLI / Django admin / data migration.
# The FK uses SET_NULL rather than CASCADE so a user delete cannot
# blanket-cascade unrelated channels; user-scope channels are
# explicitly deleted by the pre_delete signal in core.signals before
# the User row is removed (otherwise SET_NULL would null user_id on a
# user-scope row and immediately violate the check constraint).
user = models.ForeignKey(
"User",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="channels",
help_text="User who owns (scope_level=user) or created (audit) this channel",
)
maildomain = models.ForeignKey(
"MailDomain",
on_delete=models.CASCADE,
@@ -467,6 +506,16 @@ class Channel(BaseModel):
help_text="Mail domain that owns this channel",
)
last_used_at = models.DateTimeField(
"last used at",
null=True,
blank=True,
db_index=True,
help_text=(
"Operational timestamp updated (throttled) whenever the channel is used."
),
)
class Meta:
db_table = "messages_channel"
verbose_name = "channel"
@@ -475,15 +524,148 @@ class Channel(BaseModel):
constraints = [
models.CheckConstraint(
check=(
models.Q(mailbox__isnull=False) ^ models.Q(maildomain__isnull=False)
# The constraint enforces the mailbox/maildomain shape
# for each scope level. ``user`` is NOT in any of these
# clauses on purpose: it can be set on any scope as a
# creator-audit FK. The only place ``user`` shows up is
# the user-scope clause, where it must be NOT NULL
# (it's the target).
(
Q(scope_level=ChannelScopeLevel.GLOBAL)
& Q(mailbox__isnull=True)
& Q(maildomain__isnull=True)
)
| (
Q(scope_level=ChannelScopeLevel.MAILDOMAIN)
& Q(mailbox__isnull=True)
& Q(maildomain__isnull=False)
)
| (
Q(scope_level=ChannelScopeLevel.MAILBOX)
& Q(mailbox__isnull=False)
& Q(maildomain__isnull=True)
)
| (
Q(scope_level=ChannelScopeLevel.USER)
& Q(mailbox__isnull=True)
& Q(maildomain__isnull=True)
& Q(user__isnull=False)
)
),
name="channel_has_target",
name="channel_scope_level_targets",
),
]
def __str__(self):
return self.name
# The scope_level ↔ target invariant is enforced by the
# ``channel_scope_level_targets`` CheckConstraint above. BaseModel.save
# calls full_clean(), which calls validate_constraints(), which evaluates
# that Q() in Python and raises ValidationError before the row is sent
# to the DB. No custom clean() override is needed.
# --- api_key helpers --- #
API_KEY_PREFIX = "msgk_"
def rotate_api_key(self, *, save: bool = True) -> str:
"""Mint a fresh api_key plaintext, replace ``api_key_hashes`` with
the new SHA-256 digest, and return the plaintext exactly once.
Single-active rotation: any prior secret is invalidated immediately.
Dual-active "smooth" rotation (appending without removing) is not
exposed here — callers that need it must mutate ``encrypted_settings``
directly via the Django admin.
Set ``save=False`` for the DRF create path, where the row is being
built and ``super().create()`` will persist it shortly after.
"""
plaintext = self.API_KEY_PREFIX + secrets.token_urlsafe(32)
digest = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
self.encrypted_settings = {
**(self.encrypted_settings or {}),
"api_key_hashes": [digest],
}
if save:
self.save(update_fields=["encrypted_settings", "updated_at"])
return plaintext
def api_key_covers(
self, *, mailbox=None, maildomain=None, mailbox_roles=None
) -> bool:
"""Return True if an action on the given resource is within this
channel's scope, assuming the channel is an api_key and the scope
check has already passed.
- Global channels cover everything.
- Maildomain channels cover any mailbox in their domain.
- Mailbox channels cover only their mailbox.
- User channels cover any mailbox the target user has access to via
``MailboxAccess`` — and crucially, only if that access carries a
role in ``mailbox_roles`` when the kwarg is supplied. This is what
stops a viewer-only user from submitting via a personal api_key:
/submit/ passes ``MAILBOX_ROLES_CAN_SEND``, so a VIEWER access is
rejected here.
For mailbox / maildomain / global scopes the channel was bound by an
admin who already had authority over the resource, so the role check
does not apply — the api_key inherits the binding directly.
``mailbox_roles`` is ignored for non-user scopes.
"""
# Fail closed on ambiguous input: callers must ask about exactly one
# resource at a time, otherwise branch order below would silently
# decide which one "wins".
if mailbox is not None and maildomain is not None:
return False
if self.scope_level == ChannelScopeLevel.GLOBAL:
return True
if self.scope_level == ChannelScopeLevel.MAILDOMAIN:
if maildomain is not None:
return maildomain.id == self.maildomain_id
if mailbox is not None:
return mailbox.domain_id == self.maildomain_id
return False
if self.scope_level == ChannelScopeLevel.MAILBOX:
if mailbox is not None:
return mailbox.id == self.mailbox_id
return False
if self.scope_level == ChannelScopeLevel.USER:
if mailbox is None:
return False
qs = MailboxAccess.objects.filter(user_id=self.user_id, mailbox=mailbox)
if mailbox_roles is not None:
qs = qs.filter(role__in=mailbox_roles)
return qs.exists()
return False
def mark_used(self, only_if_older_than_seconds: int = 300):
"""Throttled update of last_used_at.
The throttle predicate is evaluated in the DB, not in Python:
``filter(pk=..., Q(last_used_at < cutoff) | Q(last_used_at IS NULL))
.update(last_used_at=now)``. This means concurrent requests racing
on the same channel coalesce into a single UPDATE — only the first
one matches, the others see the freshly written timestamp and
affect zero rows. Filtering on ``self.last_used_at`` in Python
would not give that guarantee because each worker would see a
stale in-memory value and all of them would issue the UPDATE.
We deliberately go through ``filter().update()`` rather than
``self.save()`` to skip full_clean() on global channels and to
avoid racing with other writers on adjacent fields.
"""
now = timezone.now()
cutoff = now - timedelta(seconds=only_if_older_than_seconds)
rows_affected = (
Channel.objects.filter(pk=self.pk)
.filter(Q(last_used_at__lt=cutoff) | Q(last_used_at__isnull=True))
.update(last_used_at=now)
)
if rows_affected:
self.last_used_at = now
class Mailbox(BaseModel):
"""Mailbox model to store mailbox information."""

View File

@@ -1,7 +1,14 @@
"""OpenSearch index and mapping configuration."""
# Index name constants
MESSAGE_INDEX = "messages"
import os
# Index name constants. When running under pytest-xdist, give every worker its
# own index so parallel test workers do not race on the same shared index
# (create/delete/index operations would otherwise step on each other and
# surface as flaky `resource_already_exists_exception` / missing-doc errors).
# In production PYTEST_XDIST_WORKER is unset, so the name stays "messages".
_XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
MESSAGE_INDEX = f"messages_{_XDIST_WORKER}" if _XDIST_WORKER else "messages"
# Schema definitions
MESSAGE_MAPPING = {

View File

@@ -9,6 +9,7 @@ from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from core import models
from core.enums import ChannelScopeLevel
from core.services.identity.keycloak import (
sync_mailbox_to_keycloak_user,
sync_maildomain_to_keycloak_group,
@@ -54,7 +55,7 @@ def sync_mailbox_to_keycloak(sender, instance, created, **kwargs):
@receiver(post_save, sender=models.Message)
def index_message_post_save(sender, instance, created, **kwargs):
"""Index a message after it's saved."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
try:
@@ -74,7 +75,7 @@ def index_message_post_save(sender, instance, created, **kwargs):
@receiver(post_save, sender=models.MessageRecipient)
def index_message_recipient_post_save(sender, instance, created, **kwargs):
"""Index a message recipient after it's saved."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
try:
@@ -132,7 +133,7 @@ def update_thread_stats_on_delivery_status_change(sender, instance, **kwargs):
@receiver(post_save, sender=models.Thread)
def index_thread_post_save(sender, instance, created, **kwargs):
"""Index a thread after it's saved."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
try:
@@ -167,7 +168,7 @@ def delete_message_blobs(sender, instance, **kwargs):
@receiver(post_delete, sender=models.Message)
def delete_message_from_index(sender, instance, **kwargs):
"""Remove a message from the index after it's deleted."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
try:
@@ -191,7 +192,7 @@ def delete_message_from_index(sender, instance, **kwargs):
@receiver(post_delete, sender=models.Thread)
def delete_thread_from_index(sender, instance, **kwargs):
"""Remove a thread and its messages from the index after it's deleted."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
try:
@@ -245,7 +246,7 @@ def delete_orphan_draft_attachments(sender, instance, **kwargs):
@receiver(post_save, sender=models.ThreadAccess)
def update_mailbox_flags_on_access_save(sender, instance, created, **kwargs):
"""Update mailbox flags in OpenSearch when ThreadAccess read/starred state changes."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
update_fields = kwargs.get("update_fields")
@@ -271,7 +272,7 @@ def update_mailbox_flags_on_access_save(sender, instance, created, **kwargs):
@receiver(post_delete, sender=models.ThreadAccess)
def update_unread_mailboxes_on_access_delete(sender, instance, **kwargs):
"""Update unread_mailboxes in OpenSearch when a ThreadAccess is deleted."""
if not getattr(settings, "OPENSEARCH_INDEX_THREADS", False):
if not settings.OPENSEARCH_INDEX_THREADS:
return
thread_id = str(instance.thread_id)
@@ -286,3 +287,27 @@ def update_unread_mailboxes_on_access_delete(sender, instance, **kwargs):
instance.thread_id,
e,
)
@receiver(pre_delete, sender=models.User)
def delete_user_scope_channels_on_user_delete(sender, instance, **kwargs):
"""Delete the user's personal (scope_level=user) Channels before the
user row is removed.
Channel.user uses on_delete=SET_NULL deliberately — the FK alone must
not blanket-cascade, because a future relaxation of the
channel_scope_level_targets check constraint could otherwise let a
user delete silently sweep up unrelated channels. This handler is the
*only* place where user-scope channels are removed in response to a
user deletion. The query is filtered explicitly on
``scope_level=user``, never on the FK alone.
If we did not delete these rows here, SET_NULL on the FK would null
``user_id`` on the user-scope rows, immediately violating the check
constraint and aborting the user delete with an IntegrityError — so
this signal is also load-bearing for user deletion to succeed at all.
"""
models.Channel.objects.filter(
user=instance,
scope_level=ChannelScopeLevel.USER,
).delete()

View File

@@ -0,0 +1,28 @@
{# Shared <style> block for admin change-form pages that add a custom #}
{# button to the ``object-tools`` row via a <form><button> pair (instead #}
{# of the usual <a class="historylink"> link). Mirrors the link styling #}
{# so the buttons are visually indistinguishable from the built-in ones. #}
<style>
.object-tools button.historylink {
appearance: none;
background: var(--object-tools-bg);
border-radius: 15px;
border: none;
color: var(--object-tools-fg);
cursor: pointer;
display: block;
float: left;
font-size: 0.6875rem;
font-weight: 400;
letter-spacing: 0.5px;
line-height: inherit;
padding: 3px 12px;
text-transform: uppercase;
transition: color 0.15s, background 0.15s;
}
.object-tools button:focus,
.object-tools button:hover {
background-color: var(--object-tools-hover-bg);
}
</style>

View File

@@ -0,0 +1,24 @@
{% extends "admin/change_form.html" %}
{% load i18n admin_urls %}
{% block extrastyle %}
{{ block.super }}
{% include "admin/_object_tools_button_styles.html" %}
{% endblock %}
{% block object-tools-items %}
{% if original and original.type == "api_key" %}
<li>
<form method="post"
action="{% url 'admin:core_channel_regenerate_api_key' original.pk %}"
style="display: inline;"
onsubmit="return confirm('Regenerate the api_key for this channel? The previous secret will stop working immediately.');">
{% csrf_token %}
<button type="submit" class="historylink">
{% translate "Regenerate api_key" %}
</button>
</form>
</li>
{% endif %}
{{ block.super }}
{% endblock %}

View File

@@ -0,0 +1,28 @@
{% extends "admin/base_site.html" %}
{% load i18n admin_urls %}
{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">{% translate 'Home' %}</a>
&rsaquo; <a href="{% url 'admin:app_list' app_label=opts.app_label %}">{{ opts.app_config.verbose_name }}</a>
&rsaquo; <a href="{% url 'admin:core_channel_changelist' %}">{{ opts.verbose_name_plural|capfirst }}</a>
&rsaquo; <a href="{% url 'admin:core_channel_change' original.pk %}">{{ original }}</a>
&rsaquo; {% translate "Regenerated api_key" %}
</div>
{% endblock %}
{% block content %}
<h1>{% translate "New api_key generated" %}</h1>
<p><strong>{% translate "Copy this value now. It will not be shown again." %}</strong></p>
<p>{% blocktranslate %}The previous secret has been invalidated immediately. Any client still using the old api_key will start failing on its next call.{% endblocktranslate %}</p>
<pre style="padding: 12px; background: #f5f5f5; border: 1px solid #ccc; border-radius: 4px; user-select: all; word-break: break-all;">{{ api_key }}</pre>
<p style="margin-top: 24px;">
<a class="button" href="{% url 'admin:core_channel_change' original.pk %}">
{% translate "Back to channel" %}
</a>
</p>
{% endblock %}

View File

@@ -2,30 +2,7 @@
{% block extrastyle %}
{{ block.super }}
<style>
.object-tools button.historylink {
appearance: none;
background: var(--object-tools-bg);
border-radius: 15px;
border: none;
color: var(--object-tools-fg);
cursor: pointer;
display: block;
float: left;
font-size: 0.6875rem;
font-weight: 400;
letter-spacing: 0.5px;
line-height: inherit;
padding: 3px 12px;
text-transform: uppercase;
transition: color 0.15s, background 0.15s;
}
.object-tools button:focus,
.object-tools button:hover {
background-color: var(--object-tools-hover-bg);
}
</style>
{% include "admin/_object_tools_button_styles.html" %}
{% endblock %}
{% load i18n admin_urls %}

View File

@@ -0,0 +1,176 @@
"""Tests for ChannelApiKeyAuthentication + HasChannelScope.
Covers the cross-cutting auth/permission layer directly, independent of any
specific endpoint. Endpoint-specific scope enforcement is tested in each
viewset's own test file (test_submit, test_provisioning_*, test_*_metrics).
"""
# pylint: disable=missing-function-docstring,missing-class-docstring,import-outside-toplevel
import hashlib
import uuid
from datetime import timedelta
from django.utils import timezone
import pytest
from core import models
from core.enums import ChannelApiKeyScope, ChannelScopeLevel
from core.factories import MailboxFactory, make_api_key_channel
SUBMIT_URL = "/api/v1.0/submit/"
def _make_channel(scopes=(ChannelApiKeyScope.MESSAGES_SEND.value,), **kwargs):
"""Wrapper around the shared factory pre-loaded with the auth-class
test default scope (messages:send). Callers can still override
``scopes`` and any other kwarg."""
return make_api_key_channel(scopes=scopes, **kwargs)
@pytest.mark.django_db
class TestChannelApiKeyAuth:
"""Direct edge-case coverage for the authentication class."""
def test_missing_headers_returns_401(self, client):
"""No headers at all → DRF NotAuthenticated → 401."""
response = client.post(SUBMIT_URL)
assert response.status_code == 401
def test_malformed_channel_id_returns_401(self, client):
response = client.post(
SUBMIT_URL,
HTTP_X_CHANNEL_ID="not-a-uuid",
HTTP_X_API_KEY="anything",
)
assert response.status_code == 401
def test_wrong_secret_returns_401(self, client):
channel, _plaintext = _make_channel()
response = client.post(
SUBMIT_URL,
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY="not-the-real-secret",
)
assert response.status_code == 401
def test_expired_key_returns_401(self, client):
past = (timezone.now() - timedelta(days=1)).isoformat()
channel, plaintext = _make_channel(extra_settings={"expires_at": past})
response = client.post(
SUBMIT_URL,
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 401
def test_non_api_key_channel_cannot_authenticate(self, client):
"""A widget/mta channel with a hash in encrypted_settings must not
authenticate the api_key path — the authentication class filters by
type='api_key' explicitly."""
plaintext = f"msg_test_{uuid.uuid4().hex}"
digest = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
channel = models.Channel(
name="not-api-key",
type="widget",
scope_level=ChannelScopeLevel.GLOBAL,
encrypted_settings={"api_key_hashes": [digest]},
settings={"scopes": [ChannelApiKeyScope.MESSAGES_SEND.value]},
)
channel.save()
response = client.post(
SUBMIT_URL,
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 401
def test_last_used_at_updates_on_success(self, client):
"""A successful auth call updates last_used_at within the throttle window."""
channel, plaintext = _make_channel()
mailbox = MailboxFactory()
assert channel.last_used_at is None
# We don't need the call to succeed end-to-end for the submit endpoint.
# We just need to reach the authentication class, which itself calls
# mark_used() on success. Submit without a body → 400, but auth has
# already passed by then.
response = client.post(
SUBMIT_URL,
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="to@x.test",
)
# Auth must have passed; the post-auth pipeline is allowed to
# produce 202 (accepted), 400 (empty body / validation), or 403
# (scope mismatch). 500 is NOT accepted — a server error here
# would mask a real bug behind a "test still green" signal.
assert response.status_code in (202, 400, 403)
channel.refresh_from_db()
# Only check that last_used_at moved past the None state when
# the auth call succeeded (scopes include messages:send + global).
assert channel.last_used_at is not None
@pytest.mark.django_db
class TestHasChannelScope:
"""Direct tests for HasChannelScope.has_permission."""
def test_scope_present(self, rf):
channel, _ = _make_channel(scopes=(ChannelApiKeyScope.MESSAGES_SEND.value,))
from core.api.permissions import channel_scope
perm_class = channel_scope(ChannelApiKeyScope.MESSAGES_SEND)
request = rf.post("/")
request.auth = channel
assert perm_class().has_permission(request, None) is True
def test_scope_absent(self, rf):
channel, _ = _make_channel(scopes=(ChannelApiKeyScope.METRICS_READ.value,))
from core.api.permissions import channel_scope
perm_class = channel_scope(ChannelApiKeyScope.MESSAGES_SEND)
request = rf.post("/")
request.auth = channel
assert perm_class().has_permission(request, None) is False
def test_auth_not_a_channel(self, rf):
from core.api.permissions import channel_scope
perm_class = channel_scope(ChannelApiKeyScope.MESSAGES_SEND)
request = rf.post("/")
request.auth = None
assert perm_class().has_permission(request, None) is False
@pytest.mark.django_db
class TestApiKeyCovers:
"""Channel.api_key_covers resource-scope helper."""
def test_global_covers_everything(self):
channel, _ = _make_channel(scope_level=ChannelScopeLevel.GLOBAL)
mailbox = MailboxFactory()
assert channel.api_key_covers(mailbox=mailbox) is True
assert channel.api_key_covers(maildomain=mailbox.domain) is True
def test_maildomain_covers_only_its_domain(self):
mailbox = MailboxFactory()
channel, _ = _make_channel(
scope_level=ChannelScopeLevel.MAILDOMAIN,
maildomain=mailbox.domain,
)
assert channel.api_key_covers(mailbox=mailbox) is True
other = MailboxFactory()
assert channel.api_key_covers(mailbox=other) is False
def test_mailbox_covers_only_itself(self):
mailbox = MailboxFactory()
channel, _ = _make_channel(
scope_level=ChannelScopeLevel.MAILBOX,
mailbox=mailbox,
)
assert channel.api_key_covers(mailbox=mailbox) is True
other = MailboxFactory()
assert channel.api_key_covers(mailbox=other) is False

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
"""Tests for the channel API endpoints."""
# pylint: disable=redefined-outer-name, unused-argument, too-many-public-methods
# pylint: disable=redefined-outer-name, unused-argument, too-many-public-methods, import-outside-toplevel
import uuid
@@ -252,6 +252,20 @@ class TestChannelCreate:
assert response.status_code == status.HTTP_201_CREATED
assert response.data["type"] == "widget"
@override_settings(FEATURE_MAILBOX_ADMIN_CHANNELS=["widget"])
def test_create_channel_missing_type_is_rejected(self, api_client, mailbox):
"""Omitting ``type`` on CREATE must be a 400 — never silently default
to "mta" and bypass FEATURE_MAILBOX_ADMIN_CHANNELS. Regression lock
for the bug where the model field default and the serializer's
``if channel_type:`` short-circuit combined to let "mta" channels
through on any nested mailbox/user POST."""
url = reverse("mailbox-channels-list", kwargs={"mailbox_id": mailbox.id})
response = api_client.post(
url, {"name": "no type", "settings": {}}, format="json"
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "type" in response.data
@pytest.mark.django_db
class TestChannelRetrieve:
@@ -421,3 +435,134 @@ class TestChannelDomainAdminAccess:
assert response.status_code == status.HTTP_201_CREATED
assert response.data["name"] == "Domain Admin Widget"
@pytest.mark.django_db
class TestChannelEncryptedSettings:
"""Test encrypted_settings and user fields on the Channel model."""
def test_encrypted_settings_stored_on_model(self, mailbox):
"""encrypted_settings can be set and read back."""
channel = ChannelFactory(
mailbox=mailbox, type="widget", settings={"public": "value"}
)
channel.encrypted_settings = {"secret_key": "s3cret"}
channel.save()
channel.refresh_from_db()
assert channel.encrypted_settings["secret_key"] == "s3cret"
assert channel.settings["public"] == "value"
def test_encrypted_settings_not_in_api_response(self, api_client, mailbox):
"""encrypted_settings must never leak in the REST API — neither as
a top-level key nor smuggled into the visible ``settings`` payload."""
channel = ChannelFactory(mailbox=mailbox, type="widget")
channel.encrypted_settings = {"password": "s3cret"}
channel.save()
url = reverse(
"mailbox-channels-detail",
kwargs={"mailbox_id": mailbox.id, "pk": channel.id},
)
response = api_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert "encrypted_settings" not in response.data
assert "password" not in response.data
# Defense in depth: a serializer bug that copied encrypted_settings
# into the visible ``settings`` JSON would also be a leak. Inspect
# both the secret keys and the secret values.
settings_payload = response.data.get("settings") or {}
assert "password" not in settings_payload
assert "s3cret" not in settings_payload.values()
def test_encrypted_settings_not_in_list_response(self, api_client, mailbox):
"""encrypted_settings must not appear in list responses either —
same defense-in-depth check on each item's ``settings`` payload."""
channel = ChannelFactory(mailbox=mailbox, type="widget")
channel.encrypted_settings = {"token": "abc"}
channel.save()
url = reverse("mailbox-channels-list", kwargs={"mailbox_id": mailbox.id})
response = api_client.get(url)
assert response.status_code == status.HTTP_200_OK
for item in response.data:
assert "encrypted_settings" not in item
settings_payload = item.get("settings") or {}
assert "token" not in settings_payload
assert "abc" not in settings_payload.values()
def test_user_field_target_on_user_scope_channel(self, user):
"""Channel.user is the *target* user for scope_level=user channels."""
channel = ChannelFactory(
user=user, scope_level="user", mailbox=None, maildomain=None
)
channel.refresh_from_db()
assert channel.user == user
def test_user_field_creator_audit_on_mailbox_scope_channel(self, user, mailbox):
"""On non-user-scope channels, Channel.user is the creator audit
— an OPTIONAL FK pointing at the User who created it via DRF."""
channel = ChannelFactory(mailbox=mailbox, type="widget", user=user)
channel.refresh_from_db()
assert channel.user == user
def test_user_field_nullable_on_mailbox_scope_channel(self, mailbox):
"""The creator FK is nullable — channels created by the CLI / data
migration / Django admin may not have a creator stamped."""
channel = ChannelFactory(mailbox=mailbox, type="widget")
assert channel.user is None
@pytest.mark.django_db
class TestChannelReservedSettingsKeys:
"""The serializer rejects callers that try to write reserved settings
keys (e.g. ``api_key_hashes``). Server-side generators write directly
to encrypted_settings, callers cannot influence its contents."""
@override_settings(FEATURE_MAILBOX_ADMIN_CHANNELS=["api_key"])
def test_post_with_reserved_key_in_settings_is_rejected(self, api_client, mailbox):
"""Smuggling api_key_hashes via settings is a 400."""
url = reverse("mailbox-channels-list", kwargs={"mailbox_id": mailbox.id})
response = api_client.post(
url,
data={
"name": "Tries to inject a hash",
"type": "api_key",
"settings": {
"scopes": ["messages:send"],
"api_key_hashes": ["a" * 64],
},
},
format="json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@override_settings(FEATURE_MAILBOX_ADMIN_CHANNELS=["api_key"])
def test_unrelated_settings_keys_pass_through(self, api_client, mailbox):
"""Non-reserved keys in settings (e.g. expires_at) flow through
the API as caller-supplied data — only the reserved list is locked
down."""
url = reverse("mailbox-channels-list", kwargs={"mailbox_id": mailbox.id})
response = api_client.post(
url,
data={
"name": "Has expires_at",
"type": "api_key",
"settings": {
"scopes": ["messages:send"],
"expires_at": "2030-01-01T00:00:00Z",
},
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
channel = models.Channel.objects.get(id=response.data["id"])
assert channel.settings["expires_at"] == "2030-01-01T00:00:00Z"
# And api_key_hashes is what the server generated, not what the
# caller smuggled (the caller didn't smuggle anything here, but
# we still assert the encrypted_settings shape).
assert "api_key_hashes" in channel.encrypted_settings
assert "api_key_hashes" not in channel.settings

View File

@@ -58,18 +58,20 @@ def fixture_channel_without_mailbox():
@pytest.mark.django_db
def test_inbound_widget_channel_model():
"""Test the Channel model."""
"""Test the Channel model scope_level invariants.
- mailbox AND maildomain both set → rejected (scope_level must pick one).
- neither set → valid global channel (scope_level=global).
"""
with pytest.raises(ValidationError):
factories.ChannelFactory(
mailbox=factories.MailboxFactory(),
maildomain=factories.MailDomainFactory(),
)
with pytest.raises(ValidationError):
factories.ChannelFactory(
mailbox=None,
maildomain=None,
)
# A channel with neither mailbox nor maildomain is now a valid global channel.
global_channel = factories.ChannelFactory(mailbox=None, maildomain=None)
assert global_channel.scope_level == "global"
@pytest.mark.django_db

View File

@@ -1,11 +1,14 @@
"""Tests for the mailbox usage metrics endpoint."""
# pylint: disable=redefined-outer-name, too-many-public-methods
# pylint: disable=redefined-outer-name, too-many-public-methods, too-many-lines, unused-argument
from django.urls import reverse
import pytest
from core.enums import MessageTemplateTypeChoices
from core.enums import (
ChannelApiKeyScope,
MessageTemplateTypeChoices,
)
from core.factories import (
AttachmentFactory,
BlobFactory,
@@ -16,6 +19,7 @@ from core.factories import (
MessageTemplateFactory,
ThreadAccessFactory,
ThreadFactory,
make_api_key_channel,
)
@@ -26,9 +30,16 @@ def url():
@pytest.fixture
def correctly_configured_header(settings):
"""Returns the authentication header for the metrics endpoint."""
return {"HTTP_AUTHORIZATION": f"Bearer {settings.METRICS_API_KEY}"}
def correctly_configured_header(db):
"""Returns the authentication headers via a global api_key Channel."""
channel, plaintext = make_api_key_channel(
scopes=(ChannelApiKeyScope.METRICS_READ.value,),
name="metrics-test",
)
return {
"HTTP_X_CHANNEL_ID": str(channel.id),
"HTTP_X_API_KEY": plaintext,
}
CUSTOM_ATTRIBUTES_SCHEMA = {
@@ -52,14 +63,23 @@ class TestMailboxUsageMetrics:
@pytest.mark.django_db
def test_requires_auth(self, api_client, url, correctly_configured_header):
"""Requires valid API key for access."""
"""Requires a valid api_key Channel to access.
No headers → 401 (NotAuthenticated). Invalid → 401. Valid → 200.
"""
# Without authentication
response = api_client.get(url)
assert response.status_code == 403
assert response.status_code == 401
# Invalid authentication
response = api_client.get(url, HTTP_AUTHORIZATION="Bearer invalid_token")
assert response.status_code == 403
# Invalid authentication: reuse the real channel id with a wrong
# secret so the hash-mismatch branch (not just Channel.DoesNotExist)
# is exercised.
response = api_client.get(
url,
HTTP_X_CHANNEL_ID=correctly_configured_header["HTTP_X_CHANNEL_ID"],
HTTP_X_API_KEY="invalid_token",
)
assert response.status_code == 401
# Valid authentication
response = api_client.get(url, **correctly_configured_header)

View File

@@ -1,12 +1,17 @@
"""Tests for the Maildomain users metrics endpoint."""
# pylint: disable=redefined-outer-name, unused-argument, too-many-lines
import uuid
from django.urls import reverse
from django.utils import timezone
import pytest
from core.enums import MessageTemplateTypeChoices
from core.enums import (
ChannelApiKeyScope,
MessageTemplateTypeChoices,
)
from core.factories import (
AttachmentFactory,
BlobFactory,
@@ -19,10 +24,19 @@ from core.factories import (
ThreadAccessFactory,
ThreadFactory,
UserFactory,
make_api_key_channel,
)
from core.models import MailboxAccess, MailDomain
def _make_metrics_api_key():
"""Create a global-scope api_key channel with metrics:read."""
return make_api_key_channel(
scopes=(ChannelApiKeyScope.METRICS_READ.value,),
name="metrics-test",
)
def check_results_for_key(
results: dict | list,
expected: dict[str, int],
@@ -69,11 +83,16 @@ def url_with_siret_query_param(url):
@pytest.fixture
def correctly_configured_header(settings):
def correctly_configured_header(db):
"""
Returns the authentication header for the metrics endpoint.
Returns the authentication headers for the metrics endpoint via a
scope_level=global api_key Channel with metrics:read.
"""
return {"HTTP_AUTHORIZATION": f"Bearer {settings.METRICS_API_KEY}"}
channel, plaintext = _make_metrics_api_key()
return {
"HTTP_X_CHANNEL_ID": str(channel.id),
"HTTP_X_API_KEY": plaintext,
}
def grant_access_to_mailbox_accessed_at(mailbox, user, accessed_at: timezone = None):
@@ -131,18 +150,22 @@ class TestMailDomainUsersMetrics:
self, api_client, url, correctly_configured_header
):
"""
Requires valid API key for access.
Requires a valid api_key Channel to access.
Asserts that requests without or with invalid authentication are rejected (403),
and requests with the correct API key are accepted (200).
No headers → 401 (NotAuthenticated). Invalid credentials → 401
(AuthenticationFailed). Valid → 200.
"""
# Test without authentication
response = api_client.get(url)
assert response.status_code == 403
assert response.status_code == 401
# Test with invalid authentication
response = api_client.get(url, HTTP_AUTHORIZATION="Bearer invalid_token")
assert response.status_code == 403
response = api_client.get(
url,
HTTP_X_CHANNEL_ID=str(uuid.uuid4()),
HTTP_X_API_KEY="invalid_token",
)
assert response.status_code == 401
# Test with authentication
response = api_client.get(url, **correctly_configured_header)

View File

@@ -0,0 +1,422 @@
"""Tests for the provisioning mailbox lookup endpoint."""
# pylint: disable=redefined-outer-name,missing-function-docstring
import uuid
from django.urls import reverse
import pytest
from rest_framework.test import APIClient
from core.enums import (
ChannelApiKeyScope,
ChannelScopeLevel,
MailboxRoleChoices,
)
from core.factories import (
MailboxAccessFactory,
MailboxFactory,
MailDomainFactory,
UserFactory,
make_api_key_channel,
)
MAILBOX_URL = reverse("provisioning-mailboxes")
def _make_api_key_channel(scopes=(ChannelApiKeyScope.MAILBOXES_READ.value,), **kwargs):
"""Wrapper around the shared factory pre-loaded with the
provisioning-endpoint default scope (mailboxes:read). Callers can
still override ``scopes`` and any other kwarg."""
return make_api_key_channel(scopes=scopes, **kwargs)
@pytest.fixture
def auth_header():
"""Global-scope api_key with mailboxes:read."""
channel, plaintext = _make_api_key_channel()
return {
"HTTP_X_CHANNEL_ID": str(channel.id),
"HTTP_X_API_KEY": plaintext,
}
@pytest.fixture
def domain():
return MailDomainFactory(name="company.com")
@pytest.fixture
def mailbox(domain):
return MailboxFactory(local_part="contact", domain=domain)
# =============================================================================
# Security — API key required, no bypass possible
# =============================================================================
@pytest.mark.django_db
class TestServiceAuthSecurity:
"""Verify that the provisioning endpoint requires ChannelApiKeyScope.MAILBOXES_READ."""
def test_user_email_no_auth_returns_401(self, client):
response = client.get(MAILBOX_URL, {"user_email": "a@b.com"})
assert response.status_code == 401
def test_user_email_wrong_token_returns_401(self, client):
channel, _plaintext = _make_api_key_channel()
response = client.get(
MAILBOX_URL,
{"user_email": "a@b.com"},
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY="not-the-real-key",
)
assert response.status_code == 401
def test_user_email_unknown_channel_returns_401(self, client):
response = client.get(
MAILBOX_URL,
{"user_email": "a@b.com"},
HTTP_X_CHANNEL_ID=str(uuid.uuid4()),
HTTP_X_API_KEY="anything",
)
assert response.status_code == 401
def test_user_email_malformed_channel_returns_401(self, client):
"""A non-UUID X-Channel-Id must be rejected by the auth class
before any DB lookup. Exercises the ValueError/ValidationError
branch in ChannelApiKeyAuthentication, distinct from the
DoesNotExist branch covered by the unknown-channel test above."""
response = client.get(
MAILBOX_URL,
{"user_email": "a@b.com"},
HTTP_X_CHANNEL_ID="not-a-uuid",
HTTP_X_API_KEY="anything",
)
assert response.status_code == 401
def test_user_email_wrong_scope_returns_403(self, client):
channel, plaintext = _make_api_key_channel(
scopes=(ChannelApiKeyScope.METRICS_READ.value,),
)
response = client.get(
MAILBOX_URL,
{"user_email": "a@b.com"},
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 403
def test_email_no_auth_returns_401(self, client):
response = client.get(MAILBOX_URL, {"email": "a@b.com"})
assert response.status_code == 401
def test_oidc_user_cannot_use_user_email_param(self, mailbox):
user = UserFactory(email="attacker@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=user, role=MailboxRoleChoices.VIEWER)
api_client = APIClient()
api_client.force_authenticate(user=user)
response = api_client.get(
MAILBOX_URL, {"user_email": "victim@oidc.example.com"}
)
assert response.status_code == 403
def test_oidc_user_cannot_use_email_param(self, mailbox):
user = UserFactory(email="attacker@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=user, role=MailboxRoleChoices.VIEWER)
api_client = APIClient()
api_client.force_authenticate(user=user)
response = api_client.get(MAILBOX_URL, {"email": "contact@company.com"})
assert response.status_code == 403
def test_no_query_param_returns_400(self, client, auth_header):
response = client.get(MAILBOX_URL, **auth_header)
assert response.status_code == 400
# =============================================================================
# GET /provisioning/mailboxes/?user_email=...
# =============================================================================
@pytest.mark.django_db
class TestMailboxListByUser:
"""Tests for listing mailboxes by user_email."""
def test_returns_mailboxes_for_user(self, client, auth_header, mailbox):
user = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=user, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{"user_email": "alice@oidc.example.com"},
**auth_header,
)
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 1
assert results[0]["id"] == str(mailbox.id)
assert results[0]["email"] == "contact@company.com"
assert results[0]["role"] == "admin"
def test_returns_multiple_mailboxes_with_roles(self, client, auth_header, domain):
user = UserFactory(email="bob@oidc.example.com")
mb1 = MailboxFactory(local_part="info", domain=domain)
mb2 = MailboxFactory(local_part="support", domain=domain)
MailboxAccessFactory(mailbox=mb1, user=user, role=MailboxRoleChoices.SENDER)
MailboxAccessFactory(mailbox=mb2, user=user, role=MailboxRoleChoices.VIEWER)
response = client.get(
MAILBOX_URL,
{"user_email": "bob@oidc.example.com"},
**auth_header,
)
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 2
by_email = {r["email"]: r for r in results}
assert by_email["info@company.com"]["role"] == "sender"
assert by_email["support@company.com"]["role"] == "viewer"
def test_returns_empty_for_unknown_user(self, client, auth_header):
response = client.get(
MAILBOX_URL,
{"user_email": "nobody@nowhere.com"},
**auth_header,
)
assert response.status_code == 200
assert response.json()["results"] == []
def test_response_fields(self, client, auth_header, mailbox):
"""Response includes id, email, name, role, and users."""
user = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=user, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{"user_email": "alice@oidc.example.com"},
**auth_header,
)
result = response.json()["results"][0]
assert set(result.keys()) == {"id", "email", "name", "role", "users"}
def test_users_includes_all_mailbox_users(self, client, auth_header, mailbox):
"""The users array lists ALL users with access, not just the queried one."""
alice = UserFactory(email="alice@oidc.example.com")
bob = UserFactory(email="bob@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=alice, role=MailboxRoleChoices.ADMIN)
MailboxAccessFactory(mailbox=mailbox, user=bob, role=MailboxRoleChoices.SENDER)
response = client.get(
MAILBOX_URL,
{"user_email": "alice@oidc.example.com"},
**auth_header,
)
result = response.json()["results"][0]
users_by_email = {u["email"]: u for u in result["users"]}
assert len(users_by_email) == 2
assert users_by_email["alice@oidc.example.com"]["role"] == "admin"
assert users_by_email["bob@oidc.example.com"]["role"] == "sender"
# =============================================================================
# GET /provisioning/mailboxes/?email=...
# =============================================================================
@pytest.mark.django_db
class TestMailboxListByEmail:
"""Tests for looking up a mailbox by its email address."""
def test_returns_mailbox_by_email(self, client, auth_header, mailbox):
alice = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=alice, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{"email": "contact@company.com"},
**auth_header,
)
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 1
assert results[0]["id"] == str(mailbox.id)
assert results[0]["email"] == "contact@company.com"
# No top-level role in email lookup mode
assert "role" not in results[0]
def test_email_lookup_includes_users(self, client, auth_header, mailbox):
"""Email lookup also includes the users array."""
alice = UserFactory(email="alice@oidc.example.com")
bob = UserFactory(email="bob@oidc.example.com")
MailboxAccessFactory(mailbox=mailbox, user=alice, role=MailboxRoleChoices.ADMIN)
MailboxAccessFactory(mailbox=mailbox, user=bob, role=MailboxRoleChoices.VIEWER)
response = client.get(
MAILBOX_URL,
{"email": "contact@company.com"},
**auth_header,
)
result = response.json()["results"][0]
assert "users" in result
users_by_email = {u["email"]: u for u in result["users"]}
assert users_by_email["alice@oidc.example.com"]["role"] == "admin"
assert users_by_email["bob@oidc.example.com"]["role"] == "viewer"
def test_returns_empty_for_unknown_email(self, client, auth_header):
response = client.get(
MAILBOX_URL,
{"email": "nope@nowhere.com"},
**auth_header,
)
assert response.status_code == 200
assert response.json()["results"] == []
def test_returns_empty_for_invalid_email(self, client, auth_header):
response = client.get(
MAILBOX_URL,
{"email": "no-at-sign"},
**auth_header,
)
assert response.status_code == 200
assert response.json()["results"] == []
# =============================================================================
# add_maildomain_custom_attributes
# =============================================================================
@pytest.mark.django_db
class TestMaildomainCustomAttributes:
"""Test the add_maildomain_custom_attributes query parameter."""
def test_user_email_with_custom_attributes(self, client, auth_header, domain):
domain.custom_attributes = {"siret": "123456789", "org_name": "ACME"}
domain.save()
mb = MailboxFactory(local_part="info", domain=domain)
user = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mb, user=user, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{
"user_email": "alice@oidc.example.com",
"add_maildomain_custom_attributes": "siret,org_name",
},
**auth_header,
)
assert response.status_code == 200
result = response.json()["results"][0]
assert result["maildomain_custom_attributes"]["siret"] == "123456789"
assert result["maildomain_custom_attributes"]["org_name"] == "ACME"
def test_email_with_custom_attributes(self, client, auth_header, domain):
domain.custom_attributes = {"siret": "987654321"}
domain.save()
MailboxFactory(local_part="info", domain=domain)
response = client.get(
MAILBOX_URL,
{
"email": "info@company.com",
"add_maildomain_custom_attributes": "siret",
},
**auth_header,
)
assert response.status_code == 200
result = response.json()["results"][0]
assert result["maildomain_custom_attributes"]["siret"] == "987654321"
def test_missing_key_returns_none(self, client, auth_header, domain):
domain.custom_attributes = {"siret": "123"}
domain.save()
mb = MailboxFactory(local_part="info", domain=domain)
user = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mb, user=user, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{
"user_email": "alice@oidc.example.com",
"add_maildomain_custom_attributes": "siret,nonexistent",
},
**auth_header,
)
result = response.json()["results"][0]
assert result["maildomain_custom_attributes"]["siret"] == "123"
assert result["maildomain_custom_attributes"]["nonexistent"] is None
def test_no_param_means_no_field(self, client, auth_header, domain):
"""Without the param, maildomain_custom_attributes is absent."""
domain.custom_attributes = {"siret": "123"}
domain.save()
mb = MailboxFactory(local_part="info", domain=domain)
user = UserFactory(email="alice@oidc.example.com")
MailboxAccessFactory(mailbox=mb, user=user, role=MailboxRoleChoices.ADMIN)
response = client.get(
MAILBOX_URL,
{"user_email": "alice@oidc.example.com"},
**auth_header,
)
result = response.json()["results"][0]
assert "maildomain_custom_attributes" not in result
# =============================================================================
# ProvisioningMailboxView is global-only — non-global api_key channels are
# rejected, regardless of which scope filter would otherwise narrow results.
# =============================================================================
@pytest.mark.django_db
class TestMailboxListGlobalOnly:
"""The endpoint refuses any non-global api_key channel."""
def test_maildomain_scope_returns_403(self, client, domain):
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.MAILDOMAIN,
maildomain=domain,
)
response = client.get(
MAILBOX_URL,
{"user_email": "alice@oidc.example.com"},
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 403
def test_mailbox_scope_returns_403(self, client, mailbox):
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.MAILBOX,
mailbox=mailbox,
)
response = client.get(
MAILBOX_URL,
{"email": "contact@company.com"},
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 403

View File

@@ -1,14 +1,26 @@
"""Tests for the provisioning maildomains endpoint."""
# pylint: disable=redefined-outer-name
import uuid
from django.urls import reverse
import pytest
from core.factories import MailDomainFactory
from core.enums import ChannelApiKeyScope, ChannelScopeLevel
from core.factories import MailDomainFactory, make_api_key_channel
from core.models import MailDomain
def _make_api_key_channel(
scopes=(ChannelApiKeyScope.MAILDOMAINS_CREATE.value,), **kwargs
):
"""Wrapper around the shared factory pre-loaded with the
maildomains-write scope used by every test in this module."""
kwargs.setdefault("name", "provisioning-test")
return make_api_key_channel(scopes=scopes, **kwargs)
@pytest.fixture
def url():
"""Returns the URL for the provisioning maildomains endpoint."""
@@ -16,46 +28,68 @@ def url():
@pytest.fixture
def auth_header(settings):
"""Returns the authentication header for the provisioning endpoint."""
settings.PROVISIONING_API_KEY = "test-provisioning-key"
return {"HTTP_AUTHORIZATION": "Bearer test-provisioning-key"}
def auth_header():
"""Global-scope api_key with ChannelApiKeyScope.MAILDOMAINS_CREATE."""
channel, plaintext = _make_api_key_channel()
return {
"HTTP_X_CHANNEL_ID": str(channel.id),
"HTTP_X_API_KEY": plaintext,
}
# -- Authentication tests --
@pytest.mark.django_db
def test_provisioning_no_auth_returns_403(client, url):
"""Request without Authorization header returns 403."""
def test_provisioning_no_auth_returns_401(client, url):
"""Request without X-Channel-Id/X-API-Key returns 401 (not authenticated)."""
response = client.post(
url, data={"domains": ["test.fr"]}, content_type="application/json"
)
assert response.status_code == 403
assert response.status_code == 401
@pytest.mark.django_db
def test_provisioning_wrong_token_returns_403(client, url, settings):
"""Request with wrong token returns 403."""
settings.PROVISIONING_API_KEY = "correct-key"
def test_provisioning_wrong_token_returns_401(client, url):
"""Request with wrong token returns 401 (invalid credentials)."""
channel, _plaintext = _make_api_key_channel()
response = client.post(
url,
data={"domains": ["test.fr"]},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer wrong-key",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY="not-the-real-key",
)
assert response.status_code == 403
assert response.status_code == 401
@pytest.mark.django_db
def test_provisioning_no_key_configured_returns_403(client, url, settings):
"""When PROVISIONING_API_KEY is not configured, returns 403."""
settings.PROVISIONING_API_KEY = None
def test_provisioning_unknown_channel_returns_401(client, url):
"""Unknown channel id returns 401 (invalid credentials)."""
response = client.post(
url,
data={"domains": ["test.fr"]},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer some-key",
HTTP_X_CHANNEL_ID=str(uuid.uuid4()),
HTTP_X_API_KEY="anything",
)
assert response.status_code == 401
@pytest.mark.django_db
def test_provisioning_non_global_scope_returns_403(client, url):
"""A maildomain-scope key cannot call maildomain provisioning."""
domain = MailDomainFactory(name="test.fr")
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.MAILDOMAIN,
maildomain=domain,
)
response = client.post(
url,
data={"domains": ["new.fr"]},
content_type="application/json",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
)
assert response.status_code == 403

View File

@@ -0,0 +1,658 @@
"""Tests for the generic email submission endpoint (POST /submit/)."""
# pylint: disable=redefined-outer-name,missing-function-docstring,unused-argument,import-outside-toplevel
import uuid
from unittest.mock import MagicMock, patch
import pytest
from dkim import verify as dkim_verify
from core.enums import ChannelApiKeyScope, ChannelScopeLevel
from core.factories import MailboxFactory, MailDomainFactory, make_api_key_channel
from core.mda.signing import generate_dkim_key
SUBMIT_URL = "/api/v1.0/submit/"
MINIMAL_MIME = (
b"From: contact@company.com\r\n"
b"To: attendee@example.com\r\n"
b"Subject: Team Meeting\r\n"
b"Message-ID: <test-123@company.com>\r\n"
b"Date: Mon, 30 Mar 2026 10:00:00 +0000\r\n"
b"MIME-Version: 1.0\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"\r\n"
b"Hello world\r\n"
)
CREATE_MSG_MOCK = "core.api.viewsets.submit._create_message_from_inbound"
PREPARE_MOCK = "core.api.viewsets.submit.prepare_outbound_message"
TASK_MOCK = "core.api.viewsets.submit.send_message_task"
def _make_api_key_channel(**kwargs):
"""Thin wrapper around the shared factory pre-loaded with the
submit-endpoint default scope (messages:send)."""
kwargs.setdefault("scopes", (ChannelApiKeyScope.MESSAGES_SEND.value,))
kwargs.setdefault("name", "test-key")
return make_api_key_channel(**kwargs)
@pytest.fixture
def auth_header():
"""Build a global-scope api_key with messages:send and return the auth headers."""
channel, plaintext = _make_api_key_channel()
return {
"HTTP_X_CHANNEL_ID": str(channel.id),
"HTTP_X_API_KEY": plaintext,
}
@pytest.fixture
def domain():
return MailDomainFactory(name="company.com")
@pytest.fixture
def mailbox(domain):
return MailboxFactory(local_part="contact", domain=domain)
# =============================================================================
# Authentication
# =============================================================================
@pytest.mark.django_db
class TestSubmitAuth:
"""Authentication tests for the submit endpoint."""
def test_no_auth_returns_401(self, client, mailbox):
"""No auth headers → 401 via DRF NotAuthenticated."""
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 401
def test_wrong_token_returns_401(self, client, mailbox):
"""Invalid credentials are an authentication failure → 401."""
channel, _plaintext = _make_api_key_channel()
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY="not-the-real-key",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 401
def test_unknown_channel_returns_401(self, client, mailbox):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(uuid.uuid4()),
HTTP_X_API_KEY="anything",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 401
def test_missing_scope_returns_403(self, client, mailbox):
channel, plaintext = _make_api_key_channel(
scopes=(ChannelApiKeyScope.MAILBOXES_READ.value,),
)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 403
def test_mailbox_scope_wrong_mailbox_returns_403(self, client, domain, mailbox):
other_mailbox = MailboxFactory(local_part="other", domain=domain)
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.MAILBOX,
mailbox=other_mailbox,
)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME, # from contact@company.com
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id), # contact@company.com
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 403
def test_maildomain_scope_wrong_domain_returns_403(self, client, mailbox):
other_domain = MailDomainFactory(name="other.test")
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.MAILDOMAIN,
maildomain=other_domain,
)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 403
def test_get_method_not_allowed(self, client, auth_header, mailbox):
response = client.get(
SUBMIT_URL,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 405
@pytest.mark.parametrize(
"role_name",
["VIEWER", "EDITOR"],
)
def test_user_scope_non_sending_role_cannot_submit(
self, client, mailbox, role_name
):
"""Regression: a user-scope api_key whose owner does not have a
SENDER-or-better role on the mailbox MUST NOT be able to submit.
Both VIEWER and EDITOR are below the threshold —
``MAILBOX_ROLES_CAN_SEND = [SENDER, ADMIN]`` — so neither can
send through a personal api_key. The fix is in
``Channel.api_key_covers``'s ``mailbox_roles=`` path."""
from core.enums import MailboxRoleChoices
from core.factories import MailboxAccessFactory, UserFactory
owner = UserFactory(email=f"{role_name.lower()}@oidc.example.com")
MailboxAccessFactory(
mailbox=mailbox,
user=owner,
role=getattr(MailboxRoleChoices, role_name),
)
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.USER,
user=owner,
name=f"{role_name.lower()}-personal",
)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 403, response.content
@pytest.mark.parametrize(
"role_name",
["SENDER", "ADMIN"],
)
def test_user_scope_sending_role_can_submit(self, client, mailbox, role_name):
"""Companion to the negative test: a user-scope api_key whose
owner has SENDER or ADMIN access *is* allowed through. Both roles
are in ``MAILBOX_ROLES_CAN_SEND``. The pipeline is mocked so this
test only exercises the auth+permission+covers layer."""
from core.enums import MailboxRoleChoices
from core.factories import MailboxAccessFactory, UserFactory
owner = UserFactory(email=f"{role_name.lower()}@oidc.example.com")
MailboxAccessFactory(
mailbox=mailbox,
user=owner,
role=getattr(MailboxRoleChoices, role_name),
)
channel, plaintext = _make_api_key_channel(
scope_level=ChannelScopeLevel.USER,
user=owner,
name=f"{role_name.lower()}-personal",
)
fake_message = MagicMock()
fake_message.id = uuid.uuid4()
fake_message.recipients.values_list.return_value = []
with (
patch(CREATE_MSG_MOCK, return_value=fake_message),
patch(PREPARE_MOCK, return_value=True),
patch(TASK_MOCK),
):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_CHANNEL_ID=str(channel.id),
HTTP_X_API_KEY=plaintext,
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
)
assert response.status_code == 202, response.content
# =============================================================================
# Validation
# =============================================================================
@pytest.mark.django_db
class TestSubmitValidation:
"""Input validation tests."""
def test_missing_both_headers_returns_400(self, client, auth_header):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
**auth_header,
)
assert response.status_code == 400
assert "X-Mail-From" in response.json()["detail"]
def test_missing_x_rcpt_to_returns_400(self, client, auth_header, mailbox):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
**auth_header,
)
assert response.status_code == 400
def test_missing_x_mail_from_returns_400(self, client, auth_header):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 400
def test_unknown_mailbox_returns_404(self, client, auth_header):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(uuid.uuid4()),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 404
def test_invalid_uuid_in_x_mail_from_returns_404(self, client, auth_header):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM="not-a-uuid",
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 404
def test_empty_body_returns_400(self, client, auth_header, mailbox):
response = client.post(
SUBMIT_URL,
data=b"",
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 400
def test_sender_mismatch_returns_403(self, client, auth_header, domain):
other_mailbox = MailboxFactory(local_part="other", domain=domain)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME, # From: contact@company.com
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(other_mailbox.id), # other@company.com
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 403
def test_empty_rcpt_to_returns_400(self, client, auth_header, mailbox):
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO=" , ",
**auth_header,
)
assert response.status_code == 400
# =============================================================================
# Message creation + DKIM signing + async dispatch
# =============================================================================
@pytest.mark.django_db
class TestSubmitDispatch:
"""Verify message creation, synchronous signing, and async dispatch."""
def _fake_message(self):
"""Create a fake Message with a recipients manager stub."""
msg = MagicMock()
msg.id = uuid.uuid4()
msg.recipients.values_list.return_value = ["attendee@example.com"]
return msg
@patch(TASK_MOCK)
@patch(PREPARE_MOCK, return_value=True)
@patch(CREATE_MSG_MOCK)
def test_accepted(
self, mock_create, mock_prepare, mock_task, client, auth_header, mailbox
):
fake_message = self._fake_message()
mock_create.return_value = fake_message
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 202
data = response.json()
assert data["status"] == "accepted"
assert data["message_id"] == str(fake_message.id)
# Message created with is_outbound=True
mock_create.assert_called_once()
assert mock_create.call_args[1]["is_outbound"] is True
assert mock_create.call_args[1]["mailbox"] == mailbox
# prepare_outbound_message called with raw_mime
mock_prepare.assert_called_once()
assert mock_prepare.call_args[1]["raw_mime"] == MINIMAL_MIME
# Async task dispatched
mock_task.delay.assert_called_once_with(str(fake_message.id))
@patch(TASK_MOCK)
@patch(PREPARE_MOCK, return_value=True)
@patch(CREATE_MSG_MOCK)
def test_create_message_failure_returns_500(
self, mock_create, mock_prepare, mock_task, client, auth_header, mailbox
):
mock_create.return_value = None
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 500
assert "create message" in response.json()["detail"].lower()
mock_prepare.assert_not_called()
mock_task.delay.assert_not_called()
@patch(TASK_MOCK)
@patch(PREPARE_MOCK, return_value=False)
@patch(CREATE_MSG_MOCK)
def test_prepare_failure_returns_500(
self, mock_create, mock_prepare, mock_task, client, auth_header, mailbox
):
mock_create.return_value = self._fake_message()
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 500
assert "prepare" in response.json()["detail"].lower()
mock_task.delay.assert_not_called()
# =============================================================================
# Integration — real pipeline, only mock the async SMTP delivery
# =============================================================================
@pytest.mark.django_db
class TestSubmitIntegration:
"""End-to-end tests that run the full pipeline (message creation, DKIM
signing, blob storage) and only mock the final async SMTP task."""
@patch(TASK_MOCK)
def test_full_pipeline(self, mock_task, client, auth_header, mailbox):
"""Submit creates a Message with thread, recipients, blob, and dispatches delivery."""
mailbox_email = str(mailbox)
# X-Rcpt-To matches the To: header in MINIMAL_MIME (attendee@example.com)
rcpt_to = "attendee@example.com"
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO=rcpt_to,
**auth_header,
)
assert response.status_code == 202
data = response.json()
assert data["status"] == "accepted"
message_id = data["message_id"]
# Verify message in DB
from core.models import Message, ThreadAccess
message = Message.objects.get(id=message_id)
assert message.subject == "Team Meeting"
assert message.is_sender is True
assert message.is_draft is False # finalized by prepare_outbound_message
assert message.sender.email == mailbox_email
assert message.blob is not None # DKIM-signed MIME stored
# Thread was created and mailbox has access
assert message.thread is not None
assert ThreadAccess.objects.filter(
thread=message.thread, mailbox=mailbox
).exists()
# Recipient was created (from the parsed To: header)
assert message.recipients.filter(contact__email=rcpt_to).exists()
# Async delivery dispatched
mock_task.delay.assert_called_once_with(str(message.id))
@patch(TASK_MOCK)
def test_multiple_recipients_creates_all(
self, mock_task, client, auth_header, mailbox
):
"""Multiple X-Rcpt-To addresses each get a recipient record."""
mailbox_email = str(mailbox)
mime = (
f"From: {mailbox_email}\r\n"
f"To: a@example.com, b@example.com\r\n"
f"Subject: Multi\r\n"
f"Message-ID: <multi@example.com>\r\n"
f"Date: Mon, 30 Mar 2026 10:00:00 +0000\r\n"
f"MIME-Version: 1.0\r\n"
f"Content-Type: text/plain\r\n"
f"\r\n"
f"body\r\n"
).encode()
response = client.post(
SUBMIT_URL,
data=mime,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="a@example.com, b@example.com",
**auth_header,
)
assert response.status_code == 202
from core.models import Message
message = Message.objects.get(id=response.json()["message_id"])
recipient_emails = set(
message.recipients.values_list("contact__email", flat=True)
)
assert "a@example.com" in recipient_emails
assert "b@example.com" in recipient_emails
@patch(TASK_MOCK)
def test_bcc_via_envelope(self, mock_task, client, auth_header, mailbox):
"""BCC works via X-Rcpt-To: the recipient is in the envelope but NOT
in the MIME headers — just like real SMTP BCC."""
mailbox_email = str(mailbox)
# MIME only has To: visible@example.com — no Bcc header
mime = (
f"From: {mailbox_email}\r\n"
f"To: visible@example.com\r\n"
f"Subject: With Bcc\r\n"
f"Message-ID: <bcc@example.com>\r\n"
f"Date: Mon, 30 Mar 2026 10:00:00 +0000\r\n"
f"MIME-Version: 1.0\r\n"
f"Content-Type: text/plain\r\n"
f"\r\n"
f"body\r\n"
).encode()
# X-Rcpt-To includes both visible and hidden (BCC) recipients
response = client.post(
SUBMIT_URL,
data=mime,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="visible@example.com, hidden@example.com",
**auth_header,
)
assert response.status_code == 202
from core.enums import MessageRecipientTypeChoices
from core.models import Message
message = Message.objects.get(id=response.json()["message_id"])
# visible@example.com comes from MIME To: header
assert message.recipients.filter(
contact__email="visible@example.com",
type=MessageRecipientTypeChoices.TO,
).exists()
# hidden@example.com comes from X-Rcpt-To only — added as BCC
assert message.recipients.filter(
contact__email="hidden@example.com",
type=MessageRecipientTypeChoices.BCC,
).exists()
@patch(TASK_MOCK)
def test_cc_recipients_created(self, mock_task, client, auth_header, mailbox):
"""Cc recipients from MIME headers are created as MessageRecipient rows."""
mailbox_email = str(mailbox)
mime = (
f"From: {mailbox_email}\r\n"
f"To: to@example.com\r\n"
f"Cc: cc@example.com\r\n"
f"Subject: With Cc\r\n"
f"Message-ID: <cc@example.com>\r\n"
f"Date: Mon, 30 Mar 2026 10:00:00 +0000\r\n"
f"MIME-Version: 1.0\r\n"
f"Content-Type: text/plain\r\n"
f"\r\n"
f"body\r\n"
).encode()
response = client.post(
SUBMIT_URL,
data=mime,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="to@example.com, cc@example.com",
**auth_header,
)
assert response.status_code == 202
from core.enums import MessageRecipientTypeChoices
from core.models import Message
message = Message.objects.get(id=response.json()["message_id"])
assert message.recipients.filter(
contact__email="to@example.com",
type=MessageRecipientTypeChoices.TO,
).exists()
assert message.recipients.filter(
contact__email="cc@example.com",
type=MessageRecipientTypeChoices.CC,
).exists()
@patch(TASK_MOCK)
def test_stored_blob_is_dkim_signed(self, mock_task, client, auth_header, mailbox):
"""The blob persisted for the outbound message must be DKIM-signed
with the domain's active key and verifiable against its public key."""
from core.models import DKIMKey, Message
private_key, public_key = generate_dkim_key(key_size=1024)
dkim_key = DKIMKey.objects.create(
selector="testselector",
private_key=private_key,
public_key=public_key,
key_size=1024,
is_active=True,
domain=mailbox.domain,
)
response = client.post(
SUBMIT_URL,
data=MINIMAL_MIME,
content_type="message/rfc822",
HTTP_X_MAIL_FROM=str(mailbox.id),
HTTP_X_RCPT_TO="attendee@example.com",
**auth_header,
)
assert response.status_code == 202
message = Message.objects.get(id=response.json()["message_id"])
stored = message.blob.get_content()
# Header is present and prepended before the original MIME.
assert stored.startswith(b"DKIM-Signature:"), stored[:200]
# Verify the signature cryptographically using the stored public key.
def get_dns_txt(fqdn, **kwargs):
if fqdn == b"testselector._domainkey.%s." % mailbox.domain.name.encode():
return f"v=DKIM1; k=rsa; p={dkim_key.public_key}".encode()
return None
assert dkim_verify(stored, dnsfunc=get_dns_txt), (
"DKIM verification failed on stored blob"
)

View File

@@ -6,7 +6,7 @@ from django.urls import include, path
from rest_framework.routers import DefaultRouter
from core.api.viewsets.blob import BlobViewSet
from core.api.viewsets.channel import ChannelViewSet
from core.api.viewsets.channel import ChannelViewSet, UserChannelViewSet
from core.api.viewsets.config import ConfigView
from core.api.viewsets.contacts import ContactViewSet
from core.api.viewsets.draft import DraftMessageView
@@ -37,8 +37,12 @@ from core.api.viewsets.metrics import (
MailDomainUsersMetricsApiView,
)
from core.api.viewsets.placeholder import DraftPlaceholderView, PlaceholderView
from core.api.viewsets.provisioning import ProvisioningMailDomainView
from core.api.viewsets.provisioning import (
ProvisioningMailboxView,
ProvisioningMailDomainView,
)
from core.api.viewsets.send import SendMessageView
from core.api.viewsets.submit import SubmitRawEmailView
from core.api.viewsets.task import TaskDetailView
from core.api.viewsets.thread import ThreadViewSet
from core.api.viewsets.thread_access import ThreadAccessViewSet
@@ -49,6 +53,7 @@ from core.authentication.urls import urlpatterns as oidc_urls
# - Main endpoints
router = DefaultRouter()
router.register("users/me/channels", UserChannelViewSet, basename="user-channels")
router.register("users", UserViewSet, basename="users")
router.register("messages", MessageViewSet, basename="messages")
router.register("blob", BlobViewSet, basename="blob")
@@ -139,6 +144,8 @@ mailbox_channel_nested_router.register(
ChannelViewSet,
basename="mailbox-channels",
)
# /users/me/channels/ is registered directly on the main `router` above —
# no nested router needed for a single viewset.
urlpatterns = [
path(
@@ -255,6 +262,17 @@ urlpatterns = [
ProvisioningMailDomainView.as_view(),
name="provisioning-maildomains",
),
path(
f"api/{settings.API_VERSION}/submit/",
SubmitRawEmailView.as_view(),
name="submit-email",
),
# Provisioning: mailbox and user lookup (service-to-service, API key auth)
path(
f"api/{settings.API_VERSION}/provisioning/mailboxes/",
ProvisioningMailboxView.as_view(),
name="provisioning-mailboxes",
),
# Alias for MTA check endpoint
path(
f"api/{settings.API_VERSION}/mta/check-recipients/",

View File

@@ -798,6 +798,9 @@ class Base(Configuration):
None, environ_name="PROMETHEUS_API_KEY", environ_prefix=None
)
# DEPRECATED: ignored since global api_key Channels landed.
# Kept only so AppConfig.ready() can emit a deprecation warning when
# either env var is set. Migrate to a global api_key Channel.
METRICS_API_KEY = values.Value(
None, environ_name="METRICS_API_KEY", environ_prefix=None
)
@@ -843,8 +846,15 @@ class Base(Configuration):
FEATURE_IMPORT_MESSAGES = values.BooleanValue(
default=True, environ_name="FEATURE_IMPORT_MESSAGES", environ_prefix=None
)
# NOTE: "webhook" is intentionally NOT in the default list — the
# outbound webhook delivery pipeline is not wired yet. Keeping the
# type creatable would let users mint dead-letter channels that look
# functional. Add "webhook" here once core/mda/webhook_tasks.py and
# the post_save signal land.
FEATURE_MAILBOX_ADMIN_CHANNELS = values.ListValue(
default=[], environ_name="FEATURE_MAILBOX_ADMIN_CHANNELS", environ_prefix=None
default=["api_key"],
environ_name="FEATURE_MAILBOX_ADMIN_CHANNELS",
environ_prefix=None,
)
FEATURE_MAILDOMAIN_CREATE = values.BooleanValue(
default=True, environ_name="FEATURE_MAILDOMAIN_CREATE", environ_prefix=None

File diff suppressed because it is too large Load Diff

View File

@@ -5,6 +5,7 @@
* This is the messages API schema.
* OpenAPI spec version: 1.0.0 (v1.0)
*/
import type { ScopeLevelEnum } from "./scope_level_enum";
/**
* Serialize Channel model.
@@ -21,7 +22,8 @@ export interface Channel {
* Type of channel
* @maxLength 255
*/
type?: string;
type: string;
readonly scope_level: ScopeLevelEnum;
/** Channel-specific configuration settings */
settings?: unknown;
/**
@@ -34,6 +36,13 @@ export interface Channel {
* @nullable
*/
readonly maildomain: string | null;
/**
* primary key for the record as UUID
* @nullable
*/
readonly user: string | null;
/** @nullable */
readonly last_used_at: string | null;
/** date and time at which a record was created */
readonly created_at: string;
/** date and time at which a record was last updated */

View File

@@ -21,7 +21,7 @@ export interface ChannelRequest {
* @minLength 1
* @maxLength 255
*/
type?: string;
type: string;
/** Channel-specific configuration settings */
settings?: unknown;
}

View File

@@ -121,10 +121,12 @@ export * from "./patched_thread_event_request_data_one_of_mentions_item";
export * from "./placeholders_retrieve200";
export * from "./read_message_template";
export * from "./read_message_template_metadata";
export * from "./regenerated_api_key_response";
export * from "./reset_password_error";
export * from "./reset_password_internal_server_error";
export * from "./reset_password_not_found";
export * from "./reset_password_response";
export * from "./scope_level_enum";
export * from "./send_create400";
export * from "./send_create403";
export * from "./send_create503";

View File

@@ -50,6 +50,8 @@ export interface Message {
readonly is_archived: boolean;
readonly has_attachments: boolean;
/** @nullable */
readonly mime_id: string | null;
/** @nullable */
readonly signature: MessageSignature;
/** Return the STMSG headers of the message. */
readonly stmsg_headers: MessageStmsgHeaders;

View File

@@ -0,0 +1,14 @@
/**
* Generated by orval 🍺
* Do not edit manually.
* messages API
* This is the messages API schema.
* OpenAPI spec version: 1.0.0 (v1.0)
*/
export interface RegeneratedApiKeyResponse {
/** Channel id (also the X-Channel-Id header value). */
id: string;
/** Freshly generated plaintext api_key. Returned ONCE on regeneration and cannot be retrieved later. */
api_key: string;
}

View File

@@ -0,0 +1,24 @@
/**
* Generated by orval 🍺
* Do not edit manually.
* messages API
* This is the messages API schema.
* OpenAPI spec version: 1.0.0 (v1.0)
*/
/**
* * `global` - Global
* `maildomain` - Maildomain
* `mailbox` - Mailbox
* `user` - User
*/
export type ScopeLevelEnum =
(typeof ScopeLevelEnum)[keyof typeof ScopeLevelEnum];
// eslint-disable-next-line @typescript-eslint/no-redeclare
export const ScopeLevelEnum = {
global: "global",
maildomain: "maildomain",
mailbox: "mailbox",
user: "user",
} as const;