make eval service fetch rotating auth info from server

This commit is contained in:
Alezander9
2025-07-02 16:30:48 -07:00
parent 4c1b79cde5
commit e8db375401

View File

@@ -1119,6 +1119,7 @@ class Task:
self.login_type = kwargs.get('login_type', None)
self.category = kwargs.get('category', None)
self.output_schema = kwargs.get('output_schema', None) # Add structured output schema support
self.auth_keys = kwargs.get('auth_keys', None) # List of auth keys to fetch from auth distribution
if self.output_schema:
# Convert JSON schema to Pydantic model class
self.output_model = create_pydantic_model_from_schema(self.output_schema, f'Task_{self.task_id}_Output')
@@ -1135,6 +1136,7 @@ class Task:
'login_type',
'category',
'output_schema',
'auth_keys',
}
self.additional_fields = {k: v for k, v in kwargs.items() if k not in known_fields}
@@ -1144,7 +1146,7 @@ class Task:
def __str__(self):
# Include main fields and indicate if there are additional fields
base_str = f'Task(task_id={self.task_id}, confirmed_task={self.confirmed_task}, website={self.website}, reference_length={self.reference_length}, level={self.level}, cluster_id={self.cluster_id}, login_cookie={self.login_cookie}, login_type={self.login_type}, category={self.category}, output_schema={self.output_schema}'
base_str = f'Task(task_id={self.task_id}, confirmed_task={self.confirmed_task}, website={self.website}, reference_length={self.reference_length}, level={self.level}, cluster_id={self.cluster_id}, login_cookie={self.login_cookie}, login_type={self.login_type}, category={self.category}, output_schema={self.output_schema}, auth_keys={self.auth_keys}'
if self.additional_fields:
additional_str = ', '.join(f'{k}={v}' for k, v in self.additional_fields.items())
base_str += f', {additional_str}'
@@ -1603,6 +1605,7 @@ async def run_task_with_semaphore(
headless: bool,
use_vision: bool,
semaphore_runs: asyncio.Semaphore, # Pass semaphore as argument
auth_distribution: dict | None = None, # Pre-fetched auth distribution
github_workflow_url: str | None = None,
use_serp: bool = False,
enable_memory: bool = False,
@@ -1727,6 +1730,38 @@ async def run_task_with_semaphore(
convex_url, secret_key, run_id, task.task_id, 'run_agent', 'active', github_workflow_url
)
# Handle auth information if task requires it
task_with_auth = task
if hasattr(task, 'auth_keys') and task.auth_keys:
# Validate auth_keys is a list
if isinstance(task.auth_keys, list) and len(task.auth_keys) > 0:
if auth_distribution:
logger.info(
f'Task {task.task_id}: Using pre-fetched auth distribution for auth_keys: {task.auth_keys}'
)
auth_info_text = format_auth_info_for_agent(auth_distribution, task.auth_keys)
if auth_info_text:
# Create a modified task with auth info appended
# Lightweight stand-in for the original task object whose only intended
# difference is a `confirmed_task` that has the auth text appended.
# NOTE(review): this class does not inherit from the original task's class,
# so isinstance checks against that class will not match instances of it.
class TaskWithAuth:
def __init__(self, original_task, auth_text):
# Copy all attributes from original task
# dir() lists instance attributes AND class-level names (methods,
# properties, single-underscore names); only names starting with '__'
# are skipped.
# NOTE(review): copied methods stay bound to `original_task`, so they
# keep reading the original, un-augmented confirmed_task; dunder methods
# such as __str__ are not carried over at all.
for attr_name in dir(original_task):
if not attr_name.startswith('__'):
setattr(self, attr_name, getattr(original_task, attr_name))
# Modify the confirmed_task to include auth info
# Runs after the copy loop so it overrides the copied confirmed_task.
self.confirmed_task = original_task.confirmed_task + auth_text
task_with_auth = TaskWithAuth(task, auth_info_text)
logger.info(f'Task {task.task_id}: Auth info added to task description')
else:
logger.warning(
f'Task {task.task_id}: No matching auth info found for keys: {task.auth_keys}'
)
else:
logger.warning(f'Task {task.task_id}: Auth keys specified but no auth distribution available')
else:
logger.warning(f'Task {task.task_id}: auth_keys is not a valid list: {task.auth_keys}')
# Start timing for agent execution only
agent_start_time = time.time()
@@ -1734,7 +1769,7 @@ async def run_task_with_semaphore(
Stage.RUN_AGENT,
lambda: run_agent_with_browser(
browser_session,
task,
task_with_auth,
llm,
max_steps_per_task,
use_vision,
@@ -1987,6 +2022,7 @@ async def run_multiple_tasks(
convex_url: str,
secret_key: str,
eval_model: BaseChatModel,
auth_distribution: dict | None = None,
github_workflow_url: str | None = None,
max_parallel_runs: int = 3,
max_steps_per_task: int = 25,
@@ -2072,6 +2108,7 @@ async def run_multiple_tasks(
headless=headless,
use_vision=use_vision,
semaphore_runs=semaphore_runs, # Pass the semaphore
auth_distribution=auth_distribution, # Pass the pre-fetched auth distribution
github_workflow_url=github_workflow_url,
use_serp=use_serp,
enable_memory=enable_memory,
@@ -2195,6 +2232,109 @@ def fetch_tasks_from_server(convex_url: str, secret_key: str, test_case_name: st
return None
# Helper function to fetch auth distribution from the server
def fetch_auth_distribution_from_server(convex_url: str, secret_key: str, timeout: float = 30.0) -> dict | None:
    """Fetch an available auth distribution from the Convex HTTP endpoint.

    Sends an authenticated POST to ``{convex_url}/api/getAuthDistribution``
    and returns the decoded JSON body when it is a dict containing both
    ``'id'`` and ``'loginInfo'``.

    Args:
        convex_url: Base URL of the evaluation server.
        secret_key: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled server.

    Returns:
        The auth distribution dict on success, otherwise ``None`` (missing
        configuration, non-200 status, invalid JSON, unexpected structure,
        or any request-level failure). Failures are logged, never raised.
    """
    if not convex_url:
        logger.error('Error: EVALUATION_TOOL_URL environment variable not set.')
        return None
    if not secret_key:
        logger.error('Error: EVALUATION_TOOL_SECRET_KEY environment variable not set.')
        return None

    endpoint_url = f'{convex_url}/api/getAuthDistribution'
    headers = {
        'Authorization': f'Bearer {secret_key}',
        'Content-Type': 'application/json',
    }

    logger.info(f'Fetching auth distribution from {endpoint_url}...')
    try:
        response = requests.post(endpoint_url, headers=headers, json={}, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Covers connection errors and timeouts alike.
        logger.error(f'Error during request to fetch auth distribution: {type(e).__name__}: {e}')
        return None

    logger.info(f'Fetch Auth Distribution Status Code: {response.status_code}')

    if response.status_code == 404:
        logger.warning('No available auth distribution found on server.')
        return None
    if response.status_code != 200:
        logger.error(f'Error: Failed to fetch auth distribution. Status: {response.status_code}')
        logger.error(f'Response: {response.text}')
        return None

    try:
        data = response.json()
    except ValueError:
        # Every JSON decode error requests can raise (stdlib json, simplejson,
        # or requests' own JSONDecodeError) is a ValueError subclass.
        logger.error('Error: Failed to decode JSON response for auth distribution.')
        logger.error(f'Raw response text: {response.text}')
        return None

    logger.info('Successfully fetched auth distribution data.')

    # Verify the response has the expected structure
    if isinstance(data, dict) and 'id' in data and 'loginInfo' in data:
        return data

    # Log only the shape of the payload: a 200 body from this endpoint may
    # carry login credentials, so the raw text is deliberately not logged.
    logger.error(
        f'Error: Fetched auth distribution data has unexpected structure. Keys: {list(data.keys()) if isinstance(data, dict) else "Not a dict"}'
    )
    return None
# Helper function to format auth information for the agent
def format_auth_info_for_agent(auth_distribution: dict, auth_keys: list[str]) -> str:
    """Format credentials from an auth distribution for a task description.

    For every requested key present in ``auth_distribution['loginInfo']``
    whose value is a non-empty dict, a ``"<key> with <field>: <value>, ..."``
    fragment is produced; fragments are joined with ``"; "`` into a single
    sentence prefixed by two newlines.

    Args:
        auth_distribution: Dict with a 'loginInfo' key containing auth data.
        auth_keys: Auth keys to extract (e.g., ['google', 'facebook']).

    Returns:
        The formatted credential sentence, or an empty string when either
        argument is empty, 'loginInfo' is missing/empty/not a dict, or none
        of the requested keys yields usable data.
    """
    if not auth_distribution or not auth_keys:
        return ''

    login_info = auth_distribution.get('loginInfo', {})
    if not login_info:
        logger.warning('Auth distribution has no loginInfo')
        return ''
    if not isinstance(login_info, dict):
        # A malformed payload would otherwise crash on the key lookups below.
        logger.warning(f'Auth distribution loginInfo is not a dictionary: {type(login_info)}')
        return ''

    # Extract relevant auth information based on auth_keys
    relevant_auths = []
    matched_keys = []
    for auth_key in auth_keys:
        if auth_key not in login_info:
            logger.warning(f"Auth key '{auth_key}' not found in available login info. Available keys: {list(login_info.keys())}")
            continue

        auth_data = login_info[auth_key]
        if not isinstance(auth_data, dict):
            logger.warning(f"Auth data for key '{auth_key}' is not a dictionary: {type(auth_data)}")
            continue

        auth_details = [f'{key}: {value}' for key, value in auth_data.items()]
        if auth_details:
            relevant_auths.append(f'{auth_key} with {", ".join(auth_details)}')
            matched_keys.append(auth_key)

    if not relevant_auths:
        logger.warning(f'No matching auth keys found. Requested: {auth_keys}, Available: {list(login_info.keys())}')
        return ''

    auth_text = f'\n\nThe following login credentials can be used to complete this task: {"; ".join(relevant_auths)}.'
    # Log which keys were used, never the credential values themselves.
    logger.info(f'Formatted auth info for keys: {matched_keys}')
    return auth_text
# Helper function to get git information
def get_git_info():
"""Retrieves git branch, commit hash, commit timestamp, and repository URL using subprocess."""
@@ -2424,6 +2564,7 @@ async def run_evaluation_pipeline(
convex_url: str,
secret_key: str,
eval_model: BaseChatModel,
auth_distribution: dict | None = None,
github_workflow_url: str | None = None,
max_parallel_runs: int = 3,
max_steps_per_task: int = 25,
@@ -2477,6 +2618,7 @@ async def run_evaluation_pipeline(
convex_url=convex_url,
secret_key=secret_key,
eval_model=eval_model,
auth_distribution=auth_distribution,
github_workflow_url=github_workflow_url,
max_parallel_runs=max_parallel_runs,
max_steps_per_task=max_steps_per_task,
@@ -2585,6 +2727,7 @@ if __name__ == '__main__':
# --- Load Tasks (Either Single Task or from Server) ---
tasks = []
task_id = None # Initialize for proper scoping
auth_distribution = None # Initialize auth distribution
# Check if this is single task mode
if args.task_text:
@@ -2619,10 +2762,28 @@ if __name__ == '__main__':
logger.info(f'Successfully loaded {len(tasks)} tasks from the server.')
except (TypeError, ValueError) as e:
logger.error(
f'Error creating Task objects from fetched data. Ensure the data structure includes required fields (task_id, confirmed_task). Known optional fields: website, reference_length, level, cluster_id, login_cookie, login_type, category. Any additional fields will be accepted dynamically. Error: {type(e).__name__}: {e}'
f'Error creating Task objects from fetched data. Ensure the data structure includes required fields (task_id, confirmed_task). Known optional fields: website, reference_length, level, cluster_id, login_cookie, login_type, category, auth_keys. Any additional fields will be accepted dynamically. Error: {type(e).__name__}: {e}'
)
logger.error(f'First item in fetched data: {fetched_task_data[0] if fetched_task_data else "None"}')
exit(1)
# --- Fetch Auth Distribution Once (if any tasks need auth) ---
tasks_with_auth = [
task
for task in tasks
if hasattr(task, 'auth_keys') and task.auth_keys and isinstance(task.auth_keys, list) and len(task.auth_keys) > 0
]
if tasks_with_auth and CONVEX_URL and SECRET_KEY:
logger.info(f'Found {len(tasks_with_auth)} tasks requiring auth. Fetching auth distribution...')
auth_distribution = fetch_auth_distribution_from_server(CONVEX_URL, SECRET_KEY)
if auth_distribution:
logger.info(
f'Successfully fetched auth distribution with login info for: {list(auth_distribution.get("loginInfo", {}).keys())}'
)
else:
logger.warning('Failed to fetch auth distribution. Tasks requiring auth may fail.')
elif tasks_with_auth:
logger.warning(f'Found {len(tasks_with_auth)} tasks requiring auth but no server config available')
# -----------------------------
# --- Start Run on Server (with optional existing Run ID) ---
@@ -2788,6 +2949,7 @@ if __name__ == '__main__':
convex_url=convex_url,
secret_key=secret_key,
eval_model=eval_model,
auth_distribution=auth_distribution,
github_workflow_url=args.github_workflow_url,
max_parallel_runs=parallel_runs,
max_steps_per_task=args.max_steps,