Source code for controllers.benchmark_controller

"""
GenAI-Bench Wrapper

Manages BenchmarkJob resources and collects metrics.
"""

import time
import yaml
from pathlib import Path
from typing import Dict, Any, Optional
from jinja2 import Template
from kubernetes import client, config
from kubernetes.client.rest import ApiException


class BenchmarkController:
    """Controller for managing OME BenchmarkJob resources."""
    def __init__(self, kubeconfig_path: Optional[str] = None):
        """Initialize the benchmark controller.

        Args:
            kubeconfig_path: Path to kubeconfig file. If None, tries the
                default kubeconfig and falls back to in-cluster config.
        """
        try:
            if kubeconfig_path:
                config.load_kube_config(config_file=kubeconfig_path)
            else:
                config.load_kube_config()
        except Exception:
            config.load_incluster_config()

        self.api_client = client.ApiClient()
        self.custom_api = client.CustomObjectsApi(self.api_client)
        self.batch_api = client.BatchV1Api(self.api_client)

        # Load the Jinja2 template used to render BenchmarkJob manifests
        template_dir = Path(__file__).parent.parent / "templates"
        with open(template_dir / "benchmark_job.yaml.j2") as f:
            self.benchmark_template = Template(f.read())
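    # For reference, a sketch of what `templates/benchmark_job.yaml.j2` might
    # contain. This is an assumption for illustration only: the field names
    # below mirror the variables passed to `render()` in `create_benchmark_job`
    # and may not match the actual CRD schema or the template shipped with
    # the project.
    #
    #   apiVersion: ome.io/v1beta1
    #   kind: BenchmarkJob
    #   metadata:
    #     name: {{ benchmark_name }}
    #     namespace: {{ namespace }}
    #   spec:
    #     task: {{ task_type }}
    #     endpoint:
    #       inferenceService:
    #         name: {{ isvc_name }}
    #     trafficScenarios: {{ traffic_scenarios | tojson }}
    #     numConcurrency: {{ num_concurrency | tojson }}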
    def create_benchmark_job(
        self,
        task_name: str,
        experiment_id: str,
        namespace: str,
        isvc_name: str,
        benchmark_config: Dict[str, Any],
    ) -> Optional[str]:
        """Create a BenchmarkJob to evaluate an InferenceService.

        Args:
            task_name: Autotuning task name
            experiment_id: Unique experiment identifier
            namespace: K8s namespace
            isvc_name: InferenceService name to benchmark
            benchmark_config: Benchmark configuration from input JSON

        Returns:
            BenchmarkJob name if successful, None otherwise
        """
        benchmark_name = f"{task_name}-bench{experiment_id}"

        # Debug: check model_tokenizer value
        model_tokenizer_value = benchmark_config.get("model_tokenizer")
        print(
            f"DEBUG: model_tokenizer = '{model_tokenizer_value}' "
            f"(type: {type(model_tokenizer_value).__name__}, bool: {bool(model_tokenizer_value)})"
        )

        # Render template
        rendered = self.benchmark_template.render(
            benchmark_name=benchmark_name,
            namespace=namespace,
            task_name=task_name,
            experiment_id=experiment_id,
            isvc_name=isvc_name,
            task_type=benchmark_config.get("task", "text-to-text"),
            model_tokenizer=model_tokenizer_value,
            traffic_scenarios=benchmark_config.get("traffic_scenarios", ["D(100,100)"]),
            num_concurrency=benchmark_config.get("num_concurrency", [1]),
            max_time_per_iteration=benchmark_config.get("max_time_per_iteration", 15),
            max_requests_per_iteration=benchmark_config.get("max_requests_per_iteration", 100),
            additional_params=benchmark_config.get("additional_params", {}),
        )
        print(f"DEBUG: First 600 chars of rendered YAML:\n{rendered[:600]}\n---END DEBUG---")

        # Parse YAML
        benchmark_resource = yaml.safe_load(rendered)

        # Create BenchmarkJob
        try:
            self.custom_api.create_namespaced_custom_object(
                group="ome.io",
                version="v1beta1",
                namespace=namespace,
                plural="benchmarkjobs",
                body=benchmark_resource,
            )
            print(f"Created BenchmarkJob '{benchmark_name}' in namespace '{namespace}'")
            return benchmark_name
        except ApiException as e:
            print(f"Error creating BenchmarkJob: {e}")
            return None
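    # For illustration, a `benchmark_config` dict could look like the
    # following. The keys mirror the `.get()` lookups above; the concrete
    # values (tokenizer path, concurrency sweep) are hypothetical examples:
    #
    #   {
    #       "task": "text-to-text",
    #       "model_tokenizer": "/path/to/tokenizer",   # hypothetical
    #       "traffic_scenarios": ["D(100,100)"],       # default shown above
    #       "num_concurrency": [1, 4, 8],              # hypothetical sweep
    #       "max_time_per_iteration": 15,
    #       "max_requests_per_iteration": 100,
    #       "additional_params": {},
    #   }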
    def wait_for_completion(
        self, benchmark_name: str, namespace: str, timeout: int = 1800, poll_interval: int = 15
    ) -> bool:
        """Wait for BenchmarkJob to complete.

        Args:
            benchmark_name: BenchmarkJob name
            namespace: K8s namespace
            timeout: Maximum wait time in seconds
            poll_interval: Polling interval in seconds

        Returns:
            True if completed successfully, False if timeout or failed
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                job = self.custom_api.get_namespaced_custom_object(
                    group="ome.io",
                    version="v1beta1",
                    namespace=namespace,
                    plural="benchmarkjobs",
                    name=benchmark_name,
                )

                # Check status state
                status = job.get("status", {})
                state = status.get("state", "")

                # Check for completion
                if state == "Complete":
                    print(f"BenchmarkJob '{benchmark_name}' completed successfully")
                    return True
                elif state == "Failed":
                    failure_message = status.get("failureMessage", "No details")
                    print(f"BenchmarkJob '{benchmark_name}' failed: {failure_message}")
                    return False

                elapsed = int(time.time() - start_time)
                print(f"Waiting for BenchmarkJob '{benchmark_name}' to complete... ({elapsed}s)")
                time.sleep(poll_interval)

            except ApiException as e:
                print(f"Error checking BenchmarkJob status: {e}")
                time.sleep(poll_interval)

        print(f"Timeout waiting for BenchmarkJob '{benchmark_name}' to complete")
        return False
    def get_benchmark_results(self, benchmark_name: str, namespace: str) -> Optional[Dict[str, Any]]:
        """Retrieve benchmark results from BenchmarkJob status.

        Args:
            benchmark_name: BenchmarkJob name
            namespace: K8s namespace

        Returns:
            Dict containing benchmark metrics, or None if unavailable
        """
        try:
            job = self.custom_api.get_namespaced_custom_object(
                group="ome.io",
                version="v1beta1",
                namespace=namespace,
                plural="benchmarkjobs",
                name=benchmark_name,
            )

            status = job.get("status", {})
            results = status.get("results", {})

            if not results:
                print(f"No results found for BenchmarkJob '{benchmark_name}'")
                return None

            # Extract key metrics
            metrics = {"benchmark_name": benchmark_name, "status": status, "results": results}
            print(f"Retrieved results for BenchmarkJob '{benchmark_name}'")
            return metrics

        except ApiException as e:
            print(f"Error getting benchmark results: {e}")
            return None
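    # Note: the structure of `results` is whatever the BenchmarkJob
    # controller writes to `status.results`; this method does not reshape it,
    # so callers receive {"benchmark_name": ..., "status": <full CR status>,
    # "results": <status.results>} as-is.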
    def delete_benchmark_job(self, benchmark_name: str, namespace: str) -> bool:
        """Delete a BenchmarkJob.

        Args:
            benchmark_name: BenchmarkJob name
            namespace: K8s namespace

        Returns:
            True if deleted successfully
        """
        try:
            self.custom_api.delete_namespaced_custom_object(
                group="ome.io",
                version="v1beta1",
                namespace=namespace,
                plural="benchmarkjobs",
                name=benchmark_name,
            )
            print(f"Deleted BenchmarkJob '{benchmark_name}' from namespace '{namespace}'")
            return True
        except ApiException as e:
            if e.status == 404:
                print(f"BenchmarkJob '{benchmark_name}' not found (already deleted?)")
                return True
            else:
                print(f"Error deleting BenchmarkJob: {e}")
                return False
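

# ---------------------------------------------------------------------------
# Example driver: a minimal sketch of how the controller might be used
# end-to-end. The task name, experiment id, namespace, and InferenceService
# name are illustrative placeholders, not values defined by this module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    controller = BenchmarkController()

    bench_config = {
        "task": "text-to-text",
        "traffic_scenarios": ["D(100,100)"],
        "num_concurrency": [1, 4],
    }

    # Placeholder names for illustration
    name = controller.create_benchmark_job(
        task_name="llama-tune",
        experiment_id="exp001",
        namespace="autotune",
        isvc_name="llama-tune-exp001",
        benchmark_config=bench_config,
    )

    if name and controller.wait_for_completion(name, namespace="autotune"):
        metrics = controller.get_benchmark_results(name, namespace="autotune")
        print(metrics)

    # Clean up the BenchmarkJob regardless of outcome
    if name:
        controller.delete_benchmark_job(name, namespace="autotune")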