Source code for controllers.base_controller

"""
Base Controller Interface

Abstract base class for model deployment controllers.
Supports multiple deployment modes (OME/Kubernetes, Docker, etc.)
"""

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from .utils import parse_parallel_config
from utils.gpu_monitor import get_gpu_monitor


class BaseModelController(ABC):
    """Abstract base class for model deployment controllers."""
    @abstractmethod
    def deploy_inference_service(
        self,
        task_name: str,
        experiment_id: int,
        namespace: str,
        model_name: str,
        runtime_name: str,
        parameters: Dict[str, Any],
    ) -> Optional[str]:
        """Deploy a model inference service with specified parameters.

        Args:
            task_name: Autotuning task name
            experiment_id: Unique experiment identifier
            namespace: Namespace/resource group identifier
            model_name: Model name/path
            runtime_name: Runtime identifier (e.g., 'sglang')
            parameters: Deployment parameters (tp_size, mem_frac, etc.)

        Returns:
            Service identifier (name/ID) if successful, None otherwise
        """
        pass
    def _get_parallel_config(self, parameters: Dict[str, Any]) -> Dict[str, int]:
        """
        Get parallel configuration from parameters using shared utility.

        This is a convenience wrapper for the parse_parallel_config utility
        that can be overridden by subclasses if needed.

        Args:
            parameters: Runtime parameters dictionary

        Returns:
            Dictionary with normalized keys: {tp, pp, dp, cp, dcp, world_size}
        """
        return parse_parallel_config(parameters)

    def _select_gpus_intelligent(self, num_gpus: int, min_memory_mb: int = 8000,
                                 log_prefix: str = "") -> Optional[Dict[str, Any]]:
        """
        Intelligent GPU selection with fallback logic.

        This method provides a consistent GPU selection strategy across controllers:
        1. Try intelligent allocation with memory constraint
        2. Retry without memory constraint if needed
        3. Fall back to simple sequential allocation if all else fails

        Args:
            num_gpus: Number of GPUs required
            min_memory_mb: Minimum memory per GPU (default: 8000 MB)
            log_prefix: Prefix for log messages (e.g., "[Docker]", "[Local]")

        Returns:
            Dict with 'device_ids' (list of GPU IDs), 'gpu_model' (str),
            and 'gpu_info' (detailed info), or None if not enough GPUs
        """
        gpu_monitor = get_gpu_monitor()

        if not gpu_monitor.is_available():
            print(f"{log_prefix} nvidia-smi not available. Using fallback GPU allocation.")
            # Fallback: use first N GPUs
            return {
                "device_ids": [str(i) for i in range(num_gpus)],
                "gpu_model": "Unknown",
                "gpu_info": {
                    "count": num_gpus,
                    "indices": list(range(num_gpus)),
                    "allocation_method": "fallback"
                }
            }

        # Use intelligent GPU allocation
        try:
            allocated_gpus, success = gpu_monitor.allocate_gpus(
                count=num_gpus,
                min_memory_mb=min_memory_mb
            )

            if not success or len(allocated_gpus) < num_gpus:
                print(f"{log_prefix} Could not allocate {num_gpus} GPU(s) with {min_memory_mb}MB free memory")
                print(f"{log_prefix} Available GPUs: {len(allocated_gpus)}")

                # Try without memory constraint
                print(f"{log_prefix} Retrying allocation without memory constraint...")
                allocated_gpus, success = gpu_monitor.allocate_gpus(count=num_gpus, min_memory_mb=None)

                if not success:
                    print(f"{log_prefix} Failed to allocate any GPUs")
                    return None

            # Get detailed GPU info for allocated devices
            snapshot = gpu_monitor.query_gpus(use_cache=False)
            if not snapshot:
                print(f"{log_prefix} Failed to query GPU information")
                return None

            gpu_details = []
            gpu_model = None
            for gpu in snapshot.gpus:
                if gpu.index in allocated_gpus:
                    gpu_details.append({
                        "index": gpu.index,
                        "name": gpu.name,
                        "uuid": gpu.uuid,
                        "memory_total_mb": gpu.memory_total_mb,
                        "memory_free_mb": gpu.memory_free_mb,
                        "memory_usage_percent": gpu.memory_usage_percent,
                        "utilization_percent": gpu.utilization_percent,
                        "temperature_c": gpu.temperature_c,
                        "power_draw_w": gpu.power_draw_w,
                        "availability_score": gpu.score
                    })
                    if gpu_model is None:
                        gpu_model = gpu.name

            print(f"{log_prefix} Selected GPUs: {allocated_gpus}")
            print(f"{log_prefix} GPU Model: {gpu_model}")
            for detail in gpu_details:
                print(f"{log_prefix} GPU {detail['index']}: {detail['memory_free_mb']}/{detail['memory_total_mb']}MB free, "
                      f"{detail['utilization_percent']}% utilized, Score: {detail['availability_score']:.2f}")

            return {
                "device_ids": [str(idx) for idx in allocated_gpus],
                "gpu_model": gpu_model or "Unknown",
                "gpu_info": {
                    "count": len(allocated_gpus),
                    "indices": allocated_gpus,
                    "allocation_method": "intelligent",
                    "details": gpu_details,
                    "allocated_at": snapshot.timestamp.isoformat()
                }
            }

        except Exception as e:
            print(f"{log_prefix} Error in intelligent GPU selection: {e}")
            print(f"{log_prefix} Falling back to simple allocation")

            # Final fallback: simple first-N allocation
            return {
                "device_ids": [str(i) for i in range(num_gpus)],
                "gpu_model": "Unknown",
                "gpu_info": {
                    "count": num_gpus,
                    "indices": list(range(num_gpus)),
                    "allocation_method": "fallback_error"
                }
            }
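    # Usage note (illustrative, not part of this module): callers can tell
    # which allocation path was taken from the returned metadata, e.g.
    #
    #     selection = self._select_gpus_intelligent(num_gpus=2, log_prefix="[Docker]")
    #     if selection is None:
    #         ...  # hard failure: no GPUs could be allocated or queried
    #     elif selection["gpu_info"]["allocation_method"] != "intelligent":
    #         ...  # nvidia-smi missing or selection errored; first-N fallback was used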
    @abstractmethod
    def wait_for_ready(self, service_id: str, namespace: str,
                       timeout: int = 600, poll_interval: int = 10) -> bool:
        """Wait for the inference service to become ready.

        Args:
            service_id: Service identifier returned by deploy_inference_service
            namespace: Namespace/resource group identifier
            timeout: Maximum wait time in seconds
            poll_interval: Polling interval in seconds

        Returns:
            True if service is ready, False if timeout or error
        """
        pass
    @abstractmethod
    def delete_inference_service(self, service_id: str, namespace: str) -> bool:
        """Delete an inference service.

        Args:
            service_id: Service identifier
            namespace: Namespace/resource group identifier

        Returns:
            True if deleted successfully
        """
        pass
    @abstractmethod
    def get_service_url(self, service_id: str, namespace: str) -> Optional[str]:
        """Get the service URL/endpoint for the inference service.

        Args:
            service_id: Service identifier
            namespace: Namespace/resource group identifier

        Returns:
            Service URL/endpoint if available, None otherwise
        """
        pass
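A minimal sketch of a concrete controller follows, assuming a hypothetical LocalProcessController that launches the runtime as a local subprocess. The launch command, the fixed port 30000, and the /health readiness probe are placeholders invented for this example; a real controller would adapt them to its deployment mode (OME/Kubernetes, Docker, etc.).

# Illustrative sketch only; names and commands below are assumptions, not part of this module.
import os
import subprocess
import time

import requests


class LocalProcessController(BaseModelController):
    """Sketch of a controller that runs the inference runtime as a local process."""

    def __init__(self):
        self._services = {}  # service_id -> {"process": Popen, "url": str}

    def deploy_inference_service(self, task_name, experiment_id, namespace,
                                 model_name, runtime_name, parameters):
        parallel = self._get_parallel_config(parameters)  # {tp, pp, dp, cp, dcp, world_size}
        selection = self._select_gpus_intelligent(parallel["world_size"], log_prefix="[Local]")
        if selection is None:
            return None
        service_id = f"{task_name}-exp{experiment_id}"
        # Placeholder launch command; real flags depend on the chosen runtime.
        cmd = ["python", "-m", runtime_name, "--model-path", model_name,
               "--tp", str(parallel["tp"]), "--port", "30000"]
        env = {**os.environ, "CUDA_VISIBLE_DEVICES": ",".join(selection["device_ids"])}
        process = subprocess.Popen(cmd, env=env)
        self._services[service_id] = {"process": process, "url": "http://localhost:30000"}
        return service_id

    def wait_for_ready(self, service_id, namespace, timeout=600, poll_interval=10):
        url = self.get_service_url(service_id, namespace)
        if url is None:
            return False
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                if requests.get(f"{url}/health", timeout=5).ok:  # assumed readiness endpoint
                    return True
            except requests.RequestException:
                pass
            time.sleep(poll_interval)
        return False

    def delete_inference_service(self, service_id, namespace):
        entry = self._services.pop(service_id, None)
        if entry is None:
            return False
        entry["process"].terminate()
        return True

    def get_service_url(self, service_id, namespace):
        entry = self._services.get(service_id)
        return entry["url"] if entry else None

The in-memory _services registry is a deliberate simplification for the sketch; Kubernetes- or Docker-backed controllers would resolve service state from their respective APIs instead.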