Files
browser-use/browser_use/utils.py

324 lines
11 KiB
Python

import asyncio
import logging
import os
import platform
import signal
import time
from functools import wraps
from sys import stderr
from typing import Any, Callable, Coroutine, List, Optional, ParamSpec, TypeVar
logger = logging.getLogger(__name__)
# Global flag to prevent duplicate exit messages
_exiting = False
# Define generic type variables for return type and parameters
R = TypeVar('R')
P = ParamSpec('P')
class SignalHandler:
"""
A modular and reusable signal handling system for managing SIGINT (Ctrl+C), SIGTERM,
and other signals in asyncio applications.
This class provides:
- Configurable signal handling for SIGINT and SIGTERM
- Support for custom pause/resume callbacks
- Management of event loop state across signals
- Standardized handling of first and second Ctrl+C presses
- Cross-platform compatibility (with simplified behavior on Windows)
"""
def __init__(
self,
loop: Optional[asyncio.AbstractEventLoop] = None,
pause_callback: Optional[Callable[[], None]] = None,
resume_callback: Optional[Callable[[], None]] = None,
custom_exit_callback: Optional[Callable[[], None]] = None,
exit_on_second_int: bool = True,
interruptible_task_patterns: List[str] = None,
):
"""
Initialize the signal handler.
Args:
loop: The asyncio event loop to use. Defaults to current event loop.
pause_callback: Function to call when system is paused (first Ctrl+C)
resume_callback: Function to call when system is resumed
custom_exit_callback: Function to call on exit (second Ctrl+C or SIGTERM)
exit_on_second_int: Whether to exit on second SIGINT (Ctrl+C)
interruptible_task_patterns: List of patterns to match task names that should be
canceled on first Ctrl+C (default: ['step', 'multi_act', 'get_next_action'])
"""
self.loop = loop or asyncio.get_event_loop()
self.pause_callback = pause_callback
self.resume_callback = resume_callback
self.custom_exit_callback = custom_exit_callback
self.exit_on_second_int = exit_on_second_int
self.interruptible_task_patterns = interruptible_task_patterns or ['step', 'multi_act', 'get_next_action']
self.is_windows = platform.system() == 'Windows'
# Initialize loop state attributes
self._initialize_loop_state()
# Store original signal handlers to restore them later if needed
self.original_sigint_handler = None
self.original_sigterm_handler = None
def _initialize_loop_state(self) -> None:
"""Initialize loop state attributes used for signal handling."""
setattr(self.loop, 'ctrl_c_pressed', False)
setattr(self.loop, 'waiting_for_input', False)
def register(self) -> None:
"""Register signal handlers for SIGINT and SIGTERM."""
try:
if self.is_windows:
# On Windows, use simple signal handling with immediate exit on Ctrl+C
def windows_handler(sig, frame):
print('\n\n🛑 Got Ctrl+C. Exiting immediately on Windows...\n', file=stderr)
# Run the custom exit callback if provided
if self.custom_exit_callback:
self.custom_exit_callback()
os._exit(0)
self.original_sigint_handler = signal.signal(signal.SIGINT, windows_handler)
else:
# On Unix-like systems, use asyncio's signal handling for smoother experience
self.original_sigint_handler = self.loop.add_signal_handler(signal.SIGINT, lambda: self.sigint_handler())
self.original_sigterm_handler = self.loop.add_signal_handler(signal.SIGTERM, lambda: self.sigterm_handler())
except Exception:
# there are situations where signal handlers are not supported, e.g.
# - when running in a thread other than the main thread
# - some operating systems
# - inside jupyter notebooks
pass
def unregister(self) -> None:
"""Unregister signal handlers and restore original handlers if possible."""
try:
if self.is_windows:
# On Windows, just restore the original SIGINT handler
if self.original_sigint_handler:
signal.signal(signal.SIGINT, self.original_sigint_handler)
else:
# On Unix-like systems, use asyncio's signal handler removal
self.loop.remove_signal_handler(signal.SIGINT)
self.loop.remove_signal_handler(signal.SIGTERM)
# Restore original handlers if available
if self.original_sigint_handler:
signal.signal(signal.SIGINT, self.original_sigint_handler)
if self.original_sigterm_handler:
signal.signal(signal.SIGTERM, self.original_sigterm_handler)
except Exception as e:
logger.warning(f'Error while unregistering signal handlers: {e}')
def _handle_second_ctrl_c(self) -> None:
"""
Handle a second Ctrl+C press by performing cleanup and exiting.
This is shared logic used by both sigint_handler and wait_for_resume.
"""
global _exiting
if not _exiting:
_exiting = True
# Call custom exit callback if provided
if self.custom_exit_callback:
try:
self.custom_exit_callback()
except Exception as e:
logger.error(f'Error in exit callback: {e}')
# Force immediate exit - more reliable than sys.exit()
print('\n\n🛑 Got second Ctrl+C. Exiting immediately...\n', file=stderr)
# write carriage return + newline + ASNI reset to both stdout and stderr to clear any color codes
print('\r\033[0m', end='', flush=True, file=stderr)
print('\r\033[0m', end='', flush=True)
os._exit(0)
def sigint_handler(self) -> None:
"""
SIGINT (Ctrl+C) handler.
First Ctrl+C: Cancel current step and pause.
Second Ctrl+C: Exit immediately if exit_on_second_int is True.
"""
global _exiting
if _exiting:
# Already exiting, force exit immediately
os._exit(0)
if getattr(self.loop, 'ctrl_c_pressed', False):
# If we're in the waiting for input state, let the pause method handle it
if getattr(self.loop, 'waiting_for_input', False):
return
# Second Ctrl+C - exit immediately if configured to do so
if self.exit_on_second_int:
self._handle_second_ctrl_c()
# Mark that Ctrl+C was pressed
self.loop.ctrl_c_pressed = True
# Cancel current tasks that should be interruptible - this is crucial for immediate pausing
self._cancel_interruptible_tasks()
# Call pause callback if provided - this sets the paused flag
if self.pause_callback:
try:
self.pause_callback()
except Exception as e:
logger.error(f'Error in pause callback: {e}')
# Log pause message after pause_callback is called (not before)
print('----------------------------------------------------------------------', file=stderr)
def sigterm_handler(self) -> None:
"""
SIGTERM handler.
Always exits the program completely.
"""
global _exiting
if not _exiting:
_exiting = True
print('\n\n🛑 SIGTERM received. Exiting immediately...\n\n', file=stderr)
# Call custom exit callback if provided
if self.custom_exit_callback:
self.custom_exit_callback()
os._exit(0)
def _cancel_interruptible_tasks(self) -> None:
"""Cancel current tasks that should be interruptible."""
current_task = asyncio.current_task(self.loop)
for task in asyncio.all_tasks(self.loop):
if task != current_task and not task.done():
task_name = task.get_name() if hasattr(task, 'get_name') else str(task)
# Cancel tasks that match certain patterns
if any(pattern in task_name for pattern in self.interruptible_task_patterns):
logger.debug(f'Cancelling task: {task_name}')
task.cancel()
# Add exception handler to silence "Task exception was never retrieved" warnings
task.add_done_callback(lambda t: t.exception() if t.cancelled() else None)
# Also cancel the current task if it's interruptible
if current_task and not current_task.done():
task_name = current_task.get_name() if hasattr(current_task, 'get_name') else str(current_task)
if any(pattern in task_name for pattern in self.interruptible_task_patterns):
logger.debug(f'Cancelling current task: {task_name}')
current_task.cancel()
def wait_for_resume(self) -> None:
"""
Wait for user input to resume or exit.
This method should be called after handling the first Ctrl+C.
It temporarily restores default signal handling to allow catching
a second Ctrl+C directly.
"""
# Set flag to indicate we're waiting for input
setattr(self.loop, 'waiting_for_input', True)
# Temporarily restore default signal handling for SIGINT
# This ensures KeyboardInterrupt will be raised during input()
original_handler = signal.getsignal(signal.SIGINT)
try:
signal.signal(signal.SIGINT, signal.default_int_handler)
except ValueError:
# we are running in a thread other than the main thread
# or signal handlers are not supported for some other reason
pass
green = '\x1b[32;1m'
red = '\x1b[31m'
blink = '\033[33;5m'
unblink = '\033[0m'
reset = '\x1b[0m'
try: # escape code is to blink the ...
print(
f'➡️ Press {green}[Enter]{reset} to resume or {red}[Ctrl+C]{reset} again to exit{blink}...{unblink} ',
end='',
flush=True,
file=stderr,
)
input() # This will raise KeyboardInterrupt on Ctrl+C
# Call resume callback if provided
if self.resume_callback:
self.resume_callback()
except KeyboardInterrupt:
# Use the shared method to handle second Ctrl+C
self._handle_second_ctrl_c()
finally:
try:
# Restore our signal handler
signal.signal(signal.SIGINT, original_handler)
setattr(self.loop, 'waiting_for_input', False)
except Exception:
pass
def reset(self) -> None:
"""Reset state after resuming."""
# Clear the flags
if hasattr(self.loop, 'ctrl_c_pressed'):
self.loop.ctrl_c_pressed = False
if hasattr(self.loop, 'waiting_for_input'):
self.loop.waiting_for_input = False
def time_execution_sync(additional_text: str = '') -> Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start_time = time.time()
result = func(*args, **kwargs)
execution_time = time.time() - start_time
logger.debug(f'{additional_text} Execution time: {execution_time:.2f} seconds')
return result
return wrapper
return decorator
def time_execution_async(
additional_text: str = '',
) -> Callable[[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]]:
def decorator(func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]:
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start_time = time.time()
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
logger.debug(f'{additional_text} Execution time: {execution_time:.2f} seconds')
return result
return wrapper
return decorator
def singleton(cls):
instance = [None]
def wrapper(*args, **kwargs):
if instance[0] is None:
instance[0] = cls(*args, **kwargs)
return instance[0]
return wrapper
def check_env_variables(keys: list[str], any_or_all=all) -> bool:
"""Check if all required environment variables are set"""
return any_or_all(os.getenv(key, '').strip() for key in keys)