(malware) add management command to reschedule processing

We want to reschedule processing task when they are older than 3 days.
A settings is used to modify this delay. The current backend is
instanciated and it is the backend responsability to determine if it
should reschedule it or not.
This commit is contained in:
Manuel Raynaud
2025-11-28 22:35:41 +01:00
parent 98c0510f6d
commit b0b90be8fe
3 changed files with 105 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ and this project adheres to
- ✨(backend) keep traces of failed malware analysis tasks
- ✨(backend) save backend used in a malware analysis task
- ✨(backend) allow a malware detection backend to reschedule a task
- ✨(malware) add management command to reschedule processing
## [0.0.19] - 2025-11-21

View File

@@ -0,0 +1,29 @@
"""Management command to reschedule the processing analysis."""
from datetime import timedelta
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils.timezone import now
from lasuite.malware_detection.handler import MalwareDetectionHandler
from lasuite.malware_detection.models import MalwareDetection, MalwareDetectionStatus
class Command(BaseCommand):
"""Management command to reschedule the processing analysis."""
def handle(self, *args, **options):
"""Handle the command."""
reschedule_days = getattr(settings, "MALWARE_DETECTION_RESCHEDULE_PROCESSING_ANALYSIS_DAYS", 3)
reschedule_date = now() - timedelta(days=reschedule_days)
malware_detections = MalwareDetection.objects.filter(
status=MalwareDetectionStatus.PROCESSING, created_at__lte=reschedule_date
)
backend = MalwareDetectionHandler()()
for malware_detection in malware_detections:
backend.reschedule_processing_task(malware_detection)

View File

@@ -0,0 +1,75 @@
"""Test the malware detection reschedule processing analysis command."""
from datetime import datetime, timezone
from unittest import mock
import pytest
from django.core.management import call_command
from freezegun import freeze_time
from lasuite.malware_detection.backends.base import BaseBackend
from lasuite.malware_detection.models import MalwareDetection, MalwareDetectionStatus
from tests import factories
pytestmark = pytest.mark.django_db
class TestBackend(BaseBackend):
"""Test backend."""
def launch_next_analysis(self) -> None:
"""Launch the next analysis."""
def analyse_file(self, file_path: str, **kwargs) -> None:
"""Analyse a file."""
def reschedule_processing_task(self, malware_detection_record: MalwareDetection) -> None:
"""Reschedule the processing task for a malware detection record."""
def test_malware_detection_reschedule_processing_analysis_command_no_processing_analysis(settings):
"""Test the malware detection reschedule processing analysis command with no processing analysis."""
settings.MALWARE_DETECTION = {
"BACKEND": "tests.malware_detection.test_malware_detection_reschedule_processing_analysis_command.TestBackend",
"PARAMETERS": {
"callback_path": "tests.foo",
},
}
with mock.patch.object(TestBackend, "reschedule_processing_task") as mock_reschedule_processing_task:
call_command("reschedule_processing_analysis")
mock_reschedule_processing_task.assert_not_called()
def test_malware_detection_reschedule_processing_analysis_command_processing_analysis(settings):
"""Test the malware detection reschedule processing analysis command with processing analysis."""
settings.MALWARE_DETECTION = {
"BACKEND": "tests.malware_detection.test_malware_detection_reschedule_processing_analysis_command.TestBackend",
"PARAMETERS": {
"callback_path": "tests.foo",
},
}
with freeze_time(datetime(2025, 11, 15, 0, 0, 0, tzinfo=timezone.utc)):
malware_detections_to_reschedule = factories.MalwareDetectionFactory.create_batch(
4,
status=MalwareDetectionStatus.PROCESSING,
)
with freeze_time(datetime(2025, 11, 19, 0, 0, 0, tzinfo=timezone.utc)):
factories.MalwareDetectionFactory.create_batch(
4,
status=MalwareDetectionStatus.PROCESSING,
)
with (
freeze_time(datetime(2025, 11, 20, 0, 0, 0, tzinfo=timezone.utc)),
mock.patch.object(TestBackend, "reschedule_processing_task") as mock_reschedule_processing_task,
):
call_command("reschedule_processing_analysis")
assert mock_reschedule_processing_task.call_count == 4
calls = [mock.call(malware_detection) for malware_detection in malware_detections_to_reschedule]
for call in calls:
assert call in mock_reschedule_processing_task.call_args_list