custom-functions examples

This commit is contained in:
Magnus Müller
2025-08-18 18:51:58 -07:00
parent 4f5932b5a8
commit 1d34d2ed22
10 changed files with 39 additions and 1138 deletions

View File

@@ -56,7 +56,7 @@ async def main():
You are completely FORBIDDEN to use any other method to get the 2FA code.
"""
model = ChatOpenAI(model='gpt-4.1')
model = ChatOpenAI(model='gpt-4.1-mini')
agent = Agent(task=task, llm=model, controller=controller)
result = await agent.run()

View File

@@ -1,59 +0,0 @@
import asyncio
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
import pyperclip
from browser_use import Agent, Controller
from browser_use.agent.views import ActionResult
from browser_use.browser import BrowserProfile, BrowserSession
from browser_use.browser.types import Page
from browser_use.llm import ChatOpenAI
# Browser configuration for this example: headless is disabled, so a visible window is used.
browser_profile = BrowserProfile(
headless=False,
)
# Controller whose registry receives the two clipboard actions registered below.
controller = Controller()
@controller.registry.action('Copy text to clipboard')
def copy_to_clipboard(text: str):
    """Put ``text`` on the clipboard via pyperclip and return it as the action's extracted content."""
    pyperclip.copy(text)
    outcome = ActionResult(extracted_content=text)
    return outcome
@controller.registry.action('Paste text from clipboard')
async def paste_from_clipboard(page: Page):
    """Read the clipboard via pyperclip, type its contents into ``page``, and return what was typed."""
    clipboard_text = pyperclip.paste()
    # Send the clipboard contents to the browser as keyboard input.
    await page.keyboard.type(clipboard_text)
    return ActionResult(extracted_content=clipboard_text)
async def main():
    """Run the clipboard demo: copy a phrase, then paste it on google.com.

    The browser session is started explicitly and is killed in a ``finally``
    block, so it is shut down even when the agent run raises.
    """
    task = 'Copy the text "Hello, world!" to the clipboard, then go to google.com and paste the text'
    model = ChatOpenAI(model='gpt-4.1')
    browser_session = BrowserSession(browser_profile=browser_profile)
    await browser_session.start()
    try:
        agent = Agent(
            task=task,
            llm=model,
            controller=controller,
            browser_session=browser_session,
        )
        await agent.run()
    finally:
        # Always release the session, including on failure of the agent run.
        await browser_session.kill()

    input('Press Enter to close...')
# Script entry point: run the async demo to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -1,235 +0,0 @@
"""
Description: These Python modules are designed to capture detailed
browser usage data for analysis, with both server and client
components working together to record and store the information.
Author: Carlos A. Planchón
https://github.com/carlosplanchon/
Adapt this code to your needs.
Feedback is appreciated!
"""
#####################
# #
# --- UTILS --- #
# #
#####################
import base64
def b64_to_png(b64_string: str, output_file):
    """
    Convert a Base64-encoded string to a PNG file.

    The payload is decoded before the output file is opened, so malformed
    input raises without creating or truncating ``output_file``. The decoded
    bytes are written verbatim; they are not checked for being a valid PNG.

    :param b64_string: A string containing Base64-encoded data
    :param output_file: The path to the output PNG file
    :raises binascii.Error: If ``b64_string`` cannot be decoded as Base64
    """
    png_bytes = base64.b64decode(b64_string)
    with open(output_file, 'wb') as f:
        f.write(png_bytes)
###################################################################
# #
# --- FASTAPI API TO RECORD AND SAVE Browser-Use ACTIVITY --- #
# #
###################################################################
# Save to api.py and run with `python api.py`
# ! pip install uvicorn
# ! pip install fastapi
# ! pip install prettyprinter
import json
from pathlib import Path
import prettyprinter # type: ignore
from fastapi import FastAPI, Request
# Enable prettyprinter's extras up front, before any request is printed.
prettyprinter.install_extras()
# FastAPI application that hosts the recording endpoint defined below.
app = FastAPI()
@app.post('/post_agent_history_step')
async def post_agent_history_step(request: Request):
    """Store one posted agent-history step as ``recordings/<n>.json``.

    The request body is parsed as JSON, pretty-printed to the console and
    written to a file whose number is one above the highest numeric ``.json``
    file name already in the ``recordings`` folder (starting at 1).
    Returns a status dict naming the file that was written.
    """
    payload = await request.json()
    prettyprinter.cpprint(payload)

    # Make sure the output directory exists before scanning or writing.
    target_dir = Path('recordings')
    target_dir.mkdir(exist_ok=True)

    def _numeric_stem(entry):
        """Return the integer stem of a ``.json`` file, or None for anything else."""
        if not (entry.is_file() and entry.suffix == '.json'):
            return None
        try:
            return int(entry.stem)
        except ValueError:
            # The file name isn't just a number, so it takes no part in numbering.
            return None

    used_numbers = [n for n in map(_numeric_stem, target_dir.iterdir()) if n is not None]
    next_number = max(used_numbers, default=0) + 1

    file_path = target_dir / f'{next_number}.json'
    with file_path.open('w') as out:
        json.dump(payload, out, indent=2)

    return {'status': 'ok', 'message': f'Saved to {file_path}'}
