import asyncio import tempfile from pathlib import Path import pytest from pytest_httpserver import HTTPServer from browser_use.agent.views import ActionResult from browser_use.browser import BrowserSession from browser_use.browser.profile import BrowserProfile from browser_use.tools.service import Tools from browser_use.tools.views import ( ClickElementAction, GoToUrlAction, UploadFileAction, ) @pytest.fixture(scope='session') def http_server(): """Create and provide a test HTTP server that serves static content.""" server = HTTPServer() server.start() # Add routes for common test pages server.expect_request('/').respond_with_data( 'Test Home Page

Test Home Page

Welcome to the test site

', content_type='text/html', ) server.expect_request('/page1').respond_with_data( 'Test Page 1

Test Page 1

This is test page 1

', content_type='text/html', ) server.expect_request('/page2').respond_with_data( 'Test Page 2

Test Page 2

This is test page 2

', content_type='text/html', ) yield server server.stop() @pytest.fixture(scope='session') def base_url(http_server): """Return the base URL for the test HTTP server.""" return f'http://{http_server.host}:{http_server.port}' @pytest.fixture(scope='module') async def browser_session(): """Create and provide a Browser instance with security disabled.""" browser_session = BrowserSession( browser_profile=BrowserProfile( headless=True, user_data_dir=None, keep_alive=True, ) ) await browser_session.start() yield browser_session await browser_session.kill() @pytest.fixture(scope='function') def tools(): """Create and provide a Tools instance.""" return Tools() class TestClickElementEvent: """Test cases for ClickElementEvent and click_element_by_index action.""" async def test_error_handling(self, tools, browser_session): """Test error handling when an action fails.""" # Create an action with an invalid index invalid_action = {'click_element_by_index': ClickElementAction(index=999)} # doesn't exist on page from browser_use.agent.views import ActionModel class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None # This should fail since the element doesn't exist result: ActionResult = await tools.act(ClickActionModel(**invalid_action), browser_session) assert result.error is not None async def test_click_element_by_index(self, tools, browser_session, base_url, http_server): """Test that click_element_by_index correctly clicks an element and handles different outcomes.""" # Add route for clickable elements test page http_server.expect_request('/clickable').respond_with_data( """ Click Test

Click Test

Button 1
Button 2
Link 1
""", content_type='text/html', ) # Navigate to the clickable elements test page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/clickable', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) # Wait for the page to load await asyncio.sleep(0.5) # Give page time to load # Initialize the DOM state to populate the selector map await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) # Get the selector map selector_map = await browser_session.get_selector_map() # Find a clickable element in the selector map button_index = None button_text = None for idx, element in selector_map.items(): # Look for the first div with class "clickable" if element.tag_name.lower() == 'div' and 'clickable' in str(element.attributes.get('class', '')): button_index = idx button_text = element.get_all_children_text(max_depth=2).strip() break # Verify we found a clickable element assert button_index is not None, ( f'Could not find clickable element in selector map. Available elements: {[f"{idx}: {element.tag_name}" for idx, element in selector_map.items()]}' ) # Define expected test data expected_button_text = 'Button 1' expected_result_text = 'Button 1 clicked' # Verify the button text matches what we expect assert button_text is not None and expected_button_text in button_text, ( f"Expected button text '{expected_button_text}' not found in '{button_text}'" ) # Create a model for the click_element_by_index action class ClickElementActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None # Execute the action with the button index result = await tools.act( ClickElementActionModel(click_element_by_index=ClickElementAction(index=button_index)), browser_session ) # Verify the result structure assert isinstance(result, ActionResult), 'Result should be an ActionResult instance' assert result.error is None, f'Expected no error but got: {result.error}' result_text = result.extracted_content or result.long_term_memory # Core logic validation: Verify click was successful assert result_text is not None assert f'Clicked element with index {button_index}' in result_text, ( f'Expected click confirmation in result content, got: {result_text}' ) # Note: The click action doesn't include button text in the result, only the index # Verify the click actually had an effect on the page using CDP cdp_session = await browser_session.get_or_create_cdp_session() result_js = await browser_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) result_text = result_js.get('result', {}).get('value', '') assert result_text == expected_result_text, f"Expected result text '{expected_result_text}', got '{result_text}'" async def test_click_element_new_tab(self, tools, browser_session, base_url, http_server): """Test that click_element_by_index with while_holding_ctrl=True opens links in new tabs.""" # Add route for new tab test page http_server.expect_request('/newTab').respond_with_data( """ New Tab Test

