Bugfixes for recordings, traces, and conversation log output, improvements to wait_for_load_state timeout handling, make more things async, improve parallel tab setup handling (#1973)

This commit is contained in:
Nick Sweeting
2025-06-14 00:31:04 -07:00
committed by GitHub
16 changed files with 1029 additions and 496 deletions

View File

@@ -11,8 +11,8 @@ name: publish
on:
release:
types: [published] # publish full release to PyPI when a release is created on Github
schedule:
- cron: "0 17 * * FRI" # tag a pre-release on Github every Friday at 5 PM UTC
# schedule:
# - cron: "0 17 * * FRI" # tag a pre-release on Github every Friday at 5 PM UTC
workflow_dispatch:
permissions:
@@ -21,7 +21,7 @@ permissions:
jobs:
tag_pre_release:
if: github.event_name == 'schedule'
if: github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

View File

@@ -2,10 +2,11 @@ from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path
from typing import Any
import anyio
from langchain_core.messages import (
AIMessage,
BaseMessage,
@@ -108,42 +109,46 @@ def _merge_successive_messages(messages: list[BaseMessage], class_to_merge: type
return merged_messages
def save_conversation(input_messages: list[BaseMessage], response: Any, target: str, encoding: str | None = None) -> None:
"""Save conversation history to file."""
async def save_conversation(
input_messages: list[BaseMessage], response: Any, target: str | Path, encoding: str | None = None
) -> None:
"""Save conversation history to file asynchronously."""
target_path = Path(target)
# create folders if not exists
if dirname := os.path.dirname(target):
os.makedirs(dirname, exist_ok=True)
if target_path.parent:
await anyio.Path(target_path.parent).mkdir(parents=True, exist_ok=True)
with open(
target,
'w',
encoding=encoding,
) as f:
_write_messages_to_file(f, input_messages)
_write_response_to_file(f, response)
await anyio.Path(target_path).write_text(await _format_conversation(input_messages, response), encoding=encoding or 'utf-8')
def _write_messages_to_file(f: Any, messages: list[BaseMessage]) -> None:
"""Write messages to conversation file"""
async def _format_conversation(messages: list[BaseMessage], response: Any) -> str:
"""Format the conversation including messages and response."""
lines = []
# Format messages
for message in messages:
f.write(f' {message.__class__.__name__} \n')
lines.append(f' {message.__class__.__name__} ')
if isinstance(message.content, list):
for item in message.content:
if isinstance(item, dict) and item.get('type') == 'text':
f.write(item['text'].strip() + '\n')
lines.append(item['text'].strip())
elif isinstance(message.content, str):
try:
content = json.loads(message.content)
f.write(json.dumps(content, indent=2) + '\n')
lines.append(json.dumps(content, indent=2))
except json.JSONDecodeError:
f.write(message.content.strip() + '\n')
lines.append(message.content.strip())
f.write('\n')
lines.append('') # Empty line after each message
# Format response
lines.append(' RESPONSE')
lines.append(json.dumps(json.loads(response.model_dump_json(exclude_unset=True)), indent=2))
return '\n'.join(lines)
def _write_response_to_file(f: Any, response: Any) -> None:
"""Write model response to conversation file"""
f.write(' RESPONSE\n')
f.write(json.dumps(json.loads(response.model_dump_json(exclude_unset=True)), indent=2))
# Note: _write_messages_to_file and _write_response_to_file have been merged into _format_conversation
# This is more efficient for async operations and reduces file I/O

View File

@@ -15,8 +15,6 @@ from typing import Any, Generic, TypeVar
from dotenv import load_dotenv
from browser_use.browser.session import DEFAULT_BROWSER_PROFILE
load_dotenv()
from langchain_core.language_models.chat_models import BaseChatModel
@@ -53,6 +51,7 @@ from browser_use.agent.views import (
ToolCallingMethod,
)
from browser_use.browser import BrowserProfile, BrowserSession
from browser_use.browser.session import DEFAULT_BROWSER_PROFILE
# from lmnr.sdk.decorators import observe
from browser_use.browser.views import BrowserStateSummary
@@ -67,7 +66,7 @@ from browser_use.telemetry.service import ProductTelemetry
from browser_use.telemetry.views import (
AgentTelemetryEvent,
)
from browser_use.utils import get_browser_use_version, time_execution_async, time_execution_sync
from browser_use.utils import _log_pretty_path, get_browser_use_version, time_execution_async, time_execution_sync
logger = logging.getLogger(__name__)
@@ -132,7 +131,7 @@ class Agent(Generic[Context]):
# Agent settings
use_vision: bool = True,
use_vision_for_planner: bool = False,
save_conversation_path: str | None = None,
save_conversation_path: str | Path | None = None,
save_conversation_path_encoding: str | None = 'utf-8',
max_failures: int = 3,
retry_delay: int = 10,
@@ -417,7 +416,8 @@ class Agent(Generic[Context]):
self.telemetry = ProductTelemetry()
if self.settings.save_conversation_path:
self.logger.info(f'Saving conversation to {self.settings.save_conversation_path}')
self.settings.save_conversation_path = Path(self.settings.save_conversation_path).expanduser().resolve()
self.logger.info(f'💬 Saving conversation to {_log_pretty_path(self.settings.save_conversation_path)}')
self._external_pause_event = asyncio.Event()
self._external_pause_event.set()
@@ -910,8 +910,11 @@ class Agent(Generic[Context]):
else:
self.register_new_step_callback(browser_state_summary, model_output, self.state.n_steps)
if self.settings.save_conversation_path:
target = self.settings.save_conversation_path + f'_{self.state.n_steps}.txt'
save_conversation(input_messages, model_output, target, self.settings.save_conversation_path_encoding)
# Treat save_conversation_path as a directory (consistent with other recording paths)
conversation_dir = Path(self.settings.save_conversation_path)
conversation_filename = f'conversation_{self.id}_{self.state.n_steps}.txt'
target = conversation_dir / conversation_filename
await save_conversation(input_messages, model_output, target, self.settings.save_conversation_path_encoding)
self._message_manager._remove_last_state_message() # we dont want the whole state in the chat history
@@ -1591,7 +1594,7 @@ class Agent(Generic[Context]):
self.logger.info('❌ Task completed without success')
total_tokens = self.state.history.total_input_tokens()
self.logger.debug(f'📝 Total input tokens used (approximate): {total_tokens}')
self.logger.debug(f'💲 Total input tokens used (approximate): {total_tokens}')
if self.register_done_callback:
if inspect.iscoroutinefunction(self.register_done_callback):

View File

@@ -39,7 +39,7 @@ class AgentSettings(BaseModel):
use_vision: bool = True
use_vision_for_planner: bool = False
save_conversation_path: str | None = None
save_conversation_path: str | Path | None = None
save_conversation_path_encoding: str | None = 'utf-8'
max_failures: int = 3
retry_delay: int = 10

View File

@@ -375,9 +375,11 @@ class BrowserContextArgs(BaseModel):
record_har_content: RecordHarContent = RecordHarContent.EMBED
record_har_mode: RecordHarMode = RecordHarMode.FULL
record_har_omit_content: bool = False
record_har_path: str | Path | None = None
record_har_path: str | Path | None = Field(default=None, validation_alias=AliasChoices('save_har_path', 'record_har_path'))
record_har_url_filter: str | Pattern | None = None
record_video_dir: str | Path | None = None
record_video_dir: str | Path | None = Field(
default=None, validation_alias=AliasChoices('save_recording_path', 'record_video_dir')
)
record_video_size: ViewportSize | None = None
@@ -452,7 +454,11 @@ class BrowserLaunchArgs(BaseModel):
description='Directory to save downloads to.',
validation_alias=AliasChoices('downloads_dir', 'save_downloads_path'),
)
traces_dir: str | Path | None = Field(default=None, description='Directory to save HAR trace files to.')
traces_dir: str | Path | None = Field(
default=None,
description='Directory for saving playwright trace.zip files (playwright actions, screenshots, DOM snapshots, HAR traces).',
validation_alias=AliasChoices('trace_path', 'traces_dir'),
)
handle_sighup: bool = Field(
default=True, description='Whether playwright should swallow SIGHUP signals and kill the browser.'
)
@@ -539,7 +545,7 @@ class BrowserLaunchPersistentContextArgs(BrowserLaunchArgs, BrowserContextArgs):
class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, BrowserLaunchArgs, BrowserNewContextArgs):
"""
A BrowserProfile is a static collection of kwargs that get passed to:
A BrowserProfile is a static template collection of kwargs that can be passed to:
- BrowserType.launch(**BrowserLaunchArgs)
- BrowserType.connect(**BrowserConnectArgs)
- BrowserType.connect_over_cdp(**BrowserConnectArgs)
@@ -575,7 +581,7 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
keep_alive: bool | None = Field(default=None, description='Keep browser alive after agent run.')
window_size: ViewportSize | None = Field(
default=None,
description='Window size to use for the browser when headless=False.',
description='Browser window size to use when headless=False.',
)
window_height: int | None = Field(default=None, description='DEPRECATED, use window_size["height"] instead', exclude=True)
window_width: int | None = Field(default=None, description='DEPRECATED, use window_size["width"] instead', exclude=True)
@@ -599,13 +605,16 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
profile_directory: str = 'Default' # e.g. 'Profile 1', 'Profile 2', 'Custom Profile', etc.
save_har_path: str | None = Field(default=None, description='Directory for saving HAR files.')
trace_path: str | None = Field(default=None, description='Directory for saving trace files.')
# these can be found in BrowserLaunchArgs, BrowserLaunchPersistentContextArgs, BrowserNewContextArgs, BrowserConnectArgs:
# save_recording_path: alias of record_video_dir
# save_har_path: alias of record_har_path
# trace_path: alias of traces_dir
cookies_file: Path | None = Field(
default=None, description='File to save cookies to. DEPRECATED, use `storage_state` instead.'
)
# TODO: finish implementing extension support in extensions.py
# extension_ids_to_preinstall: list[str] = Field(
# default_factory=list, description='List of Chrome extension IDs to preinstall.'
# )
@@ -625,6 +634,9 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
def copy_old_config_names_to_new(self) -> Self:
"""Copy old config window_width & window_height to window_size."""
if self.window_width or self.window_height:
logger.warning(
f'⚠️ BrowserProfile(window_width=..., window_height=...) are deprecated, use BrowserProfile(window_size={"width": 1280, "height": 1100}) instead.'
)
self.window_size = self.window_size or {}
self.window_size['width'] = (self.window_size or {}).get('width') or self.window_width or 1280
self.window_size['height'] = (self.window_size or {}).get('height') or self.window_height or 1100
@@ -643,13 +655,16 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
f'⚠️ BrowserSession(...) was passed both {static_source} AND user_data_dir. {static_source}={self.storage_state or self.cookies_file} will forcibly overwrite '
f'cookies/localStorage/sessionStorage in user_data_dir={self.user_data_dir}. '
f'For multiple browsers in parallel, use only storage_state with user_data_dir=None, '
f'or use separate user_data_dirs for each browser and set storage_state=None.'
f'or use a separate user_data_dir for each browser and set storage_state=None.'
)
return self
@model_validator(mode='after')
def warn_user_data_dir_non_default_version(self) -> Self:
"""If user is using default profile dir with a non-default channel, force-change it to avoid corrupting the default data dir."""
"""
If user is using default profile dir with a non-default channel, force-change it
to avoid corrupting the default data dir created with a different channel.
"""
is_not_using_default_chromium = self.executable_path or self.channel not in (BROWSERUSE_DEFAULT_CHANNEL, None)
if self.user_data_dir == BROWSERUSE_CHROMIUM_USER_DATA_DIR and is_not_using_default_chromium:
@@ -663,6 +678,8 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
return self
def get_args(self) -> list[str]:
"""Get the list of all Chrome CLI launch args for this profile (compiled from defaults, user-provided, and system-specific)."""
if isinstance(self.ignore_default_args, list):
default_args = set(CHROME_DEFAULT_ARGS) - set(self.ignore_default_args)
elif self.ignore_default_args is True:
@@ -691,6 +708,7 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
),
]
# convert to dict and back to dedupe and merge duplicate args
final_args_list = BrowserLaunchArgs.args_as_list(BrowserLaunchArgs.args_as_dict(pre_conversion_args))
return final_args_list
@@ -710,33 +728,6 @@ class BrowserProfile(BrowserConnectArgs, BrowserLaunchPersistentContextArgs, Bro
"""Return the kwargs for BrowserType.connect_over_cdp()."""
return BrowserLaunchArgs(**self.model_dump(exclude={'args'}), args=self.get_args())
def prepare_user_data_dir(self) -> None:
"""Create and unlock the user data dir for first-run initialization."""
if self.user_data_dir:
try:
self.user_data_dir = Path(self.user_data_dir).expanduser().resolve()
self.user_data_dir.mkdir(parents=True, exist_ok=True)
(self.user_data_dir / '.browseruse_profile_id').write_text(self.id)
except Exception as e:
raise ValueError(
f'Unusable path provided for user_data_dir= {_log_pretty_path(self.user_data_dir)} (check for typos/permissions issues)'
) from e
# clear any existing locks by any other chrome processes (hacky)
# helps stop chrome crashes from leaving the profile dir in a locked state and breaking subsequent runs,
# but can cause conflicts if the user actually tries to run multiple chrome copies on the same user_data_dir
singleton_lock = self.user_data_dir / 'SingletonLock'
if singleton_lock.exists():
singleton_lock.unlink()
logger.warning(
f'⚠️ Multiple chrome processes may be trying to share user_data_dir={self.user_data_dir} which can lead to crashes and profile data corruption!'
)
if self.downloads_path:
self.downloads_path = Path(self.downloads_path).expanduser().resolve()
self.downloads_path.mkdir(parents=True, exist_ok=True)
# def preinstall_extensions(self) -> None:
# """Preinstall the extensions."""

View File

@@ -255,10 +255,10 @@ class BrowserSession(BaseModel):
return self._logger
def __repr__(self) -> str:
return f'BrowserSession#{self.id[-4:]}(profile={self.browser_profile}, {self._connection_str})'
return f'BrowserSession{self.id[-4:]}(profile={self.browser_profile}, {self._connection_str}) ref#={str(id(self))[-2:]}'
def __str__(self) -> str:
return f'BrowserSession#{self.id[-4:]}'
return f'BrowserSession{self.id[-4:]}.{str(id(self.agent_current_page))[-2:]}'
# def __getattr__(self, key: str) -> Any:
# """
@@ -285,8 +285,8 @@ class BrowserSession(BaseModel):
# Use timeout to prevent indefinite waiting on lock acquisition
async with asyncio.timeout(60): # 60 overall second timeout for entire launching process
async with self._start_lock:
async with asyncio.timeout(60): # 60 second overall timeout for entire launching process to avoid deadlocks
async with self._start_lock: # prevent parallel calls to start() / stop() / save_storage_state() from clashing
if self.initialized:
if self.is_connected():
return self
@@ -307,8 +307,8 @@ class BrowserSession(BaseModel):
try:
# apply last-minute runtime-computed options to the the browser_profile, validate profile, set up folders on disk
assert isinstance(self.browser_profile, BrowserProfile)
self.browser_profile.prepare_user_data_dir() # create/unlock the <user_data_dir>/SingletonLock
self.browser_profile.detect_display_configuration() # adjusts config values, must come before launch/connect
self.prepare_user_data_dir() # create/unlock the <user_data_dir>/SingletonLock
# launch/connect to the browser:
# setup playwright library client, Browser, and BrowserContext objects
@@ -325,6 +325,7 @@ class BrowserSession(BaseModel):
# resize the existing pages and set up foreground tab detection
await self._setup_viewports()
await self._setup_current_page_change_listeners()
await self._start_context_tracing()
except BaseException:
self.initialized = False
raise
@@ -382,11 +383,50 @@ class BrowserSession(BaseModel):
if self.browser_context or self.browser:
self.logger.info(f'🛑 Closing {self._connection_str} browser context {self.browser_context or self.browser}')
# Stop trace recording before closing context
if self.browser_profile.traces_dir and self.browser_context is not None:
try:
traces_path = Path(self.browser_profile.traces_dir)
if traces_path.suffix:
# Path has extension, use as-is (user specified exact file path)
final_trace_path = traces_path
else:
# Path has no extension, treat as directory and create filename
trace_filename = f'BrowserSession_{self.id}.zip'
final_trace_path = traces_path / trace_filename
self.logger.info(f'🎥 Saving browser context trace to {final_trace_path}...')
async with asyncio.timeout(30):
await self.browser_context.tracing.stop(path=str(final_trace_path))
except Exception as e:
# TargetClosedError is expected when browser has already been closed - don't log as error
from playwright._impl._errors import TargetClosedError
if isinstance(e, TargetClosedError):
self.logger.debug('Browser context already closed, trace may have been saved automatically')
else:
self.logger.error(f'❌ Error saving browser context trace: {type(e).__name__}: {e}')
# playwright saves these on browser.close() automatically
if self.browser_profile.record_video_dir:
self.logger.info(
f'🎥 Saving video recording to record_video_dir= {self.browser_profile.record_video_dir}...'
)
if self.browser_profile.record_har_path:
self.logger.info(f'🎥 Saving HAR file to record_har_path= {self.browser_profile.record_har_path}...')
try:
# Add timeout to prevent hanging on close if context is already closed
try:
async with asyncio.timeout(5): # 5 second timeout for close operation
await (self.browser or self.browser_context.browser or self.browser_context).close()
async with asyncio.timeout(30): # 30 second timeout for close operation
# IMPORTANT: Close context first to ensure HAR/video files are saved
if self.browser_context:
await self.browser_context.close()
self.browser_context = None # Prevent duplicate close attempts
# Then close browser if we have one
if self.browser and self.browser.is_connected():
await self.browser.close()
except TimeoutError:
self.logger.warning('⏱️ Timeout while closing browser/context, has it become unresponsive?')
except Exception as e:
@@ -443,10 +483,10 @@ class BrowserSession(BaseModel):
await self.playwright.stop()
# Give playwright tasks a moment to clean up properly
# This prevents "Task was destroyed but it is pending!" warnings
await asyncio.sleep(0.01)
await asyncio.sleep(0.1)
# self.logger.debug('🎭 Stopped playwright node.js API worker')
except Exception as e:
self.logger.warning(f'❌ Error stopping playwright: {type(e).__name__}: {e}')
self.logger.warning(f'❌ Error stopping playwright node.js API subprocess: {type(e).__name__}: {e}')
finally:
# Clear global references if they match this instance
global GLOBAL_PLAYWRIGHT_API_OBJECT, GLOBAL_PATCHRIGHT_API_OBJECT
@@ -534,14 +574,34 @@ class BrowserSession(BaseModel):
except Exception as e:
self.logger.warning(f'Error force-killing browser in BrowserSession.__del__: {type(e).__name__}: {e}')
@staticmethod
async def _start_global_playwright_subprocess(is_stealth: bool) -> Playwright | Patchright:
"""Create and return a new playwright or patchright node.js subprocess / API connector"""
global GLOBAL_PLAYWRIGHT_API_OBJECT, GLOBAL_PATCHRIGHT_API_OBJECT
global GLOBAL_PLAYWRIGHT_EVENT_LOOP, GLOBAL_PATCHRIGHT_EVENT_LOOP
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
current_loop = None
if is_stealth:
GLOBAL_PATCHRIGHT_API_OBJECT = await async_patchright().start()
GLOBAL_PATCHRIGHT_EVENT_LOOP = current_loop
return GLOBAL_PATCHRIGHT_API_OBJECT
else:
GLOBAL_PLAYWRIGHT_API_OBJECT = await async_playwright().start()
GLOBAL_PLAYWRIGHT_EVENT_LOOP = current_loop
return GLOBAL_PLAYWRIGHT_API_OBJECT
async def setup_playwright(self) -> None:
"""
Set up playwright library client object: usually the result of (await async_playwright().start())
Override to customize the set up of the playwright or patchright library object
"""
global GLOBAL_PLAYWRIGHT_API_OBJECT
global GLOBAL_PLAYWRIGHT_API_OBJECT # one per thread, represents a node.js playwright subprocess that relays commands to the browser via CDP
global GLOBAL_PATCHRIGHT_API_OBJECT
global GLOBAL_PLAYWRIGHT_EVENT_LOOP
global GLOBAL_PLAYWRIGHT_EVENT_LOOP # one per thread, represents a node.js playwright subprocess that relays commands to the browser via CDP
global GLOBAL_PATCHRIGHT_EVENT_LOOP
# Get current event loop
@@ -550,44 +610,47 @@ class BrowserSession(BaseModel):
except RuntimeError:
current_loop = None
if self.browser_profile.stealth:
is_stealth = self.browser_profile.stealth
# Configure browser channel based on stealth mode
if is_stealth:
# use patchright + chrome when stealth=True
self.browser_profile.channel = self.browser_profile.channel or BrowserChannel.CHROME
self.logger.info(f'🕶️ Activated stealth mode using patchright {self.browser_profile.channel.name.lower()} browser...')
if not self.playwright:
# Check if the global object is still valid
should_recreate = False
else:
# use playwright + chromium by default
self.browser_profile.channel = self.browser_profile.channel or BrowserChannel.CHROMIUM
# Check if we're in a different event loop than the one that created the global object
if GLOBAL_PATCHRIGHT_API_OBJECT and GLOBAL_PATCHRIGHT_EVENT_LOOP != current_loop:
self.logger.debug(
'Detected event loop change. Previous patchright instance was created in a different event loop. '
'Creating new instance to avoid asyncio conflicts.'
)
should_recreate = True
# Check if we're in a different event loop than the one that created the global object
should_recreate = False
driver_name = 'patchright' if is_stealth else 'playwright'
global_api_object = GLOBAL_PATCHRIGHT_API_OBJECT if is_stealth else GLOBAL_PLAYWRIGHT_API_OBJECT
global_event_loop = GLOBAL_PATCHRIGHT_EVENT_LOOP if is_stealth else GLOBAL_PLAYWRIGHT_EVENT_LOOP
self.playwright = (
self.playwright or global_api_object or await self._start_global_playwright_subprocess(is_stealth=is_stealth)
)
# Also check if the object exists but is no longer functional
if GLOBAL_PATCHRIGHT_API_OBJECT and not should_recreate:
try:
# Try to access the chromium property to verify the object is still valid
_ = GLOBAL_PATCHRIGHT_API_OBJECT.chromium
except Exception as e:
self.logger.debug(f'Detected invalid patchright instance: {type(e).__name__}. Creating new instance.')
should_recreate = True
if global_api_object and global_event_loop != current_loop:
self.logger.debug(
f'Detected event loop change. Previous {driver_name} instance was created in a different event loop. '
'Creating new instance to avoid disconnection when the previous loop closes.'
)
should_recreate = True
if should_recreate:
# Reset the global object since we need a new one
GLOBAL_PATCHRIGHT_API_OBJECT = None
GLOBAL_PATCHRIGHT_EVENT_LOOP = None
# Also check if the object exists but is no longer functional
if global_api_object and not should_recreate:
try:
# Try to access the chromium property to verify the object is still valid
_ = global_api_object.chromium.executable_path
except Exception as e:
self.logger.debug(f'Detected invalid {driver_name} instance: {type(e).__name__}. Creating new instance.')
should_recreate = True
if GLOBAL_PATCHRIGHT_API_OBJECT:
self.playwright = GLOBAL_PATCHRIGHT_API_OBJECT
else:
GLOBAL_PATCHRIGHT_API_OBJECT = await async_patchright().start() # never start more than one per thread, never try to close it if any other code might be trying to use it, cannot be re-opened once closed
GLOBAL_PATCHRIGHT_EVENT_LOOP = current_loop
self.playwright = GLOBAL_PATCHRIGHT_API_OBJECT
if should_recreate:
self.playwright = await self._start_global_playwright_subprocess(is_stealth=is_stealth)
# check for stealth best-practices
# Log stealth best-practices warnings if applicable
if is_stealth:
if self.browser_profile.channel and self.browser_profile.channel != BrowserChannel.CHROME:
self.logger.info(
' 🪄 For maximum stealth, BrowserSession(...) should be passed channel=None or BrowserChannel.CHROME'
@@ -596,68 +659,6 @@ class BrowserSession(BaseModel):
self.logger.info(' 🪄 For maximum stealth, BrowserSession(...) should be passed a persistent user_data_dir=...')
if self.browser_profile.headless or not self.browser_profile.no_viewport:
self.logger.info(' 🪄 For maximum stealth, BrowserSession(...) should be passed headless=False & viewport=None')
else:
# use playwright + chromium by default
self.browser_profile.channel = self.browser_profile.channel or BrowserChannel.CHROMIUM
if not self.playwright:
# Check if the global object is still valid
should_recreate = False
# Check if we're in a different event loop than the one that created the global object
if GLOBAL_PLAYWRIGHT_API_OBJECT and GLOBAL_PLAYWRIGHT_EVENT_LOOP != current_loop:
self.logger.debug(
'Detected event loop change. Previous playwright instance was created in a different event loop. '
'Creating new instance to avoid asyncio conflicts.'
)
should_recreate = True
# Also check if the object exists but is no longer functional
if GLOBAL_PLAYWRIGHT_API_OBJECT and not should_recreate:
try:
# Try to access the chromium property to verify the object is still valid
_ = GLOBAL_PLAYWRIGHT_API_OBJECT.chromium
except Exception as e:
self.logger.debug(f'Detected invalid playwright instance: {type(e).__name__}. Creating new instance.')
should_recreate = True
if should_recreate:
# Reset the global object since we need a new one
GLOBAL_PLAYWRIGHT_API_OBJECT = None
GLOBAL_PLAYWRIGHT_EVENT_LOOP = None
if GLOBAL_PLAYWRIGHT_API_OBJECT:
self.playwright = GLOBAL_PLAYWRIGHT_API_OBJECT
else:
GLOBAL_PLAYWRIGHT_API_OBJECT = await async_playwright().start() # never start more than one per thread, never try to close it if any other code might be trying to use it, cannot be re-opened once closed
GLOBAL_PLAYWRIGHT_EVENT_LOOP = current_loop
self.playwright = GLOBAL_PLAYWRIGHT_API_OBJECT
async def _recreate_playwright_instance(self) -> None:
"""Force recreation of the playwright instance when it's detected to be invalid"""
global GLOBAL_PLAYWRIGHT_API_OBJECT
global GLOBAL_PATCHRIGHT_API_OBJECT
global GLOBAL_PLAYWRIGHT_EVENT_LOOP
global GLOBAL_PATCHRIGHT_EVENT_LOOP
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
current_loop = None
if self.browser_profile.stealth:
# Reset patchright
GLOBAL_PATCHRIGHT_API_OBJECT = None
GLOBAL_PATCHRIGHT_EVENT_LOOP = None
GLOBAL_PATCHRIGHT_API_OBJECT = await async_patchright().start()
GLOBAL_PATCHRIGHT_EVENT_LOOP = current_loop
self.playwright = GLOBAL_PATCHRIGHT_API_OBJECT
else:
# Reset playwright
GLOBAL_PLAYWRIGHT_API_OBJECT = None
GLOBAL_PLAYWRIGHT_EVENT_LOOP = None
GLOBAL_PLAYWRIGHT_API_OBJECT = await async_playwright().start()
GLOBAL_PLAYWRIGHT_EVENT_LOOP = current_loop
self.playwright = GLOBAL_PLAYWRIGHT_API_OBJECT
async def setup_browser_via_passed_objects(self) -> None:
"""Override to customize the set up of the connection to an existing browser"""
@@ -758,7 +759,7 @@ class BrowserSession(BaseModel):
self.logger.info(f'🌎 Using first browser_context available in existing browser: {self.browser_context}')
else:
self.browser_context = await self.browser.new_context(
**self.browser_profile.kwargs_for_new_context().model_dump()
**self.browser_profile.kwargs_for_new_context().model_dump(mode='json')
)
storage_info = (
f' + loaded storage_state={len(self.browser_profile.storage_state.cookies) if self.browser_profile.storage_state else 0} cookies'
@@ -787,7 +788,7 @@ class BrowserSession(BaseModel):
# self.logger.debug('🌎 Launching new incognito context in browser')
async with asyncio.timeout(10): # Reduced timeout from 30s to 10s
self.browser_context = await self.browser.new_context(
**self.browser_profile.kwargs_for_new_context().model_dump()
**self.browser_profile.kwargs_for_new_context().model_dump(mode='json')
)
except TimeoutError:
self.logger.warning(
@@ -795,7 +796,7 @@ class BrowserSession(BaseModel):
'Recreating playwright instance and retrying...'
)
# Force recreation of the playwright object
await self._recreate_playwright_instance()
self.playwright = await self._start_global_playwright_subprocess(is_stealth=self.browser_profile.stealth)
# Retry the operation with the new playwright instance
async with asyncio.timeout(10):
self.browser = await self.playwright.chromium.launch(
@@ -808,7 +809,7 @@ class BrowserSession(BaseModel):
# self.logger.debug('🌎 Created new incognito context in browser')
else:
# user data dir was provided, prepare it for use
self.browser_profile.prepare_user_data_dir()
self.prepare_user_data_dir()
# search for potentially conflicting local processes running on the same user_data_dir
for proc in psutil.process_iter(['pid', 'cmdline']):
@@ -824,7 +825,7 @@ class BrowserSession(BaseModel):
async with asyncio.timeout(10): # Reduced timeout from 30s to 10s
try:
self.browser_context = await self.playwright.chromium.launch_persistent_context(
**self.browser_profile.kwargs_for_launch_persistent_context().model_dump()
**self.browser_profile.kwargs_for_launch_persistent_context().model_dump(mode='json')
)
except Exception as e:
# Re-raise if not a timeout
@@ -836,7 +837,7 @@ class BrowserSession(BaseModel):
'Recreating playwright instance and retrying...'
)
# Force recreation of the playwright object
await self._recreate_playwright_instance()
self.playwright = await self._start_global_playwright_subprocess(is_stealth=self.browser_profile.stealth)
# Retry the operation with the new playwright instance
async with asyncio.timeout(10):
self.browser_context = await self.playwright.chromium.launch_persistent_context(
@@ -1316,6 +1317,57 @@ class BrowserSession(BaseModel):
if not already_disconnected:
self.logger.debug(f'⚰️ Browser {self._connection_str} disconnected')
def prepare_user_data_dir(self) -> None:
"""Create and unlock the user data dir and ensure all recording paths exist."""
if self.browser_profile.user_data_dir:
try:
self.browser_profile.user_data_dir = Path(self.browser_profile.user_data_dir).expanduser().resolve()
self.browser_profile.user_data_dir.mkdir(parents=True, exist_ok=True)
(self.browser_profile.user_data_dir / '.browseruse_profile_id').write_text(self.browser_profile.id)
except Exception as e:
raise ValueError(
f'Unusable path provided for user_data_dir= {_log_pretty_path(self.browser_profile.user_data_dir)} (check for typos/permissions issues)'
) from e
# clear any existing locks by any other chrome processes (hacky)
singleton_lock = self.browser_profile.user_data_dir / 'SingletonLock'
if singleton_lock.exists():
singleton_lock.unlink()
self.logger.warning(
f'⚠️ Multiple chrome processes may be trying to share user_data_dir={self.browser_profile.user_data_dir} which can lead to crashes and profile data corruption!'
)
# Create directories for all paths that need them
dir_paths = {
'downloads_path': self.browser_profile.downloads_path,
'record_video_dir': self.browser_profile.record_video_dir,
'traces_dir': self.browser_profile.traces_dir,
}
file_paths = {
'record_har_path': self.browser_profile.record_har_path,
}
# Handle directory creation
for path_name, path_value in dir_paths.items():
if path_value:
try:
path_obj = Path(path_value).expanduser().resolve()
path_obj.mkdir(parents=True, exist_ok=True)
setattr(self.browser_profile, path_name, str(path_obj) if path_name == 'traces_dir' else path_obj)
except Exception as e:
self.logger.error(f'❌ Failed to create {path_name} directory {path_value}: {e}')
# Handle file path parent directory creation
for path_name, path_value in file_paths.items():
if path_value:
try:
path_obj = Path(path_value).expanduser().resolve()
path_obj.parent.mkdir(parents=True, exist_ok=True)
except Exception as e:
self.logger.error(f'❌ Failed to create parent directory for {path_name} {path_value}: {e}')
# --- Tab management ---
async def get_current_page(self) -> Page:
"""Get the current page + ensure it's not None / closed"""
@@ -1457,12 +1509,22 @@ class BrowserSession(BaseModel):
except Exception:
# If no download is triggered, treat as normal click
# self.logger.debug('No download triggered within timeout. Checking navigation...')
await page.wait_for_load_state()
try:
await page.wait_for_load_state()
except Exception as e:
self.logger.warning(
f'⚠️ Page {_log_pretty_url(page.url)} failed to finish loading after click: {type(e).__name__}: {e}'
)
await self._check_and_handle_navigation(page)
else:
# If downloads are disabled, just perform the click
await click_func()
await page.wait_for_load_state()
try:
await page.wait_for_load_state()
except Exception as e:
self.logger.warning(
f'⚠️ Page {_log_pretty_url(page.url)} failed to finish loading after click: {type(e).__name__}: {e}'
)
await self._check_and_handle_navigation(page)
try:
@@ -1499,7 +1561,10 @@ class BrowserSession(BaseModel):
await page.mouse.click(
element_node.viewport_coordinates.center.x, element_node.viewport_coordinates.center.y
)
await page.wait_for_load_state()
try:
await page.wait_for_load_state()
except Exception:
pass
await self._check_and_handle_navigation(page)
return None # Success
except Exception as coord_e:
@@ -1582,6 +1647,9 @@ class BrowserSession(BaseModel):
if not (path or self.browser_profile.cookies_file):
return
if not cookies:
return
try:
cookies_file_path = Path(path or self.browser_profile.cookies_file).expanduser().resolve()
cookies_file_path.parent.mkdir(parents=True, exist_ok=True)
@@ -1630,7 +1698,9 @@ class BrowserSession(BaseModel):
pass
temp_path.replace(json_path)
self.logger.info(f'🍪 Saved {len(storage_state["cookies"])} cookies to storage_state= {_log_pretty_path(json_path)}')
self.logger.info(
f'🍪 Saved {len(storage_state["cookies"]) + len(storage_state.get("origins", []))} cookies to storage_state= {_log_pretty_path(json_path)}'
)
except Exception as e:
self.logger.warning(f'❌ Failed to save cookies to storage_state= {_log_pretty_path(path)}: {type(e).__name__}: {e}')
@@ -1644,9 +1714,10 @@ class BrowserSession(BaseModel):
storage_state = await self.browser_context.storage_state()
cookies = storage_state['cookies']
has_any_auth_data = cookies or storage_state.get('origins', [])
# they passed an explicit path, only save to that path and return
if path:
if path and has_any_auth_data:
if path.name == 'storage_state.json':
await self._save_storage_state_to_file(path, storage_state)
return
@@ -1743,9 +1814,11 @@ class BrowserSession(BaseModel):
# playwright doesn't provide an API for setting these before launch
# https://playwright.dev/python/docs/auth#session-storage
# await self.browser_context.add_local_storage(storage_state['localStorage'])
self.logger.info(
f'🍪 Loaded {len(storage_state["cookies"])} cookies from storage_state= {_log_pretty_path(self.browser_profile.storage_state)}'
)
num_entries = len(storage_state['cookies']) + len(storage_state.get('origins', []))
if num_entries:
self.logger.info(
f'🍪 Loaded {num_entries} cookies from storage_state= {_log_pretty_path(self.browser_profile.storage_state)}'
)
except Exception as e:
self.logger.warning(
f'❌ Failed to load cookies from storage_state= {_log_pretty_path(self.browser_profile.storage_state)}: {type(e).__name__}: {e}'
@@ -2061,14 +2134,22 @@ class BrowserSession(BaseModel):
page = await self.get_current_page()
await page.goto(url)
await page.wait_for_load_state()
try:
await page.wait_for_load_state()
except Exception as e:
self.logger.warning(
f'⚠️ Page {_log_pretty_url(page.url)} failed to fully load after navigation: {type(e).__name__}: {e}'
)
async def refresh_page(self):
"""Refresh the agent's current page"""
page = await self.get_current_page()
await page.reload()
await page.wait_for_load_state()
try:
await page.wait_for_load_state()
except Exception as e:
self.logger.warning(f'⚠️ Page {_log_pretty_url(page.url)} failed to fully load after refresh: {type(e).__name__}: {e}')
async def go_back(self):
"""Navigate the agent's tab back in browser history"""
@@ -2332,7 +2413,7 @@ class BrowserSession(BaseModel):
full_page=full_page,
scale='css',
timeout=15000,
animations='disabled',
animations='allow',
caret='initial',
)
@@ -2372,8 +2453,9 @@ class BrowserSession(BaseModel):
scale='css',
timeout=30000,
clip={'x': 0, 'y': 0, 'width': expanded_width, 'height': expanded_height},
animations='allow',
caret='initial',
# animations='disabled', # these can cause CSP errors on some pages, leading to a red herring "waiting for fonts to load" error
# caret='initial',
)
# TODO: manually take multiple clipped screenshots to capture the full height and stitch them together?
@@ -2403,6 +2485,20 @@ class BrowserSession(BaseModel):
counter += 1
return new_filename
async def _start_context_tracing(self):
"""Start tracing on browser context if trace_path is configured."""
if self.browser_profile.traces_dir and self.browser_context:
try:
self.logger.debug(f'📽️ Starting tracing (will save to: {self.browser_profile.traces_dir})')
# Don't pass any path to start() - let Playwright handle internal temp files
await self.browser_context.tracing.start(
screenshots=True,
snapshots=True,
sources=False, # Reduce trace size
)
except Exception as e:
self.logger.warning(f'Failed to start tracing: {e}')
@staticmethod
def _convert_simple_xpath_to_css_selector(xpath: str) -> str:
"""Converts simple XPath expressions to CSS selectors."""
@@ -2610,7 +2706,12 @@ class BrowserSession(BaseModel):
parent,
include_dynamic_attributes=self.browser_profile.include_dynamic_attributes,
)
current_frame = current_frame.frame_locator(css_selector)
# Use CSS selector if available, otherwise fall back to XPath
if css_selector:
current_frame = current_frame.frame_locator(css_selector)
else:
self.logger.debug(f'Using XPath for iframe: {parent.xpath}')
current_frame = current_frame.frame_locator(f'xpath={parent.xpath}')
css_selector = self._enhanced_css_selector_for_element(
element, include_dynamic_attributes=self.browser_profile.include_dynamic_attributes
@@ -2618,11 +2719,22 @@ class BrowserSession(BaseModel):
try:
if isinstance(current_frame, FrameLocator):
element_handle = await current_frame.locator(css_selector).element_handle()
if css_selector:
element_handle = await current_frame.locator(css_selector).element_handle()
else:
# Fall back to XPath when CSS selector is empty
self.logger.debug(f'CSS selector empty, falling back to XPath: {element.xpath}')
element_handle = await current_frame.locator(f'xpath={element.xpath}').element_handle()
return element_handle
else:
# Try to scroll into view if hidden
element_handle = await current_frame.query_selector(css_selector)
# Try CSS selector first if available
if css_selector:
element_handle = await current_frame.query_selector(css_selector)
else:
# Fall back to XPath
self.logger.debug(f'CSS selector empty, falling back to XPath: {element.xpath}')
element_handle = await current_frame.locator(f'xpath={element.xpath}').element_handle()
if element_handle:
is_visible = await self._is_visible(element_handle)
if is_visible:
@@ -2630,10 +2742,30 @@ class BrowserSession(BaseModel):
return element_handle
return None
except Exception as e:
self.logger.error(
f'❌ Failed to locate element {css_selector} on page {_log_pretty_url(page.url)}: {type(e).__name__}: {e}'
)
return None
# If CSS selector failed, try XPath as fallback
if css_selector and 'CSS.escape' not in str(e):
try:
self.logger.debug(f'CSS selector failed, trying XPath fallback: {element.xpath}')
if isinstance(current_frame, FrameLocator):
element_handle = await current_frame.locator(f'xpath={element.xpath}').element_handle()
else:
element_handle = await current_frame.locator(f'xpath={element.xpath}').element_handle()
if element_handle:
is_visible = await self._is_visible(element_handle)
if is_visible:
await element_handle.scroll_into_view_if_needed()
return element_handle
except Exception as xpath_e:
self.logger.error(
f'❌ Failed to locate element with both CSS ({css_selector}) and XPath ({element.xpath}): {type(xpath_e).__name__}: {xpath_e}'
)
return None
else:
self.logger.error(
f'❌ Failed to locate element {css_selector or element.xpath} on page {_log_pretty_url(page.url)}: {type(e).__name__}: {e}'
)
return None
@require_initialization
@time_execution_async('--get_locate_element_by_xpath')
@@ -2814,6 +2946,7 @@ class BrowserSession(BaseModel):
# update the human's active tab to match the agent's
if self.human_current_page != page:
# TODO: figure out how to do this without bringing the entire window to the foreground and stealing foreground app focus
# might require browser-use extension loaded into the browser so we can use chrome.tabs extension APIs
await page.bring_to_front()
self.human_current_page = page
@@ -2859,7 +2992,13 @@ class BrowserSession(BaseModel):
if (not self.human_current_page) or self.human_current_page.is_closed():
self.human_current_page = new_page
await new_page.wait_for_load_state()
tab_idx = self.tabs.index(new_page)
try:
await new_page.wait_for_load_state()
except Exception as e:
self.logger.warning(
f'⚠️ New page [{tab_idx}]{_log_pretty_url(new_page.url)} failed to fully load: {type(e).__name__}: {e}'
)
# Set the viewport size for the new tab
if self.browser_profile.viewport:
@@ -2880,12 +3019,21 @@ class BrowserSession(BaseModel):
# assert self.agent_current_page.url == 'about:blank'
# if there are any unused about:blank tabs after we open a new tab, close them to clean up unused tabs
# hacky way to be sure we only close our own tabs, check the title of the tab for our BrowserSession name
title_of_our_setup_tab = f'Setting up #{str(self.id)[-4:]}...' # set up by self._show_dvd_screensaver_loading_animation()
for page in self.browser_context.pages:
if page.url == 'about:blank' and page != self.agent_current_page:
page_title = await page.title()
if page.url == 'about:blank' and page != self.agent_current_page and page_title == title_of_our_setup_tab:
await page.close()
self.human_current_page = ( # in case we just closed the human's tab, fix the refs
self.human_current_page if not self.human_current_page.is_closed() else self.agent_current_page
)
break # only close a maximum of one unused about:blank tab,
# if multiple parallel agents share one BrowserSession
# closing every new_page() tab (which start on about:blank) causes lots of problems
# (the title check is not enough when they share a single BrowserSession)
return new_page
@@ -3050,8 +3198,18 @@ class BrowserSession(BaseModel):
# dont bother wasting CPU showing animations during evals
return
await page.evaluate("""() => {
document.title = 'Setting up...';
# we could enforce this, but maybe it's useful to be able to show it on other tabs?
# assert page.url == 'about:blank', 'DVD screensaver loading animation should only be shown on about:blank tabs'
# all in one JS function for speed, we want as few roundtrip CDP calls as possible
# between opening the tab and showing the animation
await page.evaluate(
"""(browser_session_label) => {
const animated_title = `Setting up #${browser_session_label}...`;
if (document.title === animated_title) {
return; // already run on this tab, dont run again
}
document.title = animated_title;
// Create the main overlay
const loadingOverlay = document.createElement('div');
@@ -3135,4 +3293,6 @@ class BrowserSession(BaseModel):
}
`;
document.head.appendChild(style);
}""")
}""",
str(self.id)[-4:],
)

View File

@@ -98,15 +98,35 @@ class Controller(Generic[Context]):
@self.registry.action('Navigate to URL in the current tab', param_model=GoToUrlAction)
async def go_to_url(params: GoToUrlAction, browser_session: BrowserSession):
page = await browser_session.get_current_page()
if page:
await page.goto(params.url)
await page.wait_for_load_state()
else:
page = await browser_session.create_new_tab(params.url)
msg = f'🔗 Navigated to {params.url}'
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
try:
page = await browser_session.get_current_page()
if page:
await page.goto(params.url)
await page.wait_for_load_state()
else:
page = await browser_session.create_new_tab(params.url)
msg = f'🔗 Navigated to {params.url}'
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
except Exception as e:
error_msg = str(e)
# Check for network-related errors
if any(
err in error_msg
for err in [
'ERR_NAME_NOT_RESOLVED',
'ERR_INTERNET_DISCONNECTED',
'ERR_CONNECTION_REFUSED',
'ERR_TIMED_OUT',
'net::',
]
):
site_unavailable_msg = f'Site unavailable: {params.url} - {error_msg}'
logger.warning(site_unavailable_msg)
return ActionResult(success=False, error=site_unavailable_msg, include_in_memory=True)
else:
# Re-raise non-network errors
raise
@self.registry.action('Go back', param_model=NoParamsAction)
async def go_back(params: NoParamsAction, browser_session: BrowserSession):
@@ -172,7 +192,7 @@ class Controller(Generic[Context]):
msg += f' - {new_tab_msg}'
logger.info(new_tab_msg)
await browser_session.switch_to_tab(-1)
return ActionResult(extracted_content=msg, include_in_memory=True)
return ActionResult(extracted_content=msg, include_in_memory=True, success=True)
except Exception as e:
error_msg = str(e)
if 'Execution context was destroyed' in error_msg or 'Cannot find context with specified id' in error_msg:
@@ -221,17 +241,21 @@ class Controller(Generic[Context]):
@self.registry.action('Switch tab', param_model=SwitchTabAction)
async def switch_tab(params: SwitchTabAction, browser_session: BrowserSession):
await browser_session.switch_to_tab(params.page_id)
# Wait for tab to be ready and ensure references are synchronized
page = await browser_session.get_current_page()
await page.wait_for_load_state()
msg = f'🔄 Switched to tab {params.page_id}'
try:
await page.wait_for_load_state(state='domcontentloaded', timeout=5_000)
# page was already loaded when we first navigated, this is additional to wait for onfocus/onblur animations/ajax to settle
except Exception as e:
pass
msg = f'🔄 Switched to tab #{params.page_id} with url {page.url}'
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
@self.registry.action('Open a specific url in new tab', param_model=OpenTabAction)
async def open_tab(params: OpenTabAction, browser_session: BrowserSession):
await browser_session.create_new_tab(params.url)
msg = f'🔗 Opened new tab with {params.url}'
page = await browser_session.create_new_tab(params.url)
tab_idx = browser_session.tabs.index(page)
msg = f'🔗 Opened new tab #{tab_idx} with url {params.url}'
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)

View File

@@ -1391,10 +1391,16 @@
}
}
// Skip empty anchor tags
// Skip empty anchor tags only if they have no dimensions and no children
if (nodeData.tagName === 'a' && nodeData.children.length === 0 && !nodeData.attributes.href) {
if (debugMode) PERF_METRICS.nodeMetrics.skippedNodes++;
return null;
// Check if the anchor has actual dimensions
const rect = getCachedBoundingRect(node);
const hasSize = (rect && rect.width > 0 && rect.height > 0) || (node.offsetWidth > 0 || node.offsetHeight > 0);
if (!hasSize) {
if (debugMode) PERF_METRICS.nodeMetrics.skippedNodes++;
return null;
}
}
const id = `${ID.current++}`;

View File

@@ -11,20 +11,19 @@ load_dotenv()
from langchain_openai import ChatOpenAI
from browser_use.agent.service import Agent
from browser_use.browser import BrowserProfile, BrowserSession
from browser_use.browser import BrowserSession
browser_session = BrowserSession(
browser_profile=BrowserProfile(
disable_security=True,
headless=False,
record_video_dir='./tmp/recordings',
user_data_dir='~/.config/browseruse/profiles/default',
)
keep_alive=True,
headless=False,
save_recording_path='./tmp/recordings',
user_data_dir='~/.config/browseruse/profiles/default',
)
llm = ChatOpenAI(model='gpt-4o')
async def main():
await browser_session.start()
agents = [
Agent(task=task, llm=llm, browser_session=browser_session)
for task in [
@@ -32,7 +31,7 @@ async def main():
'Check Reddit front page title',
'Look up Bitcoin price on Coinbase',
'Find NASA image of the day',
# 'Check top story on CNN',
'Check top story on CNN',
# 'Search latest SpaceX launch date',
# 'Look up population of Paris',
# 'Find current time in Sydney',
@@ -41,16 +40,8 @@ async def main():
]
]
await asyncio.gather(*[agent.run() for agent in agents])
agentX = Agent(
task='Go to apple.com and return the title of the page',
llm=llm,
browser_session=browser_session,
)
await agentX.run()
await browser_session.close()
print(await asyncio.gather(*[agent.run() for agent in agents]))
await browser_session.kill()
if __name__ == '__main__':

77
tests/ci/mocks.py Normal file
View File

@@ -0,0 +1,77 @@
"""Mock utilities for testing browser-use."""
from unittest.mock import AsyncMock
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage
def create_mock_llm(actions=None):
"""Create a mock LLM that returns specified actions or a default done action.
Args:
actions: Optional list of JSON strings representing actions to return in sequence.
If not provided, returns a single done action.
After all actions are exhausted, returns a done action.
Returns:
Mock LLM that will return the actions in order, or just a done action if no actions provided.
"""
mock = AsyncMock(spec=BaseChatModel)
mock._verified_api_keys = True
mock._verified_tool_calling_method = 'raw'
mock.model_name = 'mock-llm'
# Default done action
default_done_action = """
{
"current_state": {
"evaluation_previous_goal": "Successfully completed the task",
"memory": "Task completed",
"next_goal": "Task completed"
},
"action": [
{
"done": {
"text": "Task completed successfully",
"success": true
}
}
]
}
"""
if actions is None:
# No actions provided, just return done action
mock.invoke.return_value = AIMessage(content=default_done_action)
async def async_invoke(*args, **kwargs):
return AIMessage(content=default_done_action)
mock.ainvoke.side_effect = async_invoke
else:
# Actions provided, return them in sequence
action_index = 0
def get_next_action():
nonlocal action_index
if action_index < len(actions):
action = actions[action_index]
action_index += 1
return action
else:
return default_done_action
# Mock the invoke method
def mock_invoke(*args, **kwargs):
return AIMessage(content=get_next_action())
mock.invoke.side_effect = mock_invoke
# Create an async version
async def mock_ainvoke(*args, **kwargs):
return AIMessage(content=get_next_action())
mock.ainvoke.side_effect = mock_ainvoke
return mock

View File

@@ -23,105 +23,16 @@ from playwright.async_api import async_playwright
from browser_use import Agent, setup_logging
from browser_use.browser import BrowserProfile, BrowserSession
from tests.ci.mocks import create_mock_llm
# Set up test logging
setup_logging()
logger = logging.getLogger(__name__)
def create_mock_llm():
"""Create a mock LLM that returns a done action"""
mock = AsyncMock(spec=BaseChatModel)
mock._verified_api_keys = True
mock._verified_tool_calling_method = 'raw'
# Add model_name attribute for logging
mock.model_name = 'mock-llm'
# Create the response
response_content = """
{
"current_state": {
"evaluation_previous_goal": "Starting the task",
"memory": "Task completed",
"next_goal": "Complete the task"
},
"action": [
{
"done": {
"text": "Task completed successfully",
"success": true
}
}
]
}
"""
# Mock the invoke method to return an AIMessage
mock.invoke.return_value = AIMessage(content=response_content)
# Make ainvoke return a coroutine that returns the AIMessage
async def async_invoke(*args, **kwargs):
return AIMessage(content=response_content)
mock.ainvoke.side_effect = async_invoke
return mock
def create_mock_llm_with_actions(action_sequence):
"""Factory to create mock LLMs with specific action sequences"""
mock = AsyncMock(spec=BaseChatModel)
mock._verified_api_keys = True
mock._verified_tool_calling_method = 'raw'
# Add model_name attribute for logging
mock.model_name = 'mock-llm'
action_index = 0
def mock_invoke(*args, **kwargs):
nonlocal action_index
if action_index < len(action_sequence):
content = action_sequence[action_index]
action_index += 1
else:
# Default to done action
content = """
{
"current_state": {
"evaluation_previous_goal": "All actions completed",
"memory": "Task completed",
"next_goal": "Complete the task"
},
"action": [
{
"done": {
"text": "Task completed successfully",
"success": true
}
}
]
}
"""
return AIMessage(content=content)
# Create async version for ainvoke
async def async_mock_invoke(*args, **kwargs):
return mock_invoke(*args, **kwargs)
mock.invoke.side_effect = mock_invoke
mock.ainvoke.side_effect = async_mock_invoke
return mock
def run_agent_in_subprocess_module(task_description):
"""Module-level function to run an agent in a subprocess"""
import asyncio
from unittest.mock import AsyncMock
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage
from browser_use import Agent
@@ -487,8 +398,8 @@ class TestParallelism:
"""
# Create mocks with tab creation actions
mock_llm1 = create_mock_llm_with_actions([tab_action, done_action])
mock_llm2 = create_mock_llm_with_actions([tab_action, done_action])
mock_llm1 = create_mock_llm([tab_action, done_action])
mock_llm2 = create_mock_llm([tab_action, done_action])
# Create shared browser session
shared_session = BrowserSession(

View File

@@ -0,0 +1,407 @@
"""Test all recording and save functionality for Agent and BrowserSession."""
import asyncio
import json
import shutil
import zipfile
from pathlib import Path
import pytest
from browser_use import Agent, AgentHistoryList
from browser_use.browser import BrowserProfile, BrowserSession
from tests.ci.mocks import create_mock_llm
@pytest.fixture
def test_dir(tmp_path):
"""Create a test directory that gets cleaned up after each test."""
test_path = tmp_path / 'test_recordings'
test_path.mkdir(exist_ok=True)
yield test_path
@pytest.fixture
async def httpserver_url(httpserver):
"""Simple test page."""
httpserver.expect_request('/').respond_with_data(
"""
<!DOCTYPE html>
<html>
<head><title>Test Page</title></head>
<body>
<h1>Test Recording Page</h1>
<input type="text" id="search" placeholder="Search here">
<button id="submit">Submit</button>
</body>
</html>
""",
content_type='text/html',
)
return httpserver.url_for('/')
@pytest.fixture
def llm():
"""Create mocked LLM instance for tests."""
return create_mock_llm()
@pytest.fixture
def interactive_llm(httpserver_url):
"""Create mocked LLM that navigates to page and interacts with elements."""
actions = [
# First action: Navigate to the page
f"""
{{
"current_state": {{
"evaluation_previous_goal": "Starting the task",
"memory": "Need to navigate to the test page",
"next_goal": "Navigate to the URL"
}},
"action": [
{{
"go_to_url": {{
"url": "{httpserver_url}"
}}
}}
]
}}
""",
# Second action: Click in the search box
"""
{
"current_state": {
"evaluation_previous_goal": "Successfully navigated to the page",
"memory": "Page loaded, can see search box and submit button",
"next_goal": "Click on the search box to focus it"
},
"action": [
{
"click_element_by_index": {
"index": 0
}
}
]
}
""",
# Third action: Type text in the search box
"""
{
"current_state": {
"evaluation_previous_goal": "Clicked on search box",
"memory": "Search box is focused and ready for input",
"next_goal": "Type 'test' in the search box"
},
"action": [
{
"input_text": {
"index": 0,
"text": "test"
}
}
]
}
""",
# Fourth action: Click submit button
"""
{
"current_state": {
"evaluation_previous_goal": "Typed 'test' in search box",
"memory": "Text 'test' has been entered successfully",
"next_goal": "Click the submit button to complete the task"
},
"action": [
{
"click_element_by_index": {
"index": 1
}
}
]
}
""",
]
return create_mock_llm(actions)
class TestAgentRecordings:
"""Test Agent save_conversation_path and generate_gif parameters."""
@pytest.mark.parametrize('path_type', ['with_slash', 'without_slash', 'deep_directory'])
async def test_save_conversation_path(self, test_dir, httpserver_url, llm, path_type):
"""Test saving conversation with different path types."""
if path_type == 'with_slash':
conversation_path = test_dir / 'logs' / 'conversation'
elif path_type == 'without_slash':
conversation_path = test_dir / 'logs'
else: # deep_directory
conversation_path = test_dir / 'logs' / 'deep' / 'directory' / 'conversation'
browser_session = BrowserSession(browser_profile=BrowserProfile(headless=True, disable_security=True, user_data_dir=None))
await browser_session.start()
try:
agent = Agent(
task=f'go to {httpserver_url} and type "test" in the search box',
llm=llm,
browser_session=browser_session,
save_conversation_path=str(conversation_path),
)
history: AgentHistoryList = await agent.run(max_steps=2)
result = history.final_result()
assert result is not None
# Check that the conversation directory and files were created
assert conversation_path.exists(), f'{path_type}: conversation directory was not created'
# Files are now always created as conversation_<agent_id>_<step>.txt inside the directory
conversation_files = list(conversation_path.glob('conversation_*.txt'))
assert len(conversation_files) > 0, f'{path_type}: conversation file was not created in {conversation_path}'
finally:
await browser_session.stop()
@pytest.mark.parametrize('generate_gif', [False, True, 'custom_path'])
async def test_generate_gif(self, test_dir, httpserver_url, llm, generate_gif):
"""Test GIF generation with different settings."""
# Clean up any existing GIFs first
for gif in Path.cwd().glob('agent_*.gif'):
gif.unlink()
gif_param = generate_gif
expected_gif_path = None
if generate_gif == 'custom_path':
expected_gif_path = test_dir / 'custom_agent.gif'
gif_param = str(expected_gif_path)
browser_session = BrowserSession(browser_profile=BrowserProfile(headless=True, disable_security=True, user_data_dir=None))
await browser_session.start()
try:
agent = Agent(
task=f'go to {httpserver_url}',
llm=llm,
browser_session=browser_session,
generate_gif=gif_param,
)
history: AgentHistoryList = await agent.run(max_steps=2)
result = history.final_result()
assert result is not None
# Check GIF creation
if generate_gif is False:
gif_files = list(Path.cwd().glob('*.gif'))
assert len(gif_files) == 0, 'GIF file was created when generate_gif=False'
elif generate_gif is True:
gif_files = list(Path.cwd().glob('agent_*.gif'))
assert len(gif_files) > 0, 'No GIF file was created when generate_gif=True'
# Clean up
for gif in gif_files:
gif.unlink()
else: # custom_path
assert expected_gif_path.exists(), f'GIF was not created at {expected_gif_path}'
finally:
await browser_session.stop()
class TestBrowserProfileRecordings:
"""Test BrowserProfile recording parameters with aliases."""
@pytest.mark.parametrize(
'context_type,alias',
[
('incognito', 'save_recording_path'),
('incognito', 'record_video_dir'),
('persistent', 'save_recording_path'),
('persistent', 'record_video_dir'),
],
)
async def test_video_recording(self, test_dir, httpserver_url, context_type, alias):
"""Test video recording with different contexts and aliases."""
video_dir = test_dir / f'videos_{context_type}_{alias}'
user_data_dir = None if context_type == 'incognito' else str(test_dir / 'user_data')
browser_session = BrowserSession(
browser_profile=BrowserProfile(
headless=True, disable_security=True, user_data_dir=user_data_dir, **{alias: str(video_dir)}
)
)
await browser_session.start()
try:
await browser_session.navigate(httpserver_url)
await asyncio.sleep(0.5)
finally:
await browser_session.stop()
# Add delay for video processing
await asyncio.sleep(1)
# Check if videos were created (may not work in all CI environments)
if video_dir.exists():
video_files = list(video_dir.glob('*.webm'))
if video_files:
for video_file in video_files:
file_size = video_file.stat().st_size
assert file_size > 1000, f'Video file {video_file.name} is too small'
else:
# Video recording might not work in headless CI environments - skip gracefully
pytest.skip('Video recording not supported in this environment')
@pytest.mark.parametrize(
'context_type,alias',
[
('incognito', 'save_har_path'),
('incognito', 'record_har_path'),
('persistent', 'save_har_path'),
('persistent', 'record_har_path'),
],
)
async def test_har_recording(self, test_dir, httpserver_url, context_type, alias):
"""Test HAR recording with different contexts and aliases."""
har_path = test_dir / f'network_{context_type}_{alias}.har'
user_data_dir = None if context_type == 'incognito' else str(test_dir / f'user_data_har_{alias}')
browser_session = BrowserSession(
browser_profile=BrowserProfile(
headless=True, disable_security=True, user_data_dir=user_data_dir, **{alias: str(har_path)}
)
)
await browser_session.start()
try:
await browser_session.navigate(httpserver_url)
await asyncio.sleep(0.5)
finally:
await browser_session.stop()
# HAR files should be created
assert har_path.exists(), f'HAR file was not created at {har_path}'
# Check HAR file content
har_content = json.loads(har_path.read_text())
assert 'log' in har_content, "HAR file missing 'log' key"
assert 'entries' in har_content['log'], 'HAR file missing entries'
assert len(har_content['log']['entries']) > 0, 'HAR file has no network entries'
@pytest.mark.parametrize(
'context_type,alias',
[
('incognito', 'trace_path'),
('incognito', 'traces_dir'),
('persistent', 'trace_path'),
('persistent', 'traces_dir'),
],
)
async def test_trace_recording(self, test_dir, httpserver_url, context_type, alias, interactive_llm):
"""Test trace recording with different contexts and aliases."""
browser_session = BrowserSession(
browser_profile=BrowserProfile(
headless=True,
disable_security=True,
user_data_dir=None if context_type == 'incognito' else str(test_dir / f'user_data_trace_{alias}'),
)
)
# Use browser session ID to create unique trace directory
trace_dir = test_dir / f'trace_{context_type}_{alias}_{browser_session.id}'
# Clean up any existing directory at this path
if trace_dir.exists():
shutil.rmtree(trace_dir)
# Set the trace directory - trace_path is an alias for traces_dir
if alias == 'trace_path':
browser_session.browser_profile.traces_dir = str(trace_dir)
else:
setattr(browser_session.browser_profile, alias, str(trace_dir))
await browser_session.start()
try:
# Use Agent to interact with page for better trace content
agent = Agent(
task=f'go to {httpserver_url} and type "test" in the search box',
llm=interactive_llm,
browser_session=browser_session,
)
await agent.run(max_steps=5)
finally:
await browser_session.stop()
# Check trace file - should be created automatically in the directory
assert trace_dir.exists(), f'Trace directory was not created at {trace_dir}'
trace_files = list(trace_dir.glob('*.zip'))
assert len(trace_files) > 0, f'No trace files were created in {trace_dir}'
trace_file = trace_files[0]
assert zipfile.is_zipfile(trace_file), 'Trace file is not a valid ZIP'
with zipfile.ZipFile(trace_file, 'r') as zip_file:
files = zip_file.namelist()
assert len(files) > 0, 'Trace ZIP file is empty'
assert any('trace' in f.lower() for f in files), 'Trace ZIP missing trace data'
class TestCombinedRecordings:
"""Test using multiple recording parameters together."""
async def test_all_recording_parameters(self, test_dir, httpserver_url, interactive_llm):
"""Test using all recording parameters together."""
conversation_path = test_dir / 'conversation'
gif_path = test_dir / 'agent.gif'
video_dir = test_dir / 'videos'
har_path = test_dir / 'network.har'
trace_dir = test_dir / 'traces'
browser_session = BrowserSession(
browser_profile=BrowserProfile(
headless=True,
disable_security=True,
user_data_dir=None,
record_video_dir=str(video_dir),
record_har_path=str(har_path),
traces_dir=str(trace_dir),
)
)
await browser_session.start()
try:
agent = Agent(
task=f'go to {httpserver_url} and type "test" in the search box',
llm=interactive_llm,
browser_session=browser_session,
save_conversation_path=str(conversation_path),
generate_gif=str(gif_path),
)
history: AgentHistoryList = await agent.run(max_steps=5)
result = history.final_result()
assert result is not None
# Check conversation files in directory
conversation_files = list(conversation_path.glob('conversation_*.txt'))
assert len(conversation_files) > 0, 'Conversation file was not created'
# Check GIF
assert gif_path.exists(), 'GIF was not created'
# Check video directory
assert video_dir.exists(), 'Video directory was not created'
finally:
await browser_session.stop()
# Check files created after browser close
video_files = list(video_dir.glob('*.webm'))
assert len(video_files) > 0, 'No video files were created'
assert har_path.exists(), 'HAR file was not created'
# Verify HAR file
har_content = json.loads(har_path.read_text())
assert 'log' in har_content and 'entries' in har_content['log'], 'Invalid HAR structure'
assert trace_dir.exists(), 'Trace directory was not created'
trace_files = list(trace_dir.glob('*.zip'))
assert len(trace_files) > 0, 'No trace files were created'
# Verify trace file
trace_file = trace_files[0]
assert zipfile.is_zipfile(trace_file), 'Trace file is not a valid ZIP'

View File

@@ -10,6 +10,7 @@ Tests cover:
"""
import asyncio
import json
import logging
from pathlib import Path
@@ -22,6 +23,7 @@ from browser_use.browser.profile import (
BrowserProfile,
)
from browser_use.browser.session import BrowserSession
from tests.ci.mocks import create_mock_llm
# Set up test logging
logger = logging.getLogger('browser_session_start_tests')
@@ -605,61 +607,6 @@ class TestBrowserSessionReusePatterns:
return mock
def create_mock_llm_with_actions(self, action_sequence):
"""Factory to create mock LLMs with specific action sequences"""
from unittest.mock import MagicMock
from langchain_core.language_models.chat_models import BaseChatModel
# Create a MagicMock that supports dictionary-style access
mock = MagicMock(spec=BaseChatModel)
# Skip verification by setting these attributes
mock._verified_api_keys = True
mock._verified_tool_calling_method = 'raw'
# Track which action to return next
action_index = 0
# Mock the invoke method to return actions in sequence
def mock_invoke(*args, **kwargs):
nonlocal action_index
response = MagicMock()
if action_index < len(action_sequence):
response.content = action_sequence[action_index]
action_index += 1
else:
# Default to done action if we run out of actions
response.content = """
{
"current_state": {
"evaluation_previous_goal": "All actions completed",
"memory": "Task completed",
"next_goal": "Complete the task"
},
"action": [
{
"done": {
"text": "Task completed successfully",
"success": true
}
}
]
}
"""
return response
mock.invoke = mock_invoke
# Create an async version of the mock_invoke
async def mock_ainvoke(*args, **kwargs):
return mock_invoke(*args, **kwargs)
mock.ainvoke = mock_ainvoke
return mock
async def test_sequential_agents_same_profile_different_browser(self, mock_llm):
"""Test Sequential Agents, Same Profile, Different Browser pattern"""
from browser_use import Agent
@@ -821,9 +768,9 @@ class TestBrowserSessionReusePatterns:
# Create 3 agents sharing the same browser session
# Each gets its own mock LLM with the same action sequence
mock_llm1 = self.create_mock_llm_with_actions([tab_creation_action, done_action])
mock_llm2 = self.create_mock_llm_with_actions([tab_creation_action, done_action])
mock_llm3 = self.create_mock_llm_with_actions([tab_creation_action, done_action])
mock_llm1 = create_mock_llm([tab_creation_action, done_action])
mock_llm2 = create_mock_llm([tab_creation_action, done_action])
mock_llm3 = create_mock_llm([tab_creation_action, done_action])
agent1 = Agent(
task='First parallel task...',
@@ -933,7 +880,7 @@ class TestBrowserSessionReusePatterns:
# Create a shared profile with storage state
with tempfile.NamedTemporaryFile(suffix='.json', delete=False, mode='w') as f:
# Write minimal valid storage state
f.write('{"cookies": [], "origins": []}')
f.write('{"cookies": [], "origins": [{"origin": "https://example.com", "localStorage": []}]}')
auth_json_path = f.name
# Convert to Path object
@@ -980,6 +927,11 @@ class TestBrowserSessionReusePatterns:
# Verify storage state file exists
assert Path(auth_json_path).exists()
# Verify storage state file was not wiped
storage_state = json.loads(auth_json_path.read_text())
assert 'origins' in storage_state
assert len(storage_state['origins']) > 0
finally:
await window1.kill()
await window2.kill()

View File

@@ -1338,3 +1338,107 @@ class TestControllerIntegration:
# Verify the click actually had an effect on the page
result_text = await page.evaluate("document.getElementById('result').textContent")
assert result_text == expected_result_text, f"Expected result text '{expected_result_text}', got '{result_text}'"
async def test_empty_css_selector_fallback(self, controller, browser_session, httpserver):
"""Test that clicking elements with empty CSS selectors falls back to XPath."""
# Create a test page with an element that would produce an empty CSS selector
# This could happen with elements that have no tag name or unusual XPath structures
httpserver.expect_request('/empty_css_test').respond_with_data(
"""
<html>
<head><title>Empty CSS Selector Test</title></head>
<body>
<div id="container">
<!-- Element with minimal attributes that might produce empty CSS selector -->
<custom-element>Click Me</custom-element>
<div id="result">Not clicked</div>
</div>
<script>
// Add click handler to the custom element
document.querySelector('custom-element').addEventListener('click', function() {
document.getElementById('result').textContent = 'Clicked!';
});
</script>
</body>
</html>
""",
content_type='text/html',
)
# Navigate to the test page
page = await browser_session.get_current_page()
await page.goto(httpserver.url_for('/empty_css_test'))
await page.wait_for_load_state()
# Get the page state which includes clickable elements
state = await browser_session.get_state_summary(cache_clickable_elements_hashes=False)
# Find the custom element index
custom_element_index = None
for index, element in state.selector_map.items():
if element.tag_name == 'custom-element':
custom_element_index = index
break
assert custom_element_index is not None, 'Could not find custom-element in selector map'
# Mock a scenario where CSS selector generation returns empty string
# by temporarily patching the method (we'll test the actual fallback behavior)
original_method = browser_session._enhanced_css_selector_for_element
empty_css_called = False
def mock_css_selector(element, include_dynamic_attributes=True):
nonlocal empty_css_called
# Return empty string for our custom element to trigger fallback
if element.tag_name == 'custom-element':
empty_css_called = True
return ''
return original_method(element, include_dynamic_attributes)
# Temporarily replace the method
browser_session._enhanced_css_selector_for_element = mock_css_selector
try:
# Create click action for the custom element
click_action = {'click_element_by_index': ClickElementAction(index=custom_element_index)}
class ClickActionModel(ActionModel):
click_element_by_index: ClickElementAction | None = None
# Execute the click - should use XPath fallback
result = await controller.act(ClickActionModel(**click_action), browser_session)
# Verify the click succeeded
assert result.error is None, f'Click failed with error: {result.error}'
assert result.success is True, 'Click was not successful'
assert empty_css_called, 'CSS selector method was not called'
# Verify the element was actually clicked by checking the result
result_text = await page.evaluate("document.getElementById('result').textContent")
assert result_text == 'Clicked!', f'Element was not clicked, result text: {result_text}'
finally:
# Restore the original method
browser_session._enhanced_css_selector_for_element = original_method
async def test_go_to_url_network_error(self, controller, browser_session):
"""Test that go_to_url handles network errors gracefully instead of throwing hard errors."""
# Create action model for go_to_url with an invalid domain
action_data = {'go_to_url': GoToUrlAction(url='https://www.nonexistentdndbeyond.com/')}
# Create the ActionModel instance
class GoToUrlActionModel(ActionModel):
go_to_url: GoToUrlAction | None = None
action_model = GoToUrlActionModel(**action_data)
# Execute the action - should return soft error instead of throwing
result = await controller.act(action_model, browser_session)
# Verify the result
assert isinstance(result, ActionResult)
assert result.success is False, 'Expected success=False for network error'
assert result.error is not None, 'Expected error message to be set'
assert 'Site unavailable' in result.error, f"Expected 'Site unavailable' in error message, got: {result.error}"
assert 'nonexistentdndbeyond.com' in result.error, 'Expected URL in error message'
assert result.include_in_memory is True, 'Network errors should be included in memory'

View File

@@ -29,22 +29,13 @@ from browser_use.controller.views import (
NoParamsAction,
SearchGoogleAction,
)
from tests.ci.mocks import create_mock_llm
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class MockLLM:
"""Mock LLM for testing"""
async def ainvoke(self, prompt):
class MockResponse:
content = 'Mocked LLM response'
return MockResponse()
class TestContext:
"""Simple context for testing"""
@@ -92,7 +83,7 @@ def base_url(http_server):
@pytest.fixture(scope='module')
def mock_llm():
"""Create a mock LLM"""
return MockLLM()
return create_mock_llm()
@pytest.fixture(scope='function')
@@ -240,7 +231,9 @@ class TestActionRegistryParameterPatterns:
assert isinstance(result, ActionResult)
assert 'Text: hello' in result.extracted_content
assert base_url in result.extracted_content
assert 'LLM: Mocked LLM response' in result.extracted_content
# The mock LLM returns a JSON response
assert 'LLM: \n\t{\n\t\t"current_state":' in result.extracted_content
assert '"Task completed successfully"' in result.extracted_content
assert 'Files: 2' in result.extracted_content
async def test_no_params_action(self, registry, browser_session):
@@ -834,7 +827,7 @@ class TestDecoratedFunctionBehavior:
special_context = {
'page': MockPage(),
'browser_session': None,
'page_extraction_llm': MockLLM(),
'page_extraction_llm': create_mock_llm(),
'context': {'extra': 'data'},
'unknown_param': 'ignored',
}

View File

@@ -1,91 +0,0 @@
"""
Simple try of the agent.
@dev You need to add OPENAI_API_KEY to your environment variables.
"""
import os
import shutil
import sys
from browser_use.browser import BrowserProfile, BrowserSession
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from langchain_openai import ChatOpenAI
from browser_use import Agent, AgentHistoryList
llm = ChatOpenAI(model='gpt-4o')
async def test_save_conversation_contains_slash():
if os.path.exists('./logs'):
shutil.rmtree('./logs')
browser_session = BrowserSession(browser_profile=BrowserProfile(headless=True, disable_security=True))
await browser_session.start()
try:
agent = Agent(
task=('go to google.com and search for text "hi there"'),
llm=llm,
browser_session=browser_session,
save_conversation_path='logs/conversation',
)
history: AgentHistoryList = await agent.run(20)
result = history.final_result()
assert result is not None
assert os.path.exists('./logs'), 'logs directory was not created'
assert os.path.exists('./logs/conversation_2.txt'), 'logs file was not created'
finally:
await browser_session.stop()
async def test_save_conversation_not_contains_slash():
if os.path.exists('./logs'):
shutil.rmtree('./logs')
browser_session = BrowserSession(browser_profile=BrowserProfile(headless=True, disable_security=True))
await browser_session.start()
try:
agent = Agent(
task=('go to google.com and search for text "hi there"'),
llm=llm,
browser_session=browser_session,
save_conversation_path='logs',
)
history: AgentHistoryList = await agent.run(20)
result = history.final_result()
assert result is not None
assert os.path.exists('./logs'), 'logs directory was not created'
assert os.path.exists('./logs/_2.txt'), 'logs file was not created'
finally:
await browser_session.stop()
async def test_save_conversation_deep_directory():
if os.path.exists('./logs'):
shutil.rmtree('./logs')
browser_session = BrowserSession(browser_profile=BrowserProfile(headless=True, disable_security=True))
await browser_session.start()
try:
agent = Agent(
task=('go to google.com and search for text "hi there"'),
llm=llm,
browser_session=browser_session,
save_conversation_path='logs/deep/directory/conversation',
)
history: AgentHistoryList = await agent.run(20)
result = history.final_result()
assert result is not None
assert os.path.exists('./logs/deep/directory'), 'logs directory was not created'
assert os.path.exists('./logs/deep/directory/conversation_2.txt'), 'logs file was not created'
finally:
await browser_session.stop()