mirror of
https://github.com/browser-use/browser-use
synced 2026-05-06 17:52:15 +02:00
1223 lines
45 KiB
Python
1223 lines
45 KiB
Python
import asyncio
|
|
import enum
|
|
import json
|
|
import logging
|
|
import re
|
|
from collections.abc import Awaitable, Callable
|
|
from typing import Any, Generic, TypeVar, cast
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from browser_use.agent.views import ActionModel, ActionResult
|
|
from browser_use.browser import BrowserSession
|
|
from browser_use.browser.types import ElementHandle, Page
|
|
from browser_use.controller.registry.service import Registry
|
|
from browser_use.controller.views import (
|
|
ClickElementAction,
|
|
CloseTabAction,
|
|
DoneAction,
|
|
DragDropAction,
|
|
GoToUrlAction,
|
|
InputTextAction,
|
|
NoParamsAction,
|
|
OpenTabAction,
|
|
Position,
|
|
ScrollAction,
|
|
SearchGoogleAction,
|
|
SendKeysAction,
|
|
SwitchTabAction,
|
|
)
|
|
from browser_use.filesystem.file_system import FileSystem
|
|
from browser_use.llm.base import BaseChatModel
|
|
from browser_use.llm.messages import UserMessage
|
|
from browser_use.utils import time_execution_sync
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def retry_async_function(
|
|
func: Callable[[], Awaitable[Any]], error_message: str, n_retries: int = 3, sleep_seconds: float = 1
|
|
) -> tuple[Any | None, ActionResult | None]:
|
|
"""
|
|
Retry an async function n times before giving up and returning an ActionResult with an error.
|
|
|
|
Args:
|
|
func: Async function to retry
|
|
error_message: Error message to use in ActionResult if all retries fail
|
|
n_retries: Number of retries (default 3)
|
|
sleep_seconds: Seconds to sleep between retries (default 1)
|
|
|
|
Returns:
|
|
Tuple of (result, None) on success or (None, ActionResult) on failure
|
|
"""
|
|
for attempt in range(n_retries):
|
|
try:
|
|
result = await func()
|
|
return result, None
|
|
except Exception as e:
|
|
await asyncio.sleep(sleep_seconds)
|
|
logger.debug(f'Error (attempt {attempt + 1}/{n_retries}): {e}')
|
|
if attempt == n_retries - 1: # Last attempt failed
|
|
return None, ActionResult(error=error_message + str(e))
|
|
|
|
# Should never reach here but make type checker happy
|
|
return None, ActionResult(error=error_message)
|
|
|
|
|
|
Context = TypeVar('Context')
|
|
|
|
|
|
class Controller(Generic[Context]):
|
|
def __init__(
|
|
self,
|
|
exclude_actions: list[str] = [],
|
|
output_model: type[BaseModel] | None = None,
|
|
):
|
|
self.registry = Registry[Context](exclude_actions)
|
|
|
|
"""Register all default browser actions"""
|
|
|
|
if output_model is not None:
|
|
# Create a new model that extends the output model with success parameter
|
|
class ExtendedOutputModel(BaseModel): # type: ignore
|
|
success: bool = True
|
|
data: output_model # type: ignore
|
|
|
|
@self.registry.action(
|
|
'Complete task - with return text and if the task is finished (success=True) or not yet completely finished (success=False), because last step is reached',
|
|
param_model=ExtendedOutputModel,
|
|
)
|
|
async def done(params: ExtendedOutputModel):
|
|
# Exclude success from the output JSON since it's an internal parameter
|
|
output_dict = params.data.model_dump()
|
|
|
|
# Enums are not serializable, convert to string
|
|
for key, value in output_dict.items():
|
|
if isinstance(value, enum.Enum):
|
|
output_dict[key] = value.value
|
|
|
|
return ActionResult(
|
|
is_done=True,
|
|
success=params.success,
|
|
extracted_content=json.dumps(output_dict),
|
|
long_term_memory=f'Task completed. Success Status: {params.success}',
|
|
)
|
|
else:
|
|
|
|
@self.registry.action(
|
|
'Complete task - provide a summary of results for the user. Set success=True if task completed successfully, false otherwise. Text should be your response to the user summarizing results. Include files you would like to display to the user in files_to_display.',
|
|
param_model=DoneAction,
|
|
)
|
|
async def done(params: DoneAction, file_system: FileSystem):
|
|
user_message = params.text
|
|
|
|
len_text = len(params.text)
|
|
len_max_memory = 100
|
|
memory = f'Task completed: {params.success} - {params.text[:len_max_memory]}'
|
|
if len_text > len_max_memory:
|
|
memory += f' - {len_text - len_max_memory} more characters'
|
|
|
|
attachments = []
|
|
if params.files_to_display:
|
|
for file_name in params.files_to_display:
|
|
if file_name == 'todo.md':
|
|
continue
|
|
file_content = file_system.display_file(file_name)
|
|
if file_content:
|
|
attachments.append(file_name)
|
|
|
|
attachments = [str(file_system.get_dir() / file_name) for file_name in attachments]
|
|
|
|
return ActionResult(
|
|
is_done=True,
|
|
success=params.success,
|
|
extracted_content=user_message,
|
|
long_term_memory=memory,
|
|
attachments=attachments,
|
|
)
|
|
|
|
# Basic Navigation Actions
|
|
@self.registry.action(
|
|
'Search the query in Google, the query should be a search query like humans search in Google, concrete and not vague or super long.',
|
|
param_model=SearchGoogleAction,
|
|
)
|
|
async def search_google(params: SearchGoogleAction, browser_session: BrowserSession):
|
|
search_url = f'https://www.google.com/search?q={params.query}&udm=14'
|
|
|
|
page = await browser_session.get_current_page()
|
|
if page.url.strip('/') == 'https://www.google.com':
|
|
# SECURITY FIX: Use browser_session.navigate_to() instead of direct page.goto()
|
|
# This ensures URL validation against allowed_domains is performed
|
|
await browser_session.navigate_to(search_url)
|
|
else:
|
|
# create_new_tab already includes proper URL validation
|
|
page = await browser_session.create_new_tab(search_url)
|
|
|
|
msg = f'🔍 Searched for "{params.query}" in Google'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f"Searched Google for '{params.query}'"
|
|
)
|
|
|
|
@self.registry.action('Navigate to URL in the current tab', param_model=GoToUrlAction)
|
|
async def go_to_url(params: GoToUrlAction, browser_session: BrowserSession):
|
|
try:
|
|
# SECURITY FIX: Use browser_session.navigate_to() instead of direct page.goto()
|
|
# This ensures URL validation against allowed_domains is performed
|
|
await browser_session.navigate_to(params.url)
|
|
memory = f'Navigated to {params.url}'
|
|
msg = f'🔗 {memory}'
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=memory)
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
# Check for network-related errors
|
|
if any(
|
|
err in error_msg
|
|
for err in [
|
|
'ERR_NAME_NOT_RESOLVED',
|
|
'ERR_INTERNET_DISCONNECTED',
|
|
'ERR_CONNECTION_REFUSED',
|
|
'ERR_TIMED_OUT',
|
|
'net::',
|
|
]
|
|
):
|
|
site_unavailable_msg = f'Site unavailable: {params.url} - {error_msg}'
|
|
logger.warning(site_unavailable_msg)
|
|
return ActionResult(
|
|
success=False, error=site_unavailable_msg, include_in_memory=True, long_term_memory=site_unavailable_msg
|
|
)
|
|
else:
|
|
# Re-raise non-network errors (including URLNotAllowedError for unauthorized domains)
|
|
raise
|
|
|
|
@self.registry.action('Go back', param_model=NoParamsAction)
|
|
async def go_back(_: NoParamsAction, browser_session: BrowserSession):
|
|
await browser_session.go_back()
|
|
msg = '🔙 Navigated back'
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory='Navigated back')
|
|
|
|
# wait for x seconds
|
|
@self.registry.action('Wait for x seconds default 3')
|
|
async def wait(seconds: int = 3):
|
|
msg = f'🕒 Waiting for {seconds} seconds'
|
|
logger.info(msg)
|
|
await asyncio.sleep(seconds)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=f'Waited for {seconds} seconds')
|
|
|
|
# Element Interaction Actions
|
|
@self.registry.action('Click element by index', param_model=ClickElementAction)
|
|
async def click_element_by_index(params: ClickElementAction, browser_session: BrowserSession):
|
|
# Browser is now a BrowserSession itself
|
|
|
|
# Check if element exists in current selector map
|
|
selector_map = await browser_session.get_selector_map()
|
|
if params.index not in selector_map:
|
|
# Force a state refresh in case the cache is stale
|
|
logger.info(f'Element with index {params.index} not found in selector map, refreshing state...')
|
|
await browser_session.get_state_summary(
|
|
cache_clickable_elements_hashes=True
|
|
) # This will refresh the cached state
|
|
selector_map = await browser_session.get_selector_map()
|
|
|
|
if params.index not in selector_map:
|
|
# Return informative message with the new state instead of error
|
|
max_index = max(selector_map.keys()) if selector_map else -1
|
|
msg = f'Element with index {params.index} does not exist. Page has {len(selector_map)} interactive elements (indices 0-{max_index}). State has been refreshed - please use the updated element indices.'
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, success=False, long_term_memory=msg)
|
|
|
|
element_node = await browser_session.get_dom_element_by_index(params.index)
|
|
initial_pages = len(browser_session.tabs)
|
|
|
|
# if element has file uploader then dont click
|
|
# Check if element is actually a file input (not just contains file-related keywords)
|
|
if element_node is not None and browser_session.is_file_input(element_node):
|
|
msg = f'Index {params.index} - has an element which opens file upload dialog. To upload files please use a specific function to upload files '
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, success=False, long_term_memory=msg)
|
|
|
|
msg = None
|
|
|
|
try:
|
|
assert element_node is not None, f'Element with index {params.index} does not exist'
|
|
download_path = await browser_session._click_element_node(element_node)
|
|
if download_path:
|
|
emoji = '💾'
|
|
msg = f'Downloaded file to {download_path}'
|
|
else:
|
|
emoji = '🖱️'
|
|
msg = f'Clicked button with index {params.index}: {element_node.get_all_text_till_next_clickable_element(max_depth=2)}'
|
|
|
|
logger.info(f'{emoji} {msg}')
|
|
logger.debug(f'Element xpath: {element_node.xpath}')
|
|
if len(browser_session.tabs) > initial_pages:
|
|
new_tab_msg = 'New tab opened - switching to it'
|
|
msg += f' - {new_tab_msg}'
|
|
emoji = '🔗'
|
|
logger.info(f'{emoji} {new_tab_msg}')
|
|
await browser_session.switch_to_tab(-1)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
if 'Execution context was destroyed' in error_msg or 'Cannot find context with specified id' in error_msg:
|
|
# Page navigated during click - refresh state and return it
|
|
logger.info('Page context changed during click, refreshing state...')
|
|
await browser_session.get_state_summary(cache_clickable_elements_hashes=True)
|
|
return ActionResult(
|
|
error='Page navigated during click. Refreshed state provided.', include_in_memory=True, success=False
|
|
)
|
|
else:
|
|
logger.warning(f'Element not clickable with index {params.index} - most likely the page changed')
|
|
return ActionResult(error=error_msg, success=False)
|
|
|
|
@self.registry.action(
|
|
'Click and input text into a input interactive element',
|
|
param_model=InputTextAction,
|
|
)
|
|
async def input_text(params: InputTextAction, browser_session: BrowserSession, has_sensitive_data: bool = False):
|
|
if params.index not in await browser_session.get_selector_map():
|
|
raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')
|
|
|
|
element_node = await browser_session.get_dom_element_by_index(params.index)
|
|
assert element_node is not None, f'Element with index {params.index} does not exist'
|
|
try:
|
|
await browser_session._input_text_element_node(element_node, params.text)
|
|
except Exception:
|
|
msg = f'Failed to input text into element {params.index}.'
|
|
return ActionResult(error=msg)
|
|
|
|
if not has_sensitive_data:
|
|
msg = f'⌨️ Input {params.text} into index {params.index}'
|
|
else:
|
|
msg = f'⌨️ Input sensitive data into index {params.index}'
|
|
logger.info(msg)
|
|
logger.debug(f'Element xpath: {element_node.xpath}')
|
|
return ActionResult(
|
|
extracted_content=msg,
|
|
include_in_memory=True,
|
|
long_term_memory=f"Input '{params.text}' into element {params.index}.",
|
|
)
|
|
|
|
# Save PDF
|
|
@self.registry.action('Save the current page as a PDF file')
|
|
async def save_pdf(page: Page):
|
|
short_url = re.sub(r'^https?://(?:www\.)?|/$', '', page.url)
|
|
slug = re.sub(r'[^a-zA-Z0-9]+', '-', short_url).strip('-').lower()
|
|
sanitized_filename = f'{slug}.pdf'
|
|
|
|
await page.emulate_media(media='screen')
|
|
await page.pdf(path=sanitized_filename, format='A4', print_background=False)
|
|
msg = f'Saving page with URL {page.url} as PDF to ./{sanitized_filename}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Saved PDF to {sanitized_filename}'
|
|
)
|
|
|
|
# Tab Management Actions
|
|
@self.registry.action('Switch tab', param_model=SwitchTabAction)
|
|
async def switch_tab(params: SwitchTabAction, browser_session: BrowserSession):
|
|
await browser_session.switch_to_tab(params.page_id)
|
|
page = await browser_session.get_current_page()
|
|
try:
|
|
await page.wait_for_load_state(state='domcontentloaded', timeout=5_000)
|
|
# page was already loaded when we first navigated, this is additional to wait for onfocus/onblur animations/ajax to settle
|
|
except Exception as e:
|
|
pass
|
|
msg = f'🔄 Switched to tab #{params.page_id} with url {page.url}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Switched to tab {params.page_id}'
|
|
)
|
|
|
|
@self.registry.action('Open a specific url in new tab', param_model=OpenTabAction)
|
|
async def open_tab(params: OpenTabAction, browser_session: BrowserSession):
|
|
page = await browser_session.create_new_tab(params.url)
|
|
tab_idx = browser_session.tabs.index(page)
|
|
msg = f'🔗 Opened new tab #{tab_idx} with url {params.url}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Opened new tab with URL {params.url}'
|
|
)
|
|
|
|
@self.registry.action('Close an existing tab', param_model=CloseTabAction)
|
|
async def close_tab(params: CloseTabAction, browser_session: BrowserSession):
|
|
await browser_session.switch_to_tab(params.page_id)
|
|
page = await browser_session.get_current_page()
|
|
url = page.url
|
|
await page.close()
|
|
new_page = await browser_session.get_current_page()
|
|
new_page_idx = browser_session.tabs.index(new_page)
|
|
msg = f'❌ Closed tab #{params.page_id} with {url}, now focused on tab #{new_page_idx} with url {new_page.url}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg,
|
|
include_in_memory=True,
|
|
long_term_memory=f'Closed tab {params.page_id} with url {url}, now focused on tab {new_page_idx} with url {new_page.url}.',
|
|
)
|
|
|
|
# Content Actions
|
|
@self.registry.action(
|
|
"""Extract structured, semantic data (e.g. product description, price, all information about XYZ) from the current webpage based on a textual query.
|
|
Only use this for extracting info from a single product/article page, not for entire listings or search results pages.
|
|
""",
|
|
)
|
|
async def extract_structured_data(
|
|
query: str,
|
|
page: Page,
|
|
page_extraction_llm: BaseChatModel,
|
|
file_system: FileSystem,
|
|
):
|
|
from functools import partial
|
|
|
|
import markdownify
|
|
|
|
strip = []
|
|
include_links = False
|
|
lower_query = query.lower()
|
|
url_keywords = ['url', 'links']
|
|
if any(keyword in lower_query for keyword in url_keywords):
|
|
include_links = True
|
|
|
|
if not include_links:
|
|
strip = ['a', 'img']
|
|
|
|
# Run markdownify in a thread pool to avoid blocking the event loop
|
|
loop = asyncio.get_event_loop()
|
|
|
|
# Try getting page content with retries
|
|
page_html_result, action_result = await retry_async_function(
|
|
lambda: page.content(), "Couldn't extract page content due to an error."
|
|
)
|
|
if action_result:
|
|
return action_result
|
|
page_html = page_html_result
|
|
|
|
markdownify_func = partial(markdownify.markdownify, strip=strip)
|
|
content = await loop.run_in_executor(None, markdownify_func, page_html)
|
|
|
|
# manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
|
|
for iframe in page.frames:
|
|
try:
|
|
await iframe.wait_for_load_state(timeout=5000) # extra on top of already loaded page
|
|
except Exception as e:
|
|
pass
|
|
|
|
if iframe.url != page.url and not iframe.url.startswith('data:'):
|
|
content += f'\n\nIFRAME {iframe.url}:\n'
|
|
# Run markdownify in a thread pool for iframe content as well
|
|
try:
|
|
iframe_html = await iframe.content()
|
|
iframe_markdown = await loop.run_in_executor(None, markdownify_func, iframe_html)
|
|
except Exception as e:
|
|
logger.debug(f'Error extracting iframe content from within page {page.url}: {type(e).__name__}: {e}')
|
|
iframe_markdown = ''
|
|
content += iframe_markdown
|
|
|
|
# limit to 60000 characters - remove text in the middle this is approx 20000 tokens
|
|
max_chars = 60000
|
|
if len(content) > max_chars:
|
|
content = (
|
|
content[: max_chars // 2]
|
|
+ '\n... left out the middle because it was too long ...\n'
|
|
+ content[-max_chars // 2 :]
|
|
)
|
|
|
|
prompt = """You convert websites into structured information. Extract information from this webpage based on the query. Focus only on content relevant to the query. If
|
|
1. The query is vague
|
|
2. Does not make sense for the page
|
|
3. Some/all of the information is not available
|
|
|
|
Explain the content of the page and that the requested information is not available in the page. Respond in JSON format.\nQuery: {query}\n Website:\n{page}"""
|
|
try:
|
|
formatted_prompt = prompt.format(query=query, page=content)
|
|
response = await page_extraction_llm.ainvoke([UserMessage(content=formatted_prompt)])
|
|
|
|
extracted_content = f'Page Link: {page.url}\nQuery: {query}\nExtracted Content:\n{response.completion}'
|
|
|
|
# if content is small include it to memory
|
|
MAX_MEMORY_SIZE = 600
|
|
if len(extracted_content) < MAX_MEMORY_SIZE:
|
|
memory = extracted_content
|
|
include_extracted_content_only_once = False
|
|
else:
|
|
# find lines until MAX_MEMORY_SIZE
|
|
lines = extracted_content.splitlines()
|
|
display = ''
|
|
display_lines_count = 0
|
|
for line in lines:
|
|
if len(display) + len(line) < MAX_MEMORY_SIZE:
|
|
display += line + '\n'
|
|
display_lines_count += 1
|
|
else:
|
|
break
|
|
save_result = await file_system.save_extracted_content(extracted_content)
|
|
memory = f'Extracted content from {page.url}\n<query>{query}\n</query>\n<extracted_content>\n{display}{len(lines) - display_lines_count} more lines...\n</extracted_content>\n<file_system>{save_result}</file_system>'
|
|
include_extracted_content_only_once = True
|
|
logger.info(f'📄 {memory}')
|
|
return ActionResult(
|
|
extracted_content=extracted_content,
|
|
include_extracted_content_only_once=include_extracted_content_only_once,
|
|
long_term_memory=memory,
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f'Error extracting content: {e}')
|
|
msg = f'📄 Extracted from page\n: {content}\n'
|
|
logger.info(msg)
|
|
return ActionResult(error=str(e))
|
|
|
|
@self.registry.action(
|
|
'Get the accessibility tree of the page in the format "role name" with the number_of_elements to return',
|
|
)
|
|
async def get_ax_tree(number_of_elements: int, page: Page):
|
|
node = await page.accessibility.snapshot(interesting_only=True)
|
|
|
|
def flatten_ax_tree(node, lines):
|
|
if not node:
|
|
return
|
|
role = node.get('role', '')
|
|
name = node.get('name', '')
|
|
lines.append(f'{role} {name}')
|
|
for child in node.get('children', []):
|
|
flatten_ax_tree(child, lines)
|
|
|
|
lines = []
|
|
flatten_ax_tree(node, lines)
|
|
msg = '\n'.join(lines)
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg,
|
|
include_in_memory=False,
|
|
long_term_memory='Retrieved accessibility tree',
|
|
include_extracted_content_only_once=True,
|
|
)
|
|
|
|
@self.registry.action(
|
|
'Scroll down the page by pixel amount - if none is given, scroll one page',
|
|
param_model=ScrollAction,
|
|
)
|
|
async def scroll_down(params: ScrollAction, browser_session: BrowserSession):
|
|
"""
|
|
(a) Use browser._scroll_container for container-aware scrolling.
|
|
(b) If that JavaScript throws, fall back to window.scrollBy().
|
|
"""
|
|
page = await browser_session.get_current_page()
|
|
if params.amount:
|
|
dy = params.amount
|
|
else:
|
|
# Get window height with retries
|
|
dy_result, action_result = await retry_async_function(
|
|
lambda: page.evaluate('() => window.innerHeight'), 'Scroll down failed due to an error.'
|
|
)
|
|
if action_result:
|
|
return action_result
|
|
dy = dy_result
|
|
|
|
try:
|
|
await browser_session._scroll_container(cast(int, dy))
|
|
except Exception as e:
|
|
# Hard fallback: always works on root scroller
|
|
await page.evaluate('(y) => window.scrollBy(0, y)', dy)
|
|
logger.debug('Smart scroll failed; used window.scrollBy fallback', exc_info=e)
|
|
|
|
amount_str = f'{params.amount} pixels' if params.amount is not None else 'one page'
|
|
msg = f'🔍 Scrolled down the page by {amount_str}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Scrolled down the page by {amount_str}'
|
|
)
|
|
|
|
@self.registry.action(
|
|
'Scroll up the page by pixel amount - if none is given, scroll one page',
|
|
param_model=ScrollAction,
|
|
)
|
|
async def scroll_up(params: ScrollAction, browser_session: BrowserSession):
|
|
page = await browser_session.get_current_page()
|
|
if params.amount:
|
|
dy = -(params.amount)
|
|
else:
|
|
# Get window height with retries
|
|
dy_result, action_result = await retry_async_function(
|
|
lambda: page.evaluate('() => window.innerHeight'), 'Scroll up failed due to an error.'
|
|
)
|
|
if action_result:
|
|
return action_result
|
|
dy = -(dy_result or 0)
|
|
|
|
try:
|
|
await browser_session._scroll_container(dy)
|
|
except Exception as e:
|
|
await page.evaluate('(y) => window.scrollBy(0, y)', dy)
|
|
logger.debug('Smart scroll failed; used window.scrollBy fallback', exc_info=e)
|
|
|
|
amount_str = f'{params.amount} pixels' if params.amount is not None else 'one page'
|
|
msg = f'🔍 Scrolled up the page by {amount_str}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Scrolled up the page by{amount_str}'
|
|
)
|
|
|
|
# send keys
|
|
@self.registry.action(
|
|
'Send strings of special keys like Escape,Backspace, Insert, PageDown, Delete, Enter, Shortcuts such as `Control+o`, `Control+Shift+T` are supported as well. This gets used in keyboard.press. ',
|
|
param_model=SendKeysAction,
|
|
)
|
|
async def send_keys(params: SendKeysAction, page: Page):
|
|
try:
|
|
await page.keyboard.press(params.keys)
|
|
except Exception as e:
|
|
if 'Unknown key' in str(e):
|
|
# loop over the keys and try to send each one
|
|
for key in params.keys:
|
|
try:
|
|
await page.keyboard.press(key)
|
|
except Exception as e:
|
|
logger.debug(f'Error sending key {key}: {str(e)}')
|
|
raise e
|
|
else:
|
|
raise e
|
|
msg = f'⌨️ Sent keys: {params.keys}'
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=f'Sent keys: {params.keys}')
|
|
|
|
@self.registry.action(
|
|
description='If you dont find something which you want to interact with, scroll to it',
|
|
)
|
|
async def scroll_to_text(text: str, page: Page): # type: ignore
|
|
try:
|
|
# Try different locator strategies
|
|
locators = [
|
|
page.get_by_text(text, exact=False),
|
|
page.locator(f'text={text}'),
|
|
page.locator(f"//*[contains(text(), '{text}')]"),
|
|
]
|
|
|
|
for locator in locators:
|
|
try:
|
|
if await locator.count() == 0:
|
|
continue
|
|
|
|
element = locator.first
|
|
is_visible = await element.is_visible()
|
|
bbox = await element.bounding_box()
|
|
|
|
if is_visible and bbox is not None and bbox['width'] > 0 and bbox['height'] > 0:
|
|
await element.scroll_into_view_if_needed()
|
|
await asyncio.sleep(0.5) # Wait for scroll to complete
|
|
msg = f'🔍 Scrolled to text: {text}'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f'Scrolled to text: {text}'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.debug(f'Locator attempt failed: {str(e)}')
|
|
continue
|
|
|
|
msg = f"Text '{text}' not found or not visible on page"
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg,
|
|
include_in_memory=True,
|
|
long_term_memory=f"Tried scrolling to text '{text}' but it was not found",
|
|
)
|
|
|
|
except Exception as e:
|
|
msg = f"Failed to scroll to text '{text}': {str(e)}"
|
|
logger.error(msg)
|
|
return ActionResult(error=msg, include_in_memory=True)
|
|
|
|
# File System Actions
|
|
@self.registry.action('Write content to file_name in file system, use only .md or .txt extensions.')
|
|
async def write_file(file_name: str, content: str, file_system: FileSystem):
|
|
result = await file_system.write_file(file_name, content)
|
|
logger.info(f'💾 {result}')
|
|
return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
|
|
|
|
@self.registry.action('Append content to file_name in file system')
|
|
async def append_file(file_name: str, content: str, file_system: FileSystem):
|
|
result = await file_system.append_file(file_name, content)
|
|
logger.info(f'💾 {result}')
|
|
return ActionResult(extracted_content=result, include_in_memory=True, long_term_memory=result)
|
|
|
|
@self.registry.action('Read file_name from file system')
|
|
async def read_file(file_name: str, available_file_paths: list[str], file_system: FileSystem):
|
|
if available_file_paths and file_name in available_file_paths:
|
|
import anyio
|
|
|
|
async with await anyio.open_file(file_name, 'r') as f:
|
|
content = await f.read()
|
|
result = f'Read from file {file_name}.\n<content>\n{content}\n</content>'
|
|
else:
|
|
result = file_system.read_file(file_name)
|
|
|
|
MAX_MEMORY_SIZE = 1000
|
|
if len(result) > MAX_MEMORY_SIZE:
|
|
lines = result.splitlines()
|
|
display = ''
|
|
lines_count = 0
|
|
for line in lines:
|
|
if len(display) + len(line) < MAX_MEMORY_SIZE:
|
|
display += line + '\n'
|
|
lines_count += 1
|
|
else:
|
|
break
|
|
remaining_lines = len(lines) - lines_count
|
|
memory = f'{display}{remaining_lines} more lines...' if remaining_lines > 0 else display
|
|
else:
|
|
memory = result
|
|
logger.info(f'💾 {memory}')
|
|
return ActionResult(
|
|
extracted_content=result,
|
|
include_in_memory=True,
|
|
long_term_memory=memory,
|
|
include_extracted_content_only_once=True,
|
|
)
|
|
|
|
@self.registry.action(
|
|
description='Get all options from a native dropdown',
|
|
)
|
|
async def get_dropdown_options(index: int, browser_session: BrowserSession) -> ActionResult:
|
|
"""Get all options from a native dropdown"""
|
|
page = await browser_session.get_current_page()
|
|
selector_map = await browser_session.get_selector_map()
|
|
dom_element = selector_map[index]
|
|
|
|
try:
|
|
# Frame-aware approach since we know it works
|
|
all_options = []
|
|
frame_index = 0
|
|
|
|
for frame in page.frames:
|
|
try:
|
|
options = await frame.evaluate(
|
|
"""
|
|
(xpath) => {
|
|
const select = document.evaluate(xpath, document, null,
|
|
XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
|
|
if (!select) return null;
|
|
|
|
return {
|
|
options: Array.from(select.options).map(opt => ({
|
|
text: opt.text, //do not trim, because we are doing exact match in select_dropdown_option
|
|
value: opt.value,
|
|
index: opt.index
|
|
})),
|
|
id: select.id,
|
|
name: select.name
|
|
};
|
|
}
|
|
""",
|
|
dom_element.xpath,
|
|
)
|
|
|
|
if options:
|
|
logger.debug(f'Found dropdown in frame {frame_index}')
|
|
logger.debug(f'Dropdown ID: {options["id"]}, Name: {options["name"]}')
|
|
|
|
formatted_options = []
|
|
for opt in options['options']:
|
|
# encoding ensures AI uses the exact string in select_dropdown_option
|
|
encoded_text = json.dumps(opt['text'])
|
|
formatted_options.append(f'{opt["index"]}: text={encoded_text}')
|
|
|
|
all_options.extend(formatted_options)
|
|
|
|
except Exception as frame_e:
|
|
logger.debug(f'Frame {frame_index} evaluation failed: {str(frame_e)}')
|
|
|
|
frame_index += 1
|
|
|
|
if all_options:
|
|
msg = '\n'.join(all_options)
|
|
msg += '\nUse the exact text string in select_dropdown_option'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg,
|
|
include_in_memory=True,
|
|
long_term_memory=f'Found dropdown options for index {index}.',
|
|
include_extracted_content_only_once=True,
|
|
)
|
|
else:
|
|
msg = 'No options found in any frame for dropdown'
|
|
logger.info(msg)
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory='No dropdown options found'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f'Failed to get dropdown options: {str(e)}')
|
|
msg = f'Error getting options: {str(e)}'
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True)
|
|
|
|
@self.registry.action(
|
|
description='Select dropdown option for interactive element index by the text of the option you want to select',
|
|
)
|
|
async def select_dropdown_option(
|
|
index: int,
|
|
text: str,
|
|
browser_session: BrowserSession,
|
|
) -> ActionResult:
|
|
"""Select dropdown option by the text of the option you want to select"""
|
|
page = await browser_session.get_current_page()
|
|
selector_map = await browser_session.get_selector_map()
|
|
dom_element = selector_map[index]
|
|
|
|
# Validate that we're working with a select element
|
|
if dom_element.tag_name != 'select':
|
|
logger.error(f'Element is not a select! Tag: {dom_element.tag_name}, Attributes: {dom_element.attributes}')
|
|
msg = f'Cannot select option: Element with index {index} is a {dom_element.tag_name}, not a select'
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
|
|
|
|
logger.debug(f"Attempting to select '{text}' using xpath: {dom_element.xpath}")
|
|
logger.debug(f'Element attributes: {dom_element.attributes}')
|
|
logger.debug(f'Element tag: {dom_element.tag_name}')
|
|
|
|
xpath = '//' + dom_element.xpath
|
|
|
|
try:
|
|
frame_index = 0
|
|
for frame in page.frames:
|
|
try:
|
|
logger.debug(f'Trying frame {frame_index} URL: {frame.url}')
|
|
|
|
# First verify we can find the dropdown in this frame
|
|
find_dropdown_js = """
|
|
(xpath) => {
|
|
try {
|
|
const select = document.evaluate(xpath, document, null,
|
|
XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
|
|
if (!select) return null;
|
|
if (select.tagName.toLowerCase() !== 'select') {
|
|
return {
|
|
error: `Found element but it's a ${select.tagName}, not a SELECT`,
|
|
found: false
|
|
};
|
|
}
|
|
return {
|
|
id: select.id,
|
|
name: select.name,
|
|
found: true,
|
|
tagName: select.tagName,
|
|
optionCount: select.options.length,
|
|
currentValue: select.value,
|
|
availableOptions: Array.from(select.options).map(o => o.text.trim())
|
|
};
|
|
} catch (e) {
|
|
return {error: e.toString(), found: false};
|
|
}
|
|
}
|
|
"""
|
|
|
|
dropdown_info = await frame.evaluate(find_dropdown_js, dom_element.xpath)
|
|
|
|
if dropdown_info:
|
|
if not dropdown_info.get('found'):
|
|
logger.error(f'Frame {frame_index} error: {dropdown_info.get("error")}')
|
|
continue
|
|
|
|
logger.debug(f'Found dropdown in frame {frame_index}: {dropdown_info}')
|
|
|
|
# "label" because we are selecting by text
|
|
# nth(0) to disable error thrown by strict mode
|
|
# timeout=1000 because we are already waiting for all network events, therefore ideally we don't need to wait a lot here (default 30s)
|
|
selected_option_values = (
|
|
await frame.locator('//' + dom_element.xpath).nth(0).select_option(label=text, timeout=1000)
|
|
)
|
|
|
|
msg = f'selected option {text} with value {selected_option_values}'
|
|
logger.info(msg + f' in frame {frame_index}')
|
|
|
|
return ActionResult(
|
|
extracted_content=msg, include_in_memory=True, long_term_memory=f"Selected option '{text}'"
|
|
)
|
|
|
|
except Exception as frame_e:
|
|
logger.error(f'Frame {frame_index} attempt failed: {str(frame_e)}')
|
|
logger.error(f'Frame type: {type(frame)}')
|
|
logger.error(f'Frame URL: {frame.url}')
|
|
|
|
frame_index += 1
|
|
|
|
msg = f"Could not select option '{text}' in any frame"
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
|
|
|
|
except Exception as e:
|
|
msg = f'Selection failed: {str(e)}'
|
|
logger.error(msg)
|
|
return ActionResult(error=msg, include_in_memory=True)
|
|
|
|
@self.registry.action(
|
|
'Drag and drop elements or between coordinates on the page - useful for canvas drawing, sortable lists, sliders, file uploads, and UI rearrangement',
|
|
param_model=DragDropAction,
|
|
)
|
|
async def drag_drop(params: DragDropAction, page: Page) -> ActionResult:
|
|
"""
|
|
Performs a precise drag and drop operation between elements or coordinates.
|
|
"""
|
|
|
|
async def get_drag_elements(
|
|
page: Page,
|
|
source_selector: str,
|
|
target_selector: str,
|
|
) -> tuple[ElementHandle | None, ElementHandle | None]:
|
|
"""Get source and target elements with appropriate error handling."""
|
|
source_element = None
|
|
target_element = None
|
|
|
|
try:
|
|
# page.locator() auto-detects CSS and XPath
|
|
source_locator = page.locator(source_selector)
|
|
target_locator = page.locator(target_selector)
|
|
|
|
# Check if elements exist
|
|
source_count = await source_locator.count()
|
|
target_count = await target_locator.count()
|
|
|
|
if source_count > 0:
|
|
source_element = await source_locator.first.element_handle()
|
|
logger.debug(f'Found source element with selector: {source_selector}')
|
|
else:
|
|
logger.warning(f'Source element not found: {source_selector}')
|
|
|
|
if target_count > 0:
|
|
target_element = await target_locator.first.element_handle()
|
|
logger.debug(f'Found target element with selector: {target_selector}')
|
|
else:
|
|
logger.warning(f'Target element not found: {target_selector}')
|
|
|
|
except Exception as e:
|
|
logger.error(f'Error finding elements: {str(e)}')
|
|
|
|
return source_element, target_element
|
|
|
|
async def get_element_coordinates(
|
|
source_element: ElementHandle,
|
|
target_element: ElementHandle,
|
|
source_position: Position | None,
|
|
target_position: Position | None,
|
|
) -> tuple[tuple[int, int] | None, tuple[int, int] | None]:
|
|
"""Get coordinates from elements with appropriate error handling."""
|
|
source_coords = None
|
|
target_coords = None
|
|
|
|
try:
|
|
# Get source coordinates
|
|
if source_position:
|
|
source_coords = (source_position.x, source_position.y)
|
|
else:
|
|
source_box = await source_element.bounding_box()
|
|
if source_box:
|
|
source_coords = (
|
|
int(source_box['x'] + source_box['width'] / 2),
|
|
int(source_box['y'] + source_box['height'] / 2),
|
|
)
|
|
|
|
# Get target coordinates
|
|
if target_position:
|
|
target_coords = (target_position.x, target_position.y)
|
|
else:
|
|
target_box = await target_element.bounding_box()
|
|
if target_box:
|
|
target_coords = (
|
|
int(target_box['x'] + target_box['width'] / 2),
|
|
int(target_box['y'] + target_box['height'] / 2),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f'Error getting element coordinates: {str(e)}')
|
|
|
|
return source_coords, target_coords
|
|
|
|
async def execute_drag_operation(
|
|
page: Page,
|
|
source_x: int,
|
|
source_y: int,
|
|
target_x: int,
|
|
target_y: int,
|
|
steps: int,
|
|
delay_ms: int,
|
|
) -> tuple[bool, str]:
|
|
"""Execute the drag operation with comprehensive error handling."""
|
|
try:
|
|
# Try to move to source position
|
|
try:
|
|
await page.mouse.move(source_x, source_y)
|
|
logger.debug(f'Moved to source position ({source_x}, {source_y})')
|
|
except Exception as e:
|
|
logger.error(f'Failed to move to source position: {str(e)}')
|
|
return False, f'Failed to move to source position: {str(e)}'
|
|
|
|
# Press mouse button down
|
|
await page.mouse.down()
|
|
|
|
# Move to target position with intermediate steps
|
|
for i in range(1, steps + 1):
|
|
ratio = i / steps
|
|
intermediate_x = int(source_x + (target_x - source_x) * ratio)
|
|
intermediate_y = int(source_y + (target_y - source_y) * ratio)
|
|
|
|
await page.mouse.move(intermediate_x, intermediate_y)
|
|
|
|
if delay_ms > 0:
|
|
await asyncio.sleep(delay_ms / 1000)
|
|
|
|
# Move to final target position
|
|
await page.mouse.move(target_x, target_y)
|
|
|
|
# Move again to ensure dragover events are properly triggered
|
|
await page.mouse.move(target_x, target_y)
|
|
|
|
# Release mouse button
|
|
await page.mouse.up()
|
|
|
|
return True, 'Drag operation completed successfully'
|
|
|
|
except Exception as e:
|
|
return False, f'Error during drag operation: {str(e)}'
|
|
|
|
try:
|
|
# Initialize variables
|
|
source_x: int | None = None
|
|
source_y: int | None = None
|
|
target_x: int | None = None
|
|
target_y: int | None = None
|
|
|
|
# Normalize parameters
|
|
steps = max(1, params.steps or 10)
|
|
delay_ms = max(0, params.delay_ms or 5)
|
|
|
|
# Case 1: Element selectors provided
|
|
if params.element_source and params.element_target:
|
|
logger.debug('Using element-based approach with selectors')
|
|
|
|
source_element, target_element = await get_drag_elements(
|
|
page,
|
|
params.element_source,
|
|
params.element_target,
|
|
)
|
|
|
|
if not source_element or not target_element:
|
|
error_msg = f'Failed to find {"source" if not source_element else "target"} element'
|
|
return ActionResult(error=error_msg, include_in_memory=True)
|
|
|
|
source_coords, target_coords = await get_element_coordinates(
|
|
source_element, target_element, params.element_source_offset, params.element_target_offset
|
|
)
|
|
|
|
if not source_coords or not target_coords:
|
|
error_msg = f'Failed to determine {"source" if not source_coords else "target"} coordinates'
|
|
return ActionResult(error=error_msg, include_in_memory=True)
|
|
|
|
source_x, source_y = source_coords
|
|
target_x, target_y = target_coords
|
|
|
|
# Case 2: Coordinates provided directly
|
|
elif all(
|
|
coord is not None
|
|
for coord in [params.coord_source_x, params.coord_source_y, params.coord_target_x, params.coord_target_y]
|
|
):
|
|
logger.debug('Using coordinate-based approach')
|
|
source_x = params.coord_source_x
|
|
source_y = params.coord_source_y
|
|
target_x = params.coord_target_x
|
|
target_y = params.coord_target_y
|
|
else:
|
|
error_msg = 'Must provide either source/target selectors or source/target coordinates'
|
|
return ActionResult(error=error_msg, include_in_memory=True)
|
|
|
|
# Validate coordinates
|
|
if any(coord is None for coord in [source_x, source_y, target_x, target_y]):
|
|
error_msg = 'Failed to determine source or target coordinates'
|
|
return ActionResult(error=error_msg, include_in_memory=True)
|
|
|
|
# Perform the drag operation
|
|
success, message = await execute_drag_operation(
|
|
page,
|
|
cast(int, source_x),
|
|
cast(int, source_y),
|
|
cast(int, target_x),
|
|
cast(int, target_y),
|
|
steps,
|
|
delay_ms,
|
|
)
|
|
|
|
if not success:
|
|
logger.error(f'Drag operation failed: {message}')
|
|
return ActionResult(error=message, include_in_memory=True)
|
|
|
|
# Create descriptive message
|
|
if params.element_source and params.element_target:
|
|
msg = f"🖱️ Dragged element '{params.element_source}' to '{params.element_target}'"
|
|
else:
|
|
msg = f'🖱️ Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})'
|
|
|
|
logger.info(msg)
|
|
return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)
|
|
|
|
except Exception as e:
|
|
error_msg = f'Failed to perform drag and drop: {str(e)}'
|
|
logger.error(error_msg)
|
|
return ActionResult(error=error_msg, include_in_memory=True)
|
|
|
|
@self.registry.action('Google Sheets: Get the contents of the entire sheet', domains=['https://docs.google.com'])
|
|
async def read_sheet_contents(page: Page):
|
|
# select all cells
|
|
await page.keyboard.press('Enter')
|
|
await page.keyboard.press('Escape')
|
|
await page.keyboard.press('ControlOrMeta+A')
|
|
await page.keyboard.press('ControlOrMeta+C')
|
|
|
|
extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
|
|
return ActionResult(
|
|
extracted_content=extracted_tsv,
|
|
include_in_memory=True,
|
|
long_term_memory='Retrieved sheet contents',
|
|
include_extracted_content_only_once=True,
|
|
)
|
|
|
|
@self.registry.action('Google Sheets: Get the contents of a cell or range of cells', domains=['https://docs.google.com'])
|
|
async def read_cell_contents(cell_or_range: str, browser_session: BrowserSession):
|
|
page = await browser_session.get_current_page()
|
|
|
|
await select_cell_or_range(cell_or_range=cell_or_range, page=page)
|
|
|
|
await page.keyboard.press('ControlOrMeta+C')
|
|
await asyncio.sleep(0.1)
|
|
extracted_tsv = await page.evaluate('() => navigator.clipboard.readText()')
|
|
return ActionResult(
|
|
extracted_content=extracted_tsv,
|
|
include_in_memory=True,
|
|
long_term_memory=f'Retrieved contents from {cell_or_range}',
|
|
include_extracted_content_only_once=True,
|
|
)
|
|
|
|
@self.registry.action(
|
|
'Google Sheets: Update the content of a cell or range of cells', domains=['https://docs.google.com']
|
|
)
|
|
async def update_cell_contents(cell_or_range: str, new_contents_tsv: str, browser_session: BrowserSession):
|
|
page = await browser_session.get_current_page()
|
|
|
|
await select_cell_or_range(cell_or_range=cell_or_range, page=page)
|
|
|
|
# simulate paste event from clipboard with TSV content
|
|
await page.evaluate(f"""
|
|
const clipboardData = new DataTransfer();
|
|
clipboardData.setData('text/plain', `{new_contents_tsv}`);
|
|
document.activeElement.dispatchEvent(new ClipboardEvent('paste', {{clipboardData}}));
|
|
""")
|
|
|
|
return ActionResult(
|
|
extracted_content=f'Updated cells: {cell_or_range} = {new_contents_tsv}',
|
|
include_in_memory=False,
|
|
long_term_memory=f'Updated cells {cell_or_range} with {new_contents_tsv}',
|
|
)
|
|
|
|
@self.registry.action('Google Sheets: Clear whatever cells are currently selected', domains=['https://docs.google.com'])
|
|
async def clear_cell_contents(cell_or_range: str, browser_session: BrowserSession):
|
|
page = await browser_session.get_current_page()
|
|
|
|
await select_cell_or_range(cell_or_range=cell_or_range, page=page)
|
|
|
|
await page.keyboard.press('Backspace')
|
|
return ActionResult(
|
|
extracted_content=f'Cleared cells: {cell_or_range}',
|
|
include_in_memory=False,
|
|
long_term_memory=f'Cleared cells {cell_or_range}',
|
|
)
|
|
|
|
@self.registry.action('Google Sheets: Select a specific cell or range of cells', domains=['https://docs.google.com'])
|
|
async def select_cell_or_range(cell_or_range: str, page: Page):
|
|
await page.keyboard.press('Enter') # make sure we dont delete current cell contents if we were last editing
|
|
await page.keyboard.press('Escape') # to clear current focus (otherwise select range popup is additive)
|
|
await asyncio.sleep(0.1)
|
|
await page.keyboard.press('Home') # move cursor to the top left of the sheet first
|
|
await page.keyboard.press('ArrowUp')
|
|
await asyncio.sleep(0.1)
|
|
await page.keyboard.press('Control+G') # open the goto range popup
|
|
await asyncio.sleep(0.2)
|
|
await page.keyboard.type(cell_or_range, delay=0.05)
|
|
await asyncio.sleep(0.2)
|
|
await page.keyboard.press('Enter')
|
|
await asyncio.sleep(0.2)
|
|
await page.keyboard.press('Escape') # to make sure the popup still closes in the case where the jump failed
|
|
return ActionResult(
|
|
extracted_content=f'Selected cells: {cell_or_range}',
|
|
include_in_memory=False,
|
|
long_term_memory=f'Selected cells {cell_or_range}',
|
|
)
|
|
|
|
@self.registry.action(
|
|
'Google Sheets: Fallback method to type text into (only one) currently selected cell',
|
|
domains=['https://docs.google.com'],
|
|
)
|
|
async def fallback_input_into_single_selected_cell(text: str, page: Page):
|
|
await page.keyboard.type(text, delay=0.1)
|
|
await page.keyboard.press('Enter') # make sure to commit the input so it doesn't get overwritten by the next action
|
|
await page.keyboard.press('ArrowUp')
|
|
return ActionResult(
|
|
extracted_content=f'Inputted text {text}',
|
|
include_in_memory=False,
|
|
long_term_memory=f"Inputted text '{text}' into cell",
|
|
)
|
|
|
|
# Register ---------------------------------------------------------------
|
|
|
|
def action(self, description: str, **kwargs):
|
|
"""Decorator for registering custom actions
|
|
|
|
@param description: Describe the LLM what the function does (better description == better function calling)
|
|
"""
|
|
return self.registry.action(description, **kwargs)
|
|
|
|
# Act --------------------------------------------------------------------
|
|
|
|
@time_execution_sync('--act')
|
|
async def act(
|
|
self,
|
|
action: ActionModel,
|
|
browser_session: BrowserSession,
|
|
#
|
|
page_extraction_llm: BaseChatModel | None = None,
|
|
sensitive_data: dict[str, str | dict[str, str]] | None = None,
|
|
available_file_paths: list[str] | None = None,
|
|
file_system: FileSystem | None = None,
|
|
#
|
|
context: Context | None = None,
|
|
) -> ActionResult:
|
|
"""Execute an action"""
|
|
|
|
for action_name, params in action.model_dump(exclude_unset=True).items():
|
|
if params is not None:
|
|
# with Laminar.start_as_current_span(
|
|
# name=action_name,
|
|
# input={
|
|
# 'action': action_name,
|
|
# 'params': params,
|
|
# },
|
|
# span_type='TOOL',
|
|
# ):
|
|
result = await self.registry.execute_action(
|
|
action_name=action_name,
|
|
params=params,
|
|
browser_session=browser_session,
|
|
page_extraction_llm=page_extraction_llm,
|
|
file_system=file_system,
|
|
sensitive_data=sensitive_data,
|
|
available_file_paths=available_file_paths,
|
|
context=context,
|
|
)
|
|
|
|
# Laminar.set_span_output(result)
|
|
|
|
if isinstance(result, str):
|
|
return ActionResult(extracted_content=result)
|
|
elif isinstance(result, ActionResult):
|
|
return result
|
|
elif result is None:
|
|
return ActionResult()
|
|
else:
|
|
raise ValueError(f'Invalid action result type: {type(result)} of {result}')
|
|
return ActionResult()
|