simplify daemon architecture: single session, socket-as-liveness, no PID/lock files

Replace the multi-session server (server.py, SessionRegistry, portalocker locking,
PID files, orphan detection) with a minimal daemon (daemon.py) that holds one
BrowserSession in memory. Socket file existence = alive. Auto-exits when browser
dies via CDP watchdog.

-2277 lines, +142 lines across 20 files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ShawnPana
2026-03-03 19:05:44 -08:00
parent 39698e58e4
commit 859cb97063
21 changed files with 407 additions and 2275 deletions

View File

@@ -694,8 +694,6 @@ class BrowserUseApp(App):
'trafilatura.htmlprocessing',
'trafilatura',
'groq',
'portalocker',
'portalocker.utils',
]:
third_party = logging.getLogger(logger_name)
third_party.setLevel(logging.ERROR)

View File

@@ -223,9 +223,7 @@ def setup_logging(stream=None, log_level=None, force_setup=False, debug_log_file
'trafilatura.htmlprocessing',
'trafilatura',
'groq',
'portalocker',
'google_genai',
'portalocker.utils',
'websockets', # General websockets (but not websockets.client which we need)
]
for logger_name in third_party_loggers:

View File

@@ -322,22 +322,16 @@ browser-use -b remote open https://abc.trycloudflare.com
| `profile update <id> --name <name>` | Rename profile |
| `profile delete <id>` | Delete profile |
## Local Session Management
## Session Management
| Command | Description |
|---------|-------------|
| `sessions` | List active sessions |
| `close` | Close browser session |
| `close --all` | Close all sessions |
| `server status` | Check if server is running |
| `server stop` | Stop server |
| `server logs` | View server logs |
| `close` | Close browser and stop daemon |
## Global Options
| Option | Description |
|--------|-------------|
| `--session NAME` | Use named session (default: "default") |
| `--browser MODE` | Browser mode: chromium, real, remote |
| `--headed` | Show browser window |
| `--profile NAME` | Browser profile (local name or cloud ID) |
@@ -345,8 +339,6 @@ browser-use -b remote open https://abc.trycloudflare.com
| `--api-key KEY` | Override API key |
| `--mcp` | Run as MCP server via stdin/stdout |
**Session behavior**: All commands without `--session` use the same "default" session. The browser stays open and is reused across commands. Use `--session NAME` to run multiple browsers in parallel.
## Examples
### Fill a Form
@@ -365,15 +357,6 @@ browser-use open https://news.ycombinator.com
browser-use eval "Array.from(document.querySelectorAll('.titleline a')).slice(0,5).map(a => a.textContent)"
```
### Multi-Session Workflow
```bash
browser-use --session work open https://work.example.com
browser-use --session personal open https://personal.example.com
browser-use --session work state
browser-use --session personal state
browser-use close --all
```
### Python Automation
```bash
browser-use open https://example.com
@@ -410,12 +393,12 @@ curl -o ~/.claude/skills/browser-use/SKILL.md \
## How It Works
The CLI uses a session server architecture:
The CLI uses a daemon architecture:
1. First command starts a background server (browser stays open)
1. First command starts a background daemon (browser stays open)
2. Subsequent commands communicate via Unix socket (or TCP on Windows)
3. Browser persists across commands for fast interaction
4. Server auto-starts when needed, stops with `browser-use server stop`
4. Daemon auto-starts when needed, auto-exits when browser dies, or stops with `browser-use close`
This gives you ~50ms command latency instead of waiting for browser startup each time.
@@ -444,11 +427,11 @@ echo $env:PATH
& "C:\Program Files\Git\bin\bash.exe" -c 'browser-use --help'
```
### "Failed to start session server" error
### "Failed to start daemon" error
Kill zombie processes:
```powershell
# Find process on port
netstat -ano | findstr 49698
netstat -ano | findstr 49200
# Kill by PID
taskkill /PID <pid> /F

View File

