Files
browser-use/browser_use/controller/service.py

790 lines
28 KiB
Python

import asyncio
import enum
import json
import logging
import re
from typing import Dict, Generic, Optional, Tuple, Type, TypeVar, cast
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.prompts import PromptTemplate
from patchright.async_api import ElementHandle, Page
# from lmnr.sdk.laminar import Laminar
from pydantic import BaseModel
from browser_use.agent.views import ActionModel, ActionResult
from browser_use.browser.context import BrowserContext
from browser_use.controller.registry.service import Registry
from browser_use.controller.views import (
ClickElementAction,
CloseTabAction,
DoneAction,
DragDropAction,
GoToUrlAction,
InputTextAction,
NoParamsAction,
OpenTabAction,
Position,
ScrollAction,
SearchGoogleAction,
SendKeysAction,
SwitchTabAction,
)
from browser_use.utils import time_execution_sync
logger = logging.getLogger(__name__)
Context = TypeVar('Context')
class Controller(Generic[Context]):
    """Owns a ``Registry`` and registers the default browser actions on it."""

    def __init__(
        self,
        exclude_actions: Optional[list[str]] = None,
        output_model: Optional[Type[BaseModel]] = None,
    ):
        """Register all default browser actions.

        Args:
            exclude_actions: Passed to the ``Registry`` constructor (presumably
                the names of actions to leave out - confirm against Registry).
                ``None`` is treated as an empty list.
            output_model: Optional pydantic model. When given, ``done`` takes a
                payload of this type and returns it serialized as JSON;
                otherwise ``done`` takes a ``DoneAction`` and returns its text.
        """
        # Build a fresh list per instance: the former ``[]`` default was a
        # single list object shared by every Controller created without it.
        self.registry = Registry[Context]([] if exclude_actions is None else exclude_actions)

        if output_model is not None:
            # Create a new model that extends the output model with success parameter
            class ExtendedOutputModel(BaseModel):  # type: ignore
                success: bool = True
                data: output_model  # type: ignore

            @self.registry.action(
                'Complete task - with return text and if the task is finished (success=True) or not yet completely finished (success=False), because last step is reached',
                param_model=ExtendedOutputModel,
            )
            async def done(params: ExtendedOutputModel):
                # Only ``data`` is dumped; ``success`` is an internal parameter.
                # mode='json' makes pydantic emit JSON-compatible values at every
                # nesting level, so enums inside nested models or lists are
                # converted too (the old loop only handled top-level fields).
                output_dict = params.data.model_dump(mode='json')
                return ActionResult(is_done=True, success=params.success, extracted_content=json.dumps(output_dict))
        else:

            @self.registry.action(
                'Complete task - with return text and if the task is finished (success=True) or not yet completely finished (success=False), because last step is reached',
                param_model=DoneAction,
            )
            async def done(params: DoneAction):
                return ActionResult(is_done=True, success=params.success, extracted_content=params.text)
# Basic Navigation Actions
@self.registry.action(
    'Search the query in Google in the current tab, the query should be a search query like humans search in Google, concrete and not vague or super long. More the single most important items. ',
    param_model=SearchGoogleAction,
)
async def search_google(params: SearchGoogleAction, browser: BrowserContext):
    """Open a Google results page for ``params.query`` in the current tab.

    Returns an ``ActionResult`` whose content states what was searched for.
    """
    from urllib.parse import quote_plus

    page = await browser.get_current_page()
    # Percent-encode the query: characters such as '&', '#' or '+' would
    # otherwise be read as URL syntax and truncate or alter the search.
    await page.goto(f'https://www.google.com/search?q={quote_plus(params.query)}&udm=14')
    await page.wait_for_load_state()
    msg = f'🔍 Searched for "{params.query}" in Google'
    logger.info(msg)
    return ActionResult(extracted_content=msg, include_in_memory=True)
