import json
import logging
import subprocess
import time
from typing import Any

from pydantic import BaseModel

logger = logging.getLogger(__name__)


def create_pydantic_model_from_schema(original_schema: dict | str, model_name: str = 'DynamicModel') -> type[BaseModel]:
    """
    Convert JSON schema to Pydantic model class using datamodel-code-generator.

    Args:
        original_schema: JSON schema dictionary, or a JSON string encoding one
        model_name: Name for the generated model class

    Returns:
        Pydantic model class that can be used with Controller(output_model=...)

    Raises:
        ValueError: if the schema is invalid or model generation fails.

    Example:
        schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"},
                "email": {"type": "string"}
            },
            "required": ["name", "age"]
        }
        PersonModel = create_pydantic_model_from_schema(schema, "Person")
        controller = Controller(output_model=PersonModel)
    """
    try:
        # Normalize to a dict BEFORE the third-party import below. This fixes a
        # bug in the original code: the ImportError fallback referenced `schema`,
        # which was never assigned when the import raised first (NameError).
        if isinstance(original_schema, str):
            schema: dict = json.loads(original_schema)
        else:
            schema = original_schema

        import importlib.util
        import tempfile
        from pathlib import Path

        from datamodel_code_generator import DataModelType, generate  # type: ignore[import-untyped]

        logger.debug(f'Creating Pydantic model from schema: {schema}')

        # Initialize paths for cleanup
        schema_path = None
        model_path = None
        try:
            # Create temporary files for input schema and output model.
            # delete=False because datamodel-code-generator opens them by path.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as schema_file:
                json.dump(schema, schema_file, indent=2)
                schema_path = Path(schema_file.name)

            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as model_file:
                model_path = Path(model_file.name)

            # Generate Pydantic model code using datamodel-code-generator
            generate(
                input_=schema_path,
                output=model_path,
                output_model_type=DataModelType.PydanticV2BaseModel,
                class_name=model_name,
            )

            # Read the generated Python code
            generated_code = model_path.read_text()
            logger.debug(f'Generated Pydantic model code:\n{generated_code}')

            # Create an empty in-memory module and execute the generated code in it
            spec = importlib.util.spec_from_loader(f'dynamic_model_{model_name}', loader=None)
            if spec is None:
                raise ValueError('Failed to create module spec')
            module = importlib.util.module_from_spec(spec)

            # Seed the module namespace with the names the generated code may
            # reference, so exec() below resolves them without real imports.
            from typing import Optional, Union

            from pydantic import Field

            module.__dict__.update(
                {
                    'Optional': Optional,
                    'Union': Union,
                    'list': list,
                    'dict': dict,
                    'Any': Any,
                    'BaseModel': BaseModel,
                    'Field': Field,
                    'str': str,
                    'int': int,
                    'float': float,
                    'bool': bool,
                }
            )

            # NOTE: exec of locally-generated code (trusted: produced by
            # datamodel-code-generator from our own schema, never network input).
            exec(generated_code, module.__dict__)

            # Get the generated model class by its requested name
            if hasattr(module, model_name):
                model_class = getattr(module, model_name)
                # Rebuild the model to resolve forward references and type
                # annotations, passing the module namespace so names like
                # Optional resolve.
                model_class.model_rebuild(_types_namespace=module.__dict__)
                logger.debug(f'Successfully created Pydantic model: {model_class}')
                return model_class

            # Fallback: the generator may have chosen a different class name —
            # use any BaseModel subclass found in the module.
            for attr_name in dir(module):
                attr = getattr(module, attr_name)
                if isinstance(attr, type) and issubclass(attr, BaseModel) and attr is not BaseModel:
                    attr.model_rebuild(_types_namespace=module.__dict__)
                    logger.debug(f'Using fallback model class: {attr}')
                    return attr
            raise ValueError('No Pydantic model class found in generated code')
        finally:
            # Clean up temporary files safely (best-effort; never mask the
            # in-flight exception with a cleanup failure).
            if schema_path is not None:
                try:
                    schema_path.unlink()
                except Exception as cleanup_error:
                    logger.warning(f'Failed to cleanup schema file: {cleanup_error}')
            if model_path is not None:
                try:
                    model_path.unlink()
                except Exception as cleanup_error:
                    logger.warning(f'Failed to cleanup model file: {cleanup_error}')

    except ImportError as e:
        logger.error(f'datamodel-code-generator not available: {e}')
        logger.error('Falling back to basic schema conversion')
        try:
            # Fallback to a basic create_model() translation when
            # datamodel-code-generator is not installed.
            from typing import Optional, Union

            from pydantic import create_model

            def json_type_to_python_type(json_type):
                """Map a JSON-schema type (or list of types) to a Python type."""
                if isinstance(json_type, list):
                    # Union types like ["string", "null"]; "null" is handled
                    # separately via optionality, so drop it here.
                    members = [json_type_to_python_type(t) for t in json_type if t != 'null']
                    if not members:
                        return Any
                    if len(members) == 1:
                        return members[0]
                    # BUG FIX: the original returned only members[0] for 3+
                    # member unions. Union accepts a tuple, so arbitrary-width
                    # unions work directly.
                    return Union[tuple(members)]
                mapping = {
                    'string': str,
                    'integer': int,
                    'number': float,
                    'boolean': bool,
                    'array': list[Any],
                    'object': dict[str, Any],
                }
                return mapping.get(json_type, Any)

            # `schema` is guaranteed bound here: normalization now happens
            # before the import that raised ImportError.
            properties = schema.get('properties', {})
            required_fields = schema.get('required', [])

            # Build field definitions for create_model
            field_definitions: dict[str, Any] = {}
            for field_name, field_schema in properties.items():
                json_type = field_schema.get('type')
                field_type = json_type_to_python_type(json_type)

                # A field allows null when it is not required, or when its type
                # list explicitly includes "null".
                allows_null = field_name not in required_fields or (
                    isinstance(json_type, list) and 'null' in json_type
                )

                if field_name in required_fields and not allows_null:
                    field_definitions[field_name] = (field_type, ...)  # Required field
                else:
                    # Optional field with default None
                    field_definitions[field_name] = (Optional[field_type], None)

            # Create the dynamic model using create_model
            return create_model(model_name, **field_definitions)
        except Exception as fallback_error:
            logger.error(f'Fallback schema conversion also failed: {fallback_error}')
            raise ValueError(f'Both primary and fallback schema conversion failed: {fallback_error}') from fallback_error

    except Exception as e:
        logger.error(f'Failed to create Pydantic model from schema: {e}')
        # BUG FIX: log original_schema — `schema` may be unbound here when
        # json.loads() itself raised, which would NameError inside the handler.
        logger.error(f'Schema: {original_schema}')
        raise ValueError(f'Invalid JSON schema: {e}') from e