# When executed directly, serve the API with uvicorn on host 0.0.0.0, port 9000.
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=9000)
##############################################################
# #
# --- CLIENT TO RECORD AND SAVE Browser-Use ACTIVITY --- #
# #
##############################################################
"""
pyobjtojson:
A Python library to safely and recursively serialize any Python object
(including Pydantic models and dataclasses) into JSON-ready structures,
gracefully handling circular references.
"""
# ! pip install -U pyobjtojson
# ! pip install -U prettyprinter
import asyncio
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
import requests
from pyobjtojson import obj_to_json # type: ignore
from browser_use import Agent
from browser_use.llm import ChatOpenAI
# import prettyprinter
# prettyprinter.install_extras()
def send_agent_history_step(data, url='http://127.0.0.1:9000/post_agent_history_step', timeout=10.0):
    """POST one step summary to the recording API and return its decoded JSON reply.

    :param data: JSON-serializable payload describing the step
    :param url: Endpoint to post to; defaults to the recording server on 127.0.0.1:9000
    :param timeout: Seconds to wait for the server; without a timeout an
        unresponsive server would block the caller indefinitely
    :return: The response body parsed as JSON
    """
    response = requests.post(url, json=data, timeout=timeout)
    return response.json()
async def record_activity(agent_obj):
    """Snapshot the page and the newest history entries, then post them to the recorder.

    Reads the page HTML and a screenshot from ``agent_obj.browser_context`` and
    the last entry (None when there is none) of the history's thoughts,
    outputs, actions, extracted content and URLs, and passes the combined
    summary to ``send_agent_history_step``.
    """

    def _newest(items):
        """Serialize ``items`` with obj_to_json and return the last element, or None if empty."""
        as_json = obj_to_json(obj=items, check_circular=False)
        return as_json[-1] if len(as_json) > 0 else None

    print('--- ON_STEP_START HOOK ---')
    page_html = await agent_obj.browser_context.get_page_html()
    page_screenshot = await agent_obj.browser_context.take_screenshot()

    print('--> History:')

    # Assert agent has state to satisfy type checker
    assert hasattr(agent_obj, 'state'), 'Agent must have state attribute'

    history = agent_obj.history
    latest_thoughts = _newest(history.model_thoughts())
    latest_outputs = _newest(agent_obj.history.model_outputs())
    latest_actions = _newest(agent_obj.history.model_actions())
    latest_extracted = _newest(agent_obj.history.extracted_content())
    latest_url = _newest(agent_obj.history.urls())

    model_step_summary = {
        'website_html': page_html,
        'website_screenshot': page_screenshot,
        'url': latest_url,
        'model_thoughts': latest_thoughts,
        'model_outputs': latest_outputs,
        'model_actions': latest_actions,
        'extracted_content': latest_extracted,
    }

    print('--- MODEL STEP SUMMARY ---')
    send_agent_history_step(data=model_step_summary)
# Module-level agent used by run_agent() below; the task is a fixed price-comparison prompt.
agent = Agent(
task='Compare the price of gpt-4.1 and DeepSeek-V3',
llm=ChatOpenAI(model='gpt-4.1'),
)
async def run_agent():
    """Run the module-level agent for at most 30 steps with ``record_activity`` as the on_step_start hook.

    Any exception raised by the run is printed instead of propagated.
    """
    try:
        await agent.run(on_step_start=record_activity, max_steps=30)
    except Exception as err:
        print(err)
# Starts the recording run whenever this module is executed; note there is no __main__ guard here.
asyncio.run(run_agent())

View File

