Merge branch 'main' into fix-rerunning

This commit is contained in:
Magnus Müller
2025-09-16 22:22:32 -07:00
committed by GitHub
10 changed files with 231 additions and 66 deletions

View File

@@ -366,10 +366,7 @@ class MessageManager:
def _set_message_with_type(self, message: BaseMessage, message_type: Literal['system', 'state']) -> None:
"""Replace a specific state message slot with a new message"""
# filter out sensitive data from the message
if self.sensitive_data:
message = self._filter_sensitive_data(message)
# Don't filter system and state messages - they should contain placeholder tags or normal conversation
if message_type == 'system':
self.state.history.system_message = message
elif message_type == 'state':
@@ -379,10 +376,7 @@ class MessageManager:
def _add_context_message(self, message: BaseMessage) -> None:
"""Add a contextual message specific to this step (e.g., validation errors, retry instructions, timeout warnings)"""
# filter out sensitive data from the message
if self.sensitive_data:
message = self._filter_sensitive_data(message)
# Don't filter context messages - they should contain normal conversation or error messages
self.state.history.context_messages.append(message)
@time_execution_sync('--filter_sensitive_data')

View File

@@ -1934,10 +1934,10 @@ class Agent(Generic[Context, AgentStructuredOutput]):
return await self.rerun_history(history, **kwargs)
def save_history(self, file_path: str | Path | None = None) -> None:
"""Save the history to a file"""
"""Save the history to a file with sensitive data filtering"""
if not file_path:
file_path = 'AgentHistory.json'
self.history.save_to_file(file_path)
self.history.save_to_file(file_path, sensitive_data=self.sensitive_data)
def pause(self) -> None:
"""Pause the agent before the next step"""
@@ -2124,7 +2124,9 @@ class Agent(Generic[Context, AgentStructuredOutput]):
# AgentHistoryList methods
'structured_output': structured_output_json,
'final_result_response': final_result,
'complete_history': _get_complete_history_without_screenshots(self.history.model_dump()),
'complete_history': _get_complete_history_without_screenshots(
self.history.model_dump(sensitive_data=self.sensitive_data)
),
},
}

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import logging
import traceback
from dataclasses import dataclass
from pathlib import Path
@@ -26,6 +27,8 @@ from browser_use.llm.base import BaseChatModel
from browser_use.tokens.views import UsageSummary
from browser_use.tools.registry.views import ActionModel
logger = logging.getLogger(__name__)
class AgentSettings(BaseModel):
"""Configuration options for the Agent"""
@@ -265,13 +268,78 @@ class AgentHistory(BaseModel):
elements.append(None)
return elements
def model_dump(self, **kwargs) -> dict[str, Any]:
"""Custom serialization handling circular references"""
def _filter_sensitive_data_from_string(self, value: str, sensitive_data: dict[str, str | dict[str, str]] | None) -> str:
"""Filter out sensitive data from a string value"""
if not sensitive_data:
return value
# Collect all sensitive values, immediately converting old format to new format
sensitive_values: dict[str, str] = {}
# Process all sensitive data entries
for key_or_domain, content in sensitive_data.items():
if isinstance(content, dict):
# Already in new format: {domain: {key: value}}
for key, val in content.items():
if val: # Skip empty values
sensitive_values[key] = val
elif content: # Old format: {key: value} - convert to new format internally
# We treat this as if it was {'http*://*': {key_or_domain: content}}
sensitive_values[key_or_domain] = content
# If there are no valid sensitive data entries, just return the original value
if not sensitive_values:
return value
# Replace all valid sensitive data values with their placeholder tags
for key, val in sensitive_values.items():
value = value.replace(val, f'<secret>{key}</secret>')
return value
def _filter_sensitive_data_from_dict(
	self, data: dict[str, Any], sensitive_data: dict[str, str | dict[str, str]] | None
) -> dict[str, Any]:
	"""Recursively filter sensitive data from a dictionary.

	Walks strings, dicts and lists — including lists nested inside lists,
	which the naive single-level list handling would pass through unfiltered.
	All other value types are returned unchanged.

	Args:
		data: The dictionary to scrub.
		sensitive_data: Mapping of sensitive entries, or None/empty to skip filtering.

	Returns:
		A new dictionary with sensitive string values replaced by placeholder tags.
	"""
	if not sensitive_data:
		return data

	def _filter_value(value: Any) -> Any:
		# Recursively scrub a single value of any supported container type.
		if isinstance(value, str):
			return self._filter_sensitive_data_from_string(value, sensitive_data)
		if isinstance(value, dict):
			return self._filter_sensitive_data_from_dict(value, sensitive_data)
		if isinstance(value, list):
			# Recurse into each item so nested lists are filtered too.
			return [_filter_value(item) for item in value]
		return value

	return {key: _filter_value(value) for key, value in data.items()}