@self.registry.action('Navigate to URL in the current tab', param_model=GoToUrlAction)
async def go_to_url(params: GoToUrlAction, browser: BrowserContext):
    """Load ``params.url`` in the current page and wait for its load state."""
    current_page = await browser.get_current_page()
    await current_page.goto(params.url)
    await current_page.wait_for_load_state()

    summary = f'🔗 Navigated to {params.url}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
@self.registry.action('Go back', param_model=NoParamsAction)
async def go_back(_: NoParamsAction, browser: BrowserContext):
    """Ask the browser context to go back and report that it did."""
    await browser.go_back()

    summary = '🔙 Navigated back'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
# wait for x seconds
@self.registry.action('Wait for x seconds default 3')
async def wait(seconds: int = 3):
    """Sleep for ``seconds`` (default 3); the message is logged before sleeping."""
    announcement = f'🕒 Waiting for {seconds} seconds'
    logger.info(announcement)
    await asyncio.sleep(seconds)
    return ActionResult(extracted_content=announcement, include_in_memory=True)
# Element Interaction Actions
@self.registry.action('Click element by index', param_model=ClickElementAction)
async def click_element_by_index(params: ClickElementAction, browser: BrowserContext):
    """Click the element stored under ``params.index`` in the selector map.

    Raises:
        Exception: when the index is not in the selector map.

    Returns an ``ActionResult`` describing the click (or the download it
    produced), a notice when the element is a file uploader, or an error
    result when the click itself raised.
    """
    element_index = params.index
    session = await browser.get_session()
    known_elements = await browser.get_selector_map()
    if element_index not in known_elements:
        raise Exception(f'Element with index {element_index} does not exist - retry or use alternative actions')

    element_node = await browser.get_dom_element_by_index(element_index)
    tabs_before_click = len(session.context.pages)

    # File-upload elements are reported instead of clicked.
    opens_file_dialog = await browser.is_file_uploader(element_node)
    if opens_file_dialog:
        notice = f'Index {element_index} - has an element which opens file upload dialog. To upload files please use a specific function to upload files '
        logger.info(notice)
        return ActionResult(extracted_content=notice, include_in_memory=True)

    try:
        download_path = await browser._click_element_node(element_node)
        outcome = (
            f'💾 Downloaded file to {download_path}'
            if download_path
            else f'🖱️ Clicked button with index {element_index}: {element_node.get_all_text_till_next_clickable_element(max_depth=2)}'
        )
        logger.info(outcome)
        logger.debug(f'Element xpath: {element_node.xpath}')

        # More pages than before the click means the click opened a tab.
        if len(session.context.pages) > tabs_before_click:
            new_tab_msg = 'New tab opened - switching to it'
            outcome = f'{outcome} - {new_tab_msg}'
            logger.info(new_tab_msg)
            await browser.switch_to_tab(-1)
        return ActionResult(extracted_content=outcome, include_in_memory=True)
    except Exception as click_error:
        logger.warning(f'Element not clickable with index {element_index} - most likely the page changed')
        return ActionResult(error=str(click_error))
@self.registry.action(
    'Input text into a input interactive element',
    param_model=InputTextAction,
)
async def input_text(params: InputTextAction, browser: BrowserContext, has_sensitive_data: bool = False):
    """Type ``params.text`` into the element stored under ``params.index``.

    When ``has_sensitive_data`` is true the typed text is left out of the
    returned and logged message.

    Raises:
        Exception: when the index is not in the selector map.
    """
    known_elements = await browser.get_selector_map()
    if params.index not in known_elements:
        raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')

    element_node = await browser.get_dom_element_by_index(params.index)
    await browser._input_text_element_node(element_node, params.text)

    if has_sensitive_data:
        summary = f'⌨️ Input sensitive data into index {params.index}'
    else:
        summary = f'⌨️ Input {params.text} into index {params.index}'
    logger.info(summary)
    logger.debug(f'Element xpath: {element_node.xpath}')
    return ActionResult(extracted_content=summary, include_in_memory=True)
