diff --git a/browser_use/browser/session.py b/browser_use/browser/session.py index 32ed4aa64..2adf94ae8 100644 --- a/browser_use/browser/session.py +++ b/browser_use/browser/session.py @@ -69,53 +69,26 @@ class CDPSession(BaseModel): title: str = 'Unknown title' url: str = 'about:blank' - # Track if this session owns its CDP client (for cleanup) - owns_cdp_client: bool = False - @classmethod async def for_target( cls, cdp_client: CDPClient, target_id: TargetID, - new_socket: bool = False, - cdp_url: str | None = None, domains: list[str] | None = None, ): - """Create a CDP session for a target. + """Create a CDP session for a target using the shared WebSocket. Args: - cdp_client: Existing CDP client to use (or just for reference if creating own) + cdp_client: The shared CDP client (root WebSocket connection) target_id: Target ID to attach to - new_socket: If True, create a dedicated WebSocket connection for this target - cdp_url: CDP URL (required if new_socket is True) domains: List of CDP domains to enable. If None, enables default domains. """ - if new_socket: - if not cdp_url: - raise ValueError('cdp_url required when new_socket=True') - # Create a new CDP client with its own WebSocket connection - import logging - - logger = logging.getLogger(f'browser_use.CDPSession.{target_id[-4:]}') - logger.debug(f'🔌 Creating new dedicated WebSocket connection for target 🅣 {target_id}') - - target_cdp_client = CDPClient(cdp_url) - await target_cdp_client.start() - - cdp_session = cls( - cdp_client=target_cdp_client, - target_id=target_id, - session_id='connecting', - owns_cdp_client=True, - ) - else: - # Use shared CDP client - cdp_session = cls( - cdp_client=cdp_client, - target_id=target_id, - session_id='connecting', - owns_cdp_client=False, - ) + # Always use shared CDP client (event-driven approach) + cdp_session = cls( + cdp_client=cdp_client, + target_id=target_id, + session_id='connecting', + ) return await cdp_session.attach(domains=domains) async def attach(self, domains: list[str] | None = None) -> Self: @@ -163,12 +136,10 @@ class CDPSession(BaseModel): return self async def disconnect(self) -> None: - """Disconnect and cleanup if this session owns its CDP client.""" - if self.owns_cdp_client and self.cdp_client: - try: - await self.cdp_client.stop() - except Exception: - pass # Ignore errors during cleanup + """Disconnect session (no-op since we use shared WebSocket).""" + # With event-driven approach, all sessions share the root WebSocket + # Nothing to disconnect - only the root client is disconnected on browser.stop() + pass async def get_tab_info(self) -> TabInfo: target_info = await self.get_target_info() @@ -340,6 +311,7 @@ class BrowserSession(BaseModel): # Mutable private state shared between watchdogs _cdp_client_root: CDPClient | None = PrivateAttr(default=None) _cdp_session_pool: dict[str, CDPSession] = PrivateAttr(default_factory=dict) + _session_manager: Any = PrivateAttr(default=None) # SessionManager instance _cached_browser_state_summary: Any = PrivateAttr(default=None) _cached_selector_map: dict[int, EnhancedDOMTreeNode] = PrivateAttr(default_factory=dict) _downloaded_files: list[str] = PrivateAttr(default_factory=list) # Track files downloaded during this session @@ -397,10 +369,12 @@ class BrowserSession(BaseModel): # await self.event_bus.wait_for_idle(timeout=5.0) # await self.event_bus.clear() - # Disconnect sessions that own their WebSocket connections - for session in self._cdp_session_pool.values(): - if hasattr(session, 'disconnect'): - await session.disconnect() + # Clear session manager first (stops event monitoring) + if self._session_manager: + await self._session_manager.clear() + self._session_manager = None + + # Clear session pool (all sessions share the root WebSocket, so no disconnect needed) self._cdp_session_pool.clear() self._cdp_client_root = None # type: ignore @@ -743,14 +717,6 @@ class BrowserSession(BaseModel): async def on_CloseTabEvent(self, event: CloseTabEvent) -> None: """Handle tab closure - update focus if needed.""" try: - # Remove from session pool first to prevent further use - stale_session = self._cdp_session_pool.pop(event.target_id, None) - if stale_session and stale_session.owns_cdp_client: - try: - await stale_session.disconnect() - except Exception: - pass - # Dispatch tab closed event await self.event_bus.dispatch(TabClosedEvent(target_id=event.target_id)) @@ -1036,75 +1002,68 @@ class BrowserSession(BaseModel): return storage_state - async def get_or_create_cdp_session( - self, target_id: TargetID | None = None, focus: bool = True, new_socket: bool | None = None - ) -> CDPSession: - """Get or create a CDP session for a target. + async def get_or_create_cdp_session(self, target_id: TargetID | None = None, focus: bool = True) -> CDPSession: + """Get CDP session for a target from the event-driven pool. + + With autoAttach=True, sessions are created automatically by Chrome and added + to the pool via Target.attachedToTarget events. This method retrieves them. Args: - target_id: Target ID to get session for. If None, uses current agent focus. - focus: If True, switches agent focus to this target. If False, just returns session without changing focus. - new_socket: If True, create a dedicated WebSocket connection. If None (default), creates new socket for new targets only. + target_id: Target ID to get session for. If None, uses current agent focus. + focus: If True, switches agent focus to this target. Returns: - CDPSession for the specified target. - """ - assert self.cdp_url is not None, 'CDP URL not set - browser may not be configured or launched yet' - assert self._cdp_client_root is not None, 'Root CDP client not initialized - browser may not be connected yet' - assert self.agent_focus is not None, 'CDP session not initialized - browser may not be connected yet' + CDPSession for the specified target. - # If no target_id specified, use the current target_id + Raises: + ValueError: If target doesn't exist or session is not available. + """ + assert self._cdp_client_root is not None, 'Root CDP client not initialized' + assert self.agent_focus is not None, 'CDP session not initialized' + assert self._session_manager is not None, 'SessionManager not initialized' + + # If no target_id specified, use current agent focus if target_id is None: target_id = self.agent_focus.target_id - # Check if we already have a session for this target in the pool - if target_id in self._cdp_session_pool: - session = self._cdp_session_pool[target_id] + # Get session from event-driven pool + session = await self._session_manager.get_session_for_target(target_id) + + if session: + # Validate session is still active + is_valid = await self._session_manager.validate_session(target_id) + if not is_valid: + raise ValueError(f'Target {target_id} has detached - no active sessions') + + # Update focus if requested if focus and self.agent_focus.target_id != target_id: - self.logger.debug( - f'[get_or_create_cdp_session] Switching agent focus from {self.agent_focus.target_id} to {target_id}' - ) + self.logger.debug(f'[SessionManager] Switching focus: {self.agent_focus.target_id[:8]}... → {target_id[:8]}...') self.agent_focus = session + + # Resume if waiting for debugger if focus: - await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id) - # else: - # self.logger.debug(f'[get_or_create_cdp_session] Reusing existing session for {target_id} (focus={focus})') + try: + await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id) + except Exception: + pass # May fail if not waiting + return session - # If it's the current focus target, return that session - if self.agent_focus.target_id == target_id: - self._cdp_session_pool[target_id] = self.agent_focus - return self.agent_focus + # Session not in pool yet - wait for attach event + self.logger.debug(f'[SessionManager] Waiting for target {target_id[:8]}... to attach...') - # Create new session for this target - # Default to True for new sessions (each new target gets its own WebSocket) - should_use_new_socket = True if new_socket is None else new_socket - self.logger.debug( - f'[get_or_create_cdp_session] Creating new CDP session for target {target_id} (new_socket={should_use_new_socket})' - ) - session = await CDPSession.for_target( - self._cdp_client_root, - target_id, - new_socket=should_use_new_socket, - cdp_url=self.cdp_url if should_use_new_socket else None, - ) - self._cdp_session_pool[target_id] = session - # log length of _cdp_session_pool - self.logger.debug(f'[get_or_create_cdp_session] new _cdp_session_pool length: {len(self._cdp_session_pool)}') + # Wait up to 2 seconds for the attach event + for attempt in range(20): + await asyncio.sleep(0.1) + session = await self._session_manager.get_session_for_target(target_id) + if session: + self.logger.debug(f'[SessionManager] Target appeared after {attempt * 100}ms') + if focus: + self.agent_focus = session + return session - # Only change agent focus if requested - if focus: - self.logger.debug( - f'[get_or_create_cdp_session] Switching agent focus from {self.agent_focus.target_id} to {target_id}' - ) - self.agent_focus = session - await session.cdp_client.send.Runtime.runIfWaitingForDebugger(session_id=session.session_id) - else: - self.logger.debug( - f'[get_or_create_cdp_session] Created session for {target_id} without changing focus (still on {self.agent_focus.target_id})' - ) - - return session + # Timeout - target doesn't exist + raise ValueError(f'Target {target_id} not found - may have detached or never existed') @property def current_target_id(self) -> str | None: @@ -1328,22 +1287,52 @@ class BrowserSession(BaseModel): self.logger.debug(f'🌎 Connecting to existing chromium-based browser via CDP: {self.cdp_url} -> ({browser_location})') try: - # Import cdp-use client - - # Convert HTTP URL to WebSocket URL if needed - # Create and store the CDP client for direct CDP communication self._cdp_client_root = CDPClient(self.cdp_url) assert self._cdp_client_root is not None await self._cdp_client_root.start() + + # Initialize event-driven session manager FIRST (before enabling autoAttach) + from browser_use.browser.session_manager import SessionManager + + self._session_manager = SessionManager(self) + await self._session_manager.start_monitoring() + self.logger.info('Event-driven session manager started') + + # Enable auto-attach so Chrome automatically notifies us when NEW targets attach/detach + # This is the foundation of event-driven session management await self._cdp_client_root.send.Target.setAutoAttach( - params={'autoAttach': False, 'waitForDebuggerOnStart': False, 'flatten': True} + params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True} ) - self.logger.debug('CDP client connected successfully') + self.logger.info('CDP client connected with auto-attach enabled') # Get browser targets to find available contexts/pages targets = await self._cdp_client_root.send.Target.getTargets() + # Manually attach to ALL EXISTING targets (autoAttach only fires for new ones) + # We attach to everything (pages, iframes, workers) for complete coverage + for target in targets['targetInfos']: + target_id = target['targetId'] + target_type = target.get('type', 'unknown') + + try: + # Attach to target - this triggers attachedToTarget event + result = await self._cdp_client_root.send.Target.attachToTarget( + params={'targetId': target_id, 'flatten': True} + ) + session_id = result['sessionId'] + + # Enable auto-attach for this target's children + await self._cdp_client_root.send.Target.setAutoAttach( + params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}, session_id=session_id + ) + + self.logger.debug( + f'Attached to existing target: {target_id[:8]}... (type={target_type}, session={session_id[:8]}...)' + ) + except Exception as e: + self.logger.debug(f'Failed to attach to existing target {target_id[:8]}... (type={target_type}): {e}') + # Find main browser pages (avoiding iframes, workers, extensions, etc.) page_targets: list[TargetInfo] = [ t @@ -1353,76 +1342,57 @@ class BrowserSession(BaseModel): ) ] - # Check for chrome://newtab pages and immediately redirect them - # to about:blank to avoid JS issues from CDP on chrome://* urls + # Check for chrome://newtab pages and redirect them to about:blank from browser_use.utils import is_new_tab_page - # Collect all targets that need redirection - redirected_targets = [] - redirect_sessions = {} # Store sessions created for redirection to potentially reuse for target in page_targets: target_url = target.get('url', '') if is_new_tab_page(target_url) and target_url != 'about:blank': - # Redirect chrome://newtab to about:blank to avoid JS issues preventing driving chrome://newtab target_id = target['targetId'] self.logger.debug(f'🔄 Redirecting {target_url} to about:blank for target {target_id}') try: - # Create a CDP session for redirection (minimal domains to avoid duplicate event handlers) - # Only enable Page domain for navigation, avoid duplicate event handlers - redirect_session = await CDPSession.for_target(self._cdp_client_root, target_id, domains=['Page']) - # Navigate to about:blank - await redirect_session.cdp_client.send.Page.navigate( - params={'url': 'about:blank'}, session_id=redirect_session.session_id - ) - redirected_targets.append(target_id) - redirect_sessions[target_id] = redirect_session # Store for potential reuse - # Update the target's URL in our list for later use - target['url'] = 'about:blank' - # Small delay to ensure navigation completes - await asyncio.sleep(0.05) + # Sessions now exist from manual attachment above + session = await self._session_manager.get_session_for_target(target_id) + if session: + await session.cdp_client.send.Page.navigate( + params={'url': 'about:blank'}, session_id=session.session_id + ) + target['url'] = 'about:blank' + await asyncio.sleep(0.05) # Let navigation start except Exception as e: - self.logger.warning(f'Failed to redirect {target_url} to about:blank: {e}') - - # Log summary of redirections - if redirected_targets: - self.logger.debug(f'Redirected {len(redirected_targets)} chrome://newtab pages to about:blank') + self.logger.warning(f'Failed to redirect {target_url}: {e}') + # Ensure we have at least one page if not page_targets: - # No pages found, create a new one new_target = await self._cdp_client_root.send.Target.createTarget(params={'url': 'about:blank'}) target_id = new_target['targetId'] - self.logger.debug(f'📄 Created new blank page with target ID: {target_id}') + self.logger.debug(f'📄 Created new blank page: {target_id}') else: - # Use the first available page target_id = [page for page in page_targets if page.get('type') == 'page'][0]['targetId'] - self.logger.debug(f'📄 Using existing page with target ID: {target_id}') + self.logger.debug(f'📄 Using existing page: {target_id}') - # Store the current page target ID and add to pool - # Reuse redirect session if available, otherwise create new one - if target_id in redirect_sessions: - self.logger.debug(f'Reusing redirect session for target {target_id}') - self.agent_focus = redirect_sessions[target_id] - else: - # For the initial connection, we'll use the shared root WebSocket - self.agent_focus = await CDPSession.for_target(self._cdp_client_root, target_id, new_socket=False) - if self.agent_focus: - self._cdp_session_pool[target_id] = self.agent_focus + # Wait for SessionManager to receive the attach event for this target + # (Chrome will fire Target.attachedToTarget event which SessionManager handles) + for _ in range(20): # Wait up to 2 seconds + await asyncio.sleep(0.1) + session = await self._session_manager.get_session_for_target(target_id) + if session: + self.agent_focus = session + # SessionManager already added it to pool - no need to do it manually + self.logger.debug(f'📄 Agent focus set to {target_id[:8]}...') + break + + if not self.agent_focus: + raise RuntimeError(f'Failed to get session for initial target {target_id}') # Enable proxy authentication handling if configured await self._setup_proxy_auth() # Verify the session is working - try: - if self.agent_focus: - assert self.agent_focus.title != 'Unknown title' - else: - raise RuntimeError('Failed to create CDP session') - except Exception as e: - self.logger.warning(f'Failed to create CDP session: {e}') - raise + if self.agent_focus.title == 'Unknown title': + self.logger.warning('Session created but title is unknown (may be normal for about:blank)') # Dispatch TabCreatedEvent for all initial tabs (so watchdogs can initialize) - # This replaces the duplicated logic from navigation_watchdog's _initialize_agent_focus for idx, target in enumerate(page_targets): target_url = target.get('url', '') self.logger.debug(f'Dispatching TabCreatedEvent for initial tab {idx}: {target_url}') @@ -1739,18 +1709,11 @@ class BrowserSession(BaseModel): # First check cached sessions for full_target_id in self._cdp_session_pool.keys(): if full_target_id.endswith(tab_id): - # Verify target still exists if await self._is_target_valid(full_target_id): return full_target_id - else: - # Remove stale session from pool - self.logger.debug(f'Removing stale session for target {full_target_id}') - stale_session = self._cdp_session_pool.pop(full_target_id, None) - if stale_session and stale_session.owns_cdp_client: - try: - await stale_session.disconnect() - except Exception: - pass + # Stale session - Chrome should have sent detach event + # If we're here, event listener will clean it up + self.logger.debug(f'Found stale session for target {full_target_id}, skipping') # Get all current targets and find the one matching tab_id all_targets = await self.cdp_client.send.Target.getTargets() @@ -2427,7 +2390,7 @@ class BrowserSession(BaseModel): async def _cdp_get_cookies(self) -> list[Cookie]: """Get cookies using CDP Network.getCookies.""" - cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False) + cdp_session = await self.get_or_create_cdp_session(target_id=None) result = await asyncio.wait_for( cdp_session.cdp_client.send.Storage.getCookies(session_id=cdp_session.session_id), timeout=8.0 ) @@ -2438,7 +2401,7 @@ class BrowserSession(BaseModel): if not self.agent_focus or not cookies: return - cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False) + cdp_session = await self.get_or_create_cdp_session(target_id=None) # Storage.setCookies expects params dict with 'cookies' key await cdp_session.cdp_client.send.Storage.setCookies( params={'cookies': cookies}, # type: ignore[arg-type] @@ -2509,7 +2472,7 @@ class BrowserSession(BaseModel): """ if target_id: # Set viewport for specific target - cdp_session = await self.get_or_create_cdp_session(target_id, focus=False, new_socket=False) + cdp_session = await self.get_or_create_cdp_session(target_id, focus=False) elif self.agent_focus: # Use current focus cdp_session = self.agent_focus @@ -2525,7 +2488,7 @@ class BrowserSession(BaseModel): async def _cdp_get_origins(self) -> list[dict[str, Any]]: """Get origins with localStorage and sessionStorage using CDP.""" origins = [] - cdp_session = await self.get_or_create_cdp_session(target_id=None, new_socket=False) + cdp_session = await self.get_or_create_cdp_session(target_id=None) try: # Enable DOMStorage domain to track storage diff --git a/browser_use/browser/session_manager.py b/browser_use/browser/session_manager.py new file mode 100644 index 000000000..848e4c262 --- /dev/null +++ b/browser_use/browser/session_manager.py @@ -0,0 +1,226 @@ +"""Event-driven CDP session management. + +Sessions are managed by listening to +Target.attachedToTarget and Target.detachedFromTarget events, ensuring the +session pool always reflects the current browser state. +""" + +import asyncio +from typing import TYPE_CHECKING, Dict, Set + +from cdp_use.cdp.target import AttachedToTargetEvent, DetachedFromTargetEvent, SessionID, TargetID + +if TYPE_CHECKING: + from browser_use.browser.session import BrowserSession, CDPSession + + +class SessionManager: + """Manages CDP sessions with event-driven synchronization. + + Key differences from manual caching: + - Sessions are added/removed via CDP events, not manual calls + - Multiple sessions can attach to the same target + - Targets are only removed when ALL sessions detach + - No stale sessions - pool always reflects browser reality + """ + + def __init__(self, browser_session: 'BrowserSession'): + self.browser_session = browser_session + self.logger = browser_session.logger + + # Target -> Set of sessions attached to it + self._target_sessions: Dict[TargetID, Set[SessionID]] = {} + + # Session -> Target mapping (for reverse lookup) + self._session_to_target: Dict[SessionID, TargetID] = {} + + # Lock for thread-safe access + self._lock = asyncio.Lock() + + async def start_monitoring(self) -> None: + """Start monitoring Target attach/detach events. + + This registers CDP event handlers that keep the session pool synchronized. + """ + if not self.browser_session._cdp_client_root: + raise RuntimeError('CDP client not initialized') + + # Capture cdp_client_root in closure to avoid type errors + cdp_client = self.browser_session._cdp_client_root + + # Register synchronous event handlers (CDP requirement) + def on_attached(event: AttachedToTargetEvent, session_id: SessionID | None = None): + event_session_id = event['sessionId'] + target_type = event['targetInfo'].get('type', 'unknown') + + # Enable auto-attach for this session's children + # Note: For short-lived targets (workers, temp iframes), this may fail with -32001 + # This is EXPECTED - the session_id is valid when event fires, but may detach + # before our async task executes. We catch and ignore these errors. + async def _enable_auto_attach(): + try: + await cdp_client.send.Target.setAutoAttach( + params={'autoAttach': True, 'waitForDebuggerOnStart': False, 'flatten': True}, session_id=event_session_id + ) + self.logger.debug(f'[SessionManager] Auto-attach enabled for {target_type} session {event_session_id[:8]}...') + except Exception as e: + # Expected for workers/temp iframes that attach/detach rapidly + # The session_id is valid in the event, but gone by the time task executes + error_str = str(e) + if '-32001' in error_str or 'Session with given id not found' in error_str: + self.logger.debug( + f'[SessionManager] setAutoAttach skipped - {target_type} session {event_session_id[:8]}... ' + f'already detached (normal for short-lived targets)' + ) + else: + self.logger.debug(f'[SessionManager] setAutoAttach failed: {e}') + + # Schedule auto-attach and pool management + asyncio.create_task(_enable_auto_attach()) + asyncio.create_task(self._handle_target_attached(event)) + + def on_detached(event: DetachedFromTargetEvent, session_id: SessionID | None = None): + asyncio.create_task(self._handle_target_detached(event)) + + self.browser_session._cdp_client_root.register.Target.attachedToTarget(on_attached) + self.browser_session._cdp_client_root.register.Target.detachedFromTarget(on_detached) + + self.logger.info('[SessionManager] Event monitoring started') + + async def _handle_target_attached(self, event: AttachedToTargetEvent) -> None: + """Handle Target.attachedToTarget event. + + Called automatically by Chrome when a new target/session is created. + This is the ONLY place where sessions should be added to the pool. + """ + target_id = event['targetInfo']['targetId'] + session_id = event['sessionId'] + target_type = event['targetInfo']['type'] + waiting_for_debugger = event.get('waitingForDebugger', False) + + self.logger.debug( + f'[SessionManager] Target attached: {target_id[:8]}... (session={session_id[:8]}..., ' + f'type={target_type}, waitingForDebugger={waiting_for_debugger})' + ) + + async with self._lock: + # Track this session for the target + if target_id not in self._target_sessions: + self._target_sessions[target_id] = set() + + self._target_sessions[target_id].add(session_id) + self._session_to_target[session_id] = target_id + + # Create CDPSession wrapper and add to pool + if target_id not in self.browser_session._cdp_session_pool: + # Create session wrapper (uses shared WebSocket, just tracks session_id) + from browser_use.browser.session import CDPSession + + assert self.browser_session._cdp_client_root is not None, 'Root CDP client required' + + cdp_session = CDPSession( + cdp_client=self.browser_session._cdp_client_root, + target_id=target_id, + session_id=session_id, + title=event['targetInfo'].get('title', 'Unknown title'), + url=event['targetInfo'].get('url', 'about:blank'), + ) + + self.browser_session._cdp_session_pool[target_id] = cdp_session + + self.logger.debug( + f'[SessionManager] Created session for target {target_id[:8]}... ' + f'(pool size: {len(self.browser_session._cdp_session_pool)})' + ) + else: + # Update existing session with new session_id + existing = self.browser_session._cdp_session_pool[target_id] + existing.session_id = session_id + existing.title = event['targetInfo'].get('title', existing.title) + existing.url = event['targetInfo'].get('url', existing.url) + + # Resume execution if waiting for debugger + if waiting_for_debugger: + try: + assert self.browser_session._cdp_client_root is not None + await self.browser_session._cdp_client_root.send.Runtime.runIfWaitingForDebugger(session_id=session_id) + self.logger.debug(f'[SessionManager] Resumed execution for session {session_id[:8]}...') + except Exception as e: + self.logger.warning(f'[SessionManager] Failed to resume execution: {e}') + + async def _handle_target_detached(self, event: DetachedFromTargetEvent) -> None: + """Handle Target.detachedFromTarget event. + + Called automatically by Chrome when a target/session is destroyed. + This is the ONLY place where sessions should be removed from the pool. + """ + session_id = event['sessionId'] + target_id = event.get('targetId') # May be empty + + # If targetId not in event, look it up via session mapping + if not target_id: + async with self._lock: + target_id = self._session_to_target.get(session_id) + + if not target_id: + self.logger.warning(f'[SessionManager] Session detached but target unknown (session={session_id[:8]}...)') + return + + async with self._lock: + # Remove this session from target's session set + if target_id in self._target_sessions: + self._target_sessions[target_id].discard(session_id) + + remaining_sessions = len(self._target_sessions[target_id]) + + self.logger.debug( + f'[SessionManager] Session detached: target={target_id[:8]}... ' + f'session={session_id[:8]}... (remaining={remaining_sessions})' + ) + + # Only remove target when NO sessions remain + if remaining_sessions == 0: + self.logger.info(f'[SessionManager] No sessions remain for target {target_id[:8]}..., removing from pool') + + # Remove from pool + if target_id in self.browser_session._cdp_session_pool: + stale_session = self.browser_session._cdp_session_pool.pop(target_id) + # Don't disconnect - we're using shared WebSocket + self.logger.debug( + f'[SessionManager] Removed target {target_id[:8]}... from pool ' + f'(pool size: {len(self.browser_session._cdp_session_pool)})' + ) + + # Clean up tracking + del self._target_sessions[target_id] + + # Remove from reverse mapping + if session_id in self._session_to_target: + del self._session_to_target[session_id] + + async def get_session_for_target(self, target_id: TargetID) -> 'CDPSession | None': + """Get the current valid session for a target. + + Returns None if no session exists (target detached). + """ + async with self._lock: + return self.browser_session._cdp_session_pool.get(target_id) + + async def validate_session(self, target_id: TargetID) -> bool: + """Check if a target still has active sessions. + + Returns True if target is valid, False if it should be removed. + """ + async with self._lock: + if target_id not in self._target_sessions: + return False + + return len(self._target_sessions[target_id]) > 0 + + async def clear(self) -> None: + """Clear all session tracking (for cleanup).""" + async with self._lock: + self._target_sessions.clear() + self._session_to_target.clear() + + self.logger.info('[SessionManager] Cleared all session tracking') diff --git a/browser_use/browser/watchdog_base.py b/browser_use/browser/watchdog_base.py index 582f73090..8390faaad 100644 --- a/browser_use/browser/watchdog_base.py +++ b/browser_use/browser/watchdog_base.py @@ -129,33 +129,36 @@ class BaseWatchdog(BaseModel): f'🚌 {watchdog_and_handler_str} ❌ Failed ({time_elapsed:.2f}s): {type(e).__name__}: {e}' ) - # attempt to repair potentially crashed CDP session - try: - target_id_to_restore = None - if browser_session.agent_focus and browser_session.agent_focus.target_id: - # Common issue with CDP, recreate session with new socket to recover - target_id_to_restore = browser_session.agent_focus.target_id - browser_session.logger.debug( - f'🚌 {watchdog_and_handler_str} ⚠️ Recreating session to try and recover crashed CDP session\n\t{browser_session.agent_focus}' - ) - del browser_session._cdp_session_pool[browser_session.agent_focus.target_id] - browser_session.agent_focus = await browser_session.get_or_create_cdp_session( - target_id=target_id_to_restore, new_socket=True - ) - else: - await browser_session.get_or_create_cdp_session(target_id=None, new_socket=True, focus=True) - except Exception as sub_error: - if 'ConnectionClosedError' in str(type(sub_error)) or 'ConnectionError' in str(type(sub_error)): - browser_session.logger.error( - f'🚌 {watchdog_and_handler_str} ❌ Browser closed or CDP Connection disconnected by remote. {type(sub_error).__name__}: {sub_error}\n' - ) - raise - else: - browser_session.logger.error( - f'🚌 {watchdog_and_handler_str} ❌ CDP connected but failed to re-create CDP session after error "{type(original_error).__name__}: {original_error}" in {actual_handler.__name__}({event.event_type}#{event.event_id[-4:]}): due to {type(sub_error).__name__}: {sub_error}\n' - ) + # attempt to repair potentially crashed CDP session + try: + if browser_session.agent_focus and browser_session.agent_focus.target_id: + # With event-driven sessions, Chrome will send detach/attach events + # SessionManager handles pool cleanup automatically + target_id_to_restore = browser_session.agent_focus.target_id + browser_session.logger.debug( + f'🚌 {watchdog_and_handler_str} ⚠️ Session error detected, waiting for CDP events to sync\n\t{browser_session.agent_focus}' + ) - raise + # Wait for new attach event to restore the session + # This will raise ValueError if target doesn't re-attach + browser_session.agent_focus = await browser_session.get_or_create_cdp_session( + target_id=target_id_to_restore, focus=True + ) + else: + # Try to get any available session + await browser_session.get_or_create_cdp_session(target_id=None, focus=True) + except Exception as sub_error: + if 'ConnectionClosedError' in str(type(sub_error)) or 'ConnectionError' in str(type(sub_error)): + browser_session.logger.error( + f'🚌 {watchdog_and_handler_str} ❌ Browser closed or CDP Connection disconnected by remote. {type(sub_error).__name__}: {sub_error}\n' + ) + raise + else: + browser_session.logger.error( + f'🚌 {watchdog_and_handler_str} ❌ CDP connected but failed to re-create CDP session after error "{type(original_error).__name__}: {original_error}" in {actual_handler.__name__}({event.event_type}#{event.event_id[-4:]}): due to {type(sub_error).__name__}: {sub_error}\n' + ) + + raise return unique_handler diff --git a/browser_use/browser/watchdogs/crash_watchdog.py b/browser_use/browser/watchdogs/crash_watchdog.py index c0e096082..9f14d0a91 100644 --- a/browser_use/browser/watchdogs/crash_watchdog.py +++ b/browser_use/browser/watchdogs/crash_watchdog.py @@ -168,10 +168,7 @@ class CrashWatchdog(BaseWatchdog): async def _on_target_crash_cdp(self, target_id: TargetID) -> None: """Handle target crash detected via CDP.""" - # Remove crashed session from pool - if session := self.browser_session._cdp_session_pool.pop(target_id, None): - await session.disconnect() - self.logger.debug(f'[CrashWatchdog] Removed crashed session from pool: {target_id}') + self.logger.debug(f'[CrashWatchdog] Target crashed: {target_id}, waiting for detach event') # Get target info cdp_client = self.browser_session.cdp_client @@ -300,8 +297,9 @@ class CrashWatchdog(BaseWatchdog): self.logger.debug( f'[CrashWatchdog] Checking browser health for target {self.browser_session.agent_focus} error: {type(e).__name__}: {e}' ) + # With event-driven sessions, wait for re-attach instead of creating new socket self.agent_focus = cdp_session = await self.browser_session.get_or_create_cdp_session( - target_id=self.agent_focus.target_id, new_socket=True, focus=True + target_id=self.agent_focus.target_id, focus=True ) for target in (await self.browser_session.cdp_client.send.Target.getTargets()).get('targetInfos', []): @@ -326,12 +324,9 @@ class CrashWatchdog(BaseWatchdog): self.logger.error( f'[CrashWatchdog] ❌ Crashed session detected for target {self.browser_session.agent_focus} error: {type(e).__name__}: {e}' ) - # Remove crashed session from pool - if self.browser_session.agent_focus and (target_id := self.browser_session.agent_focus.target_id): - if session := self.browser_session._cdp_session_pool.pop(target_id, None): - await session.disconnect() - self.logger.debug(f'[CrashWatchdog] Removed crashed session from pool: {target_id}') - self.browser_session.agent_focus.target_id = None # type: ignore + # Chrome will send detach event, SessionManager will handle cleanup + if self.browser_session.agent_focus: + self.browser_session.agent_focus.target_id = None # type: ignore # Check browser process if we have PID if self.browser_session._local_browser_watchdog and (proc := self.browser_session._local_browser_watchdog._subprocess): diff --git a/browser_use/browser/watchdogs/storage_state_watchdog.py b/browser_use/browser/watchdogs/storage_state_watchdog.py index 0b38e1283..9c0a3be94 100644 --- a/browser_use/browser/watchdogs/storage_state_watchdog.py +++ b/browser_use/browser/watchdogs/storage_state_watchdog.py @@ -165,7 +165,7 @@ class StorageStateWatchdog(BaseWatchdog): """Save browser storage state to file.""" async with self._save_lock: # Check if CDP client is available - assert await self.browser_session.get_or_create_cdp_session(target_id=None, new_socket=False) + assert await self.browser_session.get_or_create_cdp_session(target_id=None) save_path = path or self.browser_session.browser_profile.storage_state if not save_path: diff --git a/examples/browser/using_cdp.py b/examples/browser/using_cdp.py index b932de03a..7a33540ad 100644 --- a/examples/browser/using_cdp.py +++ b/examples/browser/using_cdp.py @@ -27,16 +27,16 @@ load_dotenv() from browser_use import Agent, Tools from browser_use.browser import BrowserProfile, BrowserSession -from browser_use.llm import ChatOpenAI +from browser_use.llm import ChatBrowserUse -browser_session = BrowserSession(browser_profile=BrowserProfile(cdp_url='http://localhost:9222', is_local=True)) +browser_session = BrowserSession(browser_profile=BrowserProfile(cdp_url='https://b6949893-1bef-4e5b-8e02-940f24215549.cdp0.browser-use.com', is_local=False)) tools = Tools() async def main(): agent = Agent( - task='Visit https://duckduckgo.com and search for "browser-use founders"', - llm=ChatOpenAI(model='gpt-4.1-mini'), + task='Use the search functionality to locate pages detailing tuition and fees, then extract the published tuition fee information for undergraduate programs. website: https://columbia.edu', + llm=ChatBrowserUse(model='bu-latest'), tools=tools, browser_session=browser_session, )