mirror of
https://github.com/browser-use/browser-use
synced 2026-05-06 17:52:15 +02:00
feat: improved session manager
This commit is contained in:
@@ -69,53 +69,26 @@ class CDPSession(BaseModel):
|
||||
title: str = 'Unknown title'
|
||||
url: str = 'about:blank'
|
||||
|
||||
# Track if this session owns its CDP client (for cleanup)
|
||||
owns_cdp_client: bool = False
|
||||
|
||||
@classmethod
|
||||
async def for_target(
|
||||
cls,
|
||||
cdp_client: CDPClient,
|
||||
target_id: TargetID,
|
||||
new_socket: bool = False,
|
||||
cdp_url: str | None = None,
|
||||
domains: list[str] | None = None,
|
||||
):
|
||||
"""Create a CDP session for a target.
|
||||
"""Create a CDP session for a target using the shared WebSocket.
|
||||
|
||||
Args:
|
||||
cdp_client: Existing CDP client to use (or just for reference if creating own)
|
||||
cdp_client: The shared CDP client (root WebSocket connection)
|
||||
target_id: Target ID to attach to
|
||||
new_socket: If True, create a dedicated WebSocket connection for this target
|
||||
cdp_url: CDP URL (required if new_socket is True)
|
||||
domains: List of CDP domains to enable. If None, enables default domains.
|
||||
"""
|
||||
if new_socket:
|
||||
if not cdp_url:
|
||||
raise ValueError('cdp_url required when new_socket=True')
|
||||
# Create a new CDP client with its own WebSocket connection
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(f'browser_use.CDPSession.{target_id[-4:]}')
|
||||
logger.debug(f'🔌 Creating new dedicated WebSocket connection for target 🅣 {target_id}')
|
||||
|
||||
target_cdp_client = CDPClient(cdp_url)
|
||||
await target_cdp_client.start()
|
||||
|
||||
cdp_session = cls(
|
||||
cdp_client=target_cdp_client,
|
||||
target_id=target_id,
|
||||
session_id='connecting',
|
||||
owns_cdp_client=True,
|
||||
)
|
||||
else:
|
||||
# Use shared CDP client
|
||||
cdp_session = cls(
|
||||
cdp_client=cdp_client,
|
||||
target_id=target_id,
|
||||
session_id='connecting',
|
||||
owns_cdp_client=False,
|
||||
)
|
||||
# Always use shared CDP client (event-driven approach)
|
||||
cdp_session = cls(
|
||||
cdp_client=cdp_client,
|
||||
target_id=target_id,
|
||||
session_id='connecting',
|
||||
)
|
||||
return await cdp_session.attach(domains=domains)
|
||||
|
||||
async def attach(self, domains: list[str] | None = None) -> Self:
|
||||
@@ -163,12 +136,10 @@ class CDPSession(BaseModel):
|
||||
return self
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect and cleanup if this session owns its CDP client."""
|
||||
if self.owns_cdp_client and self.cdp_client:
|
||||
try:
|
||||
await self.cdp_client.stop()
|
||||
except Exception:
|
||||
pass # Ignore errors during cleanup
|
||||
"""Disconnect session (no-op since we use shared WebSocket)."""
|
||||
# With event-driven approach, all sessions share the root WebSocket
|
||||
# Nothing to disconnect - only the root client is disconnected on browser.stop()
|
||||
pass
|
||||
|
||||
async def get_tab_info(self) -> TabInfo:
|
||||
target_info = await self.get_target_info()
|
||||
@@ -340,6 +311,7 @@ class BrowserSession(BaseModel):
|
||||
# Mutable private state shared between watchdogs
|
||||
_cdp_client_root: CDPClient | None = PrivateAttr(default=None)
|
||||
_cdp_session_pool: dict[str, CDPSession] = PrivateAttr(default_factory=dict)
|
||||
_session_manager: Any = PrivateAttr(default=None) # SessionManager instance
|
||||
_cached_browser_state_summary: Any = PrivateAttr(default=None)
|
||||
_cached_selector_map: dict[int, EnhancedDOMTreeNode] = PrivateAttr(default_factory=dict)
|
||||
_downloaded_files: list[str] = PrivateAttr(default_factory=list) # Track files downloaded during this session
|
||||
@@ -397,10 +369,12 @@ class BrowserSession(BaseModel):
|
||||
# await self.event_bus.wait_for_idle(timeout=5.0)
|
||||
# await self.event_bus.clear()
|
||||
|
||||
# Disconnect sessions that own their WebSocket connections
|
||||
for session in self._cdp_session_pool.values():
|
||||
if hasattr(session, 'disconnect'):
|
||||
await session.disconnect()
|
||||
# Clear session manager first (stops event monitoring)
|
||||
if self._session_manager:
|
||||
await self._session_manager.clear()
|
||||
self._session_manager = None
|
||||
|
||||
# Clear session pool (all sessions share the root WebSocket, so no disconnect needed)
|
||||
self._cdp_session_pool.clear()
|
||||
|
||||
self._cdp_client_root = None # type: ignore
|
||||
@@ -743,14 +717,6 @@ class BrowserSession(BaseModel):
|
||||
async def on_CloseTabEvent(self, event: CloseTabEvent) -> None:
|
||||
"""Handle tab closure - update focus if needed."""
|
||||
try:
|
||||
# Remove from session pool first to prevent further use
|
||||
stale_session = self._cdp_session_pool.pop(event.target_id, None)
|
||||
if stale_session and stale_session.owns_cdp_client:
|
||||
try:
|
||||
await stale_session.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Dispatch tab closed event
|
||||
await self.event_bus.dispatch(TabClosedEvent(target_id=event.target_id))
|
||||
|
||||
@@ -1036,75 +1002,68 @@ class BrowserSession(BaseModel):
|
||||
|
||||
return storage_state
|
||||
|
||||
async def get_or_create_cdp_session(
|
||||
self, target_id: TargetID | None = None, focus: bool = True, new_socket: bool | None = None
|
||||
) -> CDPSession:
|
||||
"""Get or create a CDP session for a target.
|
||||
async def get_or_create_cdp_session(self, target_id: TargetID | None = None, focus: bool = True) -> CDPSession:
|
||||
"""Get CDP session for a target from the event-driven pool.
|
||||
|
||||
With autoAttach=True, sessions are created automatically by Chrome and added
|
||||
to the pool via Target.attachedToTarget events. This method retrieves them.
|
||||
|
||||
Args:
|
||||
target_id: Target ID to get session for. If None, uses current agent focus.
|
||||
focus: If True, switches agent focus to this target. If False, just returns session without changing focus.
|
||||
new_socket: If True, create a dedicated WebSocket connection. If None (default), creates new socket for new targets only.
|
||||
target_id: Target ID to get session for. If None, uses current agent focus.
|
||||
focus: If True, switches agent focus to this target.
|
||||
|
||||
Returns:
|
||||
CDPSession for the specified target.
|
||||
"""
|
||||
assert self.cdp_url is not None, 'CDP URL not set - browser may not be configured or launched yet'
|
||||
assert self._cdp_client_root is not None, 'Root CDP client not initialized - browser may not be connected yet'
|
||||
assert self.agent_focus is not None, 'CDP session not initialized - browser may not be connected yet'
|
||||
CDPSession for the specified target.
|
||||
|
||||
# If no target_id specified, use the current target_id
|
||||
Raises:
|
||||
ValueError: If target doesn't exist or session is not available.
|
||||
"""
|
||||
assert self._cdp_client_root is not None, 'Root CDP client not initialized'
|
||||
assert self.agent_focus is not None, 'CDP session not initialized'
|
||||
assert self._session_manager is not None, 'SessionManager not initialized'
|
||||
|
||||
# If no target_id specified, use current agent focus
|
||||
if target_id is None:
|
||||
target_id = self.agent_focus.target_id
|
||||
|
||||
# Check if we already have a session for this target in the pool
|
||||
if target_id in self._cdp_session_pool:
|
||||
session = self._cdp_session_pool[target_id]
|
||||
# Get session from event-driven pool
|
||||
session = await self._session_manager.get_session_for_target(target_id)
|
||||
|
||||
if session:
|
||||
# Validate session is still active
|
||||
is_valid = await self._session_manager.validate_session(target_id)
|
||||
if not is_valid:
|
||||
raise ValueError(f'Target {target_id} has detached - no active sessions')
|
||||
|
||||
# Update focus if requested
|
||||
if focus and self.agent_focus.target_id != target_id:
|
||||
self.logger.debug(
|
||||
f'[get_or_create_cdp_session] Switching agent focus from {self.agent_focus.target_id} to {target_id}'
|
||||
)
|
||||
self.logger.debug(f'[SessionManager] Switching focus: {self.agent_focus.target_id[:8]}... → {target_id[:8]}...')
|
||||
self.agent_focus = session
|
||||
|
||||
# Resume if waiting for debugger
|
||||
if focus:
|
||||
await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id)
|
||||
# else:
|
||||
# self.logger.debug(f'[get_or_create_cdp_session] Reusing existing session for {target_id} (focus={focus})')
|
||||
try:
|
||||
await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id)
|
||||
except Exception:
|
||||
pass # May fail if not waiting
|
||||
|
||||
return session
|
||||
|
||||
# If it's the current focus target, return that session
|
||||
if self.agent_focus.target_id == target_id:
|
||||
self._cdp_session_pool[target_id] = self.agent_focus
|
||||
return self.agent_focus
|
||||
# Session not in pool yet - wait for attach event
|
||||
self.logger.debug(f'[SessionManager] Waiting for target {target_id[:8]}... to attach...')
|
||||
|
||||
# Create new session for this target
|
||||
# Default to True for new sessions (each new target gets its own WebSocket)
|
||||
should_use_new_socket = True if new_socket is None else new_socket
|
||||
self.logger.debug(
|
||||
f'[get_or_create_cdp_session] Creating new CDP session for target {target_id} (new_socket={should_use_new_socket})'
|
||||
)
|
||||
session = await CDPSession.for_target(
|
||||
self._cdp_client_root,
|
||||
target_id,
|
||||
new_socket=should_use_new_socket,
|
||||
cdp_url=self.cdp_url if should_use_new_socket else None,
|
||||
)
|
||||
self._cdp_session_pool[target_id] = session
|
||||
# log length of _cdp_session_pool
|
||||
self.logger.debug(f'[get_or_create_cdp_session] new _cdp_session_pool length: {len(self._cdp_session_pool)}')
|
||||
# Wait up to 2 seconds for the attach event
|
||||
for attempt in range(20):
|
||||
await asyncio.sleep(0.1)
|
||||
session = await self._session_manager.get_session_for_target(target_id)
|
||||
if session:
|
||||
self.logger.debug(f'[SessionManager] Target appeared after {attempt * 100}ms')
|
||||
if focus:
|
||||
self.agent_focus = session
|
||||
return session
|
||||
|
||||
# Only change agent focus if requested
|
||||
if focus:
|
||||
self.logger.debug(
|
||||
f'[get_or_create_cdp_session] Switching agent focus from {self.agent_focus.target_id} to {target_id}'
|
||||
)
|
||||
self.agent_focus = session
|
||||
await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id)
|
||||
else:
|
||||
self.logger.debug(
|
||||
f'[get_or_create_cdp_session] Created session for {target_id} without changing focus (still on {self.agent_focus.target_id})'
|
||||
)
|
||||
|
||||
return session
|
||||
# Timeout - target doesn't exist
|
||||
raise ValueError(f'Target {target_id} not found - may have detached or never existed')
|
||||
|
||||
@property
|
||||
def current_target_id(self) -> str | None:
|
||||
@@ -1328,22 +1287,52 @@ class BrowserSession(BaseModel):
|
||||
self.logger.debug(f'🌎 Connecting to existing chromium-based browser via CDP: {self.cdp_url} -> ({browser_location})')
|
||||
|
||||
try:
|
||||
# Import cdp-use client
|
||||
|
||||
# Convert HTTP URL to WebSocket URL if needed
|
||||
|
||||
# Create and store the CDP client for direct CDP communication
|
||||
self._cdp_client_root = CDPClient(self.cdp_url)
|
||||
assert self._cdp_client_root is not None
|
||||
await self._cdp_client_root.start()
|
||||
|
||||
# Initialize event-driven session manager FIRST (before enabling autoAttach)
|
||||
from browser_use.browser.session_manager import SessionManager
|
||||
|
||||
self._session_manager = SessionManager(self)
|
||||
await self._session_manager.start_monitoring()
|
||||
self.logger.info('Event-driven session manager started')
|
||||
|
||||
# Enable auto-attach so Chrome automatically notifies us when NEW targets attach/detach
|
||||
# This is the foundation of event-driven session management
|
||||
await self._cdp_client_root.send.Target.setAutoAttach(
|
||||
params={'autoAttach': False, 'waitForDebuggerOnStart': False, 'flatten': True}
|
||||
params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}
|
||||
)
|
||||
self.logger.debug('CDP client connected successfully')
|
||||
self.logger.info('CDP client connected with auto-attach enabled')
|
||||
|
||||
# Get browser targets to find available contexts/pages
|
||||
targets = await self._cdp_client_root.send.Target.getTargets()
|
||||
|
||||
# Manually attach to ALL EXISTING targets (autoAttach only fires for new ones)
|
||||
# We attach to everything (pages, iframes, workers) for complete coverage
|
||||
for target in targets['targetInfos']:
|
||||
target_id = target['targetId']
|
||||
target_type = target.get('type', 'unknown')
|
||||
|
||||
try:
|
||||
# Attach to target - this triggers attachedToTarget event
|
||||
result = await self._cdp_client_root.send.Target.attachToTarget(
|
||||
params={'targetId': target_id, 'flatten': True}
|
||||
)
|
||||
session_id = result['sessionId']
|
||||
|
||||
# Enable auto-attach for this target's children
|
||||
await self._cdp_client_root.send.Target.setAutoAttach(
|
||||
params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}, session_id=session_id
|
||||
)
|
||||
|
||||
self.logger.debug(
|
||||
f'Attached to existing target: {target_id[:8]}... (type={target_type}, session={session_id[:8]}...)'
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.debug(f'Failed to attach to existing target {target_id[:8]}... (type={target_type}): {e}')
|
||||
|
||||
# Find main browser pages (avoiding iframes, workers, extensions, etc.)
|
||||
page_targets: list[TargetInfo] = [
|
||||
t
|
||||
@@ -1353,76 +1342,57 @@ class BrowserSession(BaseModel):
|
||||
)
|
||||
]
|
||||
|
||||
# Check for chrome://newtab pages and immediately redirect them
|
||||
# to about:blank to avoid JS issues from CDP on chrome://* urls
|
||||
# Check for chrome://newtab pages and redirect them to about:blank
|
||||
from browser_use.utils import is_new_tab_page
|
||||
|
||||
# Collect all targets that need redirection
|
||||
redirected_targets = []
|
||||
redirect_sessions = {} # Store sessions created for redirection to potentially reuse
|
||||
for target in page_targets:
|
||||
target_url = target.get('url', '')
|
||||
if is_new_tab_page(target_url) and target_url != 'about:blank':
|
||||
# Redirect chrome://newtab to about:blank to avoid JS issues preventing driving chrome://newtab
|
||||
target_id = target['targetId']
|
||||
self.logger.debug(f'🔄 Redirecting {target_url} to about:blank for target {target_id}')
|
||||
try:
|
||||
# Create a CDP session for redirection (minimal domains to avoid duplicate event handlers)
|
||||
# Only enable Page domain for navigation, avoid duplicate event handlers
|
||||
redirect_session = await CDPSession.for_target(self._cdp_client_root, target_id, domains=['Page'])
|
||||
# Navigate to about:blank
|
||||
await redirect_session.cdp_client.send.Page.navigate(
|
||||
params={'url': 'about:blank'}, session_id=redirect_session.session_id
|
||||
)
|
||||
redirected_targets.append(target_id)
|
||||
redirect_sessions[target_id] = redirect_session # Store for potential reuse
|
||||
# Update the target's URL in our list for later use
|
||||
target['url'] = 'about:blank'
|
||||
# Small delay to ensure navigation completes
|
||||
await asyncio.sleep(0.05)
|
||||
# Sessions now exist from manual attachment above
|
||||
session = await self._session_manager.get_session_for_target(target_id)
|
||||
if session:
|
||||
await session.cdp_client.send.Page.navigate(
|
||||
params={'url': 'about:blank'}, session_id=session.session_id
|
||||
)
|
||||
target['url'] = 'about:blank'
|
||||
await asyncio.sleep(0.05) # Let navigation start
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Failed to redirect {target_url} to about:blank: {e}')
|
||||
|
||||
# Log summary of redirections
|
||||
if redirected_targets:
|
||||
self.logger.debug(f'Redirected {len(redirected_targets)} chrome://newtab pages to about:blank')
|
||||
self.logger.warning(f'Failed to redirect {target_url}: {e}')
|
||||
|
||||
# Ensure we have at least one page
|
||||
if not page_targets:
|
||||
# No pages found, create a new one
|
||||
new_target = await self._cdp_client_root.send.Target.createTarget(params={'url': 'about:blank'})
|
||||
target_id = new_target['targetId']
|
||||
self.logger.debug(f'📄 Created new blank page with target ID: {target_id}')
|
||||
self.logger.debug(f'📄 Created new blank page: {target_id}')
|
||||
else:
|
||||
# Use the first available page
|
||||
target_id = [page for page in page_targets if page.get('type') == 'page'][0]['targetId']
|
||||
self.logger.debug(f'📄 Using existing page with target ID: {target_id}')
|
||||
self.logger.debug(f'📄 Using existing page: {target_id}')
|
||||
|
||||
# Store the current page target ID and add to pool
|
||||
# Reuse redirect session if available, otherwise create new one
|
||||
if target_id in redirect_sessions:
|
||||
self.logger.debug(f'Reusing redirect session for target {target_id}')
|
||||
self.agent_focus = redirect_sessions[target_id]
|
||||
else:
|
||||
# For the initial connection, we'll use the shared root WebSocket
|
||||
self.agent_focus = await CDPSession.for_target(self._cdp_client_root, target_id, new_socket=False)
|
||||
if self.agent_focus:
|
||||
self._cdp_session_pool[target_id] = self.agent_focus
|
||||
# Wait for SessionManager to receive the attach event for this target
|
||||
# (Chrome will fire Target.attachedToTarget event which SessionManager handles)
|
||||
for _ in range(20): # Wait up to 2 seconds
|
||||
await asyncio.sleep(0.1)
|
||||
session = await self._session_manager.get_session_for_target(target_id)
|
||||
if session:
|
||||
self.agent_focus = session
|
||||
# SessionManager already added it to pool - no need to do it manually
|
||||
self.logger.debug(f'📄 Agent focus set to {target_id[:8]}...')
|
||||
break
|
||||
|
||||
if not self.agent_focus:
|
||||
raise RuntimeError(f'Failed to get session for initial target {target_id}')
|
||||
|
||||
# Enable proxy authentication handling if configured
|
||||
await self._setup_proxy_auth()
|
||||
|
||||
# Verify the session is working
|
||||
try:
|
||||
if self.agent_focus:
|
||||
assert self.agent_focus.title != 'Unknown title'
|
||||
else:
|
||||
raise RuntimeError('Failed to create CDP session')
|
||||
except Exception as e:
|
||||
self.logger.warning(f'Failed to create CDP session: {e}')
|
||||
raise
|
||||
if self.agent_focus.title == 'Unknown title':
|
||||
self.logger.warning('Session created but title is unknown (may be normal for about:blank)')
|
||||
|
||||
# Dispatch TabCreatedEvent for all initial tabs (so watchdogs can initialize)
|
||||
# This replaces the duplicated logic from navigation_watchdog's _initialize_agent_focus
|
||||
for idx, target in enumerate(page_targets):
|
||||
target_url = target.get('url', '')
|
||||
self.logger.debug(f'Dispatching TabCreatedEvent for initial tab {idx}: {target_url}')
|
||||
@@ -1739,18 +1709,11 @@ class BrowserSession(BaseModel):
|
||||
# First check cached sessions
|
||||
for full_target_id in self._cdp_session_pool.keys():
|
||||
if full_target_id.endswith(tab_id):
|
||||
# Verify target still exists
|
||||
if await self._is_target_valid(full_target_id):
|
||||
return full_target_id
|
||||
else:
|
||||
# Remove stale session from pool
|
||||
self.logger.debug(f'Removing stale session for target {full_target_id}')
|
||||
stale_session = self._cdp_session_pool.pop(full_target_id, None)
|
||||
if stale_session and stale_session.owns_cdp_client:
|
||||
try:
|
||||
await stale_session.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
# Stale session - Chrome should have sent detach event
|
||||
# If we're here, event listener will clean it up
|
||||
self.logger.debug(f'Found stale session for target {full_target_id}, skipping')
|
||||
|
||||
# Get all current targets and find the one matching tab_id
|
||||
all_targets = await self.cdp_client.send.Target.getTargets()
|
||||
@@ -2427,7 +2390,7 @@ class BrowserSession(BaseModel):
|
||||
|
||||
async def _cdp_get_cookies(self) -> list[Cookie]:
|
||||
"""Get cookies using CDP Network.getCookies."""
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False)
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None)
|
||||
result = await asyncio.wait_for(
|
||||
cdp_session.cdp_client.send.Storage.getCookies(session_id=cdp_session.session_id), timeout=8.0
|
||||
)
|
||||
@@ -2438,7 +2401,7 @@ class BrowserSession(BaseModel):
|
||||
if not self.agent_focus or not cookies:
|
||||
return
|
||||
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False)
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None)
|
||||
# Storage.setCookies expects params dict with 'cookies' key
|
||||
await cdp_session.cdp_client.send.Storage.setCookies(
|
||||
params={'cookies': cookies}, # type: ignore[arg-type]
|
||||
@@ -2509,7 +2472,7 @@ class BrowserSession(BaseModel):
|
||||
"""
|
||||
if target_id:
|
||||
# Set viewport for specific target
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id, focus=False, new_socket=False)
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id, focus=False)
|
||||
elif self.agent_focus:
|
||||
# Use current focus
|
||||
cdp_session = self.agent_focus
|
||||
@@ -2525,7 +2488,7 @@ class BrowserSession(BaseModel):
|
||||
async def _cdp_get_origins(self) -> list[dict[str, Any]]:
|
||||
"""Get origins with localStorage and sessionStorage using CDP."""
|
||||
origins = []
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False)
|
||||
cdp_session = await self.get_or_create_cdp_session(target_id=None)
|
||||
|
||||
try:
|
||||
# Enable DOMStorage domain to track storage
|
||||
|
||||
226
browser_use/browser/session_manager.py
Normal file
226
browser_use/browser/session_manager.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""Event-driven CDP session management.
|
||||
|
||||
Sessions are managed by listening to
|
||||
Target.attachedToTarget and Target.detachedFromTarget events, ensuring the
|
||||
session pool always reflects the current browser state.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING, Dict, Set
|
||||
|
||||
from cdp_use.cdp.target import AttachedToTargetEvent, DetachedFromTargetEvent, SessionID, TargetID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from browser_use.browser.session import BrowserSession, CDPSession
|
||||
|
||||
|
||||
class SessionManager:
|
||||
"""Manages CDP sessions with event-driven synchronization.
|
||||
|
||||
Key differences from manual caching:
|
||||
- Sessions are added/removed via CDP events, not manual calls
|
||||
- Multiple sessions can attach to the same target
|
||||
- Targets are only removed when ALL sessions detach
|
||||
- No stale sessions - pool always reflects browser reality
|
||||
"""
|
||||
|
||||
def __init__(self, browser_session: 'BrowserSession'):
|
||||
self.browser_session = browser_session
|
||||
self.logger = browser_session.logger
|
||||
|
||||
# Target -> Set of sessions attached to it
|
||||
self._target_sessions: Dict[TargetID, Set[SessionID]] = {}
|
||||
|
||||
# Session -> Target mapping (for reverse lookup)
|
||||
self._session_to_target: Dict[SessionID, TargetID] = {}
|
||||
|
||||
# Lock for thread-safe access
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def start_monitoring(self) -> None:
|
||||
"""Start monitoring Target attach/detach events.
|
||||
|
||||
This registers CDP event handlers that keep the session pool synchronized.
|
||||
"""
|
||||
if not self.browser_session._cdp_client_root:
|
||||
raise RuntimeError('CDP client not initialized')
|
||||
|
||||
# Capture cdp_client_root in closure to avoid type errors
|
||||
cdp_client = self.browser_session._cdp_client_root
|
||||
|
||||
# Register synchronous event handlers (CDP requirement)
|
||||
def on_attached(event: AttachedToTargetEvent, session_id: SessionID | None = None):
|
||||
event_session_id = event['sessionId']
|
||||
target_type = event['targetInfo'].get('type', 'unknown')
|
||||
|
||||
# Enable auto-attach for this session's children
|
||||
# Note: For short-lived targets (workers, temp iframes), this may fail with -32001
|
||||
# This is EXPECTED - the session_id is valid when event fires, but may detach
|
||||
# before our async task executes. We catch and ignore these errors.
|
||||
async def _enable_auto_attach():
|
||||
try:
|
||||
await cdp_client.send.Target.setAutoAttach(
|
||||
params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}, session_id=event_session_id
|
||||
)
|
||||
self.logger.debug(f'[SessionManager] Auto-attach enabled for {target_type} session {event_session_id[:8]}...')
|
||||
except Exception as e:
|
||||
# Expected for workers/temp iframes that attach/detach rapidly
|
||||
# The session_id is valid in the event, but gone by the time task executes
|
||||
error_str = str(e)
|
||||
if '-32001' in error_str or 'Session with given id not found' in error_str:
|
||||
self.logger.debug(
|
||||
f'[SessionManager] setAutoAttach skipped - {target_type} session {event_session_id[:8]}... '
|
||||
f'already detached (normal for short-lived targets)'
|
||||
)
|
||||
else:
|
||||
self.logger.debug(f'[SessionManager] setAutoAttach failed: {e}')
|
||||
|
||||
# Schedule auto-attach and pool management
|
||||
asyncio.create_task(_enable_auto_attach())
|
||||
asyncio.create_task(self._handle_target_attached(event))
|
||||
|
||||
def on_detached(event: DetachedFromTargetEvent, session_id: SessionID | None = None):
|
||||
asyncio.create_task(self._handle_target_detached(event))
|
||||
|
||||
self.browser_session._cdp_client_root.register.Target.attachedToTarget(on_attached)
|
||||
self.browser_session._cdp_client_root.register.Target.detachedFromTarget(on_detached)
|
||||
|
||||
self.logger.info('[SessionManager] Event monitoring started')
|
||||
|
||||
async def _handle_target_attached(self, event: AttachedToTargetEvent) -> None:
|
||||
"""Handle Target.attachedToTarget event.
|
||||
|
||||
Called automatically by Chrome when a new target/session is created.
|
||||
This is the ONLY place where sessions should be added to the pool.
|
||||
"""
|
||||
target_id = event['targetInfo']['targetId']
|
||||
session_id = event['sessionId']
|
||||
target_type = event['targetInfo']['type']
|
||||
waiting_for_debugger = event.get('waitingForDebugger', False)
|
||||
|
||||
self.logger.debug(
|
||||
f'[SessionManager] Target attached: {target_id[:8]}... (session={session_id[:8]}..., '
|
||||
f'type={target_type}, waitingForDebugger={waiting_for_debugger})'
|
||||
)
|
||||
|
||||
async with self._lock:
|
||||
# Track this session for the target
|
||||
if target_id not in self._target_sessions:
|
||||
self._target_sessions[target_id] = set()
|
||||
|
||||
self._target_sessions[target_id].add(session_id)
|
||||
self._session_to_target[session_id] = target_id
|
||||
|
||||
# Create CDPSession wrapper and add to pool
|
||||
if target_id not in self.browser_session._cdp_session_pool:
|
||||
# Create session wrapper (uses shared WebSocket, just tracks session_id)
|
||||
from browser_use.browser.session import CDPSession
|
||||
|
||||
assert self.browser_session._cdp_client_root is not None, 'Root CDP client required'
|
||||
|
||||
cdp_session = CDPSession(
|
||||
cdp_client=self.browser_session._cdp_client_root,
|
||||
target_id=target_id,
|
||||
session_id=session_id,
|
||||
title=event['targetInfo'].get('title', 'Unknown title'),
|
||||
url=event['targetInfo'].get('url', 'about:blank'),
|
||||
)
|
||||
|
||||
self.browser_session._cdp_session_pool[target_id] = cdp_session
|
||||
|
||||
self.logger.debug(
|
||||
f'[SessionManager] Created session for target {target_id[:8]}... '
|
||||
f'(pool size: {len(self.browser_session._cdp_session_pool)})'
|
||||
)
|
||||
else:
|
||||
# Update existing session with new session_id
|
||||
existing = self.browser_session._cdp_session_pool[target_id]
|
||||
existing.session_id = session_id
|
||||
existing.title = event['targetInfo'].get('title', existing.title)
|
||||
existing.url = event['targetInfo'].get('url', existing.url)
|
||||
|
||||
# Resume execution if waiting for debugger
|
||||
if waiting_for_debugger:
|
||||
try:
|
||||
assert self.browser_session._cdp_client_root is not None
|
||||
await self.browser_session._cdp_client_root.send.Runtime.runIfWaitingForDebugger(session_id=session_id)
|
||||
self.logger.debug(f'[SessionManager] Resumed execution for session {session_id[:8]}...')
|
||||
except Exception as e:
|
||||
self.logger.warning(f'[SessionManager] Failed to resume execution: {e}')
|
||||
|
||||
async def _handle_target_detached(self, event: DetachedFromTargetEvent) -> None:
|
||||
"""Handle Target.detachedFromTarget event.
|
||||
|
||||
Called automatically by Chrome when a target/session is destroyed.
|
||||
This is the ONLY place where sessions should be removed from the pool.
|
||||
"""
|
||||
session_id = event['sessionId']
|
||||
target_id = event.get('targetId') # May be empty
|
||||
|
||||
# If targetId not in event, look it up via session mapping
|
||||
if not target_id:
|
||||
async with self._lock:
|
||||
target_id = self._session_to_target.get(session_id)
|
||||
|
||||
if not target_id:
|
||||
self.logger.warning(f'[SessionManager] Session detached but target unknown (session={session_id[:8]}...)')
|
||||
return
|
||||
|
||||
async with self._lock:
|
||||
# Remove this session from target's session set
|
||||
if target_id in self._target_sessions:
|
||||
self._target_sessions[target_id].discard(session_id)
|
||||
|
||||
remaining_sessions = len(self._target_sessions[target_id])
|
||||
|
||||
self.logger.debug(
|
||||
f'[SessionManager] Session detached: target={target_id[:8]}... '
|
||||
f'session={session_id[:8]}... (remaining={remaining_sessions})'
|
||||
)
|
||||
|
||||
# Only remove target when NO sessions remain
|
||||
if remaining_sessions == 0:
|
||||
self.logger.info(f'[SessionManager] No sessions remain for target {target_id[:8]}..., removing from pool')
|
||||
|
||||
# Remove from pool
|
||||
if target_id in self.browser_session._cdp_session_pool:
|
||||
stale_session = self.browser_session._cdp_session_pool.pop(target_id)
|
||||
# Don't disconnect - we're using shared WebSocket
|
||||
self.logger.debug(
|
||||
f'[SessionManager] Removed target {target_id[:8]}... from pool '
|
||||
f'(pool size: {len(self.browser_session._cdp_session_pool)})'
|
||||
)
|
||||
|
||||
# Clean up tracking
|
||||
del self._target_sessions[target_id]
|
||||
|
||||
# Remove from reverse mapping
|
||||
if session_id in self._session_to_target:
|
||||
del self._session_to_target[session_id]
|
||||
|
||||
async def get_session_for_target(self, target_id: TargetID) -> 'CDPSession | None':
|
||||
"""Get the current valid session for a target.
|
||||
|
||||
Returns None if no session exists (target detached).
|
||||
"""
|
||||
async with self._lock:
|
||||
return self.browser_session._cdp_session_pool.get(target_id)
|
||||
|
||||
async def validate_session(self, target_id: TargetID) -> bool:
|
||||
"""Check if a target still has active sessions.
|
||||
|
||||
Returns True if target is valid, False if it should be removed.
|
||||
"""
|
||||
async with self._lock:
|
||||
if target_id not in self._target_sessions:
|
||||
return False
|
||||
|
||||
return len(self._target_sessions[target_id]) > 0
|
||||
|
||||
async def clear(self) -> None:
|
||||
"""Clear all session tracking (for cleanup)."""
|
||||
async with self._lock:
|
||||
self._target_sessions.clear()
|
||||
self._session_to_target.clear()
|
||||
|
||||
self.logger.info('[SessionManager] Cleared all session tracking')
|
||||
@@ -129,33 +129,36 @@ class BaseWatchdog(BaseModel):
|
||||
f'🚌 {watchdog_and_handler_str} ❌ Failed ({time_elapsed:.2f}s): {type(e).__name__}: {e}'
|
||||
)
|
||||
|
||||
# attempt to repair potentially crashed CDP session
|
||||
try:
|
||||
target_id_to_restore = None
|
||||
if browser_session.agent_focus and browser_session.agent_focus.target_id:
|
||||
# Common issue with CDP, recreate session with new socket to recover
|
||||
target_id_to_restore = browser_session.agent_focus.target_id
|
||||
browser_session.logger.debug(
|
||||
f'🚌 {watchdog_and_handler_str} ⚠️ Recreating session to try and recover crashed CDP session\n\t{browser_session.agent_focus}'
|
||||
)
|
||||
del browser_session._cdp_session_pool[browser_session.agent_focus.target_id]
|
||||
browser_session.agent_focus = await browser_session.get_or_create_cdp_session(
|
||||
target_id=target_id_to_restore, new_socket=True
|
||||
)
|
||||
else:
|
||||
await browser_session.get_or_create_cdp_session(target_id=None, new_socket=True, focus=True)
|
||||
except Exception as sub_error:
|
||||
if 'ConnectionClosedError' in str(type(sub_error)) or 'ConnectionError' in str(type(sub_error)):
|
||||
browser_session.logger.error(
|
||||
f'🚌 {watchdog_and_handler_str} ❌ Browser closed or CDP Connection disconnected by remote. {type(sub_error).__name__}: {sub_error}\n'
|
||||
)
|
||||
raise
|
||||
else:
|
||||
browser_session.logger.error(
|
||||
f'🚌 {watchdog_and_handler_str} ❌ CDP connected but failed to re-create CDP session after error "{type(original_error).__name__}: {original_error}" in {actual_handler.__name__}({event.event_type}#{event.event_id[-4:]}): due to {type(sub_error).__name__}: {sub_error}\n'
|
||||
)
|
||||
# attempt to repair potentially crashed CDP session
|
||||
try:
|
||||
if browser_session.agent_focus and browser_session.agent_focus.target_id:
|
||||
# With event-driven sessions, Chrome will send detach/attach events
|
||||
# SessionManager handles pool cleanup automatically
|
||||
target_id_to_restore = browser_session.agent_focus.target_id
|
||||
browser_session.logger.debug(
|
||||
f'🚌 {watchdog_and_handler_str} ⚠️ Session error detected, waiting for CDP events to sync\n\t{browser_session.agent_focus}'
|
||||
)
|
||||
|
||||
raise
|
||||
# Wait for new attach event to restore the session
|
||||
# This will raise ValueError if target doesn't re-attach
|
||||
browser_session.agent_focus = await browser_session.get_or_create_cdp_session(
|
||||
target_id=target_id_to_restore, focus=True
|
||||
)
|
||||
else:
|
||||
# Try to get any available session
|
||||
await browser_session.get_or_create_cdp_session(target_id=None, focus=True)
|
||||
except Exception as sub_error:
|
||||
if 'ConnectionClosedError' in str(type(sub_error)) or 'ConnectionError' in str(type(sub_error)):
|
||||
browser_session.logger.error(
|
||||
f'🚌 {watchdog_and_handler_str} ❌ Browser closed or CDP Connection disconnected by remote. {type(sub_error).__name__}: {sub_error}\n'
|
||||
)
|
||||
raise
|
||||
else:
|
||||
browser_session.logger.error(
|
||||
f'🚌 {watchdog_and_handler_str} ❌ CDP connected but failed to re-create CDP session after error "{type(original_error).__name__}: {original_error}" in {actual_handler.__name__}({event.event_type}#{event.event_id[-4:]}): due to {type(sub_error).__name__}: {sub_error}\n'
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
return unique_handler
|
||||
|
||||
|
||||
@@ -168,10 +168,7 @@ class CrashWatchdog(BaseWatchdog):
|
||||
|
||||
async def _on_target_crash_cdp(self, target_id: TargetID) -> None:
|
||||
"""Handle target crash detected via CDP."""
|
||||
# Remove crashed session from pool
|
||||
if session := self.browser_session._cdp_session_pool.pop(target_id, None):
|
||||
await session.disconnect()
|
||||
self.logger.debug(f'[CrashWatchdog] Removed crashed session from pool: {target_id}')
|
||||
self.logger.debug(f'[CrashWatchdog] Target crashed: {target_id}, waiting for detach event')
|
||||
|
||||
# Get target info
|
||||
cdp_client = self.browser_session.cdp_client
|
||||
@@ -300,8 +297,9 @@ class CrashWatchdog(BaseWatchdog):
|
||||
self.logger.debug(
|
||||
f'[CrashWatchdog] Checking browser health for target {self.browser_session.agent_focus} error: {type(e).__name__}: {e}'
|
||||
)
|
||||
# With event-driven sessions, wait for re-attach instead of creating new socket
|
||||
self.agent_focus = cdp_session = await self.browser_session.get_or_create_cdp_session(
|
||||
target_id=self.agent_focus.target_id, new_socket=True, focus=True
|
||||
target_id=self.agent_focus.target_id, focus=True
|
||||
)
|
||||
|
||||
for target in (await self.browser_session.cdp_client.send.Target.getTargets()).get('targetInfos', []):
|
||||
@@ -326,12 +324,9 @@ class CrashWatchdog(BaseWatchdog):
|
||||
self.logger.error(
|
||||
f'[CrashWatchdog] ❌ Crashed session detected for target {self.browser_session.agent_focus} error: {type(e).__name__}: {e}'
|
||||
)
|
||||
# Remove crashed session from pool
|
||||
if self.browser_session.agent_focus and (target_id := self.browser_session.agent_focus.target_id):
|
||||
if session := self.browser_session._cdp_session_pool.pop(target_id, None):
|
||||
await session.disconnect()
|
||||
self.logger.debug(f'[CrashWatchdog] Removed crashed session from pool: {target_id}')
|
||||
self.browser_session.agent_focus.target_id = None # type: ignore
|
||||
# Chrome will send detach event, SessionManager will handle cleanup
|
||||
if self.browser_session.agent_focus:
|
||||
self.browser_session.agent_focus.target_id = None # type: ignore
|
||||
|
||||
# Check browser process if we have PID
|
||||
if self.browser_session._local_browser_watchdog and (proc := self.browser_session._local_browser_watchdog._subprocess):
|
||||
|
||||
@@ -165,7 +165,7 @@ class StorageStateWatchdog(BaseWatchdog):
|
||||
"""Save browser storage state to file."""
|
||||
async with self._save_lock:
|
||||
# Check if CDP client is available
|
||||
assert await self.browser_session.get_or_create_cdp_session(target_id=None, new_socket=False)
|
||||
assert await self.browser_session.get_or_create_cdp_session(target_id=None)
|
||||
|
||||
save_path = path or self.browser_session.browser_profile.storage_state
|
||||
if not save_path:
|
||||
|
||||
@@ -27,16 +27,16 @@ load_dotenv()
|
||||
|
||||
from browser_use import Agent, Tools
|
||||
from browser_use.browser import BrowserProfile, BrowserSession
|
||||
from browser_use.llm import ChatOpenAI
|
||||
from browser_use.llm import ChatBrowserUse
|
||||
|
||||
browser_session = BrowserSession(browser_profile=BrowserProfile(cdp_url='http://localhost:9222', is_local=True))
|
||||
browser_session = BrowserSession(browser_profile=BrowserProfile(cdp_url='https://b6949893-1bef-4e5b-8e02-940f24215549.cdp0.browser-use.com', is_local=False))
|
||||
tools = Tools()
|
||||
|
||||
|
||||
async def main():
|
||||
agent = Agent(
|
||||
task='Visit https://duckduckgo.com and search for "browser-use founders"',
|
||||
llm=ChatOpenAI(model='gpt-4.1-mini'),
|
||||
task='Use the search functionality to locate pages detailing tuition and fees, then extract the published tuition fee information for undergraduate programs. website: https://columbia.edu',
|
||||
llm=ChatBrowserUse(model='bu-latest'),
|
||||
tools=tools,
|
||||
browser_session=browser_session,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user