import asyncio
import datetime
import enum
import json
import logging
import re
from typing import Dict, Generic, Optional, Tuple, Type, TypeVar, cast
from urllib.parse import quote_plus

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.prompts import PromptTemplate
from playwright.async_api import ElementHandle, Page

# from lmnr.sdk.laminar import Laminar
from pydantic import BaseModel

from browser_use.agent.views import ActionModel, ActionResult
from browser_use.browser.context import BrowserContext
from browser_use.controller.registry.service import Registry
from browser_use.controller.views import (
    ClickElementAction,
    ClickElementBySelectorAction,
    ClickElementByTextAction,
    ClickElementByXpathAction,
    CloseTabAction,
    DoneAction,
    DragDropAction,
    GoToUrlAction,
    InputTextAction,
    NoParamsAction,
    OpenTabAction,
    Position,
    ScrollAction,
    SearchGoogleAction,
    SendKeysAction,
    SwitchTabAction,
    WaitForElementAction,
)
from browser_use.utils import time_execution_sync

logger = logging.getLogger(__name__)

Context = TypeVar('Context')


class Controller(Generic[Context]):
    """Registers the default browser actions and dispatches action models to them.

    All default actions are defined as closures inside ``__init__`` and
    registered on ``self.registry``; ``act`` looks an action up by name and
    executes it through the registry.
    """

    def __init__(
        self,
        exclude_actions: list[str] = [],
        output_model: Optional[Type[BaseModel]] = None,
    ):
        """Create the registry and register every default browser action.

        Args:
            exclude_actions: Action names forwarded to ``Registry``.
                NOTE(review): the default is a shared mutable list; it is only
                passed through here, never mutated in this file.
            output_model: Optional pydantic model. When given, the ``done``
                action takes ``{success, data: output_model}`` and returns the
                dumped ``data`` as a JSON string instead of free text.
        """
        self.registry = Registry[Context](exclude_actions)

        # Register all default browser actions

        if output_model is not None:
            # Create a new model that extends the output model with success parameter
            class ExtendedOutputModel(BaseModel):  # type: ignore
                success: bool = True
                data: output_model

            @self.registry.action(
                'Complete task - with return text and if the task is finished (success=True) or not yet completely finished (success=False), because last step is reached',
                param_model=ExtendedOutputModel,
            )
            async def done(params: ExtendedOutputModel):
                """Finish the task, returning the structured output as JSON."""
                # Exclude success from the output JSON since it's an internal parameter
                output_dict = params.data.model_dump()

                # Enums are not serializable, convert to string
                # (only top-level values are converted)
                for key, value in output_dict.items():
                    if isinstance(value, enum.Enum):
                        output_dict[key] = value.value

                return ActionResult(is_done=True, success=params.success, extracted_content=json.dumps(output_dict))
        else:

            @self.registry.action(
                'Complete task - with return text and if the task is finished (success=True) or not yet completely finished (success=False), because last step is reached',
                param_model=DoneAction,
            )
            async def done(params: DoneAction):
                """Finish the task, returning the free-text result."""
                return ActionResult(is_done=True, success=params.success, extracted_content=params.text)

        # Basic Navigation Actions
        @self.registry.action(
            'Search the query in Google in the current tab, the query should be a search query like humans search in Google, concrete and not vague or super long. More the single most important items. ',
            param_model=SearchGoogleAction,
        )
        async def search_google(params: SearchGoogleAction, browser: BrowserContext):
            """Open a Google search for ``params.query`` in the current tab."""
            page = await browser.get_current_page()
            # URL-encode the query so characters such as '&', '#', '+' or '%'
            # stay part of the search term instead of altering the URL.
            await page.goto(f'https://www.google.com/search?q={quote_plus(params.query)}&udm=14')
            await page.wait_for_load_state()
            msg = f'🔍 Searched for "{params.query}" in Google'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action('Navigate to URL in the current tab', param_model=GoToUrlAction)
        async def go_to_url(params: GoToUrlAction, browser: BrowserContext):
            """Navigate the current tab to ``params.url`` and wait for load."""
            page = await browser.get_current_page()
            await page.goto(params.url)
            await page.wait_for_load_state()
            msg = f'🔗 Navigated to {params.url}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action('Go back', param_model=NoParamsAction)
        async def go_back(_: NoParamsAction, browser: BrowserContext):
            """Go back one step via the browser context."""
            await browser.go_back()
            msg = '🔙 Navigated back'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        # wait for x seconds
        @self.registry.action('Wait for x seconds default 3')
        async def wait(seconds: int = 3):
            """Sleep for ``seconds`` seconds (default 3)."""
            msg = f'🕒 Waiting for {seconds} seconds'
            logger.info(msg)
            await asyncio.sleep(seconds)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action('Wait for element to be visible', param_model=WaitForElementAction)
        async def wait_for_element(params: WaitForElementAction, browser: BrowserContext):
            """Waits for the element specified by the CSS selector to become visible within the given timeout.

            Raises:
                Exception: if the wait fails; the original error is chained as the cause.
            """
            try:
                await browser.wait_for_element(params.selector, params.timeout)
                msg = f'👀 Element with selector "{params.selector}" became visible within {params.timeout}ms.'
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            except Exception as e:
                err_msg = f'❌ Failed to wait for element "{params.selector}" within {params.timeout}ms: {str(e)}'
                logger.error(err_msg)
                raise Exception(err_msg) from e

        # Element Interaction Actions
        @self.registry.action('Click element by index', param_model=ClickElementAction)
        async def click_element_by_index(params: ClickElementAction, browser: BrowserContext):
            """Click the element at ``params.index`` of the selector map.

            Refuses to click file-upload elements, reports downloads, and
            switches to a newly opened tab if the click created one.

            Raises:
                Exception: if the index is not present in the selector map.
            """
            session = await browser.get_session()

            if params.index not in await browser.get_selector_map():
                raise Exception(f'Element with index {params.index} does not exist - retry or use alternative actions')

            element_node = await browser.get_dom_element_by_index(params.index)
            # Remember the tab count so a tab opened by the click can be detected
            initial_pages = len(session.context.pages)

            # if element has file uploader then dont click
            if await browser.is_file_uploader(element_node):
                msg = f'Index {params.index} - has an element which opens file upload dialog. To upload files please use a specific function to upload files '
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)

            msg = None

            try:
                download_path = await browser._click_element_node(element_node)
                if download_path:
                    msg = f'💾 Downloaded file to {download_path}'
                else:
                    msg = f'🖱️ Clicked button with index {params.index}: {element_node.get_all_text_till_next_clickable_element(max_depth=2)}'

                logger.info(msg)
                logger.debug(f'Element xpath: {element_node.xpath}')
                if len(session.context.pages) > initial_pages:
                    new_tab_msg = 'New tab opened - switching to it'
                    msg += f' - {new_tab_msg}'
                    logger.info(new_tab_msg)
                    await browser.switch_to_tab(-1)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            except Exception as e:
                logger.warning(f'Element not clickable with index {params.index} - most likely the page changed')
                return ActionResult(error=str(e))

        @self.registry.action('Click element by selector', param_model=ClickElementBySelectorAction)
        async def click_element_by_selector(params: ClickElementBySelectorAction, browser: BrowserContext):
            """Click the element located by ``params.css_selector``.

            Tries a forced Playwright click first and falls back to a JS
            ``el.click()``. Returns an error result when nothing is located.
            """
            try:
                element_node = await browser.get_locate_element_by_css_selector(params.css_selector)
                if element_node:
                    try:
                        await element_node.scroll_into_view_if_needed()
                        await element_node.click(timeout=1500, force=True)
                    except Exception:
                        try:
                            # Handle with js evaluate if fails to click using playwright
                            await element_node.evaluate('el => el.click()')
                        except Exception as e:
                            logger.warning(f"Element not clickable with css selector '{params.css_selector}' - {e}")
                            return ActionResult(error=str(e))
                    msg = f'🖱️ Clicked on element with text "{params.css_selector}"'
                    return ActionResult(extracted_content=msg, include_in_memory=True)
                # Report a miss explicitly (same as click_element_by_text)
                # instead of silently returning None.
                return ActionResult(error=f"No element found for css selector '{params.css_selector}'")
            except Exception as e:
                logger.warning(f'Element not clickable with selector {params.css_selector} - most likely the page changed')
                return ActionResult(error=str(e))

        @self.registry.action('Click on element by xpath', param_model=ClickElementByXpathAction)
        async def click_element_by_xpath(params: ClickElementByXpathAction, browser: BrowserContext):
            """Click the element located by ``params.xpath``.

            Tries a forced Playwright click first and falls back to a JS
            ``el.click()``. Returns an error result when nothing is located.
            """
            try:
                element_node = await browser.get_locate_element_by_xpath(params.xpath)
                if element_node:
                    try:
                        await element_node.scroll_into_view_if_needed()
                        await element_node.click(timeout=1500, force=True)
                    except Exception:
                        try:
                            # Handle with js evaluate if fails to click using playwright
                            await element_node.evaluate('el => el.click()')
                        except Exception as e:
                            logger.warning(f"Element not clickable with xpath '{params.xpath}' - {e}")
                            return ActionResult(error=str(e))
                    msg = f'🖱️ Clicked on element with text "{params.xpath}"'
                    return ActionResult(extracted_content=msg, include_in_memory=True)
                # Report a miss explicitly (same as click_element_by_text)
                # instead of silently returning None.
                return ActionResult(error=f"No element found for xpath '{params.xpath}'")
            except Exception as e:
                logger.warning(f'Element not clickable with xpath {params.xpath} - most likely the page changed')
                return ActionResult(error=str(e))

        @self.registry.action('Click element with text', param_model=ClickElementByTextAction)
        async def click_element_by_text(params: ClickElementByTextAction, browser: BrowserContext):
            """Click the ``nth`` element of ``element_type`` matching ``params.text``.

            Tries a forced Playwright click first and falls back to a JS
            ``el.click()``. Returns an error result when nothing is located.
            """
            try:
                element_node = await browser.get_locate_element_by_text(
                    text=params.text, nth=params.nth, element_type=params.element_type
                )

                if element_node:
                    try:
                        await element_node.scroll_into_view_if_needed()
                        await element_node.click(timeout=1500, force=True)
                    except Exception:
                        try:
                            # Handle with js evaluate if fails to click using playwright
                            await element_node.evaluate('el => el.click()')
                        except Exception as e:
                            logger.warning(f"Element not clickable with text '{params.text}' - {e}")
                            return ActionResult(error=str(e))
                    msg = f'🖱️ Clicked on element with text "{params.text}"'
                    return ActionResult(extracted_content=msg, include_in_memory=True)
                else:
                    return ActionResult(error=f"No element found for text '{params.text}'")
            except Exception as e:
                logger.warning(f"Element not clickable with text '{params.text}' - {e}")
                return ActionResult(error=str(e))

        @self.registry.action(
            'Input text into a input interactive element',
            param_model=InputTextAction,
        )
        async def input_text(params: InputTextAction, browser: BrowserContext, has_sensitive_data: bool = False):
            """Type ``params.text`` into the element at ``params.index``.

            When ``has_sensitive_data`` is true the typed text is left out of
            the returned/logged message.

            Raises:
                Exception: if the index is not present in the selector map.
            """
            if params.index not in await browser.get_selector_map():
                raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')

            element_node = await browser.get_dom_element_by_index(params.index)
            await browser._input_text_element_node(element_node, params.text)
            if not has_sensitive_data:
                msg = f'⌨️ Input {params.text} into index {params.index}'
            else:
                msg = f'⌨️ Input sensitive data into index {params.index}'
            logger.info(msg)
            logger.debug(f'Element xpath: {element_node.xpath}')
            return ActionResult(extracted_content=msg, include_in_memory=True)

        # Save PDF
        @self.registry.action(
            'Save the current page as a PDF file',
        )
        async def save_pdf(browser: BrowserContext):
            """Save the current page as ``<slug-of-url>.pdf`` in the working directory."""
            page = await browser.get_current_page()
            # Drop the scheme / leading "www." / trailing slash, then slugify
            short_url = re.sub(r'^https?://(?:www\.)?|/$', '', page.url)
            slug = re.sub(r'[^a-zA-Z0-9]+', '-', short_url).strip('-').lower()
            sanitized_filename = f'{slug}.pdf'

            await page.emulate_media('screen')
            await page.pdf(path=sanitized_filename, format='A4', print_background=False)
            msg = f'Saving page with URL {page.url} as PDF to ./{sanitized_filename}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        # Tab Management Actions
        @self.registry.action('Switch tab', param_model=SwitchTabAction)
        async def switch_tab(params: SwitchTabAction, browser: BrowserContext):
            """Switch to the tab ``params.page_id`` and wait for it to load."""
            await browser.switch_to_tab(params.page_id)
            # Wait for tab to be ready
            page = await browser.get_current_page()
            await page.wait_for_load_state()
            msg = f'🔄 Switched to tab {params.page_id}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action('Open url in new tab', param_model=OpenTabAction)
        async def open_tab(params: OpenTabAction, browser: BrowserContext):
            """Open ``params.url`` in a new tab."""
            await browser.create_new_tab(params.url)
            msg = f'🔗 Opened new tab with {params.url}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action('Close an existing tab', param_model=CloseTabAction)
        async def close_tab(params: CloseTabAction, browser: BrowserContext):
            """Switch to the tab ``params.page_id`` and close it."""
            await browser.switch_to_tab(params.page_id)
            page = await browser.get_current_page()
            # Capture the URL before closing so it can be reported
            url = page.url
            await page.close()
            msg = f'❌ Closed tab #{params.page_id} with url {url}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        # Content Actions
        @self.registry.action(
            'Extract page content to retrieve specific information from the page, e.g. all company names, a specific description, all information about, links with companies in structured format or simply links',
        )
        async def extract_content(
            goal: str, should_strip_link_urls: bool, browser: BrowserContext, page_extraction_llm: BaseChatModel
        ):
            """Convert the page (and its iframes) to markdown and ask the LLM to extract ``goal``.

            If the LLM call raises, the raw markdown content is returned instead.
            """
            page = await browser.get_current_page()
            import markdownify

            strip = []
            if should_strip_link_urls:
                strip = ['a', 'img']

            content = markdownify.markdownify(await page.content(), strip=strip)

            # manually append iframe text into the content so it's readable by the LLM (includes cross-origin iframes)
            for iframe in page.frames:
                if iframe.url != page.url and not iframe.url.startswith('data:'):
                    content += f'\n\nIFRAME {iframe.url}:\n'
                    content += markdownify.markdownify(await iframe.content())

            prompt = 'Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}'
            template = PromptTemplate(input_variables=['goal', 'page'], template=prompt)
            try:
                output = page_extraction_llm.invoke(template.format(goal=goal, page=content))
                msg = f'📄 Extracted from page\n: {output.content}\n'
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            except Exception as e:
                logger.debug(f'Error extracting content: {e}')
                msg = f'📄 Extracted from page\n: {content}\n'
                logger.info(msg)
                return ActionResult(extracted_content=msg)

        # HTML Download
        @self.registry.action(
            'Save the raw HTML content of the current page to a local file',
            param_model=NoParamsAction,
        )
        async def save_html_to_file(_: NoParamsAction, browser: BrowserContext) -> ActionResult:
            """Save the full HTML of the current page to ``<slug>_<timestamp>.html``.

            Any failure is returned as an error result rather than raised.
            """
            try:
                page = await browser.get_current_page()
                html_content = await page.content()

                # Create a filename based on the page URL
                short_url = re.sub(r'^https?://(?:www\.)?|/$', '', page.url)
                slug = re.sub(r'[^a-zA-Z0-9]+', '-', short_url).strip('-').lower()[:64]
                # `datetime` is imported as a module, so the class must be
                # qualified; the unqualified `datetime.now()` always raised
                # AttributeError and made this action fail every time.
                timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
                sanitized_filename = f'{slug}_{timestamp}.html'

                # Save HTML to file
                with open(sanitized_filename, 'w', encoding='utf-8') as f:
                    f.write(html_content)

                msg = f'Saved HTML content of page with URL {page.url} to ./{sanitized_filename}'
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            except Exception as e:
                error_msg = f'Failed to save HTML content: {str(e)}'
                logger.error(error_msg)
                return ActionResult(error=error_msg, extracted_content='')

        @self.registry.action(
            'Scroll down the page by pixel amount - if no amount is specified, scroll down one page',
            param_model=ScrollAction,
        )
        async def scroll_down(params: ScrollAction, browser: BrowserContext):
            """Scroll down by ``params.amount`` pixels, or one viewport height if unset."""
            page = await browser.get_current_page()
            if params.amount is not None:
                await page.evaluate(f'window.scrollBy(0, {params.amount});')
            else:
                await page.evaluate('window.scrollBy(0, window.innerHeight);')

            amount = f'{params.amount} pixels' if params.amount is not None else 'one page'
            msg = f'🔍 Scrolled down the page by {amount}'
            logger.info(msg)
            return ActionResult(
                extracted_content=msg,
                include_in_memory=True,
            )

        # scroll up
        @self.registry.action(
            'Scroll up the page by pixel amount - if no amount is specified, scroll up one page',
            param_model=ScrollAction,
        )
        async def scroll_up(params: ScrollAction, browser: BrowserContext):
            """Scroll up by ``params.amount`` pixels, or one viewport height if unset."""
            page = await browser.get_current_page()
            if params.amount is not None:
                await page.evaluate(f'window.scrollBy(0, -{params.amount});')
            else:
                await page.evaluate('window.scrollBy(0, -window.innerHeight);')

            amount = f'{params.amount} pixels' if params.amount is not None else 'one page'
            msg = f'🔍 Scrolled up the page by {amount}'
            logger.info(msg)
            return ActionResult(
                extracted_content=msg,
                include_in_memory=True,
            )

        # send keys
        @self.registry.action(
            'Send strings of special keys like Escape,Backspace, Insert, PageDown, Delete, Enter, Shortcuts such as `Control+o`, `Control+Shift+T` are supported as well. This gets used in keyboard.press. ',
            param_model=SendKeysAction,
        )
        async def send_keys(params: SendKeysAction, browser: BrowserContext):
            """Press ``params.keys``; on an 'Unknown key' error, press each character in turn.

            Raises:
                Exception: any keyboard error other than the handled
                    'Unknown key' case, or an error pressing a single character.
            """
            page = await browser.get_current_page()

            try:
                await page.keyboard.press(params.keys)
            except Exception as e:
                if 'Unknown key' not in str(e):
                    raise
                # loop over the keys and try to send each one
                for key in params.keys:
                    try:
                        await page.keyboard.press(key)
                    except Exception as key_err:
                        logger.debug(f'Error sending key {key}: {str(key_err)}')
                        raise
            msg = f'⌨️ Sent keys: {params.keys}'
            logger.info(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action(
            description='If you dont find something which you want to interact with, scroll to it',
        )
        async def scroll_to_text(text: str, browser: BrowserContext):  # type: ignore
            """Scroll the first visible element containing ``text`` into view.

            Several locator strategies are tried in order; a failing strategy
            is logged at debug level and skipped.
            """
            page = await browser.get_current_page()
            try:
                # Try different locator strategies
                locators = [
                    page.get_by_text(text, exact=False),
                    page.locator(f'text={text}'),
                    page.locator(f"//*[contains(text(), '{text}')]"),
                ]

                for locator in locators:
                    try:
                        # First check if element exists and is visible
                        if await locator.count() > 0 and await locator.first.is_visible():
                            await locator.first.scroll_into_view_if_needed()
                            await asyncio.sleep(0.5)  # Wait for scroll to complete
                            msg = f'🔍 Scrolled to text: {text}'
                            logger.info(msg)
                            return ActionResult(extracted_content=msg, include_in_memory=True)
                    except Exception as e:
                        logger.debug(f'Locator attempt failed: {str(e)}')
                        continue

                msg = f"Text '{text}' not found or not visible on page"
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)

            except Exception as e:
                msg = f"Failed to scroll to text '{text}': {str(e)}"
                logger.error(msg)
                return ActionResult(error=msg, include_in_memory=True)

        @self.registry.action(
            description='Get all options from a native dropdown',
        )
        async def get_dropdown_options(index: int, browser: BrowserContext) -> ActionResult:
            """Get all options from a native dropdown.

            The element's xpath is evaluated in every frame of the page and
            the options found are returned as ``"<index>: text=<json string>"``
            lines.
            """
            page = await browser.get_current_page()
            selector_map = await browser.get_selector_map()
            dom_element = selector_map[index]

            try:
                # Frame-aware approach since we know it works
                all_options = []

                for frame_index, frame in enumerate(page.frames):
                    try:
                        options = await frame.evaluate(
                            """
                            (xpath) => {
                                const select = document.evaluate(xpath, document, null,
                                    XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                                if (!select) return null;

                                return {
                                    options: Array.from(select.options).map(opt => ({
                                        text: opt.text, //do not trim, because we are doing exact match in select_dropdown_option
                                        value: opt.value,
                                        index: opt.index
                                    })),
                                    id: select.id,
                                    name: select.name
                                };
                            }
                            """,
                            dom_element.xpath,
                        )

                        if options:
                            logger.debug(f'Found dropdown in frame {frame_index}')
                            logger.debug(f'Dropdown ID: {options["id"]}, Name: {options["name"]}')

                            for opt in options['options']:
                                # encoding ensures AI uses the exact string in select_dropdown_option
                                encoded_text = json.dumps(opt['text'])
                                all_options.append(f'{opt["index"]}: text={encoded_text}')

                    except Exception as frame_e:
                        logger.debug(f'Frame {frame_index} evaluation failed: {str(frame_e)}')

                if all_options:
                    msg = '\n'.join(all_options)
                    msg += '\nUse the exact text string in select_dropdown_option'
                    logger.info(msg)
                    return ActionResult(extracted_content=msg, include_in_memory=True)
                else:
                    msg = 'No options found in any frame for dropdown'
                    logger.info(msg)
                    return ActionResult(extracted_content=msg, include_in_memory=True)

            except Exception as e:
                logger.error(f'Failed to get dropdown options: {str(e)}')
                msg = f'Error getting options: {str(e)}'
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)

        @self.registry.action(
            description='Select dropdown option for interactive element index by the text of the option you want to select',
        )
        async def select_dropdown_option(
            index: int,
            text: str,
            browser: BrowserContext,
        ) -> ActionResult:
            """Select dropdown option by the text of the option you want to select.

            Each frame of the page is probed for the ``<select>`` at the
            element's xpath; the first frame where it is found gets the
            option whose label equals ``text`` selected.
            """
            page = await browser.get_current_page()
            selector_map = await browser.get_selector_map()
            dom_element = selector_map[index]

            # Validate that we're working with a select element
            if dom_element.tag_name != 'select':
                logger.error(f'Element is not a select! Tag: {dom_element.tag_name}, Attributes: {dom_element.attributes}')
                msg = f'Cannot select option: Element with index {index} is a {dom_element.tag_name}, not a select'
                return ActionResult(extracted_content=msg, include_in_memory=True)

            logger.debug(f"Attempting to select '{text}' using xpath: {dom_element.xpath}")
            logger.debug(f'Element attributes: {dom_element.attributes}')
            logger.debug(f'Element tag: {dom_element.tag_name}')

            xpath = '//' + dom_element.xpath

            try:
                # enumerate keeps the logged frame number correct even when a
                # frame is skipped with `continue`
                for frame_index, frame in enumerate(page.frames):
                    try:
                        logger.debug(f'Trying frame {frame_index} URL: {frame.url}')

                        # First verify we can find the dropdown in this frame
                        find_dropdown_js = """
                            (xpath) => {
                                try {
                                    const select = document.evaluate(xpath, document, null,
                                        XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                                    if (!select) return null;
                                    if (select.tagName.toLowerCase() !== 'select') {
                                        return {
                                            error: `Found element but it's a ${select.tagName}, not a SELECT`,
                                            found: false
                                        };
                                    }
                                    return {
                                        id: select.id,
                                        name: select.name,
                                        found: true,
                                        tagName: select.tagName,
                                        optionCount: select.options.length,
                                        currentValue: select.value,
                                        availableOptions: Array.from(select.options).map(o => o.text.trim())
                                    };
                                } catch (e) {
                                    return {error: e.toString(), found: false};
                                }
                            }
                        """

                        dropdown_info = await frame.evaluate(find_dropdown_js, dom_element.xpath)

                        if dropdown_info:
                            if not dropdown_info.get('found'):
                                logger.error(f'Frame {frame_index} error: {dropdown_info.get("error")}')
                                continue

                            logger.debug(f'Found dropdown in frame {frame_index}: {dropdown_info}')

                            # "label" because we are selecting by text
                            # nth(0) to disable error thrown by strict mode
                            # timeout=1000 because we are already waiting for all network events, therefore ideally we don't need to wait a lot here (default 30s)
                            selected_option_values = (
                                await frame.locator(xpath).nth(0).select_option(label=text, timeout=1000)
                            )

                            msg = f'selected option {text} with value {selected_option_values}'
                            logger.info(msg + f' in frame {frame_index}')

                            return ActionResult(extracted_content=msg, include_in_memory=True)

                    except Exception as frame_e:
                        logger.error(f'Frame {frame_index} attempt failed: {str(frame_e)}')
                        logger.error(f'Frame type: {type(frame)}')
                        logger.error(f'Frame URL: {frame.url}')

                msg = f"Could not select option '{text}' in any frame"
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)

            except Exception as e:
                msg = f'Selection failed: {str(e)}'
                logger.error(msg)
                return ActionResult(error=msg, include_in_memory=True)

        @self.registry.action(
            'Drag and drop elements or between coordinates on the page - useful for canvas drawing, sortable lists, sliders, file uploads, and UI rearrangement',
            param_model=DragDropAction,
        )
        async def drag_drop(params: DragDropAction, browser: BrowserContext) -> ActionResult:
            """
            Performs a precise drag and drop operation between elements or coordinates.

            Either both element selectors (``element_source`` /
            ``element_target``) or all four ``coord_*`` values must be given;
            otherwise an error result is returned.
            """

            async def get_drag_elements(
                page: Page,
                source_selector: str,
                target_selector: str,
            ) -> Tuple[Optional[ElementHandle], Optional[ElementHandle]]:
                """Get source and target elements with appropriate error handling."""
                source_element = None
                target_element = None

                try:
                    # page.locator() auto-detects CSS and XPath
                    source_locator = page.locator(source_selector)
                    target_locator = page.locator(target_selector)

                    # Check if elements exist
                    source_count = await source_locator.count()
                    target_count = await target_locator.count()

                    if source_count > 0:
                        source_element = await source_locator.first.element_handle()
                        logger.debug(f'Found source element with selector: {source_selector}')
                    else:
                        logger.warning(f'Source element not found: {source_selector}')

                    if target_count > 0:
                        target_element = await target_locator.first.element_handle()
                        logger.debug(f'Found target element with selector: {target_selector}')
                    else:
                        logger.warning(f'Target element not found: {target_selector}')

                except Exception as e:
                    logger.error(f'Error finding elements: {str(e)}')

                return source_element, target_element

            async def get_element_coordinates(
                source_element: ElementHandle,
                target_element: ElementHandle,
                source_position: Optional[Position],
                target_position: Optional[Position],
            ) -> Tuple[Optional[Tuple[int, int]], Optional[Tuple[int, int]]]:
                """Get coordinates from elements with appropriate error handling.

                An explicit position, when given, is used as-is; otherwise the
                centre of the element's bounding box is used.
                """
                source_coords = None
                target_coords = None

                try:
                    # Get source coordinates
                    if source_position:
                        source_coords = (source_position.x, source_position.y)
                    else:
                        source_box = await source_element.bounding_box()
                        if source_box:
                            source_coords = (
                                int(source_box['x'] + source_box['width'] / 2),
                                int(source_box['y'] + source_box['height'] / 2),
                            )

                    # Get target coordinates
                    if target_position:
                        target_coords = (target_position.x, target_position.y)
                    else:
                        target_box = await target_element.bounding_box()
                        if target_box:
                            target_coords = (
                                int(target_box['x'] + target_box['width'] / 2),
                                int(target_box['y'] + target_box['height'] / 2),
                            )
                except Exception as e:
                    logger.error(f'Error getting element coordinates: {str(e)}')

                return source_coords, target_coords

            async def execute_drag_operation(
                page: Page,
                source_x: int,
                source_y: int,
                target_x: int,
                target_y: int,
                steps: int,
                delay_ms: int,
            ) -> Tuple[bool, str]:
                """Execute the drag operation with comprehensive error handling.

                Returns ``(success, message)`` instead of raising.
                """
                try:
                    # Try to move to source position
                    try:
                        await page.mouse.move(source_x, source_y)
                        logger.debug(f'Moved to source position ({source_x}, {source_y})')
                    except Exception as e:
                        logger.error(f'Failed to move to source position: {str(e)}')
                        return False, f'Failed to move to source position: {str(e)}'

                    # Press mouse button down
                    await page.mouse.down()

                    # Move to target position with intermediate steps
                    for i in range(1, steps + 1):
                        ratio = i / steps
                        intermediate_x = int(source_x + (target_x - source_x) * ratio)
                        intermediate_y = int(source_y + (target_y - source_y) * ratio)

                        await page.mouse.move(intermediate_x, intermediate_y)

                        if delay_ms > 0:
                            await asyncio.sleep(delay_ms / 1000)

                    # Move to final target position
                    await page.mouse.move(target_x, target_y)

                    # Move again to ensure dragover events are properly triggered
                    await page.mouse.move(target_x, target_y)

                    # Release mouse button
                    await page.mouse.up()

                    return True, 'Drag operation completed successfully'

                except Exception as e:
                    return False, f'Error during drag operation: {str(e)}'

            page = await browser.get_current_page()

            try:
                # Initialize variables
                source_x: Optional[int] = None
                source_y: Optional[int] = None
                target_x: Optional[int] = None
                target_y: Optional[int] = None

                # Normalize parameters
                steps = max(1, params.steps or 10)
                delay_ms = max(0, params.delay_ms or 5)

                # Case 1: Element selectors provided
                if params.element_source and params.element_target:
                    logger.debug('Using element-based approach with selectors')

                    source_element, target_element = await get_drag_elements(
                        page,
                        params.element_source,
                        params.element_target,
                    )

                    if not source_element or not target_element:
                        error_msg = f'Failed to find {"source" if not source_element else "target"} element'
                        return ActionResult(error=error_msg, include_in_memory=True)

                    source_coords, target_coords = await get_element_coordinates(
                        source_element, target_element, params.element_source_offset, params.element_target_offset
                    )

                    if not source_coords or not target_coords:
                        error_msg = f'Failed to determine {"source" if not source_coords else "target"} coordinates'
                        return ActionResult(error=error_msg, include_in_memory=True)

                    source_x, source_y = source_coords
                    target_x, target_y = target_coords

                # Case 2: Coordinates provided directly
                elif all(
                    coord is not None
                    for coord in [params.coord_source_x, params.coord_source_y, params.coord_target_x, params.coord_target_y]
                ):
                    logger.debug('Using coordinate-based approach')
                    source_x = params.coord_source_x
                    source_y = params.coord_source_y
                    target_x = params.coord_target_x
                    target_y = params.coord_target_y
                else:
                    error_msg = 'Must provide either source/target selectors or source/target coordinates'
                    return ActionResult(error=error_msg, include_in_memory=True)

                # Validate coordinates
                if any(coord is None for coord in [source_x, source_y, target_x, target_y]):
                    error_msg = 'Failed to determine source or target coordinates'
                    return ActionResult(error=error_msg, include_in_memory=True)

                # Perform the drag operation
                success, message = await execute_drag_operation(
                    page,
                    cast(int, source_x),
                    cast(int, source_y),
                    cast(int, target_x),
                    cast(int, target_y),
                    steps,
                    delay_ms,
                )

                if not success:
                    logger.error(f'Drag operation failed: {message}')
                    return ActionResult(error=message, include_in_memory=True)

                # Create descriptive message
                if params.element_source and params.element_target:
                    msg = f"🖱️ Dragged element '{params.element_source}' to '{params.element_target}'"
                else:
                    msg = f'🖱️ Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})'

                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)

            except Exception as e:
                error_msg = f'Failed to perform drag and drop: {str(e)}'
                logger.error(error_msg)
                return ActionResult(error=error_msg, include_in_memory=True)

    # Register ---------------------------------------------------------------

    def action(self, description: str, **kwargs):
        """Decorator for registering custom actions

        @param description: Describe the LLM what the function does (better description == better function calling)
        """
        return self.registry.action(description, **kwargs)

    # Act --------------------------------------------------------------------

    @time_execution_sync('--act')
    async def act(
        self,
        action: ActionModel,
        browser_context: BrowserContext,
        #
        page_extraction_llm: Optional[BaseChatModel] = None,
        sensitive_data: Optional[Dict[str, str]] = None,
        available_file_paths: Optional[list[str]] = None,
        #
        context: Context | None = None,
    ) -> ActionResult:
        """Execute an action

        Runs the first entry of the action model whose params are not None
        through the registry and normalises the result: ``str`` becomes
        ``ActionResult(extracted_content=...)``, ``None`` becomes an empty
        ``ActionResult``, and an ``ActionResult`` is returned unchanged.

        Raises:
            ValueError: if the action returns any other type.
        """
        try:
            for action_name, params in action.model_dump(exclude_unset=True).items():
                if params is not None:
                    # with Laminar.start_as_current_span(
                    # 	name=action_name,
                    # 	input={
                    # 		'action': action_name,
                    # 		'params': params,
                    # 	},
                    # 	span_type='TOOL',
                    # ):
                    result = await self.registry.execute_action(
                        action_name,
                        params,
                        browser=browser_context,
                        page_extraction_llm=page_extraction_llm,
                        sensitive_data=sensitive_data,
                        available_file_paths=available_file_paths,
                        context=context,
                    )

                    # Laminar.set_span_output(result)

                    if isinstance(result, str):
                        return ActionResult(extracted_content=result)
                    elif isinstance(result, ActionResult):
                        return result
                    elif result is None:
                        return ActionResult()
                    else:
                        raise ValueError(f'Invalid action result type: {type(result)} of {result}')
            return ActionResult()
        except Exception as e:
            raise e