(marketing) create a celery task to create a new contact

To ease the use of this module, we added a shared task which a wrapper
arount the configured backend and will only call the
create_or_update_contact in a celery task.
This commit is contained in:
Manuel Raynaud
2025-12-03 16:39:23 +01:00
parent 609251233a
commit 6b1d47b80d
3 changed files with 68 additions and 1 deletions

View File

@@ -0,0 +1,19 @@
"""Marketing tasks module."""
from celery import shared_task
from lasuite.marketing import marketing
from lasuite.marketing.backends import ContactData
@shared_task
def create_or_update_contact(
email: str,
attributes: dict[str, str] | None = None,
list_ids: list[int] | None = None,
update_enabled: bool = True,
timeout: int = None,
):
"""Create or update a contact."""
contact_data = ContactData(email=email, attributes=attributes, list_ids=list_ids, update_enabled=update_enabled)
return marketing.create_or_update_contact(contact_data, timeout)

View File

@@ -1 +1 @@
"""Marketing tests module."""
"""Marketing tests module."""

View File

@@ -0,0 +1,48 @@
"""Test the marketing tasks."""
from unittest import mock
from lasuite.marketing import tasks
from lasuite.marketing.backends import ContactData
def test_create_or_update_contact_success():
"""Test the create_or_update_contact task."""
with mock.patch.object(tasks, "marketing") as mock_marketing:
mock_marketing.create_or_update_contact = mock.MagicMock()
contact_data = ContactData(
email="test@example.com",
attributes={"first_name": "Test"},
list_ids=[1, 2],
update_enabled=True,
)
tasks.create_or_update_contact(
email="test@example.com", attributes={"first_name": "Test"}, list_ids=[1, 2], update_enabled=True
)
mock_marketing.create_or_update_contact.assert_called_once_with(contact_data, None)
def test_create_or_update_contact_with_timeout():
"""Test the create_or_update_contact task."""
with mock.patch.object(tasks, "marketing") as mock_marketing:
mock_marketing.create_or_update_contact = mock.MagicMock()
contact_data = ContactData(
email="test@example.com",
attributes={"first_name": "Test"},
list_ids=[1, 2],
update_enabled=True,
)
tasks.create_or_update_contact(
email="test@example.com",
attributes={"first_name": "Test"},
list_ids=[1, 2],
update_enabled=True,
timeout=30,
)
mock_marketing.create_or_update_contact.assert_called_once_with(contact_data, 30)