(malware) save backend used in a malware analysis task

For later use we wan't to know the backend used for a given task
This commit is contained in:
Manuel Raynaud
2025-11-28 15:58:38 +01:00
parent a91ad0009e
commit 3d86ec7e4e
6 changed files with 27 additions and 2 deletions

View File

@@ -11,6 +11,7 @@ and this project adheres to
### Added
- ✨(backend) keep traces of failed malware analysis tasks
- ✨(backend) save backend used in a malware analysis task
## [0.0.19] - 2025-11-21

View File

@@ -17,6 +17,11 @@ class BaseBackend(ABC):
self.max_processing_files = max_processing_files
@property
def backend_name(self):
"""Backend full class name to store in malware detection records if needed."""
return f"{self.__class__.__module__}.{self.__class__.__qualname__}"
@property
def callback(self):
"""Get the callback function."""

View File

@@ -58,6 +58,7 @@ class JCOPBackend(BaseBackend):
MalwareDetection.objects.create(
path=file_path,
status=MalwareDetectionStatus.PENDING,
backend=self.backend_name,
parameters=kwargs,
)

View File

@@ -25,4 +25,9 @@ class Migration(migrations.Migration):
name='status',
field=models.CharField(choices=[('pending', 'Pending'), ('processing', 'Processing'), ('failed', 'Failed')], default='pending', help_text='status of the detection', max_length=255),
),
migrations.AddField(
model_name='malwaredetection',
name='backend',
field=models.TextField(blank=True, help_text='backend used for the detection'),
),
]

View File

@@ -70,6 +70,10 @@ class MalwareDetection(models.Model):
help_text="error message for the detection",
blank=True,
)
backend = models.TextField(
help_text="backend used for the detection",
blank=True,
)
def __str__(self):
"""Return a string representation of the model."""

View File

@@ -73,11 +73,16 @@ def test_jcop_backend_file_exists(jcop_generate_file_path, jcop_backend):
mock.patch.object(analyse_file_async, "delay") as analyse_file_async_mock,
):
file_path, _ = jcop_generate_file_path
assert not MalwareDetection.objects.filter(path=file_path).exists()
jcop_backend.analyse_file(file_path)
analyse_file_async_mock.assert_called_once_with(
"file.txt",
)
assert MalwareDetection.objects.filter(
backend="lasuite.malware_detection.backends.jcop.JCOPBackend", path=file_path
).exists()
def test_jcop_backend_analyse_file_max_processing_reached(jcop_generate_file_path):
"""When the max processing files is reached, the file is not analysed."""
@@ -94,12 +99,16 @@ def test_jcop_backend_analyse_file_max_processing_reached(jcop_generate_file_pat
mock.patch.object(analyse_file_async, "delay") as analyse_file_async_mock,
):
file_path, _ = jcop_generate_file_path
assert not MalwareDetection.objects.filter(path=file_path).exists()
backend.analyse_file(file_path, foo="bar")
analyse_file_async_mock.assert_not_called()
assert MalwareDetection.objects.filter(
backend="lasuite.malware_detection.backends.jcop.JCOPBackend",
path=file_path,
status=MalwareDetectionStatus.PENDING,
).exists()
assert MalwareDetection.objects.filter(status=MalwareDetectionStatus.PROCESSING).count() == 10
assert MalwareDetection.objects.filter(status=MalwareDetectionStatus.PENDING).count() == 1
assert MalwareDetection.objects.filter(status=MalwareDetectionStatus.PENDING).first().path == file_path
def test_jcop_backend_file_exists_called_with_kwargs(jcop_generate_file_path, jcop_backend):