mirror of
https://github.com/suitenumerique/messages.git
synced 2026-04-25 17:15:21 +02:00
🐛(global) handle non-serializable celery task errors and stop infinite polling (#633)
When a Celery worker crashes (e.g. OOM during large imports), the task result contains a raw exception object (WorkerLostError) that is not JSON-serializable, causing a 500 on the task status endpoint. The frontend polling hooks never received a FAILURE status and kept polling indefinitely. Backend: convert exception objects in task results to serializable strings instead of letting DRF fail on serialization. Frontend: stop polling on API errors and surface the failure state to consumers so they can display appropriate error UI immediately.
This commit is contained in:
committed by
GitHub
parent
631522b388
commit
730fea15ee
@@ -103,6 +103,16 @@ class TaskDetailView(APIView):
|
||||
result_data["status"] = task_result.result["status"]
|
||||
result_data["result"] = task_result.result["result"]
|
||||
result_data["error"] = task_result.result["error"]
|
||||
elif isinstance(task_result.result, Exception):
|
||||
logger.exception(
|
||||
"Task %s failed with unhandled exception", task_id,
|
||||
exc_info=task_result.result,
|
||||
)
|
||||
result_data["status"] = "FAILURE"
|
||||
result_data["result"] = None
|
||||
result_data["error"] = (
|
||||
str(task_result.result) or type(task_result.result).__name__
|
||||
)
|
||||
else:
|
||||
result_data["result"] = task_result.result
|
||||
if task_result.state == "PROGRESS" and task_result.info:
|
||||
|
||||
@@ -81,6 +81,56 @@ class TestTaskDetailViewPermissions:
|
||||
assert response.data["result"]["imported"] == 42
|
||||
|
||||
|
||||
def test_api_task_detail_worker_crash_exception_should_be_serialized(self):
|
||||
"""Test that a task with an exception result (e.g. WorkerLostError)
|
||||
returns a properly serialized error instead of a 500."""
|
||||
user = factories.UserFactory()
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=user)
|
||||
|
||||
task_id = "test-task-crashed-worker"
|
||||
register_task_owner(task_id, user.id)
|
||||
url = reverse("task-detail", kwargs={"task_id": task_id})
|
||||
|
||||
with mock.patch("core.api.viewsets.task.AsyncResult") as mock_async_result:
|
||||
mock_result = mock.MagicMock()
|
||||
mock_result.status = "FAILURE"
|
||||
mock_result.state = "FAILURE"
|
||||
mock_result.result = Exception("Worker lost")
|
||||
mock_result.info = None
|
||||
mock_async_result.return_value = mock_result
|
||||
|
||||
response = client.get(url)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["status"] == "FAILURE"
|
||||
assert response.data["result"] is None
|
||||
assert response.data["error"] == "Worker lost"
|
||||
|
||||
def test_api_task_detail_exception_without_message_should_use_class_name(self):
|
||||
"""Test that an exception with an empty str() uses the class name."""
|
||||
user = factories.UserFactory()
|
||||
client = APIClient()
|
||||
client.force_authenticate(user=user)
|
||||
|
||||
task_id = "test-task-empty-exception"
|
||||
register_task_owner(task_id, user.id)
|
||||
url = reverse("task-detail", kwargs={"task_id": task_id})
|
||||
|
||||
with mock.patch("core.api.viewsets.task.AsyncResult") as mock_async_result:
|
||||
mock_result = mock.MagicMock()
|
||||
mock_result.status = "FAILURE"
|
||||
mock_result.state = "FAILURE"
|
||||
mock_result.result = ConnectionResetError()
|
||||
mock_result.info = None
|
||||
mock_async_result.return_value = mock_result
|
||||
|
||||
response = client.get(url)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.data["status"] == "FAILURE"
|
||||
assert response.data["result"] is None
|
||||
assert response.data["error"] == "ConnectionResetError"
|
||||
|
||||
|
||||
class TestImportViewSetPermissions:
|
||||
"""Test that ImportViewSet enforces proper role checks."""
|
||||
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
"{{count}} messages mentioning you_other": "{{count}} messages mentioning you",
|
||||
"{{count}} messages of this thread have been deleted._one": "{{count}} message of this thread has been deleted.",
|
||||
"{{count}} messages of this thread have been deleted._other": "{{count}} messages of this thread have been deleted.",
|
||||
"{{count}} messages were imported before the error._one": "{{count}} message were imported before the error.",
|
||||
"{{count}} messages were imported before the error._other": "{{count}} messages were imported before the error.",
|
||||
"{{count}} minutes ago_one": "{{count}} minute ago",
|
||||
"{{count}} minutes ago_other": "{{count}} minutes ago",
|
||||
"{{count}} months ago_one": "{{count}} month ago",
|
||||
@@ -673,6 +675,7 @@
|
||||
"You are the last editor of this thread, you cannot therefore modify your access.": "You are the last editor of this thread, you cannot therefore modify your access.",
|
||||
"You can close this window and continue using the app.": "You can close this window and continue using the app.",
|
||||
"You can now inform the person that their mailbox is ready to be used and communicate the instructions for authentication.": "You can now inform the person that their mailbox is ready to be used and communicate the instructions for authentication.",
|
||||
"You can safely retry the import — messages already imported will not be duplicated.": "You can safely retry the import — messages already imported will not be duplicated.",
|
||||
"You cannot delete the last editor of this thread": "You cannot delete the last editor of this thread",
|
||||
"You cannot modify it.": "You cannot modify it.",
|
||||
"You have {{count}} recipients, which exceeds the maximum of {{max}} recipients per message. The message cannot be sent until you reduce the number of recipients._one": "You have {{count}} recipient, which exceeds the maximum of {{max}} recipients per message. The message cannot be sent until you reduce the number of recipients.",
|
||||
|
||||
@@ -38,6 +38,9 @@
|
||||
"{{count}} messages of this thread have been deleted._one": "{{count}} message de cette conversation a été supprimé.",
|
||||
"{{count}} messages of this thread have been deleted._many": "{{count}} messages de cette conversation ont été supprimés.",
|
||||
"{{count}} messages of this thread have been deleted._other": "{{count}} messages de cette conversation ont été supprimés.",
|
||||
"{{count}} messages were imported before the error._one": "{{count}} message a été importé avant l'erreur.",
|
||||
"{{count}} messages were imported before the error._many": "{{count}} messages ont été importés avant l'erreur.",
|
||||
"{{count}} messages were imported before the error._other": "{{count}} messages ont été importés avant l'erreur.",
|
||||
"{{count}} minutes ago_one": "il y a {{count}} minute",
|
||||
"{{count}} minutes ago_many": "il y a {{count}} minutes",
|
||||
"{{count}} minutes ago_other": "il y a {{count}} minutes",
|
||||
@@ -734,6 +737,7 @@
|
||||
"You are the last editor of this thread, you cannot therefore modify your access.": "Vous êtes le dernier éditeur de cette conversation, vous ne pouvez donc pas modifier votre accès.",
|
||||
"You can close this window and continue using the app.": "Vous pouvez fermer cette fenêtre et continuer à utiliser l'application.",
|
||||
"You can now inform the person that their mailbox is ready to be used and communicate the instructions for authentication.": "Vous pouvez désormais prévenir la personne que sa boîte aux lettres est prête à être utilisée et lui communiquer les instructions pour s'authentifier.",
|
||||
"You can safely retry the import — messages already imported will not be duplicated.": "Vous pouvez relancer l'import en toute sécurité — les messages déjà importés ne seront pas dupliqués.",
|
||||
"You cannot delete the last editor of this thread": "Vous ne pouvez pas supprimer le dernier éditeur de cette conversation",
|
||||
"You cannot modify it.": "Vous ne pouvez pas la modifier.",
|
||||
"You have {{count}} recipients, which exceeds the maximum of {{max}} recipients per message. The message cannot be sent until you reduce the number of recipients._one": "Vous avez {{count}} destinataire, ce qui dépasse le maximum de {{max}} destinataires autorisés par message. Le message ne peut pas être envoyé tant que vous n'avez pas réduit le nombre de destinataires.",
|
||||
|
||||
@@ -49,14 +49,23 @@ export const StepLoader = ({ taskId, onComplete, onError }: StepLoaderProps) =>
|
||||
onCompleteRef.current();
|
||||
} else if (importStatus?.state === StatusEnum.FAILURE) {
|
||||
const error = importStatus?.error || '';
|
||||
let errorKey = t('An error occurred while importing messages.');
|
||||
if (
|
||||
const isAuthError =
|
||||
error.includes("AUTHENTICATIONFAILED") ||
|
||||
error.includes("IMAP authentication failed")
|
||||
) {
|
||||
errorKey = t('Authentication failed. Please check your credentials and ensure you have enabled IMAP connections in your account.');
|
||||
error.includes("IMAP authentication failed");
|
||||
|
||||
const parts: string[] = [];
|
||||
|
||||
if (isAuthError) {
|
||||
parts.push(t('Authentication failed. Please check your credentials and ensure you have enabled IMAP connections in your account.'));
|
||||
} else {
|
||||
parts.push(t('An error occurred while importing messages.'));
|
||||
if (importStatus.successCount > 0) {
|
||||
parts.push(t('{{count}} messages were imported before the error.', { count: importStatus.successCount }));
|
||||
}
|
||||
parts.push(t('You can safely retry the import — messages already imported will not be duplicated.'));
|
||||
}
|
||||
onErrorRef.current(errorKey);
|
||||
|
||||
onErrorRef.current(parts.join(' '));
|
||||
}
|
||||
}, [importStatus?.state, t]);
|
||||
|
||||
|
||||
@@ -169,7 +169,7 @@ const ApplicationMenu = () => {
|
||||
}
|
||||
if (taskStatus.state === StatusEnum.FAILURE) {
|
||||
label = t("Import failed");
|
||||
icon = <Icon name="error" type={IconType.OUTLINED} />;
|
||||
icon = <Icon name="error" type={IconType.OUTLINED} style={{ color: "var(--c--contextuals--content--semantic--error--primary)" }} />;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,12 +45,26 @@ export const QueueMessage = ({ taskId, onSettled }: QueueMessageProps) => {
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
if (taskQuery.isError) {
|
||||
toast.update(toastId, {
|
||||
render: (
|
||||
<ToasterItem type="error">
|
||||
<span className="material-icons">error</span>
|
||||
<span>{t('The message could not be sent.')}</span>
|
||||
</ToasterItem>
|
||||
),
|
||||
autoClose: QUEUED_MESSAGE_CLOSE_DELAY * 2,
|
||||
});
|
||||
onSettled?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const status_code = taskQuery?.data?.status;
|
||||
|
||||
|
||||
if (!status_code) return;
|
||||
|
||||
setRetryCount(retryCount => retryCount + 1);
|
||||
|
||||
|
||||
const status = taskQuery.data!.data.status;
|
||||
|
||||
if (status === StatusEnum.SUCCESS) {
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import { StatusEnum, useTasksRetrieve } from "@/features/api/gen";
|
||||
import { TaskMetadata } from "@/features/controlled-modals/message-importer/step-loader";
|
||||
import { useEffect, useMemo, useState } from "react";
|
||||
import { useEffect, useMemo, useRef, useState } from "react";
|
||||
import { useTranslation } from "react-i18next";
|
||||
|
||||
const MAX_POLL_ERRORS = 10;
|
||||
|
||||
export function useImportTaskStatus(
|
||||
taskId: string | null,
|
||||
@@ -9,7 +12,10 @@ export function useImportTaskStatus(
|
||||
enabled = true,
|
||||
}: { refetchInterval?: number; enabled?: boolean } = {}
|
||||
) {
|
||||
const { t } = useTranslation();
|
||||
const [queryEnabled, setQueryEnabled] = useState(enabled);
|
||||
const [hasExhaustedRetries, setHasExhaustedRetries] = useState(false);
|
||||
const errorCountRef = useRef(0);
|
||||
const taskQuery = useTasksRetrieve(taskId || "", {
|
||||
query: {
|
||||
enabled: Boolean(taskId) && queryEnabled === true,
|
||||
@@ -36,20 +42,34 @@ export function useImportTaskStatus(
|
||||
}, [taskStatus, taskMetadata, hasKnownTotal]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled || taskStatus === StatusEnum.FAILURE || taskStatus === StatusEnum.SUCCESS) {
|
||||
if (taskQuery.isError) {
|
||||
errorCountRef.current += 1;
|
||||
if (errorCountRef.current >= MAX_POLL_ERRORS) {
|
||||
setHasExhaustedRetries(true);
|
||||
}
|
||||
} else if (taskQuery.data) {
|
||||
errorCountRef.current = 0;
|
||||
}
|
||||
}, [taskQuery.dataUpdatedAt, taskQuery.errorUpdatedAt]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!enabled || taskStatus === StatusEnum.FAILURE || taskStatus === StatusEnum.SUCCESS || hasExhaustedRetries) {
|
||||
setQueryEnabled(false);
|
||||
} else if (enabled || taskStatus === StatusEnum.PROGRESS || taskStatus === StatusEnum.PENDING) {
|
||||
setQueryEnabled(true);
|
||||
}
|
||||
}, [taskStatus, enabled]);
|
||||
}, [taskStatus, enabled, hasExhaustedRetries]);
|
||||
|
||||
if (!taskId) return null;
|
||||
return {
|
||||
progress: progress !== null ? Math.ceil(progress) : null,
|
||||
state: taskQuery.data?.data.status,
|
||||
state: hasExhaustedRetries ? StatusEnum.FAILURE : taskQuery.data?.data.status,
|
||||
loading: taskQuery.isPending || progress === null,
|
||||
error: taskQuery.data?.data.error,
|
||||
error: hasExhaustedRetries
|
||||
? t('An error occurred while importing messages.')
|
||||
: taskQuery.data?.data.error,
|
||||
hasKnownTotal,
|
||||
currentMessage,
|
||||
successCount: taskMetadata?.success_count ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user