Source code for controllers.ome_controller

"""
OME Deployment Controller

Manages the lifecycle of InferenceService resources for autotuning experiments.
"""

import time
import yaml
import sys
from pathlib import Path
from typing import Dict, Any, Optional
from jinja2 import Template
from kubernetes import client, config
from kubernetes.client.rest import ApiException

from .base_controller import BaseModelController
from .utils import sanitize_dns_name

# Import GPU discovery utility with absolute import
# Add parent directory to path if needed
if str(Path(__file__).parent.parent) not in sys.path:
    sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.gpu_discovery import find_best_node_for_deployment


[docs] class OMEController(BaseModelController): """Controller for managing OME InferenceService deployments."""
[docs] def __init__(self, kubeconfig_path: Optional[str] = None): """Initialize the OME controller. Args: kubeconfig_path: Path to kubeconfig file. If None, uses in-cluster config. """ try: if kubeconfig_path: config.load_kube_config(config_file=kubeconfig_path) else: config.load_kube_config() except Exception: # Fallback to in-cluster config config.load_incluster_config() self.api_client = client.ApiClient() self.custom_api = client.CustomObjectsApi(self.api_client) self.core_api = client.CoreV1Api(self.api_client) # Load templates template_dir = Path(__file__).parent.parent / "templates" with open(template_dir / "inference_service.yaml.j2") as f: self.isvc_template = Template(f.read()) with open(template_dir / "clusterbasemodel.yaml.j2") as f: self.cbm_template = Template(f.read()) with open(template_dir / "clusterservingruntime.yaml.j2") as f: self.csr_template = Template(f.read())
[docs] def create_namespace(self, namespace: str) -> bool: """Create namespace if it doesn't exist. Args: namespace: Namespace name Returns: True if created or already exists """ try: self.core_api.read_namespace(namespace) print(f"Namespace '{namespace}' already exists") return True except ApiException as e: if e.status == 404: # Create namespace ns_body = client.V1Namespace(metadata=client.V1ObjectMeta(name=namespace)) self.core_api.create_namespace(ns_body) print(f"Created namespace '{namespace}'") return True else: print(f"Error checking namespace: {e}") return False
[docs] def deploy_inference_service( self, task_name: str, experiment_id: int, namespace: str, model_name: str, runtime_name: str, parameters: Dict[str, Any], storage: Optional[Dict[str, Any]] = None, enable_gpu_selection: bool = True, ) -> Optional[str]: """Deploy an InferenceService with specified parameters. Args: task_name: Autotuning task name experiment_id: Unique experiment identifier (will be converted to string internally) namespace: K8s namespace model_name: Model name runtime_name: ServingRuntime name parameters: SGLang parameters (tp_size, mem_frac, etc.) storage: Optional storage configuration for PVC support { 'type': 'pvc', 'pvc_name': 'model-storage-pvc', 'pvc_subpath': 'meta/llama-3-2-1b-instruct', 'mount_path': '/raid/models/meta/llama-3-2-1b-instruct' } enable_gpu_selection: If True, intelligently select node with idle GPUs (default: True) Returns: InferenceService name if successful, None otherwise """ # Convert experiment_id to string for Kubernetes resource naming experiment_id_str = str(experiment_id) # Sanitize task_name to be DNS-1123 compliant safe_task_name = sanitize_dns_name(task_name) isvc_name = f"{safe_task_name}-exp{experiment_id_str}" # Sanitize model_name to match ClusterBaseModel naming convention # Convert "meta-llama/Llama-3.2-3B-Instruct" to "llama-3-2-3b-instruct" # Strip namespace prefix (everything before and including the slash) first model_basename = model_name.split('/')[-1] if '/' in model_name else model_name safe_model_name = sanitize_dns_name(model_basename) # Determine required GPUs from parameters required_gpus = parameters.get('tpsize', parameters.get('tp_size', parameters.get('tp-size', 1))) # Find best node for deployment if enabled selected_node = None if enable_gpu_selection: print(f"\n=== GPU Node Selection ===") print(f"Looking for node with {required_gpus} idle GPU(s)...") selected_node = find_best_node_for_deployment(required_gpus=required_gpus) if selected_node: print(f"✓ Selected node: {selected_node}") else: print("⚠ No specific node selected (will use Kubernetes scheduler)") print("=" * 26 + "\n") # Render template rendered = self.isvc_template.render( namespace=namespace, isvc_name=isvc_name, task_name=task_name, experiment_id=experiment_id_str, model_name=safe_model_name, runtime_name=runtime_name, params=parameters, storage=storage, selected_node=selected_node, # Pass to template ) # Parse YAML (contains namespace + InferenceService) resources = list(yaml.safe_load_all(rendered)) # Create namespace self.create_namespace(namespace) # Create InferenceService try: isvc_resource = resources[1] # Second resource is the InferenceService self.custom_api.create_namespaced_custom_object( group="ome.io", version="v1beta1", namespace=namespace, plural="inferenceservices", body=isvc_resource, ) node_info = f" on node '{selected_node}'" if selected_node else "" print(f"Created InferenceService '{isvc_name}' in namespace '{namespace}'{node_info}") return isvc_name except ApiException as e: print(f"Error creating InferenceService: {e}") return None
[docs] def wait_for_ready(self, isvc_name: str, namespace: str, timeout: int = 600, poll_interval: int = 10) -> bool: """Wait for InferenceService to become ready. Args: isvc_name: InferenceService name namespace: K8s namespace timeout: Maximum wait time in seconds poll_interval: Polling interval in seconds Returns: True if ready, False if timeout or error """ start_time = time.time() while time.time() - start_time < timeout: try: isvc = self.custom_api.get_namespaced_custom_object( group="ome.io", version="v1beta1", namespace=namespace, plural="inferenceservices", name=isvc_name, ) # Check status conditions status = isvc.get("status", {}) conditions = status.get("conditions", []) for condition in conditions: if condition.get("type") == "Ready": if condition.get("status") == "True": url = status.get("url", "N/A") print(f"InferenceService '{isvc_name}' is ready! URL: {url}") return True elif condition.get("status") == "False": reason = condition.get("reason", "Unknown") message = condition.get("message", "No details") print(f"InferenceService not ready - {reason}: {message}") elapsed = int(time.time() - start_time) print(f"Waiting for InferenceService '{isvc_name}' to be ready... ({elapsed}s)") time.sleep(poll_interval) except ApiException as e: print(f"Error checking InferenceService status: {e}") time.sleep(poll_interval) print(f"Timeout waiting for InferenceService '{isvc_name}' to be ready") return False
[docs] def delete_inference_service(self, isvc_name: str, namespace: str) -> bool: """Delete an InferenceService. Args: isvc_name: InferenceService name namespace: K8s namespace Returns: True if deleted successfully """ try: self.custom_api.delete_namespaced_custom_object( group="ome.io", version="v1beta1", namespace=namespace, plural="inferenceservices", name=isvc_name, ) print(f"Deleted InferenceService '{isvc_name}' from namespace '{namespace}'") return True except ApiException as e: if e.status == 404: print(f"InferenceService '{isvc_name}' not found (already deleted?)") return True else: print(f"Error deleting InferenceService: {e}") return False
[docs] def get_service_url(self, isvc_name: str, namespace: str) -> Optional[str]: """Get the service URL for an InferenceService. Args: isvc_name: InferenceService name namespace: K8s namespace Returns: Service URL if available, None otherwise """ try: isvc = self.custom_api.get_namespaced_custom_object( group="ome.io", version="v1beta1", namespace=namespace, plural="inferenceservices", name=isvc_name, ) return isvc.get("status", {}).get("url") except ApiException as e: print(f"Error getting service URL: {e}") return None
# ClusterBaseModel Management
[docs] def ensure_clusterbasemodel( self, name: str, spec: Dict[str, Any], labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None ) -> bool: """Ensure ClusterBaseModel exists, create if missing. Args: name: ClusterBaseModel name spec: ClusterBaseModel specification labels: Optional labels to add annotations: Optional annotations to add Returns: True if exists or created successfully, False otherwise """ try: # Check if ClusterBaseModel already exists self.custom_api.get_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterbasemodels", name=name ) print(f"ClusterBaseModel '{name}' already exists") return True except ApiException as e: if e.status == 404: # Create ClusterBaseModel return self._create_clusterbasemodel(name, spec, labels, annotations) else: print(f"Error checking ClusterBaseModel '{name}': {e}") return False
def _create_clusterbasemodel( self, name: str, spec: Dict[str, Any], labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None ) -> bool: """Create ClusterBaseModel from spec. Args: name: ClusterBaseModel name spec: ClusterBaseModel specification labels: Optional labels to add annotations: Optional annotations to add Returns: True if created successfully, False otherwise """ try: # Render template rendered = self.cbm_template.render( name=name, spec=spec, labels=labels or {}, annotations=annotations or {} ) # Parse YAML and create resource resource = yaml.safe_load(rendered) self.custom_api.create_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterbasemodels", body=resource ) print(f"Created ClusterBaseModel '{name}'") return True except ApiException as e: print(f"Error creating ClusterBaseModel '{name}': {e}") return False
[docs] def list_clusterbasemodels(self) -> Optional[Dict[str, Any]]: """List all ClusterBaseModels in the cluster. Returns: List of ClusterBaseModels or None on error """ try: result = self.custom_api.list_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterbasemodels" ) return result except ApiException as e: print(f"Error listing ClusterBaseModels: {e}") return None
# ClusterServingRuntime Management
[docs] def ensure_clusterservingruntime( self, name: str, spec: Dict[str, Any], labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None ) -> bool: """Ensure ClusterServingRuntime exists, create if missing. Args: name: ClusterServingRuntime name spec: ClusterServingRuntime specification labels: Optional labels to add annotations: Optional annotations to add Returns: True if exists or created successfully, False otherwise """ try: # Check if ClusterServingRuntime already exists self.custom_api.get_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterservingruntimes", name=name ) print(f"ClusterServingRuntime '{name}' already exists") return True except ApiException as e: if e.status == 404: # Create ClusterServingRuntime return self._create_clusterservingruntime(name, spec, labels, annotations) else: print(f"Error checking ClusterServingRuntime '{name}': {e}") return False
def _create_clusterservingruntime( self, name: str, spec: Dict[str, Any], labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None ) -> bool: """Create ClusterServingRuntime from spec. Args: name: ClusterServingRuntime name spec: ClusterServingRuntime specification labels: Optional labels to add annotations: Optional annotations to add Returns: True if created successfully, False otherwise """ try: # Render template rendered = self.csr_template.render( name=name, spec=spec, labels=labels or {}, annotations=annotations or {} ) # Parse YAML and create resource resource = yaml.safe_load(rendered) self.custom_api.create_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterservingruntimes", body=resource ) print(f"Created ClusterServingRuntime '{name}'") return True except ApiException as e: print(f"Error creating ClusterServingRuntime '{name}': {e}") return False
[docs] def list_clusterservingruntimes(self) -> Optional[Dict[str, Any]]: """List all ClusterServingRuntimes in the cluster. Returns: List of ClusterServingRuntimes or None on error """ try: result = self.custom_api.list_cluster_custom_object( group="ome.io", version="v1beta1", plural="clusterservingruntimes" ) return result except ApiException as e: print(f"Error listing ClusterServingRuntimes: {e}") return None