From e8db375401582cc83bc1d4d86cdb0ec84dfb4576 Mon Sep 17 00:00:00 2001 From: Alezander9 Date: Wed, 2 Jul 2025 16:30:48 -0700 Subject: [PATCH] make eval service fetch rotating auth info from server --- eval/service.py | 168 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 3 deletions(-) diff --git a/eval/service.py b/eval/service.py index 0a2a63018..d5386a1e4 100644 --- a/eval/service.py +++ b/eval/service.py @@ -1119,6 +1119,7 @@ class Task: self.login_type = kwargs.get('login_type', None) self.category = kwargs.get('category', None) self.output_schema = kwargs.get('output_schema', None) # Add structured output schema support + self.auth_keys = kwargs.get('auth_keys', None) # List of auth keys to fetch from auth distribution if self.output_schema: # Convert JSON schema to Pydantic model class self.output_model = create_pydantic_model_from_schema(self.output_schema, f'Task_{self.task_id}_Output') @@ -1135,6 +1136,7 @@ class Task: 'login_type', 'category', 'output_schema', + 'auth_keys', } self.additional_fields = {k: v for k, v in kwargs.items() if k not in known_fields} @@ -1144,7 +1146,7 @@ class Task: def __str__(self): # Include main fields and indicate if there are additional fields - base_str = f'Task(task_id={self.task_id}, confirmed_task={self.confirmed_task}, website={self.website}, reference_length={self.reference_length}, level={self.level}, cluster_id={self.cluster_id}, login_cookie={self.login_cookie}, login_type={self.login_type}, category={self.category}, output_schema={self.output_schema}' + base_str = f'Task(task_id={self.task_id}, confirmed_task={self.confirmed_task}, website={self.website}, reference_length={self.reference_length}, level={self.level}, cluster_id={self.cluster_id}, login_cookie={self.login_cookie}, login_type={self.login_type}, category={self.category}, output_schema={self.output_schema}, auth_keys={self.auth_keys}' if self.additional_fields: additional_str = ', '.join(f'{k}={v}' for k, v in self.additional_fields.items()) base_str += f', {additional_str}' @@ -1603,6 +1605,7 @@ async def run_task_with_semaphore( headless: bool, use_vision: bool, semaphore_runs: asyncio.Semaphore, # Pass semaphore as argument + auth_distribution: dict | None = None, # Pre-fetched auth distribution github_workflow_url: str | None = None, use_serp: bool = False, enable_memory: bool = False, @@ -1727,6 +1730,38 @@ async def run_task_with_semaphore( convex_url, secret_key, run_id, task.task_id, 'run_agent', 'active', github_workflow_url ) + # Handle auth information if task requires it + task_with_auth = task + if hasattr(task, 'auth_keys') and task.auth_keys: + # Validate auth_keys is a list + if isinstance(task.auth_keys, list) and len(task.auth_keys) > 0: + if auth_distribution: + logger.info( + f'Task {task.task_id}: Using pre-fetched auth distribution for auth_keys: {task.auth_keys}' + ) + auth_info_text = format_auth_info_for_agent(auth_distribution, task.auth_keys) + if auth_info_text: + # Create a modified task with auth info appended + class TaskWithAuth: + def __init__(self, original_task, auth_text): + # Copy all attributes from original task + for attr_name in dir(original_task): + if not attr_name.startswith('__'): + setattr(self, attr_name, getattr(original_task, attr_name)) + # Modify the confirmed_task to include auth info + self.confirmed_task = original_task.confirmed_task + auth_text + + task_with_auth = TaskWithAuth(task, auth_info_text) + logger.info(f'Task {task.task_id}: Auth info added to task description') + else: + logger.warning( + f'Task {task.task_id}: No matching auth info found for keys: {task.auth_keys}' + ) + else: + logger.warning(f'Task {task.task_id}: Auth keys specified but no auth distribution available') + else: + logger.warning(f'Task {task.task_id}: auth_keys is not a valid list: {task.auth_keys}') + # Start timing for agent execution only agent_start_time = time.time() @@ -1734,7 +1769,7 @@ async def run_task_with_semaphore( Stage.RUN_AGENT, lambda: run_agent_with_browser( browser_session, - task, + task_with_auth, llm, max_steps_per_task, use_vision, @@ -1987,6 +2022,7 @@ async def run_multiple_tasks( convex_url: str, secret_key: str, eval_model: BaseChatModel, + auth_distribution: dict | None = None, github_workflow_url: str | None = None, max_parallel_runs: int = 3, max_steps_per_task: int = 25, @@ -2072,6 +2108,7 @@ async def run_multiple_tasks( headless=headless, use_vision=use_vision, semaphore_runs=semaphore_runs, # Pass the semaphore + auth_distribution=auth_distribution, # Pass the pre-fetched auth distribution github_workflow_url=github_workflow_url, use_serp=use_serp, enable_memory=enable_memory, @@ -2195,6 +2232,109 @@ def fetch_tasks_from_server(convex_url: str, secret_key: str, test_case_name: st return None +# Helper function to fetch auth distribution from the server +def fetch_auth_distribution_from_server(convex_url: str, secret_key: str): + """Fetches an available auth distribution from the Convex HTTP endpoint.""" + + if not convex_url: + logger.error('Error: EVALUATION_TOOL_URL environment variable not set.') + return None + + if not secret_key: + logger.error('Error: EVALUATION_TOOL_SECRET_KEY environment variable not set.') + return None + + endpoint_url = f'{convex_url}/api/getAuthDistribution' + headers = { + 'Authorization': f'Bearer {secret_key}', + 'Content-Type': 'application/json', + } + + logger.info(f'Fetching auth distribution from {endpoint_url}...') + + try: + response = requests.post(endpoint_url, headers=headers, json={}) + + logger.info(f'Fetch Auth Distribution Status Code: {response.status_code}') + + if response.status_code == 200: + try: + data = response.json() + logger.info('Successfully fetched auth distribution data.') + # Verify the response has the expected structure + if isinstance(data, dict) and 'id' in data and 'loginInfo' in data: + return data + else: + logger.error( + f'Error: Fetched auth distribution data has unexpected structure. Keys: {list(data.keys()) if isinstance(data, dict) else "Not a dict"}' + ) + logger.error(f'Raw response: {response.text}') + return None + + except json.JSONDecodeError: + logger.error('Error: Failed to decode JSON response for auth distribution.') + logger.error(f'Raw response text: {response.text}') + return None + elif response.status_code == 404: + logger.warning('No available auth distribution found on server.') + return None + else: + logger.error(f'Error: Failed to fetch auth distribution. Status: {response.status_code}') + logger.error(f'Response: {response.text}') + return None + + except requests.exceptions.RequestException as e: + logger.error(f'Error during request to fetch auth distribution: {type(e).__name__}: {e}') + return None + + +# Helper function to format auth information for the agent +def format_auth_info_for_agent(auth_distribution: dict, auth_keys: list[str]) -> str: + """ + Formats auth information from auth distribution for the agent task description. + + Args: + auth_distribution: Dict with 'loginInfo' key containing auth data + auth_keys: List of auth keys to extract (e.g., ['google', 'facebook']) + + Returns: + Formatted string with login credentials or empty string if no matching keys + """ + if not auth_distribution or not auth_keys: + return '' + + login_info = auth_distribution.get('loginInfo', {}) + if not login_info: + logger.warning('Auth distribution has no loginInfo') + return '' + + # Extract relevant auth information based on auth_keys + relevant_auths = [] + for auth_key in auth_keys: + if auth_key in login_info: + auth_data = login_info[auth_key] + if isinstance(auth_data, dict): + # Format the auth data for this key + auth_details = [] + for key, value in auth_data.items(): + auth_details.append(f'{key}: {value}') + + if auth_details: + relevant_auths.append(f'{auth_key} with {", ".join(auth_details)}') + else: + logger.warning(f"Auth data for key '{auth_key}' is not a dictionary: {type(auth_data)}") + else: + logger.warning(f"Auth key '{auth_key}' not found in available login info. Available keys: {list(login_info.keys())}") + + if relevant_auths: + auth_text = f'\n\nThe following login credentials can be used to complete this task: {"; ".join(relevant_auths)}.' + logger.info(f'Formatted auth info: {auth_text}') + return auth_text + else: + logger.warning(f'No matching auth keys found. Requested: {auth_keys}, Available: {list(login_info.keys())}') + return '' + + # Helper function to get git information def get_git_info(): """Retrieves git branch, commit hash, commit timestamp, and repository URL using subprocess.""" @@ -2424,6 +2564,7 @@ async def run_evaluation_pipeline( convex_url: str, secret_key: str, eval_model: BaseChatModel, + auth_distribution: dict | None = None, github_workflow_url: str | None = None, max_parallel_runs: int = 3, max_steps_per_task: int = 25, @@ -2477,6 +2618,7 @@ async def run_evaluation_pipeline( convex_url=convex_url, secret_key=secret_key, eval_model=eval_model, + auth_distribution=auth_distribution, github_workflow_url=github_workflow_url, max_parallel_runs=max_parallel_runs, max_steps_per_task=max_steps_per_task, @@ -2585,6 +2727,7 @@ if __name__ == '__main__': # --- Load Tasks (Either Single Task or from Server) --- tasks = [] task_id = None # Initialize for proper scoping + auth_distribution = None # Initialize auth distribution # Check if this is single task mode if args.task_text: @@ -2619,10 +2762,28 @@ if __name__ == '__main__': logger.info(f'Successfully loaded {len(tasks)} tasks from the server.') except (TypeError, ValueError) as e: logger.error( - f'Error creating Task objects from fetched data. Ensure the data structure includes required fields (task_id, confirmed_task). Known optional fields: website, reference_length, level, cluster_id, login_cookie, login_type, category. Any additional fields will be accepted dynamically. Error: {type(e).__name__}: {e}' + f'Error creating Task objects from fetched data. Ensure the data structure includes required fields (task_id, confirmed_task). Known optional fields: website, reference_length, level, cluster_id, login_cookie, login_type, category, auth_keys. Any additional fields will be accepted dynamically. Error: {type(e).__name__}: {e}' ) logger.error(f'First item in fetched data: {fetched_task_data[0] if fetched_task_data else "None"}') exit(1) + + # --- Fetch Auth Distribution Once (if any tasks need auth) --- + tasks_with_auth = [ + task + for task in tasks + if hasattr(task, 'auth_keys') and task.auth_keys and isinstance(task.auth_keys, list) and len(task.auth_keys) > 0 + ] + if tasks_with_auth and CONVEX_URL and SECRET_KEY: + logger.info(f'Found {len(tasks_with_auth)} tasks requiring auth. Fetching auth distribution...') + auth_distribution = fetch_auth_distribution_from_server(CONVEX_URL, SECRET_KEY) + if auth_distribution: + logger.info( + f'Successfully fetched auth distribution with login info for: {list(auth_distribution.get("loginInfo", {}).keys())}' + ) + else: + logger.warning('Failed to fetch auth distribution. Tasks requiring auth may fail.') + elif tasks_with_auth: + logger.warning(f'Found {len(tasks_with_auth)} tasks requiring auth but no server config available') # ----------------------------- # --- Start Run on Server (with optional existing Run ID) --- @@ -2788,6 +2949,7 @@ if __name__ == '__main__': convex_url=convex_url, secret_key=secret_key, eval_model=eval_model, + auth_distribution=auth_distribution, github_workflow_url=args.github_workflow_url, max_parallel_runs=parallel_runs, max_steps_per_task=args.max_steps,