# Save PDF
@self.registry.action(
    'Save the current page as a PDF file',
)
async def save_pdf(browser: BrowserContext):
    """Write the current page to ``<slug>.pdf`` in the working directory.

    The slug is the page URL without scheme, leading ``www.`` and trailing
    slash, lower-cased, with every run of non-alphanumerics replaced by ``-``.
    """
    page = await browser.get_current_page()

    url_without_scheme = re.sub(r'^https?://(?:www\.)?|/$', '', page.url)
    dashed = re.sub(r'[^a-zA-Z0-9]+', '-', url_without_scheme)
    sanitized_filename = dashed.strip('-').lower() + '.pdf'

    await page.emulate_media(media='screen')
    await page.pdf(path=sanitized_filename, format='A4', print_background=False)

    summary = f'Saving page with URL {page.url} as PDF to ./{sanitized_filename}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
# Tab Management Actions
@self.registry.action('Switch tab', param_model=SwitchTabAction)
async def switch_tab(params: SwitchTabAction, browser: BrowserContext):
    """Switch to tab ``params.page_id`` and wait for that page's load state."""
    tab_id = params.page_id
    await browser.switch_to_tab(tab_id)

    # Wait for tab to be ready
    active_page = await browser.get_current_page()
    await active_page.wait_for_load_state()

    summary = f'🔄 Switched to tab {tab_id}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
@self.registry.action('Open url in new tab', param_model=OpenTabAction)
async def open_tab(params: OpenTabAction, browser: BrowserContext):
    """Create a new tab for ``params.url`` through the browser context."""
    await browser.create_new_tab(params.url)

    summary = f'🔗 Opened new tab with {params.url}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
@self.registry.action('Close an existing tab', param_model=CloseTabAction)
async def close_tab(params: CloseTabAction, browser: BrowserContext):
    """Switch to tab ``params.page_id``, close it and report the URL it had."""
    await browser.switch_to_tab(params.page_id)
    doomed_page = await browser.get_current_page()

    # Read the URL before closing so it can still be reported afterwards.
    closed_url = doomed_page.url
    await doomed_page.close()

    summary = f'❌ Closed tab #{params.page_id} with url {closed_url}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
# Content Actions
@self.registry.action(
    'Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about, links with companies in structured format or simply links',
)
async def extract_content(
    goal: str, should_strip_link_urls: bool, browser: BrowserContext, page_extraction_llm: BaseChatModel
):
    """Convert the page (and its iframes) to markdown and have the LLM extract ``goal``.

    When ``should_strip_link_urls`` is true, ``a`` and ``img`` are passed to
    markdownify's ``strip`` option for the main page. If the LLM call raises,
    the raw markdown is returned instead (without ``include_in_memory``).
    """
    page = await browser.get_current_page()
    import markdownify

    tags_to_strip = ['a', 'img'] if should_strip_link_urls else []
    sections = [markdownify.markdownify(await page.content(), strip=tags_to_strip)]

    # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
    for iframe in page.frames:
        if iframe.url == page.url or iframe.url.startswith('data:'):
            continue
        sections.append(f'\n\nIFRAME {iframe.url}:\n')
        sections.append(markdownify.markdownify(await iframe.content()))
    content = ''.join(sections)

    prompt = 'Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}'
    template = PromptTemplate(input_variables=['goal', 'page'], template=prompt)
    try:
        llm_reply = await page_extraction_llm.ainvoke(template.format(goal=goal, page=content))
        extracted = f'📄 Extracted from page\n: {llm_reply.content}\n'
        logger.info(extracted)
        return ActionResult(extracted_content=extracted, include_in_memory=True)
    except Exception as llm_error:
        # Fall back to the unprocessed markdown when the LLM call fails.
        logger.debug(f'Error extracting content: {llm_error}')
        fallback = f'📄 Extracted from page\n: {content}\n'
        logger.info(fallback)
        return ActionResult(extracted_content=fallback)
