"""
Autotuner Orchestrator
Main orchestration logic for running parameter tuning experiments.
Coordinates deployment controllers and benchmark execution.
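
Example (illustrative sketch; the module path and the shape of ``task`` are
assumptions based on how this file reads the task dictionary):

    from orchestrator import AutotunerOrchestrator  # adjust to the real module name

    orchestrator = AutotunerOrchestrator(deployment_mode="docker", verbose=True)
    summary = orchestrator.run_task(task)  # `task` is a dict; see run_task() for a sketch
    print(summary["best_result"])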
"""
import sys
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
# Add src to path for relative imports
sys.path.insert(0, str(Path(__file__).parent))
# Lazy imports - only import controllers when needed to avoid dependency issues
# This allows running in local mode without docker/kubernetes dependencies
def _import_ome_controller():
from controllers.ome_controller import OMEController
return OMEController
def _import_docker_controller():
from controllers.docker_controller import DockerController
return DockerController
def _import_local_controller():
from controllers.local_controller import LocalController
return LocalController
def _import_benchmark_controller():
from controllers.benchmark_controller import BenchmarkController
return BenchmarkController
def _import_direct_benchmark_controller():
from controllers.direct_benchmark_controller import DirectBenchmarkController
return DirectBenchmarkController
from utils.optimizer import generate_parameter_grid, calculate_objective_score, create_optimization_strategy
from utils.quantization_integration import prepare_runtime_parameters
from config import clusterbasemodel_presets, clusterservingruntime_presets
class AutotunerOrchestrator:
"""Main orchestrator for the autotuning process."""
def __init__(
self,
deployment_mode: str = "ome",
kubeconfig_path: Optional[str] = None,
use_direct_benchmark: bool = False,
docker_model_path: str = "/mnt/data/models",
verbose: bool = False,
http_proxy: str = "",
https_proxy: str = "",
no_proxy: str = "",
hf_token: str = "",
):
"""Initialize the orchestrator.
Args:
deployment_mode: Deployment mode - 'ome' (Kubernetes), 'docker' (standalone), or 'local' (subprocess)
kubeconfig_path: Path to kubeconfig file (for OME mode)
use_direct_benchmark: If True, use direct genai-bench CLI instead of K8s BenchmarkJob
docker_model_path: Base path for models in Docker/Local mode
verbose: If True, stream genai-bench output in real-time
http_proxy: HTTP proxy URL for containers (optional)
https_proxy: HTTPS proxy URL for containers (optional)
no_proxy: Comma-separated list of hosts to bypass proxy (optional)
hf_token: Hugging Face access token for downloading gated models (optional)
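
Example (illustrative; argument values are placeholders, not recommendations):

    # OME mode, benchmarks executed via the Kubernetes BenchmarkJob CRD
    orch = AutotunerOrchestrator(deployment_mode="ome", kubeconfig_path="~/.kube/config")

    # Docker mode always switches to the direct genai-bench CLI
    orch = AutotunerOrchestrator(
        deployment_mode="docker",
        docker_model_path="/mnt/data/models",
        verbose=True,
    )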
"""
self.deployment_mode = deployment_mode.lower()
self.use_direct_benchmark = use_direct_benchmark
# Initialize model deployment controller based on mode
if self.deployment_mode == "local":
print("[Config] Deployment mode: Local subprocess")
# Python path will be set dynamically based on runtime in run_experiment
# Store paths for different runtimes
self._runtime_python_paths = {
"sglang": str(Path.home() / "work" / "sglang" / ".venv" / "bin" / "python"),
"vllm": str(Path.home() / "work" / "vllm" / ".venv" / "bin" / "python"),
}
# Default to sglang python for initial controller setup
default_python = self._runtime_python_paths.get("sglang", "python3")
if not Path(default_python).exists():
default_python = "python3"
LocalController = _import_local_controller()
DirectBenchmarkController = _import_direct_benchmark_controller()
self.model_controller = LocalController(
model_base_path=docker_model_path,
python_path=default_python,
http_proxy=http_proxy,
https_proxy=https_proxy,
no_proxy=no_proxy,
hf_token=hf_token
)
# Local mode always uses direct benchmark
self.use_direct_benchmark = True
self.benchmark_controller = DirectBenchmarkController(verbose=verbose)
print("[Config] Benchmark mode: Direct CLI (automatic for Local mode)")
elif self.deployment_mode == "docker":
print("[Config] Deployment mode: Standalone Docker")
DockerController = _import_docker_controller()
DirectBenchmarkController = _import_direct_benchmark_controller()
self.model_controller = DockerController(
model_base_path=docker_model_path,
http_proxy=http_proxy,
https_proxy=https_proxy,
no_proxy=no_proxy,
hf_token=hf_token
)
# Docker mode always uses direct benchmark (no K8s)
self.use_direct_benchmark = True
self.benchmark_controller = DirectBenchmarkController(verbose=verbose)
print("[Config] Benchmark mode: Direct CLI (automatic for Docker mode)")
print("[Config] Containers will be auto-removed after stop")
elif self.deployment_mode == "ome":
print("[Config] Deployment mode: OME (Kubernetes)")
OMEController = _import_ome_controller()
self.model_controller = OMEController(kubeconfig_path)
if use_direct_benchmark:
DirectBenchmarkController = _import_direct_benchmark_controller()
self.benchmark_controller = DirectBenchmarkController(verbose=verbose)
print("[Config] Benchmark mode: Direct genai-bench CLI")
else:
BenchmarkController = _import_benchmark_controller()
self.benchmark_controller = BenchmarkController(kubeconfig_path)
print("[Config] Benchmark mode: Kubernetes BenchmarkJob CRD")
else:
raise ValueError(f"Unknown deployment mode: {deployment_mode}. Use 'ome', 'docker', or 'local'")
self.results = []
def run_experiment(self, task: Dict[str, Any], experiment_id: int, parameters: Dict[str, Any], on_benchmark_start=None) -> Dict[str, Any]:
"""Run a single tuning experiment.
Args:
task: Task configuration
experiment_id: Unique experiment identifier
parameters: Parameter values for this experiment
on_benchmark_start: Optional callback function called when benchmark phase starts
Returns:
Experiment results dictionary
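
Example (illustrative; the parameter names mirror those referenced in this
method, and the actual tunable keys depend on the task configuration):

    result = orchestrator.run_experiment(
        task=task,
        experiment_id=1,
        parameters={"__parallel__tp": 2, "enable-torch-compile": True},
    )
    if result["status"] == "success":
        print(result["objective_score"], result["metrics"])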
"""
task_name = task["task_name"]
namespace = task["model"].get("namespace", "default") # namespace is optional, default to "default"
model_name = task["model"]["id_or_path"]
runtime_name = task["base_runtime"]
timeout = task["optimization"].get("timeout_per_iteration", 1800) # Default 30 minutes
# Dynamically switch python path based on runtime (for local mode)
if self.deployment_mode == "local" and hasattr(self, '_runtime_python_paths'):
runtime_key = runtime_name.lower()
if runtime_key in self._runtime_python_paths:
new_python_path = self._runtime_python_paths[runtime_key]
if Path(new_python_path).exists():
self.model_controller.python_path = new_python_path
print(f"[Config] Using {runtime_key} python: {new_python_path}")
else:
print(f"[Config] Warning: {runtime_key} python not found at {new_python_path}, using default")
# Dynamically adjust timeout for torch-compile + multi-GPU scenarios
# Triton kernel autotuning can take 10-20 minutes with TP > 1
enable_compile = parameters.get("enable-torch-compile", False)
tp_size = parameters.get("__parallel__tp", 1)
if enable_compile and tp_size > 1:
# TP > 1 with torch-compile: increase timeout significantly
adjusted_timeout = max(timeout, 1200) # At least 20 minutes
if adjusted_timeout > timeout:
print(f"[Timeout] Increased from {timeout}s to {adjusted_timeout}s (torch-compile + TP={tp_size})")
timeout = adjusted_timeout
elif enable_compile:
# TP = 1 with torch-compile: moderate increase
adjusted_timeout = max(timeout, 900) # At least 15 minutes
if adjusted_timeout > timeout:
print(f"[Timeout] Increased from {timeout}s to {adjusted_timeout}s (torch-compile)")
timeout = adjusted_timeout
# Optional: custom Docker image tag
image_tag = task.get("runtime_image_tag")
# Step 0: Ensure ClusterBaseModel and ClusterServingRuntime exist (OME mode only)
created_resources = {"clusterbasemodel": None, "clusterservingruntime": None}
if self.deployment_mode == "ome":
# Handle ClusterBaseModel creation
cbm_config = task.get("clusterbasemodel_config")
if cbm_config:
print(f"\n[Step 0a/4] Ensuring ClusterBaseModel exists...")
cbm_name, cbm_created = self._ensure_clusterbasemodel(cbm_config, model_name)
if cbm_created:
created_resources["clusterbasemodel"] = cbm_name
print(f"ClusterBaseModel '{cbm_name}' is ready")
elif cbm_name:
print(f"Using existing ClusterBaseModel '{cbm_name}'")
else:
print("Warning: Failed to ensure ClusterBaseModel, using model_name as fallback")
# Handle ClusterServingRuntime creation
csr_config = task.get("clusterservingruntime_config")
if csr_config:
print(f"\n[Step 0b/4] Ensuring ClusterServingRuntime exists...")
csr_name, csr_created = self._ensure_clusterservingruntime(csr_config, runtime_name)
if csr_created:
created_resources["clusterservingruntime"] = csr_name
print(f"ClusterServingRuntime '{csr_name}' is ready")
elif csr_name:
print(f"Using existing ClusterServingRuntime '{csr_name}'")
else:
print("Warning: Failed to ensure ClusterServingRuntime, using runtime_name as fallback")
# Update runtime_name to use the created/ensured runtime
if csr_name:
runtime_name = csr_name
print(f"\n{'='*80}")
print(f"Experiment {experiment_id}")
print(f"Parameters: {parameters}")
print(f"{'='*80}\n")
# Convert __quant__ prefixed parameters to runtime-specific CLI args
runtime_parameters = prepare_runtime_parameters(
base_runtime=runtime_name,
params=parameters,
model_path=model_name,
model_config=task.get("model")
)
print(f"Runtime-specific parameters: {runtime_parameters}")
experiment_result = {
"experiment_id": experiment_id,
"parameters": parameters, # Keep original for database
"status": "failed",
"metrics": None,
"container_logs": None, # Will store container logs for Docker mode
"error_message": None, # Will store error details for failed experiments
"created_resources": created_resources, # Track created resources
}
# Step 1: Deploy InferenceService
print(f"[Step 1/4] Deploying InferenceService...")
# For local mode, ensure model is pre-downloaded before starting
# This prevents download time from counting against experiment timeout
if self.deployment_mode == "local" and "/" in model_name:
print(f"[Pre-download] Ensuring model weights are cached: {model_name}")
if not self.model_controller.ensure_model_downloaded(model_name):
print(f"[Pre-download] WARNING: Model download may have failed, continuing anyway...")
# Extract storage configuration if present (for PVC support)
storage_config = task.get("storage")
# Call deploy based on deployment mode
if self.deployment_mode == "docker":
# DockerController
isvc_name = self.model_controller.deploy_inference_service(
task_name=task_name,
experiment_id=experiment_id,
namespace=namespace,
model_name=model_name,
runtime_name=runtime_name,
parameters=runtime_parameters,
image_tag=image_tag,
)
elif self.deployment_mode == "local":
# LocalController
isvc_name = self.model_controller.deploy_inference_service(
task_name=task_name,
experiment_id=experiment_id,
namespace=namespace,
model_name=model_name,
runtime_name=runtime_name,
parameters=runtime_parameters,
)
else:
# OMEController
isvc_name = self.model_controller.deploy_inference_service(
task_name=task_name,
experiment_id=experiment_id,
namespace=namespace,
model_name=model_name,
runtime_name=runtime_name,
parameters=runtime_parameters,
storage=storage_config,
enable_gpu_selection=False,
)
if not isvc_name:
error_msg = "Failed to deploy InferenceService"
print(error_msg)
experiment_result["error_message"] = error_msg
return experiment_result
# Step 2: Wait for InferenceService to be ready
print(f"\n[Step 2/4] Waiting for InferenceService to be ready...")
if not self.model_controller.wait_for_ready(isvc_name, namespace, timeout=timeout):
error_msg = "InferenceService did not become ready in time"
print(error_msg)
# Capture container logs for debugging
container_logs = self.cleanup_experiment(isvc_name, None, namespace)
if container_logs:
experiment_result["container_logs"] = container_logs
# Extract key error info from container logs
if "ValueError" in container_logs or "Error" in container_logs:
# Get last few lines of error for error_message
log_lines = container_logs.strip().split('\n')
error_lines = [line for line in log_lines if "Error" in line or "failed" in line.lower()]
if error_lines:
error_msg += f" - {error_lines[-1][:200]}" # Last error, truncated
experiment_result["error_message"] = error_msg
return experiment_result
# Get GPU information if available (Docker mode)
gpu_info = None
if self.deployment_mode == "docker" and hasattr(self.model_controller, "get_gpu_info"):
gpu_info = self.model_controller.get_gpu_info(isvc_name, namespace)
if gpu_info:
experiment_result["gpu_info"] = gpu_info
# Step 3: Run benchmark
print(f"\n[Step 3/4] Running benchmark...")
# Notify that benchmark phase is starting
if on_benchmark_start:
on_benchmark_start()
if self.use_direct_benchmark:
# Get endpoint URL (differs between Docker, Local and OME modes)
endpoint_url = None
gpu_indices = None
if self.deployment_mode == "docker":
# Docker mode: Get direct URL from controller
endpoint_url = self.model_controller.get_service_url(isvc_name, namespace)
if not endpoint_url:
error_msg = "Failed to get service URL from Docker controller"
print(error_msg)
experiment_result["error_message"] = error_msg
self.cleanup_experiment(isvc_name, None, namespace, experiment_id)
return experiment_result
# Extract GPU indices from gpu_info for monitoring
if gpu_info and "indices" in gpu_info.get("gpu_info", {}):
gpu_indices = gpu_info["gpu_info"]["indices"]
print(f"[GPU Monitor] Will monitor GPUs: {gpu_indices} during benchmark")
elif self.deployment_mode == "local":
# Local mode: Get direct URL from controller
endpoint_url = self.model_controller.get_service_url(isvc_name, namespace)
if not endpoint_url:
error_msg = "Failed to get service URL from Local controller"
print(error_msg)
experiment_result["error_message"] = error_msg
self.cleanup_experiment(isvc_name, None, namespace, experiment_id)
return experiment_result
# Extract GPU indices from controller's process info for monitoring
local_gpu_info = self.model_controller.get_gpu_info(isvc_name, namespace)
if local_gpu_info and local_gpu_info.get("device_ids"):
gpu_indices = [int(idx) for idx in local_gpu_info["device_ids"]]
print(f"[GPU Monitor] Will monitor GPUs: {gpu_indices} during benchmark")
# Direct CLI execution with automatic port forwarding (OME) or direct URL (Docker)
# Merge slo_config into benchmark_config for SLO-aware filtering
benchmark_config_with_slo = task["benchmark"].copy()
# Auto-populate model_name from task config if not provided in benchmark config
if "model_name" not in benchmark_config_with_slo:
benchmark_config_with_slo["model_name"] = model_name
if "slo" in task:
benchmark_config_with_slo["slo_config"] = task["slo"]
slo_value = benchmark_config_with_slo.get("slo_config")
print(f"[DEBUG ORCHESTRATOR] slo_config value: {slo_value}")
metrics = self.benchmark_controller.run_benchmark(
task_name=task_name,
experiment_id=experiment_id,
service_name=isvc_name,
namespace=namespace,
benchmark_config=benchmark_config_with_slo,
timeout=timeout,
endpoint_url=endpoint_url,
gpu_indices=gpu_indices,
)
benchmark_name = None # No K8s resource to track
# Step 4: Process results
print(f"\n[Step 4/4] Processing results...")
if metrics:
# Add per-GPU throughput metrics BEFORE calculating objective score
# This ensures per-GPU values are available for optimization
if gpu_info and gpu_info.get("count", 0) > 0:
gpu_count = gpu_info["count"]
# Add per-GPU throughput for all throughput metrics
if "mean_output_throughput" in metrics:
metrics["mean_output_throughput_per_gpu"] = metrics["mean_output_throughput"] / gpu_count
if "max_output_throughput" in metrics:
metrics["max_output_throughput_per_gpu"] = metrics["max_output_throughput"] / gpu_count
if "mean_total_throughput" in metrics:
metrics["mean_total_throughput_per_gpu"] = metrics["mean_total_throughput"] / gpu_count
if "max_total_throughput" in metrics:
metrics["max_total_throughput_per_gpu"] = metrics["max_total_throughput"] / gpu_count
# Also add per-GPU metrics to raw results
for raw_result in metrics.get("raw_results", []):
if "mean_output_throughput_tokens_per_s" in raw_result:
raw_result["mean_output_throughput_per_gpu"] = raw_result["mean_output_throughput_tokens_per_s"] / gpu_count
if "mean_input_throughput_tokens_per_s" in raw_result:
raw_result["mean_input_throughput_per_gpu"] = raw_result["mean_input_throughput_tokens_per_s"] / gpu_count
if "mean_total_tokens_throughput_tokens_per_s" in raw_result:
raw_result["mean_total_throughput_per_gpu"] = raw_result["mean_total_tokens_throughput_tokens_per_s"] / gpu_count
# Get SLO configuration from task if present
slo_config = task.get("slo")
# Calculate objective score with SLO penalties (per-GPU metrics now available)
score = calculate_objective_score(metrics, task["optimization"]["objective"], slo_config)
# Check if this is a hard SLO failure (score = inf/-inf)
is_slo_failure = (score == float("inf") or score == float("-inf"))
if is_slo_failure:
experiment_result["status"] = "failed"
experiment_result["slo_violation"] = True
experiment_result["error_message"] = "Hard SLO violation - experiment exceeded threshold limits"
print(f"Experiment {experiment_id} FAILED due to hard SLO violation")
else:
experiment_result["status"] = "success"
experiment_result["metrics"] = metrics
experiment_result["objective_score"] = score
print(f"Experiment {experiment_id} completed. Score: {score}")
else:
error_msg = "Failed to retrieve benchmark results"
print(error_msg)
experiment_result["error_message"] = error_msg
else:
# K8s BenchmarkJob execution
benchmark_name = self.benchmark_controller.create_benchmark_job(
task_name=task_name,
experiment_id=experiment_id,
namespace=namespace,
isvc_name=isvc_name,
benchmark_config=task["benchmark"],
)
if not benchmark_name:
error_msg = "Failed to create BenchmarkJob"
print(error_msg)
experiment_result["error_message"] = error_msg
self.cleanup_experiment(isvc_name, None, namespace)
return experiment_result
# Wait for benchmark to complete
if not self.benchmark_controller.wait_for_completion(benchmark_name, namespace, timeout=timeout):
error_msg = "Benchmark did not complete in time"
print(error_msg)
experiment_result["error_message"] = error_msg
self.cleanup_experiment(isvc_name, benchmark_name, namespace)
return experiment_result
# Step 4: Collect results
print(f"\n[Step 4/4] Collecting results...")
metrics = self.benchmark_controller.get_benchmark_results(benchmark_name, namespace)
if metrics:
# Get SLO configuration from task if present
slo_config = task.get("slo")
# Calculate objective score with SLO penalties
score = calculate_objective_score(metrics, task["optimization"]["objective"], slo_config)
# Check if this is a hard SLO failure (score = inf/-inf)
is_slo_failure = (score == float("inf") or score == float("-inf"))
if is_slo_failure:
experiment_result["status"] = "failed"
experiment_result["slo_violation"] = True
experiment_result["error_message"] = "Hard SLO violation - experiment exceeded threshold limits"
print(f"Experiment {experiment_id} FAILED due to hard SLO violation")
else:
experiment_result["status"] = "success"
experiment_result["metrics"] = metrics
experiment_result["objective_score"] = score
print(f"Experiment {experiment_id} completed. Score: {score}")
else:
error_msg = "Failed to retrieve benchmark results"
print(error_msg)
experiment_result["error_message"] = error_msg
# Cleanup
print(f"\n[Cleanup] Removing experiment resources...")
container_logs = self.cleanup_experiment(isvc_name, benchmark_name, namespace, experiment_id)
# Store container logs in result if available
if container_logs:
experiment_result["container_logs"] = container_logs
return experiment_result
def cleanup_experiment(self, isvc_name: str, benchmark_name: Optional[str], namespace: str, experiment_id: Optional[int] = None) -> Optional[str]:
"""Clean up experiment resources.
Args:
isvc_name: InferenceService name
benchmark_name: BenchmarkJob name (can be None)
namespace: K8s namespace
experiment_id: Experiment ID (for direct benchmark cleanup)
Returns:
Container logs if available (Docker mode only), None otherwise
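
Example (illustrative; the "<task_name>-exp<id>" naming is inferred from the
rsplit("-exp", 1) call in this method):

    logs = orchestrator.cleanup_experiment("my-task-exp3", None, "default", experiment_id=3)
    if logs:
        print(logs[-2000:])  # tail of the captured container logs (Docker mode only)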
"""
container_logs = None
# Retrieve container logs before deletion (Docker mode only)
if self.deployment_mode == "docker" and hasattr(self.model_controller, "get_container_logs"):
print(f"[Cleanup] Retrieving container logs before deletion...")
container_logs = self.model_controller.get_container_logs(isvc_name, namespace, tail=0) # Get ALL logs
if container_logs:
print(f"[Cleanup] Retrieved {len(container_logs)} bytes of container logs")
# Continue with cleanup
if self.use_direct_benchmark and experiment_id is not None:
self.benchmark_controller.cleanup_results(
task_name=isvc_name.rsplit("-exp", 1)[0], experiment_id=experiment_id
)
elif benchmark_name:
self.benchmark_controller.delete_benchmark_job(benchmark_name, namespace)
if isvc_name:
self.model_controller.delete_inference_service(isvc_name, namespace)
return container_logs
def run_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Run a complete autotuning task using optimization strategy.
Args:
task: Task configuration dictionary
Returns:
Summary of all experiments
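
Example of a minimal task dictionary (illustrative sketch; only a subset of the
keys this class reads is shown, and the values are placeholders):

    task = {
        "task_name": "llama-tune",
        "model": {"id_or_path": "my-model", "namespace": "default"},
        "base_runtime": "sglang",
        "parameters": {"enable-torch-compile": [True, False]},
        "optimization": {
            "strategy": "grid_search",
            "objective": "minimize_latency",
            "max_iterations": 10,
        },
        "benchmark": {},
    }
    summary = orchestrator.run_task(task)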
"""
task_name = task["task_name"]
print(f"Starting task: {task_name}")
if "description" in task:
print(f"Description: {task['description']}")
# Create optimization strategy
optimization_config = task.get("optimization", {})
strategy_name = optimization_config.get("strategy", "grid_search")
objective = optimization_config.get("objective", "minimize_latency")
print(f"\nOptimization strategy: {strategy_name}")
print(f"Objective: {objective}")
# Merge parameters and parallel_config for optimization
# This allows parallel parameters (tp, pp, dp, etc.) to be tuned alongside other parameters
combined_parameters = dict(task.get("parameters", {}))
parallel_config = task.get("parallel_config", {})
if parallel_config:
print(f"\nParallel configuration detected:")
for key, value in parallel_config.items():
print(f" {key}: {value}")
# Add parallel config to combined parameters for optimization
combined_parameters[key] = value
try:
strategy = create_optimization_strategy(optimization_config, combined_parameters)
except Exception as e:
print(f"Error creating optimization strategy: {e}")
raise
# Run experiments using strategy
start_time = time.time()
iteration = 0
max_iterations = optimization_config.get("max_iterations", 100)
print(f"\nStarting optimization (max {max_iterations} iterations)...")
while not strategy.should_stop():
iteration += 1
# Get next parameter suggestion from strategy
parameters = strategy.suggest_parameters()
if parameters is None:
print(f"[Orchestrator] Strategy has no more suggestions")
break
# Run experiment
print(f"\n{'='*80}")
print(f"Experiment {iteration}")
print(f"{'='*80}")
result = self.run_experiment(task, iteration, parameters)
self.results.append(result)
# Update strategy with result
if result["status"] == "success":
strategy.tell_result(
parameters=parameters,
objective_score=result["objective_score"],
metrics=result.get("metrics", {})
)
else:
# For failed experiments, report worst possible score
worst_score = float("inf") if "minimize" in objective else float("-inf")
strategy.tell_result(
parameters=parameters,
objective_score=worst_score,
metrics={}
)
elapsed = time.time() - start_time
# Find best result
successful_results = [r for r in self.results if r["status"] == "success"]
if successful_results:
# Follow the same convention as the worst-score handling above:
# lower is better for "minimize" objectives, higher for "maximize"
best_fn = min if "minimize" in objective else max
best_result = best_fn(successful_results, key=lambda r: r["objective_score"])
print(f"\n{'='*80}")
print(f"AUTOTUNING COMPLETE")
print(f"{'='*80}")
print(f"Strategy: {strategy_name}")
print(f"Total experiments: {len(self.results)}")
print(f"Successful: {len(successful_results)}")
print(f"Failed: {len(self.results) - len(successful_results)}")
print(f"Total time: {elapsed:.1f}s")
print(f"\nBest configuration:")
print(f" Parameters: {best_result['parameters']}")
print(f" Score: {best_result['objective_score']:.4f}")
else:
best_result = None
print("\nNo successful experiments!")
return {
"task_name": task_name,
"strategy": strategy_name,
"total_experiments": len(self.results),
"successful_experiments": len(successful_results),
"elapsed_time": elapsed,
"best_result": best_result,
"all_results": self.results,
}
def _ensure_clusterbasemodel(self, config: Dict[str, Any], fallback_name: str) -> tuple[str, bool]:
"""Ensure ClusterBaseModel exists, create if needed.
Args:
config: ClusterBaseModel configuration (preset or spec)
fallback_name: Fallback name if config doesn't specify
Returns:
Tuple of (resource_name, was_created)
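
Example config shapes (illustrative; preset names and spec contents are placeholders):

    {"preset": "some-preset-name"}
    {"name": "my-base-model", "spec": {...}, "overrides": {...},
     "labels": {...}, "annotations": {...}}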
"""
if self.deployment_mode != "ome":
print("Warning: ClusterBaseModel creation only supported in OME mode")
return (fallback_name, False)
# Check if config specifies a preset
preset_name = config.get("preset")
if preset_name:
try:
preset = clusterbasemodel_presets.get_preset(preset_name)
name = preset["name"]
spec = preset["spec"]
print(f"Using ClusterBaseModel preset: {preset['display_name']}")
except ValueError as e:
print(f"Error loading preset: {e}")
return (fallback_name, False)
else:
# Custom configuration
name = config.get("name", fallback_name)
spec = config.get("spec")
if not spec:
print("Error: ClusterBaseModel config must have 'preset' or 'spec'")
return (fallback_name, False)
# Apply any overrides
overrides = config.get("overrides", {})
if overrides:
# Merge overrides into spec (nested dicts merged one level deep, other values replaced)
for key, value in overrides.items():
if isinstance(value, dict) and key in spec and isinstance(spec[key], dict):
spec[key] = {**spec[key], **value}
else:
spec[key] = value
# Ensure resource exists
success = self.model_controller.ensure_clusterbasemodel(
name=name,
spec=spec,
labels=config.get("labels"),
annotations=config.get("annotations")
)
return (name if success else fallback_name, success)
def _ensure_clusterservingruntime(self, config: Dict[str, Any], fallback_name: str) -> tuple[str, bool]:
"""Ensure ClusterServingRuntime exists, create if needed.
Args:
config: ClusterServingRuntime configuration (preset or spec)
fallback_name: Fallback name if config doesn't specify
Returns:
Tuple of (resource_name, was_created)
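
Override merge semantics (illustrative): nested dicts are merged recursively,
other values are replaced, e.g.

    deep_merge({"a": {"x": 1, "y": 2}, "b": 3}, {"a": {"y": 9}})
    -> {"a": {"x": 1, "y": 9}, "b": 3}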
"""
if self.deployment_mode != "ome":
print("Warning: ClusterServingRuntime creation only supported in OME mode")
return (fallback_name, False)
# Check if config specifies a preset
preset_name = config.get("preset")
if preset_name:
try:
preset = clusterservingruntime_presets.get_preset(preset_name)
name = preset["name"]
spec = preset["spec"]
print(f"Using ClusterServingRuntime preset: {preset['display_name']}")
except ValueError as e:
print(f"Error loading preset: {e}")
return (fallback_name, False)
else:
# Custom configuration
name = config.get("name", fallback_name)
spec = config.get("spec")
if not spec:
print("Error: ClusterServingRuntime config must have 'preset' or 'spec'")
return (fallback_name, False)
# Apply any overrides
overrides = config.get("overrides", {})
if overrides:
# Deep merge overrides into spec
def deep_merge(base, override):
result = base.copy()
for key, value in override.items():
if isinstance(value, dict) and key in result and isinstance(result[key], dict):
result[key] = deep_merge(result[key], value)
else:
result[key] = value
return result
spec = deep_merge(spec, overrides)
# Ensure resource exists
success = self.model_controller.ensure_clusterservingruntime(
name=name,
spec=spec,
labels=config.get("labels"),
annotations=config.get("annotations")
)
return (name if success else fallback_name, success)