mirror of
https://github.com/browser-use/browser-use
synced 2026-04-22 17:45:09 +02:00
Click element c init
This commit is contained in:
@@ -22,6 +22,10 @@ class BrowserActions:
|
||||
self.go_back()
|
||||
elif action_name == "done":
|
||||
return True
|
||||
elif action_name == "input":
|
||||
self.input_text_by_c(params["c"], params["text"])
|
||||
elif action_name == "click":
|
||||
self.click_element_by_c(params["c"])
|
||||
else:
|
||||
raise Exception(f"Action {action_name} not found")
|
||||
|
||||
@@ -50,48 +54,31 @@ class BrowserActions:
|
||||
"""
|
||||
self.driver.back()
|
||||
|
||||
# specific actions
|
||||
def click_element_by_c(self, c_value: str):
    """
    Click the element whose ``c`` attribute equals ``c_value``.

    Blocks on ``self.wait`` until the element matched by the CSS selector
    ``[c="<c_value>"]`` is clickable, then clicks it. Whatever
    ``self.wait.until`` raises when the condition is never met propagates
    to the caller.

    Args:
        c_value: Value of the ``c`` attribute identifying the target element.
    """
    # NOTE(review): c_value is interpolated into the selector unescaped; a
    # value containing a double quote would yield an invalid selector.
    locator = (By.CSS_SELECTOR, f'[c="{c_value}"]')
    target = self.wait.until(EC.element_to_be_clickable(locator))
    target.click()
|
||||
|
||||
def input_text_by_c(self, c_value: str, text: str):
    """
    Replace the content of the field whose ``c`` attribute equals ``c_value``.

    Waits on ``self.wait`` for the element matched by ``[c="<c_value>"]``,
    clears it, and then types ``text`` into it.

    Args:
        c_value: Value of the ``c`` attribute identifying the target field.
        text: Text to send to the field after clearing it.
    """
    locator = (By.CSS_SELECTOR, f'[c="{c_value}"]')
    # Wait for the same "clickable" condition the click action uses rather
    # than mere presence in the DOM: clear()/send_keys() fail on an element
    # that exists but cannot be interacted with yet.
    field = self.wait.until(EC.element_to_be_clickable(locator))
    field.clear()
    field.send_keys(text)
|
||||
|
||||
def _find_element(self, identifier: dict):
    """
    Locate an element using the first identifier that resolves.

    ``identifier`` maps a locator kind to a value. The kinds ``"id"``,
    ``"class"`` and ``"name"`` are tried in the dict's iteration order;
    any other key is ignored. The first lookup that succeeds is returned.

    Args:
        identifier: Mapping of locator kind to the value to search for.

    Returns:
        Whatever ``self.wait.until`` returns for the first successful lookup.

    Raises:
        Exception: If no supported identifier produced an element.
    """
    strategies = {
        "id": By.ID,
        "class": By.CLASS_NAME,
        "name": By.NAME,
    }
    for key, value in identifier.items():
        by = strategies.get(key)
        if by is None:
            # Unsupported identifier kinds are skipped, not treated as errors.
            continue
        try:
            return self.wait.until(
                EC.presence_of_element_located((by, value))
            )
        except Exception:
            # A failed lookup falls through to the next identifier. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate instead of being silently swallowed.
            continue
    raise Exception("Element not found with provided identifiers")
|
||||
|
||||
def get_default_actions(self) -> dict[str, str]:
    """
    Return the built-in actions and a description of their parameters.

    Returns:
        A mapping from action name to a string listing that action's
        parameters as ``name: type`` pairs; the string is empty for
        actions that take no parameters.
    """
    return {
        "search_google": "query: string",
        "go_to_url": "url: string",
        "done": "",
        "go_back": "",
        "click": "c: int",
        "input": "c: int, text: string",
    }
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import os
|
||||
from openai import OpenAI
|
||||
from dotenv import load_dotenv
|
||||
from tokencost import calculate_prompt_cost, count_string_tokens
|
||||
|
||||
|
||||
class PlaningAgent:
|
||||
@@ -16,13 +17,25 @@ class PlaningAgent:
|
||||
# TODO: include state, actions, etc.
|
||||
|
||||
# select next functions to call
|
||||
self.messages.append({"role": "user", "content": task})
|
||||
messages = self.messages + [{"role": "user", "content": task}]
|
||||
|
||||
# Calculate total cost for all messages
|
||||
total_cost = calculate_prompt_cost(messages, self.model)
|
||||
total_tokens = count_string_tokens(" ".join([m["content"] for m in messages]), self.model)
|
||||
print(
|
||||
"Total prompt cost: ", f"${total_cost:,.2f}",
|
||||
"Total tokens: ", f"{total_tokens:,}",
|
||||
)
|
||||
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=self.messages,
|
||||
messages=messages,
|
||||
response_format={"type": "json_object"}
|
||||
)
|
||||
self.messages.append(response.choices[0].message)
|
||||
|
||||
# Only append the output message
|
||||
self.messages.append({"role": "assistant", "content": response.choices[0].message.content})
|
||||
|
||||
# parse the response
|
||||
return json.loads(response.choices[0].message.content)
|
||||
|
||||
@@ -34,13 +47,15 @@ class PlaningAgent:
|
||||
|
||||
AGENT_PROMPT = f"""
|
||||
You are a web scraping agent. Your task is to control the browser where, for every step,
|
||||
you get the current state and a list of actions you can take as dictionary. You have to select the next action in json format:
|
||||
you get the current state and a list of actions you can take as dictionary.
|
||||
If you want to click on an element or input text, you need to specify the element id (c="") from the cleaned HTML.
|
||||
You have to select the next action in json format:
|
||||
{output_format}
|
||||
|
||||
Your task is:
|
||||
{task}
|
||||
|
||||
Available default actions:
|
||||
Available actions:
|
||||
{default_actions}
|
||||
"""
|
||||
|
||||
|
||||
@@ -2,7 +2,9 @@ from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from typing import Dict, List
|
||||
from bs4 import BeautifulSoup
|
||||
from .html_cleaner import cleanup_html
|
||||
from src.state_manager.utils import cleanup_html
|
||||
import requests
|
||||
from main_content_extractor import MainContentExtractor
|
||||
|
||||
|
||||
class StateManager:
|
||||
@@ -18,13 +20,13 @@ class StateManager:
|
||||
"""
|
||||
html_content = self.driver.page_source
|
||||
cleaned_html = cleanup_html(html_content)
|
||||
functions = self.get_functions()
|
||||
|
||||
main_content = self.get_main_content()
|
||||
return {
|
||||
"current_url": self.driver.current_url,
|
||||
"page_title": self.driver.title,
|
||||
"interactable_elements": cleaned_html,
|
||||
"functions": functions
|
||||
"main_content": main_content,
|
||||
# "functions": functions
|
||||
}
|
||||
|
||||
def get_functions(self) -> List[Dict]:
|
||||
@@ -32,3 +34,24 @@ class StateManager:
|
||||
Retrieves available functions from cleaned HTML.
|
||||
"""
|
||||
return []
|
||||
|
||||
def get_main_content(self, timeout: float = 10.0) -> str:
    """
    Fetch the current page over HTTP and return its main content as Markdown.

    The URL is taken from ``self.driver.current_url`` and downloaded with
    ``requests``; the response text is then passed to
    ``MainContentExtractor.extract`` with ``output_format="markdown"``.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The extracted Markdown, or an empty string if any step fails.
    """
    try:
        # NOTE(review): this is a separate HTTP request, not the HTML the
        # driver has loaded, so it may differ from what the browser shows
        # (cookies, script-rendered content) - confirm this is intended.
        # Without a timeout a stalled server would block this call forever.
        response = requests.get(self.driver.current_url, timeout=timeout)
        # Decode the body as UTF-8 regardless of what the response declares.
        response.encoding = 'utf-8'
        return MainContentExtractor.extract(response.text, output_format="markdown")
    except Exception as e:
        # Best effort: a missing main-content summary is reported and
        # replaced by an empty string rather than aborting the caller.
        print(f"Error getting main content: {e}")
        return ""
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from bs4 import BeautifulSoup
|
||||
from bs4 import BeautifulSoup, Comment, Tag
|
||||
import re
|
||||
import os
|
||||
|
||||
|
||||
def cleanup_html(html_content):
|
||||
@@ -86,3 +88,27 @@ def cleanup_html(html_content):
|
||||
cleaned_html = re.sub(empty_tags_pattern, "", cleaned_html)
|
||||
|
||||
return cleaned_html
|
||||
|
||||
|
||||
# Format and save HTML content to file
|
||||
|
||||
|
||||
def save_formatted_html(html_content, output_file_name):
    """
    Format HTML content using BeautifulSoup and save it under ``temp/``.

    Args:
        html_content (str): Raw HTML content to format
        output_file_name (str): Name of the file, relative to the ``temp``
            folder, where the formatted HTML will be saved
    """
    # Format HTML with BeautifulSoup for nice indentation
    soup = BeautifulSoup(html_content, 'html.parser')
    formatted_html = soup.prettify()

    output_path = "temp/" + output_file_name

    # Create the target folder if it doesn't exist. exist_ok avoids the
    # check-then-create race of a separate os.path.exists() test, and using
    # the path's dirname also covers names that contain a subdirectory.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Save formatted HTML to file
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(formatted_html)
|
||||
@@ -2,6 +2,7 @@ import unittest
|
||||
from src.utils.selenium_utils import setup_selenium_driver
|
||||
from src.actions.browser_actions import BrowserActions
|
||||
from src.state_manager.state import StateManager
|
||||
from src.state_manager.utils import save_formatted_html
|
||||
from src.agent_interface.planing_agent import PlaningAgent
|
||||
|
||||
|
||||
@@ -25,10 +26,10 @@ class TestKayakSearch(unittest.TestCase):
|
||||
|
||||
# Main interaction loop
|
||||
max_steps = 10
|
||||
for _ in range(max_steps):
|
||||
for i in range(max_steps):
|
||||
# Get current state
|
||||
current_state = self.state_manager.get_current_state()
|
||||
|
||||
save_formatted_html(current_state["interactable_elements"], f"current_state_{i}.html")
|
||||
# Get next action from agent
|
||||
text = f"Current state: {current_state}"
|
||||
print(f"\n{text}\n")
|
||||
|
||||
Reference in New Issue
Block a user