@@ -1,320 +0,0 @@
"""
Drag and Drop Custom Action Example
This example demonstrates how to implement drag and drop functionality as a custom action.
The drag and drop action supports both element-based and coordinate-based operations,
making it useful for canvas drawing, sortable lists, sliders, file uploads, and UI rearrangement.
"""
import asyncio
from typing import cast
from pydantic import BaseModel, Field
from browser_use import ActionResult, Agent, Controller
from browser_use.browser.types import ElementHandle, Page
from browser_use.llm import ChatOpenAI
class Position(BaseModel):
"""Represents a position with x and y coordinates."""
# Both coordinates are integers declared with Field(...), i.e. without a default value.
x: int = Field(..., description='X coordinate')
y: int = Field(..., description='Y coordinate')
class DragDropAction(BaseModel):
"""Parameters for drag and drop operations."""
# Two alternative ways to describe the drag: element selectors or raw coordinates.
# Every field defaults to None except steps (10) and delay_ms (5).
# Element-based approach
element_source: str | None = Field(None, description='CSS selector or XPath for the source element to drag')
element_target: str | None = Field(None, description='CSS selector or XPath for the target element to drop on')
element_source_offset: Position | None = Field(None, description='Optional offset from source element center (x, y)')
element_target_offset: Position | None = Field(None, description='Optional offset from target element center (x, y)')
# Coordinate-based approach
coord_source_x: int | None = Field(None, description='Source X coordinate for drag start')
coord_source_y: int | None = Field(None, description='Source Y coordinate for drag start')
coord_target_x: int | None = Field(None, description='Target X coordinate for drag end')
coord_target_y: int | None = Field(None, description='Target Y coordinate for drag end')
# Operation parameters
steps: int | None = Field(10, description='Number of intermediate steps during drag (default: 10)')
delay_ms: int | None = Field(5, description='Delay in milliseconds between steps (default: 5)')
async def create_drag_drop_controller() -> Controller:
    """Create a controller with drag and drop functionality.

    Returns:
        A new ``Controller`` on which one extra action, ``drag_drop``, has been
        registered; the action takes ``DragDropAction`` parameters.
    """
    controller = Controller()

    @controller.registry.action(
        'Drag and drop elements or between coordinates on the page - useful for canvas drawing, sortable lists, sliders, file uploads, and UI rearrangement',
        param_model=DragDropAction,
    )
    async def drag_drop(params: DragDropAction, page: Page) -> ActionResult:
        """
        Performs a precise drag and drop operation between elements or coordinates.

        When both element selectors are given they are used; otherwise all four
        coordinates must be provided. Every failure is reported through
        ``ActionResult(error=...)`` instead of being raised.
        """

        async def get_drag_elements(
            page: Page,
            source_selector: str,
            target_selector: str,
        ) -> tuple[ElementHandle | None, ElementHandle | None]:
            """Get source and target elements with appropriate error handling."""
            source_element = None
            target_element = None

            try:
                # page.locator() auto-detects CSS and XPath
                source_locator = page.locator(source_selector)
                target_locator = page.locator(target_selector)

                # Check if elements exist
                source_count = await source_locator.count()
                target_count = await target_locator.count()

                if source_count > 0:
                    source_element = await source_locator.first.element_handle()
                    print(f'Found source element with selector: {source_selector}')
                else:
                    print(f'Source element not found: {source_selector}')

                if target_count > 0:
                    target_element = await target_locator.first.element_handle()
                    print(f'Found target element with selector: {target_selector}')
                else:
                    print(f'Target element not found: {target_selector}')
            except Exception as e:
                print(f'Error finding elements: {str(e)}')

            return source_element, target_element

        async def get_element_point(element: ElementHandle, offset: Position | None) -> tuple[int, int] | None:
            """Return the element's center, shifted by ``offset`` when given; None without a bounding box."""
            box = await element.bounding_box()
            if not box:
                return None
            point_x = box['x'] + box['width'] / 2
            point_y = box['y'] + box['height'] / 2
            if offset is not None:
                # The offset is relative to the element center, as the field description states.
                point_x += offset.x
                point_y += offset.y
            return int(point_x), int(point_y)

        async def get_element_coordinates(
            source_element: ElementHandle,
            target_element: ElementHandle,
            source_position: Position | None,
            target_position: Position | None,
        ) -> tuple[tuple[int, int] | None, tuple[int, int] | None]:
            """Get coordinates from elements with appropriate error handling."""
            source_coords = None
            target_coords = None

            try:
                source_coords = await get_element_point(source_element, source_position)
                target_coords = await get_element_point(target_element, target_position)
            except Exception as e:
                print(f'Error getting element coordinates: {str(e)}')

            return source_coords, target_coords

        async def execute_drag_operation(
            page: Page,
            source_x: int,
            source_y: int,
            target_x: int,
            target_y: int,
            steps: int,
            delay_ms: int,
        ) -> tuple[bool, str]:
            """Execute the drag operation with comprehensive error handling."""
            try:
                # Try to move to source position
                try:
                    await page.mouse.move(source_x, source_y)
                    print(f'Moved to source position ({source_x}, {source_y})')
                except Exception as e:
                    print(f'Failed to move to source position: {str(e)}')
                    return False, f'Failed to move to source position: {str(e)}'

                # Press mouse button down
                await page.mouse.down()
                try:
                    # Move to target position with intermediate steps
                    for i in range(1, steps + 1):
                        ratio = i / steps
                        intermediate_x = int(source_x + (target_x - source_x) * ratio)
                        intermediate_y = int(source_y + (target_y - source_y) * ratio)
                        await page.mouse.move(intermediate_x, intermediate_y)
                        if delay_ms > 0:
                            await asyncio.sleep(delay_ms / 1000)

                    # Move to final target position
                    await page.mouse.move(target_x, target_y)
                    # Move again to ensure dragover events are properly triggered
                    await page.mouse.move(target_x, target_y)
                finally:
                    # Release mouse button, also when a move failed, so it is never left pressed
                    await page.mouse.up()

                return True, 'Drag operation completed successfully'
            except Exception as e:
                return False, f'Error during drag operation: {str(e)}'

        try:
            # Initialize variables
            source_x: int | None = None
            source_y: int | None = None
            target_x: int | None = None
            target_y: int | None = None

            # Normalize parameters
            steps = max(1, params.steps or 10)
            # Only None falls back to the default; an explicit 0 means "no delay".
            delay_ms = 5 if params.delay_ms is None else max(0, params.delay_ms)

            # Case 1: Element selectors provided
            if params.element_source and params.element_target:
                print('Using element-based approach with selectors')
                source_element, target_element = await get_drag_elements(
                    page,
                    params.element_source,
                    params.element_target,
                )

                if not source_element or not target_element:
                    error_msg = f'Failed to find {"source" if not source_element else "target"} element'
                    return ActionResult(error=error_msg, include_in_memory=True)

                source_coords, target_coords = await get_element_coordinates(
                    source_element, target_element, params.element_source_offset, params.element_target_offset
                )

                if not source_coords or not target_coords:
                    error_msg = f'Failed to determine {"source" if not source_coords else "target"} coordinates'
                    return ActionResult(error=error_msg, include_in_memory=True)

                source_x, source_y = source_coords
                target_x, target_y = target_coords

            # Case 2: Coordinates provided directly
            elif all(
                coord is not None
                for coord in [params.coord_source_x, params.coord_source_y, params.coord_target_x, params.coord_target_y]
            ):
                print('Using coordinate-based approach')
                source_x = params.coord_source_x
                source_y = params.coord_source_y
                target_x = params.coord_target_x
                target_y = params.coord_target_y
            else:
                error_msg = 'Must provide either source/target selectors or source/target coordinates'
                return ActionResult(error=error_msg, include_in_memory=True)

            # Validate coordinates
            if any(coord is None for coord in [source_x, source_y, target_x, target_y]):
                error_msg = 'Failed to determine source or target coordinates'
                return ActionResult(error=error_msg, include_in_memory=True)

            # Perform the drag operation
            success, message = await execute_drag_operation(
                page,
                cast(int, source_x),
                cast(int, source_y),
                cast(int, target_x),
                cast(int, target_y),
                steps,
                delay_ms,
            )

            if not success:
                print(f'Drag operation failed: {message}')
                return ActionResult(error=message, include_in_memory=True)

            # Create descriptive message
            if params.element_source and params.element_target:
                msg = f"🖱️ Dragged element '{params.element_source}' to '{params.element_target}'"
            else:
                msg = f'🖱️ Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})'

            print(msg)
            return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=msg)

        except Exception as e:
            error_msg = f'Failed to perform drag and drop: {str(e)}'
            print(error_msg)
            return ActionResult(error=error_msg, include_in_memory=True)

    return controller