@self.registry.action(
    'Scroll down the page by pixel amount - if no amount is specified, scroll down one page',
    param_model=ScrollAction,
)
async def scroll_down(params: ScrollAction, browser: BrowserContext):
    """Scroll down by ``params.amount`` pixels, or by ``window.innerHeight`` when it is None."""
    page = await browser.get_current_page()

    pixels = params.amount
    if pixels is None:
        script, amount = 'window.scrollBy(0, window.innerHeight);', 'one page'
    else:
        script, amount = f'window.scrollBy(0, {pixels});', f'{pixels} pixels'
    await page.evaluate(script)

    summary = f'🔍 Scrolled down the page by {amount}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
# scroll up
@self.registry.action(
    'Scroll up the page by pixel amount - if no amount is specified, scroll up one page',
    param_model=ScrollAction,
)
async def scroll_up(params: ScrollAction, browser: BrowserContext):
    """Scroll up by ``params.amount`` pixels, or by ``window.innerHeight`` when it is None."""
    page = await browser.get_current_page()

    pixels = params.amount
    if pixels is None:
        script, amount = 'window.scrollBy(0, -window.innerHeight);', 'one page'
    else:
        script, amount = f'window.scrollBy(0, -{pixels});', f'{pixels} pixels'
    await page.evaluate(script)

    summary = f'🔍 Scrolled up the page by {amount}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
# send keys
@self.registry.action(
    'Send strings of special keys like Escape,Backspace, Insert, PageDown, Delete, Enter, Shortcuts such as `Control+o`, `Control+Shift+T` are supported as well. This gets used in keyboard.press. ',
    param_model=SendKeysAction,
)
async def send_keys(params: SendKeysAction, browser: BrowserContext):
    """Press ``params.keys`` via ``page.keyboard.press``.

    If the press fails with an 'Unknown key' error, each character of the
    string is pressed on its own; any other failure, or a failure of one of
    the single-character presses, is re-raised.
    """
    page = await browser.get_current_page()
    try:
        await page.keyboard.press(params.keys)
    except Exception as press_error:
        if 'Unknown key' not in str(press_error):
            raise press_error
        # loop over the keys and try to send each one
        for single_key in params.keys:
            try:
                await page.keyboard.press(single_key)
            except Exception as key_error:
                logger.debug(f'Error sending key {single_key}: {str(key_error)}')
                raise key_error

    summary = f'⌨️ Sent keys: {params.keys}'
    logger.info(summary)
    return ActionResult(extracted_content=summary, include_in_memory=True)
@self.registry.action(
    description='If you dont find something which you want to interact with, scroll to it',
)
async def scroll_to_text(text: str, browser: BrowserContext):  # type: ignore
    """Scroll the first visible element containing ``text`` into view.

    Three locator strategies are tried in order; a strategy that raises is
    logged at debug level and skipped. If none matches, a "not found" message
    is returned as content rather than as an error.
    """
    page = await browser.get_current_page()
    try:
        # Try different locator strategies
        candidates = (
            page.get_by_text(text, exact=False),
            page.locator(f'text={text}'),
            page.locator(f"//*[contains(text(), '{text}')]"),
        )

        for candidate in candidates:
            try:
                # First check if element exists and is visible
                is_present = await candidate.count() > 0
                if is_present and await candidate.first.is_visible():
                    await candidate.first.scroll_into_view_if_needed()
                    await asyncio.sleep(0.5)  # Wait for scroll to complete
                    found_msg = f'🔍 Scrolled to text: {text}'
                    logger.info(found_msg)
                    return ActionResult(extracted_content=found_msg, include_in_memory=True)
            except Exception as locator_error:
                logger.debug(f'Locator attempt failed: {str(locator_error)}')
                continue

        missing_msg = f"Text '{text}' not found or not visible on page"
        logger.info(missing_msg)
        return ActionResult(extracted_content=missing_msg, include_in_memory=True)

    except Exception as scroll_error:
        failure_msg = f"Failed to scroll to text '{text}': {str(scroll_error)}"
        logger.error(failure_msg)
        return ActionResult(error=failure_msg, include_in_memory=True)
