apply ruff safe fixes

This commit is contained in:
Nick Sweeting
2025-03-28 18:11:36 -07:00
parent d9e03f4628
commit fb6fa259a8
45 changed files with 2067 additions and 1946 deletions

View File

@@ -158,13 +158,13 @@ def _create_task_frame(
# Draw task text with dynamic font size based on task length
margin = 140 # Increased margin
max_width = image.width - (2 * margin)
# Dynamic font size calculation based on task length
# Start with base font size (regular + 16)
base_font_size = regular_font.size + 16
min_font_size = max(regular_font.size - 10, 16) # Don't go below 16pt
max_font_size = base_font_size # Cap at the base font size
# Calculate dynamic font size based on text length and complexity
# Longer texts get progressively smaller fonts
text_length = len(task)
@@ -173,9 +173,9 @@ def _create_task_frame(
font_size = max(base_font_size - int(10 * (text_length / 200)), min_font_size)
else:
font_size = base_font_size
larger_font = ImageFont.truetype(regular_font.path, font_size)
# Generate wrapped text with the calculated font size
wrapped_text = _wrap_text(task, larger_font, max_width)

View File

@@ -8,7 +8,7 @@ from langchain_core.load import dumpd, load
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage
from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator
filterwarnings("ignore", category=LangChainBetaWarning)
filterwarnings('ignore', category=LangChainBetaWarning)
if TYPE_CHECKING:
from browser_use.agent.views import AgentOutput

View File

@@ -6,7 +6,6 @@ import inspect
import json
import logging
import os
import platform
import re
import time
from pathlib import Path

View File

@@ -23,11 +23,11 @@ from browser_use.dom.views import SelectorMap
ToolCallingMethod = Literal['function_calling', 'json_mode', 'raw', 'auto']
REQUIRED_LLM_API_ENV_VARS = {
"ChatOpenAI": ["OPENAI_API_KEY"],
"AzureOpenAI": ["AZURE_ENDPOINT", "AZURE_OPENAI_API_KEY"],
"ChatBedrockConverse": ["ANTHROPIC_API_KEY"],
"ChatAnthropic": ["ANTHROPIC_API_KEY"],
"ChatGoogleGenerativeAI": ["GEMINI_API_KEY"]
'ChatOpenAI': ['OPENAI_API_KEY'],
'AzureOpenAI': ['AZURE_ENDPOINT', 'AZURE_OPENAI_API_KEY'],
'ChatBedrockConverse': ['ANTHROPIC_API_KEY'],
'ChatAnthropic': ['ANTHROPIC_API_KEY'],
'ChatGoogleGenerativeAI': ['GEMINI_API_KEY'],
}

View File

@@ -158,12 +158,7 @@ class BrowserContextConfig(BaseModel):
class BrowserSession:
def __init__(
self,
context: PlaywrightBrowserContext,
cached_state: BrowserState | None=None
):
def __init__(self, context: PlaywrightBrowserContext, cached_state: BrowserState | None = None):
init_script = """
(() => {
if (!window.getEventListeners) {
@@ -212,6 +207,7 @@ class BrowserSession:
self.cached_state = cached_state
self.context.on('page', lambda page: page.add_init_script(init_script))
@dataclass
class BrowserContextState:
"""
@@ -1217,7 +1213,9 @@ class BrowserContext:
return None
@time_execution_async('--get_locate_element_by_text')
async def get_locate_element_by_text(self, text: str, nth: Optional[int] = 0, element_type: Optional[str] = None) -> Optional[ElementHandle]:
async def get_locate_element_by_text(
self, text: str, nth: Optional[int] = 0, element_type: Optional[str] = None
) -> Optional[ElementHandle]:
"""
Locates an element on the page using the provided text.
If `nth` is provided, it returns the nth matching element (0-based).
@@ -1226,7 +1224,7 @@ class BrowserContext:
current_frame = await self.get_current_page()
try:
# handle also specific element type or use any type.
selector = f"{element_type or '*'}:text(\"{text}\")"
selector = f'{element_type or "*"}:text("{text}")'
elements = await current_frame.query_selector_all(selector)
# considering only visible elements
elements = [el for el in elements if await el.is_visible()]
@@ -1574,4 +1572,4 @@ class BrowserContext:
TimeoutError: If the element does not become visible within the specified timeout.
"""
page = await self.get_current_page()
await page.wait_for_selector(selector, state="visible", timeout=timeout)
await page.wait_for_selector(selector, state='visible', timeout=timeout)

View File

@@ -31,5 +31,6 @@ async def test_take_full_page_screenshot():
finally:
await browser.close()
if __name__ == '__main__':
asyncio.run(test_take_full_page_screenshot())

View File

@@ -16,13 +16,17 @@ class TabInfo(BaseModel):
title: str
parent_page_id: Optional[int] = None # parent page that contains this popup or cross-origin iframe
class GroupTabsAction(BaseModel):
tab_ids: list[int]
title: str
color: Optional[str] = "blue"
tab_ids: list[int]
title: str
color: Optional[str] = 'blue'
class UngroupTabsAction(BaseModel):
tab_ids: list[int]
tab_ids: list[int]
@dataclass
class BrowserState(DOMState):
url: str

View File

@@ -212,9 +212,7 @@ class Controller(Generic[Context]):
async def click_element_by_text(params: ClickElementByTextAction, browser: BrowserContext):
try:
element_node = await browser.get_locate_element_by_text(
text=params.text,
nth=params.nth,
element_type=params.element_type
text=params.text, nth=params.nth, element_type=params.element_type
)
if element_node:
@@ -302,7 +300,9 @@ class Controller(Generic[Context]):
@self.registry.action(
'Extract page content to retrieve specific information from the page, e.g. all company names, a specifc description, all information about, links with companies in structured format or simply links',
)
async def extract_content(goal: str, should_strip_link_urls: bool, browser: BrowserContext, page_extraction_llm: BaseChatModel):
async def extract_content(
goal: str, should_strip_link_urls: bool, browser: BrowserContext, page_extraction_llm: BaseChatModel
):
page = await browser.get_current_page()
import markdownify

View File

@@ -13,23 +13,28 @@ class GoToUrlAction(BaseModel):
class WaitForElementAction(BaseModel):
selector: str
timeout: Optional[int] = 10000 # Timeout in milliseconds
selector: str
timeout: Optional[int] = 10000 # Timeout in milliseconds
class ClickElementAction(BaseModel):
index: int
xpath: Optional[str] = None
class ClickElementByXpathAction(BaseModel):
xpath: str
xpath: str
class ClickElementBySelectorAction(BaseModel):
css_selector: str
css_selector: str
class ClickElementByTextAction(BaseModel):
text: str
element_type: Optional[str]
nth: int = 0
text: str
element_type: Optional[str]
nth: int = 0
class InputTextAction(BaseModel):
index: int
@@ -61,16 +66,19 @@ class ScrollAction(BaseModel):
class SendKeysAction(BaseModel):
keys: str
class GroupTabsAction(BaseModel):
tab_ids: list[int] = Field(..., description="List of tab IDs to group")
title: str = Field(..., description="Name for the tab group")
color: Optional[str] = Field(
"blue",
description="Color for the group (grey/blue/red/yellow/green/pink/purple/cyan)",
)
tab_ids: list[int] = Field(..., description='List of tab IDs to group')
title: str = Field(..., description='Name for the tab group')
color: Optional[str] = Field(
'blue',
description='Color for the group (grey/blue/red/yellow/green/pink/purple/cyan)',
)
class UngroupTabsAction(BaseModel):
tab_ids: list[int] = Field(..., description="List of tab IDs to ungroup")
tab_ids: list[int] = Field(..., description='List of tab IDs to ungroup')
class ExtractPageContentAction(BaseModel):
value: str

View File

@@ -1,5 +1,5 @@
class LLMException(Exception):
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
super().__init__(f"Error {status_code}: {message}")
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
super().__init__(f'Error {status_code}: {message}')

View File

@@ -82,7 +82,7 @@ class SignalHandler:
if self.custom_exit_callback:
self.custom_exit_callback()
os._exit(0)
self.original_sigint_handler = signal.signal(signal.SIGINT, windows_handler)
else:
# On Unix-like systems, use asyncio's signal handling for smoother experience

View File

@@ -13,13 +13,15 @@ api_key = os.getenv('GROK_API_KEY', '')
if not api_key:
raise ValueError('GROK_API_KEY is not set')
async def main():
agent = Agent(
task='Go to https://www.google.com and search for "python" and click on the first result',
use_vision=False,
llm=ChatOpenAI(model="grok-2-1212",base_url="https://api.x.ai/v1",api_key=SecretStr(api_key)),
)
await agent.run()
async def main():
agent = Agent(
task='Go to https://www.google.com and search for "python" and click on the first result',
use_vision=False,
llm=ChatOpenAI(model='grok-2-1212', base_url='https://api.x.ai/v1', api_key=SecretStr(api_key)),
)
await agent.run()
asyncio.run(main())

View File

@@ -1,4 +1,3 @@
import os
import sys
@@ -13,7 +12,7 @@ from langchain_openai import ChatOpenAI
from browser_use import Agent, Controller
from browser_use.browser.browser import Browser, BrowserConfig
from browser_use.browser.context import BrowserContext
# async def group_tabs(self, tab_ids: list[int] , title: str, color: str = "blue"):
# """Reset the browser session
# Call this when you don't want to kill the context but just kill the state
@@ -23,7 +22,7 @@ from browser_use.browser.context import BrowserContext
# js = f"""
# chrome.tabs.group({{ tabIds: {tab_ids} }}, (groupId) => {{
# chrome.tabGroups.update(groupId, {{
# chrome.tabGroups.update(groupId, {{
# title: "{title}",
# color: "{color}"
# }});
@@ -45,7 +44,7 @@ from browser_use.browser.context import BrowserContext
# }}
# """
# await page.evaluate(js)
# await page.evaluate(js)
# Initialize controller first
@@ -57,52 +56,39 @@ browser = Browser(
)
controller = Controller()
@controller.action(
"Visually group browser tabs in Chrome",
param_model=GroupTabsAction,
requires_browser=True
)
async def group_tabs(params: GroupTabsAction, browser: BrowserContext):
try:
# Get tab IDs from params
tab_ids = params.tab_ids
title = params.title
color = params.color
# Call the low-level implementation in BrowserContext
result = await browser.group_tabs(tab_ids, title, color='red')
return ActionResult(
extracted_content=result,
include_in_memory=True
)
except Exception as e:
return ActionResult(error=f"Failed to group tabs: {str(e)}")
@controller.action('Visually group browser tabs in Chrome', param_model=GroupTabsAction, requires_browser=True)
async def group_tabs(params: GroupTabsAction, browser: BrowserContext):
try:
# Get tab IDs from params
tab_ids = params.tab_ids
title = params.title
color = params.color
# Call the low-level implementation in BrowserContext
result = await browser.group_tabs(tab_ids, title, color='red')
return ActionResult(extracted_content=result, include_in_memory=True)
except Exception as e:
return ActionResult(error=f'Failed to group tabs: {str(e)}')
# Register ungroup_tabs action
@controller.action(
"Remove visual grouping from tabs in Chrome",
param_model=UngroupTabsAction,
requires_browser=True
)
@controller.action('Remove visual grouping from tabs in Chrome', param_model=UngroupTabsAction, requires_browser=True)
async def ungroup_tabs(params: UngroupTabsAction, browser: BrowserContext):
try:
# Get tab IDs from params
tab_ids = params.tab_ids
try:
# Get tab IDs from params
tab_ids = params.tab_ids
# Call the low-level implementation in BrowserContext
result = await browser.ungroup_tabs(tab_ids)
return ActionResult(extracted_content=result, include_in_memory=True)
except Exception as e:
return ActionResult(error=f'Failed to ungroup tabs: {str(e)}')
# Call the low-level implementation in BrowserContext
result = await browser.ungroup_tabs(tab_ids)
return ActionResult(
extracted_content=result,
include_in_memory=True
)
except Exception as e:
return ActionResult(error=f"Failed to ungroup tabs: {str(e)}")
async def main():
task = 'Group tabs 1 and 2 into a "Research" group, then ungroup them.'
model = ChatOpenAI(model='gpt-4o')
agent = Agent(
task=task,
@@ -119,4 +105,4 @@ async def main():
if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())

View File

@@ -17,72 +17,80 @@ from browser_use.browser.context import BrowserContext
class HoverAction(BaseModel):
index: Optional[int] = None
xpath: Optional[str] = None
index: Optional[int] = None
xpath: Optional[str] = None
selector: Optional[str] = None
browser = Browser(
config=BrowserConfig(
headless=False,
)
config=BrowserConfig(
headless=False,
)
)
controller = Controller()
@controller.registry.action(
'Hover over an element',
param_model=HoverAction, # Define this model with at least "index: int" field
'Hover over an element',
param_model=HoverAction, # Define this model with at least "index: int" field
)
async def hover_element(params: HoverAction, browser: BrowserContext):
"""
Hovers over the element specified by its index from the cached selector map or by XPath.
"""
session = await browser.get_session()
state = session.cached_state
"""
Hovers over the element specified by its index from the cached selector map or by XPath.
"""
session = await browser.get_session()
state = session.cached_state
if params.xpath:
# Use XPath to locate the element
element_handle = await browser.get_locate_element_by_xpath(params.xpath)
if element_handle is None:
raise Exception(f'Failed to locate element with XPath {params.xpath}')
elif params.selector:
# Use CSS selector to locate the element
element_handle = await browser.get_locate_element_by_css_selector(params.selector)
if element_handle is None:
raise Exception(f'Failed to locate element with CSS Selector {params.selector}')
elif params.index is not None:
# Use index to locate the element
if state is None or params.index not in state.selector_map:
raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')
element_node = state.selector_map[params.index]
element_handle = await browser.get_locate_element(element_node)
if element_handle is None:
raise Exception(f'Failed to locate element with index {params.index}')
else:
raise Exception('Either index or xpath must be provided')
if params.xpath:
# Use XPath to locate the element
element_handle = await browser.get_locate_element_by_xpath(params.xpath)
if element_handle is None:
raise Exception(f'Failed to locate element with XPath {params.xpath}')
elif params.selector:
# Use CSS selector to locate the element
element_handle = await browser.get_locate_element_by_css_selector(params.selector)
if element_handle is None:
raise Exception(f'Failed to locate element with CSS Selector {params.selector}')
elif params.index is not None:
# Use index to locate the element
if state is None or params.index not in state.selector_map:
raise Exception(f'Element index {params.index} does not exist - retry or use alternative actions')
element_node = state.selector_map[params.index]
element_handle = await browser.get_locate_element(element_node)
if element_handle is None:
raise Exception(f'Failed to locate element with index {params.index}')
else:
raise Exception('Either index or xpath must be provided')
try:
await element_handle.hover()
msg = (
f'🖱️ Hovered over element at index {params.index}'
if params.index is not None
else f'🖱️ Hovered over element with XPath {params.xpath}'
)
return ActionResult(extracted_content=msg, include_in_memory=True)
except Exception as e:
err_msg = f'❌ Failed to hover over element: {str(e)}'
raise Exception(err_msg)
try:
await element_handle.hover()
msg = f'🖱️ Hovered over element at index {params.index}' if params.index is not None else f'🖱️ Hovered over element with XPath {params.xpath}'
return ActionResult(extracted_content=msg, include_in_memory=True)
except Exception as e:
err_msg = f'❌ Failed to hover over element: {str(e)}'
raise Exception(err_msg)
async def main():
task = 'Open https://testpages.eviltester.com/styled/csspseudo/css-hover.html and hover the element with the css selector #hoverdivpara, then click on "Can you click me?"'
# task = 'Open https://testpages.eviltester.com/styled/csspseudo/css-hover.html and hover the element with the xpath //*[@id="hoverdivpara"], then click on "Can you click me?"'
model = ChatOpenAI(model='gpt-4o')
agent = Agent(
task=task,
llm=model,
controller=controller,
browser=browser,
)
task = 'Open https://testpages.eviltester.com/styled/csspseudo/css-hover.html and hover the element with the css selector #hoverdivpara, then click on "Can you click me?"'
# task = 'Open https://testpages.eviltester.com/styled/csspseudo/css-hover.html and hover the element with the xpath //*[@id="hoverdivpara"], then click on "Can you click me?"'
model = ChatOpenAI(model='gpt-4o')
agent = Agent(
task=task,
llm=model,
controller=controller,
browser=browser,
)
await agent.run()
await browser.close()
await agent.run()
await browser.close()
input('Press Enter to close...')
input('Press Enter to close...')
if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())

View File

@@ -147,61 +147,65 @@ HTML_CONTENT = """
"""
# aiohttp request handler to serve the HTML content
async def handle_root(request):
return web.Response(text=HTML_CONTENT, content_type='text/html')
return web.Response(text=HTML_CONTENT, content_type='text/html')
# Function to run the HTTP server
async def run_http_server():
app = web.Application()
app.router.add_get("/", handle_root)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 8000)
await site.start()
print("HTTP server running on http://localhost:8000")
# Keep the server running indefinitely.
while True:
await asyncio.sleep(3600)
app = web.Application()
app.router.add_get('/', handle_root)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8000)
await site.start()
print('HTTP server running on http://localhost:8000')
# Keep the server running indefinitely.
while True:
await asyncio.sleep(3600)
# Your agent tasks and other logic
load_dotenv()
controller = Controller()
async def main():
# Start the HTTP server in the background.
server_task = asyncio.create_task(run_http_server())
# Start the HTTP server in the background.
server_task = asyncio.create_task(run_http_server())
# Example tasks for the agent.
xpath_task = 'Open http://localhost:8000/, click element with the xpath "/html/body/div/div[1]" and then click on Oranges'
css_selector_task = 'Open http://localhost:8000/, click element with the selector div.select-display and then click on apples'
text_task = 'Open http://localhost:8000/, click the third element with the text "Select a fruit" and then click on Apples, then click the second element with the text "Select a fruit" and then click on Oranges'
select_task = 'Open http://localhost:8000/, choose the car BMW'
button_task = 'Open http://localhost:8000/, click on the button'
# Example tasks for the agent.
xpath_task = 'Open http://localhost:8000/, click element with the xpath "/html/body/div/div[1]" and then click on Oranges'
css_selector_task = 'Open http://localhost:8000/, click element with the selector div.select-display and then click on apples'
text_task = 'Open http://localhost:8000/, click the third element with the text "Select a fruit" and then click on Apples, then click the second element with the text "Select a fruit" and then click on Oranges'
select_task = 'Open http://localhost:8000/, choose the car BMW'
button_task = 'Open http://localhost:8000/, click on the button'
llm = ChatOpenAI(model='gpt-4o')
# llm = ChatGoogleGenerativeAI(
# model="gemini-2.0-flash-lite",
# )
llm = ChatOpenAI(model='gpt-4o')
# llm = ChatGoogleGenerativeAI(
# model="gemini-2.0-flash-lite",
# )
# Run different agent tasks.
for task in [xpath_task, css_selector_task, text_task, select_task, button_task]:
agent = Agent(
task=task,
llm=llm,
controller=controller,
)
await agent.run()
# Run different agent tasks.
for task in [xpath_task, css_selector_task, text_task, select_task, button_task]:
agent = Agent(
task=task,
llm=llm,
controller=controller,
)
await agent.run()
# Wait for user input before shutting down.
input('Press Enter to close...')
# Cancel the server task once finished.
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
print('HTTP server stopped.')
# Wait for user input before shutting down.
input('Press Enter to close...')
# Cancel the server task once finished.
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
print("HTTP server stopped.")
if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())

