Source code for utils.gpu_monitor

"""
GPU Monitoring and Management Utilities

Provides comprehensive GPU tracking, allocation, and monitoring capabilities
for the LLM Autotuner.
"""

import subprocess
import time
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from datetime import datetime

from .gpu_types import LocalGPUInfo

logger = logging.getLogger(__name__)


@dataclass
class GPUSnapshot:
    """Snapshot of all GPUs at a point in time."""

    timestamp: datetime
    gpus: List[LocalGPUInfo]
    total_gpus: int
    available_gpus: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "timestamp": self.timestamp.isoformat(),
            "gpus": [gpu.to_dict() for gpu in self.gpus],
            "total_gpus": self.total_gpus,
            "available_gpus": self.available_gpus
        }
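
# Note (illustrative, not in the original source): to_dict() is intended to yield a
# JSON-serializable mapping, so a snapshot can be persisted with something like
# json.dumps(snapshot.to_dict()); the per-GPU entries come from
# LocalGPUInfo.to_dict() in utils.gpu_types.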


class GPUMonitor:
    """Monitor and manage GPU resources."""

    def __init__(self):
        self._cache: Optional[GPUSnapshot] = None
        self._cache_ttl = 2.0  # Cache for 2 seconds
        self._last_update = 0.0

    def is_available(self) -> bool:
        """Check if nvidia-smi is available."""
        try:
            result = subprocess.run(
                ["nvidia-smi", "--version"],
                capture_output=True,
                timeout=2
            )
            return result.returncode == 0
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False

    def get_gpu_count(self) -> int:
        """Get total number of GPUs."""
        if not self.is_available():
            return 0
        try:
            result = subprocess.run(
                ["nvidia-smi", "--list-gpus"],
                capture_output=True,
                text=True,
                timeout=2
            )
            if result.returncode == 0:
                return len([line for line in result.stdout.strip().split("\n") if line])
            return 0
        except Exception as e:
            logger.error(f"Failed to get GPU count: {e}")
            return 0

    def query_gpus(self, use_cache: bool = True) -> Optional[GPUSnapshot]:
        """
        Query all GPU information.

        Args:
            use_cache: Whether to use cached results if available

        Returns:
            GPUSnapshot or None if query fails
        """
        # Return cache if valid and requested
        now = time.time()
        if use_cache and self._cache and (now - self._last_update) < self._cache_ttl:
            return self._cache

        if not self.is_available():
            logger.warning("nvidia-smi not available")
            return None

        try:
            # Query comprehensive GPU info
            result = subprocess.run(
                [
                    "nvidia-smi",
                    "--query-gpu=index,name,uuid,memory.total,memory.used,memory.free,"
                    "utilization.gpu,temperature.gpu,power.draw,power.limit,compute_mode",
                    "--format=csv,noheader,nounits"
                ],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode != 0:
                logger.error(f"nvidia-smi failed: {result.stderr}")
                return None

            gpus = []
            for line in result.stdout.strip().split("\n"):
                if not line:
                    continue
                parts = [p.strip() for p in line.split(",")]
                if len(parts) >= 11:
                    index = int(parts[0])
                    # Query processes for this GPU
                    processes = self._query_gpu_processes(index)
                    memory_total = int(parts[3])
                    memory_used = int(parts[4])
                    gpu_info = LocalGPUInfo(
                        index=index,
                        uuid=parts[2],
                        name=parts[1],
                        memory_total_mb=memory_total,
                        memory_free_mb=int(parts[5]),
                        memory_used_mb=memory_used,
                        utilization_gpu=int(parts[6]) if parts[6] != "N/A" else 0,
                        utilization_memory=round(memory_used / memory_total * 100, 1) if memory_total > 0 else 0,
                        temperature=int(parts[7]) if parts[7] != "N/A" else None,
                        power_draw=float(parts[8]) if parts[8] != "N/A" else None,
                        power_limit=float(parts[9]) if parts[9] != "N/A" else None,
                        compute_mode=parts[10],
                        processes=processes
                    )
                    gpus.append(gpu_info)

            snapshot = GPUSnapshot(
                timestamp=datetime.now(),
                gpus=gpus,
                total_gpus=len(gpus),
                available_gpus=sum(
                    1 for gpu in gpus
                    if gpu.utilization_gpu < 50 and gpu.utilization_memory < 80
                )
            )

            # Update cache
            self._cache = snapshot
            self._last_update = now
            return snapshot
        except Exception as e:
            logger.error(f"Failed to query GPUs: {e}")
            return None
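
    # Note (illustrative, not in the original module): with --format=csv,noheader,nounits
    # each line returned by the query above looks roughly like
    #   0, NVIDIA A100-SXM4-80GB, GPU-3f0c..., 81920, 1024, 80896, 5, 34, 68.21, 400.00, Default
    # and maps positionally onto the 11 fields requested in --query-gpu.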

    def _query_gpu_processes(self, gpu_index: int) -> List[Dict[str, Any]]:
        """Query processes running on specific GPU."""
        try:
            result = subprocess.run(
                [
                    "nvidia-smi",
                    "--query-compute-apps=pid,used_memory",
                    "--format=csv,noheader,nounits",
                    f"--id={gpu_index}"
                ],
                capture_output=True,
                text=True,
                timeout=3
            )
            if result.returncode != 0:
                return []

            processes = []
            for line in result.stdout.strip().split("\n"):
                if not line:
                    continue
                parts = [p.strip() for p in line.split(",")]
                if len(parts) >= 2:
                    processes.append({
                        "pid": int(parts[0]),
                        "used_memory_mb": int(parts[1])
                    })
            return processes
        except Exception as e:
            logger.debug(f"Failed to query GPU {gpu_index} processes: {e}")
            return []

    def get_available_gpus(self,
                           min_memory_mb: Optional[int] = None,
                           max_utilization: int = 50) -> List[int]:
        """
        Get list of available GPU indices.

        Args:
            min_memory_mb: Minimum free memory required (MB)
            max_utilization: Maximum GPU utilization allowed (%)

        Returns:
            List of GPU indices sorted by availability score (best first)
        """
        snapshot = self.query_gpus()
        if not snapshot:
            return []

        available = []
        for gpu in snapshot.gpus:
            # Check utilization
            if gpu.utilization_gpu > max_utilization:
                continue
            # Check memory if specified
            if min_memory_mb and gpu.memory_free_mb < min_memory_mb:
                continue
            available.append(gpu)

        # Sort by availability score (best first)
        available.sort(key=lambda g: g.score, reverse=True)
        return [gpu.index for gpu in available]

    def allocate_gpus(self,
                      count: int,
                      min_memory_mb: Optional[int] = None) -> Tuple[List[int], bool]:
        """
        Allocate specified number of GPUs.

        Args:
            count: Number of GPUs to allocate
            min_memory_mb: Minimum memory required per GPU (MB)

        Returns:
            Tuple of (allocated_gpu_indices, success)
        """
        available = self.get_available_gpus(min_memory_mb=min_memory_mb)
        if len(available) < count:
            logger.warning(
                f"Requested {count} GPUs but only {len(available)} available "
                f"(min_memory_mb={min_memory_mb})"
            )
            return ([], False)

        # Allocate best GPUs
        allocated = available[:count]

        # Validate memory balance for multi-GPU configurations (TP > 1)
        if count > 1 and min_memory_mb:
            from .gpu_selection import validate_memory_balance

            # Get detailed info for allocated GPUs
            snapshot = self.query_gpus(use_cache=False)
            if snapshot:
                allocated_gpu_info = [gpu for gpu in snapshot.gpus if gpu.index in allocated]
                if allocated_gpu_info:
                    memory_amounts = [gpu.memory_free_mb for gpu in allocated_gpu_info]
                    is_balanced, msg = validate_memory_balance(memory_amounts, min_ratio=0.8)
                    if not is_balanced:
                        logger.error(
                            f"GPU memory imbalance detected: {msg}. "
                            f"Rejecting allocation due to memory imbalance."
                        )
                        return ([], False)
                    logger.info(f"GPU memory balance validated: {msg}")

        logger.info(f"Allocated GPUs: {allocated}")
        return (allocated, True)

    def get_gpu_info(self, gpu_index: int) -> Optional[LocalGPUInfo]:
        """Get information for specific GPU."""
        snapshot = self.query_gpus()
        if not snapshot:
            return None
        for gpu in snapshot.gpus:
            if gpu.index == gpu_index:
                return gpu
        return None

    def monitor_gpus(self,
                     gpu_indices: List[int],
                     duration_seconds: float,
                     interval_seconds: float = 1.0) -> List[GPUSnapshot]:
        """
        Monitor specific GPUs over time.

        Args:
            gpu_indices: List of GPU indices to monitor
            duration_seconds: How long to monitor
            interval_seconds: Sampling interval

        Returns:
            List of GPU snapshots
        """
        snapshots = []
        start_time = time.time()

        while (time.time() - start_time) < duration_seconds:
            snapshot = self.query_gpus(use_cache=False)
            if snapshot:
                # Filter to requested GPUs
                filtered_gpus = [gpu for gpu in snapshot.gpus if gpu.index in gpu_indices]
                filtered_snapshot = GPUSnapshot(
                    timestamp=snapshot.timestamp,
                    gpus=filtered_gpus,
                    total_gpus=len(filtered_gpus),
                    available_gpus=sum(
                        1 for gpu in filtered_gpus
                        if gpu.utilization_gpu < 50 and gpu.utilization_memory < 80
                    )
                )
                snapshots.append(filtered_snapshot)
            time.sleep(interval_seconds)

        return snapshots

    def get_summary_stats(self, snapshots: List[GPUSnapshot]) -> Dict[str, Any]:
        """
        Calculate summary statistics from monitoring snapshots.

        Args:
            snapshots: List of GPU snapshots

        Returns:
            Dictionary with summary statistics
        """
        if not snapshots:
            return {}

        # Aggregate per-GPU statistics
        gpu_stats = {}
        for snapshot in snapshots:
            for gpu in snapshot.gpus:
                if gpu.index not in gpu_stats:
                    gpu_stats[gpu.index] = {
                        "name": gpu.name,
                        "utilization": [],
                        "memory_used": [],
                        "memory_usage_percent": [],
                        "temperature": [],
                        "power_draw": []
                    }
                gpu_stats[gpu.index]["utilization"].append(gpu.utilization_gpu)
                gpu_stats[gpu.index]["memory_used"].append(gpu.memory_used_mb)
                gpu_stats[gpu.index]["memory_usage_percent"].append(gpu.utilization_memory)
                gpu_stats[gpu.index]["temperature"].append(gpu.temperature if gpu.temperature else 0)
                gpu_stats[gpu.index]["power_draw"].append(gpu.power_draw if gpu.power_draw else 0.0)

        # Calculate statistics
        summary = {}
        for gpu_index, stats in gpu_stats.items():
            summary[gpu_index] = {
                "name": stats["name"],
                "utilization": {
                    "min": min(stats["utilization"]),
                    "max": max(stats["utilization"]),
                    "mean": sum(stats["utilization"]) / len(stats["utilization"]),
                    "samples": len(stats["utilization"])
                },
                "memory_used_mb": {
                    "min": min(stats["memory_used"]),
                    "max": max(stats["memory_used"]),
                    "mean": sum(stats["memory_used"]) / len(stats["memory_used"])
                },
                "memory_usage_percent": {
                    "min": min(stats["memory_usage_percent"]),
                    "max": max(stats["memory_usage_percent"]),
                    "mean": sum(stats["memory_usage_percent"]) / len(stats["memory_usage_percent"])
                },
                "temperature_c": {
                    "min": min(stats["temperature"]),
                    "max": max(stats["temperature"]),
                    "mean": sum(stats["temperature"]) / len(stats["temperature"])
                },
                "power_draw_w": {
                    "min": min(stats["power_draw"]),
                    "max": max(stats["power_draw"]),
                    "mean": sum(stats["power_draw"]) / len(stats["power_draw"])
                }
            }

        return {
            "monitoring_duration_seconds": (snapshots[-1].timestamp - snapshots[0].timestamp).total_seconds(),
            "sample_count": len(snapshots),
            "gpu_stats": summary
        }


# Global GPU monitor instance
_gpu_monitor = None


def get_gpu_monitor() -> GPUMonitor:
    """Get global GPU monitor instance."""
    global _gpu_monitor
    if _gpu_monitor is None:
        _gpu_monitor = GPUMonitor()
    return _gpu_monitor
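

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes a
# host with nvidia-smi on PATH; on machines without GPUs the calls below
# simply return zero/empty/None results. Memory thresholds are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import json

    monitor = get_gpu_monitor()
    print(f"nvidia-smi available: {monitor.is_available()}")
    print(f"GPU count: {monitor.get_gpu_count()}")

    snapshot = monitor.query_gpus()
    if snapshot:
        print(json.dumps(snapshot.to_dict(), indent=2))

    # Try to allocate two GPUs with at least ~20 GB free each
    allocated, ok = monitor.allocate_gpus(count=2, min_memory_mb=20000)
    if ok:
        # Sample the allocated GPUs for 10 seconds at 1 Hz and summarize
        samples = monitor.monitor_gpus(allocated, duration_seconds=10.0, interval_seconds=1.0)
        print(json.dumps(monitor.get_summary_stats(samples), indent=2, default=str))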