API Reference

Auto-generated API documentation from Python docstrings.

Core Components

Orchestrator

Autotuner Orchestrator

Main orchestration logic for running parameter tuning experiments. Coordinates deployment controllers and benchmark execution.

class orchestrator.AutotunerOrchestrator[source]

Bases: object

Main orchestrator for the autotuning process.

__init__(deployment_mode='ome', kubeconfig_path=None, use_direct_benchmark=False, docker_model_path='/mnt/data/models', verbose=False, http_proxy='', https_proxy='', no_proxy='', hf_token='')[source]

Initialize the orchestrator.

Parameters:
  • deployment_mode (str) – Deployment mode - ‘ome’ (Kubernetes), ‘docker’ (standalone), or ‘local’ (subprocess)

  • kubeconfig_path (str) – Path to kubeconfig file (for OME mode)

  • use_direct_benchmark (bool) – If True, use direct genai-bench CLI instead of K8s BenchmarkJob

  • docker_model_path (str) – Base path for models in Docker/Local mode

  • verbose (bool) – If True, stream genai-bench output in real-time

  • http_proxy (str) – HTTP proxy URL for containers (optional)

  • https_proxy (str) – HTTPS proxy URL for containers (optional)

  • no_proxy (str) – Comma-separated list of hosts to bypass proxy (optional)

  • hf_token (str) – HuggingFace access token for gated models (optional)

run_experiment(task, experiment_id, parameters, on_benchmark_start=None)[source]

Run a single tuning experiment.

Parameters:
  • task (Dict[str, Any]) – Task configuration

  • experiment_id (int) – Unique experiment identifier

  • parameters (Dict[str, Any]) – Parameter values for this experiment

  • on_benchmark_start – Optional callback function called when benchmark phase starts

Return type:

Dict[str, Any]

Returns:

Experiment results dictionary

cleanup_experiment(isvc_name, benchmark_name, namespace, experiment_id=None)[source]

Clean up experiment resources.

Parameters:
  • isvc_name (str) – InferenceService name

  • benchmark_name (str) – BenchmarkJob name (can be None)

  • namespace (str) – K8s namespace

  • experiment_id (int) – Experiment ID (for direct benchmark cleanup)

Return type:

str

Returns:

Container logs if available (Docker mode only), None otherwise

run_task(task)[source]

Run a complete autotuning task using optimization strategy.

Parameters:

task (Dict[str, Any]) – Task configuration dictionary

Return type:

Dict[str, Any]

Returns:

Summary of all experiments
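A minimal end-to-end sketch is shown below. The constructor arguments match this reference, but the task dictionary keys are illustrative placeholders; the real schema comes from your task JSON.

from orchestrator import AutotunerOrchestrator

# Hypothetical task dictionary; use the key names from your actual task JSON schema.
task = {
    "task_name": "demo-tuning",
    "model": "meta-llama/Llama-3.2-1B-Instruct",
    "parameters": {"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]},
    "optimization": {"strategy": "grid_search", "objective": "minimize_latency"},
    "benchmark": {},
}

orch = AutotunerOrchestrator(
    deployment_mode="docker",      # 'ome' (Kubernetes), 'docker', or 'local'
    use_direct_benchmark=True,     # run the genai-bench CLI instead of a K8s BenchmarkJob
    verbose=True,
)
summary = orch.run_task(task)      # returns a summary of all experiments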

Controllers

Base Controller

Base Controller Interface

Abstract base class for model deployment controllers. Supports multiple deployment modes (OME/Kubernetes, Docker, etc.)

class controllers.base_controller.BaseModelController[source]

Bases: ABC

Abstract base class for model deployment controllers.

abstractmethod deploy_inference_service(task_name, experiment_id, namespace, model_name, runtime_name, parameters)[source]

Deploy a model inference service with specified parameters.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Unique experiment identifier

  • namespace (str) – Namespace/resource group identifier

  • model_name (str) – Model name/path

  • runtime_name (str) – Runtime identifier (e.g., ‘sglang’)

  • parameters (Dict[str, Any]) – Deployment parameters (tp_size, mem_frac, etc.)

Return type:

Optional[str]

Returns:

Service identifier (name/ID) if successful, None otherwise

abstractmethod wait_for_ready(service_id, namespace, timeout=600, poll_interval=10)[source]

Wait for the inference service to become ready.

Parameters:
  • service_id (str) – Service identifier returned by deploy_inference_service

  • namespace (str) – Namespace/resource group identifier

  • timeout (int) – Maximum wait time in seconds

  • poll_interval (int) – Polling interval in seconds

Return type:

bool

Returns:

True if service is ready, False if timeout or error

abstractmethod delete_inference_service(service_id, namespace)[source]

Delete an inference service.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace/resource group identifier

Return type:

bool

Returns:

True if deleted successfully

abstractmethod get_service_url(service_id, namespace)[source]

Get the service URL/endpoint for the inference service.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace/resource group identifier

Return type:

Optional[str]

Returns:

Service URL/endpoint if available, None otherwise
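A skeletal subclass illustrating the contract; the class below is hypothetical, and only the four abstract method signatures come from this reference.

from controllers.base_controller import BaseModelController

class DummyController(BaseModelController):
    """Hypothetical controller that pretends every deployment succeeds."""

    def deploy_inference_service(self, task_name, experiment_id, namespace,
                                 model_name, runtime_name, parameters):
        # Return a service identifier on success, None on failure.
        return f"{task_name}-{experiment_id}"

    def wait_for_ready(self, service_id, namespace, timeout=600, poll_interval=10):
        return True

    def delete_inference_service(self, service_id, namespace):
        return True

    def get_service_url(self, service_id, namespace):
        return "http://localhost:8080"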

Docker Controller

Docker Deployment Controller

Manages the lifecycle of model inference services using standalone Docker containers. No Kubernetes required - direct Docker container management.

class controllers.docker_controller.DockerController[source]

Bases: BaseModelController

Controller for managing standalone Docker container deployments.

__init__(model_base_path='/mnt/data/models', http_proxy='', https_proxy='', no_proxy='', hf_token='')[source]

Initialize the Docker controller.

Parameters:
  • model_base_path (str) – Base path where models are stored on the host

  • http_proxy (str) – HTTP proxy URL (optional)

  • https_proxy (str) – HTTPS proxy URL (optional)

  • no_proxy (str) – Comma-separated list of hosts to bypass proxy (optional)

  • hf_token (str) – HuggingFace access token for gated models (optional)

Note

Container logs are retrieved before deletion and saved to the task log file. Containers are removed manually during the cleanup phase.

deploy_inference_service(task_name, experiment_id, namespace, model_name, runtime_name, parameters, image_tag=None)[source]

Deploy a model inference service using Docker.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Unique experiment identifier

  • namespace (str) – Namespace identifier (used for container naming)

  • model_name (str) – Model name (HuggingFace model ID or local path)

  • runtime_name (str) – Runtime identifier (e.g., ‘sglang’, ‘vllm’)

  • parameters (Dict[str, Any]) – SGLang/runtime parameters (tp_size, mem_frac, etc.)

  • image_tag (Optional[str]) – Optional Docker image tag (e.g., ‘v0.5.2-cu126’)

Return type:

Optional[str]

Returns:

Container ID if successful, None otherwise

wait_for_ready(service_id, namespace, timeout=600, poll_interval=5)[source]

Wait for the Docker container service to become ready.

Parameters:
  • service_id (str) – Service identifier (container name)

  • namespace (str) – Namespace identifier

  • timeout (int) – Maximum wait time in seconds

  • poll_interval (int) – Polling interval in seconds

Return type:

bool

Returns:

True if service is ready, False if timeout or error

delete_inference_service(service_id, namespace)[source]

Delete a Docker container service.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Return type:

bool

Returns:

True if deleted successfully

get_service_url(service_id, namespace)[source]

Get the service URL for a Docker container.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Return type:

Optional[str]

Returns:

Service URL if available, None otherwise

get_container_logs(service_id, namespace, tail=1000)[source]

Get logs from a Docker container.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

  • tail (int) – Number of lines to retrieve (default: 1000, 0 for all)

Return type:

Optional[str]

Returns:

Container logs as string, None if container not found

get_gpu_info(service_id, namespace)[source]

Get GPU information for a deployed container.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Returns:

Dict with GPU info {model, count, device_ids, world_size}, or None if not found

Return type:

Optional[Dict[str, Any]]
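A deploy/check/cleanup sketch for Docker mode; the model ID, namespace, and parameter values are placeholders, assuming an SGLang-style runtime.

from controllers.docker_controller import DockerController

controller = DockerController(model_base_path="/mnt/data/models")

service_id = controller.deploy_inference_service(
    task_name="demo",
    experiment_id=1,
    namespace="autotuner",
    model_name="meta-llama/Llama-3.2-1B-Instruct",  # placeholder HF model ID
    runtime_name="sglang",
    parameters={"tp-size": 1, "mem-fraction-static": 0.8},
)
if service_id:
    if controller.wait_for_ready(service_id, "autotuner", timeout=600):
        url = controller.get_service_url(service_id, "autotuner")
        logs = controller.get_container_logs(service_id, "autotuner", tail=200)
        gpu_info = controller.get_gpu_info(service_id, "autotuner")
    controller.delete_inference_service(service_id, "autotuner")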

OME Controller

OME Deployment Controller

Manages the lifecycle of InferenceService resources for autotuning experiments.

class controllers.ome_controller.OMEController[source]

Bases: BaseModelController

Controller for managing OME InferenceService deployments.

__init__(kubeconfig_path=None)[source]

Initialize the OME controller.

Parameters:

kubeconfig_path (Optional[str]) – Path to kubeconfig file. If None, uses in-cluster config.

create_namespace(namespace)[source]

Create namespace if it doesn’t exist.

Parameters:

namespace (str) – Namespace name

Return type:

bool

Returns:

True if created or already exists

deploy_inference_service(task_name, experiment_id, namespace, model_name, runtime_name, parameters, storage=None, enable_gpu_selection=True)[source]

Deploy an InferenceService with specified parameters.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Unique experiment identifier (will be converted to string internally)

  • namespace (str) – K8s namespace

  • model_name (str) – Model name

  • runtime_name (str) – ServingRuntime name

  • parameters (Dict[str, Any]) – SGLang parameters (tp_size, mem_frac, etc.)

  • storage (Optional[Dict[str, Any]]) – Optional storage configuration for PVC support, for example:

    {
        "type": "pvc",
        "pvc_name": "model-storage-pvc",
        "pvc_subpath": "meta/llama-3-2-1b-instruct",
        "mount_path": "/raid/models/meta/llama-3-2-1b-instruct"
    }

  • enable_gpu_selection (bool) – If True, intelligently select node with idle GPUs (default: True)

Return type:

Optional[str]

Returns:

InferenceService name if successful, None otherwise

wait_for_ready(isvc_name, namespace, timeout=600, poll_interval=10)[source]

Wait for InferenceService to become ready.

Parameters:
  • isvc_name (str) – InferenceService name

  • namespace (str) – K8s namespace

  • timeout (int) – Maximum wait time in seconds

  • poll_interval (int) – Polling interval in seconds

Return type:

bool

Returns:

True if ready, False if timeout or error

delete_inference_service(isvc_name, namespace)[source]

Delete an InferenceService.

Parameters:
  • isvc_name (str) – InferenceService name

  • namespace (str) – K8s namespace

Return type:

bool

Returns:

True if deleted successfully

get_service_url(isvc_name, namespace)[source]

Get the service URL for an InferenceService.

Parameters:
  • isvc_name (str) – InferenceService name

  • namespace (str) – K8s namespace

Return type:

Optional[str]

Returns:

Service URL if available, None otherwise

ensure_clusterbasemodel(name, spec, labels=None, annotations=None)[source]

Ensure ClusterBaseModel exists, create if missing.

Parameters:
Return type:

bool

Returns:

True if exists or created successfully, False otherwise

list_clusterbasemodels()[source]

List all ClusterBaseModels in the cluster.

Return type:

Optional[Dict[str, Any]]

Returns:

List of ClusterBaseModels or None on error

ensure_clusterservingruntime(name, spec, labels=None, annotations=None)[source]

Ensure ClusterServingRuntime exists, create if missing.

Parameters:
Return type:

bool

Returns:

True if exists or created successfully, False otherwise

list_clusterservingruntimes()[source]

List all ClusterServingRuntimes in the cluster.

Return type:

Optional[Dict[str, Any]]

Returns:

List of ClusterServingRuntimes or None on error
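A PVC-backed deployment sketch using the storage dictionary format documented above; the namespace, model, runtime, and PVC names are placeholders.

from controllers.ome_controller import OMEController

controller = OMEController(kubeconfig_path=None)   # None = in-cluster config
controller.create_namespace("autotuner")

isvc_name = controller.deploy_inference_service(
    task_name="demo",
    experiment_id=1,
    namespace="autotuner",
    model_name="llama-3-2-1b-instruct",            # placeholder model name
    runtime_name="srt-llama",                      # placeholder ServingRuntime name
    parameters={"tp-size": 1, "mem-fraction-static": 0.8},
    storage={
        "type": "pvc",
        "pvc_name": "model-storage-pvc",
        "pvc_subpath": "meta/llama-3-2-1b-instruct",
        "mount_path": "/raid/models/meta/llama-3-2-1b-instruct",
    },
)
if isvc_name:
    if controller.wait_for_ready(isvc_name, "autotuner", timeout=600):
        print(controller.get_service_url(isvc_name, "autotuner"))
    controller.delete_inference_service(isvc_name, "autotuner")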

Local Controller

Local Subprocess Controller

Manages the lifecycle of model inference services using local subprocess. No Docker or Kubernetes required - direct process management.

class controllers.local_controller.LocalController[source]

Bases: BaseModelController

Controller for managing local subprocess deployments.

__init__(model_base_path='/mnt/data/models', python_path='python3', http_proxy='', https_proxy='', no_proxy='', hf_token='')[source]

Initialize the local subprocess controller.

Parameters:
  • model_base_path (str) – Base path where models are stored

  • python_path (str) – Path to python executable with sglang installed

  • http_proxy (str) – HTTP proxy URL (optional)

  • https_proxy (str) – HTTPS proxy URL (optional)

  • no_proxy (str) – Comma-separated list of hosts to bypass proxy (optional)

  • hf_token (str) – HuggingFace access token for gated models (optional)

deploy_inference_service(task_name, experiment_id, namespace, model_name, runtime_name, parameters, image_tag=None)[source]

Deploy a model inference service using local subprocess.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Unique experiment identifier

  • namespace (str) – Namespace identifier (used for naming)

  • model_name (str) – Model name (HuggingFace model ID or local path)

  • runtime_name (str) – Runtime identifier (e.g., ‘sglang’, ‘vllm’)

  • parameters (Dict[str, Any]) – Runtime parameters (tp_size, mem_frac, etc.)

  • image_tag (Optional[str]) – Unused in local mode, kept for compatibility

Return type:

Optional[str]

Returns:

Service ID if successful, None otherwise

wait_for_ready(service_id, namespace, timeout=600, poll_interval=5)[source]

Wait for the local subprocess service to become ready.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

  • timeout (int) – Maximum wait time in seconds

  • poll_interval (int) – Polling interval in seconds

Return type:

bool

Returns:

True if service is ready, False if timeout or error

delete_inference_service(service_id, namespace)[source]

Delete a local subprocess service.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Return type:

bool

Returns:

True if deleted successfully

get_service_url(service_id, namespace)[source]

Get the service URL for a local subprocess.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Return type:

Optional[str]

Returns:

Service URL if available, None otherwise

get_container_logs(service_id, namespace, tail=1000)[source]

Get logs from a local subprocess.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

  • tail (int) – Number of lines to retrieve

Return type:

Optional[str]

Returns:

Log content as string, None if not found

get_gpu_info(service_id, namespace)[source]

Get GPU information for a deployed service.

Parameters:
  • service_id (str) – Service identifier

  • namespace (str) – Namespace identifier

Return type:

Optional[Dict[str, Any]]

Returns:

Dict with GPU info, or None if not found

ensure_model_downloaded(model_name, timeout=3600)[source]

Pre-download model weights before starting experiments.

This ensures large models are fully downloaded before the experiment timeout starts counting. Uses huggingface-cli for efficient downloading.

Parameters:
  • model_name (str) – HuggingFace model ID (e.g., ‘openai/gpt-oss-120b’)

  • timeout (int) – Maximum time to wait for download (default: 1 hour)

Return type:

bool

Returns:

True if model is ready, False if download failed
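A local-subprocess sketch; the interpreter path and model ID are placeholders, assuming sglang is installed in that environment.

from controllers.local_controller import LocalController

controller = LocalController(
    model_base_path="/mnt/data/models",
    python_path="python3",                         # interpreter with sglang installed
)
# Pre-download large models so the experiment timeout is not spent on downloading.
controller.ensure_model_downloaded("meta-llama/Llama-3.2-1B-Instruct")

service_id = controller.deploy_inference_service(
    task_name="demo",
    experiment_id=1,
    namespace="local",
    model_name="meta-llama/Llama-3.2-1B-Instruct",
    runtime_name="sglang",
    parameters={"tp-size": 1},
)
if service_id:
    if controller.wait_for_ready(service_id, "local", timeout=900):
        print(controller.get_service_url(service_id, "local"))
    controller.delete_inference_service(service_id, "local")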

Benchmark Controllers

Direct GenAI-Bench Controller

Runs genai-bench directly using the CLI instead of Kubernetes BenchmarkJob CRDs. This bypasses the genai-bench v251014 image issues by using the local installation.

class controllers.direct_benchmark_controller.DirectBenchmarkController[source]

Bases: object

Controller for running genai-bench directly via CLI.

__init__(genai_bench_path='env/bin/genai-bench', verbose=False)[source]

Initialize the direct benchmark controller.

Parameters:
  • genai_bench_path (str) – Path to genai-bench executable (can be relative or absolute)

  • verbose (bool) – If True, stream genai-bench output in real-time

setup_port_forward(service_name, namespace, remote_port=8080, local_port=8080)[source]

Setup kubectl port-forward for accessing InferenceService.

Parameters:
  • service_name (str) – InferenceService name (used to find pods)

  • namespace (str) – K8s namespace

  • remote_port (int) – Remote service port (default 8080 for OME InferenceServices)

  • local_port (int) – Local port to forward to

Return type:

Optional[str]

Returns:

Local endpoint URL if successful, None otherwise

cleanup_port_forward()[source]

Stop port forward process.

run_benchmark(task_name, experiment_id, service_name, namespace, benchmark_config, timeout=1800, local_port=8080, endpoint_url=None, gpu_indices=None)[source]

Run benchmark against an inference endpoint with automatic port forwarding.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Unique experiment identifier

  • service_name (str) – K8s service name (or Docker container name)

  • namespace (str) – K8s namespace (ignored in Docker mode)

  • benchmark_config (Dict[str, Any]) – Benchmark configuration from input JSON

  • timeout (int) – Maximum execution time in seconds

  • local_port (int) – Local port for port forwarding (ignored if endpoint_url is provided)

  • endpoint_url (Optional[str]) – Optional direct endpoint URL (skips port-forward setup for Docker mode)

  • gpu_indices (Optional[List[int]]) – Optional list of GPU indices to monitor during benchmark

Return type:

Optional[Dict[str, Any]]

Returns:

Dict containing benchmark metrics and GPU statistics, or None if failed

cleanup_results(task_name, experiment_id)[source]

Clean up benchmark result files.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (int) – Experiment identifier

Return type:

bool

Returns:

True if cleaned up successfully
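A Docker/local-mode sketch that passes endpoint_url so no kubectl port-forward is set up; the benchmark_config contents are placeholders taken from your task JSON.

from controllers.direct_benchmark_controller import DirectBenchmarkController

bench = DirectBenchmarkController(genai_bench_path="env/bin/genai-bench", verbose=True)

results = bench.run_benchmark(
    task_name="demo",
    experiment_id=1,
    service_name="demo-1",                    # container name in Docker mode
    namespace="autotuner",                    # ignored in Docker mode
    benchmark_config={},                      # benchmark section from the task JSON
    timeout=1800,
    endpoint_url="http://localhost:8080",     # skip port forwarding
    gpu_indices=[0],                          # GPUs to monitor during the run
)
if results is not None:
    print(results)
bench.cleanup_results("demo", 1)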

GenAI-Bench Wrapper

Manages BenchmarkJob resources and collects metrics.

class controllers.benchmark_controller.BenchmarkController[source]

Bases: object

Controller for managing OME BenchmarkJob resources.

__init__(kubeconfig_path=None)[source]

Initialize the benchmark controller.

Parameters:

kubeconfig_path (Optional[str]) – Path to kubeconfig file. If None, uses in-cluster config.

create_benchmark_job(task_name, experiment_id, namespace, isvc_name, benchmark_config)[source]

Create a BenchmarkJob to evaluate an InferenceService.

Parameters:
  • task_name (str) – Autotuning task name

  • experiment_id (str) – Unique experiment identifier

  • namespace (str) – K8s namespace

  • isvc_name (str) – InferenceService name to benchmark

  • benchmark_config (Dict[str, Any]) – Benchmark configuration from input JSON

Return type:

Optional[str]

Returns:

BenchmarkJob name if successful, None otherwise

wait_for_completion(benchmark_name, namespace, timeout=1800, poll_interval=15)[source]

Wait for BenchmarkJob to complete.

Parameters:
  • benchmark_name (str) – BenchmarkJob name

  • namespace (str) – K8s namespace

  • timeout (int) – Maximum wait time in seconds

  • poll_interval (int) – Polling interval in seconds

Return type:

bool

Returns:

True if completed successfully, False if timeout or failed

get_benchmark_results(benchmark_name, namespace)[source]

Retrieve benchmark results from BenchmarkJob status.

Parameters:
  • benchmark_name (str) – BenchmarkJob name

  • namespace (str) – K8s namespace

Return type:

Optional[Dict[str, Any]]

Returns:

Dict containing benchmark metrics, or None if unavailable

delete_benchmark_job(benchmark_name, namespace)[source]

Delete a BenchmarkJob.

Parameters:
  • benchmark_name (str) – BenchmarkJob name

  • namespace (str) – K8s namespace

Return type:

bool

Returns:

True if deleted successfully
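A BenchmarkJob lifecycle sketch; names and namespace are placeholders.

from controllers.benchmark_controller import BenchmarkController

bench = BenchmarkController(kubeconfig_path=None)   # None = in-cluster config

job_name = bench.create_benchmark_job(
    task_name="demo",
    experiment_id="1",
    namespace="autotuner",
    isvc_name="demo-1",
    benchmark_config={},                            # benchmark section from the task JSON
)
if job_name:
    if bench.wait_for_completion(job_name, "autotuner", timeout=1800):
        metrics = bench.get_benchmark_results(job_name, "autotuner")
        print(metrics)
    bench.delete_benchmark_job(job_name, "autotuner")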

Utilities

Optimizer

Utility functions and classes for parameter optimization.

utils.optimizer.generate_parameter_grid(parameter_spec)[source]

Generate all parameter combinations for grid search.

Supports two formats:

  1. Simple format: {"param_name": [value1, value2]} – a direct list of values for each parameter

  2. Structured format: {"param_name": {"type": "choice", "values": [value1, value2]}} – legacy format with explicit type specification

Parameters:

parameter_spec (Dict[str, Any]) – Dict mapping parameter names to their values or specifications. Simple: {"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]}; Structured: {"tp_size": {"type": "choice", "values": [1, 2]}}

Return type:

List[Dict[str, Any]]

Returns:

List of parameter dictionaries, one for each combination
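For example, the simple format below expands into four combinations (values are illustrative).

from utils.optimizer import generate_parameter_grid

grid = generate_parameter_grid({
    "tp-size": [1, 2],
    "mem-fraction-static": [0.7, 0.8],
})
# grid contains the four combinations (ordering may differ):
#   {"tp-size": 1, "mem-fraction-static": 0.7}, {"tp-size": 1, "mem-fraction-static": 0.8},
#   {"tp-size": 2, "mem-fraction-static": 0.7}, {"tp-size": 2, "mem-fraction-static": 0.8}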

utils.optimizer.calculate_slo_penalty(metrics, slo_config=None)[source]

Calculate SLO penalty with exponential curve near boundaries.

Implements tiered enforcement:

  • Minor violations: exponential penalty only

  • Severe violations: marked as hard failure

Parameters:
  • metrics (Dict[str, Any]) – Benchmark metrics dictionary

  • slo_config (Optional[Dict[str, Any]]) – SLO configuration from the task JSON, for example:

    {
        "latency": {
            "p50": {"threshold": 2.0, "weight": 1.0, "hard_fail": false},
            "p90": {"threshold": 5.0, "weight": 2.0, "hard_fail": true, "fail_ratio": 0.2}
        },
        "ttft": {"threshold": 1.0, "weight": 2.0, "hard_fail": false},
        "steepness": 0.1  # controls the exponential slope (lower = steeper)
    }

Return type:

Tuple[float, bool, Dict[str, Any]]

Returns:

Tuple of (total_penalty_multiplier, is_hard_failure, violation_details):

  • penalty_multiplier: value to multiply the base score by (1.0 = no penalty)

  • is_hard_failure: True if the experiment should be marked as failed

  • violation_details: dict with per-metric violation info
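A call sketch using the configuration format shown above; the metrics dictionary is a placeholder (its real keys come from the benchmark controller) and the thresholds are illustrative.

from utils.optimizer import calculate_slo_penalty

slo_config = {
    "latency": {
        "p50": {"threshold": 2.0, "weight": 1.0, "hard_fail": False},
        "p90": {"threshold": 5.0, "weight": 2.0, "hard_fail": True, "fail_ratio": 0.2},
    },
    "ttft": {"threshold": 1.0, "weight": 2.0, "hard_fail": False},
    "steepness": 0.1,
}

benchmark_metrics = {}   # placeholder; in practice, the metrics dict from a benchmark run
penalty, is_hard_failure, violations = calculate_slo_penalty(benchmark_metrics, slo_config)
if is_hard_failure:
    print("Severe SLO violation:", violations)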

utils.optimizer.check_batch_slo_compliance(batch_metrics, slo_config=None)[source]

Check if a single batch (concurrency level) meets SLO requirements.

This is used to filter out batches that violate SLO constraints before aggregation.

Parameters:
  • batch_metrics (Dict[str, Any]) – Single batch metrics from genai-bench (one concurrency level)

  • slo_config (Optional[Dict[str, Any]]) – SLO configuration from task JSON

Return type:

Tuple[bool, Dict[str, Any]]

Returns:

Tuple of (is_compliant, violation_details):

  • is_compliant: True if the batch meets all SLO requirements

  • violation_details: dict with violation information for logging

utils.optimizer.calculate_objective_score(results, objective='minimize_latency', slo_config=None)[source]

Calculate objective score from benchmark results with optional SLO penalties.

Parameters:
  • results (Dict[str, Any]) – Benchmark results dictionary from DirectBenchmarkController._parse_results()

  • objective (str) – Optimization objective - ‘minimize_latency’ or ‘maximize_throughput’

  • slo_config (Optional[Dict[str, Any]]) – Optional SLO configuration for penalty calculation

Return type:

float

Returns:

Objective score with SLO penalties applied (lower is better for minimization, higher for maximization). Note: for hard SLO violations, the worst possible score (inf or -inf) is returned.
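A scoring sketch; the results argument stands for the dictionary produced by DirectBenchmarkController, whose exact keys are not reproduced here.

from utils.optimizer import calculate_objective_score

results = {}   # placeholder; in practice, the dict returned by DirectBenchmarkController
score = calculate_objective_score(
    results,
    objective="minimize_latency",
    slo_config=None,          # pass an SLO config here to apply penalties
)
# Lower scores are better for minimize_latency; hard SLO violations yield float('inf').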

class utils.optimizer.OptimizationStrategy[source]

Bases: ABC

Abstract base class for optimization strategies.

__init__(parameter_spec, objective='minimize_latency')[source]

Initialize optimization strategy.

Parameters:
  • parameter_spec (Dict[str, Any]) – Parameter specification dictionary

  • objective (str) – Optimization objective (minimize_latency, maximize_throughput, etc.)

abstractmethod suggest_parameters()[source]

Suggest next parameter configuration to try.

Return type:

Optional[Dict[str, Any]]

Returns:

Dictionary of parameter values, or None if strategy is done

abstractmethod tell_result(parameters, objective_score, metrics)[source]

Update strategy with experiment result.

Parameters:
  • parameters (Dict[str, Any]) – Parameter configuration that was tested

  • objective_score (float) – Objective score from calculate_objective_score()

  • metrics (Dict[str, Any]) – Full metrics dictionary from benchmark

should_stop()[source]

Check if optimization should stop early.

Return type:

bool

Returns:

True if strategy has converged or no more suggestions

get_state()[source]

Serialize strategy state for checkpoint.

Return type:

Dict[str, Any]

Returns:

Dictionary containing strategy state

classmethod from_state(state)[source]

Restore strategy from serialized state.

Parameters:

state (Dict[str, Any]) – Dictionary containing strategy state

Return type:

OptimizationStrategy

Returns:

Restored strategy instance
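All strategies share an ask/tell loop; a sketch using GridSearchStrategy (documented below) looks like this, where run_and_score() is a hypothetical helper that deploys, benchmarks, and scores one configuration.

from utils.optimizer import GridSearchStrategy

strategy = GridSearchStrategy(
    parameter_spec={"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]},
    objective="minimize_latency",
)

while not strategy.should_stop():
    params = strategy.suggest_parameters()
    if params is None:
        break
    score, metrics = run_and_score(params)        # hypothetical helper
    strategy.tell_result(params, score, metrics)

checkpoint = strategy.get_state()                 # serializable dict, see from_state()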

class utils.optimizer.GridSearchStrategy[source]

Bases: OptimizationStrategy

Grid search optimization - exhaustive evaluation of all combinations.

__init__(parameter_spec, objective='minimize_latency', max_iterations=None)[source]

Initialize grid search strategy.

Parameters:
  • parameter_spec (Dict[str, Any]) – Parameter specification dictionary

  • objective (str) – Optimization objective

  • max_iterations (Optional[int]) – Maximum number of iterations (limits grid size)

suggest_parameters()[source]

Get next parameter combination from grid.

Return type:

Optional[Dict[str, Any]]

tell_result(parameters, objective_score, metrics)[source]

Record result (grid search doesn’t adapt).

Parameters:

should_stop()[source]

Stop when all combinations evaluated.

Return type:

bool

get_state()[source]

Serialize GridSearch state for checkpoint.

Return type:

Dict[str, Any]

classmethod from_state(state)[source]

Restore GridSearch from serialized state.

Return type:

GridSearchStrategy

Parameters:

state (Dict[str, Any])

class utils.optimizer.BayesianStrategy[source]

Bases: OptimizationStrategy

Bayesian optimization using Optuna’s TPE sampler.

__init__(parameter_spec, objective='minimize_latency', max_iterations=100, n_initial_random=5, study_name=None, storage=None)[source]

Initialize Bayesian optimization strategy.

Parameters:
  • parameter_spec (Dict[str, Any]) – Parameter specification dictionary

  • objective (str) – Optimization objective

  • max_iterations (int) – Maximum number of trials

  • n_initial_random (int) – Number of random trials before Bayesian optimization

  • study_name (Optional[str]) – Optional name for Optuna study

  • storage (Optional[str]) – Optional Optuna storage URL (e.g., sqlite:///optuna.db)

suggest_parameters()[source]

Suggest next parameter configuration using Optuna.

Ensures no duplicate parameter combinations are tried by adding a random perturbation if the sampler suggests a duplicate.

Return type:

Optional[Dict[str, Any]]

tell_result(parameters, objective_score, metrics)[source]

Update Optuna study with experiment result.

Parameters:

should_stop()[source]

Check if Bayesian optimization should stop.

Return type:

bool

get_best_params()[source]

Get best parameters found so far.

Return type:

Dict[str, Any]

get_best_score()[source]

Get best objective score found so far.

Return type:

float

get_state()[source]

Serialize Bayesian state for checkpoint.

Return type:

Dict[str, Any]

classmethod from_state(state)[source]

Restore Bayesian from serialized state.

Return type:

BayesianStrategy

Parameters:

state (Dict[str, Any])
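A constructor sketch; the SQLite storage URL is optional and shown only as an example.

from utils.optimizer import BayesianStrategy

strategy = BayesianStrategy(
    parameter_spec={"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]},
    objective="minimize_latency",
    max_iterations=20,
    n_initial_random=5,
    storage="sqlite:///optuna.db",    # optional: persist the Optuna study
)
params = strategy.suggest_parameters()
# After telling results back, query the best configuration found so far:
# best_params = strategy.get_best_params()
# best_score = strategy.get_best_score()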

class utils.optimizer.RandomSearchStrategy[source]

Bases: OptimizationStrategy

Random search - random sampling from parameter space.

__init__(parameter_spec, objective='minimize_latency', max_iterations=100, seed=None)[source]

Initialize random search strategy.

Parameters:
  • parameter_spec (Dict[str, Any]) – Parameter specification dictionary

  • objective (str) – Optimization objective

  • max_iterations (int) – Maximum number of random samples

  • seed (Optional[int]) – Random seed for reproducibility

suggest_parameters()[source]

Suggest random parameter configuration.

Return type:

Optional[Dict[str, Any]]

tell_result(parameters, objective_score, metrics)[source]

Record result.

Parameters:

should_stop()[source]

Stop after max iterations.

Return type:

bool

get_state()[source]

Serialize Random state for checkpoint.

Return type:

Dict[str, Any]

classmethod from_state(state)[source]

Restore Random from serialized state.

Return type:

RandomSearchStrategy

Parameters:

state (Dict[str, Any])

utils.optimizer.create_optimization_strategy(optimization_config, parameter_spec)[source]

Factory function to create optimization strategy.

Parameters:
  • optimization_config (Dict[str, Any]) – Optimization configuration from the task JSON, e.g. {“strategy”: “grid_search”, “objective”: “minimize_latency”, …}

  • parameter_spec (Dict[str, Any]) – Parameter specification dictionary

Return type:

OptimizationStrategy

Returns:

OptimizationStrategy instance
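A factory sketch using the configuration keys shown above; parameter values are illustrative.

from utils.optimizer import create_optimization_strategy

strategy = create_optimization_strategy(
    optimization_config={
        "strategy": "grid_search",
        "objective": "minimize_latency",
    },
    parameter_spec={"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]},
)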

utils.optimizer.restore_optimization_strategy(state)[source]

Restore optimization strategy from serialized state.

Parameters:

state (Dict[str, Any]) – Serialized strategy state (from strategy.get_state())

Return type:

OptimizationStrategy

Returns:

Restored OptimizationStrategy instance
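A checkpoint round-trip sketch: the state dict produced by get_state() can be persisted by your own code and later restored.

from utils.optimizer import GridSearchStrategy, restore_optimization_strategy

original = GridSearchStrategy(parameter_spec={"tp-size": [1, 2]})
state = original.get_state()                     # e.g. written to a checkpoint file
restored = restore_optimization_strategy(state)  # same strategy type and progress
next_params = restored.suggest_parameters()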

GPU Discovery

GPU Discovery Utility

Provides functions to discover and select idle GPUs across the Kubernetes cluster.

class utils.gpu_discovery.NodeGPUSummary[source]

Bases: object

Summary of GPU resources on a node.

node_name: str
total_gpus: int
allocatable_gpus: int
gpus_with_metrics: List[ClusterGPUInfo]
avg_utilization: float
avg_memory_usage: float
idle_gpu_count: int
__init__(node_name, total_gpus, allocatable_gpus, gpus_with_metrics, avg_utilization, avg_memory_usage, idle_gpu_count)
Parameters:
  • node_name (str)

  • total_gpus (int)

  • allocatable_gpus (int)

  • gpus_with_metrics (List[ClusterGPUInfo])

  • avg_utilization (float)

  • avg_memory_usage (float)

  • idle_gpu_count (int)

Return type:

None

utils.gpu_discovery.get_cluster_gpu_status()[source]

Query cluster-wide GPU status using kubectl and nvidia-smi.

Return type:

List[ClusterGPUInfo]

Returns:

List of ClusterGPUInfo objects for all GPUs in the cluster

utils.gpu_discovery.get_node_gpu_summaries()[source]

Get GPU summaries grouped by node.

Return type:

Dict[str, NodeGPUSummary]

Returns:

Dictionary mapping node name to NodeGPUSummary

utils.gpu_discovery.find_best_node_for_deployment(required_gpus=1, utilization_threshold=30.0, memory_threshold=50.0)[source]

Find the best node for deploying a new inference service.

Selection criteria (in order):

  1. Must have enough allocatable GPUs

  2. Prefer nodes with idle GPUs (low utilization and memory usage)

  3. Among idle nodes, prefer the one with the most idle GPUs

  4. If no idle nodes, prefer the node with the lowest average utilization

Parameters:
  • required_gpus (int) – Number of GPUs required for deployment

  • utilization_threshold (float) – GPU utilization % threshold for “idle” (default: 30%)

  • memory_threshold (float) – Memory usage % threshold for “idle” (default: 50%)

Return type:

Optional[str]

Returns:

Node name to deploy to, or None if no suitable node found
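A node-selection sketch; the thresholds mirror the documented defaults.

from utils.gpu_discovery import find_best_node_for_deployment, get_node_gpu_summaries

node = find_best_node_for_deployment(
    required_gpus=2,
    utilization_threshold=30.0,
    memory_threshold=50.0,
)
if node is None:
    # Inspect per-node GPU availability to see why nothing qualified.
    for name, summary in get_node_gpu_summaries().items():
        print(name, summary.idle_gpu_count, summary.avg_utilization)
else:
    print("Deploy to node:", node)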

GPU Monitor

GPU Monitoring and Management Utilities

Provides comprehensive GPU tracking, allocation, and monitoring capabilities for the LLM Autotuner.

class utils.gpu_monitor.GPUSnapshot[source]

Bases: object

Snapshot of all GPUs at a point in time.

timestamp: datetime
gpus: List[LocalGPUInfo]
total_gpus: int
available_gpus: int
to_dict()[source]

Convert to dictionary.

Return type:

Dict[str, Any]

__init__(timestamp, gpus, total_gpus, available_gpus)
Parameters:
Return type:

None

class utils.gpu_monitor.GPUMonitor[source]

Bases: object

Monitor and manage GPU resources.

__init__()[source]

is_available()[source]

Check if nvidia-smi is available.

Return type:

bool

get_gpu_count()[source]

Get total number of GPUs.

Return type:

int

query_gpus(use_cache=True)[source]

Query all GPU information.

Parameters:

use_cache (bool) – Whether to use cached results if available

Return type:

Optional[GPUSnapshot]

Returns:

GPUSnapshot or None if query fails

get_available_gpus(min_memory_mb=None, max_utilization=50)[source]

Get list of available GPU indices.

Parameters:
  • min_memory_mb (Optional[int]) – Minimum free memory required (MB)

  • max_utilization (int) – Maximum GPU utilization allowed (%)

Return type:

List[int]

Returns:

List of GPU indices sorted by availability score (best first)

allocate_gpus(count, min_memory_mb=None)[source]

Allocate specified number of GPUs.

Parameters:
  • count (int) – Number of GPUs to allocate

  • min_memory_mb (Optional[int]) – Minimum memory required per GPU (MB)

Return type:

Tuple[List[int], bool]

Returns:

Tuple of (allocated_gpu_indices, success)

get_gpu_info(gpu_index)[source]

Get information for specific GPU.

Return type:

Optional[LocalGPUInfo]

Parameters:

gpu_index (int)

monitor_gpus(gpu_indices, duration_seconds, interval_seconds=1.0)[source]

Monitor specific GPUs over time.

Parameters:
  • gpu_indices (List[int]) – List of GPU indices to monitor

  • duration_seconds (float) – How long to monitor

  • interval_seconds (float) – Sampling interval

Return type:

List[GPUSnapshot]

Returns:

List of GPU snapshots

get_summary_stats(snapshots)[source]

Calculate summary statistics from monitoring snapshots.

Parameters:

snapshots (List[GPUSnapshot]) – List of GPU snapshots

Return type:

Dict[str, Any]

Returns:

Dictionary with summary statistics

utils.gpu_monitor.get_gpu_monitor()[source]

Get global GPU monitor instance.

Return type:

GPUMonitor
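A monitoring sketch using the module-level singleton; the memory and duration values are arbitrary.

from utils.gpu_monitor import get_gpu_monitor

monitor = get_gpu_monitor()
if monitor.is_available():                                  # requires nvidia-smi
    indices, ok = monitor.allocate_gpus(count=1, min_memory_mb=16000)
    if ok:
        snapshots = monitor.monitor_gpus(indices, duration_seconds=10, interval_seconds=1.0)
        print(monitor.get_summary_stats(snapshots))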

GPU Pool

GPU Resource Pool for parallel experiment execution.

This module provides a GPU resource pool that manages allocation and deallocation of GPU resources across concurrent experiments. It ensures:

  • No GPU allocation conflicts

  • Fair FIFO allocation order

  • Automatic cleanup on failure

  • Integration with the existing gpu_monitor infrastructure

class utils.gpu_pool.GPUAllocation[source]

Bases: object

Represents an allocated GPU resource.

gpu_indices: List[int]
allocated_at: datetime
experiment_id: Optional[int] = None
params: Optional[dict] = None
__init__(gpu_indices, allocated_at, experiment_id=None, params=None)
Parameters:
Return type:

None

class utils.gpu_pool.GPUResourcePool[source]

Bases: object

GPU resource pool for managing concurrent experiment execution.

Features:

  • FIFO queue for fair allocation

  • Atomic acquire/release operations

  • Integration with gpu_monitor for availability checking

  • Automatic cleanup via context manager

Example

async with GPUResourcePool(max_parallel=3) as pool:
    allocation = await pool.acquire(required_gpus=2, experiment_id=1)
    try:
        # Run experiment with allocation.gpu_indices
        pass
    finally:
        await pool.release(allocation)

__init__(max_parallel=1)[source]

Initialize GPU resource pool.

Parameters:

max_parallel (int) – Maximum number of concurrent experiments

async acquire(required_gpus, min_memory_mb=8000, experiment_id=None, params=None, timeout=None)[source]

Acquire GPU resources for an experiment.

This method:

  1. Waits for an available slot if at max_parallel capacity

  2. Selects optimal GPUs using availability scoring

  3. Returns a GPUAllocation object

Parameters:
  • required_gpus (int) – Number of GPUs needed

  • min_memory_mb (int) – Minimum free memory per GPU

  • experiment_id (Optional[int]) – Optional experiment ID for tracking

  • params (Optional[dict]) – Optional parameters for tracking

  • timeout (Optional[float]) – Optional timeout in seconds

Return type:

GPUAllocation

Returns:

GPUAllocation object with selected GPU indices

Raises:

async release(allocation)[source]

Release GPU resources.

Parameters:

allocation (GPUAllocation) – GPUAllocation object from acquire()

Return type:

None

get_status()[source]

Get current pool status.

Return type:

dict

Returns:

Dictionary with pool statistics

async utils.gpu_pool.estimate_and_acquire(pool, task_config, experiment_id=None, params=None, timeout=None)[source]

Helper function to estimate GPU requirements and acquire resources.

This combines estimate_gpu_requirements() from gpu_scheduler with the resource pool acquisition.

Parameters:
  • pool (GPUResourcePool) – GPUResourcePool instance

  • task_config (dict) – Task configuration dictionary

  • experiment_id (Optional[int]) – Optional experiment ID for tracking

  • params (Optional[dict]) – Optional parameters for tracking

  • timeout (Optional[float]) – Optional timeout in seconds

Return type:

GPUAllocation

Returns:

GPUAllocation object

Example

async with GPUResourcePool(max_parallel=3) as pool:
    allocation = await estimate_and_acquire(
        pool, task_config, experiment_id=1
    )
    try:
        # Run experiment
        pass
    finally:
        await pool.release(allocation)

Web Application

FastAPI Application

Database Models

Database models for tasks, experiments, and parameter presets.

class web.db.models.TaskStatus[source]

Bases: str, Enum

Task status enum.

PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
__new__(value)

class web.db.models.Task[source]

Bases: Base

Autotuning task model.

id
task_name
description
status
model_config
base_runtime
runtime_image_tag
parameters
optimization_config
benchmark_config
slo_config
quant_config
parallel_config
clusterbasemodel_config
clusterservingruntime_config
created_clusterbasemodel
created_clusterservingruntime
deployment_mode
task_metadata
total_experiments
successful_experiments
best_experiment_id
created_at
started_at
completed_at
elapsed_time
experiments
best_experiment
to_dict(include_full_config=False)[source]

Convert task to dictionary.

Parameters:

include_full_config – If True, include all configuration details. If False, return summary view (for list endpoints).

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class web.db.models.ExperimentStatus[source]

Bases: str, Enum

Experiment status enum.

PENDING = 'pending'
DEPLOYING = 'deploying'
BENCHMARKING = 'benchmarking'
SUCCESS = 'success'
FAILED = 'failed'
__new__(value)

class web.db.models.Experiment[source]

Bases: Base

Individual experiment (single parameter configuration) model.

id
task_id
experiment_id
parameters
status
error_message
metrics
objective_score
gpu_info
service_name
service_url
created_at
started_at
completed_at
elapsed_time
task
to_dict(include_logs=False)[source]

Convert experiment to dictionary.

Parameters:

include_logs – If True, include benchmark_logs (can be large).

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class web.db.models.ParameterPreset[source]

Bases: Base

Parameter preset model for reusable parameter configurations.

id
name
description
category
runtime
is_system
parameters
preset_metadata
created_at
updated_at
to_dict()[source]

Convert model to dictionary.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class web.db.models.MessageRole[source]

Bases: str, Enum

Chat message role enum.

USER = 'user'
ASSISTANT = 'assistant'
SYSTEM = 'system'
__new__(value)

class web.db.models.ChatSession[source]

Bases: Base

Agent chat session model.

id
session_id
user_id
title
context_summary
is_active
session_metadata
created_at
updated_at
messages
subscriptions
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class web.db.models.ChatMessage[source]

Bases: Base

Agent chat message model.

id
session_id
role
content
tool_calls
message_metadata
token_count
created_at
session
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class web.db.models.AgentEventSubscription[source]

Bases: Base

Agent event subscription model for auto-triggering analysis.

id
session_id
task_id
event_types
is_active
created_at
expires_at
session
task
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

Schemas