mirror of
https://github.com/browser-use/browser-use
synced 2026-05-06 17:52:15 +02:00
Refactored the `_map_action_to_playwright` method in `PlaywrightScriptGenerator` to address its excessive length and complexity stemming from numerous if/elif blocks. Introduced a dictionary (`_action_handlers`) to map action type strings to dedicated private handler methods (e.g., `_map_go_to_url`, `_map_input_text`). The main `_map_action_to_playwright` method now delegates the call to the appropriate handler via this dictionary lookup. This change significantly simplifies the main mapping logic, improves code readability, isolates action-specific logic, and makes the overall structure easier to maintain and extend. It directly resolves the concern about the method's length and error-proneness.
734 lines
40 KiB
Python
734 lines
40 KiB
Python
import json
|
|
import logging
|
|
import os # Added os import
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
# Import necessary config types
|
|
from browser_use.browser.browser import BrowserConfig
|
|
from browser_use.browser.context import BrowserContextConfig
|
|
# --- End Import ---
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Define a custom exception for action failures within the generated script
|
|
class PlaywrightActionError(Exception):
|
|
"""Custom exception for errors during Playwright script action execution."""
|
|
pass
|
|
|
|
class PlaywrightScriptGenerator:
|
|
"""Generates a Playwright script from AgentHistoryList."""
|
|
|
|
def __init__(
|
|
self,
|
|
history_list: List[Dict[str, Any]],
|
|
sensitive_data_keys: Optional[List[str]] = None,
|
|
browser_config: Optional[BrowserConfig] = None,
|
|
context_config: Optional[BrowserContextConfig] = None,
|
|
):
|
|
"""
|
|
Initializes the script generator.
|
|
|
|
Args:
|
|
history_list: A list of dictionaries, where each dictionary represents an AgentHistory item.
|
|
Expected to be raw dictionaries from `AgentHistoryList.model_dump()`.
|
|
sensitive_data_keys: A list of keys used as placeholders for sensitive data.
|
|
browser_config: Configuration from the original Browser instance.
|
|
context_config: Configuration from the original BrowserContext instance.
|
|
"""
|
|
self.history = history_list
|
|
self.sensitive_data_keys = sensitive_data_keys or []
|
|
self.browser_config = browser_config
|
|
self.context_config = context_config
|
|
self._imports_helpers_added = False
|
|
self._page_counter = 0 # Track pages for tab management
|
|
|
|
# Dictionary mapping action types to handler methods
|
|
self._action_handlers = {
|
|
'go_to_url': self._map_go_to_url,
|
|
'wait': self._map_wait,
|
|
'input_text': self._map_input_text,
|
|
'click_element': self._map_click_element,
|
|
'click_element_by_index': self._map_click_element, # Map legacy action
|
|
'scroll_down': self._map_scroll_down,
|
|
'scroll_up': self._map_scroll_up,
|
|
'send_keys': self._map_send_keys,
|
|
'go_back': self._map_go_back,
|
|
'open_tab': self._map_open_tab,
|
|
'close_tab': self._map_close_tab,
|
|
'switch_tab': self._map_switch_tab,
|
|
'search_google': self._map_search_google,
|
|
'drag_drop': self._map_drag_drop,
|
|
'extract_content': self._map_extract_content,
|
|
'click_download_button': self._map_click_download_button,
|
|
'done': self._map_done,
|
|
}
|
|
|
|
def _generate_browser_launch_args(self) -> str:
|
|
"""Generates the arguments string for browser launch based on BrowserConfig."""
|
|
if not self.browser_config:
|
|
# Default launch if no config provided
|
|
return "headless=False"
|
|
|
|
args_dict = {
|
|
"headless": self.browser_config.headless,
|
|
# Add other relevant launch options here based on self.browser_config
|
|
# Example: 'proxy': self.browser_config.proxy.model_dump() if self.browser_config.proxy else None
|
|
# Example: 'args': self.browser_config.extra_browser_args # Be careful inheriting args
|
|
}
|
|
if self.browser_config.proxy:
|
|
args_dict["proxy"] = self.browser_config.proxy.model_dump()
|
|
|
|
# Filter out None values
|
|
args_dict = {k: v for k, v in args_dict.items() if v is not None}
|
|
|
|
# Format as keyword arguments string
|
|
args_str = ", ".join(f"{key}={repr(value)}" for key, value in args_dict.items())
|
|
return args_str
|
|
|
|
def _generate_context_options(self) -> str:
|
|
"""Generates the options string for context creation based on BrowserContextConfig."""
|
|
if not self.context_config:
|
|
return "" # Default context
|
|
|
|
options_dict = {}
|
|
|
|
# Map relevant BrowserContextConfig fields to Playwright context options
|
|
if self.context_config.user_agent:
|
|
options_dict["user_agent"] = self.context_config.user_agent
|
|
if self.context_config.locale:
|
|
options_dict["locale"] = self.context_config.locale
|
|
if self.context_config.permissions:
|
|
options_dict["permissions"] = self.context_config.permissions
|
|
if self.context_config.geolocation:
|
|
options_dict["geolocation"] = self.context_config.geolocation
|
|
if self.context_config.timezone_id:
|
|
options_dict["timezone_id"] = self.context_config.timezone_id
|
|
if self.context_config.http_credentials:
|
|
options_dict["http_credentials"] = self.context_config.http_credentials
|
|
if self.context_config.is_mobile is not None:
|
|
options_dict["is_mobile"] = self.context_config.is_mobile
|
|
if self.context_config.has_touch is not None:
|
|
options_dict["has_touch"] = self.context_config.has_touch
|
|
if self.context_config.save_recording_path:
|
|
options_dict["record_video_dir"] = self.context_config.save_recording_path
|
|
if self.context_config.save_har_path:
|
|
options_dict["record_har_path"] = self.context_config.save_har_path
|
|
|
|
# Handle viewport/window size
|
|
if self.context_config.no_viewport:
|
|
options_dict["no_viewport"] = True
|
|
elif self.context_config.browser_window_size:
|
|
options_dict["viewport"] = {
|
|
"width": self.context_config.browser_window_size.width,
|
|
"height": self.context_config.browser_window_size.height
|
|
}
|
|
|
|
# Note: cookies_file and save_downloads_path are handled separately
|
|
|
|
# Filter out None values
|
|
options_dict = {k: v for k, v in options_dict.items() if v is not None}
|
|
|
|
# Format as keyword arguments string
|
|
options_str = ", ".join(f"{key}={repr(value)}" for key, value in options_dict.items())
|
|
return options_str
|
|
|
|
def _get_imports_and_helpers(self) -> List[str]:
|
|
"""Generates necessary import statements and helper functions."""
|
|
# Updated _try_locate_and_act to raise PlaywrightActionError on failure
|
|
try_locate_and_act_code = """
|
|
class PlaywrightActionError(Exception):
|
|
\"\"\"Custom exception for errors during Playwright script action execution.\"\"\"
|
|
pass
|
|
|
|
async def _try_locate_and_act(page: Page, selector: str, action_type: str, text: str | None = None, step_info: str = '') -> None:
|
|
\"\"\"
|
|
Attempts an action (click/fill) with XPath fallback by trimming prefixes.
|
|
Raises PlaywrightActionError if the action fails after all fallbacks.
|
|
\"\"\"
|
|
print(f"Attempting {action_type} ({step_info}) using selector: {repr(selector)}")
|
|
original_selector = selector
|
|
MAX_FALLBACKS = 50 # Increased fallbacks
|
|
# Increased timeouts for potentially slow pages
|
|
INITIAL_TIMEOUT = 10000 # Milliseconds for the first attempt (10 seconds)
|
|
FALLBACK_TIMEOUT = 1000 # Shorter timeout for fallback attempts (1 second)
|
|
|
|
try:
|
|
locator = page.locator(selector).first
|
|
if action_type == 'click':
|
|
await locator.click(timeout=INITIAL_TIMEOUT)
|
|
elif action_type == 'fill' and text is not None:
|
|
await locator.fill(text, timeout=INITIAL_TIMEOUT)
|
|
else:
|
|
# This case should ideally not happen if called correctly
|
|
raise PlaywrightActionError(f"Invalid action_type '{action_type}' or missing text for fill. ({step_info})")
|
|
print(f" Action '{action_type}' successful with original selector.")
|
|
await page.wait_for_timeout(500) # Wait after successful action
|
|
return # Successful exit
|
|
except Exception as e:
|
|
print(f" Warning: Action '{action_type}' failed with original selector ({repr(selector)}): {e}. Starting fallback...")
|
|
|
|
# Fallback only works for XPath selectors
|
|
if not selector.startswith('xpath='):
|
|
# Raise error immediately if not XPath, as fallback won't work
|
|
raise PlaywrightActionError(f"Action '{action_type}' failed. Fallback not possible for non-XPath selector: {repr(selector)}. ({step_info})")
|
|
|
|
xpath_parts = selector.split('=', 1)
|
|
if len(xpath_parts) < 2:
|
|
raise PlaywrightActionError(f"Action '{action_type}' failed. Could not extract XPath string from selector: {repr(selector)}. ({step_info})")
|
|
xpath = xpath_parts[1] # Correctly get the XPath string
|
|
|
|
segments = [seg for seg in xpath.split('/') if seg]
|
|
|
|
for i in range(1, min(MAX_FALLBACKS + 1, len(segments))):
|
|
trimmed_xpath_raw = '/'.join(segments[i:])
|
|
fallback_xpath = f'xpath=//{trimmed_xpath_raw}'
|
|
|
|
print(f' Fallback attempt {i}/{MAX_FALLBACKS}: Trying selector: {repr(fallback_xpath)}')
|
|
try:
|
|
locator = page.locator(fallback_xpath).first
|
|
if action_type == 'click':
|
|
await locator.click(timeout=FALLBACK_TIMEOUT)
|
|
elif action_type == 'fill' and text is not None:
|
|
try:
|
|
await locator.clear(timeout=FALLBACK_TIMEOUT)
|
|
await page.wait_for_timeout(100)
|
|
except Exception as clear_error:
|
|
print(f" Warning: Failed to clear field during fallback ({step_info}): {clear_error}")
|
|
await locator.fill(text, timeout=FALLBACK_TIMEOUT)
|
|
|
|
print(f" Action '{action_type}' successful with fallback selector: {repr(fallback_xpath)}")
|
|
await page.wait_for_timeout(500)
|
|
return # Successful exit after fallback
|
|
except Exception as fallback_e:
|
|
print(f' Fallback attempt {i} failed: {fallback_e}')
|
|
if i == MAX_FALLBACKS:
|
|
# Raise exception after exhausting fallbacks
|
|
raise PlaywrightActionError(f"Action '{action_type}' failed after {MAX_FALLBACKS} fallback attempts. Original selector: {repr(original_selector)}. ({step_info})")
|
|
|
|
# This part should not be reachable if logic is correct, but added as safeguard
|
|
raise PlaywrightActionError(f"Action '{action_type}' failed unexpectedly for {repr(original_selector)}. ({step_info})")
|
|
|
|
"""
|
|
return [
|
|
"import asyncio",
|
|
"import json",
|
|
"import os",
|
|
"import sys",
|
|
"from pathlib import Path", # Added Path import
|
|
"import urllib.parse", # Needed for search_google
|
|
"from playwright.async_api import async_playwright, Page, BrowserContext", # Added BrowserContext
|
|
"from dotenv import load_dotenv",
|
|
"",
|
|
"# Load environment variables",
|
|
"load_dotenv(override=True)",
|
|
"",
|
|
"# --- Helper Function for Replacing Sensitive Data ---",
|
|
"def replace_sensitive_data(text: str, sensitive_map: dict) -> str:",
|
|
' """Replaces sensitive data placeholders in text."""',
|
|
" if not isinstance(text, str): return text",
|
|
" for placeholder, value in sensitive_map.items():",
|
|
" replacement_value = str(value) if value is not None else ''",
|
|
" text = text.replace(f'<secret>{placeholder}</secret>', replacement_value)",
|
|
" return text",
|
|
"",
|
|
"# --- Helper Function for Robust Action Execution ---",
|
|
try_locate_and_act_code, # Include the updated helper
|
|
""
|
|
]
|
|
|
|
def _get_sensitive_data_definitions(self) -> List[str]:
|
|
"""Generates the SENSITIVE_DATA dictionary definition."""
|
|
if not self.sensitive_data_keys:
|
|
return ["SENSITIVE_DATA = {}", ""]
|
|
|
|
lines = ["# Sensitive data placeholders mapped to environment variables"]
|
|
lines.append("SENSITIVE_DATA = {")
|
|
for key in self.sensitive_data_keys:
|
|
env_var_name = key.upper()
|
|
default_value_placeholder = f'YOUR_{env_var_name}'
|
|
lines.append(f' "{key}": os.getenv("{env_var_name}", {json.dumps(default_value_placeholder)}),')
|
|
lines.append("}")
|
|
lines.append("")
|
|
return lines
|
|
|
|
def _get_selector_for_action(self, history_item: dict, action_index_in_step: int) -> Optional[str]:
|
|
"""
|
|
Gets the selector (preferring XPath) for a given action index within a history step.
|
|
Formats the XPath correctly for Playwright.
|
|
"""
|
|
state = history_item.get('state')
|
|
if not isinstance(state, dict): return None
|
|
interacted_elements = state.get('interacted_element')
|
|
if not isinstance(interacted_elements, list): return None
|
|
if action_index_in_step >= len(interacted_elements): return None
|
|
element_data = interacted_elements[action_index_in_step]
|
|
if not isinstance(element_data, dict): return None
|
|
|
|
# Prioritize XPath
|
|
xpath = element_data.get('xpath')
|
|
if isinstance(xpath, str) and xpath.strip():
|
|
if not xpath.startswith('xpath=') and not xpath.startswith('/') and not xpath.startswith('//'):
|
|
xpath_selector = f'xpath=//{xpath}' # Make relative if not already
|
|
elif not xpath.startswith('xpath='):
|
|
xpath_selector = f'xpath={xpath}' # Add prefix if missing
|
|
else:
|
|
xpath_selector = xpath
|
|
return xpath_selector
|
|
|
|
# Fallback to CSS selector if XPath is missing
|
|
css_selector = element_data.get('css_selector')
|
|
if isinstance(css_selector, str) and css_selector.strip():
|
|
return css_selector # Use CSS selector as is
|
|
|
|
logger.warning(f"Could not find a usable XPath or CSS selector for action index {action_index_in_step} (element index {element_data.get('highlight_index', 'N/A')}).")
|
|
return None
|
|
|
|
def _get_goto_timeout(self) -> int:
|
|
"""Gets the page navigation timeout in milliseconds."""
|
|
default_timeout = 90000 # Default 90 seconds
|
|
if self.context_config and self.context_config.maximum_wait_page_load_time:
|
|
# Convert seconds to milliseconds
|
|
return int(self.context_config.maximum_wait_page_load_time * 1000)
|
|
return default_timeout
|
|
|
|
# --- Action Mapping Methods ---
|
|
def _map_go_to_url(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
url = params.get('url')
|
|
goto_timeout = self._get_goto_timeout()
|
|
script_lines = []
|
|
if url and isinstance(url, str):
|
|
escaped_url = json.dumps(url)
|
|
script_lines.append(f' print(f"Navigating to: {url} ({step_info_str})")')
|
|
script_lines.append(f" await page.goto({escaped_url}, timeout={goto_timeout})")
|
|
script_lines.append(f" await page.wait_for_load_state('load', timeout={goto_timeout})")
|
|
script_lines.append(f" await page.wait_for_timeout(1000)") # Short pause
|
|
else:
|
|
script_lines.append(f" # Skipping go_to_url ({step_info_str}): missing or invalid url")
|
|
return script_lines
|
|
|
|
def _map_wait(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
seconds = params.get('seconds', 3)
|
|
try: wait_seconds = int(seconds)
|
|
except (ValueError, TypeError): wait_seconds = 3
|
|
return [
|
|
f' print(f"Waiting for {wait_seconds} seconds... ({step_info_str})")',
|
|
f" await asyncio.sleep({wait_seconds})"
|
|
]
|
|
|
|
def _map_input_text(self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, **kwargs) -> List[str]:
|
|
index = params.get('index')
|
|
text = params.get('text', '')
|
|
selector = self._get_selector_for_action(history_item, action_index_in_step)
|
|
script_lines = []
|
|
if selector and index is not None:
|
|
clean_text_expression = f"replace_sensitive_data({json.dumps(str(text))}, SENSITIVE_DATA)"
|
|
escaped_selector = json.dumps(selector)
|
|
escaped_step_info = json.dumps(step_info_str)
|
|
script_lines.append(f' await _try_locate_and_act(page, {escaped_selector}, "fill", text={clean_text_expression}, step_info={escaped_step_info})')
|
|
else:
|
|
script_lines.append(f" # Skipping input_text ({step_info_str}): missing index ({index}) or selector ({selector})")
|
|
return script_lines
|
|
|
|
def _map_click_element(self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, action_type: str, **kwargs) -> List[str]:
|
|
if action_type == 'click_element_by_index':
|
|
logger.warning(f"Mapping legacy 'click_element_by_index' to 'click_element' ({step_info_str})")
|
|
index = params.get('index')
|
|
selector = self._get_selector_for_action(history_item, action_index_in_step)
|
|
script_lines = []
|
|
if selector and index is not None:
|
|
escaped_selector = json.dumps(selector)
|
|
escaped_step_info = json.dumps(step_info_str)
|
|
script_lines.append(f' await _try_locate_and_act(page, {escaped_selector}, "click", step_info={escaped_step_info})')
|
|
else:
|
|
script_lines.append(f" # Skipping {action_type} ({step_info_str}): missing index ({index}) or selector ({selector})")
|
|
return script_lines
|
|
|
|
def _map_scroll_down(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
amount = params.get('amount')
|
|
script_lines = []
|
|
if amount and isinstance(amount, int):
|
|
script_lines.append(f' print(f"Scrolling down by {amount} pixels ({step_info_str})")')
|
|
script_lines.append(f" await page.evaluate('window.scrollBy(0, {amount})')")
|
|
else:
|
|
script_lines.append(f' print(f"Scrolling down by one page height ({step_info_str})")')
|
|
script_lines.append(" await page.evaluate('window.scrollBy(0, window.innerHeight)')")
|
|
script_lines.append(f" await page.wait_for_timeout(500)")
|
|
return script_lines
|
|
|
|
def _map_scroll_up(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
amount = params.get('amount')
|
|
script_lines = []
|
|
if amount and isinstance(amount, int):
|
|
script_lines.append(f' print(f"Scrolling up by {amount} pixels ({step_info_str})")')
|
|
script_lines.append(f" await page.evaluate('window.scrollBy(0, -{amount})')")
|
|
else:
|
|
script_lines.append(f' print(f"Scrolling up by one page height ({step_info_str})")')
|
|
script_lines.append(" await page.evaluate('window.scrollBy(0, -window.innerHeight)')")
|
|
script_lines.append(f" await page.wait_for_timeout(500)")
|
|
return script_lines
|
|
|
|
def _map_send_keys(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
keys = params.get('keys')
|
|
script_lines = []
|
|
if keys and isinstance(keys, str):
|
|
escaped_keys = json.dumps(keys)
|
|
script_lines.append(f' print(f"Sending keys: {keys} ({step_info_str})")')
|
|
script_lines.append(f" await page.keyboard.press({escaped_keys})")
|
|
script_lines.append(f" await page.wait_for_timeout(500)")
|
|
else:
|
|
script_lines.append(f" # Skipping send_keys ({step_info_str}): missing or invalid keys")
|
|
return script_lines
|
|
|
|
def _map_go_back(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
goto_timeout = self._get_goto_timeout()
|
|
return [
|
|
f' await asyncio.sleep(60) # Wait 1 minute (important) before going back',
|
|
f' print(f"Navigating back using browser history ({step_info_str})")',
|
|
f" await page.go_back(timeout={goto_timeout})",
|
|
f" await page.wait_for_load_state('load', timeout={goto_timeout})",
|
|
f" await page.wait_for_timeout(1000)"
|
|
]
|
|
|
|
def _map_open_tab(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
url = params.get('url')
|
|
goto_timeout = self._get_goto_timeout()
|
|
script_lines = []
|
|
if url and isinstance(url, str):
|
|
escaped_url = json.dumps(url)
|
|
script_lines.append(f' print(f"Opening new tab and navigating to: {url} ({step_info_str})")')
|
|
script_lines.append(" page = await context.new_page()")
|
|
script_lines.append(f" await page.goto({escaped_url}, timeout={goto_timeout})")
|
|
script_lines.append(f" await page.wait_for_load_state('load', timeout={goto_timeout})")
|
|
script_lines.append(f" await page.wait_for_timeout(1000)")
|
|
self._page_counter += 1 # Increment page counter
|
|
else:
|
|
script_lines.append(f" # Skipping open_tab ({step_info_str}): missing or invalid url")
|
|
return script_lines
|
|
|
|
def _map_close_tab(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
page_id = params.get('page_id')
|
|
script_lines = []
|
|
if page_id is not None:
|
|
script_lines.extend([
|
|
f' print(f"Attempting to close tab with page_id {page_id} ({step_info_str})")',
|
|
f" if {page_id} < len(context.pages):",
|
|
f" target_page = context.pages[{page_id}]",
|
|
" await target_page.close()",
|
|
" await page.wait_for_timeout(500)",
|
|
f" if context.pages: page = context.pages[-1]", # Switch to last page
|
|
f" else:",
|
|
f" print(' Warning: No pages left after closing tab. Cannot switch.', file=sys.stderr)",
|
|
f" # Optionally, create a new page here if needed: page = await context.new_page()",
|
|
f" if page: await page.bring_to_front()", # Bring to front if page exists
|
|
f" else:",
|
|
f' print(f" Warning: Tab with page_id {page_id} not found to close ({step_info_str})", file=sys.stderr)'
|
|
])
|
|
else:
|
|
script_lines.append(f" # Skipping close_tab ({step_info_str}): missing page_id")
|
|
return script_lines
|
|
|
|
def _map_switch_tab(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
page_id = params.get('page_id')
|
|
script_lines = []
|
|
if page_id is not None:
|
|
script_lines.extend([
|
|
f' print(f"Switching to tab with page_id {page_id} ({step_info_str})")',
|
|
f" if {page_id} < len(context.pages):",
|
|
f" page = context.pages[{page_id}]",
|
|
f" await page.bring_to_front()",
|
|
f" await page.wait_for_load_state('load', timeout=15000)",
|
|
f" await page.wait_for_timeout(500)",
|
|
f" else:",
|
|
f' print(f" Warning: Tab with page_id {page_id} not found to switch ({step_info_str})", file=sys.stderr)'
|
|
])
|
|
else:
|
|
script_lines.append(f" # Skipping switch_tab ({step_info_str}): missing page_id")
|
|
return script_lines
|
|
|
|
def _map_search_google(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
query = params.get('query')
|
|
goto_timeout = self._get_goto_timeout()
|
|
script_lines = []
|
|
if query and isinstance(query, str):
|
|
clean_query = f"replace_sensitive_data({json.dumps(query)}, SENSITIVE_DATA)"
|
|
search_url_expression = f'f"https://www.google.com/search?q={{ urllib.parse.quote_plus({clean_query}) }}&udm=14"'
|
|
script_lines.extend([
|
|
f' search_url = {search_url_expression}',
|
|
f' print(f"Searching Google for query related to: {{ {clean_query} }} ({step_info_str})")',
|
|
f" await page.goto(search_url, timeout={goto_timeout})",
|
|
f" await page.wait_for_load_state('load', timeout={goto_timeout})",
|
|
f" await page.wait_for_timeout(1000)"
|
|
])
|
|
else:
|
|
script_lines.append(f" # Skipping search_google ({step_info_str}): missing or invalid query")
|
|
return script_lines
|
|
|
|
def _map_drag_drop(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
source_sel = params.get('element_source')
|
|
target_sel = params.get('element_target')
|
|
source_coords = (params.get('coord_source_x'), params.get('coord_source_y'))
|
|
target_coords = (params.get('coord_target_x'), params.get('coord_target_y'))
|
|
script_lines = [f' print(f"Attempting drag and drop ({step_info_str})")']
|
|
if source_sel and target_sel:
|
|
escaped_source = json.dumps(source_sel)
|
|
escaped_target = json.dumps(target_sel)
|
|
script_lines.append(f" await page.drag_and_drop({escaped_source}, {escaped_target})")
|
|
script_lines.append(f" print(f' Dragged element {escaped_source} to {escaped_target}')")
|
|
elif all(c is not None for c in source_coords) and all(c is not None for c in target_coords):
|
|
sx, sy = source_coords
|
|
tx, ty = target_coords
|
|
script_lines.extend([
|
|
f" await page.mouse.move({sx}, {sy})",
|
|
" await page.mouse.down()",
|
|
f" await page.mouse.move({tx}, {ty})",
|
|
" await page.mouse.up()",
|
|
f" print(f' Dragged from ({sx},{sy}) to ({tx},{ty})')"
|
|
])
|
|
else:
|
|
script_lines.append(f" # Skipping drag_drop ({step_info_str}): requires either element selectors or full coordinates")
|
|
script_lines.append(f" await page.wait_for_timeout(500)")
|
|
return script_lines
|
|
|
|
def _map_extract_content(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
goal = params.get('goal', 'content')
|
|
logger.warning(f"Action 'extract_content' ({step_info_str}) cannot be directly translated to Playwright script.")
|
|
return [f" # Action: extract_content (Goal: {goal}) - Skipped in Playwright script ({step_info_str})"]
|
|
|
|
def _map_click_download_button(self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, **kwargs) -> List[str]:
|
|
index = params.get('index')
|
|
selector = self._get_selector_for_action(history_item, action_index_in_step)
|
|
download_dir_in_script = "'./files'" # Default
|
|
if self.context_config and self.context_config.save_downloads_path:
|
|
download_dir_in_script = repr(self.context_config.save_downloads_path)
|
|
|
|
script_lines = []
|
|
if selector and index is not None:
|
|
script_lines.append(f' print(f"Attempting to download file by clicking element ({selector}) ({step_info_str})")')
|
|
script_lines.append(" try:")
|
|
script_lines.append(f" async with page.expect_download(timeout=120000) as download_info:") # 2 min timeout
|
|
step_info_for_download = f'{step_info_str} (triggering download)'
|
|
script_lines.append(f' await _try_locate_and_act(page, {json.dumps(selector)}, "click", step_info={json.dumps(step_info_for_download)})')
|
|
script_lines.append(" download = await download_info.value")
|
|
script_lines.append(f" configured_download_dir = {download_dir_in_script}")
|
|
script_lines.append(" download_dir_path = Path(configured_download_dir).resolve()")
|
|
script_lines.append(" download_dir_path.mkdir(parents=True, exist_ok=True)")
|
|
script_lines.append(" base, ext = os.path.splitext(download.suggested_filename or f'download_{{len(list(download_dir_path.iterdir())) + 1}}.tmp')")
|
|
script_lines.append(" counter = 1")
|
|
script_lines.append(" download_path_obj = download_dir_path / f'{base}{ext}'")
|
|
script_lines.append(" while download_path_obj.exists():")
|
|
script_lines.append(" download_path_obj = download_dir_path / f'{base}({{counter}}){ext}'")
|
|
script_lines.append(" counter += 1")
|
|
script_lines.append(" await download.save_as(str(download_path_obj))")
|
|
script_lines.append(" print(f' File downloaded successfully to: {str(download_path_obj)}')")
|
|
script_lines.append(" except PlaywrightActionError as pae:")
|
|
script_lines.append(" raise pae") # Re-raise to stop script
|
|
script_lines.append(" except Exception as download_err:")
|
|
script_lines.append(f" raise PlaywrightActionError(f'Download failed for {step_info_str}: {{download_err}}') from download_err")
|
|
else:
|
|
script_lines.append(f" # Skipping click_download_button ({step_info_str}): missing index ({index}) or selector ({selector})")
|
|
return script_lines
|
|
|
|
def _map_done(self, params: dict, step_info_str: str, **kwargs) -> List[str]:
|
|
script_lines = []
|
|
if isinstance(params, dict):
|
|
final_text = params.get('text', '')
|
|
success_status = params.get('success', False)
|
|
escaped_final_text_with_placeholders = json.dumps(str(final_text))
|
|
script_lines.append(f' print("\\n--- Task marked as Done by agent ({step_info_str}) ---")')
|
|
script_lines.append(f' print(f"Agent reported success: {success_status}")')
|
|
script_lines.append(f' # Final Message from agent (may contain placeholders):')
|
|
script_lines.append(f' final_message = replace_sensitive_data({escaped_final_text_with_placeholders}, SENSITIVE_DATA)')
|
|
script_lines.append(f' print(final_message)')
|
|
else:
|
|
script_lines.append(f' print("\\n--- Task marked as Done by agent ({step_info_str}) ---")')
|
|
script_lines.append(f' print("Success: N/A (invalid params)")')
|
|
script_lines.append(f' print("Final Message: N/A (invalid params)")')
|
|
return script_lines
|
|
|
|
def _map_action_to_playwright(self, action_dict: dict, history_item: dict, previous_history_item: Optional[dict], action_index_in_step: int, step_info_str: str) -> List[str]:
|
|
"""
|
|
Translates a single action dictionary into Playwright script lines using dictionary dispatch.
|
|
"""
|
|
if not isinstance(action_dict, dict) or not action_dict:
|
|
return [f" # Invalid action format: {action_dict} ({step_info_str})"]
|
|
|
|
action_type = next(iter(action_dict.keys()), None)
|
|
params = action_dict.get(action_type)
|
|
|
|
if not action_type or params is None:
|
|
if action_dict == {}: return [f" # Empty action dictionary found ({step_info_str})"]
|
|
return [f" # Could not determine action type or params: {action_dict} ({step_info_str})"]
|
|
|
|
# Get the handler function from the dictionary
|
|
handler = self._action_handlers.get(action_type)
|
|
|
|
if handler:
|
|
# Call the specific handler method
|
|
return handler(
|
|
params=params,
|
|
history_item=history_item,
|
|
action_index_in_step=action_index_in_step,
|
|
step_info_str=step_info_str,
|
|
action_type=action_type, # Pass action_type for legacy handling etc.
|
|
previous_history_item=previous_history_item
|
|
)
|
|
else:
|
|
# Handle unsupported actions
|
|
logger.warning(f"Unsupported action type encountered: {action_type} ({step_info_str})")
|
|
return [f" # Unsupported action type: {action_type} ({step_info_str})"]
|
|
|
|
|
|
def generate_script_content(self) -> str:
|
|
"""Generates the full Playwright script content as a string."""
|
|
script_lines = []
|
|
self._page_counter = 0 # Reset page counter for new script generation
|
|
|
|
if not self._imports_helpers_added:
|
|
script_lines.extend(self._get_imports_and_helpers())
|
|
self._imports_helpers_added = True
|
|
|
|
script_lines.extend(self._get_sensitive_data_definitions())
|
|
|
|
# Generate browser launch and context creation code
|
|
browser_launch_args = self._generate_browser_launch_args()
|
|
context_options = self._generate_context_options()
|
|
# Determine browser type (defaulting to chromium)
|
|
browser_type = "chromium"
|
|
if self.browser_config and self.browser_config.browser_class in ["firefox", "webkit"]:
|
|
browser_type = self.browser_config.browser_class
|
|
|
|
script_lines.extend([
|
|
"async def run_generated_script():",
|
|
" global SENSITIVE_DATA",
|
|
" async with async_playwright() as p:",
|
|
" browser = None",
|
|
" context = None",
|
|
" page = None",
|
|
" exit_code = 0 # Default success exit code",
|
|
" try:",
|
|
f" print('Launching {browser_type} browser...')",
|
|
# Use generated launch args, remove slow_mo
|
|
f" browser = await p.{browser_type}.launch({browser_launch_args})",
|
|
# Use generated context options
|
|
f" context = await browser.new_context({context_options})",
|
|
" print('Browser context created.')",
|
|
])
|
|
|
|
# Add cookie loading logic if cookies_file is specified
|
|
if self.context_config and self.context_config.cookies_file:
|
|
cookies_file_path = repr(self.context_config.cookies_file)
|
|
script_lines.extend([
|
|
f" # Load cookies if specified",
|
|
f" cookies_path = {cookies_file_path}",
|
|
f" if cookies_path and os.path.exists(cookies_path):",
|
|
" try:",
|
|
" with open(cookies_path, 'r', encoding='utf-8') as f_cookies:",
|
|
" cookies = json.load(f_cookies)",
|
|
" # Validate sameSite attribute",
|
|
" valid_same_site = ['Strict', 'Lax', 'None']",
|
|
" for cookie in cookies:",
|
|
" if 'sameSite' in cookie and cookie['sameSite'] not in valid_same_site:",
|
|
" print(f' Warning: Fixing invalid sameSite value \"{{cookie[\"sameSite\"]}}\" to None for cookie {{cookie.get(\"name\")}}', file=sys.stderr)",
|
|
" cookie['sameSite'] = 'None'",
|
|
" await context.add_cookies(cookies)",
|
|
" print(f' Successfully loaded {{len(cookies)}} cookies from {{cookies_path}}')",
|
|
" except Exception as cookie_err:",
|
|
" print(f' Warning: Failed to load or add cookies from {{cookies_path}}: {{cookie_err}}', file=sys.stderr)",
|
|
" else:",
|
|
f" if cookies_path:", # Only print if a path was specified but not found
|
|
" print(f' Cookie file not found at: {cookies_path}')",
|
|
""
|
|
])
|
|
|
|
script_lines.extend([
|
|
" # Initial page handling",
|
|
" if context.pages:",
|
|
" page = context.pages[0]",
|
|
" print('Using initial page provided by context.')",
|
|
" else:",
|
|
" page = await context.new_page()",
|
|
" print('Created a new page as none existed.')",
|
|
" print('\\n--- Starting Generated Script Execution ---')",
|
|
])
|
|
|
|
action_counter = 0
|
|
stop_processing_steps = False
|
|
previous_item_dict = None
|
|
|
|
for step_index, item_dict in enumerate(self.history):
|
|
if stop_processing_steps: break
|
|
|
|
if not isinstance(item_dict, dict):
|
|
logger.warning(f"Skipping step {step_index + 1}: Item is not a dictionary ({type(item_dict)})")
|
|
script_lines.append(f"\n # --- Step {step_index + 1}: Skipped (Invalid Format) ---")
|
|
previous_item_dict = item_dict
|
|
continue
|
|
|
|
script_lines.append(f"\n # --- Step {step_index + 1} ---")
|
|
model_output = item_dict.get('model_output')
|
|
|
|
if not isinstance(model_output, dict) or 'action' not in model_output:
|
|
script_lines.append(" # No valid model_output or action found for this step")
|
|
previous_item_dict = item_dict
|
|
continue
|
|
|
|
actions = model_output.get('action')
|
|
if not isinstance(actions, list):
|
|
script_lines.append(f" # Actions format is not a list: {type(actions)}")
|
|
previous_item_dict = item_dict
|
|
continue
|
|
|
|
for action_index_in_step, action_detail in enumerate(actions):
|
|
action_counter += 1
|
|
script_lines.append(f" # Action {action_counter}")
|
|
|
|
step_info_str = f"Step {step_index + 1}, Action {action_index_in_step + 1}"
|
|
action_lines = self._map_action_to_playwright(
|
|
action_dict=action_detail,
|
|
history_item=item_dict,
|
|
previous_history_item=previous_item_dict,
|
|
action_index_in_step=action_index_in_step,
|
|
step_info_str=step_info_str
|
|
)
|
|
script_lines.extend(action_lines)
|
|
|
|
action_type = next(iter(action_detail.keys()), None) if isinstance(action_detail, dict) else None
|
|
if action_type == 'done':
|
|
stop_processing_steps = True
|
|
break
|
|
|
|
previous_item_dict = item_dict
|
|
|
|
# Updated final block to include sys.exit
|
|
script_lines.extend([
|
|
" except PlaywrightActionError as pae:", # Catch specific action errors
|
|
" print(f'\\n--- Playwright Action Error: {pae} ---', file=sys.stderr)",
|
|
" exit_code = 1", # Set exit code to failure
|
|
" except Exception as e:",
|
|
" print(f'\\n--- An unexpected error occurred: {e} ---', file=sys.stderr)",
|
|
" import traceback",
|
|
" traceback.print_exc()",
|
|
" exit_code = 1", # Set exit code to failure
|
|
" finally:",
|
|
" print('\\n--- Generated Script Execution Finished ---')",
|
|
" print('Closing browser/context...')",
|
|
" if context:",
|
|
" try: await context.close()",
|
|
" except Exception as ctx_close_err: print(f' Warning: could not close context: {ctx_close_err}', file=sys.stderr)",
|
|
" if browser:",
|
|
" try: await browser.close()",
|
|
" except Exception as browser_close_err: print(f' Warning: could not close browser: {browser_close_err}', file=sys.stderr)",
|
|
" print('Browser/context closed.')",
|
|
" # Exit with the determined exit code",
|
|
" if exit_code != 0:",
|
|
" print(f'Script finished with errors (exit code {exit_code}).', file=sys.stderr)",
|
|
" sys.exit(exit_code)", # Exit with non-zero code on error
|
|
"",
|
|
"# --- Script Entry Point ---",
|
|
"if __name__ == '__main__':",
|
|
" if os.name == 'nt':",
|
|
" asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())",
|
|
" asyncio.run(run_generated_script())"
|
|
])
|
|
|
|
return "\n".join(script_lines) |