def model_dump(self, sensitive_data: dict[str, str | dict[str, str]] | None = None, **kwargs) -> dict[str, Any]:
"""Custom serialization handling circular references and filtering sensitive data"""
# Handle action serialization
model_output_dump = None
if self.model_output:
action_dump = [action.model_dump(exclude_none=True) for action in self.model_output.action]
# Filter sensitive data only from input_text action parameters if sensitive_data is provided
if sensitive_data:
action_dump = [
self._filter_sensitive_data_from_dict(action, sensitive_data)
if action.get('name') == 'input_text'
else action
for action in action_dump
]
model_output_dump = {
'evaluation_previous_goal': self.model_output.evaluation_previous_goal,
'memory': self.model_output.memory,
@@ -282,9 +350,13 @@ class AgentHistory(BaseModel):
if self.model_output.thinking is not None:
model_output_dump['thinking'] = self.model_output.thinking
# Handle result serialization - don't filter ActionResult data
# as it should contain meaningful information for the agent
result_dump = [r.model_dump(exclude_none=True) for r in self.result]
return {
'model_output': model_output_dump,
'result': [r.model_dump(exclude_none=True) for r in self.result],
'result': result_dump,
'state': self.state.to_dict(),
'metadata': self.metadata.model_dump() if self.metadata else None,
}
@@ -325,11 +397,11 @@ class AgentHistoryList(BaseModel, Generic[AgentStructuredOutput]):
"""Representation of the AgentHistoryList object"""
return self.__str__()
def save_to_file(self, filepath: str | Path) -> None:
"""Save history to JSON file with proper serialization"""
def save_to_file(self, filepath: str | Path, sensitive_data: dict[str, str | dict[str, str]] | None = None) -> None:
"""Save history to JSON file with proper serialization and optional sensitive data filtering"""
try:
Path(filepath).parent.mkdir(parents=True, exist_ok=True)
data = self.model_dump()
data = self.model_dump(sensitive_data=sensitive_data)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
except Exception as e:

View File

@@ -144,6 +144,8 @@ class TypeTextEvent(ElementSelectedEvent[dict | None]):
node: 'EnhancedDOMTreeNode'
text: str
clear_existing: bool = True
is_sensitive: bool = False # Flag to indicate if text contains sensitive data
sensitive_key_name: str | None = None # Name of the sensitive key being typed (e.g., 'username', 'password')
event_timeout: float | None = _get_timeout('TIMEOUT_TypeTextEvent', 15.0) # seconds

View File

@@ -127,15 +127,32 @@ class DefaultActionWatchdog(BaseWatchdog):
if not element_node.element_index or element_node.element_index == 0:
# Type to the page without focusing any specific element
await self._type_to_page(event.text)
self.logger.info(f'⌨️ Typed "{event.text}" to the page (current focus)')
# Log with sensitive data protection
if event.is_sensitive:
if event.sensitive_key_name:
self.logger.info(f'⌨️ Typed <{event.sensitive_key_name}> to the page (current focus)')
else:
self.logger.info('⌨️ Typed <sensitive> to the page (current focus)')
else:
self.logger.info(f'⌨️ Typed "{event.text}" to the page (current focus)')
return None # No coordinates available for page typing
else:
try:
# Try to type to the specific element
input_metadata = await self._input_text_element_node_impl(
element_node, event.text, clear_existing=event.clear_existing or (not event.text)
element_node,
event.text,
clear_existing=event.clear_existing or (not event.text),
is_sensitive=event.is_sensitive,
)
self.logger.info(f'⌨️ Typed "{event.text}" into element with index {index_for_logging}')
# Log with sensitive data protection
if event.is_sensitive:
if event.sensitive_key_name:
self.logger.info(f'⌨️ Typed <{event.sensitive_key_name}> into element with index {index_for_logging}')
else:
self.logger.info(f'⌨️ Typed <sensitive> into element with index {index_for_logging}')
else:
self.logger.info(f'⌨️ Typed "{event.text}" into element with index {index_for_logging}')
self.logger.debug(f'Element xpath: {element_node.xpath}')
return input_metadata # Return coordinates if available
except Exception as e:
@@ -146,7 +163,14 @@ class DefaultActionWatchdog(BaseWatchdog):
except Exception as e:
pass
await self._type_to_page(event.text)
self.logger.info(f'⌨️ Typed "{event.text}" to the page as fallback')
# Log with sensitive data protection
if event.is_sensitive:
if event.sensitive_key_name:
self.logger.info(f'⌨️ Typed <{event.sensitive_key_name}> to the page as fallback')
else:
self.logger.info('⌨️ Typed <sensitive> to the page as fallback')
else:
self.logger.info(f'⌨️ Typed "{event.text}" to the page as fallback')
return None # No coordinates available for fallback typing
# Note: We don't clear cached state here - let multi_act handle DOM change detection
@@ -966,7 +990,7 @@ class DefaultActionWatchdog(BaseWatchdog):
return False
async def _input_text_element_node_impl(
self, element_node: EnhancedDOMTreeNode, text: str, clear_existing: bool = True
self, element_node: EnhancedDOMTreeNode, text: str, clear_existing: bool = True, is_sensitive: bool = False
) -> dict | None:
"""
Input text into an element using pure CDP with improved focus fallbacks.
@@ -1036,7 +1060,12 @@ class DefaultActionWatchdog(BaseWatchdog):
# Step 3: Type the text character by character using proper human-like key events
# This emulates exactly how a human would type, which modern websites expect
self.logger.debug(f'🎯 Typing text character by character: "{text}"')
if is_sensitive:
# Note: sensitive_key_name is not passed to this low-level method,
# but we could extend the signature if needed for more granular logging
self.logger.debug('🎯 Typing <sensitive> character by character')
else:
self.logger.debug(f'🎯 Typing text character by character: "{text}"')
for i, char in enumerate(text):
# Handle newline characters as Enter key

View File

@@ -728,9 +728,40 @@ class BrowserUseServer:
from browser_use.browser.events import TypeTextEvent
event = self.browser_session.event_bus.dispatch(TypeTextEvent(node=element, text=text))
# Conservative heuristic to detect potentially sensitive data
# Only flag very obvious patterns to minimize false positives
is_potentially_sensitive = len(text) >= 6 and (
# Email pattern: contains @ and a domain-like suffix
('@' in text and '.' in text.split('@')[-1] if '@' in text else False)
# Mixed alphanumeric with reasonable complexity (likely API keys/tokens)
or (
len(text) >= 16
and any(char.isdigit() for char in text)
and any(char.isalpha() for char in text)
and any(char in '.-_' for char in text)
)
)
# Use generic key names to avoid information leakage about detection patterns
sensitive_key_name = None
if is_potentially_sensitive:
if '@' in text and '.' in text.split('@')[-1]:
sensitive_key_name = 'email'
else:
sensitive_key_name = 'credential'
event = self.browser_session.event_bus.dispatch(
TypeTextEvent(node=element, text=text, is_sensitive=is_potentially_sensitive, sensitive_key_name=sensitive_key_name)
)
await event
return f"Typed '{text}' into element {index}"
if is_potentially_sensitive:
if sensitive_key_name:
return f'Typed <{sensitive_key_name}> into element {index}'
else:
return f'Typed <sensitive> into element {index}'
else:
return f"Typed '{text}' into element {index}"
async def _get_browser_state(self, include_screenshot: bool = False) -> str:
"""Get current browser state."""

View File

@@ -354,6 +354,10 @@ class Registry(Generic[Context]):
'file_system': file_system,
}
# Only pass sensitive_data to actions that explicitly need it (input_text)
if action_name == 'input_text':
special_context['sensitive_data'] = sensitive_data
# Add CDP-related parameters if browser_session is available
if browser_session:
# Add page_url

View File

@@ -65,6 +65,25 @@ Context = TypeVar('Context')
T = TypeVar('T', bound=BaseModel)
def _detect_sensitive_key_name(text: str, sensitive_data: dict[str, str | dict[str, str]] | None) -> str | None:
"""Detect which sensitive key name corresponds to the given text value."""
if not sensitive_data or not text:
return None
# Collect all sensitive values and their keys
for domain_or_key, content in sensitive_data.items():
if isinstance(content, dict):
# New format: {domain: {key: value}}
for key, value in content.items():
if value and value == text:
return key
elif content: # Old format: {key: value}
if content == text:
return domain_or_key
return None
def handle_browser_error(e: BrowserError) -> ActionResult:
if e.long_term_memory is not None:
if e.short_term_memory is not None:
@@ -311,7 +330,12 @@ class Tools(Generic[Context]):
'Input text into an input interactive element. Only input text into indices that are inside your current browser_state. Never input text into indices that are not inside your current browser_state.',
param_model=InputTextAction,
)
async def input_text(params: InputTextAction, browser_session: BrowserSession, has_sensitive_data: bool = False):
async def input_text(
params: InputTextAction,
browser_session: BrowserSession,
has_sensitive_data: bool = False,
sensitive_data: dict[str, str | dict[str, str]] | None = None,
):
# Look up the node from the selector map
node = await browser_session.get_element_by_index(params.index)
if node is None:
@@ -319,18 +343,41 @@ class Tools(Generic[Context]):
# Dispatch type text event with node
try:
# Detect which sensitive key is being used
sensitive_key_name = None
if has_sensitive_data and sensitive_data:
sensitive_key_name = _detect_sensitive_key_name(params.text, sensitive_data)
event = browser_session.event_bus.dispatch(
TypeTextEvent(node=node, text=params.text, clear_existing=params.clear_existing)
TypeTextEvent(
node=node,
text=params.text,
clear_existing=params.clear_existing,
is_sensitive=has_sensitive_data,
sensitive_key_name=sensitive_key_name,
)
)
await event
input_metadata = await event.event_result(raise_if_any=True, raise_if_none=False)
msg = f"Input '{params.text}' into element {params.index}."
logger.debug(msg)
# Create message with sensitive data handling
if has_sensitive_data:
if sensitive_key_name:
msg = f'Input {sensitive_key_name} into element {params.index}.'
log_msg = f'Input <{sensitive_key_name}> into element {params.index}.'
else:
msg = f'Input sensitive data into element {params.index}.'
log_msg = f'Input <sensitive> into element {params.index}.'
else:
msg = f"Input '{params.text}' into element {params.index}."
log_msg = msg
logger.debug(log_msg)
# Include input coordinates in metadata if available
return ActionResult(
extracted_content=msg,
long_term_memory=f"Input '{params.text}' into element {params.index}.",
long_term_memory=msg,
metadata=input_metadata if isinstance(input_metadata, dict) else None,
)
except BrowserError as e:

View File

@@ -71,33 +71,28 @@ You can configure browser-use through environment variables:
The MCP server exposes these browser automation tools:
### Autonomous Agent Tools
- **`run_browser_task`** - Run a complete browser automation task with an AI agent
- **`run_browser_task_streaming`** - Same as above but with streaming responses
- **`retry_with_browser_use_agent`** - Run a complete browser automation task with an AI agent (use as last resort when direct control fails)
### Direct Browser Control
- **`browser_navigate`** - Navigate to a URL
- **`browser_click`** - Click on an element by index
- **`browser_type`** - Type text into an element
- **`browser_get_state`** - Get current page state and screenshot
- **`browser_get_state`** - Get current page state and interactive elements
- **`browser_scroll`** - Scroll the page
- **`browser_go_back`** - Go back in browser history
- **`browser_go_forward`** - Go forward in browser history
- **`browser_refresh`** - Refresh the current page
### Tab Management
- **`browser_list_tabs`** - List all open browser tabs
- **`browser_switch_tab`** - Switch to a specific tab
- **`browser_close_tab`** - Close a tab
- **`browser_new_tab`** - Open a new tab
### Content Extraction
- **`browser_extract_content`** - Extract structured content from the current page
- **`browser_take_screenshot`** - Take a screenshot of the current page
### File Operations
- **`read_file`** - Read content from a file
- **`write_file`** - Write content to a file
- **`list_files`** - List files in a directory
### Session Management
- **`browser_list_sessions`** - List all active browser sessions with details
- **`browser_close_session`** - Close a specific browser session by ID
- **`browser_close_all`** - Close all active browser sessions
## Example Usage
@@ -140,12 +135,12 @@ async def use_browser_mcp():
)
print(result.content[0].text)
# Take a screenshot
# Get page state
result = await session.call_tool(
"browser_take_screenshot",
arguments={}
"browser_get_state",
arguments={"include_screenshot": True}
)
print("Screenshot taken!")
print("Page state retrieved!")
asyncio.run(use_browser_mcp())
```

View File

@@ -9,7 +9,6 @@ from dotenv import load_dotenv
load_dotenv()
from browser_use import Agent, ChatOpenAI
from browser_use.browser import BrowserProfile
# Initialize the model
llm = ChatOpenAI(
@@ -21,33 +20,23 @@ llm = ChatOpenAI(
# Advanced case: domain-specific credentials with reusable data
# Define a single credential set that can be reused
company_credentials = {'company_username': 'user@example.com', 'company_password': 'securePassword123'}
company_credentials: dict[str, str] = {'telephone': '9123456789', 'email': 'user@example.com', 'name': 'John Doe'}
# Map the same credentials to multiple domains for secure access control
# Type annotation to satisfy pyright
sensitive_data = {
'https://example.com': company_credentials,
'https://admin.example.com': company_credentials,
'https://*.example-staging.com': company_credentials,
'http*://test.example.com': company_credentials,
sensitive_data: dict[str, str | dict[str, str]] = {
# 'https://example.com': company_credentials,
# 'https://admin.example.com': company_credentials,
# 'https://*.example-staging.com': company_credentials,
# 'http*://test.example.com': company_credentials,
'httpbin.org': company_credentials,
# # You can also add domain-specific credentials
# 'https://google.com': {'g_email': 'user@gmail.com', 'g_pass': 'google_password'},
'this_email_works_on_all_domains': 'test@test.com',
# 'https://google.com': {'g_email': 'user@gmail.com', 'g_pass': 'google_password'}
}
# Update task to use one of the credentials above
task = 'Go to google.com and put the login information in the search bar.'
task = 'Go to https://httpbin.org/forms/post and put the secure information in the relevant fields.'
# Always set allowed_domains when using sensitive_data for security
from browser_use.browser.session import BrowserSession
browser_session = BrowserSession(
browser_profile=BrowserProfile(
allowed_domains=list(sensitive_data.keys())
+ ['https://*.trusted-partner.com'] # Domain patterns from sensitive_data + additional allowed domains
)
)
agent = Agent(task=task, llm=llm, sensitive_data=sensitive_data, browser_session=browser_session)
agent = Agent(task=task, llm=llm, sensitive_data=sensitive_data)
async def main():