New Tab Test

Open Page 1 """, content_type='text/html', ) # Navigate to the new tab test page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/newTab', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(1) # Wait for page to load # Count initial tabs tabs = await browser_session.get_tabs() initial_tab_count = len(tabs) # Get the link element (assuming it will be at index 0) # First get the browser state to see what elements are available state = await browser_session.get_browser_state_summary() # Find the link element in the selector map link_index = None for index, element in state.dom_state.selector_map.items(): if hasattr(element, 'tag_name') and element.tag_name == 'a': link_index = index break assert link_index is not None, 'Could not find link element' # Click the link with while_holding_ctrl=True click_action = {'click_element_by_index': ClickElementAction(index=link_index, while_holding_ctrl=True)} class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(**click_action), browser_session) await asyncio.sleep(1) # Wait for new tab to open # Verify the result assert isinstance(result, ActionResult) result_text = result.extracted_content or result.long_term_memory assert result_text is not None assert f'Clicked element with index {link_index}' in result_text, ( f'Expected click confirmation in result content, got: {result_text}' ) # Verify that a new tab was opened tabs = await browser_session.get_tabs() final_tab_count = len(tabs) assert final_tab_count == initial_tab_count + 1, f'Expected {initial_tab_count + 1} tabs, got {final_tab_count}' # Verify we're still on the original tab (not switched) - matches browser Cmd/Ctrl+click behavior current_url = await browser_session.get_current_page_url() assert f'{base_url}/newTab' in current_url, f'Should still be on original tab, but got {current_url}' # Wait for the new tab to finish navigating to the target URL # New tabs initially open at the current URL then navigate to the target max_wait = 5 for _ in range(max_wait): await asyncio.sleep(0.5) tabs = await browser_session.get_tabs() new_tab = tabs[-1] # Last tab is the newly opened one if f'{base_url}/page1' in new_tab.url: break # Verify the new tab has the correct URL (may be page1 or newTab depending on navigation timing) assert f'{base_url}/page1' in new_tab.url or f'{base_url}/newTab' in new_tab.url, ( f'New tab should have page1 or newTab URL, but got {new_tab.url}' ) @pytest.mark.skip(reason='Tab count assertion failures - tab management logic changed') async def test_click_element_normal_vs_new_tab(self, tools, browser_session, base_url, http_server): """Test that click_element_by_index behaves differently with while_holding_ctrl=False vs while_holding_ctrl=True.""" # Add route for comparison test page http_server.expect_request('/comparison').respond_with_data( """ Comparison Test

Comparison Test

