diff --git a/browser_use/agent/service.py b/browser_use/agent/service.py
index 7a684110d..733aead9f 100644
--- a/browser_use/agent/service.py
+++ b/browser_use/agent/service.py
@@ -1251,7 +1251,9 @@ class Agent(Generic[Context, AgentStructuredOutput]):
# Start browser session and attach watchdogs
assert self.browser_session is not None, 'Browser session must be initialized before starting'
self.logger.debug('π Starting browser session...')
- await self.browser_session.start()
+ from browser_use.browser.events import BrowserStartEvent
+ event = self.browser_session.event_bus.dispatch(BrowserStartEvent())
+ await event
self.logger.debug('π§ Browser session started with watchdogs attached')
diff --git a/browser_use/browser/default_action_watchdog.py b/browser_use/browser/default_action_watchdog.py
index c6e52bfb3..14a873ab5 100644
--- a/browser_use/browser/default_action_watchdog.py
+++ b/browser_use/browser/default_action_watchdog.py
@@ -33,16 +33,9 @@ class DefaultActionWatchdog(BaseWatchdog):
"""Handle click request with CDP."""
page = await self.browser_session.get_current_page()
try:
- # Get the DOM element by index or use provided element_node
- if event.element_node is not None:
- element_node = event.element_node
- # For element_node clicks, we need to get its index for logging
- index_for_logging = getattr(element_node, 'highlight_index', 'N/A')
- else:
- element_node = await self.browser_session.get_dom_element_by_index(event.index)
- if element_node is None:
- raise Exception(f'Element index {event.index} does not exist - retry or use alternative actions')
- index_for_logging = event.index
+ # Use the provided node; compare against None so a valid index 0 is not logged as 'unknown'
+ element_node = event.node
+ index_for_logging = element_node.element_index if element_node.element_index is not None else 'unknown'
# Track initial number of tabs to detect new tab opening
initial_target_ids = await self.browser_session.target_ids
@@ -94,7 +87,7 @@ class DefaultActionWatchdog(BaseWatchdog):
BrowserErrorEvent(
error_type='ClickFailed',
message=str(e),
- details={'index': index_for_logging if 'index_for_logging' in locals() else event.index},
+ details={'index': index_for_logging if 'index_for_logging' in locals() else 'unknown'},
)
)
@@ -102,23 +95,22 @@ class DefaultActionWatchdog(BaseWatchdog):
"""Handle text input request with CDP."""
page = await self.browser_session.get_current_page()
try:
- # Get the DOM element by index
- element_node = await self.browser_session.get_dom_element_by_index(event.index)
- if element_node is None:
- raise Exception(f'Element index {event.index} does not exist - retry or use alternative actions')
+ # Use the provided node; compare against None so a valid index 0 is not logged as 'unknown'
+ element_node = event.node
+ index_for_logging = element_node.element_index if element_node.element_index is not None else 'unknown'
# Perform the actual text input
await self._input_text_element_node_impl(element_node, event.text, event.clear_existing)
# Log success
- logger.info(f'β¨οΈ Typed "{event.text}" into element with index {event.index}')
+ logger.info(f'β¨οΈ Typed "{event.text}" into element with index {index_for_logging}')
logger.debug(f'Element xpath: {element_node.xpath}')
except Exception as e:
self.event_bus.dispatch(
BrowserErrorEvent(
error_type='InputTextFailed',
message=str(e),
- details={'index': event.index, 'text': event.text},
+ details={'index': index_for_logging if 'index_for_logging' in locals() else 'unknown', 'text': event.text},
)
)
@@ -140,16 +132,15 @@ class DefaultActionWatchdog(BaseWatchdog):
# Positive pixels = scroll down, negative = scroll up
pixels = event.amount if event.direction == 'down' else -event.amount
- # Element-specific scrolling if index is provided
- if event.element_index is not None:
- element_node = await self.browser_session.get_dom_element_by_index(event.element_index)
- if element_node is None:
- raise Exception(f'Element index {event.element_index} does not exist')
+ # Element-specific scrolling if node is provided (index compared against None so 0 stays a valid index)
+ if event.node is not None:
+ element_node = event.node
+ index_for_logging = element_node.element_index if element_node.element_index is not None else 'unknown'
# Try to scroll the element's container
success = await self._scroll_element_container(element_node, pixels)
if success:
- logger.info(f'π Scrolled element {event.element_index} container {event.direction} by {event.amount} pixels')
+ logger.info(f'π Scrolled element {index_for_logging} container {event.direction} by {event.amount} pixels')
return
# Perform page-level scroll
@@ -726,14 +717,13 @@ class DefaultActionWatchdog(BaseWatchdog):
async def on_UploadFileEvent(self, event: UploadFileEvent) -> None:
"""Handle file upload request with CDP."""
try:
- # Get the DOM element by index
- element_node = await self.browser_session.get_dom_element_by_index(event.element_index)
- if element_node is None:
- raise Exception(f'Element index {event.element_index} does not exist')
+ # Use the provided node; compare against None so a valid index 0 is not logged as 'unknown'
+ element_node = event.node
+ index_for_logging = element_node.element_index if element_node.element_index is not None else 'unknown'
# Check if it's a file input
if not self.browser_session.is_file_input(element_node):
- raise Exception(f'Element {event.element_index} is not a file input')
+ raise Exception(f'Element {index_for_logging} is not a file input')
# Get CDP client and session
cdp_client = await self.browser_session.get_cdp_client()
@@ -749,13 +739,13 @@ class DefaultActionWatchdog(BaseWatchdog):
session_id=session_id,
)
- logger.info(f'π Uploaded file {event.file_path} to element {event.element_index}')
+ logger.info(f'π Uploaded file {event.file_path} to element {index_for_logging}')
except Exception as e:
self.event_bus.dispatch(
BrowserErrorEvent(
error_type='UploadFileFailed',
message=str(e),
- details={'element_index': event.element_index, 'file_path': event.file_path},
+ details={'element_index': index_for_logging if 'index_for_logging' in locals() else 'unknown', 'file_path': event.file_path},
)
)
diff --git a/browser_use/browser/events.py b/browser_use/browser/events.py
index 0fb425ee6..6cea30520 100644
--- a/browser_use/browser/events.py
+++ b/browser_use/browser/events.py
@@ -1,9 +1,12 @@
"""Event definitions for browser communication."""
-from typing import Any, Literal
+from typing import TYPE_CHECKING, Any, Literal
from bubus import BaseEvent
-from pydantic import Field, model_validator
+from pydantic import Field
+
+if TYPE_CHECKING:
+ from browser_use.dom.views import EnhancedDOMTreeNode
# ============================================================================
# Agent/Controller -> BrowserSession Events (High-level browser actions)
@@ -20,29 +23,19 @@ class NavigateToUrlEvent(BaseEvent):
class ClickElementEvent(BaseEvent):
- """Click an element by index or element_node."""
+ """Click an element."""
- index: int | None = None
- element_node: Any | None = None # DOMElementNode, but avoid circular import
+ node: 'EnhancedDOMTreeNode'
button: Literal['left', 'right', 'middle'] = 'left'
click_count: int = 1
expect_download: bool = False
new_tab: bool = False
- @model_validator(mode='after')
- def validate_index_or_element_node(self):
- """Validate that either index or element_node is provided."""
- if self.index is None and self.element_node is None:
- raise ValueError("Either 'index' or 'element_node' must be provided")
- if self.index is not None and self.element_node is not None:
- raise ValueError("Only one of 'index' or 'element_node' should be provided")
- return self
-
class TypeTextEvent(BaseEvent):
"""Type text into an element."""
- index: int
+ node: 'EnhancedDOMTreeNode'
text: str
clear_existing: bool = True
@@ -52,7 +45,7 @@ class ScrollEvent(BaseEvent):
direction: Literal['up', 'down', 'left', 'right']
amount: int # pixels
- element_index: int | None = None # None means scroll page
+ node: 'EnhancedDOMTreeNode | None' = None # None means scroll page
class SwitchTabEvent(BaseEvent):
@@ -82,13 +75,13 @@ class BrowserStateRequestEvent(BaseEvent):
cache_clickable_elements_hashes: bool = True
-class WaitForConditionEvent(BaseEvent):
- """Wait for a condition."""
+# class WaitForConditionEvent(BaseEvent):
+# """Wait for a condition."""
- condition: Literal['navigation', 'selector', 'timeout', 'load_state']
- timeout: float = 30000
- selector: str | None = None
- state: Literal['attached', 'detached', 'visible', 'hidden'] | None = None
+# condition: Literal['navigation', 'selector', 'timeout', 'load_state']
+# timeout: float = 30000
+# selector: str | None = None
+# state: Literal['attached', 'detached', 'visible', 'hidden'] | None = None
class GoBackEvent(BaseEvent):
@@ -125,7 +118,7 @@ class SendKeysEvent(BaseEvent):
class UploadFileEvent(BaseEvent):
"""Upload a file to an element."""
- element_index: int
+ node: 'EnhancedDOMTreeNode'
file_path: str
@@ -285,17 +278,6 @@ class BrowserErrorEvent(BaseEvent):
details: dict[str, Any] = Field(default_factory=dict)
-# ============================================================================
-# Response Events (for request-response pattern)
-# ============================================================================
-
-
-class BrowserStateChangedEvent(BaseEvent):
- """Response to BrowserStateRequestEvent."""
-
- state: Any # BrowserStateSummary object
-
-
# ============================================================================
# Storage State Events
# ============================================================================
diff --git a/browser_use/browser/session.py b/browser_use/browser/session.py
index 4d0de01fa..941762b0c 100644
--- a/browser_use/browser/session.py
+++ b/browser_use/browser/session.py
@@ -2,7 +2,7 @@
import asyncio
import logging
-from typing import Any, Self
+from typing import TYPE_CHECKING, Any, Self
from bubus import EventBus
from bubus.helpers import retry
@@ -31,6 +31,10 @@ from browser_use.utils import (
time_execution_async,
)
+if TYPE_CHECKING:
+ from cdp_use import CDPClient
+ from browser_use.dom.views import EnhancedDOMTreeNode
+
_GLOB_WARNING_SHOWN = False # used inside _is_url_allowed to avoid spamming the logs with the same warning multiple times
MAX_SCREENSHOT_HEIGHT = 2000
@@ -75,15 +79,14 @@ class BrowserSession(BaseModel):
# Connection info (for backwards compatibility)
cdp_url: str | None = None
is_local: bool = Field(default=True)
+
+ # Mutable state
+ current_target_id: str | None = None
+ """Current active target ID for the main page"""
# Event bus
event_bus: EventBus = Field(default_factory=EventBus)
- # Browser state
- _playwright: PlaywrightOrPatchright | None = PrivateAttr(default=None)
- _browser: Browser | None = PrivateAttr(default=None)
- _browser_context: BrowserContext | None = PrivateAttr(default=None)
-
# PDF handling
_auto_download_pdfs: bool = PrivateAttr(default=True)
@@ -101,6 +104,13 @@ class BrowserSession(BaseModel):
# Cached browser state for synchronous access
_cached_browser_state_summary: Any = PrivateAttr(default=None)
+ _cached_selector_map: dict[int, 'EnhancedDOMTreeNode'] = PrivateAttr(default_factory=dict)
+ """Cached mapping of element indices to DOM nodes"""
+
+ # CDP client
+ _cdp_client: 'CDPClient | None' = PrivateAttr(default=None)
+ """Cached CDP client instance"""
+
_logger: Any = PrivateAttr(default=None)
@property
@@ -114,6 +124,17 @@ class BrowserSession(BaseModel):
self._logger = logging.getLogger(f'browser_use.{self}')
return self._logger
+ @property
+ def cdp_client(self) -> 'CDPClient | None':
+ """Get the cached CDP client if it exists.
+
+ The client is created and started in setup_browser_via_cdp_url().
+
+ Returns:
+ The CDP client instance or None if not yet created
+ """
+ return self._cdp_client
+
def __repr__(self) -> str:
port_number_or_pid = (self.cdp_url or str(self.browser_pid) or 'playwright').rsplit(':', 1)[-1].split('/', 1)[0]
return f'BrowserSessionπ {self.id[-4:]}:{port_number_or_pid} #{str(id(self))[-2:]} (cdp_url={self.cdp_url}, profile={self.browser_profile})'
@@ -121,7 +142,7 @@ class BrowserSession(BaseModel):
def __str__(self) -> str:
# Note: _original_browser_session tracking moved to Agent class
port_number_or_pid = (
- (self.cdp_url or self.wss_url or str(self.browser_pid) or 'playwright').rsplit(':', 1)[-1].split('/', 1)[0]
+ (self.cdp_url or str(self.browser_pid) or 'playwright').rsplit(':', 1)[-1].split('/', 1)[0]
)
return f'BrowserSessionπ {self.id[-4:]}:{port_number_or_pid} #{str(id(self))[-2:]}' # ' π
{str(id(self.current_target_id))[-2:]}'
@@ -150,38 +171,8 @@ class BrowserSession(BaseModel):
assert self.cdp_url and '://' in self.cdp_url
- # Connect via CDP
- self._playwright = await async_playwright().start()
-
- # Get connection kwargs and exclude accept_downloads when using CDP download behavior
- connect_kwargs = self.browser_profile.kwargs_for_connect().model_dump(exclude={'accept_downloads'})
-
- self._browser = await self._playwright.chromium.connect_over_cdp(
- self.cdp_url,
- **connect_kwargs,
- )
-
- # Enable downloads via CDP Browser.setDownloadBehavior
- if self.browser_profile.downloads_path:
- try:
- cdp_session = await self._browser.new_browser_cdp_session()
- await cdp_session.send(
- 'Browser.setDownloadBehavior',
- {'behavior': 'allow', 'downloadPath': str(self.browser_profile.downloads_path)},
- )
- logger.debug(
- f'[Session] Enabled downloads via Browser.setDownloadBehavior to: {self.browser_profile.downloads_path}'
- )
- except Exception as e:
- logger.error(f'[Session] Failed to set browser download behavior via CDP: {e}')
-
- # Get or create browser context
- if self._browser.contexts:
- self._browser_context = self._browser.contexts[0]
- else:
- self._browser_context = await self._browser.new_context(
- **self.browser_profile.kwargs_for_new_context().model_dump(mode='json', exclude_unset=True)
- )
+ # Setup browser via CDP without Playwright
+ await self.setup_browser_via_cdp_url()
# Notify that browser is connected
self.event_bus.dispatch(BrowserConnectedEvent(cdp_url=self.cdp_url))
@@ -200,21 +191,14 @@ class BrowserSession(BaseModel):
"""Handle browser stop request."""
try:
+ # TODO: close all pages here or tell the browser to close gracefully? is there any point?
+ # we might need to give the browser time to save trace files, recordings, etc. during shutdown
+
# Check if we should keep the browser alive
if self.browser_profile.keep_alive and not event.force:
self.event_bus.dispatch(BrowserStoppedEvent(reason='Kept alive due to keep_alive=True'))
return
- # Close context if we created it
- if self._browser_context:
- await self._browser_context.close()
- self._browser_context = None
-
- # Clean up playwright
- if self._playwright:
- await self._playwright.stop()
- self._playwright = None
-
# Reset state
self._browser = None
self._browser_context = None
@@ -291,11 +275,7 @@ class BrowserSession(BaseModel):
# Manually copy over the excluded fields that are needed for browser connection
# These fields are excluded in the model config but need to be shared
- copy._playwright = self._playwright
- copy._browser = self._browser
- copy._browser_context = self._browser_context
copy.current_target_id = self.current_target_id
- copy.browser_pid = self.browser_pid
return copy
@@ -323,7 +303,7 @@ class BrowserSession(BaseModel):
ws_url = version_info.json()['webSocketDebuggerUrl']
# Create and store the CDP client for direct CDP communication
- if not hasattr(self, '_cdp_client'):
+ if self._cdp_client is None:
self._cdp_client = CDPClient(ws_url)
await self._cdp_client.start()
@@ -400,7 +380,8 @@ class BrowserSession(BaseModel):
})();
}
"""
- await self.browser_context.add_init_script(init_script)
+ # TODO: convert this to pure cdp-use and/or move it to the dom_watchdog.py
+ # await self.browser_context.add_init_script(init_script)
@property
async def target_ids(self) -> list[str]:
@@ -431,8 +412,6 @@ class BrowserSession(BaseModel):
# Get all page targets using CDP
pages = await self._cdp_get_all_pages()
- cdp_client = await self.get_cdp_client()
-
for i, page_target in enumerate(pages):
target_id = page_target['targetId']
url = page_target['url']
@@ -449,17 +428,17 @@ class BrowserSession(BaseModel):
# Normal pages - try to get title with CDP for reliability
try:
# Attach to target and get session ID
- session = await cdp_client.send('Target.attachToTarget', {'targetId': target_id, 'flatten': True})
+ session = await self.cdp_client.send('Target.attachToTarget', {'targetId': target_id, 'flatten': True})
session_id = session['sessionId']
# Use CDP to evaluate document.title
title_result = await asyncio.wait_for(
- cdp_client.send('Runtime.evaluate', {'expression': 'document.title'}, session_id=session_id), timeout=2.0
+ self.cdp_client.send('Runtime.evaluate', {'expression': 'document.title'}, session_id=session_id), timeout=2.0
)
title = title_result.get('result', {}).get('value', '')
# Detach from target
- await cdp_client.send('Target.detachFromTarget', {'sessionId': session_id})
+ await self.cdp_client.send('Target.detachFromTarget', {'sessionId': session_id})
# Special handling for PDF pages
if (not title or title == '') and (url.endswith('.pdf') or 'pdf' in url):
@@ -754,31 +733,6 @@ class BrowserSession(BaseModel):
# ========== CDP Helper Methods ==========
- async def get_cdp_client(self) -> Any:
- """Get the CDP client, creating it if necessary."""
- if not hasattr(self, '_cdp_client') or self._cdp_client is None:
- if not self.cdp_url:
- raise ValueError('CDP URL is not set')
-
- # Import cdp-use client
- import httpx
- from cdp_use import CDPClient
-
- # Convert HTTP URL to WebSocket URL if needed
- ws_url = self.cdp_url
- if not ws_url.startswith('ws'):
- # If it's an HTTP URL, fetch the WebSocket URL from /json/version endpoint
- url = ws_url.rstrip('/')
- if not url.endswith('/json/version'):
- url = url + '/json/version'
- async with httpx.AsyncClient() as client:
- version_info = await client.get(url)
- ws_url = version_info.json()['webSocketDebuggerUrl']
-
- self._cdp_client = CDPClient(ws_url)
- await self._cdp_client.start()
-
- return self._cdp_client
async def get_current_page_cdp_session_id(self) -> str | None:
"""Get the CDP session ID for the current page."""
diff --git a/browser_use/controller/service.py b/browser_use/controller/service.py
index d89a690b9..c1f03b907 100644
--- a/browser_use/controller/service.py
+++ b/browser_use/controller/service.py
@@ -29,6 +29,7 @@ from browser_use.browser.events import (
)
from browser_use.browser.views import BrowserError
from browser_use.controller.registry.service import Registry
+from browser_use.dom.views import EnhancedDOMTreeNode
from browser_use.controller.views import (
ClickElementAction,
CloseTabAction,
@@ -165,10 +166,15 @@ class Controller(Generic[Context]):
param_model=ClickElementAction,
)
async def click_element_by_index(params: ClickElementAction, browser_session: BrowserSession):
- # Dispatch click event with index
+ # Look up the node from the selector map
+ node = await browser_session.get_element_by_index(params.index)
+ if node is None:
+ raise ValueError(f"Element index {params.index} not found in DOM")
+
+ # Dispatch click event with node
event = browser_session.event_bus.dispatch(
ClickElementEvent(
- index=params.index,
+ node=node,
expect_download=params.expect_download,
new_tab=params.new_tab
)
@@ -194,10 +200,15 @@ class Controller(Generic[Context]):
param_model=InputTextAction,
)
async def input_text(params: InputTextAction, browser_session: BrowserSession, has_sensitive_data: bool = False):
- # Dispatch type text event
+ # Look up the node from the selector map
+ node = await browser_session.get_element_by_index(params.index)
+ if node is None:
+ raise ValueError(f"Element index {params.index} not found in DOM")
+
+ # Dispatch type text event with node
event = browser_session.event_bus.dispatch(
TypeTextEvent(
- index=params.index,
+ node=node,
text=params.text
)
)
@@ -214,30 +225,33 @@ class Controller(Generic[Context]):
long_term_memory=f"Input '{params.text}' into element {params.index}.",
)
- @self.registry.action('Upload file to interactive element with file path', param_model=UploadFileAction)
- async def upload_file(params: UploadFileAction, browser_session: BrowserSession, available_file_paths: list[str]):
- if params.path not in available_file_paths:
- raise BrowserError(f'File path {params.path} is not available')
+ # @self.registry.action('Upload file to interactive element with file path', param_model=UploadFileAction)
+ # async def upload_file(params: UploadFileAction, browser_session: BrowserSession, available_file_paths: list[str]):
+ # if params.path not in available_file_paths:
+ # raise BrowserError(f'File path {params.path} is not available')
- if not os.path.exists(params.path):
- raise BrowserError(f'File {params.path} does not exist')
+ # if not os.path.exists(params.path):
+ # raise BrowserError(f'File {params.path} does not exist')
- # Dispatch upload file event
- event = browser_session.event_bus.dispatch(
- UploadFileEvent(
- element_index=params.index,
- file_path=params.path
- )
- )
- await event
+ # # Look up the node from the selector map
+ # node = EnhancedDOMTreeNode.from_element_index(browser_session, params.index)
- msg = f'π Successfully uploaded file to index {params.index}'
- logger.info(msg)
- return ActionResult(
- extracted_content=msg,
- include_in_memory=True,
- long_term_memory=f'Uploaded file {params.path} to element {params.index}',
- )
+ # # Dispatch upload file event with node
+ # event = browser_session.event_bus.dispatch(
+ # UploadFileEvent(
+ # node=node,
+ # file_path=params.path
+ # )
+ # )
+ # await event
+
+ # msg = f'π Successfully uploaded file to index {params.index}'
+ # logger.info(msg)
+ # return ActionResult(
+ # extracted_content=msg,
+ # include_in_memory=True,
+ # long_term_memory=f'Uploaded file {params.path} to element {params.index}',
+ # )
# Tab Management Actions
@@ -275,670 +289,674 @@ class Controller(Generic[Context]):
# TODO: Refactor to use events instead of direct page access
# This action is temporarily disabled as it needs refactoring to use events
- if False: # Disabled
- @self.registry.action(
- """Extract structured, semantic data (e.g. product description, price, all information about XYZ) from the current webpage based on a textual query.
- This tool takes the entire markdown of the page and extracts the query from it.
- Set extract_links=True ONLY if your query requires extracting links/URLs from the page.
- Only use this for specific queries for information retrieval from the page. Don't use this to get interactive elements - the tool does not see HTML elements, only the markdown.
- """,
- )
- async def extract_structured_data(
- query: str,
- extract_links: bool,
- page: Page,
- page_extraction_llm: BaseChatModel,
- file_system: FileSystem,
- ):
- from functools import partial
+# @self.registry.action(
+# """Extract structured, semantic data (e.g. product description, price, all information about XYZ) from the current webpage based on a textual query.
+# This tool takes the entire markdown of the page and extracts the query from it.
+# Set extract_links=True ONLY if your query requires extracting links/URLs from the page.
+# Only use this for specific queries for information retrieval from the page. Don't use this to get interactive elements - the tool does not see HTML elements, only the markdown.
+# """,
+# )
+# async def extract_structured_data(
+# query: str,
+# extract_links: bool,
+# page: Page,
+# page_extraction_llm: BaseChatModel,
+# file_system: FileSystem,
+# ):
+# from functools import partial
- import markdownify
+# import markdownify
- strip = []
+# strip = []
- if not extract_links:
- strip = ['a', 'img']
+# if not extract_links:
+# strip = ['a', 'img']
- # Run markdownify in a thread pool to avoid blocking the event loop
- loop = asyncio.get_event_loop()
+# # Run markdownify in a thread pool to avoid blocking the event loop
+# loop = asyncio.get_event_loop()
- # Aggressive timeout for page content
- try:
- page_html_result = await asyncio.wait_for(page.content(), timeout=10.0) # 5 second aggressive timeout
- except TimeoutError:
- raise RuntimeError('Page content extraction timed out after 5 seconds')
- except Exception as e:
- raise RuntimeError(f"Couldn't extract page content: {e}")
+# # Aggressive timeout for page content
+# try:
+# page_html_result = await asyncio.wait_for(page.content(), timeout=10.0) # 5 second aggressive timeout
+# except TimeoutError:
+# raise RuntimeError('Page content extraction timed out after 5 seconds')
+# except Exception as e:
+# raise RuntimeError(f"Couldn't extract page content: {e}")
- page_html = page_html_result
+# page_html = page_html_result
- markdownify_func = partial(markdownify.markdownify, strip=strip)
+# markdownify_func = partial(markdownify.markdownify, strip=strip)
- try:
- content = await asyncio.wait_for(
- loop.run_in_executor(None, markdownify_func, page_html), timeout=5.0
- ) # 5 second aggressive timeout
- except Exception as e:
- logger.warning(f'Markdownify failed: {type(e).__name__}')
- raise RuntimeError(f'Could not convert html to markdown: {type(e).__name__}')
+# try:
+# content = await asyncio.wait_for(
+# loop.run_in_executor(None, markdownify_func, page_html), timeout=5.0
+# ) # 5 second aggressive timeout
+# except Exception as e:
+# logger.warning(f'Markdownify failed: {type(e).__name__}')
+# raise RuntimeError(f'Could not convert html to markdown: {type(e).__name__}')
- # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
- for iframe in page.frames:
- try:
- await iframe.wait_for_load_state(timeout=1000) # 1 second aggressive timeout for iframe load
- except Exception:
- pass
+# # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
+# for iframe in page.frames:
+# try:
+# await iframe.wait_for_load_state(timeout=1000) # 1 second aggressive timeout for iframe load
+# except Exception:
+# pass
- if iframe.url != page.url and not iframe.url.startswith('data:') and not iframe.url.startswith('about:'):
- content += f'\n\nIFRAME {iframe.url}:\n'
- # Run markdownify in a thread pool for iframe content as well
- try:
- # Aggressive timeouts for iframe content
- iframe_html = await asyncio.wait_for(iframe.content(), timeout=2.0) # 2 second aggressive timeout
- iframe_markdown = await asyncio.wait_for(
- loop.run_in_executor(None, markdownify_func, iframe_html),
- timeout=2.0, # 2 second aggressive timeout for iframe markdownify
- )
- except Exception:
- iframe_markdown = '' # Skip failed iframes
- content += iframe_markdown
- # replace multiple sequential \n with a single \n
- content = re.sub(r'\n+', '\n', content)
+# if iframe.url != page.url and not iframe.url.startswith('data:') and not iframe.url.startswith('about:'):
+# content += f'\n\nIFRAME {iframe.url}:\n'
+# # Run markdownify in a thread pool for iframe content as well
+# try:
+# # Aggressive timeouts for iframe content
+# iframe_html = await asyncio.wait_for(iframe.content(), timeout=2.0) # 2 second aggressive timeout
+# iframe_markdown = await asyncio.wait_for(
+# loop.run_in_executor(None, markdownify_func, iframe_html),
+# timeout=2.0, # 2 second aggressive timeout for iframe markdownify
+# )
+# except Exception:
+# iframe_markdown = '' # Skip failed iframes
+# content += iframe_markdown
+# # replace multiple sequential \n with a single \n
+# content = re.sub(r'\n+', '\n', content)
- # limit to 30000 characters - remove text in the middle (β15000 tokens)
- max_chars = 30000
- if len(content) > max_chars:
- logger.info(f'Content is too long, removing middle {len(content) - max_chars} characters')
- content = (
- content[: max_chars // 2]
- + '\n... left out the middle because it was too long ...\n'
- + content[-max_chars // 2 :]
- )
+# # limit to 30000 characters - remove text in the middle (β15000 tokens)
+# max_chars = 30000
+# if len(content) > max_chars:
+# logger.info(f'Content is too long, removing middle {len(content) - max_chars} characters')
+# content = (
+# content[: max_chars // 2]
+# + '\n... left out the middle because it was too long ...\n'
+# + content[-max_chars // 2 :]
+# )
- prompt = """You convert websites into structured information. Extract information from this webpage based on the query. Focus only on content relevant to the query. If
-1. The query is vague
-2. Does not make sense for the page
-3. Some/all of the information is not available
+# prompt = """You convert websites into structured information. Extract information from this webpage based on the query. Focus only on content relevant to the query. If
+# 1. The query is vague
+# 2. Does not make sense for the page
+# 3. Some/all of the information is not available
-Explain the content of the page and that the requested information is not available in the page. Respond in JSON format.\nQuery: {query}\n Website:\n{page}"""
- try:
- formatted_prompt = prompt.format(query=query, page=content)
- # Aggressive timeout for LLM call
- response = await asyncio.wait_for(
- page_extraction_llm.ainvoke([UserMessage(content=formatted_prompt)]),
- timeout=120.0, # 120 second aggressive timeout for LLM call
- )
+# Explain the content of the page and that the requested information is not available in the page. Respond in JSON format.\nQuery: {query}\n Website:\n{page}"""
+# try:
+# formatted_prompt = prompt.format(query=query, page=content)
+# # Aggressive timeout for LLM call
+# response = await asyncio.wait_for(
+# page_extraction_llm.ainvoke([UserMessage(content=formatted_prompt)]),
+# timeout=120.0, # 120 second aggressive timeout for LLM call
+# )
- extracted_content = f'Page Link: {page.url}\nQuery: {query}\nExtracted Content:\n{response.completion}'
+# extracted_content = f'Page Link: {page.url}\nQuery: {query}\nExtracted Content:\n{response.completion}'
- # if content is small include it to memory
- MAX_MEMORY_SIZE = 600
- if len(extracted_content) < MAX_MEMORY_SIZE:
- memory = extracted_content
- include_extracted_content_only_once = False
- else:
- # find lines until MAX_MEMORY_SIZE
- lines = extracted_content.splitlines()
- display = ''
- display_lines_count = 0
- for line in lines:
- if len(display) + len(line) < MAX_MEMORY_SIZE:
- display += line + '\n'
- display_lines_count += 1
- else:
- break
- save_result = await file_system.save_extracted_content(extracted_content)
- memory = f'Extracted content from {page.url}\n{query}\n\n\n{display}{len(lines) - display_lines_count} more lines...\n\n{save_result}'
- include_extracted_content_only_once = True
- logger.info(f'π {memory}')
- return ActionResult(
- extracted_content=extracted_content,
- include_extracted_content_only_once=include_extracted_content_only_once,
- long_term_memory=memory,
- )
- except TimeoutError:
- error_msg = f'LLM call timed out for query: {query}'
- logger.warning(error_msg)
- raise RuntimeError(error_msg)
- except Exception as e:
- logger.debug(f'Error extracting content: {e}')
- msg = f'π Extracted from page\n: {content}\n'
- logger.info(msg)
- raise RuntimeError(str(e))
+# # if content is small include it to memory
+# MAX_MEMORY_SIZE = 600
+# if len(extracted_content) < MAX_MEMORY_SIZE:
+# memory = extracted_content
+# include_extracted_content_only_once = False
+# else:
+# # find lines until MAX_MEMORY_SIZE
+# lines = extracted_content.splitlines()
+# display = ''
+# display_lines_count = 0
+# for line in lines:
+# if len(display) + len(line) < MAX_MEMORY_SIZE:
+# display += line + '\n'
+# display_lines_count += 1
+# else:
+# break
+# save_result = await file_system.save_extracted_content(extracted_content)
+# memory = f'Extracted content from {page.url}\n{query}\n\n\n{display}{len(lines) - display_lines_count} more lines...\n\n{save_result}'
+# include_extracted_content_only_once = True
+# logger.info(f'π {memory}')
+# return ActionResult(
+# extracted_content=extracted_content,
+# include_extracted_content_only_once=include_extracted_content_only_once,
+# long_term_memory=memory,
+# )
+# except TimeoutError:
+# error_msg = f'LLM call timed out for query: {query}'
+# logger.warning(error_msg)
+# raise RuntimeError(error_msg)
+# except Exception as e:
+# logger.debug(f'Error extracting content: {e}')
+# msg = f'π Extracted from page\n: {content}\n'
+# logger.info(msg)
+# raise RuntimeError(str(e))
- @self.registry.action(
- 'Scroll the page by specified number of pages (set down=True to scroll down, down=False to scroll up, num_pages=number of pages to scroll like 0.5 for half page, 1.0 for one page, etc.). Optional index parameter to scroll within a specific element or its scroll container (works well for dropdowns and custom UI components).',
- param_model=ScrollAction,
- )
- async def scroll(params: ScrollAction, browser_session: BrowserSession):
- # Dispatch scroll event - the complex logic is handled in the event handler
- event = browser_session.event_bus.dispatch(
- ScrollEvent(
- direction='down' if params.down else 'up',
- amount=params.num_pages, # Pass num_pages, handler will convert to pixels
- element_index=params.index
- )
- )
- await event
+ # @self.registry.action(
+ # 'Scroll the page by specified number of pages (set down=True to scroll down, down=False to scroll up, num_pages=number of pages to scroll like 0.5 for half page, 1.0 for one page, etc.). Optional index parameter to scroll within a specific element or its scroll container (works well for dropdowns and custom UI components).',
+ # param_model=ScrollAction,
+ # )
+ # async def scroll(params: ScrollAction, browser_session: BrowserSession):
+ # # Look up the node from the selector map if index is provided
+ # node = None
+ # if params.index is not None:
+ # node = EnhancedDOMTreeNode.from_element_index(browser_session, params.index)
- direction = 'down' if params.down else 'up'
- target = f'element {params.index}' if params.index is not None else 'the page'
+ # # Dispatch scroll event with node - the complex logic is handled in the event handler
+ # event = browser_session.event_bus.dispatch(
+ # ScrollEvent(
+ # direction='down' if params.down else 'up',
+ # amount=params.num_pages, # Pass num_pages, handler will convert to pixels
+ # node=node
+ # )
+ # )
+ # await event
- if params.num_pages == 1.0:
- long_term_memory = f'Scrolled {direction} {target} by one page'
- else:
- long_term_memory = f'Scrolled {direction} {target} by {params.num_pages} pages'
-
- msg = f'π {long_term_memory}'
- logger.info(msg)
- return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=long_term_memory)
-
- @self.registry.action(
- 'Send strings of special keys to use Playwright page.keyboard.press - examples include Escape, Backspace, Insert, PageDown, Delete, Enter, or Shortcuts such as `Control+o`, `Control+Shift+T`',
- param_model=SendKeysAction,
- )
- async def send_keys(params: SendKeysAction, browser_session: BrowserSession):
- # Dispatch send keys event
- event = browser_session.event_bus.dispatch(
- SendKeysEvent(keys=params.keys)
- )
- await event
+ # direction = 'down' if params.down else 'up'
+ # target = f'element {params.index}' if params.index is not None else 'the page'
- msg = f'β¨οΈ Sent keys: {params.keys}'
- logger.info(msg)
- return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=f'Sent keys: {params.keys}')
+ # if params.num_pages == 1.0:
+ # long_term_memory = f'Scrolled {direction} {target} by one page'
+ # else:
+ # long_term_memory = f'Scrolled {direction} {target} by {params.num_pages} pages'
- @self.registry.action(
- description='Scroll to a text in the current page',
- )
- async def scroll_to_text(text: str, browser_session: BrowserSession): # type: ignore
- # Dispatch scroll to text event
- event = browser_session.event_bus.dispatch(
- ScrollToTextEvent(text=text)
- )
- await event
+ # msg = f'π {long_term_memory}'
+ # logger.info(msg)
+ # return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=long_term_memory)
+
+ # @self.registry.action(
+ # 'Send strings of special keys to use Playwright page.keyboard.press - examples include Escape, Backspace, Insert, PageDown, Delete, Enter, or Shortcuts such as `Control+o`, `Control+Shift+T`',
+ # param_model=SendKeysAction,
+ # )
+ # async def send_keys(params: SendKeysAction, browser_session: BrowserSession):
+ # # Dispatch send keys event
+ # event = browser_session.event_bus.dispatch(
+ # SendKeysEvent(keys=params.keys)
+ # )
+ # await event
- # Check result to see if text was found
- result = await event.event_result()
- if result and result.get('found'):
- msg = f'π Scrolled to text: {text}'
- logger.info(msg)
- return ActionResult(
- extracted_content=msg, include_in_memory=True, long_term_memory=f'Scrolled to text: {text}'
- )
- else:
- msg = f"Text '{text}' not found or not visible on page"
- logger.info(msg)
- return ActionResult(
- extracted_content=msg,
- include_in_memory=True,
- long_term_memory=f"Tried scrolling to text '{text}' but it was not found",
- )
+ # msg = f'β¨οΈ Sent keys: {params.keys}'
+ # logger.info(msg)
+ # return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=f'Sent keys: {params.keys}')
- # File System Actions
- @self.registry.action(
- 'Write or append content to file_name in file system. Allowed extensions are .md, .txt, .json, .csv, .pdf. For .pdf files, write the content in markdown format and it will automatically be converted to a properly formatted PDF document.'
- )
- async def write_file(
- file_name: str,
- content: str,
- file_system: FileSystem,
- append: bool = False,
- trailing_newline: bool = True,
- leading_newline: bool = False,
- ):
- if trailing_newline:
- content += '\n'
- if leading_newline:
- content = '\n' + content
- if append:
- result = await file_system.append_file(file_name, content)
- else:
- result = await file_system.write_file(file_name, content)
- logger.info(f'πΎ {result}')
- return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
+ # @self.registry.action(
+ # description='Scroll to a text in the current page',
+ # )
+ # async def scroll_to_text(text: str, browser_session: BrowserSession): # type: ignore
+ # # Dispatch scroll to text event
+ # event = browser_session.event_bus.dispatch(
+ # ScrollToTextEvent(text=text)
+ # )
+ # await event
+
+ # # Check result to see if text was found
+ # result = await event.event_result()
+ # if result and result.get('found'):
+ # msg = f'π Scrolled to text: {text}'
+ # logger.info(msg)
+ # return ActionResult(
+ # extracted_content=msg, include_in_memory=True, long_term_memory=f'Scrolled to text: {text}'
+ # )
+ # else:
+ # msg = f"Text '{text}' not found or not visible on page"
+ # logger.info(msg)
+ # return ActionResult(
+ # extracted_content=msg,
+ # include_in_memory=True,
+ # long_term_memory=f"Tried scrolling to text '{text}' but it was not found",
+ # )
- @self.registry.action(
- 'Replace old_str with new_str in file_name. old_str must exactly match the string to replace in original text. Recommended tool to mark completed items in todo.md or change specific contents in a file.'
- )
- async def replace_file_str(file_name: str, old_str: str, new_str: str, file_system: FileSystem):
- result = await file_system.replace_file_str(file_name, old_str, new_str)
- logger.info(f'πΎ {result}')
- return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
+ # # File System Actions
+ # @self.registry.action(
+ # 'Write or append content to file_name in file system. Allowed extensions are .md, .txt, .json, .csv, .pdf. For .pdf files, write the content in markdown format and it will automatically be converted to a properly formatted PDF document.'
+ # )
+ # async def write_file(
+ # file_name: str,
+ # content: str,
+ # file_system: FileSystem,
+ # append: bool = False,
+ # trailing_newline: bool = True,
+ # leading_newline: bool = False,
+ # ):
+ # if trailing_newline:
+ # content += '\n'
+ # if leading_newline:
+ # content = '\n' + content
+ # if append:
+ # result = await file_system.append_file(file_name, content)
+ # else:
+ # result = await file_system.write_file(file_name, content)
+ # logger.info(f'πΎ {result}')
+ # return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
- @self.registry.action('Read file_name from file system')
- async def read_file(file_name: str, available_file_paths: list[str], file_system: FileSystem):
- if available_file_paths and file_name in available_file_paths:
- result = await file_system.read_file(file_name, external_file=True)
- else:
- result = await file_system.read_file(file_name)
+ # @self.registry.action(
+ # 'Replace old_str with new_str in file_name. old_str must exactly match the string to replace in original text. Recommended tool to mark completed items in todo.md or change specific contents in a file.'
+ # )
+ # async def replace_file_str(file_name: str, old_str: str, new_str: str, file_system: FileSystem):
+ # result = await file_system.replace_file_str(file_name, old_str, new_str)
+ # logger.info(f'πΎ {result}')
+ # return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
- MAX_MEMORY_SIZE = 1000
- if len(result) > MAX_MEMORY_SIZE:
- lines = result.splitlines()
- display = ''
- lines_count = 0
- for line in lines:
- if len(display) + len(line) < MAX_MEMORY_SIZE:
- display += line + '\n'
- lines_count += 1
- else:
- break
- remaining_lines = len(lines) - lines_count
- memory = f'{display}{remaining_lines} more lines...' if remaining_lines > 0 else display
- else:
- memory = result
- logger.info(f'πΎ {memory}')
- return ActionResult(
- extracted_content=result,
- include_in_memory=True,
- long_term_memory=memory,
- include_extracted_content_only_once=True,
- )
+ # @self.registry.action('Read file_name from file system')
+ # async def read_file(file_name: str, available_file_paths: list[str], file_system: FileSystem):
+ # if available_file_paths and file_name in available_file_paths:
+ # result = await file_system.read_file(file_name, external_file=True)
+ # else:
+ # result = await file_system.read_file(file_name)
+
+ # MAX_MEMORY_SIZE = 1000
+ # if len(result) > MAX_MEMORY_SIZE:
+ # lines = result.splitlines()
+ # display = ''
+ # lines_count = 0
+ # for line in lines:
+ # if len(display) + len(line) < MAX_MEMORY_SIZE:
+ # display += line + '\n'
+ # lines_count += 1
+ # else:
+ # break
+ # remaining_lines = len(lines) - lines_count
+ # memory = f'{display}{remaining_lines} more lines...' if remaining_lines > 0 else display
+ # else:
+ # memory = result
+ # logger.info(f'πΎ {memory}')
+ # return ActionResult(
+ # extracted_content=result,
+ # include_in_memory=True,
+ # long_term_memory=memory,
+ # include_extracted_content_only_once=True,
+ # )
# TODO: Refactor to use events instead of direct page/dom access
# @self.registry.action(
# description='Get all options from a native dropdown or ARIA menu',
# )
# async def get_dropdown_options(index: int, browser_session: BrowserSession) -> ActionResult:
- """Get all options from a native dropdown or ARIA menu"""
- page = await browser_session.get_current_page()
- dom_element = await browser_session.get_dom_element_by_index(index)
- if dom_element is None:
- raise Exception(f'Element index {index} does not exist - retry or use alternative actions')
+ # """Get all options from a native dropdown or ARIA menu"""
+ # page = await browser_session.get_current_page()
+ # dom_element = await browser_session.get_dom_element_by_index(index)
+ # if dom_element is None:
+ # raise Exception(f'Element index {index} does not exist - retry or use alternative actions')
- try:
- # Frame-aware approach since we know it works
- all_options = []
- frame_index = 0
+ # try:
+ # # Frame-aware approach since we know it works
+ # all_options = []
+ # frame_index = 0
- for frame in page.frames:
- try:
- # First check if it's a native select element
- options = await frame.evaluate(
- """
- (xpath) => {
- const element = document.evaluate(xpath, document, null,
- XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
- if (!element) return null;
+ # for frame in page.frames:
+ # try:
+ # # First check if it's a native select element
+ # options = await frame.evaluate(
+ # """
+ # (xpath) => {
+ # const element = document.evaluate(xpath, document, null,
+ # XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
+ # if (!element) return null;
- // Check if it's a native select element
- if (element.tagName.toLowerCase() === 'select') {
- return {
- type: 'select',
- options: Array.from(element.options).map(opt => ({
- text: opt.text, //do not trim, because we are doing exact match in select_dropdown_option
- value: opt.value,
- index: opt.index
- })),
- id: element.id,
- name: element.name
- };
- }
+ # // Check if it's a native select element
+ # if (element.tagName.toLowerCase() === 'select') {
+ # return {
+ # type: 'select',
+ # options: Array.from(element.options).map(opt => ({
+ # text: opt.text, //do not trim, because we are doing exact match in select_dropdown_option
+ # value: opt.value,
+ # index: opt.index
+ # })),
+ # id: element.id,
+ # name: element.name
+ # };
+ # }
- // Check if it's an ARIA menu
- if (element.getAttribute('role') === 'menu' ||
- element.getAttribute('role') === 'listbox' ||
- element.getAttribute('role') === 'combobox') {
- // Find all menu items
- const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
- const options = [];
+ # // Check if it's an ARIA menu
+ # if (element.getAttribute('role') === 'menu' ||
+ # element.getAttribute('role') === 'listbox' ||
+ # element.getAttribute('role') === 'combobox') {
+ # // Find all menu items
+ # const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
+ # const options = [];
- menuItems.forEach((item, idx) => {
- // Get the text content of the menu item
- const text = item.textContent.trim();
- if (text) {
- options.push({
- text: text,
- value: text, // For ARIA menus, use text as value
- index: idx
- });
- }
- });
+ # menuItems.forEach((item, idx) => {
+ # // Get the text content of the menu item
+ # const text = item.textContent.trim();
+ # if (text) {
+ # options.push({
+ # text: text,
+ # value: text, // For ARIA menus, use text as value
+ # index: idx
+ # });
+ # }
+ # });
- return {
- type: 'aria',
- options: options,
- id: element.id || '',
- name: element.getAttribute('aria-label') || ''
- };
- }
+ # return {
+ # type: 'aria',
+ # options: options,
+ # id: element.id || '',
+ # name: element.getAttribute('aria-label') || ''
+ # };
+ # }
- return null;
- }
- """,
- dom_element.xpath,
- )
+ # return null;
+ # }
+ # """,
+ # dom_element.xpath,
+ # )
- if options:
- logger.debug(f'Found {options["type"]} dropdown in frame {frame_index}')
- logger.debug(f'Element ID: {options["id"]}, Name: {options["name"]}')
+ # if options:
+ # logger.debug(f'Found {options["type"]} dropdown in frame {frame_index}')
+ # logger.debug(f'Element ID: {options["id"]}, Name: {options["name"]}')
- formatted_options = []
- for opt in options['options']:
- # encoding ensures AI uses the exact string in select_dropdown_option
- encoded_text = json.dumps(opt['text'])
- formatted_options.append(f'{opt["index"]}: text={encoded_text}')
+ # formatted_options = []
+ # for opt in options['options']:
+ # # encoding ensures AI uses the exact string in select_dropdown_option
+ # encoded_text = json.dumps(opt['text'])
+ # formatted_options.append(f'{opt["index"]}: text={encoded_text}')
- all_options.extend(formatted_options)
+ # all_options.extend(formatted_options)
- except Exception as frame_e:
- logger.debug(f'Frame {frame_index} evaluation failed: {str(frame_e)}')
+ # except Exception as frame_e:
+ # logger.debug(f'Frame {frame_index} evaluation failed: {str(frame_e)}')
- frame_index += 1
+ # frame_index += 1
- if all_options:
- msg = '\n'.join(all_options)
- msg += '\nUse the exact text string in select_dropdown_option'
- logger.info(msg)
- return ActionResult(
- extracted_content=msg,
- include_in_memory=True,
- long_term_memory=f'Found dropdown options for index {index}.',
- include_extracted_content_only_once=True,
- )
- else:
- msg = 'No options found in any frame for dropdown'
- logger.info(msg)
- return ActionResult(
- extracted_content=msg, include_in_memory=True, long_term_memory='No dropdown options found'
- )
+ # if all_options:
+ # msg = '\n'.join(all_options)
+ # msg += '\nUse the exact text string in select_dropdown_option'
+ # logger.info(msg)
+ # return ActionResult(
+ # extracted_content=msg,
+ # include_in_memory=True,
+ # long_term_memory=f'Found dropdown options for index {index}.',
+ # include_extracted_content_only_once=True,
+ # )
+ # else:
+ # msg = 'No options found in any frame for dropdown'
+ # logger.info(msg)
+ # return ActionResult(
+ # extracted_content=msg, include_in_memory=True, long_term_memory='No dropdown options found'
+ # )
- except Exception as e:
- logger.error(f'Failed to get dropdown options: {str(e)}')
- msg = f'Error getting options: {str(e)}'
- logger.info(msg)
- return ActionResult(extracted_content=msg, include_in_memory=True)
+ # except Exception as e:
+ # logger.error(f'Failed to get dropdown options: {str(e)}')
+ # msg = f'Error getting options: {str(e)}'
+ # logger.info(msg)
+ # return ActionResult(extracted_content=msg, include_in_memory=True)
# TODO: Refactor to use events instead of direct page/dom access
# @self.registry.action(
# description='Select dropdown option or ARIA menu item for interactive element index by the text of the option you want to select',
# )
# async def select_dropdown_option(
- index: int,
- text: str,
- browser_session: BrowserSession,
- ) -> ActionResult:
- """Select dropdown option or ARIA menu item by the text of the option you want to select"""
- page = await browser_session.get_current_page()
- dom_element = await browser_session.get_dom_element_by_index(index)
- if dom_element is None:
- raise Exception(f'Element index {index} does not exist - retry or use alternative actions')
+ # index: int,
+ # text: str,
+ # browser_session: BrowserSession,
+ # ) -> ActionResult:
+ # """Select dropdown option or ARIA menu item by the text of the option you want to select"""
+ # page = await browser_session.get_current_page()
+ # dom_element = await browser_session.get_dom_element_by_index(index)
+ # if dom_element is None:
+ # raise Exception(f'Element index {index} does not exist - retry or use alternative actions')
- logger.debug(f"Attempting to select '{text}' using xpath: {dom_element.xpath}")
- logger.debug(f'Element attributes: {dom_element.attributes}')
- logger.debug(f'Element tag: {dom_element.tag_name}')
+ # logger.debug(f"Attempting to select '{text}' using xpath: {dom_element.xpath}")
+ # logger.debug(f'Element attributes: {dom_element.attributes}')
+ # logger.debug(f'Element tag: {dom_element.tag_name}')
- xpath = '//' + dom_element.xpath
+ # xpath = '//' + dom_element.xpath
- try:
- frame_index = 0
- for frame in page.frames:
- try:
- logger.debug(f'Trying frame {frame_index} URL: {frame.url}')
+ # try:
+ # frame_index = 0
+ # for frame in page.frames:
+ # try:
+ # logger.debug(f'Trying frame {frame_index} URL: {frame.url}')
- # First check what type of element we're dealing with
- element_info_js = """
- (xpath) => {
- try {
- const element = document.evaluate(xpath, document, null,
- XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
- if (!element) return null;
+ # # First check what type of element we're dealing with
+ # element_info_js = """
+ # (xpath) => {
+ # try {
+ # const element = document.evaluate(xpath, document, null,
+ # XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
+ # if (!element) return null;
- const tagName = element.tagName.toLowerCase();
- const role = element.getAttribute('role');
+ # const tagName = element.tagName.toLowerCase();
+ # const role = element.getAttribute('role');
- // Check if it's a native select
- if (tagName === 'select') {
- return {
- type: 'select',
- found: true,
- id: element.id,
- name: element.name,
- tagName: element.tagName,
- optionCount: element.options.length,
- currentValue: element.value,
- availableOptions: Array.from(element.options).map(o => o.text.trim())
- };
- }
+ # // Check if it's a native select
+ # if (tagName === 'select') {
+ # return {
+ # type: 'select',
+ # found: true,
+ # id: element.id,
+ # name: element.name,
+ # tagName: element.tagName,
+ # optionCount: element.options.length,
+ # currentValue: element.value,
+ # availableOptions: Array.from(element.options).map(o => o.text.trim())
+ # };
+ # }
- // Check if it's an ARIA menu or similar
- if (role === 'menu' || role === 'listbox' || role === 'combobox') {
- const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
- return {
- type: 'aria',
- found: true,
- id: element.id || '',
- role: role,
- tagName: element.tagName,
- itemCount: menuItems.length,
- availableOptions: Array.from(menuItems).map(item => item.textContent.trim())
- };
- }
+ # // Check if it's an ARIA menu or similar
+ # if (role === 'menu' || role === 'listbox' || role === 'combobox') {
+ # const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
+ # return {
+ # type: 'aria',
+ # found: true,
+ # id: element.id || '',
+ # role: role,
+ # tagName: element.tagName,
+ # itemCount: menuItems.length,
+ # availableOptions: Array.from(menuItems).map(item => item.textContent.trim())
+ # };
+ # }
- return {
- error: `Element is neither a select nor an ARIA menu (tag: ${tagName}, role: ${role})`,
- found: false
- };
- } catch (e) {
- return {error: e.toString(), found: false};
- }
- }
- """
+ # return {
+ # error: `Element is neither a select nor an ARIA menu (tag: ${tagName}, role: ${role})`,
+ # found: false
+ # };
+ # } catch (e) {
+ # return {error: e.toString(), found: false};
+ # }
+ # }
+ # """
- element_info = await frame.evaluate(element_info_js, dom_element.xpath)
+ # element_info = await frame.evaluate(element_info_js, dom_element.xpath)
- if element_info and element_info.get('found'):
- logger.debug(f'Found {element_info.get("type")} element in frame {frame_index}: {element_info}')
+ # if element_info and element_info.get('found'):
+ # logger.debug(f'Found {element_info.get("type")} element in frame {frame_index}: {element_info}')
- if element_info.get('type') == 'select':
- # Handle native select element
- # "label" because we are selecting by text
- # nth(0) to disable error thrown by strict mode
- # timeout=1000 because we are already waiting for all network events
- selected_option_values = (
- await frame.locator('//' + dom_element.xpath).nth(0).select_option(label=text, timeout=1000)
- )
+ # if element_info.get('type') == 'select':
+ # # Handle native select element
+ # # "label" because we are selecting by text
+ # # nth(0) to disable error thrown by strict mode
+ # # timeout=1000 because we are already waiting for all network events
+ # selected_option_values = (
+ # await frame.locator('//' + dom_element.xpath).nth(0).select_option(label=text, timeout=1000)
+ # )
- msg = f'selected option {text} with value {selected_option_values}'
- logger.info(msg + f' in frame {frame_index}')
+ # msg = f'selected option {text} with value {selected_option_values}'
+ # logger.info(msg + f' in frame {frame_index}')
- return ActionResult(
- extracted_content=msg, include_in_memory=True, long_term_memory=f"Selected option '{text}'"
- )
+ # return ActionResult(
+ # extracted_content=msg, include_in_memory=True, long_term_memory=f"Selected option '{text}'"
+ # )
- elif element_info.get('type') == 'aria':
- # Handle ARIA menu
- click_aria_item_js = """
- (params) => {
- const { xpath, targetText } = params;
- try {
- const element = document.evaluate(xpath, document, null,
- XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
- if (!element) return {success: false, error: 'Element not found'};
+ # elif element_info.get('type') == 'aria':
+ # # Handle ARIA menu
+ # click_aria_item_js = """
+ # (params) => {
+ # const { xpath, targetText } = params;
+ # try {
+ # const element = document.evaluate(xpath, document, null,
+ # XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
+ # if (!element) return {success: false, error: 'Element not found'};
- // Find all menu items
- const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
+ # // Find all menu items
+ # const menuItems = element.querySelectorAll('[role="menuitem"], [role="option"]');
- for (const item of menuItems) {
- const itemText = item.textContent.trim();
- if (itemText === targetText) {
- // Simulate click on the menu item
- item.click();
+ # for (const item of menuItems) {
+ # const itemText = item.textContent.trim();
+ # if (itemText === targetText) {
+ # // Simulate click on the menu item
+ # item.click();
- // Also try dispatching a click event in case the click handler needs it
- const clickEvent = new MouseEvent('click', {
- view: window,
- bubbles: true,
- cancelable: true
- });
- item.dispatchEvent(clickEvent);
+ # // Also try dispatching a click event in case the click handler needs it
+ # const clickEvent = new MouseEvent('click', {
+ # view: window,
+ # bubbles: true,
+ # cancelable: true
+ # });
+ # item.dispatchEvent(clickEvent);
- return {
- success: true,
- message: `Clicked menu item: ${targetText}`
- };
- }
- }
+ # return {
+ # success: true,
+ # message: `Clicked menu item: ${targetText}`
+ # };
+ # }
+ # }
- return {
- success: false,
- error: `Menu item with text '${targetText}' not found`
- };
- } catch (e) {
- return {success: false, error: e.toString()};
- }
- }
- """
+ # return {
+ # success: false,
+ # error: `Menu item with text '${targetText}' not found`
+ # };
+ # } catch (e) {
+ # return {success: false, error: e.toString()};
+ # }
+ # }
+ # """
- result = await frame.evaluate(
- click_aria_item_js, {'xpath': dom_element.xpath, 'targetText': text}
- )
+ # result = await frame.evaluate(
+ # click_aria_item_js, {'xpath': dom_element.xpath, 'targetText': text}
+ # )
- if result.get('success'):
- msg = result.get('message', f'Selected ARIA menu item: {text}')
- logger.info(msg + f' in frame {frame_index}')
- return ActionResult(
- extracted_content=msg,
- include_in_memory=True,
- long_term_memory=f"Selected menu item '{text}'",
- )
- else:
- logger.error(f'Failed to select ARIA menu item: {result.get("error")}')
- continue
+ # if result.get('success'):
+ # msg = result.get('message', f'Selected ARIA menu item: {text}')
+ # logger.info(msg + f' in frame {frame_index}')
+ # return ActionResult(
+ # extracted_content=msg,
+ # include_in_memory=True,
+ # long_term_memory=f"Selected menu item '{text}'",
+ # )
+ # else:
+ # logger.error(f'Failed to select ARIA menu item: {result.get("error")}')
+ # continue
- elif element_info:
- logger.error(f'Frame {frame_index} error: {element_info.get("error")}')
- continue
+ # elif element_info:
+ # logger.error(f'Frame {frame_index} error: {element_info.get("error")}')
+ # continue
- except Exception as frame_e:
- logger.error(f'Frame {frame_index} attempt failed: {str(frame_e)}')
- logger.error(f'Frame type: {type(frame)}')
- logger.error(f'Frame URL: {frame.url}')
+ # except Exception as frame_e:
+ # logger.error(f'Frame {frame_index} attempt failed: {str(frame_e)}')
+ # logger.error(f'Frame type: {type(frame)}')
+ # logger.error(f'Frame URL: {frame.url}')
- frame_index += 1
+ # frame_index += 1
- msg = f"Could not select option '{text}' in any frame"
- logger.info(msg)
- return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
+ # msg = f"Could not select option '{text}' in any frame"
+ # logger.info(msg)
+ # return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
- except Exception as e:
- msg = f'Selection failed: {str(e)}'
- logger.error(msg)
- raise BrowserError(msg)
+ # except Exception as e:
+ # msg = f'Selection failed: {str(e)}'
+ # logger.error(msg)
+ # raise BrowserError(msg)
- @self.registry.action('Google Sheets: Get the contents of the entire sheet', domains=['https://docs.google.com'])
- async def read_sheet_contents(browser_session: BrowserSession):
- # Use send keys events to select and copy all cells
- for key in ['Enter', 'Escape', 'ControlOrMeta+A', 'ControlOrMeta+C']:
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
- await event
+ # @self.registry.action('Google Sheets: Get the contents of the entire sheet', domains=['https://docs.google.com'])
+ # async def read_sheet_contents(browser_session: BrowserSession):
+ # # Use send keys events to select and copy all cells
+ # for key in ['Enter', 'Escape', 'ControlOrMeta+A', 'ControlOrMeta+C']:
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
+ # await event
- # Get page to evaluate clipboard
- page = await browser_session.get_current_page()
- extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
- return ActionResult(
- extracted_content=extracted_tsv,
- include_in_memory=True,
- long_term_memory='Retrieved sheet contents',
- include_extracted_content_only_once=True,
- )
+ # # Get page to evaluate clipboard
+ # page = await browser_session.get_current_page()
+ # extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
+ # return ActionResult(
+ # extracted_content=extracted_tsv,
+ # include_in_memory=True,
+ # long_term_memory='Retrieved sheet contents',
+ # include_extracted_content_only_once=True,
+ # )
- @self.registry.action('Google Sheets: Get the contents of a cell or range of cells', domains=['https://docs.google.com'])
- async def read_cell_contents(cell_or_range: str, browser_session: BrowserSession):
- page = await browser_session.get_current_page()
+ # @self.registry.action('Google Sheets: Get the contents of a cell or range of cells', domains=['https://docs.google.com'])
+ # async def read_cell_contents(cell_or_range: str, browser_session: BrowserSession):
+ # page = await browser_session.get_current_page()
- await select_cell_or_range(cell_or_range=cell_or_range, page=page)
+ # await select_cell_or_range(cell_or_range=cell_or_range, page=page)
- await page.keyboard.press('ControlOrMeta+C')
- await asyncio.sleep(0.1)
- extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
- return ActionResult(
- extracted_content=extracted_tsv,
- include_in_memory=True,
- long_term_memory=f'Retrieved contents from {cell_or_range}',
- include_extracted_content_only_once=True,
- )
+ # await page.keyboard.press('ControlOrMeta+C')
+ # await asyncio.sleep(0.1)
+ # extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
+ # return ActionResult(
+ # extracted_content=extracted_tsv,
+ # include_in_memory=True,
+ # long_term_memory=f'Retrieved contents from {cell_or_range}',
+ # include_extracted_content_only_once=True,
+ # )
- @self.registry.action(
- 'Google Sheets: Update the content of a cell or range of cells', domains=['https://docs.google.com']
- )
- async def update_cell_contents(cell_or_range: str, new_contents_tsv: str, browser_session: BrowserSession):
- page = await browser_session.get_current_page()
+ # @self.registry.action(
+ # 'Google Sheets: Update the content of a cell or range of cells', domains=['https://docs.google.com']
+ # )
+ # async def update_cell_contents(cell_or_range: str, new_contents_tsv: str, browser_session: BrowserSession):
+ # page = await browser_session.get_current_page()
- await select_cell_or_range(cell_or_range=cell_or_range, page=page)
+ # await select_cell_or_range(cell_or_range=cell_or_range, page=page)
- # simulate paste event from clipboard with TSV content
- await page.evaluate(f"""
- const clipboardData = new DataTransfer();
- clipboardData.setData('text/plain', `{new_contents_tsv}`);
- document.activeElement.dispatchEvent(new ClipboardEvent('paste', {{clipboardData}}));
- """)
+ # # simulate paste event from clipboard with TSV content
+ # await page.evaluate(f"""
+ # const clipboardData = new DataTransfer();
+ # clipboardData.setData('text/plain', `{new_contents_tsv}`);
+ # document.activeElement.dispatchEvent(new ClipboardEvent('paste', {{clipboardData}}));
+ # """)
- return ActionResult(
- extracted_content=f'Updated cells: {cell_or_range} = {new_contents_tsv}',
- include_in_memory=False,
- long_term_memory=f'Updated cells {cell_or_range} with {new_contents_tsv}',
- )
+ # return ActionResult(
+ # extracted_content=f'Updated cells: {cell_or_range} = {new_contents_tsv}',
+ # include_in_memory=False,
+ # long_term_memory=f'Updated cells {cell_or_range} with {new_contents_tsv}',
+ # )
- @self.registry.action('Google Sheets: Clear whatever cells are currently selected', domains=['https://docs.google.com'])
- async def clear_cell_contents(cell_or_range: str, browser_session: BrowserSession):
- page = await browser_session.get_current_page()
+ # @self.registry.action('Google Sheets: Clear whatever cells are currently selected', domains=['https://docs.google.com'])
+ # async def clear_cell_contents(cell_or_range: str, browser_session: BrowserSession):
+ # page = await browser_session.get_current_page()
- await select_cell_or_range(cell_or_range=cell_or_range, page=page)
+ # await select_cell_or_range(cell_or_range=cell_or_range, page=page)
- await page.keyboard.press('Backspace')
- return ActionResult(
- extracted_content=f'Cleared cells: {cell_or_range}',
- include_in_memory=False,
- long_term_memory=f'Cleared cells {cell_or_range}',
- )
+ # await page.keyboard.press('Backspace')
+ # return ActionResult(
+ # extracted_content=f'Cleared cells: {cell_or_range}',
+ # include_in_memory=False,
+ # long_term_memory=f'Cleared cells {cell_or_range}',
+ # )
- @self.registry.action('Google Sheets: Select a specific cell or range of cells', domains=['https://docs.google.com'])
- async def select_cell_or_range(cell_or_range: str, browser_session: BrowserSession):
- # Use send keys events for navigation
- for key in ['Enter', 'Escape']:
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
- await event
- await asyncio.sleep(0.1)
- for key in ['Home', 'ArrowUp']:
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
- await event
- await asyncio.sleep(0.1)
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys='Control+G'))
- await event
- await asyncio.sleep(0.2)
- # Get page to type the cell range
- page = await browser_session.get_current_page()
- await page.keyboard.type(cell_or_range, delay=0.05)
- await asyncio.sleep(0.2)
- for key in ['Enter', 'Escape']:
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
- await event
- await asyncio.sleep(0.2)
- return ActionResult(
- extracted_content=f'Selected cells: {cell_or_range}',
- include_in_memory=False,
- long_term_memory=f'Selected cells {cell_or_range}',
- )
+ # @self.registry.action('Google Sheets: Select a specific cell or range of cells', domains=['https://docs.google.com'])
+ # async def select_cell_or_range(cell_or_range: str, browser_session: BrowserSession):
+ # # Use send keys events for navigation
+ # for key in ['Enter', 'Escape']:
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
+ # await event
+ # await asyncio.sleep(0.1)
+ # for key in ['Home', 'ArrowUp']:
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
+ # await event
+ # await asyncio.sleep(0.1)
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys='Control+G'))
+ # await event
+ # await asyncio.sleep(0.2)
+ # # Get page to type the cell range
+ # page = await browser_session.get_current_page()
+ # await page.keyboard.type(cell_or_range, delay=0.05)
+ # await asyncio.sleep(0.2)
+ # for key in ['Enter', 'Escape']:
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
+ # await event
+ # await asyncio.sleep(0.2)
+ # return ActionResult(
+ # extracted_content=f'Selected cells: {cell_or_range}',
+ # include_in_memory=False,
+ # long_term_memory=f'Selected cells {cell_or_range}',
+ # )
- @self.registry.action(
- 'Google Sheets: Fallback method to type text into (only one) currently selected cell',
- domains=['https://docs.google.com'],
- )
- async def fallback_input_into_single_selected_cell(text: str, browser_session: BrowserSession):
- # Get page to type text
- page = await browser_session.get_current_page()
- await page.keyboard.type(text, delay=0.1)
- # Use send keys for Enter and ArrowUp
- for key in ['Enter', 'ArrowUp']:
- event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
- await event
- return ActionResult(
- extracted_content=f'Inputted text {text}',
- include_in_memory=False,
- long_term_memory=f"Inputted text '{text}' into cell",
- )
+ # @self.registry.action(
+ # 'Google Sheets: Fallback method to type text into (only one) currently selected cell',
+ # domains=['https://docs.google.com'],
+ # )
+ # async def fallback_input_into_single_selected_cell(text: str, browser_session: BrowserSession):
+ # # Get page to type text
+ # page = await browser_session.get_current_page()
+ # await page.keyboard.type(text, delay=0.1)
+ # # Use send keys for Enter and ArrowUp
+ # for key in ['Enter', 'ArrowUp']:
+ # event = browser_session.event_bus.dispatch(SendKeysEvent(keys=key))
+ # await event
+ # return ActionResult(
+ # extracted_content=f'Inputted text {text}',
+ # include_in_memory=False,
+ # long_term_memory=f"Inputted text '{text}' into cell",
+ # )
# Custom done action for structured output
def _register_done_action(self, output_model: type[T] | None, display_files_in_done_text: bool = True):
diff --git a/browser_use/dom/serializer/serializer.py b/browser_use/dom/serializer/serializer.py
index 57ce91bed..9d3172cc5 100644
--- a/browser_use/dom/serializer/serializer.py
+++ b/browser_use/dom/serializer/serializer.py
@@ -187,6 +187,7 @@ class DOMTreeSerializer:
if is_interactive_assign:
node.interactive_index = self._interactive_counter
+ node.original_node.element_index = self._interactive_counter
self._selector_map[self._interactive_counter] = node.original_node
self._interactive_counter += 1
diff --git a/browser_use/dom/service.py b/browser_use/dom/service.py
index 43f9e1ec0..b082c75b2 100644
--- a/browser_use/dom/service.py
+++ b/browser_use/dom/service.py
@@ -46,8 +46,8 @@ class DomService:
logger: logging.Logger
- def __init__(self, browser: 'BrowserSession', page: 'Page', logger: logging.Logger | None = None):
- self.browser = browser
+ def __init__(self, browser_session: 'BrowserSession', page: 'Page', logger: logging.Logger | None = None):
+ self.browser_session = browser_session
self.page = page
self.cdp_client: CDPClient | None = None
@@ -57,16 +57,16 @@ class DomService:
self.session_id_domains_enabled_cache: dict[str, bool] = {}
async def _get_cdp_client(self) -> CDPClient:
- if not self.browser.cdp_url:
+ if not self.browser_session.cdp_url:
raise ValueError('CDP URL is not set')
# TODO: MOVE THIS TO BROWSER SESSION (or sth idk)
# If the cdp_url is already a websocket URL, use it as-is.
- if self.browser.cdp_url.startswith('ws'):
- ws_url = self.browser.cdp_url
+ if self.browser_session.cdp_url.startswith('ws'):
+ ws_url = self.browser_session.cdp_url
else:
# Otherwise, treat it as the DevTools HTTP root and fetch the websocket URL.
- url = self.browser.cdp_url.rstrip('/')
+ url = self.browser_session.cdp_url.rstrip('/')
if not url.endswith('/json/version'):
url = url + '/json/version'
async with httpx.AsyncClient() as client:
@@ -321,7 +321,7 @@ class DomService:
return {'nodes': merged_nodes}
async def _get_all_trees_for_session_id(self, session_id: str) -> TargetAllTrees:
- if not self.browser.cdp_url:
+ if not self.browser_session.cdp_url:
raise ValueError('CDP URL is not set')
cdp_client = await self._get_cdp_client()
@@ -463,6 +463,7 @@ class DomService:
snapshot_node=snapshot_data,
is_visible=None,
absolute_position=absolute_position,
+ element_index=None,
)
enhanced_dom_tree_node_lookup[node['nodeId']] = dom_tree_node
diff --git a/browser_use/dom/views.py b/browser_use/dom/views.py
index 04c831215..dac31d0c9 100644
--- a/browser_use/dom/views.py
+++ b/browser_use/dom/views.py
@@ -1,7 +1,7 @@
import hashlib
from dataclasses import asdict, dataclass, field
from enum import Enum
-from typing import Any
+from typing import TYPE_CHECKING, Any
from cdp_use.cdp.accessibility.commands import GetFullAXTreeReturns
from cdp_use.cdp.accessibility.types import AXPropertyName
@@ -225,6 +225,9 @@ class EnhancedDOMTreeNode:
# endregion - Snapshot Node data
+ # Index assigned by DOMTreeSerializer when this node is marked interactive; None until then
+ element_index: int | None = None
+
uuid: str = field(default_factory=uuid7str)
@property
@@ -319,6 +322,19 @@ class EnhancedDOMTreeNode:
'children_nodes': [c.__json__() for c in self.children_nodes] if self.children_nodes else [],
}
+ async def create_cdp_session(self, browser_session):
+ """Create a CDP session for this node's target.
+
+ Args:
+ browser_session: The BrowserSession to use for creating the CDP client
+
+ Returns:
+ CDPClient attached to this node's target
+
+ Note: The caller is responsible for cleanup: await cdp_client.stop() on the returned client when done.
+ """
+ return await browser_session.create_cdp_session_for_node(self)
+
def get_all_children_text(self, max_depth: int = -1) -> str:
text_parts = []
@@ -364,6 +380,9 @@ class EnhancedDOMTreeNode:
def element_hash(self) -> int:
return hash(self)
+ def __str__(self) -> str:
+ return f'[<{self.tag_name}>#{self.frame_id[-4:] if self.frame_id else "?"}:{self.element_index}]'
+
def __hash__(self) -> int:
"""
Hash the element based on its parent branch path and attributes.