def make_json_serializable(obj: Any) -> Any:
    """
    Convert objects to JSON-serializable types.

    Handles common non-serializable types like enums, custom objects, etc.
    Containers are converted recursively; tuples become lists; dict keys are
    coerced to str; anything else falls back to str(obj).
    """
    if obj is None:
        return None
    elif isinstance(obj, (str, int, float, bool)):
        return obj
    elif isinstance(obj, dict):
        return {str(k): make_json_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        return [make_json_serializable(item) for item in obj]
    elif hasattr(obj, 'value'):
        # Handle enums (and anything else exposing a .value attribute)
        return obj.value
    elif hasattr(obj, '__dict__'):
        # Handle custom objects by falling back to their string form
        return str(obj)
    else:
        return str(obj)


def get_git_info():
    """Retrieves git branch, commit hash, commit timestamp, and repository URL using subprocess.

    Returns:
        dict with keys 'branch', 'hash', 'timestamp' (Unix epoch int), 'repo'.
        On any git/parse failure the string fields are 'unknown' and the
        timestamp falls back to the current time.
    """
    try:
        branch = subprocess.run(
            ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], capture_output=True, text=True, check=True
        ).stdout.strip()
        commit_hash = subprocess.run(
            ['git', 'rev-parse', 'HEAD'], capture_output=True, text=True, check=True
        ).stdout.strip()

        # Get commit timestamp as Unix epoch integer (%ct = committer date)
        commit_timestamp_str = subprocess.run(
            ['git', 'log', '-1', '--format=%ct'], capture_output=True, text=True, check=True
        ).stdout.strip()
        commit_timestamp = int(commit_timestamp_str)

        # Get repository URL
        repo_url = subprocess.run(
            ['git', 'config', '--get', 'remote.origin.url'], capture_output=True, text=True, check=True
        ).stdout.strip()

        return {'branch': branch, 'hash': commit_hash, 'timestamp': commit_timestamp, 'repo': repo_url}
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
        # ValueError covers int() parse failure; FileNotFoundError covers a
        # missing git binary. Degrade gracefully with placeholder values.
        logger.warning(f'Could not retrieve git info: {type(e).__name__}: {e}. Using defaults.')
        return {
            'branch': 'unknown',
            'hash': 'unknown',
            'timestamp': int(time.time()),  # Fallback to current time
            'repo': 'unknown',
        }