Normal Link New Tab Link """, content_type='text/html', ) # Navigate to the comparison test page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/comparison', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(1) tabs = await browser_session.get_tabs() initial_tab_count = len(tabs) # Get browser state and find link elements state = await browser_session.get_browser_state_summary() link_indices = [] for index, element in state.dom_state.selector_map.items(): if hasattr(element, 'tag_name') and element.tag_name == 'a': link_indices.append(index) assert len(link_indices) >= 2, 'Need at least 2 links for comparison test' # Test normal click (while_holding_ctrl=False) - should navigate in current tab click_action_normal = {'click_element_by_index': ClickElementAction(index=link_indices[0], while_holding_ctrl=False)} class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(**click_action_normal), browser_session) await asyncio.sleep(1) # Should still have same number of tabs tabs = await browser_session.get_tabs() assert len(tabs) == initial_tab_count # Navigate back to comparison page for second test await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(1) # Test new tab click (while_holding_ctrl=True) - should open in new background tab click_action_new_tab = {'click_element_by_index': ClickElementAction(index=link_indices[1], while_holding_ctrl=True)} result = await tools.act(ClickActionModel(**click_action_new_tab), browser_session) await asyncio.sleep(1) # Should have one more tab tabs = await browser_session.get_tabs() assert len(tabs) == initial_tab_count + 1 async def test_inline_element_mostly_offscreen(self, tools, browser_session, base_url, http_server): """Test clicking an inline element that's mostly outside the viewport.""" # Add route for test page with inline element extending beyond viewport http_server.expect_request('/inline_offscreen').respond_with_data( """ Inline Offscreen Test
This is a very long inline element that extends way beyond the viewport edge
Not clicked
""", content_type='text/html', ) # Navigate to the page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/inline_offscreen', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get the clickable elements await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) selector_map = await browser_session.get_selector_map() # Find the inline element inline_index = None for idx, element in selector_map.items(): if 'inline-link' in str(element.attributes.get('class', '')): inline_index = idx break assert inline_index is not None, 'Could not find inline element' # Click the element - should click the visible portion class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(click_element_by_index=ClickElementAction(index=inline_index)), browser_session) assert result.error is None, f'Click failed: {result.error}' # Verify click worked using CDP cdp_session = await browser_session.get_or_create_cdp_session() result_js = await browser_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Inline clicked' async def test_block_inside_inline_multiline(self, tools, browser_session, base_url, http_server): """Test clicking a block element inside an inline element that spans multiple lines.""" # Add route for complex nested layout http_server.expect_request('/block_in_inline').respond_with_data( """ Block in Inline Test This is some text that wraps around and contains
Click this block element
and continues after the block element with more text that will wrap to multiple lines
Not clicked
""", content_type='text/html', ) # Navigate to the page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/block_in_inline', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get the clickable elements await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) selector_map = await browser_session.get_selector_map() # Find the block element inside inline block_index = None for idx, element in selector_map.items(): if 'block-inside' in str(element.attributes.get('class', '')): block_index = idx break assert block_index is not None, 'Could not find block element' # Click the block element class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(click_element_by_index=ClickElementAction(index=block_index)), browser_session) assert result.error is None, f'Click failed: {result.error}' # Verify click worked cdp_session = await browser_session.get_or_create_cdp_session() result_js = await browser_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Block clicked' async def test_element_covered_by_overlay(self, tools, browser_session, base_url, http_server): """Test clicking an element that's mostly covered by another element.""" # Add route for overlapping elements http_server.expect_request('/covered_element').respond_with_data( """ Covered Element Test
Click me (partially covered)
Overlaying element
Not clicked
""", content_type='text/html', ) # Navigate to the page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/covered_element', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get the clickable elements await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) selector_map = await browser_session.get_selector_map() # Find the target element target_index = None for idx, element in selector_map.items(): if 'target' in str(element.attributes.get('class', '')): target_index = idx break assert target_index is not None, 'Could not find target element' # Click should still work on the visible portion class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(click_element_by_index=ClickElementAction(index=target_index)), browser_session) assert result.error is None, f'Click failed: {result.error}' # Verify click worked cdp_session = await browser_session.get_or_create_cdp_session() result_js = await browser_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Target clicked' async def test_file_input_click_prevention(self, tools, browser_session, base_url, http_server): """Test that clicking a file input element raises an exception.""" # Add route with file input http_server.expect_request('/file_input').respond_with_data( """ File Input Test

File Upload Test

No file selected
""", content_type='text/html', ) # Navigate to the page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/file_input', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get the clickable elements await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) selector_map = await browser_session.get_selector_map() # Find the file input file_input_index = None for idx, element in selector_map.items(): if element.tag_name and element.tag_name.lower() == 'input': if element.attributes and element.attributes.get('type') == 'file': file_input_index = idx break assert file_input_index is not None, 'Could not find file input element' # Attempt to click should raise an exception class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act( ClickActionModel(click_element_by_index=ClickElementAction(index=file_input_index)), browser_session ) # Should have an error about file inputs assert result.error is not None, 'Expected error for file input click' assert 'file input' in result.error.lower() or 'file upload' in result.error.lower(), ( f'Error message should mention file input, got: {result.error}' ) async def test_select_dropdown_click_prevention(self, tools, browser_session, base_url, http_server): """Test that clicking a select dropdown element raises an exception.""" # Add route with select dropdown http_server.expect_request('/select_dropdown').respond_with_data( """ Select Dropdown Test

