Files
n8n/packages/@n8n/task-runner-python/tests/fixtures/task_runner_manager.py

128 lines
4.1 KiB
Python

import asyncio
import os
import re
import sys
from pathlib import Path
from src.constants import (
ENV_GRACEFUL_SHUTDOWN_TIMEOUT,
ENV_GRANT_TOKEN,
ENV_HEALTH_CHECK_SERVER_ENABLED,
ENV_HEALTH_CHECK_SERVER_PORT,
ENV_LAUNCHER_LOG_LEVEL,
ENV_TASK_BROKER_URI,
ENV_TASK_TIMEOUT,
)
from tests.fixtures.test_constants import (
GRACEFUL_SHUTDOWN_TIMEOUT,
TASK_TIMEOUT,
)
class TaskRunnerManager:
"""Responsible for managing the lifecycle of a task runner subprocess."""
def __init__(
self,
task_broker_url: str | None = None,
graceful_shutdown_timeout: float | None = None,
custom_env: dict[str, str] | None = None,
):
self.task_broker_url = task_broker_url
self.graceful_shutdown_timeout = graceful_shutdown_timeout
self.custom_env = custom_env or {}
self.subprocess: asyncio.subprocess.Process | None = None
self.stdout_buffer: list[str] = []
self.stderr_buffer: list[str] = []
self.health_check_port: int | None = None
async def start(self):
project_root = Path(__file__).parent.parent.parent
runner_path = project_root / "src" / "main.py"
env_vars = os.environ.copy()
env_vars[ENV_GRANT_TOKEN] = "test_token"
env_vars[ENV_TASK_BROKER_URI] = self.task_broker_url
env_vars[ENV_TASK_TIMEOUT] = str(TASK_TIMEOUT)
env_vars[ENV_HEALTH_CHECK_SERVER_ENABLED] = "true"
env_vars[ENV_HEALTH_CHECK_SERVER_PORT] = "0"
env_vars[ENV_LAUNCHER_LOG_LEVEL] = "INFO"
if self.graceful_shutdown_timeout is not None:
env_vars[ENV_GRACEFUL_SHUTDOWN_TIMEOUT] = str(
self.graceful_shutdown_timeout
)
env_vars["PYTHONPATH"] = str(project_root)
env_vars.update(self.custom_env)
self.subprocess = await asyncio.create_subprocess_exec(
sys.executable,
str(runner_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env_vars,
cwd=str(project_root),
)
asyncio.create_task(self._read_stdout())
asyncio.create_task(self._read_stderr())
await self._wait_for_health_check_port()
def is_running(self) -> bool:
return self.subprocess is not None and self.subprocess.returncode is None
def get_health_check_url(self) -> str:
return f"http://localhost:{self.health_check_port}"
async def stop(self) -> None:
if not self.subprocess or self.subprocess.returncode is not None:
return
self.subprocess.terminate()
try:
await asyncio.wait_for(
self.subprocess.wait(), timeout=GRACEFUL_SHUTDOWN_TIMEOUT
)
except asyncio.TimeoutError:
self.subprocess.kill()
await self.subprocess.wait()
async def _read_stdout(self):
if not self.subprocess or not self.subprocess.stdout:
return
while True:
line = await self.subprocess.stdout.readline()
if not line:
break
self.stdout_buffer.append(line.decode().strip())
async def _read_stderr(self):
if not self.subprocess or not self.subprocess.stderr:
return
while True:
line = await self.subprocess.stderr.readline()
if not line:
break
self.stderr_buffer.append(line.decode().strip())
async def _wait_for_health_check_port(self, timeout: float = 5.0):
pattern = re.compile(r"Health check server listening on .+, port (\d+)")
start_time = asyncio.get_running_loop().time()
while asyncio.get_running_loop().time() - start_time < timeout:
for line in self.stdout_buffer:
match = pattern.search(line)
if match:
self.health_check_port = int(match.group(1))
return
await asyncio.sleep(0.1)
raise TimeoutError(
f"Failed to detect health check port within {timeout}s. "
f"Stdout: {self.stdout_buffer}"
)