Files
browser-use/eval/server.py

350 lines
12 KiB
Python

import json
import logging
import os
import time
import requests
logger = logging.getLogger(__name__)
def fetch_tasks_from_server(convex_url: str, secret_key: str, test_case_name: str):
"""Fetches the specified test case file from the Convex HTTP endpoint."""
if not convex_url:
logger.error('Error: EVALUATION_TOOL_URL environment variable not set.')
return None
if not secret_key:
logger.error('Error: EVALUATION_TOOL_SECRET_KEY environment variable not set.')
return None
endpoint_url = f'{convex_url}/api/getTestCase'
headers = {
'Authorization': f'Bearer {secret_key}',
'Content-Type': 'application/json',
}
payload = {'name': test_case_name}
logger.info(f"Fetching test case '{test_case_name}' from {endpoint_url}...")
try:
response = requests.post(endpoint_url, headers=headers, json=payload, timeout=10)
logger.info(f'Fetch Status Code: {response.status_code}')
if response.status_code == 200:
try:
data = response.json()
logger.info(f"Successfully fetched test case data for '{test_case_name}'.")
# Assuming the data is the list of tasks
if isinstance(data, list):
return data
else:
logger.error(f'Error: Fetched data is not a list. Type: {type(data)}')
logger.error(f'Raw response: {response.text}')
return None
except json.JSONDecodeError:
logger.error('Error: Failed to decode JSON response.')
logger.error(f'Raw response text: {response.text}')
return None
else:
logger.error(f"Error: Failed to fetch test case '{test_case_name}'. Status: {response.status_code}")
logger.error(f'Response: {response.text}')
return None
except requests.exceptions.RequestException as e:
logger.error(f'Error during request to fetch test case: {type(e).__name__}: {e}')
return None
def fetch_auth_distribution_from_server(convex_url: str, secret_key: str):
"""Fetches an available auth distribution from the Convex HTTP endpoint."""
if not convex_url:
logger.error('Error: EVALUATION_TOOL_URL environment variable not set.')
return None
if not secret_key:
logger.error('Error: EVALUATION_TOOL_SECRET_KEY environment variable not set.')
return None
endpoint_url = f'{convex_url}/api/getAuthDistribution'
headers = {
'Authorization': f'Bearer {secret_key}',
'Content-Type': 'application/json',
}
logger.info(f'Fetching auth distribution from {endpoint_url}...')
try:
response = requests.post(endpoint_url, headers=headers, json={}, timeout=10)
logger.info(f'Fetch Auth Distribution Status Code: {response.status_code}')
if response.status_code == 200:
try:
data = response.json()
logger.info('Successfully fetched auth distribution data.')
# Verify the response has the expected structure
if isinstance(data, dict) and 'id' in data and 'loginInfo' in data:
return data
else:
logger.error(
f'Error: Fetched auth distribution data has unexpected structure. Keys: {list(data.keys()) if isinstance(data, dict) else "Not a dict"}'
)
logger.error(f'Raw response: {response.text}')
return None
except json.JSONDecodeError:
logger.error('Error: Failed to decode JSON response for auth distribution.')
logger.error(f'Raw response text: {response.text}')
return None
elif response.status_code == 404:
logger.warning('No available auth distribution found on server.')
return None
else:
logger.error(f'Error: Failed to fetch auth distribution. Status: {response.status_code}')
logger.error(f'Response: {response.text}')
return None
except requests.exceptions.RequestException as e:
logger.error(f'Error during request to fetch auth distribution: {type(e).__name__}: {e}')
return None
def format_auth_info_for_agent(auth_distribution: dict, auth_keys: list[str]) -> str:
"""
Formats auth information from auth distribution for the agent task description.
Args:
auth_distribution: Dict with 'loginInfo' key containing auth data
auth_keys: List of auth keys to extract (e.g., ['google', 'facebook'])
Returns:
Formatted string with login credentials or empty string if no matching keys
"""
if not auth_distribution or not auth_keys:
return ''
login_info = auth_distribution.get('loginInfo', {})
if not login_info:
logger.warning('Auth distribution has no loginInfo')
return ''
# Extract relevant auth information based on auth_keys
relevant_auths = []
for auth_key in auth_keys:
if auth_key in login_info:
auth_data = login_info[auth_key]
if isinstance(auth_data, dict):
# Format the auth data for this key
auth_details = []
for key, value in auth_data.items():
auth_details.append(f'{key}: {value}')
if auth_details:
relevant_auths.append(f'{auth_key} with {", ".join(auth_details)}')
else:
logger.warning(f"Auth data for key '{auth_key}' is not a dictionary: {type(auth_data)}")
else:
logger.warning(f"Auth key '{auth_key}' not found in available login info. Available keys: {list(login_info.keys())}")
if relevant_auths:
auth_text = f"\n\nOnly log into the account if it's required to complete the task. Do not log in otherwise.\n The following login credentials can be used to complete this task: {'; '.join(relevant_auths)}\nIf you need a verification code or OTP that is sent to your email, you can use the get_recent_emails action to get your latest emails."
logger.info(f'Formatted auth info: {auth_text}')
return auth_text
else:
logger.warning(f'No matching auth keys found. Requested: {auth_keys}, Available: {list(login_info.keys())}')
return ''
def start_new_run(convex_url: str, secret_key: str, run_details: dict, existing_run_id: str | None = None):
"""Sends a request to start a new evaluation run and returns the run ID."""
if not convex_url or not secret_key:
logger.error('Error: Convex URL or Secret Key not provided for starting run.')
return None
endpoint_url = f'{convex_url}/api/startRun'
headers = {
'Authorization': f'Bearer {secret_key}',
'Content-Type': 'application/json',
}
# Add existing_run_id to the payload if provided
payload = run_details.copy()
if existing_run_id:
payload['runId'] = existing_run_id
logger.info(f'Sending request to start run at {endpoint_url}...')
# Avoid logging secret key in run_details if it were ever passed
loggable_details = {k: v for k, v in payload.items() if k != 'secret_key'}
logger.info(f'Run details: {json.dumps(loggable_details, indent=2)}')
try:
response = requests.post(endpoint_url, headers=headers, json=payload, timeout=10)
logger.info(f'Start Run Status Code: {response.status_code}')
if response.status_code == 200:
try:
data = response.json()
run_id = data.get('runId')
if run_id:
logger.info(f'Successfully started run. Run ID: {run_id}')
return run_id
else:
logger.error("Error: 'runId' not found in successful startRun response.")
logger.error(f'Raw response: {response.text}')
return None
except json.JSONDecodeError:
logger.error('Error: Failed to decode startRun JSON response.')
logger.error(f'Raw response text: {response.text}')
return None
else:
logger.error('Error: Failed to start run.')
logger.error(f'Response: {response.text}')
return None
except requests.exceptions.RequestException as e:
logger.error(f'Error during startRun request: {type(e).__name__}: {e}')
return None
def save_task_result_to_server(convex_url: str, secret_key: str, result_details: dict):
"""Sends a request to save a single task result to the Convex backend."""
if not convex_url:
logger.error('Error: EVALUATION_TOOL_URL environment variable not set for saving task result.')
return False
if not secret_key:
logger.error('Error: EVALUATION_TOOL_SECRET_KEY environment variable not set for saving task result.')
return False
# Ensure runId is present in the details being sent
if 'runId' not in result_details or not result_details['runId']:
logger.error("Error: 'runId' is missing or empty in result_details for saveTaskResult.")
return False
endpoint_url = f'{convex_url}/api/saveTaskResult'
headers = {
'Authorization': f'Bearer {secret_key}',
'Content-Type': 'application/json',
}
logger.info(f'Sending request to save task result at {endpoint_url}...')
logger.debug(f'Result details payload: {json.dumps(result_details, indent=2)}') # Log details at debug level
try:
response = requests.post(endpoint_url, headers=headers, json=result_details, timeout=10)
logger.info(f'Save Task Result Status Code: {response.status_code}')
if response.status_code == 200:
try:
data = response.json()
logger.info(f'Successfully saved task result: {data.get("message")}')
logger.info(f'Result ID: {data.get("resultId")}')
return True
except json.JSONDecodeError:
logger.error('Error: Failed to decode saveTaskResult JSON response.')
logger.error(f'Raw response text: {response.text}')
return False
else:
logger.error('Error: Failed to save task result.')
logger.error(f'Response: {response.text}')
return False
except requests.exceptions.RequestException as e:
logger.error(f'Error during saveTaskResult request: {type(e).__name__}: {e}')
return False
def save_runner_progress_to_server(convex_url: str, secret_key: str, progress_details: dict):
"""Sends a request to save runner progress to the Convex backend."""
if not convex_url:
logger.debug('No EVALUATION_TOOL_URL environment variable set for saving runner progress.')
return False
if not secret_key:
logger.debug('No EVALUATION_TOOL_SECRET_KEY environment variable set for saving runner progress.')
return False
endpoint_url = f'{convex_url}/api/saveRunnerProgress'
headers = {
'Authorization': f'Bearer {secret_key}',
'Content-Type': 'application/json',
}
try:
response = requests.post(endpoint_url, headers=headers, json=progress_details, timeout=10)
if response.status_code == 200:
logger.debug(f'Successfully saved runner progress for {progress_details.get("runnerId")}')
return True
else:
logger.warning(f'Failed to save runner progress. Status: {response.status_code}')
return False
except requests.exceptions.RequestException as e:
logger.warning(f'Error during saveRunnerProgress request: {type(e).__name__}: {e}')
return False
def generate_runner_id(task_id: str, github_run_id: str | None = None) -> str:
"""Generate a unique runner ID for progress tracking that matches GitHub Actions pattern"""
if github_run_id:
# Use batch-level runner ID consistent with GitHub Actions
# GitHub Actions uses: github_run_{GITHUB_RUN_ID}_batch_{START_INDEX}
# Get start index from environment variable set by GitHub Actions
start_index = os.getenv('EVAL_START_INDEX', '0')
return f'github_run_{github_run_id}_batch_{start_index}'
else:
# Fallback for local runs
return f'local_run_{int(time.time())}'
def send_progress_update(
convex_url: str,
secret_key: str,
run_id: str,
task_id: str,
current_stage: str,
status: str = 'active',
github_workflow_url: str | None = None,
assigned_task_range: str | None = None,
error_message: str | None = None,
) -> bool:
"""Send a progress update for the current runner and task"""
try:
# Generate runner ID
github_run_id = os.getenv('GITHUB_RUN_ID')
runner_id = generate_runner_id(task_id, github_run_id)
# Extract workflow run ID from URL if available
github_workflow_run_id = None
if github_workflow_url and 'actions/runs/' in github_workflow_url:
try:
github_workflow_run_id = github_workflow_url.split('actions/runs/')[1].split('/')[0]
except (IndexError, AttributeError):
pass
progress_details = {
'runId': run_id,
'runnerId': runner_id,
'taskId': task_id,
'currentStage': current_stage,
'status': status,
'githubWorkflowUrl': github_workflow_url,
'githubWorkflowRunId': github_workflow_run_id,
'assignedTaskRange': assigned_task_range,
'errorMessage': error_message,
}
return save_runner_progress_to_server(convex_url, secret_key, progress_details)
except Exception as e:
logger.warning(f'Failed to send progress update: {type(e).__name__}: {e}')
return False