Select Test

Nothing selected
""", content_type='text/html', ) # Navigate to the page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/select_dropdown', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get the clickable elements await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) selector_map = await browser_session.get_selector_map() # Find the select element select_index = None for idx, element in selector_map.items(): if element.tag_name and element.tag_name.lower() == 'select': select_index = idx break assert select_index is not None, 'Could not find select element' # Attempt to click should raise an exception class ClickActionModel(ActionModel): click_element_by_index: ClickElementAction | None = None result = await tools.act(ClickActionModel(click_element_by_index=ClickElementAction(index=select_index)), browser_session) # Should automatically provide dropdown options instead of an error assert result.error is None, 'Should not have error - should provide dropdown options automatically' assert result.extracted_content is not None, 'Should have dropdown options content' assert 'dropdown' in result.extracted_content.lower(), f'Should contain dropdown options, got: {result.extracted_content}' @pytest.mark.skip(reason='Dialog system validation bug - DialogOpenedEvent.frame_id expects string but gets None') async def test_click_triggers_alert_popup(self, browser_session, base_url, http_server): """Test that clicking a button triggers an alert dialog that is auto-accepted.""" from browser_use.browser.events import BrowserStateRequestEvent, ClickElementEvent, DialogOpenedEvent, NavigateToUrlEvent # Add route with alert dialog http_server.expect_request('/alert_test').respond_with_data( """ Alert Test

Alert Dialog Test

No popup shown
""", content_type='text/html', ) # Navigate to the alert test page using events nav_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url=f'{base_url}/alert_test')) await nav_event await asyncio.sleep(0.5) # Get the browser state to find clickable elements state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent()) browser_state = await state_event.event_result(raise_if_none=True, raise_if_any=True) # Find the alert button alert_button = None for element in browser_state.dom_state.selector_map.values(): if element.attributes and element.attributes.get('id') == 'alertButton': alert_button = element break assert alert_button is not None, 'Could not find alert button' # Expect the DialogOpenedEvent dialog_event_future = browser_session.event_bus.expect(DialogOpenedEvent) # Click the alert button using ClickElementEvent click_event = browser_session.event_bus.dispatch(ClickElementEvent(node=alert_button)) await click_event # Wait for and verify DialogOpenedEvent was dispatched dialog_event = await asyncio.wait_for(dialog_event_future, timeout=2.0) assert dialog_event.dialog_type == 'alert' assert 'This is an alert!' in dialog_event.message # Verify the page updated after alert was accepted cdp_session = await browser_session.get_or_create_cdp_session() result_js = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Alert shown' @pytest.mark.skip(reason='Dialog system validation bug - DialogOpenedEvent.frame_id expects string but gets None') async def test_click_triggers_confirm_popup(self, browser_session, base_url, http_server): """Test that clicking a button triggers a confirm dialog that is auto-accepted.""" from browser_use.browser.events import BrowserStateRequestEvent, ClickElementEvent, DialogOpenedEvent, NavigateToUrlEvent # Add route with confirm dialog http_server.expect_request('/confirm_test').respond_with_data( """ Confirm Test

Confirm Dialog Test