async def example_drag_drop_sortable_list():
    """Example: Drag and drop to reorder items in a sortable list.

    Builds the drag-and-drop controller, runs an agent with it, and returns
    whatever ``agent.run()`` returns.
    """
    drag_controller = await create_drag_drop_controller()

    # Initialize LLM (replace with your preferred model) and the agent that uses it
    agent = Agent(
        task='Go to a drag and drop demo website and reorder some list items using drag and drop',
        llm=ChatOpenAI(model='gpt-4.1'),
        controller=drag_controller,
    )

    print('🚀 Starting drag and drop example...')
    return await agent.run()
async def example_drag_drop_coordinates():
    """Example: Direct coordinate-based drag and drop.

    Builds the drag-and-drop controller, runs an agent with a canvas-drawing
    task, and returns whatever ``agent.run()`` returns.
    """
    drag_controller = await create_drag_drop_controller()

    agent = Agent(
        task='Go to a canvas drawing website and draw a simple line using drag and drop from coordinates (100, 100) to (300, 200)',
        llm=ChatOpenAI(model='gpt-4.1'),
        controller=drag_controller,
    )

    print('🎨 Starting coordinate-based drag and drop example...')
    return await agent.run()
if __name__ == '__main__':
    # Runnable examples keyed by the answer the user types at the prompt.
    examples = {
        '1': example_drag_drop_sortable_list,
        '2': example_drag_drop_coordinates,
    }

    print('Choose an example:')
    print('1. Sortable list drag and drop')
    print('2. Coordinate-based drawing')

    # Only two examples exist, so the prompt advertises 1-2.
    choice = input('Enter choice (1-2): ').strip()

    if choice not in examples:
        print('Invalid choice, running sortable list example...')
    selected_example = examples.get(choice, example_drag_drop_sortable_list)
    asyncio.run(selected_example())