@self.registry.action(
    description='Get all options from a native dropdown',
)
async def get_dropdown_options(index: int, browser: BrowserContext) -> ActionResult:
    """Get all options from a native dropdown.

    The element's xpath is evaluated in every frame of the current page; the
    options of each frame where it resolves are listed as
    ``<option index>: text=<JSON-encoded text>``.

    Raises:
        Exception: when ``index`` is not in the selector map.
    """
    page = await browser.get_current_page()
    selector_map = await browser.get_selector_map()
    # Fail with the same readable message the click/input actions use instead
    # of a bare KeyError for an unknown index.
    if index not in selector_map:
        raise Exception(f'Element with index {index} does not exist - retry or use alternative actions')
    dom_element = selector_map[index]

    # The JS text is kept flush-left so the literal is unchanged.
    read_options_js = """
(xpath) => {
const select = document.evaluate(xpath, document, null,
XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
if (!select) return null;
return {
options: Array.from(select.options).map(opt => ({
text: opt.text, //do not trim, because we are doing exact match in select_dropdown_option
value: opt.value,
index: opt.index
})),
id: select.id,
name: select.name
};
}
"""

    try:
        # Frame-aware approach since we know it works
        all_options = []
        for frame_index, frame in enumerate(page.frames):
            try:
                options = await frame.evaluate(read_options_js, dom_element.xpath)
                if options:
                    logger.debug(f'Found dropdown in frame {frame_index}')
                    logger.debug(f'Dropdown ID: {options["id"]}, Name: {options["name"]}')
                    # encoding ensures AI uses the exact string in select_dropdown_option
                    all_options.extend(
                        f'{opt["index"]}: text={json.dumps(opt["text"])}' for opt in options['options']
                    )
            except Exception as frame_e:
                logger.debug(f'Frame {frame_index} evaluation failed: {str(frame_e)}')

        if all_options:
            msg = '\n'.join(all_options)
            msg += '\nUse the exact text string in select_dropdown_option'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)
        else:
            msg = 'No options found in any frame for dropdown'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)
    except Exception as e:
        logger.error(f'Failed to get dropdown options: {str(e)}')
        msg = f'Error getting options: {str(e)}'
        logger.info(msg)
        return ActionResult(extracted_content=msg, include_in_memory=True)