No popup shown
""", content_type='text/html', ) # Navigate to the confirm test page nav_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url=f'{base_url}/confirm_test')) await nav_event await asyncio.sleep(0.5) # Get the browser state state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent()) browser_state = await state_event.event_result(raise_if_none=True, raise_if_any=True) # Find the confirm button confirm_button = None for element in browser_state.dom_state.selector_map.values(): if element.attributes and element.attributes.get('id') == 'confirmButton': confirm_button = element break assert confirm_button is not None, 'Could not find confirm button' # Expect the DialogOpenedEvent dialog_event_future = browser_session.event_bus.expect(DialogOpenedEvent) # Click the confirm button click_event = browser_session.event_bus.dispatch(ClickElementEvent(node=confirm_button)) await click_event # Wait for and verify DialogOpenedEvent was dispatched dialog_event = await asyncio.wait_for(dialog_event_future, timeout=2.0) assert dialog_event.dialog_type == 'confirm' assert 'Are you sure?' in dialog_event.message # Verify the page updated after confirm was accepted (auto-accepts with True) cdp_session = await browser_session.get_or_create_cdp_session() result_js = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Confirmed' @pytest.mark.skip(reason='Dialog system validation bug - DialogOpenedEvent.frame_id expects string but gets None') async def test_page_usable_after_popup_confirm(self, browser_session, base_url, http_server): """Test that the page remains usable after handling confirm dialogs.""" from browser_use.browser.events import BrowserStateRequestEvent, ClickElementEvent, DialogOpenedEvent, NavigateToUrlEvent # Add route with confirm dialog and navigation http_server.expect_request('/popup_nav_test').respond_with_data( """ Popup Navigation Test

Popup and Navigation Test

Navigate to Page 1
No popup shown
""", content_type='text/html', ) # Navigate to the test page nav_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url=f'{base_url}/popup_nav_test')) await nav_event await asyncio.sleep(0.5) # Get browser state state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent()) browser_state = await state_event # Find and click the confirm button confirm_button = None for element in browser_state.dom_state.selector_map.values(): if element.attributes and element.attributes.get('id') == 'confirmButton': confirm_button = element break assert confirm_button is not None, 'Could not find confirm button' # Expect dialog event dialog_event_future = browser_session.event_bus.expect(DialogOpenedEvent) # Click confirm button click_event = browser_session.event_bus.dispatch(ClickElementEvent(node=confirm_button)) await click_event # Wait for dialog event dialog_event = await asyncio.wait_for(dialog_event_future, timeout=2.0) assert dialog_event.dialog_type == 'confirm' # Verify page was updated cdp_session = await browser_session.get_or_create_cdp_session() result_js = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': "document.getElementById('result').textContent", 'returnByValue': True}, session_id=cdp_session.session_id, ) assert result_js.get('result', {}).get('value') == 'Ready to navigate' # Refresh browser state after handling dialog browser_state = await browser_session.get_browser_state_summary() # Find and click navigation link to verify page is still usable nav_link = None for element in browser_state.dom_state.selector_map.values(): if element.attributes and element.attributes.get('id') == 'navLink': nav_link = element break assert nav_link is not None, 'Could not find navigation link' # Click the navigation link click_event = browser_session.event_bus.dispatch(ClickElementEvent(node=nav_link)) await click_event await asyncio.sleep(1) # Verify navigation succeeded current_url = await browser_session.get_current_page_url() assert f'{base_url}/page1' in current_url, f'Navigation failed, current URL: {current_url}' # Verify browser is still responsive current_title = await browser_session.get_current_page_title() assert 'Test Page 1' in current_title, f'Page title incorrect: {current_title}' @pytest.mark.skip(reason='Dialog system validation bug - DialogOpenedEvent.frame_id expects string but gets None') async def test_click_triggers_onbeforeunload_popup(self, browser_session, base_url, http_server): """Test that navigating away from a page with onbeforeunload triggers a dialog.""" from browser_use.browser.events import ClickElementEvent, DialogOpenedEvent, NavigateToUrlEvent # Add route with onbeforeunload handler http_server.expect_request('/beforeunload_test').respond_with_data( """ BeforeUnload Test

BeforeUnload Test

This page has unsaved changes.

