"""
GPU Discovery Utility
Provides functions to discover and select idle GPUs across the Kubernetes cluster.
"""
import subprocess
import logging
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from .gpu_types import ClusterGPUInfo
logger = logging.getLogger(__name__)
[docs]
@dataclass
class NodeGPUSummary:
    """Per-node roll-up of GPU inventory and load figures.

    One instance describes a single cluster node. Field order is part of the
    constructor signature and must not be rearranged.
    """

    # Name of the node this summary describes.
    node_name: str
    # Number of GPU entries known for the node.
    total_gpus: int
    # Number of those GPUs flagged as allocatable.
    allocatable_gpus: int
    # The subset of the node's GPUs for which live metrics were collected.
    gpus_with_metrics: List[ClusterGPUInfo]
    # Mean GPU utilization (percent) across ``gpus_with_metrics``.
    avg_utilization: float
    # Mean memory usage (percent) across ``gpus_with_metrics``.
    avg_memory_usage: float
    # Count of GPUs regarded as idle, i.e. showing low utilization.
    idle_gpu_count: int
[docs]
# nvidia-smi invocation shared by the local and the in-pod (remote) query paths.
_NVIDIA_SMI_QUERY = [
    "nvidia-smi",
    "--query-gpu=index,name,memory.total,memory.used,memory.free,utilization.gpu,temperature.gpu",
    "--format=csv,noheader,nounits",
]


def _gpu_resource_count(resources: Dict[str, Any]) -> int:
    """Return the value of the first resource key containing ``nvidia.com/gpu`` (0 if none)."""
    for key, value in resources.items():
        if "nvidia.com/gpu" in key:
            return int(value)
    return 0


def _parse_nvidia_smi_csv(output: str) -> Dict[int, Dict[str, Any]]:
    """Parse ``nvidia-smi --format=csv,noheader,nounits`` output into metrics keyed by GPU index."""
    metrics: Dict[int, Dict[str, Any]] = {}
    for line in output.strip().split("\n"):
        if not line:
            continue
        parts = [p.strip() for p in line.split(",")]
        if len(parts) < 7:
            continue
        try:
            gpu_idx = int(parts[0])
            memory_total = int(parts[2])
            memory_used = int(parts[3])
            memory_free = int(parts[4])
            utilization = int(parts[5])
        except ValueError:
            # A field such as "[N/A]" must only cost us this one GPU, not
            # every remaining GPU on the node.
            logger.warning(f"Skipping unparseable nvidia-smi line: {line!r}")
            continue
        # Temperature is informational only; a missing reading must not
        # discard the rest of the GPU's metrics.
        try:
            temperature: Optional[int] = int(parts[6])
        except ValueError:
            temperature = None
        metrics[gpu_idx] = {
            "name": parts[1],
            "memory_total_mb": memory_total,
            "memory_used_mb": memory_used,
            "memory_free_mb": memory_free,
            "utilization_percent": utilization,
            "temperature_c": temperature,
            # Guard against a reported total of 0 to avoid ZeroDivisionError.
            "memory_usage_percent": round(memory_used / memory_total * 100, 1) if memory_total > 0 else 0.0,
        }
    return metrics


def _run_nvidia_smi(command_prefix: List[str]) -> Dict[int, Dict[str, Any]]:
    """Run the shared nvidia-smi query behind ``command_prefix`` and return the parsed metrics."""
    result = subprocess.run(
        command_prefix + _NVIDIA_SMI_QUERY,
        capture_output=True,
        text=True,
        timeout=5,
    )
    if result.returncode != 0 or not result.stdout:
        return {}
    return _parse_nvidia_smi_csv(result.stdout)


def _find_gpu_pod(node_name: str) -> Optional[Tuple[str, str, str]]:
    """Return ``(namespace, pod, container)`` of a running GPU container on ``node_name``, or None."""
    pod_result = subprocess.run(
        [
            "kubectl",
            "get",
            "pods",
            "--all-namespaces",
            "--field-selector",
            f"spec.nodeName={node_name}",
            "-o",
            "json",
        ],
        capture_output=True,
        text=True,
        timeout=5,
    )
    if pod_result.returncode != 0:
        return None
    pods_data = json.loads(pod_result.stdout)
    for pod in pods_data.get("items", []):
        # Pending / Succeeded / Failed pods cannot be exec'd into; skipping
        # them lets a later running GPU pod on the same node be used instead.
        if pod.get("status", {}).get("phase") != "Running":
            continue
        for container in pod["spec"].get("containers", []):
            resources = container.get("resources", {})
            resource_names = list(resources.get("limits", {})) + list(resources.get("requests", {}))
            if any("nvidia.com/gpu" in key for key in resource_names):
                return pod["metadata"]["namespace"], pod["metadata"]["name"], container["name"]
    return None