@self.registry.action(
    description='Select dropdown option for interactive element index by the text of the option you want to select',
)
async def select_dropdown_option(
    index: int,
    text: str,
    browser: BrowserContext,
) -> ActionResult:
    """Select dropdown option by the text of the option you want to select.

    Each frame of the current page is probed for the element's xpath; in the
    first frame where it resolves to a ``<select>``, the option whose label
    equals ``text`` is selected.

    Raises:
        Exception: when ``index`` is not in the selector map.
    """
    page = await browser.get_current_page()
    selector_map = await browser.get_selector_map()
    # Fail with the same readable message the click/input actions use instead
    # of a bare KeyError for an unknown index.
    if index not in selector_map:
        raise Exception(f'Element with index {index} does not exist - retry or use alternative actions')
    dom_element = selector_map[index]

    # Validate that we're working with a select element
    if dom_element.tag_name != 'select':
        logger.error(f'Element is not a select! Tag: {dom_element.tag_name}, Attributes: {dom_element.attributes}')
        msg = f'Cannot select option: Element with index {index} is a {dom_element.tag_name}, not a select'
        return ActionResult(extracted_content=msg, include_in_memory=True)

    logger.debug(f"Attempting to select '{text}' using xpath: {dom_element.xpath}")
    logger.debug(f'Element attributes: {dom_element.attributes}')
    logger.debug(f'Element tag: {dom_element.tag_name}')

    xpath = '//' + dom_element.xpath

    # The JS text is kept flush-left so the literal is unchanged.
    find_dropdown_js = """
(xpath) => {
try {
const select = document.evaluate(xpath, document, null,
XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
if (!select) return null;
if (select.tagName.toLowerCase() !== 'select') {
return {
error: `Found element but it's a ${select.tagName}, not a SELECT`,
found: false
};
}
return {
id: select.id,
name: select.name,
found: true,
tagName: select.tagName,
optionCount: select.options.length,
currentValue: select.value,
availableOptions: Array.from(select.options).map(o => o.text.trim())
};
} catch (e) {
return {error: e.toString(), found: false};
}
}
"""

    try:
        # enumerate keeps the logged frame number correct even after the
        # `continue` below (a manual counter was skipped by it).
        for frame_index, frame in enumerate(page.frames):
            try:
                logger.debug(f'Trying frame {frame_index} URL: {frame.url}')

                # First verify we can find the dropdown in this frame
                dropdown_info = await frame.evaluate(find_dropdown_js, dom_element.xpath)

                if dropdown_info:
                    if not dropdown_info.get('found'):
                        logger.error(f'Frame {frame_index} error: {dropdown_info.get("error")}')
                        continue

                    logger.debug(f'Found dropdown in frame {frame_index}: {dropdown_info}')

                    # "label" because we are selecting by text
                    # nth(0) to disable error thrown by strict mode
                    # timeout=1000 because we are already waiting for all network events, therefore ideally we don't need to wait a lot here (default 30s)
                    selected_option_values = await frame.locator(xpath).nth(0).select_option(label=text, timeout=1000)

                    msg = f'selected option {text} with value {selected_option_values}'
                    logger.info(msg + f' in frame {frame_index}')

                    return ActionResult(extracted_content=msg, include_in_memory=True)

            except Exception as frame_e:
                logger.error(f'Frame {frame_index} attempt failed: {str(frame_e)}')
                logger.error(f'Frame type: {type(frame)}')
                logger.error(f'Frame URL: {frame.url}')

        msg = f"Could not select option '{text}' in any frame"
        logger.info(msg)
        return ActionResult(extracted_content=msg, include_in_memory=True)

    except Exception as e:
        msg = f'Selection failed: {str(e)}'
        logger.error(msg)
        return ActionResult(error=msg, include_in_memory=True)