View File

@@ -1,90 +0,0 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = ["browser-use", "mistralai"]
# ///
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
import asyncio
import logging
from mistralai import Mistral # type: ignore
from pydantic import BaseModel, Field
from browser_use import Agent, Controller
from browser_use.agent.views import ActionResult
from browser_use.browser import BrowserSession
from browser_use.llm import ChatOpenAI
# Fail at import time when either required API key is missing from the environment.
if not os.getenv('OPENAI_API_KEY'):
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
if not os.getenv('MISTRAL_API_KEY'):
raise ValueError('MISTRAL_API_KEY is not set. Please add it to your environment variables.')
logger = logging.getLogger(__name__)
# Controller whose registry receives the PDF extraction action registered below.
controller = Controller()
# Parameter model for the 'Extract PDF Text' action: a single `url` field with no default.
class PdfExtractParams(BaseModel):
url: str = Field(description='URL to a PDF document')
@controller.registry.action(
    'Extract PDF Text',
    param_model=PdfExtractParams,
)
def extract_mistral_ocr(params: PdfExtractParams, browser: BrowserSession) -> ActionResult:
    """
    Process a PDF URL using Mistral OCR API and return the recognized text.

    Args:
        params: Action parameters; ``params.url`` is the URL to a PDF document
        browser: Browser session argument (not used by this action)

    Returns:
        ActionResult whose ``extracted_content`` is the markdown of every page,
        each page introduced by a ``### Page N`` heading with N starting at 1
    """
    api_key = os.getenv('MISTRAL_API_KEY')
    client = Mistral(api_key=api_key)

    response = client.ocr.process(
        model='mistral-ocr-latest',
        document={
            'type': 'document_url',
            'document_url': params.url,
        },
        include_image_base64=False,
    )

    markdown = '\n\n'.join(
        f'### Page {page_number}\n{page.markdown}' for page_number, page in enumerate(response.pages, start=1)
    )
    return ActionResult(
        extracted_content=markdown,
        include_in_memory=False,  ## PDF content can be very large, so we don't include it in memory
    )
async def main():
    """Run an agent that extracts the PDF via the custom OCR action and log the run's result."""
    task = """
Objective: Navigate to the following URL, extract its contents using the Extract PDF Text action, and explain its historical significance.
URL: https://docs.house.gov/meetings/GO/GO00/20220929/115171/HHRG-117-GO00-20220929-SD010.pdf
"""
    llm = ChatOpenAI(model='gpt-4.1')
    agent = Agent(task=task, llm=llm, controller=controller)
    outcome = await agent.run()
    logger.info(outcome)
# Script entry point: run the async example to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -1,167 +0,0 @@
"""
Example of implementing hover functionality for elements.
This shows how to hover over elements to trigger hover states and tooltips.
"""
import asyncio
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
from pydantic import BaseModel
load_dotenv()
from browser_use.agent.service import Agent, Controller
from browser_use.agent.views import ActionResult
from browser_use.browser import BrowserSession
from browser_use.llm import ChatOpenAI
# Initialize the controller whose registry receives the custom hover action below
controller = Controller()
class HoverAction(BaseModel):
"""Parameters for hover action"""
# Three alternative ways to identify the element; each is optional and defaults to None.
# hover_element checks them in the order xpath, selector, index.
index: int | None = None
xpath: str | None = None
selector: str | None = None
async def _element_center_via_js(browser_session: BrowserSession, element_js: str):
    """Evaluate ``element_js`` in the page and return the center of the element it yields.

    ``element_js`` must be a JavaScript expression producing a DOM element or
    null. Returns ``(x, y)`` computed from ``getBoundingClientRect()``, or None
    when the evaluation reports no element.
    """
    cdp_session = await browser_session.get_or_create_cdp_session()
    result = await cdp_session.cdp_client.send.Runtime.evaluate(
        params={
            'expression': f"""
                (() => {{
                    const element = {element_js};
                    if (element) {{
                        const rect = element.getBoundingClientRect();
                        return {{found: true, x: rect.x + rect.width/2, y: rect.y + rect.height/2}};
                    }}
                    return {{found: false}};
                }})()
            """,
            'returnByValue': True,
        },
        session_id=cdp_session.session_id,
    )
    element_info = result.get('result', {}).get('value', {})
    if not element_info.get('found'):
        return None
    return element_info['x'], element_info['y']


