"""
Utility functions and classes for parameter optimization.
"""
import itertools
import math
import random
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Tuple
import optuna
def generate_parameter_grid(parameter_spec: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate all parameter combinations for grid search.
Supports two formats:
1. Simple format: {"param_name": [value1, value2]}
- Direct list of values for each parameter
2. Structured format: {"param_name": {"type": "choice", "values": [value1, value2]}}
- Legacy format with explicit type specification
Args:
parameter_spec: Dict mapping parameter names to their values or specifications
Simple: {"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]}
Structured: {"tp_size": {"type": "choice", "values": [1, 2]}}
Returns:
List of parameter dictionaries, one for each combination
"""
param_names = []
param_values = []
for param_name, spec in parameter_spec.items():
# Check if it's the simple format (direct list) or structured format (dict with type/values)
if isinstance(spec, list):
# Simple format: direct list of values
# Skip empty lists (no values to search)
if len(spec) == 0:
continue
param_names.append(param_name)
param_values.append(spec)
elif isinstance(spec, dict) and "type" in spec:
# Structured format: legacy support
if spec["type"] == "choice":
# Skip empty value lists
values = spec["values"]
if len(values) == 0:
continue
param_names.append(param_name)
param_values.append(values)
else:
raise ValueError(f"Unsupported parameter type: {spec['type']}")
else:
raise ValueError(
f"Invalid parameter specification for '{param_name}'. "
f"Expected list of values or dict with 'type' and 'values' keys."
)
# Generate Cartesian product
combinations = list(itertools.product(*param_values))
# Convert to list of dicts
grid = []
for combo in combinations:
param_dict = dict(zip(param_names, combo))
grid.append(param_dict)
return grid
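# Illustrative expansion (a sketch, not part of the public API): the simple-format
# example from the docstring above produces the full Cartesian product, ordered as
# itertools.product yields it.
#
# >>> generate_parameter_grid({"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8]})
# [{'tp-size': 1, 'mem-fraction-static': 0.7},
#  {'tp-size': 1, 'mem-fraction-static': 0.8},
#  {'tp-size': 2, 'mem-fraction-static': 0.7},
#  {'tp-size': 2, 'mem-fraction-static': 0.8}]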
def calculate_slo_penalty(
metrics: Dict[str, Any],
slo_config: Optional[Dict[str, Any]] = None
) -> Tuple[float, bool, Dict[str, Any]]:
"""Calculate SLO penalty with exponential curve near boundaries.
    Implements tiered enforcement:
    - Soft violations: exponential penalty added to the score (labelled MINOR or SEVERE)
    - Hard violations: when hard_fail is set and the overshoot exceeds fail_ratio,
      the experiment is marked as a hard failure
Args:
metrics: Benchmark metrics dictionary
slo_config: SLO configuration from task JSON
Format: {
"latency": {
"p50": {"threshold": 2.0, "weight": 1.0, "hard_fail": false},
"p90": {"threshold": 5.0, "weight": 2.0, "hard_fail": true, "fail_ratio": 0.2}
},
"ttft": {"threshold": 1.0, "weight": 2.0, "hard_fail": false},
"steepness": 0.1 # Controls exponential slope (lower = steeper)
}
Returns:
Tuple of (total_penalty_multiplier, is_hard_failure, violation_details)
- penalty_multiplier: Value to multiply base score by (1.0 = no penalty)
- is_hard_failure: True if experiment should be marked as failed
- violation_details: Dict with per-metric violation info
"""
# No SLO configuration means no penalties
if not slo_config or not metrics:
return 1.0, False, {}
# Default steepness parameter (lower = steeper penalty curve)
steepness = slo_config.get("steepness", 0.1)
total_penalty = 0.0
is_hard_failure = False
violation_details = {}
# Process latency SLOs (P50, P90, P99)
latency_slo = slo_config.get("latency", {})
for percentile in ["p50", "p90", "p99"]:
if percentile not in latency_slo:
continue
slo_spec = latency_slo[percentile]
threshold = slo_spec.get("threshold")
weight = slo_spec.get("weight", 1.0)
hard_fail = slo_spec.get("hard_fail", False)
fail_ratio = slo_spec.get("fail_ratio", 0.5) # Default: fail if >50% over
if threshold is None:
continue
# Get actual metric value
metric_key = f"{percentile}_e2e_latency"
actual_value = metrics.get(metric_key)
if actual_value is None:
continue
# Calculate violation ratio (normalized)
if actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
# Check for hard failure condition
if hard_fail and violation_ratio > fail_ratio:
is_hard_failure = True
violation_details[percentile] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"severity": "HARD_FAIL"
}
else:
# Calculate exponential penalty
# penalty = weight × exp(violation_ratio / steepness)
# As violation_ratio increases, penalty grows exponentially
penalty = weight * math.exp(violation_ratio / steepness)
total_penalty += penalty
severity = "SEVERE" if violation_ratio > 0.2 else "MINOR"
violation_details[percentile] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"penalty": penalty,
"severity": severity
}
# Process TTFT SLO
ttft_slo = slo_config.get("ttft", {})
if ttft_slo:
threshold = ttft_slo.get("threshold")
weight = ttft_slo.get("weight", 1.0)
hard_fail = ttft_slo.get("hard_fail", False)
fail_ratio = ttft_slo.get("fail_ratio", 0.5)
if threshold is not None:
actual_value = metrics.get("mean_ttft")
if actual_value is not None and actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
if hard_fail and violation_ratio > fail_ratio:
is_hard_failure = True
violation_details["ttft"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"severity": "HARD_FAIL"
}
else:
penalty = weight * math.exp(violation_ratio / steepness)
total_penalty += penalty
severity = "SEVERE" if violation_ratio > 0.2 else "MINOR"
violation_details["ttft"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"penalty": penalty,
"severity": severity
}
# Process TPOT SLO
tpot_slo = slo_config.get("tpot", {})
if tpot_slo:
threshold = tpot_slo.get("threshold")
weight = tpot_slo.get("weight", 1.0)
hard_fail = tpot_slo.get("hard_fail", False)
fail_ratio = tpot_slo.get("fail_ratio", 0.5)
if threshold is not None:
actual_value = metrics.get("mean_tpot")
if actual_value is not None and actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
if hard_fail and violation_ratio > fail_ratio:
is_hard_failure = True
violation_details["tpot"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"severity": "HARD_FAIL"
}
else:
penalty = weight * math.exp(violation_ratio / steepness)
total_penalty += penalty
severity = "SEVERE" if violation_ratio > 0.2 else "MINOR"
violation_details["tpot"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"penalty": penalty,
"severity": severity
}
# Calculate final penalty multiplier
# penalty_multiplier > 1.0 means the score gets worse
penalty_multiplier = 1.0 + total_penalty
return penalty_multiplier, is_hard_failure, violation_details
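# Worked example (the SLO spec mirrors the docstring above; the metric values are
# hypothetical): with slo_config = {"latency": {"p50": {"threshold": 2.0, "weight": 1.0}},
# "steepness": 0.1} and metrics = {"p50_e2e_latency": 2.2}, the violation ratio is
# (2.2 - 2.0) / 2.0 = 0.1, the penalty is 1.0 * exp(0.1 / 0.1) = e ~ 2.72, and the function
# returns (~3.72, False, {...}). Had the spec included {"hard_fail": True, "fail_ratio": 0.05},
# the same 10% overshoot would instead be reported as a hard failure.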
def check_batch_slo_compliance(batch_metrics: Dict[str, Any], slo_config: Optional[Dict[str, Any]] = None) -> Tuple[bool, Dict[str, Any]]:
"""Check if a single batch (concurrency level) meets SLO requirements.
This is used to filter out batches that violate SLO constraints before aggregation.
Args:
batch_metrics: Single batch metrics from genai-bench (one concurrency level)
slo_config: SLO configuration from task JSON
Returns:
Tuple of (is_compliant, violation_details)
- is_compliant: True if batch meets all SLO requirements
- violation_details: Dict with violation information for logging
"""
# No SLO means all batches are compliant
if not slo_config or not batch_metrics:
return True, {}
violation_details = {}
is_compliant = True
# Extract metrics from batch (genai-bench structure: {"stats": {...}})
stats = batch_metrics.get("stats", {})
if not stats:
return True, {} # No stats means we can't check, assume compliant
# Check latency SLOs (P50, P90, P99)
latency_slo = slo_config.get("latency", {})
for percentile in ["p50", "p90", "p99"]:
if percentile not in latency_slo:
continue
slo_spec = latency_slo[percentile]
threshold = slo_spec.get("threshold")
hard_fail = slo_spec.get("hard_fail", False)
fail_ratio = slo_spec.get("fail_ratio", 0.5)
if threshold is None:
continue
# Get actual value from batch stats
e2e_latency_stats = stats.get("e2e_latency", {})
actual_value = e2e_latency_stats.get(percentile)
if actual_value is None:
continue
if actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
# For SLO compliance check, we only care about hard_fail violations
# Soft violations are OK - they just get penalties later
if hard_fail and violation_ratio > fail_ratio:
is_compliant = False
violation_details[percentile] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "HARD_FAIL"
}
else:
# Soft violation - still compliant, but log it
violation_details[percentile] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "SOFT_VIOLATION"
}
# Check TTFT SLO
ttft_slo = slo_config.get("ttft", {})
if ttft_slo:
threshold = ttft_slo.get("threshold")
hard_fail = ttft_slo.get("hard_fail", False)
fail_ratio = ttft_slo.get("fail_ratio", 0.5)
if threshold is not None:
ttft_stats = stats.get("ttft", {})
actual_value = ttft_stats.get("mean")
if actual_value is not None and actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
if hard_fail and violation_ratio > fail_ratio:
is_compliant = False
violation_details["ttft"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "HARD_FAIL"
}
else:
violation_details["ttft"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "SOFT_VIOLATION"
}
# Check TPOT SLO
tpot_slo = slo_config.get("tpot", {})
if tpot_slo:
threshold = tpot_slo.get("threshold")
hard_fail = tpot_slo.get("hard_fail", False)
fail_ratio = tpot_slo.get("fail_ratio", 0.5)
if threshold is not None:
tpot_stats = stats.get("tpot", {})
actual_value = tpot_stats.get("mean")
if actual_value is not None and actual_value > threshold:
violation_ratio = (actual_value - threshold) / threshold
if hard_fail and violation_ratio > fail_ratio:
is_compliant = False
violation_details["tpot"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "HARD_FAIL"
}
else:
violation_details["tpot"] = {
"threshold": threshold,
"actual": actual_value,
"violation_ratio": violation_ratio,
"type": "SOFT_VIOLATION"
}
return is_compliant, violation_details
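# Illustrative batch shape (the field names follow the lookups above; the numbers are
# hypothetical): a genai-bench batch is expected to look like
#   {"stats": {"e2e_latency": {"p50": 1.8, "p90": 4.2}, "ttft": {"mean": 0.4}, "tpot": {"mean": 0.03}}}
# With slo_config = {"latency": {"p90": {"threshold": 5.0, "hard_fail": True, "fail_ratio": 0.2}}},
# this batch is compliant (4.2 <= 5.0). A p90 of 6.5 would overshoot by 30% > fail_ratio,
# so the batch would be reported as non-compliant and filtered out before aggregation.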
def calculate_objective_score(results: Dict[str, Any], objective: str = "minimize_latency", slo_config: Optional[Dict[str, Any]] = None) -> float:
"""Calculate objective score from benchmark results with optional SLO penalties.
Args:
results: Benchmark results dictionary from DirectBenchmarkController._parse_results()
objective: Optimization objective - 'minimize_latency' or 'maximize_throughput'
slo_config: Optional SLO configuration for penalty calculation
Returns:
        Objective score with SLO penalties applied. All scores are lower-is-better
        (throughput objectives are negated), so hard SLO violations and errors return float("inf")
"""
if not results:
print("[Optimizer] No results provided, returning worst score")
return float("inf") if "minimize" in objective else float("-inf")
# Calculate base score based on objective
try:
if objective == "minimize_latency":
# Use mean E2E latency as primary metric (in seconds)
# Fallback to P50 if mean not available
base_score = results.get("mean_e2e_latency", results.get("p50_e2e_latency"))
if base_score is None:
print(f"[Optimizer] Warning: No latency metrics found in results")
print(f"[Optimizer] Available keys: {list(results.keys())}")
return float("inf")
elif objective == "maximize_throughput":
# Use mean total throughput per GPU (tokens/s/GPU) as primary metric
# This allows fair comparison across different GPU counts
# Fallback to total throughput if per-GPU metrics not available
throughput = results.get("mean_total_throughput_per_gpu")
if throughput is None:
# Fallback to absolute throughput (for backward compatibility)
throughput = results.get(
"mean_total_throughput", results.get("mean_output_throughput", results.get("max_total_throughput"))
)
if throughput is not None:
print(f"[Optimizer] Warning: Using absolute throughput (per-GPU metrics not available)")
if throughput is None or throughput == 0:
print(f"[Optimizer] Warning: No throughput metrics found in results")
print(f"[Optimizer] Available keys: {list(results.keys())}")
return float("-inf")
# Negate for minimization (optimizer looks for minimum score)
base_score = -throughput
elif objective == "minimize_ttft":
# Time to First Token optimization
base_score = results.get("mean_ttft")
if base_score is None:
print(f"[Optimizer] Warning: No TTFT metrics found in results")
return float("inf")
elif objective == "minimize_tpot":
# Time Per Output Token optimization
base_score = results.get("mean_tpot")
if base_score is None:
print(f"[Optimizer] Warning: No TPOT metrics found in results")
return float("inf")
else:
raise ValueError(f"Unsupported objective: {objective}")
# Apply SLO penalties if configured
if slo_config:
penalty_multiplier, is_hard_failure, violation_details = calculate_slo_penalty(results, slo_config)
# Hard failure: return worst possible score
if is_hard_failure:
print(f"[Optimizer] HARD SLO FAILURE detected:")
for metric, details in violation_details.items():
if details.get("severity") == "HARD_FAIL":
print(f" {metric}: {details['actual']:.4f} >> {details['threshold']:.4f} (violation: {details['violation_ratio']*100:.1f}%)")
return float("inf") if "minimize" in objective else float("-inf")
            # NOTE: batch-level SLO filtering (check_batch_slo_compliance) already drops
            # non-compliant batches before aggregation, so for SUCCESS experiments this
            # penalty path rarely triggers and is kept mainly for backward compatibility.
            # Any remaining soft violations are ADDED to the base score (addition rather
            # than multiplication keeps negated throughput scores well-behaved).
if penalty_multiplier > 1.0:
# Convert penalty_multiplier (1.0 + penalty) to addition value
penalty_value = (penalty_multiplier - 1.0) * abs(base_score)
                # Adding a positive penalty always worsens the score: minimize objectives
                # have a positive base_score that grows, while maximize objectives have a
                # negated (negative) base_score that moves toward zero.
                final_score = base_score + penalty_value
print(f"[Optimizer] Base score: {base_score:.4f}, SLO penalty: +{penalty_value:.2f}, Final: {final_score:.4f}")
if violation_details:
print(f"[Optimizer] SLO violations detected:")
for metric, details in violation_details.items():
print(f" {metric}: {details['actual']:.4f} > {details['threshold']:.4f} "
f"(+{details['violation_ratio']*100:.1f}%, penalty: +{details['penalty']:.2f}, severity: {details['severity']})")
return final_score
else:
print(f"[Optimizer] Score: {base_score:.4f} (no SLO violations)")
return base_score
else:
# No SLO config, return base score
print(f"[Optimizer] Score: {base_score:.4f} (lower is better)")
return base_score
except Exception as e:
print(f"[Optimizer] Error calculating objective score: {e}")
return float("inf") if "minimize" in objective else float("-inf")
# ============================================================================
# Optimization Strategy Abstraction
# ============================================================================
class OptimizationStrategy(ABC):
"""Abstract base class for optimization strategies."""
def __init__(self, parameter_spec: Dict[str, Any], objective: str = "minimize_latency"):
"""Initialize optimization strategy.
Args:
parameter_spec: Parameter specification dictionary
objective: Optimization objective (minimize_latency, maximize_throughput, etc.)
"""
self.parameter_spec = parameter_spec
self.objective = objective
self.history: List[Dict[str, Any]] = []
@abstractmethod
def suggest_parameters(self) -> Optional[Dict[str, Any]]:
"""Suggest next parameter configuration to try.
Returns:
Dictionary of parameter values, or None if strategy is done
"""
pass
@abstractmethod
def tell_result(self, parameters: Dict[str, Any], objective_score: float, metrics: Dict[str, Any]):
"""Update strategy with experiment result.
Args:
parameters: Parameter configuration that was tested
objective_score: Objective score from calculate_objective_score()
metrics: Full metrics dictionary from benchmark
"""
pass
def should_stop(self) -> bool:
"""Check if optimization should stop early.
Returns:
True if strategy has converged or no more suggestions
"""
return False
def get_state(self) -> Dict[str, Any]:
"""Serialize strategy state for checkpoint.
Returns:
Dictionary containing strategy state
"""
return {
"parameter_spec": self.parameter_spec,
"objective": self.objective,
"history": self.history,
}
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "OptimizationStrategy":
"""Restore strategy from serialized state.
Args:
state: Dictionary containing strategy state
Returns:
Restored strategy instance
"""
# Base implementation - subclasses should override
raise NotImplementedError("Subclass must implement from_state()")
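# Minimal driver sketch: the ask/tell loop a caller is expected to run against any
# OptimizationStrategy. `run_benchmark` is a hypothetical stand-in for the actual
# experiment runner; everything else uses the interface defined above.
#
#     strategy = GridSearchStrategy({"tp-size": [1, 2]}, objective="minimize_latency")
#     while not strategy.should_stop():
#         params = strategy.suggest_parameters()
#         if params is None:
#             break
#         metrics = run_benchmark(params)                      # hypothetical runner
#         score = calculate_objective_score(metrics, strategy.objective)
#         strategy.tell_result(params, score, metrics)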
class GridSearchStrategy(OptimizationStrategy):
"""Grid search optimization - exhaustive evaluation of all combinations."""
def __init__(self, parameter_spec: Dict[str, Any], objective: str = "minimize_latency", max_iterations: Optional[int] = None):
"""Initialize grid search strategy.
Args:
parameter_spec: Parameter specification dictionary
objective: Optimization objective
max_iterations: Maximum number of iterations (limits grid size)
"""
super().__init__(parameter_spec, objective)
self.param_grid = generate_parameter_grid(parameter_spec)
# Limit grid size if max_iterations specified
if max_iterations is not None and max_iterations < len(self.param_grid):
print(f"[GridSearch] Limiting grid from {len(self.param_grid)} to {max_iterations} combinations")
self.param_grid = self.param_grid[:max_iterations]
self.current_index = 0
print(f"[GridSearch] Initialized with {len(self.param_grid)} parameter combinations")
def suggest_parameters(self) -> Optional[Dict[str, Any]]:
"""Get next parameter combination from grid."""
if self.current_index >= len(self.param_grid):
return None
params = self.param_grid[self.current_index]
self.current_index += 1
print(f"[GridSearch] Suggesting combination {self.current_index}/{len(self.param_grid)}: {params}")
return params
def tell_result(self, parameters: Dict[str, Any], objective_score: float, metrics: Dict[str, Any]):
"""Record result (grid search doesn't adapt)."""
self.history.append({
"parameters": parameters,
"objective_score": objective_score,
"metrics": metrics
})
print(f"[GridSearch] Recorded result: score={objective_score:.4f}")
def should_stop(self) -> bool:
"""Stop when all combinations evaluated."""
return self.current_index >= len(self.param_grid)
def get_state(self) -> Dict[str, Any]:
"""Serialize GridSearch state for checkpoint."""
base_state = super().get_state()
base_state.update({
"strategy_class": "GridSearchStrategy",
"current_index": self.current_index,
"param_grid": self.param_grid,
})
return base_state
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "GridSearchStrategy":
"""Restore GridSearch from serialized state."""
# Create instance with basic parameters
strategy = cls(
parameter_spec=state["parameter_spec"],
objective=state["objective"],
max_iterations=None # Already limited in param_grid
)
# Restore state
strategy.current_index = state["current_index"]
strategy.param_grid = state["param_grid"]
strategy.history = state.get("history", [])
return strategy
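# Example (hypothetical spec): {"tp-size": [1, 2], "mem-fraction-static": [0.7, 0.8, 0.9]}
# yields a 6-point grid; with max_iterations=4 only the first four combinations are kept,
# and should_stop() returns True once current_index reaches 4.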
class BayesianStrategy(OptimizationStrategy):
"""Bayesian optimization using Optuna's TPE sampler."""
def __init__(
self,
parameter_spec: Dict[str, Any],
objective: str = "minimize_latency",
max_iterations: int = 100,
n_initial_random: int = 5,
study_name: Optional[str] = None,
storage: Optional[str] = None
):
"""Initialize Bayesian optimization strategy.
Args:
parameter_spec: Parameter specification dictionary
objective: Optimization objective
max_iterations: Maximum number of trials
n_initial_random: Number of random trials before Bayesian optimization
study_name: Optional name for Optuna study
storage: Optional Optuna storage URL (e.g., sqlite:///optuna.db)
"""
super().__init__(parameter_spec, objective)
self.max_iterations = max_iterations
self.n_initial_random = n_initial_random
self.trial_count = 0
# Parse parameter specification into Optuna search space
self.search_space = self._parse_search_space(parameter_spec)
# Track tried parameter combinations to avoid duplicates
self.tried_params = set()
# Create Optuna study
direction = "minimize" # All objectives use minimize (throughput is negated)
sampler = optuna.samplers.TPESampler(n_startup_trials=n_initial_random)
self.study = optuna.create_study(
direction=direction,
sampler=sampler,
study_name=study_name or f"autotuner_{objective}",
storage=storage,
load_if_exists=True # Resume if study exists
)
print(f"[Bayesian] Initialized with {len(self.search_space)} parameters")
print(f"[Bayesian] Max iterations: {max_iterations}, Initial random: {n_initial_random}")
print(f"[Bayesian] Search space: {list(self.search_space.keys())}")
def _parse_search_space(self, parameter_spec: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Parse parameter specification into Optuna search space.
Supports:
- Simple format: {"param": [val1, val2]} → categorical
- Explicit format: {"param": {"type": "categorical", "values": [...]}}
- Continuous: {"param": {"type": "continuous", "low": 0.5, "high": 1.0}}
- Integer: {"param": {"type": "integer", "low": 1, "high": 10}}
Returns:
Dict mapping parameter names to Optuna space definitions
"""
search_space = {}
for param_name, spec in parameter_spec.items():
if isinstance(spec, list):
# Simple format: treat as categorical
# Skip empty lists (no values to search)
if len(spec) == 0:
continue
search_space[param_name] = {
"type": "categorical",
"choices": spec
}
elif isinstance(spec, dict):
param_type = spec.get("type", "categorical")
if param_type in ["choice", "categorical"]:
# Categorical parameter
values = spec.get("values", spec.get("choices", []))
# Skip empty value lists
if len(values) == 0:
continue
search_space[param_name] = {
"type": "categorical",
"choices": values
}
elif param_type == "continuous":
# Continuous (float) parameter
search_space[param_name] = {
"type": "continuous",
"low": spec["low"],
"high": spec["high"],
"log": spec.get("log", False) # Log scale for parameters like learning rate
}
elif param_type == "integer":
# Integer parameter
search_space[param_name] = {
"type": "integer",
"low": spec["low"],
"high": spec["high"],
"log": spec.get("log", False)
}
else:
raise ValueError(f"Unsupported parameter type: {param_type}")
else:
raise ValueError(f"Invalid parameter specification for '{param_name}'")
return search_space
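    # Example (a sketch; the parameter names are illustrative): a mixed specification
    #   {"tp-size": [1, 2],
    #    "mem-fraction-static": {"type": "continuous", "low": 0.7, "high": 0.9},
    #    "max-running-requests": {"type": "integer", "low": 32, "high": 256}}
    # is parsed into
    #   {"tp-size": {"type": "categorical", "choices": [1, 2]},
    #    "mem-fraction-static": {"type": "continuous", "low": 0.7, "high": 0.9, "log": False},
    #    "max-running-requests": {"type": "integer", "low": 32, "high": 256, "log": False}}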
def suggest_parameters(self) -> Optional[Dict[str, Any]]:
"""Suggest next parameter configuration using Optuna.
Ensures no duplicate parameter combinations are tried by adding
random perturbation if sampler suggests a duplicate.
"""
if self.trial_count >= self.max_iterations:
print(f"[Bayesian] Reached max iterations ({self.max_iterations})")
return None
        # Ask Optuna for a new trial; ask() is called only once per suggestion, and the
        # duplicate handling below reuses this trial instead of asking again.
trial = self.study.ask()
self.current_trial = trial
# Extract parameter suggestions from the trial
params = {}
for param_name, space_def in self.search_space.items():
if space_def["type"] == "categorical":
params[param_name] = trial.suggest_categorical(param_name, space_def["choices"])
elif space_def["type"] == "continuous":
params[param_name] = trial.suggest_float(
param_name,
space_def["low"],
space_def["high"],
log=space_def.get("log", False)
)
elif space_def["type"] == "integer":
params[param_name] = trial.suggest_int(
param_name,
space_def["low"],
space_def["high"],
log=space_def.get("log", False)
)
# Create hashable representation for duplicate checking
params_tuple = tuple(sorted(params.items()))
# Check if these parameters have been tried before
if params_tuple not in self.tried_params:
# New parameter combination - use it
self.tried_params.add(params_tuple)
self.trial_count += 1
print(f"[Bayesian] Trial {self.trial_count}/{self.max_iterations}: {params}")
return params
# Duplicate detected - try perturbations
print(f"[Bayesian] Duplicate detected: {params}")
# Try up to 10 perturbations to find non-duplicate
max_attempts = 10
for attempt in range(max_attempts):
# Apply perturbation based on parameter type
perturbed_params = self._perturb_parameters(params)
perturbed_tuple = tuple(sorted(perturbed_params.items()))
if perturbed_tuple not in self.tried_params:
# Use perturbed parameters
self.tried_params.add(perturbed_tuple)
self.trial_count += 1
print(f"[Bayesian] Using perturbed params (attempt {attempt + 1}): {perturbed_params}")
return perturbed_params
# After max_attempts, use the original suggestion even if duplicate
# Better to run a duplicate than to stop optimization early
print(f"[Bayesian] Could not find non-duplicate after {max_attempts} perturbation attempts")
print(f"[Bayesian] Using sampler's suggestion anyway: {params}")
params_tuple = tuple(sorted(params.items()))
self.tried_params.add(params_tuple)
self.trial_count += 1
return params
def _perturb_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Add random perturbation to parameters to avoid exact duplicates.
Args:
params: Original parameter dictionary
Returns:
New parameter dictionary with random perturbation
"""
perturbed = params.copy()
# Choose a random parameter to perturb
param_names = list(self.search_space.keys())
if not param_names:
return perturbed
param_to_perturb = random.choice(param_names)
space_def = self.search_space[param_to_perturb]
if space_def["type"] == "categorical":
# For categorical, choose a different value if possible
choices = space_def["choices"]
if len(choices) > 1:
current_value = perturbed[param_to_perturb]
other_choices = [c for c in choices if c != current_value]
if other_choices:
perturbed[param_to_perturb] = random.choice(other_choices)
elif space_def["type"] == "continuous":
# For continuous, add small random noise (1-5% of range)
low, high = space_def["low"], space_def["high"]
range_size = high - low
noise = random.uniform(0.01, 0.05) * range_size * random.choice([-1, 1])
new_value = perturbed[param_to_perturb] + noise
# Clamp to valid range
perturbed[param_to_perturb] = max(low, min(high, new_value))
elif space_def["type"] == "integer":
# For integer, add ±1 or ±2
low, high = space_def["low"], space_def["high"]
current_value = perturbed[param_to_perturb]
delta = random.choice([-2, -1, 1, 2])
new_value = current_value + delta
# Clamp to valid range
perturbed[param_to_perturb] = max(low, min(high, new_value))
return perturbed
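    # Perturbation sketch (hypothetical values): if the sampler re-suggests
    # {"tp-size": 2, "mem-fraction-static": 0.8}, one randomly chosen parameter is nudged:
    # a categorical value flips to a different choice, a continuous value moves by 1-5% of
    # its (high - low) range, and an integer value shifts by +/-1 or +/-2, always clamped
    # back into [low, high].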
def tell_result(self, parameters: Dict[str, Any], objective_score: float, metrics: Dict[str, Any]):
"""Update Optuna study with experiment result."""
# Tell Optuna the result
self.study.tell(self.current_trial, objective_score)
# Record in history
self.history.append({
"parameters": parameters,
"objective_score": objective_score,
"metrics": metrics
})
# Print progress
best_score = self.study.best_value
best_params = self.study.best_params
print(f"[Bayesian] Trial complete: score={objective_score:.4f}")
print(f"[Bayesian] Best so far: score={best_score:.4f}, params={best_params}")
def should_stop(self) -> bool:
"""Check if Bayesian optimization should stop."""
# Stop if max iterations reached
if self.trial_count >= self.max_iterations:
return True
# Optional: Add convergence detection
# e.g., no improvement in last N trials
# For now, just use max_iterations
return False
def get_best_params(self) -> Dict[str, Any]:
"""Get best parameters found so far."""
return self.study.best_params
def get_best_score(self) -> float:
"""Get best objective score found so far."""
return self.study.best_value
def get_state(self) -> Dict[str, Any]:
"""Serialize Bayesian state for checkpoint."""
base_state = super().get_state()
base_state.update({
"strategy_class": "BayesianStrategy",
"trial_count": self.trial_count,
"max_iterations": self.max_iterations,
"n_initial_random": self.n_initial_random,
"tried_params": [list(p) for p in self.tried_params], # Convert tuples to lists for JSON serialization
})
return base_state
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "BayesianStrategy":
"""Restore Bayesian from serialized state."""
# Create instance (Optuna study will be restored from history)
strategy = cls(
parameter_spec=state["parameter_spec"],
objective=state["objective"],
max_iterations=state["max_iterations"],
n_initial_random=state.get("n_initial_random", 5),
)
# Restore trial count
strategy.trial_count = state["trial_count"]
strategy.history = state.get("history", [])
# Restore tried_params set
tried_params_list = state.get("tried_params", [])
strategy.tried_params = {tuple(p) for p in tried_params_list}
        # Re-populate the Optuna study with historical results so the sampler can
        # warm-start from them. enqueue_trial() pins the recorded parameter values, so
        # the suggest_* calls below return the historical values rather than fresh samples.
        for entry in strategy.history:
            if entry.get("objective_score") is not None:
                known_params = {
                    name: value
                    for name, value in entry["parameters"].items()
                    if name in strategy.search_space
                }
                strategy.study.enqueue_trial(known_params)
                trial = strategy.study.ask()
                for param_name, space_def in strategy.search_space.items():
                    if param_name not in known_params:
                        continue
                    if space_def["type"] == "categorical":
                        trial.suggest_categorical(param_name, space_def["choices"])
                    elif space_def["type"] == "continuous":
                        trial.suggest_float(param_name, space_def["low"], space_def["high"], log=space_def.get("log", False))
                    elif space_def["type"] == "integer":
                        trial.suggest_int(param_name, space_def["low"], space_def["high"], log=space_def.get("log", False))
                strategy.study.tell(trial, entry["objective_score"])
return strategy
class RandomSearchStrategy(OptimizationStrategy):
"""Random search - random sampling from parameter space."""
def __init__(
self,
parameter_spec: Dict[str, Any],
objective: str = "minimize_latency",
max_iterations: int = 100,
seed: Optional[int] = None
):
"""Initialize random search strategy.
Args:
parameter_spec: Parameter specification dictionary
objective: Optimization objective
max_iterations: Maximum number of random samples
seed: Random seed for reproducibility
"""
super().__init__(parameter_spec, objective)
self.max_iterations = max_iterations
self.trial_count = 0
        # Use Optuna's RandomSampler for convenience (the `random` module is already
        # imported at module level; seeding it keeps module-level sampling reproducible too)
        if seed is not None:
            random.seed(seed)
sampler = optuna.samplers.RandomSampler(seed=seed)
self.study = optuna.create_study(
direction="minimize",
sampler=sampler,
study_name=f"autotuner_random_{objective}"
)
# Parse search space (reuse BayesianStrategy's parser)
bayesian_helper = BayesianStrategy(parameter_spec, objective, max_iterations=1)
self.search_space = bayesian_helper.search_space
print(f"[Random] Initialized with {max_iterations} random samples")
def suggest_parameters(self) -> Optional[Dict[str, Any]]:
"""Suggest random parameter configuration."""
if self.trial_count >= self.max_iterations:
return None
# Create Optuna trial with random sampler
trial = self.study.ask()
self.current_trial = trial
self.trial_count += 1
# Extract parameters
params = {}
for param_name, space_def in self.search_space.items():
if space_def["type"] == "categorical":
params[param_name] = trial.suggest_categorical(param_name, space_def["choices"])
elif space_def["type"] == "continuous":
params[param_name] = trial.suggest_float(
param_name,
space_def["low"],
space_def["high"],
log=space_def.get("log", False)
)
elif space_def["type"] == "integer":
params[param_name] = trial.suggest_int(
param_name,
space_def["low"],
space_def["high"],
log=space_def.get("log", False)
)
print(f"[Random] Sample {self.trial_count}/{self.max_iterations}: {params}")
return params
def tell_result(self, parameters: Dict[str, Any], objective_score: float, metrics: Dict[str, Any]):
"""Record result."""
self.study.tell(self.current_trial, objective_score)
self.history.append({
"parameters": parameters,
"objective_score": objective_score,
"metrics": metrics
})
def should_stop(self) -> bool:
"""Stop after max iterations."""
return self.trial_count >= self.max_iterations
def get_state(self) -> Dict[str, Any]:
"""Serialize Random state for checkpoint."""
base_state = super().get_state()
base_state.update({
"strategy_class": "RandomSearchStrategy",
"trial_count": self.trial_count,
"max_iterations": self.max_iterations,
})
return base_state
@classmethod
def from_state(cls, state: Dict[str, Any]) -> "RandomSearchStrategy":
"""Restore Random from serialized state."""
strategy = cls(
parameter_spec=state["parameter_spec"],
objective=state["objective"],
max_iterations=state["max_iterations"],
)
strategy.trial_count = state["trial_count"]
strategy.history = state.get("history", [])
return strategy
# ============================================================================
# Strategy Factory
# ============================================================================
def create_optimization_strategy(
optimization_config: Dict[str, Any],
parameter_spec: Dict[str, Any]
) -> OptimizationStrategy:
"""Factory function to create optimization strategy.
Args:
optimization_config: Optimization configuration from task
{"strategy": "grid_search", "objective": "minimize_latency", ...}
parameter_spec: Parameter specification dictionary
Returns:
OptimizationStrategy instance
"""
strategy_name = optimization_config.get("strategy", "grid_search")
objective = optimization_config.get("objective", "minimize_latency")
max_iterations = optimization_config.get("max_iterations", 100)
if strategy_name == "grid_search":
return GridSearchStrategy(
parameter_spec=parameter_spec,
objective=objective,
max_iterations=max_iterations
)
elif strategy_name == "bayesian":
n_initial_random = optimization_config.get("n_initial_random", 5)
study_name = optimization_config.get("study_name")
storage = optimization_config.get("storage") # e.g., "sqlite:///optuna.db"
return BayesianStrategy(
parameter_spec=parameter_spec,
objective=objective,
max_iterations=max_iterations,
n_initial_random=n_initial_random,
study_name=study_name,
storage=storage
)
elif strategy_name == "random":
seed = optimization_config.get("seed")
return RandomSearchStrategy(
parameter_spec=parameter_spec,
objective=objective,
max_iterations=max_iterations,
seed=seed
)
else:
raise ValueError(
f"Unsupported optimization strategy: {strategy_name}. "
f"Supported: grid_search, bayesian, random"
)
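# Example task-level configuration (a sketch; only keys read by this factory are shown,
# and the parameter values are illustrative):
#
#     optimization_config = {
#         "strategy": "bayesian",
#         "objective": "maximize_throughput",
#         "max_iterations": 30,
#         "n_initial_random": 5,
#         "storage": "sqlite:///optuna.db",   # optional: persist the Optuna study
#     }
#     strategy = create_optimization_strategy(optimization_config, {"tp-size": [1, 2]})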
def restore_optimization_strategy(state: Dict[str, Any]) -> OptimizationStrategy:
"""Restore optimization strategy from serialized state.
Args:
state: Serialized strategy state (from strategy.get_state())
Returns:
Restored OptimizationStrategy instance
"""
strategy_class = state.get("strategy_class")
if strategy_class == "GridSearchStrategy":
return GridSearchStrategy.from_state(state)
elif strategy_class == "BayesianStrategy":
return BayesianStrategy.from_state(state)
elif strategy_class == "RandomSearchStrategy":
return RandomSearchStrategy.from_state(state)
else:
raise ValueError(f"Unknown strategy class: {strategy_class}")