@self.registry.action(
    'Drag and drop elements or between coordinates on the page - useful for canvas drawing, sortable lists, sliders, file uploads, and UI rearrangement',
    param_model=DragDropAction,
)
async def drag_drop(params: DragDropAction, browser: BrowserContext) -> ActionResult:
    """
    Performs a precise drag and drop operation between elements or coordinates.

    Either ``element_source``/``element_target`` selectors or all four
    ``coord_*`` values must be set on ``params``. Failures are returned as an
    ``ActionResult`` with ``error`` set rather than raised.
    """

    async def get_drag_elements(
        page: Page,
        source_selector: str,
        target_selector: str,
    ) -> Tuple[Optional[ElementHandle], Optional[ElementHandle]]:
        """Get source and target elements with appropriate error handling."""
        source_element = None
        target_element = None

        try:
            # page.locator() auto-detects CSS and XPath
            source_locator = page.locator(source_selector)
            target_locator = page.locator(target_selector)

            # Check if elements exist
            source_count = await source_locator.count()
            target_count = await target_locator.count()

            if source_count > 0:
                source_element = await source_locator.first.element_handle()
                logger.debug(f'Found source element with selector: {source_selector}')
            else:
                logger.warning(f'Source element not found: {source_selector}')

            if target_count > 0:
                target_element = await target_locator.first.element_handle()
                logger.debug(f'Found target element with selector: {target_selector}')
            else:
                logger.warning(f'Target element not found: {target_selector}')

        except Exception as e:
            logger.error(f'Error finding elements: {str(e)}')

        return source_element, target_element

    async def get_element_coordinates(
        source_element: ElementHandle,
        target_element: ElementHandle,
        source_position: Optional[Position],
        target_position: Optional[Position],
    ) -> Tuple[Optional[Tuple[int, int]], Optional[Tuple[int, int]]]:
        """Get coordinates from elements with appropriate error handling."""
        source_coords = None
        target_coords = None

        try:
            # Get source coordinates: an explicit position wins, otherwise
            # the centre of the element's bounding box is used.
            if source_position:
                source_coords = (source_position.x, source_position.y)
            else:
                source_box = await source_element.bounding_box()
                if source_box:
                    source_coords = (
                        int(source_box['x'] + source_box['width'] / 2),
                        int(source_box['y'] + source_box['height'] / 2),
                    )

            # Get target coordinates
            if target_position:
                target_coords = (target_position.x, target_position.y)
            else:
                target_box = await target_element.bounding_box()
                if target_box:
                    target_coords = (
                        int(target_box['x'] + target_box['width'] / 2),
                        int(target_box['y'] + target_box['height'] / 2),
                    )
        except Exception as e:
            logger.error(f'Error getting element coordinates: {str(e)}')

        return source_coords, target_coords

    async def execute_drag_operation(
        page: Page,
        source_x: int,
        source_y: int,
        target_x: int,
        target_y: int,
        steps: int,
        delay_ms: int,
    ) -> Tuple[bool, str]:
        """Execute the drag operation with comprehensive error handling."""
        button_is_down = False
        try:
            # Try to move to source position
            try:
                await page.mouse.move(source_x, source_y)
                logger.debug(f'Moved to source position ({source_x}, {source_y})')
            except Exception as e:
                logger.error(f'Failed to move to source position: {str(e)}')
                return False, f'Failed to move to source position: {str(e)}'

            # Press mouse button down
            await page.mouse.down()
            button_is_down = True

            # Move to target position with intermediate steps
            for i in range(1, steps + 1):
                ratio = i / steps
                intermediate_x = int(source_x + (target_x - source_x) * ratio)
                intermediate_y = int(source_y + (target_y - source_y) * ratio)

                await page.mouse.move(intermediate_x, intermediate_y)

                if delay_ms > 0:
                    await asyncio.sleep(delay_ms / 1000)

            # Move to final target position
            await page.mouse.move(target_x, target_y)

            # Move again to ensure dragover events are properly triggered
            await page.mouse.move(target_x, target_y)

            # Release mouse button
            await page.mouse.up()
            button_is_down = False

            return True, 'Drag operation completed successfully'

        except Exception as e:
            # Do not leave the button pressed after a failed drag, or later
            # mouse actions would continue the drag. Best effort only: the
            # original error is what gets reported.
            if button_is_down:
                try:
                    await page.mouse.up()
                except Exception as release_error:
                    logger.debug(f'Failed to release mouse button after error: {str(release_error)}')
            return False, f'Error during drag operation: {str(e)}'

    page = await browser.get_current_page()

    try:
        # Initialize variables
        source_x: Optional[int] = None
        source_y: Optional[int] = None
        target_x: Optional[int] = None
        target_y: Optional[int] = None

        # Normalize parameters
        steps = max(1, params.steps or 10)
        # Only a missing delay falls back to 5 ms; an explicit 0 must stay 0
        # (``params.delay_ms or 5`` silently turned 0 into 5).
        delay_ms = max(0, 5 if params.delay_ms is None else params.delay_ms)

        # Case 1: Element selectors provided
        if params.element_source and params.element_target:
            logger.debug('Using element-based approach with selectors')

            source_element, target_element = await get_drag_elements(
                page,
                params.element_source,
                params.element_target,
            )

            if not source_element or not target_element:
                error_msg = f'Failed to find {"source" if not source_element else "target"} element'
                return ActionResult(error=error_msg, include_in_memory=True)

            source_coords, target_coords = await get_element_coordinates(
                source_element, target_element, params.element_source_offset, params.element_target_offset
            )

            if not source_coords or not target_coords:
                error_msg = f'Failed to determine {"source" if not source_coords else "target"} coordinates'
                return ActionResult(error=error_msg, include_in_memory=True)

            source_x, source_y = source_coords
            target_x, target_y = target_coords

        # Case 2: Coordinates provided directly
        elif all(
            coord is not None
            for coord in [params.coord_source_x, params.coord_source_y, params.coord_target_x, params.coord_target_y]
        ):
            logger.debug('Using coordinate-based approach')
            source_x = params.coord_source_x
            source_y = params.coord_source_y
            target_x = params.coord_target_x
            target_y = params.coord_target_y
        else:
            error_msg = 'Must provide either source/target selectors or source/target coordinates'
            return ActionResult(error=error_msg, include_in_memory=True)

        # Validate coordinates
        if any(coord is None for coord in [source_x, source_y, target_x, target_y]):
            error_msg = 'Failed to determine source or target coordinates'
            return ActionResult(error=error_msg, include_in_memory=True)

        # Perform the drag operation
        success, message = await execute_drag_operation(
            page,
            cast(int, source_x),
            cast(int, source_y),
            cast(int, target_x),
            cast(int, target_y),
            steps,
            delay_ms,
        )

        if not success:
            logger.error(f'Drag operation failed: {message}')
            return ActionResult(error=message, include_in_memory=True)

        # Create descriptive message
        if params.element_source and params.element_target:
            msg = f"🖱️ Dragged element '{params.element_source}' to '{params.element_target}'"
        else:
            msg = f'🖱️ Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})'

        logger.info(msg)
        return ActionResult(extracted_content=msg, include_in_memory=True)

    except Exception as e:
        error_msg = f'Failed to perform drag and drop: {str(e)}'
        logger.error(error_msg)
        return ActionResult(error=error_msg, include_in_memory=True)