@controller.registry.action(
    'Hover over an element',
    param_model=HoverAction,  # Define this model with at least "index: int" field
)
async def hover_element(params: HoverAction, browser_session: BrowserSession):
    """
    Hovers over the element specified by XPath, CSS selector, or its index in the cached selector map.

    The locators are tried in that order and the first one provided wins; the
    success message names the locator that was actually used. Failures are
    returned as ``ActionResult(error=...)`` rather than raised.
    """
    # Local import: json.dumps turns the locator into a correctly escaped JS string literal,
    # so XPaths/selectors containing quotes no longer break the evaluated expression.
    import json

    try:
        if params.xpath:
            element_js = (
                f'document.evaluate({json.dumps(params.xpath)}, document, null, '
                'XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue'
            )
            center = await _element_center_via_js(browser_session, element_js)
            if center is None:
                raise Exception(f'Failed to locate element with XPath {params.xpath}')
            x, y = center
            msg = f'🖱️ Hovered over element with XPath {params.xpath}'

        elif params.selector:
            element_js = f'document.querySelector({json.dumps(params.selector)})'
            center = await _element_center_via_js(browser_session, element_js)
            if center is None:
                raise Exception(f'Failed to locate element with CSS Selector {params.selector}')
            x, y = center
            msg = f'🖱️ Hovered over element with selector {params.selector}'

        elif params.index is not None:
            # Use index to locate the element
            selector_map = await browser_session.get_selector_map()
            if params.index not in selector_map:
                raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')
            element_node = selector_map[params.index]

            # Get element position
            if not element_node.absolute_position:
                raise Exception(f'Element at index {params.index} has no position information')

            x = element_node.absolute_position.x + element_node.absolute_position.width / 2
            y = element_node.absolute_position.y + element_node.absolute_position.height / 2
            msg = f'🖱️ Hovered over element at index {params.index}'

        else:
            raise Exception('Either index, xpath, or selector must be provided')

        # Perform hover using CDP mouse events
        cdp_session = await browser_session.get_or_create_cdp_session()

        # Move mouse to the element position
        await cdp_session.cdp_client.send.Input.dispatchMouseEvent(
            params={
                'type': 'mouseMoved',
                'x': x,
                'y': y,
            },
            session_id=cdp_session.session_id,
        )

        # Wait a bit for hover state to trigger
        await asyncio.sleep(0.1)

        return ActionResult(extracted_content=msg, include_in_memory=True)

    except Exception as e:
        error_msg = f'❌ Failed to hover over element: {str(e)}'
        return ActionResult(error=error_msg)
async def main():
    """Main function to run the example.

    Starts a browser session, runs the hover demo agent for at most 10 steps,
    and kills the session in a ``finally`` block so it is shut down even when
    the run raises.
    """
    browser_session = BrowserSession()
    await browser_session.start()
    try:
        llm = ChatOpenAI(model='gpt-4.1')

        # Create the agent with hover capability
        agent = Agent(
            task="""
Go to a website with hover interactions, like https://www.w3schools.com/howto/howto_css_dropdown.asp
Try hovering over the dropdown menu to see the dropdown items appear.
Then describe what happens when you hover.
""",
            llm=llm,
            browser_session=browser_session,
            controller=controller,
        )

        # Run the agent
        await agent.run(max_steps=10)
    finally:
        # Cleanup
        await browser_session.kill()
# Script entry point: run the async example to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -1,91 +0,0 @@
import asyncio
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
import logging
from pydantic import BaseModel
from browser_use import ActionResult, Agent, Controller
from browser_use.browser.profile import BrowserProfile
from browser_use.llm import ChatOpenAI
# Module-level logger; search_web logs each search result through it.
logger = logging.getLogger(__name__)
# One person in the result list: a required name and an optional email (None when absent).
class Person(BaseModel):
name: str
email: str | None = None
# Wrapper model holding the list of Person entries; passed as output_model to the Controller.
class PersonList(BaseModel):
people: list[Person]
# The Perplexity key is read once at import time; the script refuses to start without it.
PERPLEXITY_API_KEY = os.getenv('PERPLEXITY_API_KEY')
if not PERPLEXITY_API_KEY:
raise ValueError('PERPLEXITY_API_KEY is not set')
# Controller with 'search_google' excluded and PersonList as the output model.
controller = Controller(exclude_actions=['search_google'], output_model=PersonList)
@controller.registry.action('Search the web for a specific query with perplexity')
async def search_web(query: str):
    """Send ``query`` to the Perplexity chat-completions API and return the answer with its citations.

    Returns an ActionResult whose ``extracted_content`` is the answer text
    followed by a ``Citations:`` section with one citation per line. Raises
    the httpx status error when the API responds with an error status.
    """
    import httpx

    url = 'https://api.perplexity.ai/chat/completions'

    payload = {
        'model': 'sonar',
        'messages': [
            {'role': 'system', 'content': 'Be precise and concise.'},
            {'role': 'user', 'content': query},
        ],
    }
    headers = {'Authorization': f'Bearer {PERPLEXITY_API_KEY}', 'Content-Type': 'application/json'}

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        response_json = response.json()
        content = response_json['choices'][0]['message']['content']
        # Treat a missing or null 'citations' field as "no citations" instead of
        # failing with KeyError after an otherwise successful answer.
        citations = response_json.get('citations') or []
        output = f'{content}\n\nCitations:\n' + '\n'.join(str(citation) for citation in citations)
        logger.info(output)
        return ActionResult(extracted_content=output, include_in_memory=True)
