Files
django-lasuite/tests/malware_detection/tasks/test_jcop_tasks.py
Manuel Raynaud 3bdfdf2b16 ♻️(malware) log filehash when a submission fail
We want to log the file has when a submission fail in order to aggregate
all this logs to send them to JCOP.
2025-12-02 09:31:29 +01:00

182 lines
7.4 KiB
Python

"""Tests for the JCOP tasks."""
from unittest import mock
import pytest
import requests
from celery.exceptions import Retry
from lasuite.malware_detection.exceptions import MalwareDetectionInvalidAuthenticationError
from lasuite.malware_detection.tasks.jcop import MaxRetriesErrorCodes, analyse_file_async, trigger_new_analysis
@pytest.fixture
def mock_backend():
"""Create a mock for the JCOP backend."""
backend = mock.MagicMock()
handler = mock.MagicMock()
handler.return_value = backend
return backend, handler
def test_analyse_file_async_success(mock_backend):
"""Test analyse_file_async with successful check_analysis."""
backend, handler = mock_backend
backend.check_analysis.return_value = False # No retry needed
with mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler):
analyse_file_async("file.txt", file_hash="hash123")
backend.check_analysis.assert_called_once_with("file.txt", file_hash="hash123")
backend.failed_analysis.assert_not_called()
def test_analyse_file_async_retry(mock_backend):
"""Test analyse_file_async with retry needed."""
backend, handler = mock_backend
backend.check_analysis.return_value = True # Retry needed
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(Retry),
):
analyse_file_async("file.txt")
backend.check_analysis.assert_called_once_with("file.txt", file_hash=None)
backend.failed_analysis.assert_not_called()
def test_analyse_file_async_request_exception_retry(mock_backend):
"""Test analyse_file_async with request exception and retry."""
backend, handler = mock_backend
backend.check_analysis.side_effect = requests.exceptions.RequestException("Connection error")
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(requests.exceptions.RequestException),
):
analyse_file_async("file.txt")
backend.check_analysis.assert_called_once_with("file.txt", file_hash=None)
backend.failed_analysis.assert_not_called()
def test_analyse_file_async_request_exception_max_retries(mock_backend):
"""Test analyse_file_async with request exception and max retries reached."""
backend, handler = mock_backend
backend.check_analysis.side_effect = requests.exceptions.RequestException("Connection error")
backend.failed_analysis = mock.MagicMock()
with mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler):
analyse_file_async.max_retries = 0
analyse_file_async("file.txt")
backend.check_analysis.assert_called_once_with("file.txt", file_hash=None)
backend.failed_analysis.assert_called_once_with(
"file.txt", error_code=MaxRetriesErrorCodes.ANALYSE_FILE, error_msg="Max retries fetching results exceeded"
)
def test_analyse_file_async_with_auth_error_no_retry(mock_backend):
"""Test analyse_file_async with authentication error (no retry)."""
backend, handler = mock_backend
backend.check_analysis.side_effect = MalwareDetectionInvalidAuthenticationError("Invalid API key")
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(MalwareDetectionInvalidAuthenticationError),
):
analyse_file_async("file.txt")
backend.check_analysis.assert_called_once_with("file.txt", file_hash=None)
backend.failed_analysis.assert_not_called()
def test_trigger_new_analysis_success(mock_backend):
"""Test trigger_new_analysis with successful execution."""
backend, handler = mock_backend
with mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler):
trigger_new_analysis("file.txt", "file_hash", extra_param="value")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash", extra_param="value")
def test_trigger_new_analysis_request_exception_retry(mock_backend):
"""Test trigger_new_analysis with request exception and retry."""
backend, handler = mock_backend
backend.trigger_new_analysis.side_effect = requests.exceptions.RequestException("Connection error")
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(requests.exceptions.RequestException),
):
trigger_new_analysis("file.txt", "file_hash")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash")
backend.failed_analysis.assert_not_called()
def test_trigger_new_analysis_timeout_retry(mock_backend):
"""Test trigger_new_analysis with timeout error and retry."""
backend, handler = mock_backend
backend.trigger_new_analysis.side_effect = TimeoutError("Timeout")
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(TimeoutError),
):
trigger_new_analysis("file.txt", "file_hash")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash")
backend.failed_analysis.assert_not_called()
def test_trigger_new_analysis_request_exception_max_retries(mock_backend):
"""Test trigger_new_analysis with request exception and max retries reached."""
backend, handler = mock_backend
backend.trigger_new_analysis.side_effect = requests.exceptions.RequestException("Connection error")
backend.failed_analysis = mock.MagicMock()
with mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler):
trigger_new_analysis.max_retries = 0
trigger_new_analysis("file.txt", "file_hash")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash")
backend.failed_analysis.assert_called_once_with(
"file.txt",
error_code=MaxRetriesErrorCodes.TRIGGER_NEW_ANALYSIS,
error_msg="Max retries triggering new analysis exceeded",
)
def test_trigger_new_analysis_timeout_max_retries(mock_backend):
"""Test trigger_new_analysis with timeout error and max retries reached."""
backend, handler = mock_backend
backend.trigger_new_analysis.side_effect = TimeoutError("Timeout")
backend.failed_analysis = mock.MagicMock()
with mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler):
trigger_new_analysis.max_retries = 0
trigger_new_analysis("file.txt", "file_hash")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash")
backend.failed_analysis.assert_called_once_with(
"file.txt",
error_code=MaxRetriesErrorCodes.TRIGGER_NEW_ANALYSIS_TIMEOUT,
error_msg="Max retries triggering new analysis exceeded",
)
def test_trigger_new_analysis_with_auth_error_no_retry(mock_backend):
"""Test trigger_new_analysis with authentication error (no retry)."""
backend, handler = mock_backend
backend.trigger_new_analysis.side_effect = MalwareDetectionInvalidAuthenticationError("Invalid API key")
with (
mock.patch("lasuite.malware_detection.tasks.jcop.MalwareDetectionHandler", return_value=handler),
pytest.raises(MalwareDetectionInvalidAuthenticationError),
):
trigger_new_analysis("file.txt", "file_hash")
backend.trigger_new_analysis.assert_called_once_with("file.txt", "file_hash")
backend.failed_analysis.assert_not_called()