Navigate Away
Page loaded
""", content_type='text/html', ) # Navigate to the beforeunload test page nav_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url=f'{base_url}/beforeunload_test')) await nav_event await asyncio.sleep(0.5) # Get browser state browser_state = await browser_session.get_browser_state_summary() # Find the navigation link nav_link = None for element in browser_state.dom_state.selector_map.values(): if element.attributes and element.attributes.get('id') == 'navLink': nav_link = element break assert nav_link is not None, 'Could not find navigation link' # Expect the DialogOpenedEvent for beforeunload dialog_event_future = browser_session.event_bus.expect(DialogOpenedEvent) # Click the navigation link - should trigger beforeunload popup click_event = browser_session.event_bus.dispatch(ClickElementEvent(node=nav_link)) await click_event # Wait for and verify DialogOpenedEvent was dispatched dialog_event = await asyncio.wait_for(dialog_event_future, timeout=2.0) assert dialog_event.dialog_type == 'beforeunload' # Note: beforeunload messages are often browser-controlled and may not match our custom message # Wait a bit for navigation to complete after dialog is auto-accepted await asyncio.sleep(1) # Verify navigation succeeded after beforeunload was accepted current_url = await browser_session.get_current_page_url() assert f'{base_url}/page1' in current_url, ( f'Navigation should have succeeded after beforeunload was accepted, current URL: {current_url}' ) async def test_file_upload_click_and_verify(self, tools, browser_session, base_url, http_server): """Test that clicking a file upload element and uploading a file works correctly.""" # Create a temporary test file import tempfile as temp_module with temp_module.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file: temp_file.write('Test file content for upload') temp_file_path = temp_file.name try: # Add route for file upload test page http_server.expect_request('/fileupload').respond_with_data( """ File Upload Test

File Upload Test

Click the button below to select a file:

No file selected

""", content_type='text/html', ) # Navigate to the file upload test page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/fileupload', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) # Wait for the page to load await asyncio.sleep(0.5) # Initialize the DOM state to populate the selector map await browser_session.get_browser_state_summary(cache_clickable_elements_hashes=True) # Get the selector map selector_map = await browser_session.get_selector_map() # Find the label element that triggers the file input label_index = None for idx, element in selector_map.items(): if element.tag_name.lower() == 'label' and 'upload-label' in str(element.attributes.get('class', '')): label_index = idx break assert label_index is not None, 'Could not find file upload label element' # Create action model for file upload class UploadFileActionModel(ActionModel): upload_file_to_element: UploadFileAction | None = None # Create a temporary FileSystem for the test import tempfile from browser_use.filesystem.file_system import FileSystem with tempfile.TemporaryDirectory() as temp_dir: file_system = FileSystem(base_dir=temp_dir) # Upload the file using the label index (should find the associated file input) result = await tools.act( UploadFileActionModel(upload_file_to_element=UploadFileAction(index=label_index, path=temp_file_path)), browser_session, available_file_paths=[temp_file_path], # Pass the file path as available file_system=file_system, # Pass the required file_system parameter ) # Verify the upload action succeeded assert result.error is None, f'File upload failed: {result.error}' assert result.extracted_content is not None assert 'Successfully uploaded file' in result.extracted_content # Wait a moment for the JavaScript to process the file await asyncio.sleep(0.5) # Verify the file was actually selected using CDP Runtime.evaluate cdp_session = await browser_session.get_or_create_cdp_session() # Check if the file input has a file selected file_check_js = await browser_session.cdp_client.send.Runtime.evaluate( params={ 'expression': """ (() => { const input = document.getElementById('fileInput'); if (!input || !input.files || input.files.length === 0) { return { hasFile: false }; } const file = input.files[0]; return { hasFile: true, fileName: file.name, fileSize: file.size, fileType: file.type || 'text/plain' }; })() """, 'returnByValue': True, }, session_id=cdp_session.session_id, ) file_info = file_check_js.get('result', {}).get('value', {}) # Verify file was selected assert file_info.get('hasFile') is True, 'File was not properly selected in the input element' assert file_info.get('fileName', '').endswith('.txt'), f'Expected .txt file, got: {file_info.get("fileName")}' assert file_info.get('fileSize', 0) > 0, 'File size should be greater than 0' # Also verify the UI was updated (the file info div) ui_check_js = await browser_session.cdp_client.send.Runtime.evaluate( params={ 'expression': """ (() => { const fileName = document.getElementById('fileName').textContent; const fileSize = document.getElementById('fileSize').textContent; return { fileNameText: fileName, fileSizeText: fileSize, hasFileInfo: !fileName.includes('No file selected') }; })() """, 'returnByValue': True, }, session_id=cdp_session.session_id, ) ui_info = ui_check_js.get('result', {}).get('value', {}) # Verify UI was updated assert ui_info.get('hasFileInfo') is True, 'UI was not updated with file information' assert '.txt' in ui_info.get('fileNameText', ''), f'File name not shown in UI: {ui_info.get("fileNameText")}' assert 'bytes' in ui_info.get('fileSizeText', ''), f'File size not shown in UI: {ui_info.get("fileSizeText")}' finally: # Clean up the temporary file Path(temp_file_path).unlink(missing_ok=True) async def test_file_upload_path_validation(self, tools, browser_session, base_url, http_server): """Test that file upload validates paths correctly with available_file_paths, downloaded_files, and FileSystem.""" from pathlib import Path from browser_use.browser.views import BrowserError from browser_use.filesystem.file_system import FileSystem from browser_use.tools.views import UploadFileAction # Create a temporary test file that's NOT in available_file_paths with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file: temp_file.write('Test file content') test_file_path = temp_file.name try: # Set up test page with file input http_server.expect_request('/upload-test').respond_with_data( """