@@ -1,7 +1,7 @@
"""Browser-use CLI package.
This package provides a fast command-line interface for browser automation.
The CLI uses a session server architecture for persistent browser sessions.
The CLI uses a daemon architecture for persistent browser sessions.
Usage:
browser-use open https://example.com

View File

@@ -5,7 +5,6 @@ from browser_use.skill_cli.commands import (
browser,
doctor,
python_exec,
session,
setup,
)
@@ -14,6 +13,5 @@ __all__ = [
'browser',
'doctor',
'python_exec',
'session',
'setup',
]

View File

@@ -1,38 +0,0 @@
"""Session management command handlers."""
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from browser_use.skill_cli.sessions import SessionRegistry
logger = logging.getLogger(__name__)
COMMANDS = {'sessions', 'close'}
async def handle(action: str, session_name: str, registry: 'SessionRegistry', params: dict[str, Any]) -> Any:
    """Run a session-management action against the registry.

    Args:
        action: Either 'sessions' (list) or 'close'.
        session_name: Session to close when 'close' is issued without 'all'.
        registry: Object providing list_sessions(), close_all() and close_session().
        params: Command parameters; a truthy 'all' entry closes every session.

    Returns:
        A dict describing the listed or closed sessions. Close results carry
        '_shutdown': True so the caller knows to stop the server.

    Raises:
        ValueError: If the action is neither 'sessions' nor 'close'.
    """
    if action not in ('sessions', 'close'):
        raise ValueError(f'Unknown session action: {action}')

    if action == 'sessions':
        active = registry.list_sessions()
        return {'sessions': active, 'count': len(active)}

    # action == 'close' from here on
    if not params.get('all'):
        # Close this server's session and shutdown
        await registry.close_session(session_name)
        return {'closed': session_name, '_shutdown': True}

    # Snapshot the sessions before closing so their names can be reported.
    snapshot = registry.list_sessions()
    await registry.close_all()
    closed_names = [entry['name'] for entry in snapshot]
    return {
        'closed': closed_names,
        'count': len(snapshot),
        '_shutdown': True,  # Signal to stop server
    }

View File

@@ -0,0 +1,267 @@
"""Background daemon - keeps a single BrowserSession alive.
Replaces the multi-session server.py with a simpler model:
- One daemon, one session, one socket
- Socket file existence = daemon is alive (no PID/lock files)
- Auto-exits when browser dies (polls is_cdp_connected)
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import signal
import sys
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from browser_use.skill_cli.sessions import SessionInfo
# Configure logging before imports
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger('browser_use.skill_cli.daemon')
class Daemon:
"""Single-session daemon that manages a browser and handles CLI commands."""
def __init__(
self,
browser_mode: str,
headed: bool,
profile: str | None,
) -> None:
self.browser_mode = browser_mode
self.headed = headed
self.profile = profile
self.running = True
self._server: asyncio.Server | None = None
self._shutdown_event = asyncio.Event()
self._session: 'SessionInfo | None' = None
self._browser_watchdog_task: asyncio.Task | None = None
async def _get_or_create_session(self) -> 'SessionInfo':
"""Lazy-create the single session on first command."""
if self._session is not None:
return self._session
from browser_use.skill_cli.sessions import SessionInfo, create_browser_session
logger.info(f'Creating session (mode={self.browser_mode}, headed={self.headed})')
bs = await create_browser_session(self.browser_mode, self.headed, self.profile)
await bs.start()
self._session = SessionInfo(
name='default',
browser_mode=self.browser_mode,
headed=self.headed,
profile=self.profile,
browser_session=bs,
)
self._browser_watchdog_task = asyncio.create_task(self._watch_browser())
return self._session
async def _watch_browser(self) -> None:
"""Poll BrowserSession.is_cdp_connected every 2s. Shutdown when browser dies."""
while self.running:
await asyncio.sleep(2.0)
if self._session and not self._session.browser_session.is_cdp_connected:
logger.info('Browser disconnected, shutting down daemon')
await self.shutdown()
return
async def handle_connection(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Handle a single client request (one command per connection)."""
try:
line = await asyncio.wait_for(reader.readline(), timeout=300)
if not line:
return
request = {}
try:
request = json.loads(line.decode())
response = await self.dispatch(request)
except json.JSONDecodeError as e:
response = {'id': '', 'success': False, 'error': f'Invalid JSON: {e}'}
except Exception as e:
logger.exception(f'Error handling request: {e}')
response = {'id': '', 'success': False, 'error': str(e)}
writer.write((json.dumps(response) + '\n').encode())
await writer.drain()
if request.get('action') == 'shutdown':
await self.shutdown()
except TimeoutError:
logger.debug('Connection timeout')
except Exception as e:
logger.exception(f'Connection error: {e}')
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
async def dispatch(self, request: dict) -> dict:
"""Route to command handlers."""
action = request.get('action', '')
params = request.get('params', {})
req_id = request.get('id', '')
logger.info(f'Dispatch: {action} (id={req_id})')
try:
# Handle shutdown
if action == 'shutdown':
return {'id': req_id, 'success': True, 'data': {'shutdown': True}}
# Handle ping — returns daemon config for mismatch detection
if action == 'ping':
return {
'id': req_id,
'success': True,
'data': {
'browser_mode': self.browser_mode,
'headed': self.headed,
'profile': self.profile,
},
}
from browser_use.skill_cli.commands import agent, browser, python_exec
# Get or create the single session
session = await self._get_or_create_session()
# Dispatch to handler
if action in browser.COMMANDS:
result = await browser.handle(action, session, params)
elif action == 'python':
result = await python_exec.handle(session, params)
elif action == 'run':
result = await agent.handle(session, params)
else:
return {'id': req_id, 'success': False, 'error': f'Unknown action: {action}'}
return {'id': req_id, 'success': True, 'data': result}
except Exception as e:
logger.exception(f'Error dispatching {action}: {e}')
return {'id': req_id, 'success': False, 'error': str(e)}
async def run(self) -> None:
"""Listen on Unix socket (or TCP on Windows). No PID file, no lock file."""
from browser_use.skill_cli.utils import get_socket_path
# Setup signal handlers
loop = asyncio.get_running_loop()
def signal_handler():
asyncio.create_task(self.shutdown())
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, signal_handler)
except NotImplementedError:
pass # Windows doesn't support add_signal_handler
if hasattr(signal, 'SIGHUP'):
try:
loop.add_signal_handler(signal.SIGHUP, signal_handler)
except NotImplementedError:
pass
sock_path = get_socket_path()
logger.info(f'Socket: {sock_path}')
if sock_path.startswith('tcp://'):
# Windows: TCP server
_, hostport = sock_path.split('://', 1)
host, port = hostport.split(':')
self._server = await asyncio.start_server(
self.handle_connection,
host,
int(port),
reuse_address=True,
)
logger.info(f'Listening on TCP {host}:{port}')
else:
# Unix: socket server
Path(sock_path).unlink(missing_ok=True)
self._server = await asyncio.start_unix_server(
self.handle_connection,
sock_path,
)
logger.info(f'Listening on Unix socket {sock_path}')
try:
async with self._server:
await self._shutdown_event.wait()
except asyncio.CancelledError:
pass
finally:
# Clean up socket file
if not sock_path.startswith('tcp://'):
Path(sock_path).unlink(missing_ok=True)
logger.info('Daemon stopped')
async def shutdown(self) -> None:
"""Graceful shutdown."""
logger.info('Shutting down daemon...')
self.running = False
self._shutdown_event.set()
if self._session:
try:
await self._session.browser_session.kill()
except Exception as e:
logger.warning(f'Error closing session: {e}')
self._session = None
if self._browser_watchdog_task:
self._browser_watchdog_task.cancel()
if self._server:
self._server.close()
def main() -> None:
    """Parse daemon CLI flags, build the Daemon and run it until it stops.

    Exits with status 1 if the daemon fails with an unexpected exception;
    a KeyboardInterrupt is logged and treated as a normal stop.
    """
    cli = argparse.ArgumentParser(description='Browser-use daemon')
    cli.add_argument('--browser', default='chromium', choices=['chromium', 'real'])
    cli.add_argument('--headed', action='store_true', help='Show browser window')
    cli.add_argument('--profile', help='Chrome profile (real browser mode)')
    opts = cli.parse_args()

    logger.info(f'Starting daemon: browser={opts.browser}, headed={opts.headed}')

    instance = Daemon(browser_mode=opts.browser, headed=opts.headed, profile=opts.profile)
    try:
        asyncio.run(instance.run())
    except KeyboardInterrupt:
        logger.info('Interrupted')
    except Exception as exc:
        logger.exception(f'Daemon error: {exc}')
        sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -1,711 +0,0 @@
"""Serverless CLI for browser-use - runs commands directly without a session server.
Each command reconnects to the browser via CDP WebSocket URL saved to a state file.
The browser process stays alive between commands; only the Python process exits.
Two-tier reconnection:
Tier 1 (Lightweight CDP, ~200ms): Most commands use raw CDPClient + Target.attachToTarget.
No BrowserSession, no watchdogs, no event bus.
Tier 2 (Full BrowserSession, ~3s): Only for `state` (needs DOMWatchdog) and first-time
`open` (needs to launch browser).
Usage:
python -m browser_use.skill_cli.direct open https://example.com
python -m browser_use.skill_cli.direct state
python -m browser_use.skill_cli.direct click 200 400
python -m browser_use.skill_cli.direct screenshot ./shot.png
python -m browser_use.skill_cli.direct close
"""
import asyncio
import base64
import json
import sys
import tempfile
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from cdp_use import CDPClient
from browser_use.browser.session import BrowserSession
STATE_FILE = Path(tempfile.gettempdir()) / 'browser-use-direct.json'
# ---------------------------------------------------------------------------
# State persistence
# ---------------------------------------------------------------------------
def _load_state() -> dict[str, Any]:
if STATE_FILE.exists():
try:
return json.loads(STATE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_state(state: dict[str, Any]) -> None:
    """Serialize *state* as JSON and overwrite the state file with it."""
    payload = json.dumps(state)
    STATE_FILE.write_text(payload)
def _clear_state() -> None:
    """Delete the state file; a missing file is not an error."""
    try:
        STATE_FILE.unlink()
    except FileNotFoundError:
        pass
# ---------------------------------------------------------------------------
# Selector map cache (persisted in state file under "selector_map" key)
# ---------------------------------------------------------------------------
def _save_selector_cache(selector_map: dict[int, Any]) -> None:
    """Persist element geometry from *selector_map* under the "selector_map" state key.

    For every node that has an ``absolute_position`` the entry records x, y,
    width, height, the lower-cased tag name and up to 80 characters of label
    text. The label is the accessibility name when present, otherwise the
    node value. Nodes without a position are skipped.
    """
    entries: dict[str, dict[str, Any]] = {}
    for index, element in selector_map.items():
        rect = getattr(element, 'absolute_position', None)
        if rect is None:
            continue

        label = ''
        ax = getattr(element, 'ax_node', None)
        if ax and ax.name:
            label = ax.name
        else:
            value = getattr(element, 'node_value', None)
            if value:
                label = value

        tag_name = getattr(element, 'node_name', '') or ''
        entries[str(index)] = {
            'x': rect.x,
            'y': rect.y,
            'w': rect.width,
            'h': rect.height,
            'tag': tag_name.lower(),
            'text': label[:80],
        }

    # Merge into the existing state so other keys are preserved.
    current = _load_state()
    current['selector_map'] = entries
    _save_state(current)
def _load_selector_cache() -> dict[int, dict[str, Any]]:
    """Return the cached element map as {index: {x, y, w, h, tag, text}}.

    Keys are stored as strings in the state file and converted back to int.
    """
    stored = _load_state().get('selector_map', {})
    by_index: dict[int, dict[str, Any]] = {}
    for key, entry in stored.items():
        by_index[int(key)] = entry
    return by_index
# ---------------------------------------------------------------------------
# Tier 1: Lightweight CDP connection (~200ms)
# ---------------------------------------------------------------------------
@dataclass
class LightCDP:
    """Minimal CDP connection — no BrowserSession, no watchdogs."""

    # Started CDP client that commands are sent through.
    client: 'CDPClient'
    # Session id obtained from Target.attachToTarget; passed with every command.
    session_id: str
    # Id of the page target the session is attached to.
    target_id: str
@asynccontextmanager
async def _lightweight_cdp():
    """Connect to the browser via raw CDP and yield a LightCDP handle.

    Uses the CDP URL (and, if present, the target id) from the state file.
    When no target id is saved, the first page target with an http(s) URL is
    used. Page and Runtime domains are enabled before yielding.

    Raises:
        RuntimeError: If there is no saved CDP URL, the connection cannot be
            established, no page target exists, or attaching yields no
            session id.
    """
    from cdp_use import CDPClient

    state = _load_state()
    cdp_url = state.get('cdp_url')
    if not cdp_url:
        raise RuntimeError('No active browser session')

    client = CDPClient(cdp_url)
    try:
        await client.start()
    except Exception as e:
        raise RuntimeError(f'Cannot connect to browser at {cdp_url}: {e}') from e

    # From here on the client is live: every exit path, including errors
    # during target discovery or attach, must stop it.
    try:
        target_id = state.get('target_id')

        # If no saved target, discover one
        if not target_id:
            targets = await client.send.Target.getTargets()
            for t in targets.get('targetInfos', []):
                if t.get('type') == 'page' and t.get('url', '').startswith(('http://', 'https://')):
                    target_id = t['targetId']
                    break

        if not target_id:
            raise RuntimeError('No page target found in browser')

        # Attach to the target
        attach_result = await client.send.Target.attachToTarget(params={'targetId': target_id, 'flatten': True})
        session_id = attach_result.get('sessionId')
        if not session_id:
            raise RuntimeError(f'Failed to attach to target {target_id}')

        # Enable required domains
        await client.send.Page.enable(session_id=session_id)
        await client.send.Runtime.enable(session_id=session_id)

        yield LightCDP(client=client, session_id=session_id, target_id=target_id)
    finally:
        try:
            await client.stop()
        except Exception:
            # Best effort: a failed disconnect must not mask the real outcome.
            pass
# ---------------------------------------------------------------------------
# Tier 2: Full BrowserSession (for state + first-time open)
# ---------------------------------------------------------------------------
async def _activate_content_target(session: 'BrowserSession', saved_target_id: str | None) -> None:
    """Focus the session on a real http(s) page after reconnecting.

    Does nothing if the current page already has an http(s) URL. Otherwise it
    tries the saved target first, then every page target reported by the
    browser, and stops at the first one that can be focused. Failures to
    focus a candidate are ignored and the next candidate is tried.
    """
    web_schemes = ('http://', 'https://')

    url_now = await session.get_current_page_url()
    if url_now and url_now.startswith(web_schemes):
        return

    if saved_target_id and session.session_manager:
        remembered = session.session_manager.get_target(saved_target_id)
        if remembered and remembered.url and remembered.url.startswith(web_schemes):
            try:
                await session.get_or_create_cdp_session(saved_target_id, focus=True)
                return
            except Exception:
                pass

    if not session._cdp_client_root:
        return

    listing = await session._cdp_client_root.send.Target.getTargets()
    for info in listing.get('targetInfos', []):
        if info.get('type') != 'page':
            continue
        if not info.get('url', '').startswith(web_schemes):
            continue
        try:
            await session.get_or_create_cdp_session(info['targetId'], focus=True)
            return
        except Exception:
            pass
@asynccontextmanager
async def browser(use_remote: bool = False):
    """Yield a BrowserSession, reconnecting to a saved browser or launching one.

    If the state file holds a CDP URL, reconnection is attempted first; on any
    failure the state is cleared and a new browser is started (cloud when
    *use_remote* is true, otherwise a headed local one) and its CDP URL saved.
    On exit the focused target id is persisted, the root CDP client is stopped
    and the event bus is shut down.
    """
    from browser_use.browser.session import BrowserSession

    saved = _load_state()
    session = None

    existing_url = saved.get('cdp_url')
    if existing_url:
        session = BrowserSession(cdp_url=existing_url)
        try:
            await session.start()
            await _activate_content_target(session, saved.get('target_id'))
        except Exception:
            _clear_state()
            session = None

    if session is None:
        session = BrowserSession(use_cloud=True) if use_remote else BrowserSession(headless=False)
        await session.start()
        assert session.cdp_url is not None
        _save_state({'cdp_url': session.cdp_url, 'remote': use_remote})

    try:
        yield session
    finally:
        focused = session.agent_focus_target_id
        if focused:
            latest = _load_state()
            latest['target_id'] = focused
            _save_state(latest)

        root_client = session._cdp_client_root
        if root_client:
            try:
                await root_client.stop()
            except Exception:
                pass
        await session.event_bus.stop(clear=True, timeout=2)
# ---------------------------------------------------------------------------
# Lightweight CDP command functions (Tier 1)
# ---------------------------------------------------------------------------
async def _cdp_navigate(cdp: LightCDP, url: str) -> None:
    """Send Page.navigate for *url*, then drop the cached selector map."""
    navigate = cdp.client.send.Page.navigate
    await navigate(params={'url': url}, session_id=cdp.session_id)

    # The page changed, so cached element positions no longer apply.
    persisted = _load_state()
    persisted.pop('selector_map', None)
    _save_state(persisted)
async def _cdp_screenshot(cdp: LightCDP, path: str | None) -> None:
    """Capture a PNG screenshot.

    With *path*, the image is written there and a summary line is printed.
    Without it, a JSON object with the base64 data, byte size and (when
    available) viewport width/height is printed.
    """
    shot = await cdp.client.send.Page.captureScreenshot(params={'format': 'png'}, session_id=cdp.session_id)
    raw = base64.b64decode(shot['data'])

    if path:
        destination = Path(path)
        destination.write_bytes(raw)  # noqa: ASYNC240
        print(f'Screenshot saved to {destination} ({len(raw)} bytes)')
        return

    # Get viewport dimensions
    layout = await cdp.client.send.Page.getLayoutMetrics(session_id=cdp.session_id)
    viewport = layout.get('visualViewport', {})
    payload: dict[str, Any] = {
        'screenshot': shot['data'],
        'size_bytes': len(raw),
    }
    if viewport:
        payload['viewport'] = {
            'width': int(viewport.get('clientWidth', 0)),
            'height': int(viewport.get('clientHeight', 0)),
        }
    print(json.dumps(payload))
async def _cdp_click_coordinate(cdp: LightCDP, x: int, y: int) -> None:
    """Click at viewport coordinates using CDP Input.dispatchMouseEvent.

    Sends mouseMoved, mousePressed and mouseReleased in order, pausing 50 ms
    after the first two events.
    """
    sequence = (
        ({'type': 'mouseMoved', 'x': x, 'y': y}, 0.05),
        ({'type': 'mousePressed', 'x': x, 'y': y, 'button': 'left', 'clickCount': 1}, 0.05),
        ({'type': 'mouseReleased', 'x': x, 'y': y, 'button': 'left', 'clickCount': 1}, None),
    )
    for event, pause in sequence:
        await cdp.client.send.Input.dispatchMouseEvent(params=event, session_id=cdp.session_id)
        if pause is not None:
            await asyncio.sleep(pause)
async def _get_scroll_offset(cdp: LightCDP) -> tuple[float, float]:
    """Return (scrollX, scrollY) of the page, read by evaluating JS.

    Falls back to (0, 0) when the evaluation result carries no value.
    """
    evaluation = {
        'expression': 'JSON.stringify({x:window.scrollX,y:window.scrollY})',
        'returnByValue': True,
    }
    reply = await cdp.client.send.Runtime.evaluate(params=evaluation, session_id=cdp.session_id)
    encoded = reply.get('result', {}).get('value', '{"x":0,"y":0}')
    offset = json.loads(encoded)
    return (offset['x'], offset['y'])
async def _cdp_click_index(cdp: LightCDP, index: int) -> None:
    """Click the centre of a cached element identified by *index*.

    Cached positions are document coordinates; the current scroll offset is
    subtracted to obtain viewport coordinates. Exits the process with status 1
    if the index is not in the cache.
    """
    known = _load_selector_cache()
    if index not in known:
        print(f'Error: Element index {index} not in cache. Run "state" first.', file=sys.stderr)
        sys.exit(1)

    box = known[index]
    offset_x, offset_y = await _get_scroll_offset(cdp)

    # Center of element in document coords, converted to viewport coords
    centre_x = box['x'] + box['w'] / 2
    centre_y = box['y'] + box['h'] / 2
    viewport_x = int(centre_x - offset_x)
    viewport_y = int(centre_y - offset_y)

    await _cdp_click_coordinate(cdp, viewport_x, viewport_y)

    tag = box.get('tag', '')
    text = box.get('text', '')
    label = f'{tag}'
    if text:
        label += f' "{text}"'
    print(f'Clicked element [{index}] {label} at ({viewport_x}, {viewport_y})')
async def _cdp_type(cdp: LightCDP, text: str) -> None:
    """Insert *text* into whatever element currently has focus."""
    insert_text = cdp.client.send.Input.insertText
    await insert_text(params={'text': text}, session_id=cdp.session_id)
async def _cdp_input(cdp: LightCDP, index: int, text: str) -> None:
    """Click the cached element *index*, wait 100 ms, then type *text*."""
    await _cdp_click_index(cdp, index)
    # Short pause between the click and the text insertion.
    await asyncio.sleep(0.1)
    await _cdp_type(cdp, text)
    print(f'Typed "{text}" into element [{index}]')
async def _cdp_scroll(cdp: LightCDP, direction: str) -> None:
    """Scroll the page vertically by 500px.

    'up' scrolls up; any other *direction* value scrolls down.
    """
    amount = 500
    if direction == 'up':
        amount = -500

    script = {
        'expression': f'window.scrollBy(0, {amount})',
        'returnByValue': True,
    }
    await cdp.client.send.Runtime.evaluate(params=script, session_id=cdp.session_id)
async def _cdp_back(cdp: LightCDP) -> None:
    """Navigate to the previous history entry, if there is one.

    On success the cached selector map is dropped. When already at the first
    entry, a message is written to stderr and nothing else happens.
    """
    history = await cdp.client.send.Page.getNavigationHistory(session_id=cdp.session_id)
    position = history.get('currentIndex', 0)
    entries = history.get('entries', [])

    if position <= 0:
        print('Already at the beginning of history', file=sys.stderr)
        return

    previous = entries[position - 1]
    await cdp.client.send.Page.navigateToHistoryEntry(params={'entryId': previous['id']}, session_id=cdp.session_id)

    # Invalidate selector cache on navigation
    persisted = _load_state()
    persisted.pop('selector_map', None)
    _save_state(persisted)
async def _cdp_keys(cdp: LightCDP, keys_str: str) -> None:
    """Send a key, a key combination, or plain text via CDP.

    * ``a+b+c`` form: every part but the last is pressed as a modifier, the
      last key is pressed and released with the combined modifier bitmask,
      then the modifiers are released in reverse order.
    * A single special key (Enter, Tab, arrows, F1-F12, ...) is sent as
      keyDown/keyUp; Enter additionally sends a ``char`` event with ``\\r``.
    * Anything else is inserted character by character as text.

    Common aliases (ctrl, cmd, esc, ...) are normalized case-insensitively.
    """
    from browser_use.actor.utils import get_key_info

    # Key alias normalization (same as default_action_watchdog)
    aliases = {
        'ctrl': 'Control',
        'control': 'Control',
        'alt': 'Alt',
        'option': 'Alt',
        'meta': 'Meta',
        'cmd': 'Meta',
        'command': 'Meta',
        'shift': 'Shift',
        'enter': 'Enter',
        'return': 'Enter',
        'tab': 'Tab',
        'delete': 'Delete',
        'backspace': 'Backspace',
        'escape': 'Escape',
        'esc': 'Escape',
        'space': ' ',
        'up': 'ArrowUp',
        'down': 'ArrowDown',
        'left': 'ArrowLeft',
        'right': 'ArrowRight',
        'pageup': 'PageUp',
        'pagedown': 'PageDown',
        'home': 'Home',
        'end': 'End',
    }

    session_id = cdp.session_id

    async def send_key(event_type: str, key: str, modifiers: int = 0) -> None:
        """Dispatch one key event, adding code / virtual key code when known."""
        from cdp_use.cdp.input.commands import DispatchKeyEventParameters

        code, vk_code = get_key_info(key)
        event: DispatchKeyEventParameters = {'type': event_type, 'key': key, 'code': code}
        if modifiers:
            event['modifiers'] = modifiers
        if vk_code is not None:
            event['windowsVirtualKeyCode'] = vk_code
        await cdp.client.send.Input.dispatchKeyEvent(params=event, session_id=session_id)

    if '+' in keys_str:
        pieces = [piece.strip() for piece in keys_str.split('+')]
        resolved = [aliases.get(piece.lower(), piece) for piece in pieces]
        held, final_key = resolved[:-1], resolved[-1]

        # CDP modifier bit flags; unknown modifier names contribute nothing.
        bits = {'Alt': 1, 'Control': 2, 'Meta': 4, 'Shift': 8}
        mask = 0
        for name in held:
            mask |= bits.get(name, 0)

        for name in held:
            await send_key('keyDown', name)
        await send_key('keyDown', final_key, mask)
        await send_key('keyUp', final_key, mask)
        for name in reversed(held):
            await send_key('keyUp', name)
        return

    # Note: the fallback is the original (unstripped) string.
    normalized = aliases.get(keys_str.strip().lower(), keys_str)

    special_keys = {
        'Enter',
        'Tab',
        'Delete',
        'Backspace',
        'Escape',
        'ArrowUp',
        'ArrowDown',
        'ArrowLeft',
        'ArrowRight',
        'PageUp',
        'PageDown',
        'Home',
        'End',
        'Control',
        'Alt',
        'Meta',
        'Shift',
    } | {f'F{number}' for number in range(1, 13)}

    if normalized not in special_keys:
        # Plain text — use insertText for each character
        for char in normalized:
            await cdp.client.send.Input.insertText(
                params={'text': char},
                session_id=session_id,
            )
        return

    await send_key('keyDown', normalized)
    if normalized == 'Enter':
        await cdp.client.send.Input.dispatchKeyEvent(
            params={'type': 'char', 'text': '\r', 'key': 'Enter'},
            session_id=session_id,
        )
    await send_key('keyUp', normalized)
async def _cdp_html(cdp: LightCDP, selector: str | None) -> None:
    """Print the outer HTML of the page, or of the first match for *selector*.

    Exits the process with status 1 (after a message on stderr) when nothing
    could be retrieved.
    """
    if selector:
        # json.dumps quotes the selector so it is a valid JS string literal.
        expression = f'(function(){{ const el = document.querySelector({json.dumps(selector)}); return el ? el.outerHTML : null; }})()'
    else:
        expression = 'document.documentElement.outerHTML'

    reply = await cdp.client.send.Runtime.evaluate(
        params={'expression': expression, 'returnByValue': True}, session_id=cdp.session_id
    )
    markup = reply.get('result', {}).get('value')
    if not markup:
        msg = f'No element found for selector: {selector}' if selector else 'Error: Could not get HTML'
        print(msg, file=sys.stderr)
        sys.exit(1)
    print(markup)
async def _cdp_eval(cdp: LightCDP, js: str) -> None:
    """Evaluate *js* in the page and print the result as JSON.

    Prints ``undefined`` when the evaluation returns no value.
    """
    reply = await cdp.client.send.Runtime.evaluate(
        params={'expression': js, 'returnByValue': True}, session_id=cdp.session_id
    )
    outcome = reply.get('result', {}).get('value')
    if outcome is None:
        print('undefined')
    else:
        print(json.dumps(outcome))
# ---------------------------------------------------------------------------
# Command routing
# ---------------------------------------------------------------------------
# Commands that always use lightweight CDP (Tier 1)
# main() checks membership in this set to route a command through
# _lightweight_cdp(); 'open', 'state' and 'close' are handled separately.
_LIGHTWEIGHT_COMMANDS = frozenset(
    {
        'screenshot',
        'click',
        'type',
        'input',
        'scroll',
        'back',
        'keys',
        'html',
        'eval',
    }
)
async def main() -> int:
    """Parse ``sys.argv`` and run one direct-CLI command.

    Routing:
        * ``close`` closes the browser over raw CDP, falling back to a full
          BrowserSession kill, then clears the state file.
        * ``open <url>`` navigates over lightweight CDP when a browser is
          saved, otherwise launches a full session.
        * ``state`` uses a full session and caches the selector map.
        * Commands in ``_LIGHTWEIGHT_COMMANDS`` run over lightweight CDP.

    Returns:
        Process exit code: 0 on success, 1 on usage errors, unknown commands
        or connection failures. Help with an explicit flag returns 0; help
        shown because no arguments were given returns 1.
    """
    args = sys.argv[1:]
    if not args or args[0] in ('help', '--help', '-h'):
        print("""Usage: python -m browser_use.skill_cli.direct <command> [args]
Commands:
open <url> Navigate to URL
state Get DOM state with viewport info
click <index> Click element by index (uses cached positions)
click <x> <y> Click at viewport coordinates
type <text> Type into focused element
input <index> <text> Click element then type
screenshot [path] Take screenshot (saves to file or prints base64+dimensions)
scroll [up|down] Scroll page (default: down)
back Go back in history
keys <keys> Send keyboard keys
html [selector] Get raw HTML (full page or CSS selector)
eval <js> Execute JavaScript
close Kill browser and clean up
Flags:
--remote Use browser-use cloud browser (requires BROWSER_USE_API_KEY)""")
        return 0 if args else 1

    # Extract --remote flag
    use_remote = '--remote' in args
    args = [a for a in args if a != '--remote']
    if not args:
        print('Error: No command specified', file=sys.stderr)
        return 1

    command = args[0]

    # ── close: lightweight CDP kill ──────────────────────────────────────
    if command == 'close':
        state = _load_state()
        cdp_url = state.get('cdp_url')
        if not cdp_url:
            print('No active browser session')
        else:
            closed = False
            try:
                from cdp_use import CDPClient

                client = CDPClient(cdp_url)
                await client.start()
                await client.send.Browser.close()
                await client.stop()
                closed = True
            except Exception:
                # Best effort: fall through to the full-session kill below.
                pass
            if not closed:
                try:
                    from browser_use.browser.session import BrowserSession

                    session = BrowserSession(cdp_url=cdp_url)
                    await session.start()
                    await session.kill()
                except Exception:
                    # Browser is unreachable either way; state is cleared next.
                    pass
            _clear_state()
            print('Browser closed')
        return 0

    # ── open: lightweight if reconnecting, full session if first launch ──
    if command == 'open':
        if len(args) < 2:
            # Previously fell through to a misleading "Unknown command: open".
            print('Usage: open <url>', file=sys.stderr)
            return 1

        url = args[1]
        if not url.startswith(('http://', 'https://', 'file://')):
            url = 'https://' + url

        state = _load_state()
        if state.get('cdp_url'):
            # Reconnect — lightweight CDP navigate
            try:
                async with _lightweight_cdp() as cdp:
                    await _cdp_navigate(cdp, url)
                    # Update target_id in state
                    current_state = _load_state()
                    current_state['target_id'] = cdp.target_id
                    _save_state(current_state)
                    print(f'Navigated to: {url}')
                    return 0
            except RuntimeError:
                # Browser died — fall through to full session launch
                _clear_state()

        # First launch — needs full session
        async with browser(use_remote=use_remote) as session:
            from browser_use.browser.events import NavigateToUrlEvent

            await session.event_bus.dispatch(NavigateToUrlEvent(url=url))
            if session.agent_focus_target_id:
                current_state = _load_state()
                current_state['target_id'] = session.agent_focus_target_id
                _save_state(current_state)
            print(f'Navigated to: {url}')
        return 0

    # ── state: full session (needs DOMWatchdog for DOM tree building) ────
    if command == 'state':
        async with browser(use_remote=use_remote) as session:
            state_summary = await session.get_browser_state_summary()
            assert state_summary.dom_state is not None
            text = state_summary.dom_state.llm_representation()
            if state_summary.page_info:
                pi = state_summary.page_info
                header = f'viewport: {pi.viewport_width}x{pi.viewport_height}\n'
                header += f'page: {pi.page_width}x{pi.page_height}\n'
                header += f'scroll: ({pi.scroll_x}, {pi.scroll_y})\n'
                text = header + text
            print(text)

            # Cache selector map for subsequent click-by-index
            selector_map = await session.get_selector_map()
            if selector_map:
                _save_selector_cache(selector_map)
        return 0

    # ── Lightweight commands (Tier 1) ────────────────────────────────────
    if command in _LIGHTWEIGHT_COMMANDS:
        try:
            async with _lightweight_cdp() as cdp:
                if command == 'screenshot':
                    path = args[1] if len(args) >= 2 else None
                    await _cdp_screenshot(cdp, path)

                elif command == 'click' and len(args) >= 2:
                    try:
                        int_args = [int(a) for a in args[1:]]
                    except ValueError:
                        # Report bad input instead of crashing with a traceback.
                        print('Usage: click <index> or click <x> <y>', file=sys.stderr)
                        return 1
                    if len(int_args) == 2:
                        x, y = int_args
                        await _cdp_click_coordinate(cdp, x, y)
                        print(f'Clicked at ({x}, {y})')
                    elif len(int_args) == 1:
                        await _cdp_click_index(cdp, int_args[0])
                    else:
                        print('Usage: click <index> or click <x> <y>', file=sys.stderr)
                        return 1

                elif command == 'type' and len(args) >= 2:
                    text = ' '.join(args[1:])
                    await _cdp_type(cdp, text)
                    print(f'Typed: {text}')

                elif command == 'input' and len(args) >= 3:
                    try:
                        index = int(args[1])
                    except ValueError:
                        print('Usage: input <index> <text>', file=sys.stderr)
                        return 1
                    text = ' '.join(args[2:])
                    await _cdp_input(cdp, index, text)

                elif command == 'scroll':
                    direction = args[1] if len(args) >= 2 else 'down'
                    await _cdp_scroll(cdp, direction)
                    print(f'Scrolled {direction}')

                elif command == 'back':
                    # NOTE(review): this prints even when _cdp_back reports
                    # that history is already at its beginning.
                    await _cdp_back(cdp)
                    print('Navigated back')

                elif command == 'keys' and len(args) >= 2:
                    await _cdp_keys(cdp, ' '.join(args[1:]))
                    print(f'Sent keys: {" ".join(args[1:])}')

                elif command == 'html':
                    selector = args[1] if len(args) >= 2 else None
                    await _cdp_html(cdp, selector)

                elif command == 'eval' and len(args) >= 2:
                    js = ' '.join(args[1:])
                    await _cdp_eval(cdp, js)

                else:
                    print(f'Missing arguments for: {command}', file=sys.stderr)
                    return 1

        except RuntimeError as e:
            print(f'Error: {e}', file=sys.stderr)
            return 1
        return 0

    print(f'Unknown command: {command}', file=sys.stderr)
    return 1
if __name__ == '__main__':
sys.exit(asyncio.run(main()))

View File

@@ -76,10 +76,10 @@
# - Always kill stale processes before retrying
# - Or kill all Python: taskkill /IM python.exe /F
#
# 7. Debugging server issues
# To see actual error messages instead of "Failed to start session server":
# & "$env:USERPROFILE\.browser-use-env\Scripts\python.exe" -m browser_use.skill_cli.server --session default --browser chromium
# This runs the server in foreground and shows all errors.
# 7. Debugging daemon issues
# To see actual error messages instead of "Failed to start daemon":
# & "$env:USERPROFILE\.browser-use-env\Scripts\python.exe" -m browser_use.skill_cli.daemon --browser chromium
# This runs the daemon in foreground and shows all errors.
#
# =============================================================================

View File

@@ -2,13 +2,12 @@
"""Fast CLI for browser-use. STDLIB ONLY - must start in <50ms.
This is the main entry point for the browser-use CLI. It uses only stdlib
imports to ensure fast startup, delegating heavy operations to the session
server which loads once and stays running.
imports to ensure fast startup, delegating heavy operations to the daemon
which loads once and stays running.
"""
import argparse
import asyncio
import hashlib
import json
import os
import socket
@@ -20,7 +19,7 @@ from pathlib import Path
# =============================================================================
# Early command interception (before heavy imports)
# These commands don't need the session server infrastructure
# These commands don't need the daemon infrastructure
# =============================================================================
# Handle --mcp flag early to prevent logging initialization
@@ -124,7 +123,7 @@ if '--template' in sys.argv:
# Keep --force/-f and --list/-l flags
elif arg in ('--force', '-f', '--list', '-l'):
new_argv.append(arg)
# Skip other flags (--session, --browser, --headed, etc.)
# Skip other flags (--browser, --headed, etc.)
i += 1
sys.argv = new_argv
@@ -136,69 +135,24 @@ if '--template' in sys.argv:
# =============================================================================
def get_socket_path(session: str) -> str:
"""Get socket path for session."""
def _get_socket_path() -> str:
"""Get the fixed daemon socket path."""
if sys.platform == 'win32':
# Use 127.0.0.1 explicitly (not localhost) to avoid IPv6 binding issues
port = 49152 + (int(hashlib.md5(session.encode()).hexdigest()[:4], 16) % 16383)
return f'tcp://127.0.0.1:{port}'
return str(Path(tempfile.gettempdir()) / f'browser-use-{session}.sock')
return 'tcp://127.0.0.1:49200'
return str(Path(tempfile.gettempdir()) / 'browser-use-cli.sock')
def get_pid_path(session: str) -> Path:
    """Return the PID file location for ``session``.

    The file sits directly in the system temp directory and is named
    ``browser-use-<session>.pid``.
    """
    pid_filename = f'browser-use-{session}.pid'
    temp_root = Path(tempfile.gettempdir())
    return temp_root / pid_filename
def _pid_exists(pid: int) -> bool:
    """Check whether a single process with the given PID currently exists.

    On Windows this opens the process with ``OpenProcess`` via ctypes; on
    Unix it sends signal 0 with ``os.kill``, which performs the existence
    and permission checks without delivering a signal.

    Args:
        pid: Process ID to probe.

    Returns:
        True if a process with that PID exists, False otherwise.
    """
    if pid <= 0:
        # In os.kill, zero and negative values address process groups, not
        # one process, so they can never identify a server.
        return False

    if sys.platform == 'win32':
        import ctypes

        PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
        handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
        if not handle:
            return False
        ctypes.windll.kernel32.CloseHandle(handle)
        return True

    try:
        os.kill(pid, 0)
    except PermissionError:
        # EPERM: the process exists but belongs to another user.
        return True
    except (OSError, OverflowError):
        # No such process, or the value does not fit a native pid.
        return False
    return True
def is_server_running(session: str) -> bool:
    """Report whether the process recorded in the session's PID file exists.

    Returns False when there is no PID file, when it cannot be read, or when
    its contents are not an integer.
    """
    pid_file = get_pid_path(session)
    if pid_file.exists():
        try:
            recorded_pid = int(pid_file.read_text().strip())
            return _pid_exists(recorded_pid)
        except (OSError, ValueError):
            # Unreadable PID file or non-numeric contents
            pass
    return False
def connect_to_server(session: str, timeout: float = 60.0) -> socket.socket:
"""Connect to session server."""
sock_path = get_socket_path(session)
def _connect_to_daemon(timeout: float = 60.0) -> socket.socket:
"""Connect to daemon socket."""
sock_path = _get_socket_path()
if sock_path.startswith('tcp://'):
# Windows: TCP connection
_, hostport = sock_path.split('://', 1)
host, port = hostport.split(':')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, int(port)))
else:
# Unix socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect(sock_path)
@@ -206,47 +160,52 @@ def connect_to_server(session: str, timeout: float = 60.0) -> socket.socket:
return sock
def get_session_metadata_path(session: str) -> Path:
"""Get path to session metadata file (stores browser_mode, headed, profile)."""
return Path(tempfile.gettempdir()) / f'browser-use-{session}.meta'
def _is_daemon_alive() -> bool:
    """Check if the daemon is alive by attempting a socket connect.

    Returns:
        True if a connection to the daemon socket succeeds within 0.5s,
        False otherwise.

    Side effects:
        On Unix, removes the socket file when the connect attempt shows that
        nobody is listening (connection refused or file missing). The file is
        left alone for every other failure, such as a timeout: those do not
        prove the socket is stale, and deleting it would make a live daemon
        unreachable.
    """
    try:
        sock = _connect_to_daemon(timeout=0.5)
    except (ConnectionRefusedError, FileNotFoundError):
        # Clean up stale socket on Unix
        sock_path = _get_socket_path()
        if not sock_path.startswith('tcp://'):
            Path(sock_path).unlink(missing_ok=True)
        return False
    except OSError:
        # Timeout, permission problem, etc. — not evidence of a dead daemon.
        return False
    sock.close()
    return True
def ensure_server(session: str, browser: str, headed: bool, profile: str | None, api_key: str | None) -> bool:
"""Start server if not running. Returns True if started."""
from browser_use.skill_cli.utils import is_session_locked, kill_orphaned_server
def ensure_daemon(
browser: str,
headed: bool,
profile: str | None,
api_key: str | None,
*,
explicit_config: bool = False,
) -> None:
"""Start daemon if not running. Restarts only if user explicitly set config flags."""
if _is_daemon_alive():
if not explicit_config:
return # Daemon is alive, user didn't request specific config — reuse it
meta_path = get_session_metadata_path(session)
# Check if server is already running AND holding its lock (healthy server)
if is_server_running(session) and is_session_locked(session):
# User explicitly set --browser/--headed/--profile — check config matches
try:
sock = connect_to_server(session, timeout=0.5) # Increased from 0.1s
sock.close()
response = send_command('ping', {})
if response.get('success'):
data = response.get('data', {})
if data.get('browser_mode') == browser and data.get('headed') == headed and data.get('profile') == profile:
return # Already running with correct config
# Check browser mode matches existing session
if meta_path.exists():
try:
meta = json.loads(meta_path.read_text())
existing_mode = meta.get('browser_mode', 'chromium')
if existing_mode != browser:
pass # Mode mismatch is non-fatal for local modes
except (json.JSONDecodeError, OSError):
pass # Metadata file corrupt, ignore
return False # Already running with correct mode
# Config mismatch — shutdown and restart
send_command('shutdown', {})
time.sleep(0.3)
except Exception:
pass # Server not responsive, continue to restart logic
pass # Daemon not responsive, continue to start
# Kill any orphaned server (has PID file but no lock)
kill_orphaned_server(session)
# Build server command
# Build daemon command
cmd = [
sys.executable,
'-m',
'browser_use.skill_cli.server',
'--session',
session,
'browser_use.skill_cli.daemon',
'--browser',
browser,
]
@@ -260,10 +219,8 @@ def ensure_server(session: str, browser: str, headed: bool, profile: str | None,
if api_key:
env['BROWSER_USE_API_KEY'] = api_key
# Start server as background process
# Start daemon as background process
if sys.platform == 'win32':
# Windows: CREATE_NO_WINDOW prevents console window from appearing
# CREATE_NEW_PROCESS_GROUP allows the process to survive parent exit
subprocess.Popen(
cmd,
env=env,
@@ -272,7 +229,6 @@ def ensure_server(session: str, browser: str, headed: bool, profile: str | None,
stderr=subprocess.DEVNULL,
)
else:
# Unix: use start_new_session
subprocess.Popen(
cmd,
env=env,
@@ -281,43 +237,25 @@ def ensure_server(session: str, browser: str, headed: bool, profile: str | None,
stderr=subprocess.DEVNULL,
)
# Wait for server to be ready (must have PID, lock, and responsive socket)
# Wait for daemon to be ready
for _ in range(100): # 5 seconds max
if is_server_running(session) and is_session_locked(session):
try:
sock = connect_to_server(session, timeout=0.5)
sock.close()
# Write metadata file to track session config
meta_path.write_text(
json.dumps(
{
'browser_mode': browser,
'headed': headed,
'profile': profile,
}
)
)
return True
except Exception:
pass
if _is_daemon_alive():
return
time.sleep(0.05)
print('Error: Failed to start session server', file=sys.stderr)
print('Error: Failed to start daemon', file=sys.stderr)
sys.exit(1)
def send_command(session: str, action: str, params: dict) -> dict:
"""Send command to server and get response."""
def send_command(action: str, params: dict) -> dict:
"""Send command to daemon and get response."""
request = {
'id': f'r{int(time.time() * 1000000) % 1000000}',
'action': action,
'session': session,
'params': params,
}
sock = connect_to_server(session)
sock = _connect_to_daemon()
try:
# Send request
sock.sendall((json.dumps(request) + '\n').encode())
@@ -331,7 +269,7 @@ def send_command(session: str, action: str, params: dict) -> dict:
data += chunk
if not data:
return {'id': request['id'], 'success': False, 'error': 'No response from server'}
return {'id': request['id'], 'success': False, 'error': 'No response from daemon'}
return json.loads(data.decode())
finally:
@@ -374,7 +312,6 @@ Setup:
)
# Global flags
parser.add_argument('--session', '-s', default='default', help='Session name (default: default)')
parser.add_argument(
'--browser',
'-b',
@@ -607,22 +544,8 @@ Setup:
# Session Management
# -------------------------------------------------------------------------
# sessions
subparsers.add_parser('sessions', help='List active sessions')
# close
p = subparsers.add_parser('close', help='Close session')
p.add_argument('--all', action='store_true', help='Close all sessions')
# -------------------------------------------------------------------------
# Server Control
# -------------------------------------------------------------------------
server_p = subparsers.add_parser('server', help='Server control')
server_sub = server_p.add_subparsers(dest='server_command')
server_sub.add_parser('status', help='Check server status')
server_sub.add_parser('stop', help='Stop server')
server_sub.add_parser('logs', help='View server logs')
subparsers.add_parser('close', help='Close browser and stop daemon')
# -------------------------------------------------------------------------
# Profile Management (local only, use -b real)
@@ -645,39 +568,6 @@ Setup:
return parser
def handle_server_command(args: argparse.Namespace) -> int:
    """Run a ``server`` subcommand (``status``, ``stop`` or ``logs``).

    Returns the process exit code: ``status`` gives 0 when the session's
    server is running and 1 when it is not; ``stop`` gives 1 only when the
    shutdown request is rejected; everything else gives 0.
    """
    subcommand = args.server_command

    if subcommand == 'status':
        if not is_server_running(args.session):
            print(f'Server for session "{args.session}" is not running')
            return 1
        print(f'Server for session "{args.session}" is running')
        return 0

    if subcommand == 'stop':
        if not is_server_running(args.session):
            print(f'Server for session "{args.session}" is not running')
            return 0
        response = send_command(args.session, 'shutdown', {})
        if not response.get('success'):
            print(f'Error: {response.get("error")}', file=sys.stderr)
            return 1
        print(f'Server for session "{args.session}" stopped')
        return 0

    if subcommand == 'logs':
        log_path = Path(tempfile.gettempdir()) / f'browser-use-{args.session}.log'
        print(log_path.read_text() if log_path.exists() else 'No logs found')
        return 0

    # No (or unrecognised) subcommand: nothing to do.
    return 0
def main() -> int:
"""Main entry point."""
parser = build_parser()
@@ -687,60 +577,12 @@ def main() -> int:
parser.print_help()
return 0
# Handle server subcommands without starting server
if args.command == 'server':
return handle_server_command(args)
# Handle profile subcommands without starting server
# Handle profile subcommands without starting daemon
if args.command == 'profile':
from browser_use.skill_cli.commands.profile import handle_profile_command
return handle_profile_command(args)
# Handle sessions list - find all running sessions
if args.command == 'sessions':
from browser_use.skill_cli.utils import find_all_sessions
session_names = find_all_sessions()
sessions = [{'name': name, 'status': 'running'} for name in session_names]
if args.json:
print(json.dumps(sessions))
else:
if sessions:
for s in sessions:
print(f' {s["name"]}: {s["status"]}')
else:
print('No active sessions')
return 0
# Handle close --all by closing all running sessions
if args.command == 'close' and getattr(args, 'all', False):
from browser_use.skill_cli.utils import find_all_sessions
session_names = find_all_sessions()
closed = []
for name in session_names:
try:
response = send_command(name, 'close', {})
if response.get('success'):
closed.append(name)
# Clean up metadata file
meta_path = get_session_metadata_path(name)
if meta_path.exists():
meta_path.unlink()
except Exception:
pass # Server may already be stopping
if args.json:
print(json.dumps({'closed': closed, 'count': len(closed)}))
else:
if closed:
print(f'Closed {len(closed)} session(s): {", ".join(closed)}')
else:
print('No active sessions')
return 0
# Handle setup command
if args.command == 'setup':
from browser_use.skill_cli.commands import setup
@@ -864,6 +706,24 @@ def main() -> int:
print(f'Stopped tunnel on port {result["stopped"]}')
return 0
# Handle close — shutdown daemon
if args.command == 'close':
if _is_daemon_alive():
try:
response = send_command('shutdown', {})
if args.json:
print(json.dumps(response))
else:
print('Browser closed')
except Exception:
print('Browser closed')
else:
if args.json:
print(json.dumps({'success': True, 'data': {'shutdown': True}}))
else:
print('No active browser session')
return 0
# Validate requested mode is available based on installation config
from browser_use.skill_cli.install_config import get_mode_unavailable_error, is_mode_available
@@ -878,18 +738,19 @@ def main() -> int:
# Validate --profile flag usage
if args.profile and args.browser == 'chromium':
print(
'Error: --profile is not supported in chromium mode.\n'
'Use -b real for local Chrome profiles.',
'Error: --profile is not supported in chromium mode.\nUse -b real for local Chrome profiles.',
file=sys.stderr,
)
return 1
# Ensure server is running
ensure_server(args.session, args.browser, args.headed, args.profile, args.api_key)
# Ensure daemon is running
# Only restart on config mismatch if the user explicitly passed config flags
explicit_config = any(flag in sys.argv for flag in ('--browser', '-b', '--headed', '--profile'))
ensure_daemon(args.browser, args.headed, args.profile, args.api_key, explicit_config=explicit_config)
# Build params from args
params = {}
skip_keys = {'command', 'session', 'browser', 'headed', 'json', 'api_key', 'server_command'}
skip_keys = {'command', 'browser', 'headed', 'json', 'api_key'}
for key, value in vars(args).items():
if key not in skip_keys and value is not None:
@@ -899,14 +760,8 @@ def main() -> int:
if args.profile:
params['profile'] = args.profile
# Send command to server
response = send_command(args.session, args.command, params)
# Clean up metadata file on successful close
if args.command == 'close' and response.get('success'):
meta_path = get_session_metadata_path(args.session)
if meta_path.exists():
meta_path.unlink()
# Send command to daemon
response = send_command(args.command, params)
# Output response
if args.json:

View File

@@ -1,54 +0,0 @@
"""Wire protocol for CLI↔Server communication.
Uses JSON over Unix sockets (or TCP on Windows) with newline-delimited messages.
"""
import json
from dataclasses import asdict, dataclass, field
from typing import Any
@dataclass
class Request:
    """One command message travelling from the CLI to the server."""

    id: str
    action: str
    session: str
    params: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize this request to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)

    @classmethod
    def from_json(cls, data: str) -> 'Request':
        """Build a request from a JSON object string.

        ``id``, ``action`` and ``session`` are required (KeyError if absent);
        ``params`` falls back to an empty dict.
        """
        parsed = json.loads(data)
        params = parsed.get('params', {})
        return cls(parsed['id'], parsed['action'], parsed['session'], params)
@dataclass
class Response:
"""Response from server to CLI."""
id: str
success: bool
data: Any = None
error: str | None = None
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> 'Response':
d = json.loads(data)
return cls(
id=d['id'],
success=d['success'],
data=d.get('data'),
error=d.get('error'),
)

View File

@@ -1,292 +0,0 @@
"""Session server - keeps BrowserSession instances alive.
This server runs as a background process, managing browser sessions and
handling commands from the CLI. It uses Unix sockets (or TCP on Windows)
for IPC communication.
"""
import argparse
import asyncio
import json
import logging
import os
import signal
import sys
from pathlib import Path
from typing import IO
import portalocker
# Configure logging before imports
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger('browser_use.skill_cli.server')
class SessionServer:
    """Server that manages browser sessions and handles CLI commands.

    One instance serves a single named session. ``run()`` takes an exclusive
    lock file, writes a PID file, listens on the session's socket (a
    ``tcp://`` address or a Unix socket path, as returned by
    ``get_socket_path``) and answers newline-delimited JSON requests until
    ``shutdown()`` completes.
    """

    # Longest time shutdown() waits for open client connections to drain.
    _CLOSE_TIMEOUT_SECONDS = 5.0

    def __init__(
        self,
        session_name: str,
        browser_mode: str,
        headed: bool,
        profile: str | None,
    ) -> None:
        self.session_name = session_name
        self.browser_mode = browser_mode
        self.headed = headed
        self.profile = profile
        self.running = True
        self._server: asyncio.Server | None = None
        self._shutdown_event: asyncio.Event | None = None
        self._lock_file: IO | None = None
        # Makes shutdown() safe to trigger from several places at once.
        self._shutdown_started = False
        # Strong references to fire-and-forget tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks: set[asyncio.Task] = set()

        # Lazy import to avoid loading everything at startup
        from browser_use.skill_cli.sessions import SessionRegistry

        self.registry = SessionRegistry()

    def _spawn(self, coro) -> None:
        """Run ``coro`` as a task, keeping a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def handle_connection(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        """Serve one client connection until it closes, idles out or asks for shutdown.

        Each line received is parsed as a JSON object, dispatched, and
        answered with one JSON line. Failures are reported to the client as
        ``success: False`` responses rather than dropping the connection.
        """
        addr = writer.get_extra_info('peername')
        logger.debug(f'Connection from {addr}')

        try:
            while self.running:
                try:
                    line = await asyncio.wait_for(reader.readline(), timeout=300)  # 5 min timeout
                except TimeoutError:
                    logger.debug(f'Connection timeout from {addr}')
                    break

                if not line:
                    break

                request: dict = {}
                try:
                    parsed = json.loads(line.decode())
                    if not isinstance(parsed, dict):
                        # Valid JSON but not an object: there are no fields to dispatch on.
                        raise ValueError('Request must be a JSON object')
                    request = parsed
                    response = await self.dispatch(request)
                except json.JSONDecodeError as e:
                    response = {'id': '', 'success': False, 'error': f'Invalid JSON: {e}'}
                except Exception as e:
                    logger.exception(f'Error handling request: {e}')
                    # Echo the request id when we have one so the client can correlate the failure.
                    response = {'id': request.get('id', ''), 'success': False, 'error': str(e)}

                writer.write((json.dumps(response) + '\n').encode())
                await writer.drain()

                # Check for shutdown command
                if request.get('action') == 'shutdown':
                    # Run shutdown in the background and leave the loop so this
                    # connection closes first; shutdown waits for connections
                    # to drain and must not wait on its own caller.
                    self._spawn(self.shutdown())
                    break

        except Exception as e:
            logger.exception(f'Connection error: {e}')
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:
                pass

    async def dispatch(self, request: dict) -> dict:
        """Route one request to its command handler and wrap the outcome.

        Args:
            request: Parsed request object; ``action``, ``params`` and ``id``
                are read with empty defaults.

        Returns:
            A response dict with ``id`` and ``success`` plus either ``data``
            or ``error``. Handler exceptions are converted to error responses.
        """
        action = request.get('action', '')
        params = request.get('params', {})
        req_id = request.get('id', '')

        logger.info(f'Dispatch: {action} (id={req_id})')

        # Acknowledged before loading handlers so shutdown still works if a
        # handler module fails to import.
        if action == 'shutdown':
            return {'id': req_id, 'success': True, 'data': {'shutdown': True}}

        try:
            # Import command handlers
            from browser_use.skill_cli.commands import agent, browser, python_exec, session

            # Session commands don't need a browser session
            if action in session.COMMANDS:
                result = await session.handle(action, self.session_name, self.registry, params)
                # Check if command wants to shutdown server
                if result.get('_shutdown'):
                    self._spawn(self.shutdown())
                return {'id': req_id, 'success': True, 'data': result}

            # Get or create session for browser commands
            session_info = await self.registry.get_or_create(
                self.session_name,
                self.browser_mode,
                self.headed,
                self.profile,
            )

            # Dispatch to handler
            if action in browser.COMMANDS:
                result = await browser.handle(action, session_info, params)
            elif action == 'python':
                result = await python_exec.handle(session_info, params)
            elif action == 'run':
                result = await agent.handle(session_info, params)
            else:
                return {'id': req_id, 'success': False, 'error': f'Unknown action: {action}'}

            return {'id': req_id, 'success': True, 'data': result}

        except Exception as e:
            logger.exception(f'Error dispatching {action}: {e}')
            return {'id': req_id, 'success': False, 'error': str(e)}

    async def shutdown(self) -> None:
        """Graceful shutdown; only the first call does any work.

        Closes all sessions, stops the listening server, removes the session
        files and finally signals ``run()`` to return. The signal comes last
        so the process does not begin exiting while cleanup is still running.
        """
        if self._shutdown_started:
            return
        self._shutdown_started = True

        logger.info('Shutting down server...')
        self.running = False

        try:
            # Close all sessions
            await self.registry.close_all()

            # Stop the server
            if self._server:
                self._server.close()
                try:
                    # Bounded: an idle client connection must not stall shutdown.
                    await asyncio.wait_for(self._server.wait_closed(), timeout=self._CLOSE_TIMEOUT_SECONDS)
                except TimeoutError:
                    logger.warning('Timed out waiting for client connections to close')

            # Clean up files
            from browser_use.skill_cli.utils import cleanup_session_files

            cleanup_session_files(self.session_name)
        finally:
            # Signal the shutdown event, even if cleanup failed part-way
            if self._shutdown_event:
                self._shutdown_event.set()

    async def run(self) -> None:
        """Acquire the session lock, start listening and block until shutdown.

        Exits the process with status 1 if another server already holds the
        lock for this session.
        """
        from browser_use.skill_cli.utils import get_lock_path, get_pid_path, get_socket_path

        # Created before signal handlers are installed so a signal that
        # arrives during startup is not lost.
        self._shutdown_event = asyncio.Event()

        # Acquire exclusive lock BEFORE writing PID - this prevents race conditions
        lock_path = get_lock_path(self.session_name)
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        lock_path.touch(exist_ok=True)

        self._lock_file = open(lock_path, 'r+')  # noqa: ASYNC230 - blocking ok at startup
        try:
            portalocker.lock(self._lock_file, portalocker.LOCK_EX | portalocker.LOCK_NB)
        except portalocker.LockException:
            logger.error(f'Another server is already running for session: {self.session_name}')
            self._lock_file.close()
            self._lock_file = None
            sys.exit(1)

        logger.info(f'Acquired exclusive lock for session: {self.session_name}')

        try:
            # NOW safe to write PID file
            pid_path = get_pid_path(self.session_name)
            pid_path.write_text(str(os.getpid()))
            logger.info(f'PID file: {pid_path}')

            # Setup signal handlers
            loop = asyncio.get_running_loop()

            def signal_handler():
                self._spawn(self.shutdown())

            for sig in (signal.SIGINT, signal.SIGTERM):
                try:
                    loop.add_signal_handler(sig, signal_handler)
                except NotImplementedError:
                    # Windows doesn't support add_signal_handler
                    pass

            # Also handle SIGHUP on Unix
            if hasattr(signal, 'SIGHUP'):
                try:
                    loop.add_signal_handler(signal.SIGHUP, signal_handler)
                except NotImplementedError:
                    pass

            # Get socket path
            sock_path = get_socket_path(self.session_name)
            logger.info(f'Socket: {sock_path}')

            # Start server
            if sock_path.startswith('tcp://'):
                # Windows: TCP server
                _, hostport = sock_path.split('://', 1)
                host, port = hostport.split(':')
                self._server = await asyncio.start_server(
                    self.handle_connection,
                    host,
                    int(port),
                    reuse_address=True,  # Allow rebinding ports in TIME_WAIT state
                )
                logger.info(f'Listening on TCP {host}:{port}')
            else:
                # Unix: socket server
                # Remove stale socket file
                sock_file = Path(sock_path)
                if sock_file.exists():
                    sock_file.unlink()

                self._server = await asyncio.start_unix_server(
                    self.handle_connection,
                    sock_path,
                )
                logger.info(f'Listening on Unix socket {sock_path}')

            # Run until shutdown
            try:
                await self._shutdown_event.wait()
            except asyncio.CancelledError:
                pass
        finally:
            # Stop accepting connections without waiting on clients;
            # shutdown() has already done the bounded wait.
            if self._server:
                self._server.close()
            # Release lock on shutdown (and on failed startup)
            if self._lock_file:
                try:
                    portalocker.unlock(self._lock_file)
                    self._lock_file.close()
                except Exception:
                    pass
                self._lock_file = None
            logger.info('Server stopped')
def main() -> None:
    """Entry point for the server process.

    Parses ``--session`` (required), ``--browser``, ``--headed`` and
    ``--profile``, then runs a SessionServer until it stops. A keyboard
    interrupt is logged; any other exception is logged and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser(description='Browser-use session server')
    parser.add_argument('--session', required=True, help='Session name')
    parser.add_argument('--browser', default='chromium', choices=['chromium', 'real'])
    parser.add_argument('--headed', action='store_true', help='Show browser window')
    parser.add_argument('--profile', help='Chrome profile (real browser mode)')
    options = parser.parse_args()

    logger.info(f'Starting server for session: {options.session}')
    logger.info(f'Browser mode: {options.browser}, headed: {options.headed}')

    session_server = SessionServer(
        session_name=options.session,
        browser_mode=options.browser,
        headed=options.headed,
        profile=options.profile,
    )

    try:
        asyncio.run(session_server.run())
    except KeyboardInterrupt:
        logger.info('Interrupted')
    except Exception as exc:
        logger.exception(f'Server error: {exc}')
        sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -1,8 +1,7 @@
"""Session registry - manages BrowserSession instances."""
"""Session data — SessionInfo dataclass and browser session factory."""
import logging
from dataclasses import dataclass, field
from typing import Any
from browser_use.browser.session import BrowserSession
from browser_use.skill_cli.python_session import PythonSession
@@ -22,81 +21,6 @@ class SessionInfo:
python_session: PythonSession = field(default_factory=PythonSession)
class SessionRegistry:
    """Registry of active browser sessions.

    Sessions are created on-demand when first accessed. Each named session
    is isolated with its own BrowserSession and Python namespace.
    """

    def __init__(self) -> None:
        # Imported locally so the class does not rely on a file-level asyncio import.
        import asyncio

        self._sessions: dict[str, SessionInfo] = {}
        # Serializes creation so concurrent first requests for the same name
        # cannot each launch a browser.
        self._create_lock = asyncio.Lock()

    async def get_or_create(
        self,
        name: str,
        browser_mode: str,
        headed: bool,
        profile: str | None,
    ) -> SessionInfo:
        """Get existing session or create new one.

        Args:
            name: Session name used as the registry key.
            browser_mode: Passed to ``create_browser_session``.
            headed: Passed to ``create_browser_session``.
            profile: Passed to ``create_browser_session``.

        Returns:
            The registered SessionInfo for ``name``. The configuration
            arguments are only used when the session does not exist yet.
        """
        existing = self._sessions.get(name)
        if existing is not None:
            return existing

        async with self._create_lock:
            # Re-check: another caller may have created it while we waited.
            existing = self._sessions.get(name)
            if existing is not None:
                return existing

            logger.info(f'Creating new session: {name} (mode={browser_mode}, headed={headed})')

            browser_session = await create_browser_session(browser_mode, headed, profile)
            await browser_session.start()

            session_info = SessionInfo(
                name=name,
                browser_mode=browser_mode,
                headed=headed,
                profile=profile,
                browser_session=browser_session,
            )
            self._sessions[name] = session_info
            return session_info

    def get(self, name: str) -> SessionInfo | None:
        """Get session by name, or None if it is not registered."""
        return self._sessions.get(name)

    def list_sessions(self) -> list[dict[str, Any]]:
        """List all active sessions as dicts of name, browser_mode, headed and profile."""
        return [
            {
                'name': s.name,
                'browser_mode': s.browser_mode,
                'headed': s.headed,
                'profile': s.profile,
            }
            for s in self._sessions.values()
        ]

    async def close_session(self, name: str) -> bool:
        """Close and remove a session.

        Returns:
            False if no session with that name is registered, True otherwise.
            The session is removed even if killing its browser fails.
        """
        if name not in self._sessions:
            return False

        session = self._sessions.pop(name)
        logger.info(f'Closing session: {name}')

        # Note: Tunnels are managed independently via tunnel.py
        # They persist across session close/open cycles

        try:
            await session.browser_session.kill()
        except Exception as e:
            logger.warning(f'Error closing session {name}: {e}')
        return True

    async def close_all(self) -> None:
        """Close all sessions."""
        # Snapshot the names: close_session mutates the dict.
        for name in list(self._sessions.keys()):
            await self.close_session(name)
async def create_browser_session(
mode: str,
headed: bool,

View File

@@ -1,205 +1,63 @@
"""Platform utilities for CLI and server."""
"""Platform utilities for CLI and daemon."""
import hashlib
import os
import platform
import signal
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import IO
import portalocker
def get_socket_path(session: str) -> str:
"""Get socket path for session.
def get_socket_path() -> str:
"""Get the fixed daemon socket path.
On Windows, returns a TCP address (tcp://127.0.0.1:PORT).
On Unix, returns a Unix socket path.
"""
if sys.platform == 'win32':
# Windows: use TCP on deterministic port (49152-65535)
# Use 127.0.0.1 explicitly (not localhost) to avoid IPv6 binding issues
port = 49152 + (int(hashlib.md5(session.encode()).hexdigest()[:4], 16) % 16383)
return f'tcp://127.0.0.1:{port}'
return str(Path(tempfile.gettempdir()) / f'browser-use-{session}.sock')
return 'tcp://127.0.0.1:49200'
return str(Path(tempfile.gettempdir()) / 'browser-use-cli.sock')
def get_pid_path(session: str) -> Path:
"""Get PID file path for session."""
return Path(tempfile.gettempdir()) / f'browser-use-{session}.pid'
def is_daemon_alive() -> bool:
"""Check daemon liveness by attempting socket connect.
def get_log_path(session: str) -> Path:
"""Get log file path for session."""
return Path(tempfile.gettempdir()) / f'browser-use-{session}.log'
def get_lock_path(session: str) -> Path:
"""Get lock file path for session."""
return Path(tempfile.gettempdir()) / f'browser-use-{session}.lock'
def _pid_exists(pid: int) -> bool:
"""Check if a process with given PID exists.
On Windows, uses ctypes to call OpenProcess (os.kill doesn't work reliably).
On Unix, uses os.kill(pid, 0) which is the standard approach.
If socket file exists but nobody is listening, removes the stale file.
"""
if sys.platform == 'win32':
import ctypes
import socket
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, False, pid)
if handle:
ctypes.windll.kernel32.CloseHandle(handle)
return True
return False
else:
sock_path = get_socket_path()
if sock_path.startswith('tcp://'):
_, hostport = sock_path.split('://', 1)
host, port_str = hostport.split(':')
try:
os.kill(pid, 0)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
s.connect((host, int(port_str)))
s.close()
return True
except OSError:
return False
def is_server_running(session: str) -> bool:
    """Report whether the process named in the session's PID file exists.

    A missing, unreadable or non-numeric PID file counts as "not running".
    """
    pid_file = get_pid_path(session)
    if pid_file.exists():
        try:
            recorded_pid = int(pid_file.read_text().strip())
            return _pid_exists(recorded_pid)
        except (OSError, ValueError):
            # Unreadable PID file or non-numeric contents
            pass
    return False
def try_acquire_server_lock(session: str) -> IO | None:
    """Attempt a non-blocking exclusive lock on the session's lock file.

    The lock file (and its parent directory) is created if missing.

    Returns:
        The open lock file handle when the lock was taken; the caller must
        keep it referenced to keep holding the lock. None when the lock
        attempt raised ``portalocker.LockException``.
    """
    lock_path = get_lock_path(session)
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path.touch(exist_ok=True)

    handle = open(lock_path, 'r+')
    try:
        portalocker.lock(handle, portalocker.LOCK_EX | portalocker.LOCK_NB)
    except portalocker.LockException:
        handle.close()
        return None
    return handle
def is_session_locked(session: str) -> bool:
    """Report whether the session's lock file is currently locked.

    Probes by trying to take the lock non-blockingly: success means it was
    free (the probe lock is released again), a LockException means it is
    held. A missing or inaccessible lock file counts as unlocked.
    """
    lock_path = get_lock_path(session)
    if not lock_path.exists():
        return False

    try:
        with open(lock_path, 'r+') as probe:
            portalocker.lock(probe, portalocker.LOCK_EX | portalocker.LOCK_NB)
            portalocker.unlock(probe)
    except portalocker.LockException:
        return True  # Lock failed = someone holding it
    except OSError:
        return False  # File access error
    return False  # Lock acquired = no one holding it
def kill_orphaned_server(session: str) -> bool:
    """Kill an orphaned server (has PID file but no lock).

    An orphaned server is one where the process is running but it doesn't
    hold the session lock (e.g., because a newer server took over the lock
    file but didn't kill the old process).

    Whenever a PID file exists and the session is not locked, the session's
    stale files are removed afterwards, whether or not a process was killed.

    Returns:
        True only if an orphan process was found and actually terminated.
    """
    pid_path = get_pid_path(session)
    if not pid_path.exists():
        return False

    # Check if session is locked (server alive and holding lock)
    if is_session_locked(session):
        return False  # Not an orphan - server is healthy

    # PID exists but no lock - orphan situation
    killed = False
    try:
        pid = int(pid_path.read_text().strip())
        if _pid_exists(pid):
            # Kill the orphaned process
            if sys.platform == 'win32':
                import ctypes

                PROCESS_TERMINATE = 1
                handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, pid)
                if handle:
                    try:
                        # Nonzero return means the termination request succeeded.
                        killed = bool(ctypes.windll.kernel32.TerminateProcess(handle, 1))
                    finally:
                        ctypes.windll.kernel32.CloseHandle(handle)
            else:
                os.kill(pid, signal.SIGKILL)
                killed = True
    except (OSError, ValueError):
        pass

    # Clean up stale files whether or not a process was killed
    cleanup_session_files(session)
    return killed
def find_all_sessions() -> list[str]:
    """Return the names of sessions whose PID file points at a running server.

    Scans the system temp directory for ``browser-use-*.pid`` files and keeps
    each name for which ``is_server_running`` is true.
    """
    tmpdir = Path(tempfile.gettempdir())
    # Session name is the file stem with the leading "browser-use-" removed.
    candidate_names = (pid_file.stem.replace('browser-use-', '', 1) for pid_file in tmpdir.glob('browser-use-*.pid'))
    return [name for name in candidate_names if is_server_running(name)]
def cleanup_session_files(session: str) -> None:
"""Remove session socket, PID, lock, and metadata files."""
sock_path = get_socket_path(session)
pid_path = get_pid_path(session)
lock_path = get_lock_path(session)
meta_path = Path(tempfile.gettempdir()) / f'browser-use-{session}.meta'
# Remove socket file (Unix only)
if not sock_path.startswith('tcp://'):
else:
sock_file = Path(sock_path)
if not sock_file.exists():
return False
try:
os.unlink(sock_path)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(0.5)
s.connect(sock_path)
s.close()
return True
except OSError:
pass
# Stale socket file — remove it
sock_file.unlink(missing_ok=True)
return False
# Remove PID file
try:
pid_path.unlink()
except OSError:
pass
# Remove lock file
try:
lock_path.unlink()
except OSError:
pass
# Remove metadata file
try:
meta_path.unlink()
except OSError:
pass
def get_log_path() -> Path:
    """Return the daemon log file location inside the system temp directory."""
    temp_root = Path(tempfile.gettempdir())
    return temp_root / 'browser-use-cli.log'
def find_chrome_executable() -> str | None:

View File

@@ -19,7 +19,6 @@ dependencies = [
"rich==14.3.1",
"google-api-core==2.29.0",
"httpx==0.28.1",
"portalocker==2.10.1",
"posthog==7.7.0",
"psutil==7.2.2",
"pydantic==2.12.5",

View File

@@ -68,9 +68,7 @@ browser-use wait selector "h1" # Wait for element
browser-use wait text "Success" # Wait for text
# Session
browser-use sessions # List active sessions
browser-use close # Close current session
browser-use close --all # Close all sessions
browser-use close # Close browser session
# AI Agent
browser-use run "Fill the contact form" # Run local agent
@@ -186,9 +184,7 @@ browser-use tunnel stop --all # Stop all tunnels
### Session Management
```bash
browser-use sessions # List active sessions
browser-use close # Close current session
browser-use close --all # Close all sessions
browser-use close # Close browser session
```
### Profile Management
@@ -200,11 +196,6 @@ browser-use -b real profile get "Default" # Get profile details
browser-use -b real profile cookies "Default" # Show cookie domains in profile
```
### Server Control
```bash
browser-use server logs # View server logs
```
## Common Workflows
### Authenticated Browsing with Profiles
@@ -267,15 +258,12 @@ browser-use screenshot
| Option | Description |
|--------|-------------|
| `--session NAME` | Use named session (default: "default") |
| `--browser MODE` | Browser mode: chromium, real |
| `--headed` | Show browser window (chromium mode) |
| `--profile NAME` | Browser profile (local Chrome profile name) |
| `--json` | Output as JSON |
| `--mcp` | Run as MCP server via stdin/stdout |
**Session behavior**: All commands without `--session` use the same "default" session. The browser stays open and is reused across commands. Use `--session NAME` to run multiple browsers in parallel.
## Tips
1. **Always run `browser-use state` first** to see available elements and their indices
@@ -294,7 +282,7 @@ browser-use doctor
**Browser won't start?**
```bash
browser-use close --all # Close all sessions
browser-use close # Close browser session
browser-use --headed open <url> # Try with visible window
```
@@ -305,13 +293,6 @@ browser-use scroll down # Element might be below fold
browser-use state # Check again
```
**Session issues?**
```bash
browser-use sessions # Check active sessions
browser-use close --all # Clean slate
browser-use open <url> # Fresh start
```
## Cleanup
**Always close the browser when done:**

View File

@@ -180,9 +180,7 @@ browser-use tunnel stop --all # Stop all tunnels
### Session Management
```bash
browser-use sessions # List active sessions
browser-use close # Close current session
browser-use close --all # Close all sessions
browser-use close # Close browser session
```
## Common Workflows
@@ -213,7 +211,6 @@ browser-use screenshot
| Option | Description |
|--------|-------------|
| `--session NAME` | Named session (default: "default") |
| `--browser MODE` | Browser mode: chromium, real |
| `--json` | Output as JSON |
@@ -230,7 +227,7 @@ browser-use screenshot
**Browser won't start?**
- Run `browser-use doctor` to check configuration
- `browser-use close --all` then retry
- `browser-use close` then retry
**Tunnel not working?**
- Verify cloudflared is installed: `which cloudflared`

View File

@@ -1,19 +1,13 @@
"""Tests for CLI coordinate clicking support.
Verifies that the CLI correctly parses both index-based and coordinate-based
click commands, that the browser command handler dispatches the right events,
and that the direct CLI selector map cache works correctly.
click commands, and that the browser command handler dispatches the right events.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
if TYPE_CHECKING:
from browser_use.dom.views import DOMRect, EnhancedDOMTreeNode
from browser_use.skill_cli.main import build_parser
@@ -139,197 +133,3 @@ class TestClickCommandHandler:
result = await handle('click', session_info, {'args': [1, 2, 3]})
assert 'error' in result
assert 'Usage' in result['error']
def _make_dom_node(
    *,
    node_name: str,
    absolute_position: DOMRect | None = None,
    ax_name: str | None = None,
    node_value: str = '',
) -> EnhancedDOMTreeNode:
    """Construct a minimal, visible element node for cache tests.

    An accessibility node with role ``'button'`` is attached only when
    ``ax_name`` is given; every other optional field is left as ``None``.
    """
    from browser_use.dom.views import (
        EnhancedAXNode,
        EnhancedDOMTreeNode,
        NodeType,
    )

    accessibility = (
        None
        if ax_name is None
        else EnhancedAXNode(
            ax_node_id='ax-0',
            ignored=False,
            role='button',
            name=ax_name,
            description=None,
            properties=None,
            child_ids=None,
        )
    )

    node = EnhancedDOMTreeNode(
        node_id=1,
        backend_node_id=1,
        node_type=NodeType.ELEMENT_NODE,
        node_name=node_name,
        node_value=node_value,
        attributes={},
        is_scrollable=None,
        is_visible=True,
        absolute_position=absolute_position,
        target_id='target-0',
        frame_id=None,
        session_id=None,
        content_document=None,
        shadow_root_type=None,
        shadow_roots=None,
        parent_node=None,
        children_nodes=None,
        ax_node=accessibility,
        snapshot_node=None,
    )
    return node
class TestSelectorCache:
    """Selector-map cache persistence and document-to-viewport coordinate math."""

    @pytest.fixture(autouse=True)
    def _use_tmp_state_file(self, monkeypatch, tmp_path):
        """Point the direct module's STATE_FILE at a per-test temp file."""
        import browser_use.skill_cli.direct as direct_mod

        temp_state = tmp_path / 'browser-use-direct.json'
        self.state_file = temp_state
        monkeypatch.setattr(direct_mod, 'STATE_FILE', temp_state)

    def test_save_and_load_cache_round_trip(self):
        """A saved selector map loads back with the same geometry, tag and text."""
        from browser_use.dom.views import DOMRect
        from browser_use.skill_cli.direct import (
            _load_selector_cache,
            _save_selector_cache,
            _save_state,
        )

        _save_state({'cdp_url': 'ws://localhost:9222'})

        button = _make_dom_node(
            node_name='BUTTON',
            absolute_position=DOMRect(x=100.0, y=200.0, width=80.0, height=32.0),
            ax_name='Submit',
        )
        link = _make_dom_node(
            node_name='A',
            absolute_position=DOMRect(x=50.0, y=800.5, width=200.0, height=40.0),
            node_value='Click here',
        )
        _save_selector_cache({5: button, 12: link})

        cached = _load_selector_cache()
        assert 5 in cached
        assert 12 in cached

        button_entry = cached[5]
        assert button_entry['x'] == 100.0
        assert button_entry['y'] == 200.0
        assert button_entry['w'] == 80.0
        assert button_entry['h'] == 32.0
        assert button_entry['tag'] == 'button'
        assert button_entry['text'] == 'Submit'

        link_entry = cached[12]
        assert link_entry['x'] == 50.0
        assert link_entry['y'] == 800.5
        assert link_entry['tag'] == 'a'
        assert link_entry['text'] == 'Click here'

    def test_load_empty_cache(self):
        """With state saved but no selector map, loading yields an empty dict."""
        from browser_use.skill_cli.direct import _load_selector_cache, _save_state

        _save_state({'cdp_url': 'ws://localhost:9222'})
        cached = _load_selector_cache()
        assert cached == {}

    def test_cache_skips_nodes_without_position(self):
        """A node lacking absolute_position does not appear in the loaded cache."""
        from browser_use.skill_cli.direct import (
            _load_selector_cache,
            _save_selector_cache,
            _save_state,
        )

        _save_state({'cdp_url': 'ws://localhost:9222'})

        unpositioned = _make_dom_node(node_name='DIV', absolute_position=None)
        _save_selector_cache({1: unpositioned})

        cached = _load_selector_cache()
        assert cached == {}

    def test_viewport_coordinate_conversion(self):
        """Element centre in document coords minus scroll offset gives viewport coords."""
        elem = {'x': 150.0, 'y': 900.0, 'w': 80.0, 'h': 32.0}
        scroll_x = 0.0
        scroll_y = 500.0

        centre_x = elem['x'] + elem['w'] / 2
        centre_y = elem['y'] + elem['h'] / 2
        viewport_x = int(centre_x - scroll_x)
        viewport_y = int(centre_y - scroll_y)

        assert viewport_x == 190
        assert viewport_y == 416

    def test_viewport_conversion_with_horizontal_scroll(self):
        """The same conversion also subtracts a non-zero horizontal scroll."""
        elem = {'x': 1200.0, 'y': 300.0, 'w': 100.0, 'h': 50.0}
        scroll_x = 800.0
        scroll_y = 100.0

        centre_x = elem['x'] + elem['w'] / 2
        centre_y = elem['y'] + elem['h'] / 2
        viewport_x = int(centre_x - scroll_x)
        viewport_y = int(centre_y - scroll_y)

        assert viewport_x == 450
        assert viewport_y == 225

    def test_cache_invalidated_on_navigate(self):
        """Dropping selector_map from state leaves the other state keys intact."""
        from browser_use.skill_cli.direct import _load_state, _save_state

        initial_state = {
            'cdp_url': 'ws://localhost:9222',
            'target_id': 'abc',
            'selector_map': {'1': {'x': 10, 'y': 20, 'w': 30, 'h': 40, 'tag': 'a', 'text': 'Link'}},
        }
        _save_state(initial_state)

        # Mimic the invalidation step: load, drop the map, persist again
        current = _load_state()
        current.pop('selector_map', None)
        _save_state(current)

        after = _load_state()
        assert 'selector_map' not in after
        assert after['cdp_url'] == 'ws://localhost:9222'
        assert after['target_id'] == 'abc'

    def test_state_overwritten_on_fresh_cache(self):
        """Saving a new selector map replaces the previously stored entries."""
        from browser_use.dom.views import DOMRect
        from browser_use.skill_cli.direct import (
            _load_selector_cache,
            _save_selector_cache,
            _save_state,
        )

        stale_state = {
            'cdp_url': 'ws://localhost:9222',
            'selector_map': {'99': {'x': 0, 'y': 0, 'w': 0, 'h': 0, 'tag': 'old', 'text': 'old'}},
        }
        _save_state(stale_state)

        fresh_node = _make_dom_node(
            node_name='SPAN',
            absolute_position=DOMRect(x=5.0, y=10.0, width=20.0, height=15.0),
            ax_name='New',
        )
        _save_selector_cache({7: fresh_node})

        cached = _load_selector_cache()
        assert 99 not in cached
        assert 7 in cached
        assert cached[7]['tag'] == 'span'

View File

@@ -19,13 +19,13 @@ def test_headed_flag_before_open_subcommand():
assert args.command == 'open'
# NOTE(review): this span appears to interleave two revisions of one test (the
# -s/--session variant and the -b/--browser variant) with diff markers and
# indentation lost; confirm which lines are live before editing the code.
def test_headed_flag_with_session():
"""Test that --headed works with other global flags like -s/--session."""
def test_headed_flag_with_browser():
"""Test that --headed works with other global flags like --browser."""
parser = build_parser()
# In both argument lists the global flags precede the `open` subcommand.
args = parser.parse_args(['--headed', '-s', 'mysession', 'open', 'http://example.com'])
args = parser.parse_args(['--headed', '-b', 'chromium', 'open', 'http://example.com'])
assert args.headed is True
assert args.session == 'mysession'
assert args.browser == 'chromium'
assert args.url == 'http://example.com'

View File

@@ -1,256 +0,0 @@
"""Tests for server locking to prevent race conditions."""
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import portalocker
import pytest
from browser_use.skill_cli.utils import (
cleanup_session_files,
get_lock_path,
get_pid_path,
is_server_running,
is_session_locked,
kill_orphaned_server,
try_acquire_server_lock,
)
@pytest.fixture
def test_session():
    """Yield a session name unique to this process and instant, then remove its files."""
    unique_name = f'test-lock-{os.getpid()}-{time.time_ns()}'
    yield unique_name
    # Teardown: remove whatever files the test left behind for this session
    cleanup_session_files(unique_name)
def test_lock_path_generation(test_session):
    """The lock file sits in the temp directory and is named after the session."""
    lock_file = get_lock_path(test_session)
    expected_dir = Path(tempfile.gettempdir())
    expected_name = f'browser-use-{test_session}.lock'
    assert lock_file.parent == expected_dir
    assert lock_file.name == expected_name
def test_try_acquire_server_lock_success(test_session):
    """A free lock can be taken, refuses a second taker, and is reusable after release."""
    first = try_acquire_server_lock(test_session)
    assert first is not None

    # While the first handle holds the lock, another attempt must come back empty
    second = try_acquire_server_lock(test_session)
    assert second is None

    # Give the lock back
    portalocker.unlock(first)
    first.close()

    # The lock is available again after release
    third = try_acquire_server_lock(test_session)
    assert third is not None
    portalocker.unlock(third)
    third.close()
def test_is_session_locked(test_session):
    """is_session_locked tracks the lock through unheld, held and released states."""
    # Nothing holds the lock at the start
    locked_before = is_session_locked(test_session)
    assert locked_before is False

    held = try_acquire_server_lock(test_session)
    assert held is not None

    # The held lock is now visible to the check
    locked_during = is_session_locked(test_session)
    assert locked_during is True

    portalocker.unlock(held)
    held.close()

    # Released again
    locked_after = is_session_locked(test_session)
    assert locked_after is False
def test_kill_orphaned_server_no_pid_file(test_session):
    """With no PID file for the session, kill_orphaned_server reports False."""
    killed = kill_orphaned_server(test_session)
    assert killed is False
def test_kill_orphaned_server_with_lock(test_session):
    """A server that still holds the session lock is not treated as an orphan."""
    # The PID file names this very process, so a wrongful kill would be obvious
    own_pid = str(os.getpid())
    get_pid_path(test_session).write_text(own_pid)

    # Holding the lock stands in for a healthy server
    held = try_acquire_server_lock(test_session)
    assert held is not None

    # Lock is held, so nothing may be killed
    killed = kill_orphaned_server(test_session)
    assert killed is False

    portalocker.unlock(held)
    held.close()
def test_cleanup_includes_lock_file(test_session):
    """cleanup_session_files removes both the lock file and the PID file."""
    lock_file = get_lock_path(test_session)
    pid_file = get_pid_path(test_session)

    # Seed both files so there is something to remove
    lock_file.touch()
    pid_file.write_text('12345')
    assert lock_file.exists()
    assert pid_file.exists()

    cleanup_session_files(test_session)

    assert not lock_file.exists()
    assert not pid_file.exists()
def test_concurrent_lock_acquisition(test_session):
    """Only one process at a time can hold the session lock.

    A child interpreter tries a non-blocking exclusive lock on the same file:
    it must report ``BLOCKED`` while this process holds the lock and
    ``ACQUIRED`` once the lock has been released.
    """
    lock_path = get_lock_path(test_session)
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path.touch()

    # The path is embedded with repr() so that backslashes in Windows temp
    # paths are not parsed as escape sequences inside the child's literal.
    probe_script = f'''
import portalocker
from pathlib import Path
lock_path = Path({str(lock_path)!r})
f = open(lock_path, 'r+')
try:
    portalocker.lock(f, portalocker.LOCK_EX | portalocker.LOCK_NB)
    print("ACQUIRED")
    portalocker.unlock(f)
except portalocker.LockException:
    print("BLOCKED")
f.close()
'''

    def probe() -> str:
        """Run the probe in a fresh interpreter and return its stdout."""
        completed = subprocess.run(
            [sys.executable, '-c', probe_script],
            capture_output=True,
            text=True,
            timeout=5,
        )
        return completed.stdout

    # Acquire lock in current process
    lock = try_acquire_server_lock(test_session)
    assert lock is not None
    try:
        # Try to acquire in subprocess - should fail
        assert 'BLOCKED' in probe()
    finally:
        # Release even when the assertion fails, so the handle is not leaked
        portalocker.unlock(lock)
        lock.close()

    # Now subprocess should succeed
    assert 'ACQUIRED' in probe()
def test_lock_released_on_process_death(test_session):
    """The lock becomes available again once the process holding it dies.

    A child interpreter takes the lock and sleeps. The test checks that the
    lock cannot be acquired meanwhile, terminates the child, and then expects
    acquisition to succeed.
    """
    lock_path = get_lock_path(test_session)
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path.touch()

    # The path is embedded with repr() so that backslashes in Windows temp
    # paths are not parsed as escape sequences inside the child's literal.
    holder_script = f'''
import portalocker
import time
from pathlib import Path
lock_path = Path({str(lock_path)!r})
f = open(lock_path, 'r+')
portalocker.lock(f, portalocker.LOCK_EX | portalocker.LOCK_NB)
print("LOCKED", flush=True)
time.sleep(60)  # Hold lock
'''

    # Start subprocess that holds lock
    proc = subprocess.Popen(
        [sys.executable, '-c', holder_script],
        stdout=subprocess.PIPE,
        text=True,
    )
    try:
        # Wait for lock acquisition
        assert proc.stdout is not None
        line = proc.stdout.readline()
        assert 'LOCKED' in line

        # Verify we can't acquire
        lock = try_acquire_server_lock(test_session)
        assert lock is None

        # Kill the process
        proc.terminate()
        proc.wait(timeout=5)

        # Small delay for OS to release lock
        time.sleep(0.1)

        # Now we should be able to acquire
        lock = try_acquire_server_lock(test_session)
        assert lock is not None
        portalocker.unlock(lock)
        lock.close()
    finally:
        # A failed assertion must not leave the 60-second sleeper running
        # or its stdout pipe open.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        if proc.stdout is not None:
            proc.stdout.close()
def test_is_server_running_without_pid(test_session):
    """A session that has no PID file is reported as not running."""
    running = is_server_running(test_session)
    assert running is False
def test_is_server_running_with_current_pid(test_session):
    """A PID file naming this (live) process makes the session count as running."""
    own_pid = str(os.getpid())
    get_pid_path(test_session).write_text(own_pid)
    running = is_server_running(test_session)
    assert running is True
def test_is_server_running_with_dead_pid(test_session):
    """A PID file naming a process that does not exist means not running."""
    # 999999999 is chosen as a PID that is very unlikely to be in use
    unlikely_pid = '999999999'
    get_pid_path(test_session).write_text(unlikely_pid)
    running = is_server_running(test_session)
    assert running is False

View File

@@ -1,175 +0,0 @@
"""Tests for session browser mode validation.
When a session is started with a specific browser mode (chromium, remote, real),
subsequent commands with a different mode should error with helpful guidance.
"""
import json
import tempfile
from pathlib import Path
from browser_use.skill_cli.main import get_session_metadata_path
def test_get_session_metadata_path():
    """The default session's metadata file is browser-use-default.meta in the temp dir."""
    meta_file = get_session_metadata_path('default')
    expected_dir = Path(tempfile.gettempdir())
    assert meta_file.parent == expected_dir
    assert meta_file.name == 'browser-use-default.meta'
def test_get_session_metadata_path_custom_session():
    """A custom session name is embedded in the metadata file name."""
    meta_file = get_session_metadata_path('my-session')
    assert meta_file.name == 'browser-use-my-session.meta'
def test_metadata_file_format():
    """Metadata written as JSON reads back with the expected three fields."""
    meta_path = get_session_metadata_path('test-format')
    payload = {
        'browser_mode': 'chromium',
        'headed': False,
        'profile': None,
    }

    try:
        # Same serialisation the production code uses: a JSON object on disk
        meta_path.write_text(json.dumps(payload))

        loaded = json.loads(meta_path.read_text())
        assert loaded['browser_mode'] == 'chromium'
        assert loaded['headed'] is False
        assert loaded['profile'] is None
    finally:
        if meta_path.exists():
            meta_path.unlink()
def test_metadata_file_remote_mode():
    """Remote-mode metadata (headed, with a cloud profile) survives a JSON round trip."""
    meta_path = get_session_metadata_path('test-remote')
    payload = {
        'browser_mode': 'remote',
        'headed': True,
        'profile': 'cloud-profile-123',
    }

    try:
        meta_path.write_text(json.dumps(payload))

        loaded = json.loads(meta_path.read_text())
        assert loaded['browser_mode'] == 'remote'
        assert loaded['headed'] is True
        assert loaded['profile'] == 'cloud-profile-123'
    finally:
        if meta_path.exists():
            meta_path.unlink()
def test_metadata_cleanup():
    """A metadata file that was written can be removed again."""
    meta_path = get_session_metadata_path('test-cleanup')
    serialized = json.dumps({'browser_mode': 'chromium'})
    meta_path.write_text(serialized)
    assert meta_path.exists()

    # Remove it and confirm it is gone
    meta_path.unlink()
    assert not meta_path.exists()
def test_mode_mismatch_remote_on_local_should_error():
    """Requesting remote mode against a local (chromium) session is an error condition.

    This is the harmful mismatch: the user asks for cloud features such as
    live_url, but the existing session runs locally and would silently lack them.
    """
    meta_path = get_session_metadata_path('test-mismatch-error')

    try:
        # Pretend a chromium (local) session already exists
        meta_path.write_text(json.dumps({'browser_mode': 'chromium'}))

        stored = json.loads(meta_path.read_text())
        existing_mode = stored.get('browser_mode', 'chromium')
        requested_mode = 'remote'

        # Remote requested, session is not remote: must be flagged
        wants_remote = requested_mode == 'remote'
        session_is_remote = existing_mode == 'remote'
        should_error = wants_remote and not session_is_remote
        assert should_error is True
    finally:
        if meta_path.exists():
            meta_path.unlink()
def test_mode_mismatch_local_on_remote_should_allow():
    """Requesting local mode against a remote session is allowed.

    The user simply gets a remote browser, which behaves like a local one
    with extra features, so nothing they asked for is lost.
    """
    meta_path = get_session_metadata_path('test-mismatch-allow')

    try:
        # Pretend a remote session already exists
        meta_path.write_text(json.dumps({'browser_mode': 'remote'}))

        stored = json.loads(meta_path.read_text())
        existing_mode = stored.get('browser_mode')
        assert existing_mode == 'remote'

        # 'chromium' is what a user gets when --browser is not given
        requested_mode = 'chromium'

        # Chromium requested, session is remote: must NOT be flagged
        wants_remote = requested_mode == 'remote'
        session_is_remote = existing_mode == 'remote'
        should_error = wants_remote and not session_is_remote
        assert should_error is False
    finally:
        if meta_path.exists():
            meta_path.unlink()
def test_mode_match_detection_logic():
    """A requested mode equal to the stored mode passes validation."""
    meta_path = get_session_metadata_path('test-match')

    try:
        # Pretend a chromium session already exists
        meta_path.write_text(json.dumps({'browser_mode': 'chromium'}))

        stored = json.loads(meta_path.read_text())
        existing_mode = stored.get('browser_mode', 'chromium')
        requested_mode = 'chromium'

        # Same mode on both sides
        assert existing_mode == requested_mode
    finally:
        if meta_path.exists():
            meta_path.unlink()
def test_different_sessions_independent():
    """Metadata written for one session name does not affect another."""
    meta_a = get_session_metadata_path('session-a')
    meta_b = get_session_metadata_path('session-b')

    try:
        # Session A is chromium, session B is remote
        meta_a.write_text(json.dumps({'browser_mode': 'chromium'}))
        meta_b.write_text(json.dumps({'browser_mode': 'remote'}))

        # Each file must read back its own mode
        loaded_a = json.loads(meta_a.read_text())
        loaded_b = json.loads(meta_b.read_text())
        assert loaded_a['browser_mode'] == 'chromium'
        assert loaded_b['browser_mode'] == 'remote'
    finally:
        if meta_a.exists():
            meta_a.unlink()
        if meta_b.exists():
            meta_b.unlink()