This commit is contained in:
Quentin BEY
2025-05-05 15:32:39 +02:00
parent 1a9d074915
commit 9937d45295
3 changed files with 199 additions and 199 deletions

View File

@@ -1,11 +1,18 @@
"""Module contains the JCOP backends for the malware detection system."""
import hashlib
import logging
from http import HTTPStatus
import requests
from django.conf import settings
from django.core.files.storage import default_storage
from django.utils.module_loading import import_string
from requests_toolbelt import MultipartEncoder
from ..tasks.jcop import analyse_file_async
from ..enums import ReportStatus
from ..exceptions import MalwareDetectionInvalidAuthenticationError
from ..tasks.jcop import analyse_file_async, trigger_new_analysis
from ..utils import encrypt_message
from .base import BaseBackend
@@ -33,8 +40,12 @@ class JCOPBackend(BaseBackend):
if not default_storage.exists(file_path):
raise FileNotFoundError(f"File {file_path} not found")
with default_storage.open(file_path, "rb") as file:
file_hash = hashlib.file_digest(file, "sha256").hexdigest()
analyse_file_async.delay(
file_path,
file_hash,
api_key=encrypt_message(self.api_key),
base_url=self.base_url,
callback_path=self.callback_path,
@@ -42,3 +53,161 @@ class JCOPBackend(BaseBackend):
submit_timeout=self.submit_timeout,
**kwargs,
)
def failed_analysis(self, file_path: str, error_msg: str, **kwargs) -> None:
"""Handle a failed analysis."""
callback = import_string(self.callback_path)
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Failed to analyse the file",
"error_code": 5000,
},
**kwargs,
)
def start_analysis(self, file_path: str, file_hash: str, **kwargs) -> bool:
"""Start the analysis process for a file."""
callback = import_string(self.callback_path)
try:
# try if the file as already been tested
response = requests.get(
f"{self.base_url}/results/{file_hash}",
headers={
"X-Auth-Token": self.api_key,
"Accept": "application/json",
},
timeout=self.result_timeout,
)
except requests.exceptions.RequestException as exc:
logger.error("Error getting cache result for file %s: %s", file_path, exc)
raise
if response.status_code == HTTPStatus.NOT_FOUND:
# start a new analysis
trigger_new_analysis.delay(
file_path,
**kwargs,
)
return False
if response.status_code == HTTPStatus.UNAUTHORIZED:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Invalid API key",
"error_code": response.status_code,
},
**kwargs,
)
raise MalwareDetectionInvalidAuthenticationError()
if response.status_code == HTTPStatus.OK:
content = response.json()
if content.get("done", False) is False:
# the analysis is not done yet, retry later
return True
is_malware = content.get("is_malware")
if is_malware is True or content.get("error_code"):
status = ReportStatus.UNSAFE if is_malware else ReportStatus.UNKNOWN
callback(
file_path,
status,
error_info={
"error": content.get("error", "malware detected"),
"error_code": content.get("error_code", 5000),
},
**kwargs,
)
return False
if is_malware is False:
callback(file_path, ReportStatus.SAFE, error_info={}, **kwargs)
return False
# Any other case, call the callback with an unknown error
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Unknown treatment",
"error_code": response.status_code,
},
**kwargs,
)
return False
def trigger_new_analysis(self, file_path: str, **kwargs):
"""Trigger a new analysis for a file."""
callback = import_string(self.callback_path)
with default_storage.open(file_path, "rb") as file:
encoder = MultipartEncoder(
fields={
"file": (file.name, file),
}
)
try:
response = requests.post(
f"{self.base_url}/submit",
headers={
"X-Auth-Token": self.api_key,
"Accept": "application/json",
"Content-Type": encoder.content_type,
},
data=encoder,
timeout=(30, self.submit_timeout),
)
except requests.exceptions.RequestException as exc:
logger.error("Error submitting file to JCOP: %s", exc)
raise
if response.status_code == HTTPStatus.OK:
content = response.json()
analyse_file_async.apply_async(
countdown=30,
args=(file_path, content["id"]),
kwargs=kwargs,
)
return
if response.status_code == HTTPStatus.UNAUTHORIZED:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Invalid API key",
"error_code": response.status_code,
},
**kwargs,
)
raise MalwareDetectionInvalidAuthenticationError()
if response.status_code == HTTPStatus.REQUEST_TIMEOUT:
raise TimeoutError()
if response.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "File too large",
"error_code": response.status_code,
},
**kwargs,
)
return
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Unknown treatment",
"error_code": response.status_code,
},
**kwargs,
)

View File