File Upload Test

""", content_type='text/html', ) # Navigate to the test page goto_action = {'go_to_url': GoToUrlAction(url=f'{base_url}/upload-test', new_tab=False)} from browser_use.agent.views import ActionModel class GoToUrlActionModel(ActionModel): go_to_url: GoToUrlAction | None = None await tools.act(GoToUrlActionModel(**goto_action), browser_session) await asyncio.sleep(0.5) # Get browser state to populate selector map from browser_use.browser.events import BrowserStateRequestEvent event = browser_session.event_bus.dispatch(BrowserStateRequestEvent()) state = await event # Test 1: Try to upload a file that's not in available_file_paths - should fail class UploadActionModel(ActionModel): upload_file_to_element: UploadFileAction | None = None upload_action = UploadActionModel(upload_file_to_element=UploadFileAction(index=1, path=test_file_path)) # Create a temporary FileSystem for all tests with tempfile.TemporaryDirectory() as temp_dir: file_system = FileSystem(base_dir=temp_dir) try: # This should fail because the file is not in available_file_paths result = await tools.act( upload_action, browser_session, available_file_paths=[], # Empty available_file_paths file_system=file_system, ) assert result.error is not None, 'Upload should have failed for file not in available_file_paths' assert 'not available' in result.error, f'Error message should mention file not available: {result.error}' except BrowserError as e: assert 'not available' in str(e), f'Error should mention file not available: {e}' # Test 2: Add file to available_file_paths - should succeed result = await tools.act( upload_action, browser_session, available_file_paths=[test_file_path], # File is now in available_file_paths file_system=file_system, ) assert result.error is None, f'Upload should have succeeded with file in available_file_paths: {result.error}' # Test 3: Test with FileSystem integration - write a test file to the FileSystem await file_system.write_file('test.txt', 'FileSystem test content') fs_file_path = str(file_system.get_dir() / 'test.txt') # Try to upload using just the filename (should check FileSystem) upload_action_fs = UploadActionModel(upload_file_to_element=UploadFileAction(index=1, path='test.txt')) result = await tools.act( upload_action_fs, browser_session, available_file_paths=[], # Empty available_file_paths file_system=file_system, # But FileSystem is provided ) assert result.error is None, f'Upload should have succeeded with file in FileSystem: {result.error}' # Test 4: Simulate a downloaded file # Manually add a file to browser_session._downloaded_files to simulate a download browser_session._downloaded_files.append(test_file_path) # Try uploading with the file only in downloaded_files result = await tools.act( upload_action, browser_session, available_file_paths=[], # Empty available_file_paths, but file is in downloaded_files file_system=file_system, ) assert result.error is None, f'Upload should have succeeded with file in downloaded_files: {result.error}' finally: # Clean up the temporary file Path(test_file_path).unlink(missing_ok=True)