# People to look up; main() appends these names, one per line, to the task prompt.
names = [
'Ruedi Aebersold',
'Bernd Bodenmiller',
'Eugene Demler',
]
async def main():
    """Run the lookup agent over ``names`` and print each person's name and email.

    Prints ``No result`` when the run produces no final result; otherwise the
    final result is validated as ``PersonList`` JSON before printing.
    """
    instructions = 'use search_web with "find email address of the following ETH professor:" for each of the persons. Finally return the list with name and email if provided '
    task = f'{instructions}\n' + '\n'.join(names)

    agent = Agent(
        task=task,
        llm=ChatOpenAI(model='gpt-4.1'),
        controller=controller,
        browser_profile=BrowserProfile(),
    )
    history = await agent.run()

    result = history.final_result()
    if not result:
        print('No result')
        return

    parsed: PersonList = PersonList.model_validate_json(result)
    for person in parsed.people:
        print(f'{person.name} - {person.email}')
# Script entry point: run the async example to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -1,56 +0,0 @@
import asyncio
import os
import re
import sys
from pathlib import Path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
from browser_use import ActionResult, Agent, Controller
from browser_use.browser.types import Page
from browser_use.llm import ChatOpenAI
# Initialize the controller whose registry receives the save_pdf action below
controller = Controller()
# PDFs are written to ./downloads under the current working directory; created at import time if missing.
download_path = Path.cwd() / 'downloads'
download_path.mkdir(parents=True, exist_ok=True)
# Save PDF - adapted from the original controller function (file name is now length-capped and never empty)
@controller.registry.action('Save the current page as a PDF file')
async def save_pdf(page: Page):
    """Render the current page to an A4 PDF inside ``download_path``.

    The file name is a lowercase slug of the page URL: the scheme, a leading
    ``www.`` and a trailing slash are removed, and every remaining run of
    non-alphanumeric characters becomes ``-``. The slug is capped in length
    and falls back to ``page`` when it would be empty, so the result is always
    a usable file name.

    Returns an ActionResult describing where the PDF was written.
    """
    # Leaves room for '.pdf' within a 255-character file-name limit.
    max_slug_length = 200

    short_url = re.sub(r'^https?://(?:www\.)?|/$', '', page.url)
    slug = re.sub(r'[^a-zA-Z0-9]+', '-', short_url).strip('-').lower()
    slug = slug[:max_slug_length].strip('-') or 'page'
    sanitized_filename = f'{slug}.pdf'
    pdf_path = download_path / sanitized_filename

    await page.emulate_media(media='screen')
    await page.pdf(path=pdf_path, format='A4', print_background=False)
    msg = f'Saving page with URL {page.url} as PDF to {pdf_path}'
    return ActionResult(extracted_content=msg, include_in_memory=True, long_term_memory=f'Saved PDF to {sanitized_filename}')
async def main():
    """
    Example task: Navigate to browser-use.com and save the page as a PDF
    """
    task = """
Go to https://browser-use.com/ and save the page as a PDF file.
"""

    # Build the agent around the language model and the controller carrying save_pdf
    agent = Agent(
        task=task,
        llm=ChatOpenAI(model='gpt-4.1-mini'),
        controller=controller,
    )

    outcome = await agent.run()
    print(f'🎯 Task completed: {outcome}')