@@ -1,18 +1,12 @@
"""Module containing the tasks for the JCOP backend."""
import hashlib
import logging
from http import HTTPStatus
import requests
from celery import shared_task
from django.core.files.storage import default_storage
from django.utils.module_loading import import_string
from requests_toolbelt.multipart.encoder import MultipartEncoder
from ..enums import ReportStatus
from .. import MalwareDetectionHandler
from ..exceptions import MalwareDetectionInvalidAuthenticationError
from ..utils import decrypt_message
logger = logging.getLogger(__name__)
@@ -26,214 +20,40 @@ logger = logging.getLogger(__name__)
def analyse_file_async( # noqa: PLR0913
self,
file_path: str,
api_key: str,
base_url: str,
callback_path: str,
result_timeout: int = 30,
submit_timeout: int = 60 * 10,
file_hash: str | None = None,
file_hash: str,
**kwargs,
) -> None:
"""Task starting analysis process for a file."""
if not file_hash:
with default_storage.open(file_path, "rb") as file:
file_hash = hashlib.file_digest(file, "sha256").hexdigest()
callback = import_string(callback_path)
backend = MalwareDetectionHandler()() # JCOPBackend
try:
# try if the file as already been tested
response = requests.get(
f"{base_url}/results/{file_hash}",
headers={
"X-Auth-Token": decrypt_message(api_key),
"Accept": "application/json",
},
timeout=result_timeout,
)
backend.start_analysis(file_path, file_hash)
except requests.exceptions.RequestException as exc:
logger.error("Error getting cache result for file %s: %s", file_path, exc)
if self.request.retries >= self.max_retries:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Max retries fetching results exceeded",
"error_code": 5000,
},
**kwargs,
)
backend.failed_analysis(file_path, "Max retries fetching results exceeded")
return
self.retry(exc=exc)
return
if response.status_code == HTTPStatus.NOT_FOUND:
# start a new analysis
trigger_new_analysis.delay(
file_path,
api_key=api_key,
base_url=base_url,
callback_path=callback_path,
result_timeout=result_timeout,
submit_timeout=submit_timeout,
**kwargs,
)
return
if response.status_code == HTTPStatus.UNAUTHORIZED:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Invalid API key",
"error_code": response.status_code,
},
**kwargs,
)
raise MalwareDetectionInvalidAuthenticationError()
if response.status_code == HTTPStatus.OK:
content = response.json()
if content.get("done", False) is False:
# the analysis is not done yet, retry later
self.retry()
return
is_malware = content.get("is_malware")
if is_malware is True or content.get("error_code"):
status = ReportStatus.UNSAFE if is_malware else ReportStatus.UNKNOWN
callback(
file_path,
status,
error_info={
"error": content.get("error", "malware detected"),
"error_code": content.get("error_code", 5000),
},
**kwargs,
)
return
if is_malware is False:
callback(file_path, ReportStatus.SAFE, error_info={}, **kwargs)
return
# Any other case, call the callback with an unknown error
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Unknown treatment",
"error_code": response.status_code,
},
**kwargs,
)
@shared_task(bind=True, max_retries=6, dont_autoretry_for=(MalwareDetectionInvalidAuthenticationError,))
def trigger_new_analysis( # noqa: PLR0913
self,
file_path: str,
api_key: str,
base_url: str,
callback_path: str,
result_timeout: int = 30,
submit_timeout: int = 10 * 60,
**kwargs,
) -> None:
"""Trigger a new analysis for a file."""
with default_storage.open(file_path, "rb") as file:
callback = import_string(callback_path)
encoder = MultipartEncoder(
fields={
"file": (file.name, file),
}
)
try:
response = requests.post(
f"{base_url}/submit",
headers={
"X-Auth-Token": decrypt_message(api_key),
"Accept": "application/json",
"Content-Type": encoder.content_type,
},
data=encoder,
timeout=(30, submit_timeout),
)
except requests.exceptions.RequestException as exc:
logger.error("Error submitting file to JCOP: %s", exc)
if self.request.retries >= self.max_retries:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Max retries triggering new analysis exceeded",
"error_code": 5000,
},
**kwargs,
)
self.retry(exc=exc)
return
if response.status_code == HTTPStatus.OK:
content = response.json()
analyse_file_async.apply_async(
countdown=30,
args=(file_path,),
kwargs={
"file_hash": content["id"],
"api_key": api_key,
"base_url": base_url,
"callback_path": callback_path,
"result_timeout": result_timeout,
"submit_timeout": submit_timeout,
**kwargs,
},
)
return
if response.status_code == HTTPStatus.UNAUTHORIZED:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Invalid API key",
"error_code": response.status_code,
},
**kwargs,
)
raise MalwareDetectionInvalidAuthenticationError()
if response.status_code == HTTPStatus.REQUEST_TIMEOUT:
backend = MalwareDetectionHandler()() # JCOPBackend
try:
should_retry = backend.trigger_new_analysis(file_path)
except requests.exceptions.RequestException as exc:
if self.request.retries >= self.max_retries:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Max retries triggering new analysis exceeded",
"error_code": 5000,
},
**kwargs,
)
backend.failed_analysis(file_path, "Max retries triggering new analysis exceeded")
self.retry(exc=exc)
return
except TimeoutError:
if self.request.retries >= self.max_retries:
backend.failed_analysis(file_path, "Max retries triggering new analysis exceeded")
self.retry(exc=TimeoutError())
return
if response.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "File too large",
"error_code": response.status_code,
},
**kwargs,
)
return
callback(
file_path,
ReportStatus.UNKNOWN,
error_info={
"error": "Unknown treatment",
"error_code": response.status_code,
},
**kwargs,
)
if should_retry:
self.retry()

View File

@@ -51,6 +51,17 @@ def jcop_backend_parameters():
}
@pytest.fixture(autouse=True)
def jcop_backend_settings_fixture(settings):
settings.MALWARE_DETECTION = {
"BACKEND": "lasuite.malware_detection.backends.jcop.JCOPBackend",
"PARAMETERS": {
"base_url": "https://malware_detection.tld/api/v1",
"api_key": "xxxx",
"callback_path": "tests.malware_detection.backends.test_jcop_backend.jcop_callback",
},
}
@pytest.fixture
def jcop_backend(jcop_backend_parameters):
"""Create a JCOP backend instance with test configuration."""