mirror of
https://github.com/browser-use/browser-use
synced 2026-05-06 17:52:15 +02:00
This commit updates the Playwright integration example in `playwright_integration.py` to provide a clearer overview of its key features. The previous detailed description has been condensed into a more concise format, highlighting the main functionalities of sharing a Chrome instance via CDP and the ability to perform actions with both Playwright and Browser-Use. - **Documentation Update** - Simplified the example description for better clarity and focus on key features.
377 lines
12 KiB
Python
377 lines
12 KiB
Python
"""
|
|
Key features:
|
|
1. Browser-Use and Playwright sharing the same Chrome instance via CDP
|
|
2. Take actions with Playwright and continue with Browser-Use actions
|
|
3. Let the agent call Playwright functions like screenshot or click on selectors
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Check for required dependencies first - before other imports
|
|
try:
|
|
import aiohttp # type: ignore
|
|
from playwright.async_api import Browser, Page, async_playwright # type: ignore
|
|
except ImportError as e:
|
|
print(f'❌ Missing dependencies for this example: {e}')
|
|
print('This example requires: playwright aiohttp')
|
|
print('Install with: uv add playwright aiohttp')
|
|
print('Also run: playwright install chromium')
|
|
sys.exit(1)
|
|
|
|
from browser_use import Agent, BrowserSession, ChatOpenAI, Tools
|
|
from browser_use.agent.views import ActionResult
|
|
|
|
# Global Playwright browser instance - shared between custom actions
|
|
playwright_browser: Browser | None = None
|
|
playwright_page: Page | None = None
|
|
|
|
|
|
# Custom action parameter models
|
|
class PlaywrightFillFormAction(BaseModel):
    """Parameters for Playwright form filling action."""

    # All four fields are required (no default is supplied).
    customer_name: str = Field(
        default=...,
        description='Customer name to fill',
    )
    phone_number: str = Field(
        default=...,
        description='Phone number to fill',
    )
    email: str = Field(
        default=...,
        description='Email address to fill',
    )
    # Used both as a <select> option value and as a radio button value.
    size_option: str = Field(
        default=...,
        description='Size option (small/medium/large)',
    )
|
|
|
|
|
|
class PlaywrightScreenshotAction(BaseModel):
    """Parameters for Playwright screenshot action."""

    # Output path handed to Playwright's screenshot call.
    filename: str = Field(
        'playwright_screenshot.png',
        description='Filename for screenshot',
    )
    # Optional; the action only forwards it when the filename ends in .jpg/.jpeg.
    quality: int | None = Field(
        None,
        description='JPEG quality (1-100), only for .jpg/.jpeg files',
    )
|
|
|
|
|
|
class PlaywrightGetTextAction(BaseModel):
    """Parameters for getting text using Playwright selectors."""

    # Required. The literal value "title" (case-insensitive) is special-cased by
    # the action and resolved through the page title instead of a locator.
    selector: str = Field(
        default=...,
        description='CSS selector to get text from. Use "title" for page title.',
    )
|
|
|
|
|
|
async def _find_chrome_executable() -> str | None:
    """Return the first Chrome/Chromium candidate that can be spawned, or None."""
    candidates = (
        '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',  # macOS
        '/usr/bin/google-chrome',  # Linux
        '/usr/bin/chromium-browser',  # Linux Chromium
        'chrome',  # Windows/PATH
        'chromium',  # Generic
    )
    # Bare command names are resolved through PATH, so they cannot be
    # pre-checked with os.path.exists(); spawning them is the only probe.
    path_lookups = {'chrome', 'chromium'}

    for candidate in candidates:
        if candidate not in path_lookups and not os.path.exists(candidate):
            continue
        try:
            probe = await asyncio.create_subprocess_exec(
                candidate, '--version', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
        except OSError:
            # Missing or non-executable binary: try the next candidate.
            continue
        await probe.wait()
        return candidate
    return None


async def _wait_for_cdp(port: int, attempts: int = 20, delay: float = 1.0) -> bool:
    """Poll the CDP ``/json/version`` endpoint until it answers HTTP 200.

    Each attempt allows up to one second for the request and is followed by
    ``delay`` seconds of sleep, so the worst case is roughly
    ``attempts * (1 + delay)`` seconds.
    """
    version_url = f'http://localhost:{port}/json/version'
    request_timeout = aiohttp.ClientTimeout(total=1)

    # One session is reused for every attempt instead of building a new one per poll.
    async with aiohttp.ClientSession() as session:
        for _ in range(attempts):
            try:
                async with session.get(version_url, timeout=request_timeout) as response:
                    if response.status == 200:
                        return True
            except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
                # Chrome is not listening yet; retry after the delay.
                pass
            await asyncio.sleep(delay)
    return False


async def start_chrome_with_debug_port(port: int = 9222) -> asyncio.subprocess.Process:
    """
    Start Chrome with remote debugging enabled.

    A fresh temporary user-data directory is created for the browser profile.
    On success the directory is left in place because the running browser
    uses it.

    Args:
        port: TCP port passed to ``--remote-debugging-port`` and polled for CDP readiness.

    Returns:
        The Chrome process.

    Raises:
        RuntimeError: If no Chrome/Chromium executable can be launched, or if the
            CDP endpoint does not become ready in time.
    """
    import shutil  # local import: only needed for failure-path cleanup

    chrome_exe = await _find_chrome_executable()
    if not chrome_exe:
        raise RuntimeError('❌ Chrome not found. Please install Chrome or Chromium.')

    # Created only after a browser was found so nothing is leaked when none exists.
    user_data_dir = tempfile.mkdtemp(prefix='chrome_cdp_')

    cmd = [
        chrome_exe,
        f'--remote-debugging-port={port}',
        f'--user-data-dir={user_data_dir}',
        '--no-first-run',
        '--no-default-browser-check',
        '--disable-extensions',
        'about:blank',  # Start with blank page
    ]

    try:
        process = await asyncio.create_subprocess_exec(*cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except Exception:
        shutil.rmtree(user_data_dir, ignore_errors=True)
        raise

    if await _wait_for_cdp(port):
        return process

    # CDP never came up: stop the browser (if still alive), reap it, and remove
    # the temporary profile before reporting the failure.
    if process.returncode is None:
        try:
            process.terminate()
            try:
                await asyncio.wait_for(process.wait(), 5)
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            pass
    shutil.rmtree(user_data_dir, ignore_errors=True)
    raise RuntimeError('❌ Chrome failed to start with CDP')
|
|
|
|
|
|
async def connect_playwright_to_cdp(cdp_url: str):
    """
    Connect Playwright to the same Chrome instance Browser-Use is using.

    This enables custom actions to use Playwright functions. On success the
    module globals ``playwright_browser`` and ``playwright_page`` are set; the
    page is the first page of the first existing context, or a page in a newly
    created context when none exists.

    Args:
        cdp_url: HTTP endpoint of the Chrome DevTools Protocol server.

    Raises:
        Exception: Whatever Playwright raises while connecting or creating the
            page. The Playwright driver is stopped before re-raising and the
            globals are left untouched.
    """
    global playwright_browser, playwright_page

    playwright = await async_playwright().start()
    try:
        browser = await playwright.chromium.connect_over_cdp(cdp_url)

        # Get or create a page
        contexts = browser.contexts
        if contexts and contexts[0].pages:
            page = contexts[0].pages[0]
        else:
            context = await browser.new_context()
            page = await context.new_page()
    except BaseException:
        # Do not leak the driver started above when setup fails part-way.
        await playwright.stop()
        raise

    # Publish only after everything succeeded so the globals are never half-set.
    playwright_browser = browser
    playwright_page = page
|
|
|
|
|
|
# Create custom tools that use Playwright functions
|
|
tools = Tools()
|
|
|
|
|
|
@tools.registry.action(
    "Fill out a form using Playwright's precise form filling capabilities. This uses Playwright selectors for reliable form interaction.",
    param_model=PlaywrightFillFormAction,
)
async def playwright_fill_form(params: PlaywrightFillFormAction, browser_session: BrowserSession):
    """
    Custom action that uses Playwright to fill forms with high precision.

    This demonstrates how to create Browser-Use actions that leverage Playwright's capabilities.
    It fills the ``custname``, ``custtel`` and ``custemail`` inputs, picks the size
    through either a ``<select name="size">`` or a matching radio button, then
    reads the values back from the page.

    Args:
        params: Values to enter into the form.
        browser_session: Browser-Use session; not used by this action.

    Returns:
        ActionResult with the values read back from the form, or an
        ActionResult carrying ``error`` if Playwright is not connected or any
        step fails.
    """
    try:
        page = playwright_page
        if page is None:
            return ActionResult(error='Playwright not connected. Run setup first.')

        name_selector = 'input[name="custname"]'
        phone_selector = 'input[name="custtel"]'
        email_selector = 'input[name="custemail"]'
        size_select_selector = 'select[name="size"]'

        # size_option is model-provided text: escape backslashes and double quotes
        # so it cannot break out of the quoted CSS attribute value.
        escaped_size = params.size_option.replace('\\', '\\\\').replace('"', '\\"')
        size_radio_selector = f'input[name="size"][value="{escaped_size}"]'

        # Wait for form to be ready and fill basic fields
        await page.wait_for_selector(name_selector, timeout=10000)
        await page.fill(name_selector, params.customer_name)
        await page.fill(phone_selector, params.phone_number)
        await page.fill(email_selector, params.email)

        # Handle size selection - check once if it's a select dropdown or radio buttons
        uses_dropdown = await page.locator(size_select_selector).count() > 0
        if uses_dropdown:
            await page.select_option(size_select_selector, params.size_option)
        elif await page.locator(size_radio_selector).count() > 0:
            await page.check(size_radio_selector)
        else:
            raise ValueError(f'Could not find size input field for value: {params.size_option}')

        # Get form data to verify it was filled
        form_data = {
            'name': await page.input_value(name_selector),
            'phone': await page.input_value(phone_selector),
            'email': await page.input_value(email_selector),
        }

        # Get size value based on input type
        if uses_dropdown:
            form_data['size'] = await page.input_value(size_select_selector)
        else:
            # For radio buttons, find the checked one
            checked_radio = page.locator('input[name="size"]:checked')
            if await checked_radio.count() > 0:
                form_data['size'] = await checked_radio.get_attribute('value')
            else:
                form_data['size'] = 'none selected'

        success_msg = f'✅ Form filled successfully with Playwright: {form_data}'

        return ActionResult(
            extracted_content=success_msg, include_in_memory=True, long_term_memory=f'Filled form with: {form_data}'
        )

    except Exception as e:
        # Report failures through the ActionResult instead of raising.
        error_msg = f'❌ Playwright form filling failed: {str(e)}'
        return ActionResult(error=error_msg)
|
|
|
|
|
|
@tools.registry.action(
    "Take a screenshot using Playwright's screenshot capabilities with high quality and precision.",
    param_model=PlaywrightScreenshotAction,
)
async def playwright_screenshot(params: PlaywrightScreenshotAction, browser_session: BrowserSession):
    """
    Capture a full-page screenshot of the shared Playwright page.

    The image is written to ``params.filename``. ``params.quality`` is passed
    on only when it is set and the filename ends in ``.jpg`` or ``.jpeg``.
    Any failure is returned as an ActionResult error rather than raised.
    """
    page = playwright_page
    if not page:
        return ActionResult(error='Playwright not connected. Run setup first.')

    try:
        target = params.filename
        is_jpeg = target.lower().endswith(('.jpg', '.jpeg'))

        if params.quality is not None and is_jpeg:
            # Quality only applies to JPEG output.
            await page.screenshot(path=target, full_page=True, quality=params.quality)
        else:
            await page.screenshot(path=target, full_page=True)

        return ActionResult(
            extracted_content=f'✅ Screenshot saved as {target} using Playwright',
            include_in_memory=True,
            long_term_memory=f'Screenshot saved: {target}',
        )
    except Exception as exc:
        return ActionResult(error=f'❌ Playwright screenshot failed: {exc}')
|
|
|
|
|
|
@tools.registry.action(
    "Extract text from elements using Playwright's powerful CSS selectors and XPath support.", param_model=PlaywrightGetTextAction
)
async def playwright_get_text(params: PlaywrightGetTextAction, browser_session: BrowserSession):
    """
    Custom action that uses Playwright's advanced text extraction with CSS selectors and XPath.

    The selector ``"title"`` (case-insensitive) is answered from the page title.
    Any other selector is passed to ``page.locator`` and the first match is
    inspected for its text content, inner text, tag name and visibility.

    Args:
        params: Holds the selector to look up.
        browser_session: Browser-Use session; not used by this action.

    Returns:
        ActionResult whose ``extracted_content`` is the string form of the
        collected data, or an ActionResult carrying ``error`` when Playwright
        is not connected, nothing matches, or extraction fails.
    """
    try:
        page = playwright_page
        if page is None:
            return ActionResult(error='Playwright not connected. Run setup first.')

        # Handle special selectors
        if params.selector.lower() == 'title':
            # Use page.title() for title element
            text_content = await page.title()
            result_data = {
                'selector': 'title',
                'text_content': text_content,
                'inner_text': text_content,
                'tag_name': 'TITLE',
                'is_visible': True,
            }
        else:
            # Only the first match is inspected, so count() is 0 or 1 here.
            element = page.locator(params.selector).first

            if await element.count() == 0:
                error_msg = f'❌ No element found with selector: {params.selector}'
                return ActionResult(error=error_msg)

            result_data = {
                'selector': params.selector,
                'text_content': await element.text_content(),
                'inner_text': await element.inner_text(),
                'tag_name': await element.evaluate('el => el.tagName'),
                'is_visible': await element.is_visible(),
            }

        return ActionResult(
            extracted_content=str(result_data),
            include_in_memory=True,
            long_term_memory=f'Extracted from {params.selector}: {result_data["text_content"]}',
        )

    except Exception as e:
        # Report failures through the ActionResult instead of raising.
        error_msg = f'❌ Playwright text extraction failed: {str(e)}'
        return ActionResult(error=error_msg)
|
|
|
|
|
|
async def main():
    """
    Main function demonstrating Browser-Use + Playwright integration with custom actions.

    Starts Chrome with CDP enabled, attaches Playwright and a Browser-Use
    session to it, runs the agent with the custom Playwright tools, and always
    attempts cleanup afterwards. Any exception from the demo is printed and
    re-raised.
    """
    print('🚀 Advanced Playwright + Browser-Use Integration with Custom Actions')

    # Single source of truth for the port used by both the launcher and the CDP URL.
    cdp_port = 9222
    chrome_process = None
    try:
        # Step 1: Start Chrome with CDP debugging
        chrome_process = await start_chrome_with_debug_port(cdp_port)
        cdp_url = f'http://localhost:{cdp_port}'

        # Step 2: Connect Playwright to the same Chrome instance
        await connect_playwright_to_cdp(cdp_url)

        # Step 3: Create Browser-Use session connected to same Chrome
        browser_session = BrowserSession(cdp_url=cdp_url)

        # Step 4: Create AI agent with our custom Playwright-powered tools
        agent = Agent(
            task="""
            Please help me demonstrate the integration between Browser-Use and Playwright:

            1. First, navigate to https://httpbin.org/forms/post
            2. Use the 'playwright_fill_form' action to fill the form with these details:
               - Customer name: "Alice Johnson"
               - Phone: "555-9876"
               - Email: "alice@demo.com"
               - Size: "large"
            3. Take a screenshot using the 'playwright_screenshot' action and save it as "form_demo.png"
            4. Extract the title of the page using 'playwright_get_text' action with selector "title"
            5. Finally, submit the form and tell me what happened

            This demonstrates how Browser-Use AI can orchestrate tasks while using Playwright's precise capabilities for specific operations.
            """,
            llm=ChatOpenAI(model='gpt-4.1-mini'),
            tools=tools,  # Our custom tools with Playwright actions
            browser_session=browser_session,
        )

        print('🎯 Starting AI agent with custom Playwright actions...')

        # Step 5: Run the agent - it will use both Browser-Use actions and our custom Playwright actions
        result = await agent.run()

        # Keep browser open briefly to see results
        print(f'✅ Integration demo completed! Result: {result}')
        await asyncio.sleep(2)  # Brief pause to see results

    except Exception as e:
        print(f'❌ Error: {e}')
        raise

    finally:
        # Clean up resources. Each step is guarded so that one failure does not
        # prevent the remaining cleanup from running.
        if playwright_browser:
            try:
                await playwright_browser.close()
            except Exception as close_error:
                print(f'⚠️ Failed to close Playwright connection: {close_error}')

        # Only signal Chrome if it is still running; signalling an exited
        # asyncio subprocess raises ProcessLookupError.
        if chrome_process and chrome_process.returncode is None:
            try:
                chrome_process.terminate()
                try:
                    await asyncio.wait_for(chrome_process.wait(), 5)
                except asyncio.TimeoutError:
                    # asyncio.TimeoutError is the builtin TimeoutError on 3.11+
                    # and a distinct class before that; this name works on both.
                    chrome_process.kill()
                    await chrome_process.wait()
            except ProcessLookupError:
                # Chrome exited between the returncode check and the signal.
                pass

        print('✅ Cleanup complete')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Run the advanced integration demo
|
|
asyncio.run(main())
|