# Script entry point: run the async example to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -1,119 +0,0 @@
import asyncio
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
from amazoncaptcha import AmazonCaptcha # type: ignore
from browser_use import ActionResult
from browser_use.agent.service import Agent
from browser_use.browser import BrowserConfig, BrowserSession
from browser_use.controller.service import Controller
from browser_use.llm import ChatOpenAI
# Browser configuration with headless disabled; passed to BrowserSession in main().
# NOTE(review): this file uses BrowserConfig where a sibling example uses BrowserProfile - confirm which is current.
browser_profile = BrowserConfig(headless=False)
# Initialize controller first
controller = Controller()
@controller.action(
    'Solve Amazon text based captcha',
    domains=[
        '*.amazon.com',
        '*.amazon.co.uk',
        '*.amazon.ca',
        '*.amazon.de',
        '*.amazon.es',
        '*.amazon.fr',
        '*.amazon.it',
        '*.amazon.co.jp',
        '*.amazon.in',
        '*.amazon.cn',
        '*.amazon.com.sg',
        '*.amazon.com.mx',
        '*.amazon.ae',
        '*.amazon.com.br',
        '*.amazon.nl',
        '*.amazon.com.au',
        '*.amazon.com.tr',
        '*.amazon.sa',
        '*.amazon.se',
        '*.amazon.pl',
    ],
)
async def solve_amazon_captcha(browser_session: BrowserSession):
    """Solve the text captcha on the current Amazon page and submit the answer.

    Reads the captcha image URL from the page, solves it with AmazonCaptcha,
    writes the solution into ``#captchacharacters`` and clicks the submit button.

    Returns:
        ActionResult whose ``extracted_content`` is the solved captcha text.

    Raises:
        ValueError: If there is no focused session, no captcha image is found,
            or the captcha cannot be solved.
    """
    # Local import: json.dumps embeds the solution as a correctly escaped JS string literal.
    import json

    focus = browser_session.agent_focus
    if not focus:
        raise ValueError('No active browser session')

    async def run_js(expression: str, return_by_value: bool = False):
        """Evaluate ``expression`` in the focused page via CDP Runtime.evaluate."""
        params = {'expression': expression}
        if return_by_value:
            params['returnByValue'] = True
        return await focus.cdp_client.send.Runtime.evaluate(params=params, session_id=focus.session_id)

    # Each script is wrapped in an IIFE so its `const` bindings stay local and do not
    # collide with earlier evaluations when the action runs again on the same page.

    # Find the captcha image and extract its src using CDP
    result = await run_js(
        """
        (() => {
            const img = document.querySelector('img[src*="amazon.com/captcha"]');
            return img ? img.src : null;
        })()
        """,
        return_by_value=True,
    )

    link = result.get('result', {}).get('value')
    if not link:
        raise ValueError('Could not find captcha image on the page')

    captcha = AmazonCaptcha.fromlink(link)
    solution = captcha.solve()
    if not solution or solution == 'Not solved':
        raise ValueError('Captcha could not be solved')

    # Fill the captcha solution using CDP
    await run_js(
        f"""
        (() => {{
            const input = document.querySelector('#captchacharacters');
            if (input) {{
                input.value = {json.dumps(solution)};
                input.dispatchEvent(new Event('input', {{ bubbles: true }}));
                input.dispatchEvent(new Event('change', {{ bubbles: true }}));
            }}
        }})()
        """
    )

    # Click submit button using CDP
    await run_js(
        """
        (() => {
            const button = document.querySelector('button[type="submit"]');
            if (button) button.click();
        })()
        """
    )

    return ActionResult(extracted_content=solution)
async def main():
    """Run the captcha demo against Amazon's validateCaptcha page.

    The browser session is started explicitly and is killed in a ``finally``
    block, so it is shut down even when the agent run raises.
    """
    task = 'Go to https://www.amazon.com/errors/validateCaptcha and solve the captcha using the solve_amazon_captcha tool'

    model = ChatOpenAI(model='gpt-4.1')
    browser_session = BrowserSession(browser_profile=browser_profile)
    await browser_session.start()
    try:
        agent = Agent(task=task, llm=model, controller=controller, browser_session=browser_session)
        await agent.run()
    finally:
        # Always release the session, including on failure of the agent run.
        await browser_session.kill()

    input('Press Enter to close...')
# Script entry point: run the async demo to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = ["browser-use", "mistralai"]
# ///
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from dotenv import load_dotenv
load_dotenv()
import asyncio
import logging
from browser_use import Agent, ChatOpenAI
# Module-level logger; main() logs the agent run's result through it.
logger = logging.getLogger(__name__)
async def main():
    """Run an agent that opens the PDF URL and answers what is on page 3, then log the run's result."""
    task = """
Objective: Navigate to the following URL whats on page 3?
URL: https://docs.house.gov/meetings/GO/GO00/20220929/115171/HHRG-117-GO00-20220929-SD010.pdf
"""
    llm = ChatOpenAI(model='gpt-4.1-mini')
    agent = Agent(task=task, llm=llm)
    outcome = await agent.run()
    logger.info(outcome)
# Script entry point: run the async example to completion when executed directly.
if __name__ == '__main__':
asyncio.run(main())