def get_cluster_gpu_status() -> List[ClusterGPUInfo]:
    """
    Query cluster-wide GPU status using kubectl and nvidia-smi.

    Nodes are listed with ``kubectl get nodes``; nodes whose capacity reports
    no ``nvidia.com/gpu`` resource are skipped. Metrics for the node whose name
    equals the local ``hostname`` come from a direct ``nvidia-smi`` call. For
    any other node, ``nvidia-smi`` is executed via ``kubectl exec`` inside a
    running container on that node that requests a GPU, if one exists.

    GPUs for which no metrics could be obtained are returned as placeholders
    (``has_metrics=False``, zeroed memory/utilization, model taken from the
    ``nvidia.com/gpu.product`` node label or "Unknown"). A GPU at index ``i``
    is marked allocatable when ``i`` is below the node's allocatable count.

    NOTE(review): nvidia-smi run inside a pod presumably reports only the GPUs
    exposed to that container, numbered from 0 -- confirm before treating the
    indices of remote nodes as node-wide.

    Returns:
        List of ClusterGPUInfo objects for all GPUs in the cluster. An empty
        list is returned when kubectl fails or any unexpected error occurs
        (the error is logged, never raised).
    """
    try:
        local_hostname = subprocess.run(["hostname"], capture_output=True, text=True, timeout=2).stdout.strip()

        result = subprocess.run(["kubectl", "get", "nodes", "-o", "json"], capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            logger.warning("kubectl command failed")
            return []

        nodes_data = json.loads(result.stdout)
        all_gpus = []
        for node in nodes_data.get("items", []):
            node_name = node["metadata"]["name"]
            gpu_capacity = _gpu_resource_count(node["status"].get("capacity", {}))
            gpu_allocatable = _gpu_resource_count(node["status"].get("allocatable", {}))
            if gpu_capacity == 0:
                continue

            # Metric collection is best-effort: a failure here downgrades the
            # node's GPUs to placeholders instead of aborting the whole scan.
            gpu_metrics: Dict[int, Dict[str, Any]] = {}
            if node_name == local_hostname:
                try:
                    gpu_metrics = _run_nvidia_smi([])
                except Exception as e:
                    logger.error(f"Error getting GPU metrics for local node {node_name}: {e}")
            else:
                try:
                    target = _find_gpu_pod(node_name)
                    if target is not None:
                        namespace, pod_name, container_name = target
                        # Name the container explicitly so the exec lands in
                        # the one that actually has GPU access.
                        gpu_metrics = _run_nvidia_smi(
                            ["kubectl", "exec", "-n", namespace, pod_name, "-c", container_name, "--"]
                        )
                except Exception as e:
                    logger.warning(f"Could not get GPU metrics for remote node {node_name}: {e}")

            labels = node["metadata"].get("labels", {})
            gpu_type = labels.get("nvidia.com/gpu.product", "Unknown")

            for i in range(gpu_capacity):
                is_allocatable = i < gpu_allocatable
                metrics = gpu_metrics.get(i)
                if metrics is not None:
                    gpu_info = ClusterGPUInfo(
                        node_name=node_name,
                        gpu_index=i,
                        gpu_model=metrics["name"],
                        memory_total_mb=metrics["memory_total_mb"],
                        memory_used_mb=metrics["memory_used_mb"],
                        memory_free_mb=metrics["memory_free_mb"],
                        utilization_gpu=metrics["utilization_percent"],
                        has_metrics=True,
                        allocatable=is_allocatable,
                    )
                else:
                    # No live metrics for this index: emit a placeholder entry.
                    gpu_info = ClusterGPUInfo(
                        node_name=node_name,
                        gpu_index=i,
                        gpu_model=gpu_type,
                        memory_total_mb=0,
                        memory_used_mb=0,
                        memory_free_mb=0,
                        utilization_gpu=0,
                        has_metrics=False,
                        allocatable=is_allocatable,
                    )
                all_gpus.append(gpu_info)
        return all_gpus
    except Exception as e:
        logger.error(f"Error querying cluster GPU status: {e}")
        return []
[docs]
def get_node_gpu_summaries(
    utilization_threshold: float = 30.0, memory_threshold: float = 50.0
) -> Dict[str, NodeGPUSummary]:
    """
    Get GPU summaries grouped by node.

    A GPU with metrics counts as idle when it is allocatable, its utilization
    is below ``utilization_threshold`` and its memory usage percentage is below
    ``memory_threshold``. When a node has no GPU with metrics at all, every
    allocatable GPU on it is counted as idle and both averages are 0.0.

    NOTE(review): on a node where only some GPUs have metrics, the GPUs
    without metrics are never counted as idle -- confirm this is intended.

    Args:
        utilization_threshold: GPU utilization % below which a GPU may be idle (default: 30%)
        memory_threshold: Memory usage % below which a GPU may be idle (default: 50%)

    Returns:
        Dictionary mapping node name to NodeGPUSummary
    """

    def memory_usage_percent(gpu: ClusterGPUInfo) -> float:
        # A total of 0 (e.g. placeholder data) is treated as 0% used.
        if gpu.memory_total_mb > 0:
            return gpu.memory_used_mb / gpu.memory_total_mb * 100
        return 0.0

    # Group GPUs by the node they belong to, preserving discovery order.
    gpus_by_node: Dict[str, List[ClusterGPUInfo]] = {}
    for gpu in get_cluster_gpu_status():
        gpus_by_node.setdefault(gpu.node_name, []).append(gpu)

    summaries = {}
    for node_name, gpus in gpus_by_node.items():
        gpus_with_metrics = [g for g in gpus if g.has_metrics]
        allocatable_gpus = sum(1 for g in gpus if g.allocatable)

        if gpus_with_metrics:
            measured = len(gpus_with_metrics)
            avg_utilization = sum(g.utilization_gpu for g in gpus_with_metrics) / measured
            avg_memory_usage = sum(memory_usage_percent(g) for g in gpus_with_metrics) / measured
            idle_count = sum(
                1
                for g in gpus_with_metrics
                if g.allocatable
                and g.utilization_gpu < utilization_threshold
                and memory_usage_percent(g) < memory_threshold
            )
        else:
            # No metrics available - assume GPUs are idle if allocatable.
            avg_utilization = 0.0
            avg_memory_usage = 0.0
            idle_count = allocatable_gpus

        summaries[node_name] = NodeGPUSummary(
            node_name=node_name,
            total_gpus=len(gpus),
            allocatable_gpus=allocatable_gpus,
            gpus_with_metrics=gpus_with_metrics,
            avg_utilization=avg_utilization,
            avg_memory_usage=avg_memory_usage,
            idle_gpu_count=idle_count,
        )
    return summaries
[docs]
def find_best_node_for_deployment(
    required_gpus: int = 1, utilization_threshold: float = 30.0, memory_threshold: float = 50.0
) -> Optional[str]:
    """
    Find the best node for deploying a new inference service.

    Selection criteria (in order):
    1. Must have enough allocatable GPUs
    2. Prefer nodes with at least ``required_gpus`` idle GPUs
    3. Among those, prefer the one with most idle GPUs
    4. On ties (or if no node has enough idle GPUs), prefer the node with the
       lowest average utilization

    A GPU with metrics is idle when it is allocatable, its utilization is below
    ``utilization_threshold`` and its memory usage percentage is below
    ``memory_threshold``. For a node without any GPU metrics, the idle count
    already stored on its summary is used.

    Args:
        required_gpus: Number of GPUs required for deployment
        utilization_threshold: GPU utilization % threshold for "idle" (default: 30%)
        memory_threshold: Memory usage % threshold for "idle" (default: 50%)

    Returns:
        Node name to deploy to, or None if no suitable node found
    """
    summaries = get_node_gpu_summaries()
    if not summaries:
        logger.warning("No GPU nodes found in cluster")
        return None

    def count_idle(summary: NodeGPUSummary) -> int:
        # Re-evaluate idleness here so the caller-supplied thresholds are
        # honoured regardless of the defaults used to build the summary.
        if not summary.gpus_with_metrics:
            return summary.idle_gpu_count
        return sum(
            1
            for g in summary.gpus_with_metrics
            if g.allocatable
            and g.utilization_gpu < utilization_threshold
            and (g.memory_used_mb / g.memory_total_mb * 100 if g.memory_total_mb > 0 else 0) < memory_threshold
        )

    # Keep only nodes with enough allocatable GPUs, paired with their idle count.
    candidates = [
        (name, summary, count_idle(summary))
        for name, summary in summaries.items()
        if summary.allocatable_gpus >= required_gpus
    ]
    if not candidates:
        logger.warning(f"No nodes with {required_gpus} allocatable GPU(s)")
        return None

    def node_priority(item: Tuple[str, NodeGPUSummary, int]) -> Tuple[int, int, float]:
        # Smaller tuples win: (-has_idle, -idle_count, avg_utilization).
        # Negating the first two components ranks "has enough idle GPUs" and
        # "more idle GPUs" first, while lower utilization breaks ties.
        _, summary, idle_count = item
        has_idle = 1 if idle_count >= required_gpus else 0
        return (-has_idle, -idle_count, summary.avg_utilization)

    # min() returns the first minimal element, matching a stable sort's head.
    best_node_name, best_summary, best_idle_count = min(candidates, key=node_priority)

    logger.info(f"Selected node '{best_node_name}' for deployment:")
    logger.info(f" - Allocatable GPUs: {best_summary.allocatable_gpus}/{best_summary.total_gpus}")
    logger.info(f" - Idle GPUs: {best_idle_count}")
    logger.info(f" - Avg Utilization: {best_summary.avg_utilization:.1f}%")
    logger.info(f" - Avg Memory Usage: {best_summary.avg_memory_usage:.1f}%")
    return best_node_name