View File

@@ -49,7 +49,7 @@ llm = get_llm(args.provider)
browser = Browser(
config=BrowserConfig(
#browser_binary_path='/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
# browser_binary_path='/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
)
)

View File

@@ -16,9 +16,7 @@ llm = ChatOpenAI(model='gpt-4o', temperature=0.0)
async def main():
browser = Browser()
async with await browser.new_context(
config=BrowserContextConfig(trace_path='./tmp/traces/')
) as context:
async with await browser.new_context(config=BrowserContextConfig(trace_path='./tmp/traces/')) as context:
agent = Agent(
task='Go to hackernews, then go to apple.com and return all titles of open tabs',
llm=llm,

View File

@@ -55,9 +55,7 @@ class DiscordBot(commands.Bot):
intents.members = True # Enable members intent for user info
# Initialize the bot with a command prefix and intents.
super().__init__(
command_prefix='!', intents=intents
) # You may not need prefix, just here for flexibility
super().__init__(command_prefix='!', intents=intents) # You may not need prefix, just here for flexibility
# self.tree = app_commands.CommandTree(self) # Initialize command tree for slash commands.
@@ -86,12 +84,8 @@ class DiscordBot(commands.Bot):
print(f'Error sending start message: {e}')
try:
agent_message = await self.run_agent(
message.content.replace(f'{self.prefix} ', '').strip()
)
await message.channel.send(
content=f'{agent_message}', reference=message, mention_author=True
)
agent_message = await self.run_agent(message.content.replace(f'{self.prefix} ', '').strip())
await message.channel.send(content=f'{agent_message}', reference=message, mention_author=True)
except Exception as e:
await message.channel.send(
content=f'Error during task execution: {str(e)}',

View File

@@ -18,96 +18,107 @@ logger = logging.getLogger('slack')
app = FastAPI()
class SlackBot:
def __init__(self, llm: BaseChatModel, bot_token: str, signing_secret: str, ack: bool = False, browser_config: BrowserConfig = BrowserConfig(headless=True)):
if not bot_token or not signing_secret:
raise ValueError("Bot token and signing secret must be provided")
def __init__(
self,
llm: BaseChatModel,
bot_token: str,
signing_secret: str,
ack: bool = False,
browser_config: BrowserConfig = BrowserConfig(headless=True),
):
if not bot_token or not signing_secret:
raise ValueError('Bot token and signing secret must be provided')
self.llm = llm
self.ack = ack
self.browser_config = browser_config
self.client = AsyncWebClient(token=bot_token)
self.signature_verifier = SignatureVerifier(signing_secret)
self.processed_events = set()
logger.info("SlackBot initialized")
self.llm = llm
self.ack = ack
self.browser_config = browser_config
self.client = AsyncWebClient(token=bot_token)
self.signature_verifier = SignatureVerifier(signing_secret)
self.processed_events = set()
logger.info('SlackBot initialized')
async def handle_event(self, event, event_id):
try:
logger.info(f"Received event id: {event_id}")
if not event_id:
logger.warning("Event ID missing in event data")
return
async def handle_event(self, event, event_id):
try:
logger.info(f'Received event id: {event_id}')
if not event_id:
logger.warning('Event ID missing in event data')
return
if event_id in self.processed_events:
logger.info(f"Event {event_id} already processed")
return
self.processed_events.add(event_id)
if event_id in self.processed_events:
logger.info(f'Event {event_id} already processed')
return
self.processed_events.add(event_id)
if 'subtype' in event and event['subtype'] == 'bot_message':
return
if 'subtype' in event and event['subtype'] == 'bot_message':
return
text = event.get('text')
user_id = event.get('user')
if text and text.startswith('$bu '):
task = text[len('$bu '):].strip()
if self.ack:
try:
await self.send_message(event['channel'], f'<@{user_id}> Starting browser use task...', thread_ts=event.get('ts'))
except Exception as e:
logger.error(f"Error sending start message: {e}")
text = event.get('text')
user_id = event.get('user')
if text and text.startswith('$bu '):
task = text[len('$bu ') :].strip()
if self.ack:
try:
await self.send_message(
event['channel'], f'<@{user_id}> Starting browser use task...', thread_ts=event.get('ts')
)
except Exception as e:
logger.error(f'Error sending start message: {e}')
try:
agent_message = await self.run_agent(task)
await self.send_message(event['channel'], f'<@{user_id}> {agent_message}', thread_ts=event.get('ts'))
except Exception as e:
await self.send_message(event['channel'], f'Error during task execution: {str(e)}', thread_ts=event.get('ts'))
except Exception as e:
logger.error(f"Error in handle_event: {str(e)}")
try:
agent_message = await self.run_agent(task)
await self.send_message(event['channel'], f'<@{user_id}> {agent_message}', thread_ts=event.get('ts'))
except Exception as e:
await self.send_message(event['channel'], f'Error during task execution: {str(e)}', thread_ts=event.get('ts'))
except Exception as e:
logger.error(f'Error in handle_event: {str(e)}')
async def run_agent(self, task: str) -> str:
try:
browser = Browser(config=self.browser_config)
agent = Agent(task=task, llm=self.llm, browser=browser)
result = await agent.run()
async def run_agent(self, task: str) -> str:
try:
browser = Browser(config=self.browser_config)
agent = Agent(task=task, llm=self.llm, browser=browser)
result = await agent.run()
agent_message = None
if result.is_done():
agent_message = result.history[-1].result[0].extracted_content
agent_message = None
if result.is_done():
agent_message = result.history[-1].result[0].extracted_content
if agent_message is None:
agent_message = 'Oops! Something went wrong while running Browser-Use.'
if agent_message is None:
agent_message = 'Oops! Something went wrong while running Browser-Use.'
return agent_message
return agent_message
except Exception as e:
logger.error(f"Error during task execution: {str(e)}")
return f'Error during task execution: {str(e)}'
except Exception as e:
logger.error(f'Error during task execution: {str(e)}')
return f'Error during task execution: {str(e)}'
async def send_message(self, channel, text, thread_ts=None):
try:
await self.client.chat_postMessage(channel=channel, text=text, thread_ts=thread_ts)
except SlackApiError as e:
logger.error(f"Error sending message: {e.response['error']}")
async def send_message(self, channel, text, thread_ts=None):
try:
await self.client.chat_postMessage(channel=channel, text=text, thread_ts=thread_ts)
except SlackApiError as e:
logger.error(f'Error sending message: {e.response["error"]}')
@app.post("/slack/events")
@app.post('/slack/events')
async def slack_events(request: Request, slack_bot: SlackBot = Depends()):
try:
if not slack_bot.signature_verifier.is_valid_request(await request.body(), dict(request.headers)):
logger.warning("Request verification failed")
raise HTTPException(status_code=400, detail="Request verification failed")
try:
if not slack_bot.signature_verifier.is_valid_request(await request.body(), dict(request.headers)):
logger.warning('Request verification failed')
raise HTTPException(status_code=400, detail='Request verification failed')
event_data = await request.json()
logger.info(f"Received event data: {event_data}")
if 'challenge' in event_data:
return {"challenge": event_data['challenge']}
event_data = await request.json()
logger.info(f'Received event data: {event_data}')
if 'challenge' in event_data:
return {'challenge': event_data['challenge']}
if 'event' in event_data:
try:
await slack_bot.handle_event(event_data.get('event'), event_data.get('event_id'))
except Exception as e:
logger.error(f"Error handling event: {str(e)}")
if 'event' in event_data:
try:
await slack_bot.handle_event(event_data.get('event'), event_data.get('event_id'))
except Exception as e:
logger.error(f'Error handling event: {str(e)}')
return {}
except Exception as e:
logger.error(f"Error in slack_events: {str(e)}")
raise HTTPException(status_code=500, detail="Internal Server Error")
return {}
except Exception as e:
logger.error(f'Error in slack_events: {str(e)}')
raise HTTPException(status_code=500, detail='Internal Server Error')

View File

@@ -24,26 +24,26 @@ azure_openai_api_key = os.getenv('AZURE_OPENAI_API_KEY')
azure_openai_endpoint = os.getenv('AZURE_OPENAI_ENDPOINT')
if not azure_openai_api_key or not azure_openai_endpoint:
raise ValueError('AZURE_OPENAI_API_KEY or AZURE_OPENAI_ENDPOINT is not set')
raise ValueError('AZURE_OPENAI_API_KEY or AZURE_OPENAI_ENDPOINT is not set')
# Initialize the Azure OpenAI client
llm = AzureChatOpenAI(
model_name='gpt-4o',
openai_api_key=azure_openai_api_key,
azure_endpoint=azure_openai_endpoint, # Corrected to use azure_endpoint instead of openai_api_base
deployment_name='gpt-4o', # Use deployment_name for Azure models
api_version='2024-08-01-preview' # Explicitly set the API version here
model_name='gpt-4o',
openai_api_key=azure_openai_api_key,
azure_endpoint=azure_openai_endpoint, # Corrected to use azure_endpoint instead of openai_api_base
deployment_name='gpt-4o', # Use deployment_name for Azure models
api_version='2024-08-01-preview', # Explicitly set the API version here
)
agent = Agent(
task='Go to amazon.com, search for laptop, sort by best rating, and give me the price of the first result',
llm=llm,
task='Go to amazon.com, search for laptop, sort by best rating, and give me the price of the first result',
llm=llm,
)
async def main():
await agent.run(max_steps=10)
input('Press Enter to continue...')
await agent.run(max_steps=10)
input('Press Enter to continue...')
asyncio.run(main())

View File

@@ -19,19 +19,19 @@ from browser_use.controller.service import Controller
def get_llm():
return ChatBedrockConverse(
model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
temperature=0.0,
max_tokens=None,
)
return ChatBedrockConverse(
model_id='us.anthropic.claude-3-5-sonnet-20241022-v2:0',
temperature=0.0,
max_tokens=None,
)
# Define the task for the agent
task = (
"Visit cnn.com, navigate to the 'World News' section, and identify the latest headline. "
"Open the first article and summarize its content in 3-4 sentences. "
"Additionally, analyze the sentiment of the article (positive, neutral, or negative) "
"and provide a confidence score for the sentiment. Present the result in a tabular format."
"Visit cnn.com, navigate to the 'World News' section, and identify the latest headline. "
'Open the first article and summarize its content in 3-4 sentences. '
'Additionally, analyze the sentiment of the article (positive, neutral, or negative) '
'and provide a confidence score for the sentiment. Present the result in a tabular format.'
)
parser = argparse.ArgumentParser()
@@ -41,19 +41,23 @@ args = parser.parse_args()
llm = get_llm()
browser = Browser(
config=BrowserConfig(
# browser_binary_path='/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
)
config=BrowserConfig(
# browser_binary_path='/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
)
)
agent = Agent(
task=args.query, llm=llm, controller=Controller(), browser=browser, validate_output=True,
task=args.query,
llm=llm,
controller=Controller(),
browser=browser,
validate_output=True,
)
async def main():
await agent.run(max_steps=30)
await browser.close()
await agent.run(max_steps=30)
await browser.close()
asyncio.run(main())

View File

@@ -15,22 +15,22 @@ from browser_use.agent.views import AgentHistoryList
async def run_search() -> AgentHistoryList:
agent = Agent(
task="Search for a 'browser use' post on the r/LocalLLaMA subreddit and open it.",
llm=ChatOllama(
model="qwen2.5:32b-instruct-q4_K_M",
num_ctx=32000,
),
)
agent = Agent(
task="Search for a 'browser use' post on the r/LocalLLaMA subreddit and open it.",
llm=ChatOllama(
model='qwen2.5:32b-instruct-q4_K_M',
num_ctx=32000,
),
)
result = await agent.run()
return result
result = await agent.run()
return result
async def main():
result = await run_search()
print("\n\n", result)
result = await run_search()
print('\n\n', result)
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -8,10 +8,7 @@ from browser_use import Agent
async def run_search():
agent = Agent(
task=(
'1. Go to https://www.reddit.com/r/LocalLLaMA'
"2. Search for 'browser use' in the search bar"
'3. Click search'
'4. Call done'
"1. Go to https://www.reddit.com/r/LocalLLaMA2. Search for 'browser use' in the search bar3. Click search4. Call done"
),
llm=ChatOllama(
# model='qwen2.5:32b-instruct-q4_K_M',

File diff suppressed because one or more lines are too long

View File

@@ -11,6 +11,7 @@ Example 3: Using Anthropic's Claude Model with a Custom Query
python command_line.py --query "find latest Python tutorials on Medium" --provider anthropic
"""
import argparse
import asyncio
import os
@@ -27,70 +28,71 @@ from browser_use.controller.service import Controller
load_dotenv()
def get_llm(provider: str):
if provider == 'anthropic':
from langchain_anthropic import ChatAnthropic
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("Error: ANTHROPIC_API_KEY is not set. Please provide a valid API key.")
return ChatAnthropic(
model_name='claude-3-5-sonnet-20240620', timeout=25, stop=None, temperature=0.0
)
api_key = os.getenv('ANTHROPIC_API_KEY')
if not api_key:
raise ValueError('Error: ANTHROPIC_API_KEY is not set. Please provide a valid API key.')
return ChatAnthropic(model_name='claude-3-5-sonnet-20240620', timeout=25, stop=None, temperature=0.0)
elif provider == 'openai':
from langchain_openai import ChatOpenAI
api_key = os.getenv("OPENAI_API_KEY")
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
raise ValueError("Error: OPENAI_API_KEY is not set. Please provide a valid API key.")
raise ValueError('Error: OPENAI_API_KEY is not set. Please provide a valid API key.')
return ChatOpenAI(model='gpt-4o', temperature=0.0)
else:
raise ValueError(f'Unsupported provider: {provider}')
def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Automate browser tasks using an LLM agent.")
parser.add_argument(
'--query',
type=str,
help='The query to process',
default='go to reddit and search for posts about browser-use'
)
parser.add_argument(
'--provider',
type=str,
choices=['openai', 'anthropic'],
default='openai',
help='The model provider to use (default: openai)',
)
return parser.parse_args()
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description='Automate browser tasks using an LLM agent.')
parser.add_argument(
'--query', type=str, help='The query to process', default='go to reddit and search for posts about browser-use'
)
parser.add_argument(
'--provider',
type=str,
choices=['openai', 'anthropic'],
default='openai',
help='The model provider to use (default: openai)',
)
return parser.parse_args()
def initialize_agent(query: str, provider: str):
"""Initialize the browser agent with the given query and provider."""
llm = get_llm(provider)
controller = Controller()
browser = Browser(config=BrowserConfig())
"""Initialize the browser agent with the given query and provider."""
llm = get_llm(provider)
controller = Controller()
browser = Browser(config=BrowserConfig())
return Agent(
task=query,
llm=llm,
controller=controller,
browser=browser,
use_vision=True,
max_actions_per_step=1,
), browser
return Agent(
task=query,
llm=llm,
controller=controller,
browser=browser,
use_vision=True,
max_actions_per_step=1,
), browser
async def main():
"""Main async function to run the agent."""
args = parse_arguments()
agent, browser = initialize_agent(args.query, args.provider)
"""Main async function to run the agent."""
args = parse_arguments()
agent, browser = initialize_agent(args.query, args.provider)
await agent.run(max_steps=25)
await agent.run(max_steps=25)
input('Press Enter to close the browser...')
await browser.close()
input('Press Enter to close the browser...')
await browser.close()
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -85,9 +85,7 @@ def create_ui():
placeholder='E.g., Find flights from New York to London for next week',
lines=3,
)
model = gr.Dropdown(
choices=['gpt-4', 'gpt-3.5-turbo'], label='Model', value='gpt-4'
)
model = gr.Dropdown(choices=['gpt-4', 'gpt-3.5-turbo'], label='Model', value='gpt-4')
headless = gr.Checkbox(label='Run Headless', value=True)
submit_btn = gr.Button('Run Task')

View File

@@ -23,62 +23,65 @@ from browser_use.controller.service import Controller
load_dotenv()
if os.name == 'nt':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
# Function to get the LLM based on provider
def get_llm(provider: str):
if provider == 'anthropic':
from langchain_anthropic import ChatAnthropic
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
st.error("Error: ANTHROPIC_API_KEY is not set. Please provide a valid API key.")
st.stop()
if provider == 'anthropic':
from langchain_anthropic import ChatAnthropic
return ChatAnthropic(
model_name='claude-3-5-sonnet-20240620', timeout=25, stop=None, temperature=0.0
)
elif provider == 'openai':
from langchain_openai import ChatOpenAI
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
st.error("Error: OPENAI_API_KEY is not set. Please provide a valid API key.")
st.stop()
api_key = os.getenv('ANTHROPIC_API_KEY')
if not api_key:
st.error('Error: ANTHROPIC_API_KEY is not set. Please provide a valid API key.')
st.stop()
return ChatAnthropic(model_name='claude-3-5-sonnet-20240620', timeout=25, stop=None, temperature=0.0)
elif provider == 'openai':
from langchain_openai import ChatOpenAI
api_key = os.getenv('OPENAI_API_KEY')
if not api_key:
st.error('Error: OPENAI_API_KEY is not set. Please provide a valid API key.')
st.stop()
return ChatOpenAI(model='gpt-4o', temperature=0.0)
else:
st.error(f'Unsupported provider: {provider}')
st.stop()
return ChatOpenAI(model='gpt-4o', temperature=0.0)
else:
st.error(f'Unsupported provider: {provider}')
st.stop()
# Function to initialize the agent
def initialize_agent(query: str, provider: str):
llm = get_llm(provider)
controller = Controller()
browser = Browser(config=BrowserConfig())
llm = get_llm(provider)
controller = Controller()
browser = Browser(config=BrowserConfig())
return Agent(
task=query,
llm=llm,
controller=controller,
browser=browser,
use_vision=True,
max_actions_per_step=1,
), browser
return Agent(
task=query,
llm=llm,
controller=controller,
browser=browser,
use_vision=True,
max_actions_per_step=1,
), browser
# Streamlit UI
st.title("Automated Browser Agent with LLMs 🤖")
st.title('Automated Browser Agent with LLMs 🤖')
query = st.text_input("Enter your query:", "go to reddit and search for posts about browser-use")
provider = st.radio("Select LLM Provider:", ["openai", "anthropic"], index=0)
query = st.text_input('Enter your query:', 'go to reddit and search for posts about browser-use')
provider = st.radio('Select LLM Provider:', ['openai', 'anthropic'], index=0)
if st.button("Run Agent"):
st.write("Initializing agent...")
agent, browser = initialize_agent(query, provider)
if st.button('Run Agent'):
st.write('Initializing agent...')
agent, browser = initialize_agent(query, provider)
async def run_agent():
with st.spinner("Running automation..."):
await agent.run(max_steps=25)
st.success("Task completed! 🎉")
async def run_agent():
with st.spinner('Running automation...'):
await agent.run(max_steps=25)
st.success('Task completed! 🎉')
asyncio.run(run_agent())
asyncio.run(run_agent())
st.button("Close Browser", on_click=lambda: asyncio.run(browser.close()))
st.button('Close Browser', on_click=lambda: asyncio.run(browser.close()))

View File

@@ -23,16 +23,18 @@ from browser_use import Agent
# Load environment variables
load_dotenv()
if not os.getenv('OPENAI_API_KEY'):
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
async def main():
llm = ChatOpenAI(model='gpt-4o')
agent = Agent(
llm = ChatOpenAI(model='gpt-4o')
agent = Agent(
task='go to https://captcha.com/demos/features/captcha-demo.aspx and solve the captcha',
llm=llm,
)
await agent.run()
input('Press Enter to exit')
await agent.run()
input('Press Enter to exit')
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -13,13 +13,14 @@ from browser_use.controller.service import Controller
# Load environment variables
load_dotenv()
if not os.getenv('OPENAI_API_KEY'):
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
controller = Controller()
class WebpageInfo(BaseModel):
"""Model for webpage link."""
link: str = 'https://appointment.mfa.gr/en/reservations/aero/ireland-grcon-dub/'
@@ -43,5 +44,5 @@ async def main():
await agent.run()
if __name__ == "__main__":
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -26,10 +26,10 @@ from browser_use.browser.context import BrowserContext
# Validate required environment variables
load_dotenv()
required_env_vars = ["AZURE_OPENAI_KEY", "AZURE_OPENAI_ENDPOINT"]
required_env_vars = ['AZURE_OPENAI_KEY', 'AZURE_OPENAI_ENDPOINT']
for var in required_env_vars:
if not os.getenv(var):
raise ValueError(f"{var} is not set. Please add it to your environment variables.")
if not os.getenv(var):
raise ValueError(f'{var} is not set. Please add it to your environment variables.')
logger = logging.getLogger(__name__)
# full screen mode
@@ -156,5 +156,5 @@ async def main():
await asyncio.gather(*[agent.run() for agent in agents])
if __name__ == "__main__":
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -14,33 +14,35 @@ from browser_use import Agent, Browser
# Load environment variables
load_dotenv()
if not os.getenv('OPENAI_API_KEY'):
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
async def main():
browser = Browser()
async with await browser.new_context() as context:
model = ChatOpenAI(model='gpt-4o')
browser = Browser()
async with await browser.new_context() as context:
model = ChatOpenAI(model='gpt-4o')
# Initialize browser agent
agent1 = Agent(
task='Open an online code editor programiz.',
llm=model,
browser_context=context,
)
executor = Agent(
task='Executor. Execute the code written by the coder and suggest some updates if there are errors.',
llm=model,
browser_context=context,
)
# Initialize browser agent
agent1 = Agent(
task='Open an online code editor programiz.',
llm=model,
browser_context=context,
)
executor = Agent(
task='Executor. Execute the code written by the coder and suggest some updates if there are errors.',
llm=model,
browser_context=context,
)
coder = Agent(
task='Coder. Your job is to write and complete code. You are an expert coder. Code a simple calculator. Write the code on the coding interface after agent1 has opened the link.',
llm=model,
browser_context=context,
)
await agent1.run()
await executor.run()
await coder.run()
coder = Agent(
task='Coder. Your job is to write and complete code. You are an expert coder. Code a simple calculator. Write the code on the coding interface after agent1 has opened the link.',
llm=model,
browser_context=context,
)
await agent1.run()
await executor.run()
await coder.run()
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -36,53 +36,53 @@ from browser_use.browser.browser import Browser, BrowserConfig
# Load environment variables
load_dotenv()
if not os.getenv('OPENAI_API_KEY'):
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
raise ValueError('OPENAI_API_KEY is not set. Please add it to your environment variables.')
# ============ Configuration Section ============
@dataclass
class TwitterConfig:
"""Configuration for Twitter posting"""
"""Configuration for Twitter posting"""
openai_api_key: str
chrome_path: str
target_user: str # Twitter handle without @
message: str
reply_url: str
headless: bool = False
model: str = "gpt-4o-mini"
base_url: str = "https://x.com/home"
openai_api_key: str
chrome_path: str
target_user: str # Twitter handle without @
message: str
reply_url: str
headless: bool = False
model: str = 'gpt-4o-mini'
base_url: str = 'https://x.com/home'
# Customize these settings
config = TwitterConfig(
openai_api_key=os.getenv("OPENAI_API_KEY"),
chrome_path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", # This is for MacOS (Chrome)
target_user="XXXXX",
message="XXXXX",
reply_url="XXXXX",
headless=False,
openai_api_key=os.getenv('OPENAI_API_KEY'),
chrome_path='/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', # This is for MacOS (Chrome)
target_user='XXXXX',
message='XXXXX',
reply_url='XXXXX',
headless=False,
)
def create_twitter_agent(config: TwitterConfig) -> Agent:
llm = ChatOpenAI(model=config.model, api_key=config.openai_api_key)
llm = ChatOpenAI(model=config.model, api_key=config.openai_api_key)
browser = Browser(
config=BrowserConfig(
headless=config.headless,
browser_binary_path=config.chrome_path,
)
)
browser = Browser(
config=BrowserConfig(
headless=config.headless,
browser_binary_path=config.chrome_path,
)
)
controller = Controller()
controller = Controller()
# Construct the full message with tag
full_message = f'@{config.target_user} {config.message}'
# Construct the full message with tag
full_message = f"@{config.target_user} {config.message}"
# Create the agent with detailed instructions
return Agent(
task=f"""Navigate to Twitter and create a post and reply to a tweet.
# Create the agent with detailed instructions
return Agent(
task=f"""Navigate to Twitter and create a post and reply to a tweet.
Here are the specific steps:
@@ -103,25 +103,25 @@ def create_twitter_agent(config: TwitterConfig) -> Agent:
- Verify the post button is clickable before clicking
- Do not click on the '+' button which will add another tweet
""",
llm=llm,
controller=controller,
browser=browser,
)
llm=llm,
controller=controller,
browser=browser,
)
async def post_tweet(agent: Agent):
try:
await agent.run(max_steps=100)
agent.create_history_gif()
print("Tweet posted successfully!")
except Exception as e:
print(f"Error posting tweet: {str(e)}")
try:
await agent.run(max_steps=100)
agent.create_history_gif()
print('Tweet posted successfully!')
except Exception as e:
print(f'Error posting tweet: {str(e)}')
async def main():
agent = create_twitter_agent(config)
await agent.run()
agent = create_twitter_agent(config)
await agent.run()
if __name__ == "__main__":
asyncio.run(main())
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -38,5 +38,5 @@ async def main():
await agent.run()
if __name__ == "__main__":
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -7,7 +7,7 @@ load_dotenv()
import asyncio
task="""
task = """
### Prompt for Shopping Agent Migros Online Grocery Order
**Objective:**
@@ -107,15 +107,17 @@ At this stage, check the basket on the top right (indicates the price) and check
browser = Browser()
agent = Agent(
task=task,
llm=ChatOpenAI(model="gpt-4o"),
browser=browser,
)
task=task,
llm=ChatOpenAI(model='gpt-4o'),
browser=browser,
)
async def main():
await agent.run()
input("Press Enter to close the browser...")
await browser.close()
await agent.run()
input('Press Enter to close the browser...')
await browser.close()
if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())

View File

@@ -39,5 +39,5 @@ async def main():
input('Press Enter to close the browser...')
if __name__ == "__main__":
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -18,10 +18,10 @@ from browser_use.browser.browser import Browser, BrowserConfig, BrowserContextCo
load_dotenv()
# Validate required environment variables
required_env_vars = ["AZURE_OPENAI_KEY", "AZURE_OPENAI_ENDPOINT"]
required_env_vars = ['AZURE_OPENAI_KEY', 'AZURE_OPENAI_ENDPOINT']
for var in required_env_vars:
if not os.getenv(var):
raise ValueError(f"{var} is not set. Please add it to your environment variables.")
if not os.getenv(var):
raise ValueError(f'{var} is not set. Please add it to your environment variables.')
browser = Browser(
config=BrowserConfig(
@@ -68,5 +68,5 @@ async def main():
history.save_to_file('./tmp/history.json')
if __name__ == "__main__":
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -104,15 +104,11 @@ async def test_error_recovery(llm, context):
actions_names = history.action_names()
actions = history.model_actions()
assert (
'go_to_url' in actions_names or 'open_tab' in actions_names
), f'{actions_names} does not contain go_to_url or open_tab'
assert 'go_to_url' in actions_names or 'open_tab' in actions_names, f'{actions_names} does not contain go_to_url or open_tab'
for action in actions:
if 'go_to_url' in action:
assert 'url' in action['go_to_url'], 'url is not in go_to_url'
assert action['go_to_url']['url'].endswith(
'google.com'
), 'url does not end with google.com'
assert action['go_to_url']['url'].endswith('google.com'), 'url does not end with google.com'
break

View File

@@ -11,308 +11,377 @@ from browser_use.browser.context import BrowserContext, BrowserContextConfig
@pytest.mark.asyncio
async def test_builtin_browser_launch(monkeypatch):
"""
Test that the standard browser is launched correctly:
When no remote (cdp or wss) or chrome instance is provided, the Browser class uses _setup_builtin_browser.
This test monkeypatches async_playwright to return dummy objects, and asserts that get_playwright_browser returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def launch(self, headless, args, proxy=None):
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr("browser_use.browser.browser.async_playwright", lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(headless=True, disable_security=False, extra_browser_args=["--test"])
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), "Expected DummyBrowser from _setup_builtin_browser"
await browser_obj.close()
"""
Test that the standard browser is launched correctly:
When no remote (cdp or wss) or chrome instance is provided, the Browser class uses _setup_builtin_browser.
This test monkeypatches async_playwright to return dummy objects, and asserts that get_playwright_browser returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def launch(self, headless, args, proxy=None):
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(headless=True, disable_security=False, extra_browser_args=['--test'])
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), 'Expected DummyBrowser from _setup_builtin_browser'
await browser_obj.close()
@pytest.mark.asyncio
async def test_cdp_browser_launch(monkeypatch):
"""
Test that when a CDP URL is provided in the configuration, the Browser uses _setup_cdp
and returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def connect_over_cdp(self, endpoint_url, timeout=20000):
assert endpoint_url == "ws://dummy-cdp-url", "The endpoint URL should match the configuration."
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr("browser_use.browser.browser.async_playwright", lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(cdp_url="ws://dummy-cdp-url")
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), "Expected DummyBrowser from _setup_cdp"
await browser_obj.close()
"""
Test that when a CDP URL is provided in the configuration, the Browser uses _setup_cdp
and returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def connect_over_cdp(self, endpoint_url, timeout=20000):
assert endpoint_url == 'ws://dummy-cdp-url', 'The endpoint URL should match the configuration.'
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(cdp_url='ws://dummy-cdp-url')
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), 'Expected DummyBrowser from _setup_cdp'
await browser_obj.close()
@pytest.mark.asyncio
async def test_wss_browser_launch(monkeypatch):
"""
Test that when a WSS URL is provided in the configuration,
the Browser uses setup_wss and returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def connect(self, wss_url):
assert wss_url == "ws://dummy-wss-url", "WSS URL should match the configuration."
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr("browser_use.browser.browser.async_playwright", lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(wss_url="ws://dummy-wss-url")
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), "Expected DummyBrowser from _setup_wss"
await browser_obj.close()
"""
Test that when a WSS URL is provided in the configuration,
the Browser uses setup_wss and returns the expected DummyBrowser.
"""
class DummyBrowser:
pass
class DummyChromium:
async def connect(self, wss_url):
assert wss_url == 'ws://dummy-wss-url', 'WSS URL should match the configuration.'
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(wss_url='ws://dummy-wss-url')
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), 'Expected DummyBrowser from _setup_wss'
await browser_obj.close()
@pytest.mark.asyncio
async def test_user_provided_browser_launch(monkeypatch):
"""
Test that when a browser_binary_path is provided the Browser class uses
_setup_user_provided_browser branch and returns the expected DummyBrowser object
by reusing an existing Chrome instance.
"""
# Dummy response for requests.get when checking chrome debugging endpoint.
class DummyResponse:
status_code = 200
def dummy_get(url, timeout):
if url == "http://localhost:9222/json/version":
return DummyResponse()
raise requests.ConnectionError("Connection failed")
monkeypatch.setattr(requests, "get", dummy_get)
class DummyBrowser:
pass
class DummyChromium:
async def connect_over_cdp(self, endpoint_url, timeout=20000):
assert endpoint_url == "http://localhost:9222", "Endpoint URL must be 'http://localhost:9222'"
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr("browser_use.browser.browser.async_playwright", lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(browser_binary_path="dummy/chrome", extra_browser_args=["--dummy-arg"])
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), "Expected DummyBrowser from _setup_user_provided_browser"
await browser_obj.close()
"""
Test that when a browser_binary_path is provided the Browser class uses
_setup_user_provided_browser branch and returns the expected DummyBrowser object
by reusing an existing Chrome instance.
"""
# Dummy response for requests.get when checking chrome debugging endpoint.
class DummyResponse:
status_code = 200
def dummy_get(url, timeout):
if url == 'http://localhost:9222/json/version':
return DummyResponse()
raise requests.ConnectionError('Connection failed')
monkeypatch.setattr(requests, 'get', dummy_get)
class DummyBrowser:
pass
class DummyChromium:
async def connect_over_cdp(self, endpoint_url, timeout=20000):
assert endpoint_url == 'http://localhost:9222', "Endpoint URL must be 'http://localhost:9222'"
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(browser_binary_path='dummy/chrome', extra_browser_args=['--dummy-arg'])
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), 'Expected DummyBrowser from _setup_user_provided_browser'
await browser_obj.close()
@pytest.mark.asyncio
async def test_builtin_browser_disable_security_args(monkeypatch):
"""
Test that the standard browser launch includes disable-security arguments when disable_security is True.
This verifies that _setup_builtin_browser correctly appends the security disabling arguments along with
the base arguments and any extra arguments provided.
"""
# These are the base arguments defined in _setup_builtin_browser.
base_args = [
'--no-sandbox',
'--disable-blink-features=AutomationControlled',
'--disable-infobars',
'--disable-background-timer-throttling',
'--disable-popup-blocking',
'--disable-backgrounding-occluded-windows',
'--disable-renderer-backgrounding',
'--disable-window-activation',
'--disable-focus-on-load',
'--no-first-run',
'--no-default-browser-check',
'--no-startup-window',
'--window-position=0,0',
]
# When disable_security is True, these arguments should be added.
disable_security_args = [
'--disable-web-security',
'--disable-site-isolation-trials',
'--disable-features=IsolateOrigins,site-per-process'
]
# Additional arbitrary argument for testing extra args
extra_args = ["--dummy-extra"]
class DummyBrowser:
pass
class DummyChromium:
async def launch(self, headless, args, proxy=None):
# Expected args is the base args plus disable security args and the extra args.
expected_args = base_args + disable_security_args + extra_args
assert headless is True, "Expected headless to be True"
assert args == expected_args, f"Expected args {expected_args}, but got {args}"
assert proxy is None, "Expected proxy to be None"
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr("browser_use.browser.browser.async_playwright", lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(headless=True, disable_security=True, extra_browser_args=extra_args)
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), "Expected DummyBrowser from _setup_builtin_browser with disable_security active"
await browser_obj.close()
"""
Test that the standard browser launch includes disable-security arguments when disable_security is True.
This verifies that _setup_builtin_browser correctly appends the security disabling arguments along with
the base arguments and any extra arguments provided.
"""
# These are the base arguments defined in _setup_builtin_browser.
base_args = [
'--no-sandbox',
'--disable-blink-features=AutomationControlled',
'--disable-infobars',
'--disable-background-timer-throttling',
'--disable-popup-blocking',
'--disable-backgrounding-occluded-windows',
'--disable-renderer-backgrounding',
'--disable-window-activation',
'--disable-focus-on-load',
'--no-first-run',
'--no-default-browser-check',
'--no-startup-window',
'--window-position=0,0',
]
# When disable_security is True, these arguments should be added.
disable_security_args = [
'--disable-web-security',
'--disable-site-isolation-trials',
'--disable-features=IsolateOrigins,site-per-process',
]
# Additional arbitrary argument for testing extra args
extra_args = ['--dummy-extra']
class DummyBrowser:
pass
class DummyChromium:
async def launch(self, headless, args, proxy=None):
# Expected args is the base args plus disable security args and the extra args.
expected_args = base_args + disable_security_args + extra_args
assert headless is True, 'Expected headless to be True'
assert args == expected_args, f'Expected args {expected_args}, but got {args}'
assert proxy is None, 'Expected proxy to be None'
return DummyBrowser()
class DummyPlaywright:
def __init__(self):
self.chromium = DummyChromium()
async def stop(self):
pass
class DummyAsyncPlaywrightContext:
async def start(self):
return DummyPlaywright()
monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())
config = BrowserConfig(headless=True, disable_security=True, extra_browser_args=extra_args)
browser_obj = Browser(config=config)
result_browser = await browser_obj.get_playwright_browser()
assert isinstance(result_browser, DummyBrowser), (
'Expected DummyBrowser from _setup_builtin_browser with disable_security active'
)
await browser_obj.close()
@pytest.mark.asyncio
async def test_new_context_creation():
"""
Test that the new_context method returns a BrowserContext with the correct attributes.
This verifies that the BrowserContext is initialized with the provided Browser instance and configuration.
"""
config = BrowserConfig()
browser_obj = Browser(config=config)
custom_context_config = BrowserContextConfig()
context = await browser_obj.new_context(custom_context_config)
assert isinstance(context, BrowserContext), "Expected new_context to return an instance of BrowserContext"
assert context.browser is browser_obj, "Expected the context's browser attribute to be the Browser instance"
assert context.config == custom_context_config, "Expected the context's config attribute to be the provided config"
await browser_obj.close()
"""
Test that the new_context method returns a BrowserContext with the correct attributes.
This verifies that the BrowserContext is initialized with the provided Browser instance and configuration.
"""
config = BrowserConfig()
browser_obj = Browser(config=config)
custom_context_config = BrowserContextConfig()
context = await browser_obj.new_context(custom_context_config)
assert isinstance(context, BrowserContext), 'Expected new_context to return an instance of BrowserContext'
assert context.browser is browser_obj, "Expected the context's browser attribute to be the Browser instance"
assert context.config == custom_context_config, "Expected the context's config attribute to be the provided config"
await browser_obj.close()
@pytest.mark.asyncio
async def test_user_provided_browser_launch_failure(monkeypatch):
    """
    Test that when a Chrome instance cannot be started or connected to,
    the Browser._setup_user_provided_browser branch eventually raises a RuntimeError.

    We simulate failure by:
      - Forcing requests.get to always raise a ConnectionError (so no existing instance is found).
      - Monkeypatching subprocess.Popen to do nothing.
      - Replacing asyncio.sleep to avoid delays.
      - Having the dummy playwright's connect_over_cdp method always raise an Exception.
    """

    def dummy_get(url, timeout):
        raise requests.ConnectionError('Simulated connection failure')

    monkeypatch.setattr(requests, 'get', dummy_get)
    # Popen is replaced with a no-op so no real process is spawned.
    monkeypatch.setattr(subprocess, 'Popen', lambda args, stdout, stderr: None)

    async def fake_sleep(seconds):
        return

    monkeypatch.setattr(asyncio, 'sleep', fake_sleep)

    class DummyChromium:
        async def connect_over_cdp(self, endpoint_url, timeout=20000):
            raise Exception('Connection failed simulation')

    class DummyPlaywright:
        def __init__(self):
            self.chromium = DummyChromium()

        async def stop(self):
            pass

    class DummyAsyncPlaywrightContext:
        async def start(self):
            return DummyPlaywright()

    monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())

    config = BrowserConfig(browser_binary_path='dummy/chrome', extra_browser_args=['--dummy-arg'])
    browser_obj = Browser(config=config)

    with pytest.raises(RuntimeError, match='To start chrome in Debug mode'):
        await browser_obj.get_playwright_browser()

    # Cleanup stays outside the raises block so it still runs after the expected error.
    await browser_obj.close()
@pytest.mark.asyncio
async def test_get_playwright_browser_caching(monkeypatch):
    """
    Test that get_playwright_browser returns a cached browser instance.

    On the first call, the browser is initialized; on subsequent calls,
    the same instance is returned.
    """

    class DummyBrowser:
        pass

    class DummyChromium:
        async def launch(self, headless, args, proxy=None):
            # A fresh object per launch: a second launch would break the identity check below.
            return DummyBrowser()

    class DummyPlaywright:
        def __init__(self):
            self.chromium = DummyChromium()

        async def stop(self):
            pass

    class DummyAsyncPlaywrightContext:
        async def start(self):
            return DummyPlaywright()

    monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())

    config = BrowserConfig(headless=True, disable_security=False, extra_browser_args=['--test'])
    browser_obj = Browser(config=config)

    first_browser = await browser_obj.get_playwright_browser()
    second_browser = await browser_obj.get_playwright_browser()
    assert first_browser is second_browser, 'Expected the browser to be cached and reused across calls.'

    await browser_obj.close()
@pytest.mark.asyncio
async def test_close_error_handling(monkeypatch):
    """
    Test that the close method properly handles exceptions thrown by
    playwright_browser.close() and playwright.stop(), ensuring that the
    browser's attributes are set to None even if errors occur.
    """

    class DummyBrowserWithError:
        async def close(self):
            raise Exception('Close error simulation')

    class DummyPlaywrightWithError:
        async def stop(self):
            raise Exception('Stop error simulation')

    config = BrowserConfig()
    browser_obj = Browser(config=config)
    # Inject the failing doubles directly; no real browser is launched.
    browser_obj.playwright_browser = DummyBrowserWithError()
    browser_obj.playwright = DummyPlaywrightWithError()

    # close() must not propagate either simulated error.
    await browser_obj.close()

    assert browser_obj.playwright_browser is None, 'Expected playwright_browser to be None after close'
    assert browser_obj.playwright is None, 'Expected playwright to be None after close'
@pytest.mark.asyncio
async def test_standard_browser_launch_with_proxy(monkeypatch):
    """
    Test that when a proxy is provided in the BrowserConfig, the _setup_builtin_browser method
    correctly passes the proxy parameter to the playwright.chromium.launch method.

    This test sets up a dummy async_playwright context and verifies that the dummy proxy is received.
    """

    class DummyBrowser:
        pass

    # Create a dummy proxy settings instance.
    dummy_proxy = ProxySettings(server='http://dummy.proxy')

    class DummyChromium:
        async def launch(self, headless, args, proxy=None):
            # Assert that the proxy passed equals the dummy proxy provided in the configuration.
            assert proxy == dummy_proxy, f'Expected proxy {dummy_proxy} but got {proxy}'
            # We can also verify some base parameters if needed (headless, args) but our focus is proxy.
            return DummyBrowser()

    class DummyPlaywright:
        def __init__(self):
            self.chromium = DummyChromium()

        async def stop(self):
            pass

    class DummyAsyncPlaywrightContext:
        async def start(self):
            return DummyPlaywright()

    # Monkeypatch async_playwright to return our dummy async playwright context.
    monkeypatch.setattr('browser_use.browser.browser.async_playwright', lambda: DummyAsyncPlaywrightContext())

    # Create a BrowserConfig with the dummy proxy.
    config = BrowserConfig(headless=False, disable_security=False, proxy=dummy_proxy)
    browser_obj = Browser(config=config)

    # Call get_playwright_browser and verify that the returned browser is as expected.
    result_browser = await browser_obj.get_playwright_browser()
    assert isinstance(result_browser, DummyBrowser), 'Expected DummyBrowser from _setup_builtin_browser with proxy provided'

    await browser_obj.close()

View File

@@ -9,319 +9,354 @@ from browser_use.dom.views import DOMElementNode
def test_is_url_allowed():
    """
    Test the _is_url_allowed method to verify that it correctly checks URLs against
    the allowed domains configuration.

    Scenario 1: When allowed_domains is None, all URLs should be allowed.
    Scenario 2: When allowed_domains is a list, only URLs matching the allowed domain(s) are allowed.
    Scenario 3: When the URL is malformed, it should return False.
    """
    # Create a dummy Browser mock. Only the 'config' attribute is needed for _is_url_allowed.
    dummy_browser = Mock()
    # Set an empty config for dummy_browser; it won't be used in _is_url_allowed.
    dummy_browser.config = Mock()

    # Scenario 1: allowed_domains is None, any URL should be allowed.
    config1 = BrowserContextConfig(allowed_domains=None)
    context1 = BrowserContext(browser=dummy_browser, config=config1)
    assert context1._is_url_allowed('http://anydomain.com') is True
    assert context1._is_url_allowed('https://anotherdomain.org/path') is True

    # Scenario 2: allowed_domains is provided.
    allowed = ['example.com', 'mysite.org']
    config2 = BrowserContextConfig(allowed_domains=allowed)
    context2 = BrowserContext(browser=dummy_browser, config=config2)
    # URL exactly matching
    assert context2._is_url_allowed('http://example.com') is True
    # URL with subdomain (should be allowed)
    assert context2._is_url_allowed('http://sub.example.com/path') is True
    # URL with different domain (should not be allowed)
    assert context2._is_url_allowed('http://notexample.com') is False
    # URL that matches second allowed domain
    assert context2._is_url_allowed('https://mysite.org/page') is True
    # URL with port number, still allowed (port is stripped)
    assert context2._is_url_allowed('http://example.com:8080') is True

    # Scenario 3: Malformed URL or empty domain
    # urlparse will return an empty netloc for some malformed URLs.
    assert context2._is_url_allowed('notaurl') is False
def test_convert_simple_xpath_to_css_selector():
    """
    Test the _convert_simple_xpath_to_css_selector method of BrowserContext.

    This verifies that simple XPath expressions (with and without indices)
    are correctly converted to CSS selectors.
    """
    # Test empty xpath returns empty string
    assert BrowserContext._convert_simple_xpath_to_css_selector('') == ''

    # Test a simple xpath without indices
    xpath = '/html/body/div/span'
    expected = 'html > body > div > span'
    result = BrowserContext._convert_simple_xpath_to_css_selector(xpath)
    assert result == expected

    # Test xpath with an index on one element: [2] should translate to :nth-of-type(2)
    xpath = '/html/body/div[2]/span'
    expected = 'html > body > div:nth-of-type(2) > span'
    result = BrowserContext._convert_simple_xpath_to_css_selector(xpath)
    assert result == expected

    # Test xpath with indices on multiple elements:
    # For "li[3]" -> li:nth-of-type(3) and for "a[1]" -> a:nth-of-type(1)
    xpath = '/ul/li[3]/a[1]'
    expected = 'ul > li:nth-of-type(3) > a:nth-of-type(1)'
    result = BrowserContext._convert_simple_xpath_to_css_selector(xpath)
    assert result == expected
def test_get_initial_state():
    """
    Test the _get_initial_state method to verify it returns the correct initial BrowserState.

    The test checks that when a dummy page with a URL is provided,
    the returned state contains that URL and other default values.
    """
    # Create a dummy browser since only its existence is needed.
    dummy_browser = Mock()
    dummy_browser.config = Mock()
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())

    # Define a dummy page with a 'url' attribute.
    class DummyPage:
        url = 'http://dummy.com'

    dummy_page = DummyPage()

    # Call _get_initial_state with a page: URL should be set from page.url.
    state_with_page = context._get_initial_state(page=dummy_page)
    assert state_with_page.url == dummy_page.url
    # Verify that the element_tree is initialized with tag 'root'
    assert state_with_page.element_tree.tag_name == 'root'

    # Call _get_initial_state without a page: URL should be empty.
    state_without_page = context._get_initial_state()
    assert state_without_page.url == ''
@pytest.mark.asyncio
async def test_execute_javascript():
    """
    Test the execute_javascript method by mocking the current page's evaluate function.

    This ensures that when execute_javascript is called, it correctly returns the value
    from the page's evaluate method.
    """

    # Define a dummy page with an async evaluate method.
    class DummyPage:
        async def evaluate(self, script):
            return 'dummy_result'

    # Create a dummy session object with a dummy current_page.
    dummy_session = type('DummySession', (), {})()
    dummy_session.current_page = DummyPage()

    # Create a dummy browser mock with a minimal config.
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize the BrowserContext with the dummy browser and config.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    # Manually set the session to our dummy session.
    context.session = dummy_session

    # Call execute_javascript and verify it returns the expected result.
    result = await context.execute_javascript('return 1+1')
    assert result == 'dummy_result'
@pytest.mark.asyncio
async def test_enhanced_css_selector_for_element():
    """
    Test the _enhanced_css_selector_for_element method to verify that
    it returns the correct CSS selector string for a dummy DOMElementNode.

    The test checks that:
      - The provided xpath is correctly converted (handling indices),
      - Class attributes are appended as CSS classes,
      - Standard and dynamic attributes (including ones with special characters)
        are correctly added to the selector.
    """
    # Create a dummy DOMElementNode instance with a complex set of attributes.
    dummy_element = DOMElementNode(
        tag_name='div',
        is_visible=True,
        parent=None,
        xpath='/html/body/div[2]',
        attributes={'class': 'foo bar', 'id': 'my-id', 'placeholder': 'some "quoted" text', 'data-testid': '123'},
        children=[],
    )

    # Call the method with include_dynamic_attributes=True.
    actual_selector = BrowserContext._enhanced_css_selector_for_element(dummy_element, include_dynamic_attributes=True)

    # Expected conversion:
    # 1. The xpath "/html/body/div[2]" converts to "html > body > div:nth-of-type(2)".
    # 2. The class attribute "foo bar" appends ".foo.bar".
    # 3. The "id" attribute is added as [id="my-id"].
    # 4. The "placeholder" attribute contains quotes; it is added as
    #    [placeholder*="some \"quoted\" text"].
    # 5. The dynamic attribute "data-testid" is added as [data-testid="123"].
    expected_selector = (
        'html > body > div:nth-of-type(2).foo.bar[id="my-id"][placeholder*="some \\"quoted\\" text"][data-testid="123"]'
    )
    assert actual_selector == expected_selector, f'Expected {expected_selector}, but got {actual_selector}'
@pytest.mark.asyncio
async def test_get_scroll_info():
    """
    Test the get_scroll_info method by mocking the page's evaluate method.

    This dummy page returns preset values for window.scrollY, window.innerHeight,
    and document.documentElement.scrollHeight. The test then verifies that the
    computed scroll information (pixels_above and pixels_below) match the expected values.
    """

    # Define a dummy page with an async evaluate method returning preset values.
    class DummyPage:
        async def evaluate(self, script):
            if 'window.scrollY' in script:
                return 100  # scrollY
            elif 'window.innerHeight' in script:
                return 500  # innerHeight
            elif 'document.documentElement.scrollHeight' in script:
                return 1200  # total scrollable height
            return None

    # Create a dummy session with a dummy current_page.
    dummy_session = type('DummySession', (), {})()
    dummy_session.current_page = DummyPage()
    # We also need a dummy context attribute but it won't be used in this test.
    dummy_session.context = type('DummyContext', (), {})()

    # Create a dummy browser mock.
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize BrowserContext with the dummy browser and config.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    # Manually set the session to our dummy session.
    context.session = dummy_session

    # Call get_scroll_info on the dummy page.
    pixels_above, pixels_below = await context.get_scroll_info(dummy_session.current_page)

    # Expected calculations:
    # pixels_above = scrollY = 100
    # pixels_below = total_height - (scrollY + innerHeight) = 1200 - (100 + 500) = 600
    assert pixels_above == 100, f'Expected 100 pixels above, got {pixels_above}'
    assert pixels_below == 600, f'Expected 600 pixels below, got {pixels_below}'
@pytest.mark.asyncio
async def test_reset_context():
    """
    Test the reset_context method to ensure it correctly closes all existing tabs,
    resets the cached state, and creates a new page.
    """

    # Dummy Page with close and wait_for_load_state methods.
    class DummyPage:
        def __init__(self, url='http://dummy.com'):
            self.url = url
            self.closed = False

        async def close(self):
            self.closed = True

        async def wait_for_load_state(self):
            pass

    # Dummy Context that holds pages and can create a new page.
    class DummyContext:
        def __init__(self):
            self.pages = []

        async def new_page(self):
            new_page = DummyPage(url='')
            self.pages.append(new_page)
            return new_page

    # Create a dummy session with a context containing two pages.
    dummy_session = type('DummySession', (), {})()
    dummy_context = DummyContext()
    page1 = DummyPage(url='http://page1.com')
    page2 = DummyPage(url='http://page2.com')
    dummy_context.pages.extend([page1, page2])
    dummy_session.context = dummy_context
    dummy_session.current_page = page1
    dummy_session.cached_state = None

    # Create a dummy browser mock.
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize BrowserContext using our dummy_browser and config,
    # and manually set its session to our dummy session.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    context.session = dummy_session

    # Confirm session has 2 pages before reset.
    assert len(dummy_session.context.pages) == 2

    # Call reset_context which should close existing pages,
    # reset the cached state, and create a new page as current_page.
    await context.reset_context()

    # Verify that initial pages were closed.
    assert page1.closed is True
    assert page2.closed is True

    # Check that a new page is created and set as current_page.
    assert dummy_session.current_page is not None
    new_page = dummy_session.current_page
    # New page URL should be empty as per _get_initial_state.
    assert new_page.url == ''

    # Verify that cached_state is reset to an initial BrowserState.
    state = dummy_session.cached_state
    assert isinstance(state, BrowserState)
    assert state.url == ''
    assert state.element_tree.tag_name == 'root'
@pytest.mark.asyncio
async def test_take_screenshot():
    """
    Test the take_screenshot method to verify that it returns a base64 encoded screenshot string.

    A dummy page with a mocked screenshot method is used, returning a predefined byte string.
    """

    class DummyPage:
        async def screenshot(self, full_page, animations):
            # Verify that parameters are forwarded correctly.
            assert full_page is True, 'full_page parameter was not correctly passed'
            assert animations == 'disabled', 'animations parameter was not correctly passed'
            # Return a test byte string.
            return b'test'

    # Create a dummy session with the DummyPage as the current_page.
    dummy_session = type('DummySession', (), {})()
    dummy_session.current_page = DummyPage()
    dummy_session.context = None  # Not used in this test

    # Create a dummy browser mock.
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize the BrowserContext with the dummy browser and config.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    # Manually set the session to our dummy session.
    context.session = dummy_session

    # Call take_screenshot and check that it returns the expected base64 encoded string.
    result = await context.take_screenshot(full_page=True)
    expected = base64.b64encode(b'test').decode('utf-8')
    assert result == expected, f'Expected {expected}, but got {result}'
@pytest.mark.asyncio
async def test_refresh_page_behavior():
    """
    Test the refresh_page method of BrowserContext to verify that it correctly reloads the current page
    and waits for the page's load state. This is done by creating a dummy page that flags when its
    reload and wait_for_load_state methods are called.
    """

    class DummyPage:
        """Stand-in page that records whether reload() and wait_for_load_state() ran."""

        def __init__(self):
            self.reload_called = False
            self.wait_for_load_state_called = False

        async def reload(self):
            self.reload_called = True

        async def wait_for_load_state(self):
            self.wait_for_load_state_called = True

    # Create a dummy session with the dummy page as the current_page.
    dummy_page = DummyPage()
    dummy_session = type('DummySession', (), {})()
    dummy_session.current_page = dummy_page
    dummy_session.context = None  # Not required for this test

    # Create a dummy browser mock
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize BrowserContext with the dummy browser and config,
    # and manually set its session to our dummy session.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    context.session = dummy_session

    # Call refresh_page and verify that reload and wait_for_load_state were called.
    await context.refresh_page()
    assert dummy_page.reload_called is True, 'Expected the page to call reload()'
    assert dummy_page.wait_for_load_state_called is True, 'Expected the page to call wait_for_load_state()'
@pytest.mark.asyncio
async def test_remove_highlights_failure():
    """
    Test the remove_highlights method to ensure that if the page.evaluate call fails,
    the exception is caught and does not propagate (i.e. the method handles errors gracefully).
    """

    # Dummy page that always raises an exception when evaluate is called.
    class DummyPage:
        async def evaluate(self, script):
            raise Exception('dummy error')

    # Create a dummy session with the DummyPage as current_page.
    dummy_session = type('DummySession', (), {})()
    dummy_session.current_page = DummyPage()
    dummy_session.context = None  # Not used in this test

    # Create a dummy browser mock.
    dummy_browser = Mock()
    dummy_browser.config = Mock()

    # Initialize BrowserContext with the dummy browser and configuration.
    context = BrowserContext(browser=dummy_browser, config=BrowserContextConfig())
    context.session = dummy_session

    # Call remove_highlights and verify that no exception is raised.
    try:
        await context.remove_highlights()
    except Exception as e:
        pytest.fail(f'remove_highlights raised an exception: {e}')

View File

@@ -1,6 +1,7 @@
"""
Test dropdown interaction functionality.
"""
import pytest
from browser_use.agent.service import Agent
@@ -9,31 +10,31 @@ from browser_use.agent.views import AgentHistoryList
@pytest.mark.asyncio
async def test_dropdown(llm, browser_context):
    """Test selecting an option from a dropdown menu."""
    agent = Agent(
        task=(
            'go to https://codepen.io/geheimschriftstift/pen/mPLvQz and first get all options for the dropdown and then select the 5th option'
        ),
        llm=llm,
        browser_context=browser_context,
    )

    try:
        history: AgentHistoryList = await agent.run(20)
        result = history.final_result()

        # Verify dropdown interaction
        assert result is not None
        assert 'Duck' in result, "Expected 5th option 'Duck' to be selected"

        # Verify dropdown state
        element = await browser_context.get_element_by_selector('select')
        assert element is not None, 'Dropdown element should exist'

        value = await element.evaluate('el => el.value')
        assert value == '5', 'Dropdown should have 5th option selected'

    except Exception as e:
        # Any error (including a failed assertion above) is reported as a test failure.
        pytest.fail(f'Dropdown test failed: {str(e)}')
    finally:
        # Always release the browser context, whether the checks passed or not.
        await browser_context.close()

View File

@@ -1,6 +1,7 @@
"""
Test complex dropdown interaction functionality.
"""
import pytest
from browser_use.agent.service import Agent
@@ -9,35 +10,35 @@ from browser_use.agent.views import AgentHistoryList
@pytest.mark.asyncio
async def test_dropdown_complex(llm, browser_context):
    """Test selecting an option from a complex dropdown menu."""
    agent = Agent(
        task=(
            'go to https://codepen.io/shyam-king/pen/pvzpByJ and first get all options for the dropdown and then select the json option'
        ),
        llm=llm,
        browser_context=browser_context,
    )

    try:
        history: AgentHistoryList = await agent.run(20)
        result = history.final_result()

        # Verify dropdown interaction
        assert result is not None
        assert 'json' in result.lower(), "Expected 'json' option to be selected"

        # Verify dropdown state
        element = await browser_context.get_element_by_selector('.select-selected')
        assert element is not None, 'Custom dropdown element should exist'

        text = await element.text_content()
        # Guard against a missing text value so the failure message names the real
        # problem instead of an AttributeError from calling .lower() on None.
        assert text is not None, 'Dropdown element should have text content'
        assert 'json' in text.lower(), 'Dropdown should display json option'

        # Verify the selected option's effect
        code_element = await browser_context.get_element_by_selector('pre code')
        assert code_element is not None, 'Code element should be visible when JSON is selected'

    except Exception as e:
        # Any error (including a failed assertion above) is reported as a test failure.
        pytest.fail(f'Complex dropdown test failed: {str(e)}')
    finally:
        # Always release the browser context, whether the checks passed or not.
        await browser_context.close()

View File

@@ -20,9 +20,7 @@ llm = ChatOpenAI(model='gpt-4o')
# browser = Browser(config=BrowserConfig(headless=False))
agent = Agent(
task=(
'go to https://codepen.io/shyam-king/pen/emOyjKm and select number "4" and return the output of "selected value"'
),
task=('go to https://codepen.io/shyam-king/pen/emOyjKm and select number "4" and return the output of "selected value"'),
llm=llm,
browser_context=BrowserContext(
browser=Browser(config=BrowserConfig(headless=False, disable_security=True)),

View File

@@ -19,24 +19,22 @@ from browser_use import Agent, AgentHistoryList
llm = ChatOpenAI(model='gpt-4o')

# Module-level agent used by test_gif_path; generate_gif is set to the path
# that the test later checks for on disk.
agent = Agent(
    task=('go to google.com and search for text "hi there"'),
    llm=llm,
    browser_context=BrowserContext(
        browser=Browser(config=BrowserConfig(headless=False, disable_security=True)),
    ),
    generate_gif='./google.gif',
)
async def test_gif_path():
    """Run the module-level agent and check that it writes ./google.gif.

    Any existing ./google.gif is deleted first so the final existence check
    cannot be satisfied by a file left over from an earlier run.
    """
    gif_path = './google.gif'

    if os.path.exists(gif_path):
        os.unlink(gif_path)

    history: AgentHistoryList = await agent.run(20)

    result = history.final_result()
    assert result is not None

    assert os.path.exists(gif_path), 'google.gif was not created'

View File

@@ -83,7 +83,7 @@ async def test_random_samples(test_cases: List[Dict[str, Any]], llm, context, va
samples = random.sample(test_cases, 1)
for i, case in enumerate(samples, 1):
task = f"Go to {case['website']}.com and {case['confirmed_task']}"
task = f'Go to {case["website"]}.com and {case["confirmed_task"]}'
logger.info(f'--- Random Sample {i}/{len(samples)} ---')
logger.info(f'Task: {task}\n')

View File

@@ -4,7 +4,7 @@ import sys
# Make the parent directory of this file's directory importable, adding it
# to the front of sys.path only if it is not already listed.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
import pytest
from dotenv import load_dotenv
@@ -24,43 +24,45 @@ load_dotenv()
# Module-level LLM client and controller passed to the Agent in test_wait_for_element.
llm = ChatOpenAI(model='gpt-4o')
controller = Controller()
@pytest.mark.skip(reason='this is for local testing only')
async def test_wait_for_element():
    """Test 'Wait for element' action."""
    initial_actions = [
        {'open_tab': {'url': 'https://pypi.org/'}},
        # Uncomment the line below to include the wait action in initial actions.
        # {'wait_for_element': {'selector': '#search', 'timeout': 30}},
    ]

    # Set up the browser context.
    context = BrowserContext(
        browser=Browser(config=BrowserConfig(headless=False, disable_security=True)),
    )

    # Create the agent with the task.
    agent = Agent(
        task="Wait for element '#search' to be visible with a timeout of 30 seconds.",
        llm=llm,
        browser_context=context,
        initial_actions=initial_actions,
        controller=controller,
    )

    # Run the agent for a few steps to trigger navigation and then the wait action.
    history = await agent.run(max_steps=3)
    action_names = history.action_names()

    # Ensure that the wait_for_element action was executed.
    assert 'wait_for_element' in action_names, 'Expected wait_for_element action to be executed.'

    # Verify that the #search element is visible by querying the page.
    page = await context.get_current_page()
    header_handle = await page.query_selector('#search')
    assert header_handle is not None, 'Expected to find a #search element on the page.'
    is_visible = await header_handle.is_visible()
    assert is_visible, 'Expected the #search element to be visible.'
# Allow running this file directly (outside pytest) for local testing.
if __name__ == '__main__':
    asyncio.run(test_wait_for_element())