# Register ---------------------------------------------------------------
def action(self, description: str, **kwargs):
    """Return the registry's decorator for registering a custom action.

    @param description: Text telling the LLM what the function does (better description == better function calling)
    @param kwargs: Forwarded unchanged to ``self.registry.action``
    """
    register = self.registry.action
    return register(description, **kwargs)
# Act --------------------------------------------------------------------
# NOTE(review): a *sync* timing decorator is applied to a coroutine function;
# presumably it only times creation of the coroutine, not its execution -
# confirm against browser_use.utils and switch to an async variant if one exists.
@time_execution_sync('--act')
async def act(
    self,
    action: ActionModel,
    browser_context: BrowserContext,
    #
    page_extraction_llm: Optional[BaseChatModel] = None,
    sensitive_data: Optional[Dict[str, str]] = None,
    available_file_paths: Optional[list[str]] = None,
    #
    context: Context | None = None,
) -> ActionResult:
    """Execute an action.

    The first explicitly-set, non-None entry of ``action`` is executed through
    the registry and its result normalized to an ``ActionResult``: a string
    becomes ``extracted_content``, ``None`` becomes an empty result.

    Returns:
        The action's result, or an empty ``ActionResult`` when ``action``
        carries nothing to execute.

    Raises:
        ValueError: when the action returns an unsupported type.
        Exception: anything raised by the registry propagates unchanged (the
            former ``try/except ... raise e`` wrapper added nothing).
    """
    for action_name, params in action.model_dump(exclude_unset=True).items():
        if params is None:
            continue
        # with Laminar.start_as_current_span(
        # 	name=action_name,
        # 	input={
        # 		'action': action_name,
        # 		'params': params,
        # 	},
        # 	span_type='TOOL',
        # ):
        result = await self.registry.execute_action(
            action_name,
            params,
            browser=browser_context,
            page_extraction_llm=page_extraction_llm,
            sensitive_data=sensitive_data,
            available_file_paths=available_file_paths,
            context=context,
        )
        # Laminar.set_span_output(result)

        if isinstance(result, str):
            return ActionResult(extracted_content=result)
        if isinstance(result, ActionResult):
            return result
        if result is None:
            return ActionResult()
        raise ValueError(f'Invalid action result type: {type(result)} of {result}')
    return ActionResult()