Source code for controllers.docker_controller

"""
Docker Deployment Controller

Manages the lifecycle of model inference services using standalone Docker containers.
No Kubernetes required - direct Docker container management.
"""

import time
import requests
from pathlib import Path
from typing import Dict, Any, Optional

try:
    import docker
    from docker.errors import DockerException, NotFound, APIError
except ImportError:
    docker = None

from .base_controller import BaseModelController
from .utils import (
    sanitize_container_name,
    find_available_port,
    parse_parallel_config,
    setup_proxy_environment,
    build_param_list,
    get_runtime_config,
)


class DockerController(BaseModelController):
    """Controller for managing standalone Docker container deployments."""
    def __init__(self, model_base_path: str = "/mnt/data/models",
                 http_proxy: str = "", https_proxy: str = "",
                 no_proxy: str = "", hf_token: str = ""):
        """Initialize the Docker controller.

        Args:
            model_base_path: Base path where models are stored on the host
            http_proxy: HTTP proxy URL (optional)
            https_proxy: HTTPS proxy URL (optional)
            no_proxy: Comma-separated list of hosts to bypass the proxy (optional)
            hf_token: HuggingFace access token for gated models (optional)

        Note:
            Container logs are retrieved before deletion and saved to the task
            log file. Containers are manually removed during the cleanup phase.
        """
        if docker is None:
            raise ImportError("Docker SDK for Python is not installed. "
                              "Install it with: pip install docker")

        try:
            self.client = docker.from_env()
            # Test the connection
            self.client.ping()
            print("[Docker] Successfully connected to Docker daemon")
        except DockerException as e:
            raise RuntimeError(f"Failed to connect to Docker daemon: {e}")

        self.model_base_path = Path(model_base_path)
        self.containers = {}  # Track containers by service_id

        # Store proxy settings
        self.http_proxy = http_proxy
        self.https_proxy = https_proxy
        self.no_proxy = no_proxy

        # Store the HuggingFace token
        self.hf_token = hf_token

        if self.http_proxy or self.https_proxy:
            print(f"[Docker] Proxy configured - HTTP: {self.http_proxy or 'None'}, "
                  f"HTTPS: {self.https_proxy or 'None'}")
        if self.no_proxy:
            print(f"[Docker] No proxy for: {self.no_proxy}")
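
    # Usage sketch (illustrative only, not part of the controller): constructing
    # the controller verifies the Docker daemon connection up front, so failures
    # surface immediately. The proxy URL and token below are hypothetical
    # placeholders, not real defaults.
    #
    #     controller = DockerController(
    #         model_base_path="/mnt/data/models",
    #         https_proxy="http://proxy.internal:3128",  # hypothetical proxy
    #         hf_token="hf_...",                         # gated-model access token
    #     )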
    def deploy_inference_service(
        self,
        task_name: str,
        experiment_id: int,
        namespace: str,
        model_name: str,
        runtime_name: str,
        parameters: Dict[str, Any],
        image_tag: Optional[str] = None,
    ) -> Optional[str]:
        """Deploy a model inference service using Docker.

        Args:
            task_name: Autotuning task name
            experiment_id: Unique experiment identifier
            namespace: Namespace identifier (used for container naming)
            model_name: Model name (HuggingFace model ID or local path)
            runtime_name: Runtime identifier (e.g., 'sglang', 'vllm')
            parameters: SGLang/runtime parameters (tp_size, mem_frac, etc.)
            image_tag: Optional Docker image tag (e.g., 'v0.5.2-cu126')

        Returns:
            Service ID (used as the container name) if successful, None otherwise
        """
        # Sanitize names for container naming
        safe_task_name = sanitize_container_name(task_name)
        safe_namespace = sanitize_container_name(namespace)
        service_id = f"{safe_namespace}-{safe_task_name}-exp{experiment_id}"
        container_name = service_id

        # Determine whether model_name is a local path or a HuggingFace model ID.
        # Local paths start with / or are resolved under model_base_path.
        use_local_model = False
        model_identifier = model_name
        volumes = {}

        # Always mount the HuggingFace cache directory for model caching.
        # This allows reusing downloaded models across container restarts.
        hf_cache_dir = Path.home() / ".cache/huggingface"
        hf_cache_dir.mkdir(parents=True, exist_ok=True)
        volumes[str(hf_cache_dir)] = {"bind": "/root/.cache/huggingface", "mode": "rw"}

        if model_name.startswith("/") or "/" not in model_name:
            # Could be a local path - check if it exists
            if model_name.startswith("/"):
                model_path = Path(model_name)
            else:
                model_path = self.model_base_path / model_name

            if model_path.exists():
                # Local model exists - use a read-only volume mount
                use_local_model = True
                model_identifier = "/model"
                volumes[str(model_path)] = {"bind": "/model", "mode": "ro"}
                print(f"[Docker] Using local model at {model_path}")
            else:
                # Local path doesn't exist - fail early
                print(f"[Docker] ERROR: Local model path {model_path} does not exist")
                print(f"[Docker] Either:")
                print(f"[Docker]   1. Download the model to {model_path}")
                print(f"[Docker]   2. Use a HuggingFace model ID (e.g., 'meta-llama/Llama-3.2-1B-Instruct')")
                return None
        else:
            # Contains / - likely a HuggingFace model ID (e.g., meta-llama/Llama-3.2-1B)
            print(f"[Docker] Using HuggingFace model ID: {model_name}")
            print(f"[Docker] Model will be downloaded from HuggingFace Hub if not cached")

        # Determine the runtime image and command based on runtime_name
        runtime_config = self._get_runtime_config(runtime_name, parameters, image_tag)
        if not runtime_config:
            print(f"[Docker] Unsupported runtime: {runtime_name}")
            return None

        # Determine the host port (avoid conflicts) - needed for both bridge and host networking
        host_port = find_available_port(8000, 8100)
        if not host_port:
            print(f"[Docker] Could not find available port in range 8000-8100")
            return None

        # Build the command with the model identifier and host port.
        # Compose a list directly to avoid issues with spaces in parameter values.
        command_template = runtime_config["command"]

        # Parse the base command from the template (e.g., "python3 -m sglang.launch_server ...").
        # Split only the base command part, preserving parameter placeholders.
        base_parts = command_template.split()
        command_list = []

        # Build the command list by substituting placeholders
        for part in base_parts:
            if "{model_path}" in part:
                command_list.append(part.replace("{model_path}", model_identifier))
            elif "{port}" in part:
                command_list.append(part.replace("{port}", str(host_port)))
            else:
                command_list.append(part)

        # Add all parameters as command-line arguments using the shared utility
        param_list = build_param_list(parameters)
        command_list.extend(param_list)

        # Determine GPU allocation based on the parallel configuration using the shared utility
        parallel_config = self._get_parallel_config(parameters)
        tp = parallel_config["tp"]
        pp = parallel_config["pp"]
        dp = parallel_config["dp"]
        cp = parallel_config["cp"]
        dcp = parallel_config["dcp"]
        world_size = parallel_config["world_size"]
        num_gpus = world_size

        print(f"[Docker] Parallel configuration: TP={tp}, PP={pp}, DP={dp}, CP={cp}, DCP={dcp}")
        print(f"[Docker] Calculated world_size: {world_size}")

        # Build the container configuration
        try:
            # Determine GPU devices using shared intelligent selection
            gpu_info_dict = self._select_gpus_intelligent(num_gpus, log_prefix="[Docker]")
            if not gpu_info_dict:
                print(f"[Docker] Failed to allocate {num_gpus} GPU(s)")
                return None

            gpu_devices = gpu_info_dict["device_ids"]
            gpu_model = gpu_info_dict["gpu_model"]

            print(f"[Docker] Deploying container '{container_name}'")
            print(f"[Docker] Image: {runtime_config['image']}")
            if use_local_model:
                print(f"[Docker] Model: {model_path} (local)")
            else:
                print(f"[Docker] Model: {model_name} (HuggingFace Hub)")
            print(f"[Docker] GPUs: {gpu_devices} (Model: {gpu_model})")
            print(f"[Docker] Parameters: {parameters}")

            # Remove any existing container with the same name
            try:
                old_container = self.client.containers.get(container_name)
                print(f"[Docker] Removing existing container '{container_name}'")
                old_container.remove(force=True)
            except NotFound:
                pass

            # Check if the image exists locally, pull if not
            self._ensure_image_available(runtime_config["image"])

            # Prepare environment variables
            env_vars = {
                "MODEL_PATH": model_identifier,
                "HF_HOME": "/root/.cache/huggingface",  # Cache directory for downloaded models
            }
            # Note: CUDA_VISIBLE_DEVICES is NOT set here because we use device_requests.
            # Docker's device_requests handles GPU allocation directly via device_ids;
            # setting CUDA_VISIBLE_DEVICES could conflict with device_requests.

            # Add proxy settings and the HF token using the shared utility
            env_vars = setup_proxy_environment(
                env_vars,
                http_proxy=self.http_proxy,
                https_proxy=self.https_proxy,
                no_proxy=self.no_proxy,
                hf_token=self.hf_token,
            )

            # Debug: print the env vars being passed to the container
            print(f"[Docker] Environment variables to be set in container:")
            for key, value in env_vars.items():
                if "proxy" in key.lower() or "token" in key.lower():
                    # Mask the token value for security
                    display_value = "***" if "token" in key.lower() and value else value
                    print(f"[Docker]   {key}={display_value}")

            # Create and start the container with host networking
            container = self.client.containers.run(
                image=runtime_config["image"],
                name=container_name,
                command=command_list,
                detach=True,
                device_requests=[docker.types.DeviceRequest(device_ids=gpu_devices, capabilities=[["gpu"]])],
                volumes=volumes,  # HF cache mount plus an optional read-only model mount
                environment=env_vars,
                shm_size="16g",  # Shared memory for multi-process inference
                ipc_mode="host",  # Use the host IPC namespace for shared memory
                network_mode="host",  # Use the host network for better performance and compatibility
                remove=False,  # Don't auto-remove - we need to retrieve logs first
            )

            # Store the container reference
            self.containers[service_id] = {
                "container": container,
                "host_port": host_port,
                "gpu_devices": gpu_devices,
                "gpu_model": gpu_model,
                "world_size": world_size,
            }

            print(f"[Docker] Container '{container_name}' started (ID: {container.short_id})")
            print(f"[Docker] Service URL: http://localhost:{host_port}")

            # Verify proxy settings in the container - inspect via the Docker API
            try:
                container.reload()  # Refresh container info
                container_env = container.attrs.get('Config', {}).get('Env', [])
                proxy_env = [env for env in container_env if 'proxy' in env.lower()]
                if proxy_env:
                    print(f"[Docker] Proxy environment variables in container (from Docker API):")
                    for env in proxy_env:
                        print(f"[Docker]   {env}")
                else:
                    print(f"[Docker] No proxy environment variables found in container (via Docker API)")
            except Exception as e:
                print(f"[Docker] Could not inspect container environment: {e}")

            return service_id

        except APIError as e:
            print(f"[Docker] Error creating container: {e}")
            return None
        except Exception as e:
            print(f"[Docker] Unexpected error: {e}")
            return None
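
    # For reference, the containers.run(...) call above corresponds roughly to
    # the following docker CLI invocation (an illustrative sketch - the image
    # tag, port, and server flags are examples resolved at runtime from
    # runtime_config and the parameter dict):
    #
    #     docker run -d --name default-demo-exp1 \
    #         --gpus '"device=0,1"' --network=host --ipc=host --shm-size=16g \
    #         -v ~/.cache/huggingface:/root/.cache/huggingface \
    #         -e HF_HOME=/root/.cache/huggingface \
    #         lmsysorg/sglang:v0.5.2-cu126 \
    #         python3 -m sglang.launch_server \
    #             --model-path meta-llama/Llama-3.2-1B-Instruct --port 8000 --tp-size 2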
    def wait_for_ready(self, service_id: str, namespace: str,
                       timeout: int = 600, poll_interval: int = 5) -> bool:
        """Wait for the Docker container service to become ready.

        Args:
            service_id: Service identifier (container name)
            namespace: Namespace identifier
            timeout: Maximum wait time in seconds
            poll_interval: Polling interval in seconds

        Returns:
            True if the service is ready, False on timeout or error
        """
        if service_id not in self.containers:
            print(f"[Docker] Service '{service_id}' not found")
            return False

        container_info = self.containers[service_id]
        container = container_info["container"]
        host_port = container_info["host_port"]

        # Check both the /health and /v1/models endpoints.
        # /health may return 503 if warmup fails, but /v1/models will return 200
        # once the service is ready.
        health_url = f"http://localhost:{host_port}/health"
        models_url = f"http://localhost:{host_port}/v1/models"
        start_time = time.time()

        print(f"[Docker] Waiting for service to be ready (checking /health and /v1/models)...")

        # Track consecutive failures for crash-loop detection
        consecutive_exits = 0
        max_consecutive_exits = 3

        # Track log snapshots - capture logs at key intervals for debugging
        log_snapshot_intervals = [60, 120, 300]  # Capture logs at 1 min, 2 min, 5 min
        next_snapshot_idx = 0

        while time.time() - start_time < timeout:
            try:
                # Check the container status
                container.reload()

                # Handle the different container states
                if container.status == "running":
                    # Container is running, try the health check
                    consecutive_exits = 0  # Reset the counter
                    try:
                        # Check both endpoints; the service is ready if either returns 200
                        health_response = requests.get(health_url, timeout=5)
                        models_response = requests.get(models_url, timeout=5)
                        if health_response.status_code == 200:
                            print(f"[Docker] Service is ready! (via /health) URL: http://localhost:{host_port}")
                            return True
                        elif models_response.status_code == 200:
                            print(f"[Docker] Service is ready! (via /v1/models) URL: http://localhost:{host_port}")
                            return True
                    except requests.RequestException:
                        # Endpoints not ready yet, continue waiting
                        pass

                elif container.status in ["exited", "dead"]:
                    # Container has stopped - this is a failure
                    consecutive_exits += 1
                    print(f"[Docker] Container status: {container.status} (attempt {consecutive_exits}/{max_consecutive_exits})")

                    # Get the exit code for more information
                    exit_code = container.attrs.get('State', {}).get('ExitCode', 'unknown')
                    print(f"[Docker] Container exit code: {exit_code}")

                    # Print logs to help diagnose the issue
                    try:
                        # Retrieve ALL logs to diagnose startup issues (not just the last 100 lines)
                        logs = container.logs(stdout=True, stderr=True).decode("utf-8", errors="replace")
                        print(f"[Docker] Container logs:\n{logs}")
                    except Exception as e:
                        print(f"[Docker] Could not retrieve container logs: {e}")

                    # If the container exits multiple times quickly, it's crash-looping - give up
                    if consecutive_exits >= max_consecutive_exits:
                        print(f"[Docker] Container is crash-looping, giving up")
                        return False

                    # Container exited, likely due to an error - fail immediately
                    print(f"[Docker] Container stopped unexpectedly, deployment failed")
                    return False

                elif container.status in ["removing", "paused"]:
                    # Container is being removed or paused - this is a failure
                    print(f"[Docker] Container status: {container.status} - deployment failed")
                    return False

                elif container.status in ["created", "restarting"]:
                    # Container is starting or restarting - keep waiting
                    print(f"[Docker] Container status: {container.status} - waiting for running state...")

                else:
                    # Unknown status
                    print(f"[Docker] Container status: {container.status} (unknown state)")

            except NotFound:
                # Container no longer exists - it was removed while we were waiting
                print(f"[Docker] Container was removed while waiting for readiness")
                print(f"[Docker] This typically means the container failed to start")
                print(f"[Docker] Check that the model path exists and the runtime parameters are correct")
                return False
            except Exception as e:
                print(f"[Docker] Error checking service status: {e}")
                # If we can't check the status, the container might be gone
                return False

            elapsed = int(time.time() - start_time)

            # Capture log snapshots at key intervals for debugging long startups
            if next_snapshot_idx < len(log_snapshot_intervals):
                if elapsed >= log_snapshot_intervals[next_snapshot_idx]:
                    print(f"\n[Docker] === Log Snapshot at {elapsed}s ===")
                    try:
                        snapshot_logs = container.logs(tail=50, stdout=True, stderr=True).decode("utf-8", errors="replace")
                        print(snapshot_logs)
                        print(f"[Docker] === End Snapshot ===\n")
                    except Exception as e:
                        print(f"[Docker] Could not capture log snapshot: {e}")
                    next_snapshot_idx += 1

            print(f"[Docker] Waiting for service... ({elapsed}s)")
            time.sleep(poll_interval)

        # Timeout reached
        print(f"[Docker] Timeout waiting for service '{service_id}' to be ready after {timeout}s")

        # Print the final container logs for debugging
        try:
            container.reload()
            print(f"[Docker] Final container status: {container.status}")
            # Retrieve ALL logs to diagnose startup issues (not just the last 100 lines)
            logs = container.logs(stdout=True, stderr=True).decode("utf-8", errors="replace")
            print(f"[Docker] Container logs:\n{logs}")
        except Exception as e:
            print(f"[Docker] Could not retrieve final container state: {e}")

        return False
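
    # The readiness check above boils down to the probe sketched below
    # (illustrative; _is_ready is a hypothetical helper, not part of this
    # module). Either endpoint returning 200 counts as ready, since /health
    # can report 503 during warmup while /v1/models already serves.
    #
    #     def _is_ready(port: int) -> bool:
    #         for path in ("/health", "/v1/models"):
    #             try:
    #                 if requests.get(f"http://localhost:{port}{path}", timeout=5).status_code == 200:
    #                     return True
    #             except requests.RequestException:
    #                 pass  # Not up yet; try the next endpoint
    #         return False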
    def delete_inference_service(self, service_id: str, namespace: str) -> bool:
        """Delete a Docker container service.

        Args:
            service_id: Service identifier
            namespace: Namespace identifier

        Returns:
            True if deleted successfully
        """
        if service_id not in self.containers:
            print(f"[Docker] Service '{service_id}' not found (already deleted?)")
            return True

        try:
            container_info = self.containers[service_id]
            container = container_info["container"]

            print(f"[Docker] Stopping and removing container '{service_id}'...")

            # Stop the container first
            try:
                container.stop(timeout=10)
            except Exception as e:
                print(f"[Docker] Error stopping container (may already be stopped): {e}")

            # Now remove the container
            try:
                container.remove(force=True)
            except Exception as e:
                print(f"[Docker] Error removing container: {e}")

            # Release GPU tracking (if implemented)
            del self.containers[service_id]
            print(f"[Docker] Container '{service_id}' removed")
            return True

        except NotFound:
            print(f"[Docker] Container '{service_id}' not found (already deleted?)")
            del self.containers[service_id]
            return True
        except Exception as e:
            print(f"[Docker] Error deleting container: {e}")
            return False
    def get_service_url(self, service_id: str, namespace: str) -> Optional[str]:
        """Get the service URL for a Docker container.

        Args:
            service_id: Service identifier
            namespace: Namespace identifier

        Returns:
            Service URL if available, None otherwise
        """
        if service_id not in self.containers:
            return None

        host_port = self.containers[service_id]["host_port"]
        return f"http://localhost:{host_port}"
    def get_container_logs(self, service_id: str, namespace: str, tail: int = 1000) -> Optional[str]:
        """Get logs from a Docker container.

        Args:
            service_id: Service identifier
            namespace: Namespace identifier
            tail: Number of lines to retrieve (default: 1000, 0 for all)

        Returns:
            Container logs as a string, None if the container is not found
        """
        if service_id not in self.containers:
            print(f"[Docker] Service '{service_id}' not found, cannot retrieve logs")
            return None

        try:
            container = self.containers[service_id]["container"]
            # Get logs (both stdout and stderr)
            if tail > 0:
                logs = container.logs(tail=tail, stdout=True, stderr=True).decode("utf-8", errors="replace")
            else:
                logs = container.logs(stdout=True, stderr=True).decode("utf-8", errors="replace")
            return logs
        except Exception as e:
            print(f"[Docker] Error retrieving logs for '{service_id}': {e}")
            return None
    def get_gpu_info(self, service_id: str, namespace: str) -> Optional[Dict[str, Any]]:
        """Get GPU information for a deployed container.

        Args:
            service_id: Service identifier
            namespace: Namespace identifier

        Returns:
            Dict with GPU info: {model, count, device_ids, world_size}, or None if not found
        """
        if service_id not in self.containers:
            print(f"[Docker] Service '{service_id}' not found, cannot retrieve GPU info")
            return None

        try:
            container_info = self.containers[service_id]
            return {
                "model": container_info.get("gpu_model", "Unknown"),
                "count": len(container_info.get("gpu_devices", [])),
                "device_ids": container_info.get("gpu_devices", []),
                "world_size": container_info.get("world_size", 1),
            }
        except Exception as e:
            print(f"[Docker] Error retrieving GPU info for '{service_id}': {e}")
            return None
    def _get_runtime_config(
        self, runtime_name: str, parameters: Dict[str, Any], image_tag: Optional[str] = None
    ) -> Optional[Dict[str, str]]:
        """Get the Docker image and command configuration for a runtime using the shared utility.

        Args:
            runtime_name: Runtime identifier
            parameters: Runtime parameters (unused, kept for compatibility)
            image_tag: Optional Docker image tag to override the default

        Returns:
            Dictionary with 'image' and 'command' keys, or None if unsupported
        """
        # Use the shared runtime configuration utility
        config = get_runtime_config(runtime_name, image_tag)
        if config and image_tag:
            print(f"[Docker] Using custom image tag: {config['image']}")
        return config

    def _ensure_image_available(self, image_name: str) -> bool:
        """Ensure the Docker image is available locally, pulling it if not.

        Args:
            image_name: Full image name with tag (e.g., 'lmsysorg/sglang:v0.5.2-cu126')

        Returns:
            True if the image is available, False if the pull failed
        """
        try:
            # Check if the image exists locally
            self.client.images.get(image_name)
            print(f"[Docker] Image '{image_name}' found in local cache")
            return True
        except docker.errors.ImageNotFound:
            # Image not found locally, need to pull it
            print(f"[Docker] Image '{image_name}' not found locally")
            print(f"[Docker] Pulling image (this may take several minutes)...")

            try:
                # Split the image name and tag for the pull API
                if ":" in image_name:
                    repository, tag = image_name.rsplit(":", 1)
                else:
                    repository = image_name
                    tag = "latest"

                # Pull with progress tracking
                last_status = {}
                for line in self.client.api.pull(repository, tag=tag, stream=True, decode=True):
                    # Each line is a dict with status, progressDetail, etc.
                    if "status" in line:
                        status = line["status"]
                        layer_id = line.get("id", "")

                        # Show progress for downloading/extracting layers
                        if "progressDetail" in line and line["progressDetail"]:
                            progress = line["progressDetail"]
                            current = progress.get("current", 0)
                            total = progress.get("total", 0)
                            if total > 0:
                                percent = (current / total) * 100
                                # Update the status for this layer
                                last_status[layer_id] = f"{status}: {percent:.1f}%"
                            else:
                                last_status[layer_id] = status
                        else:
                            # Status without progress (e.g., "Pull complete")
                            if layer_id:
                                last_status[layer_id] = status

                        # Print a summary of active layers periodically
                        if layer_id and status in ["Downloading", "Extracting"]:
                            # Count the layers in each state
                            downloading = sum(1 for s in last_status.values() if "Downloading" in s)
                            extracting = sum(1 for s in last_status.values() if "Extracting" in s)
                            complete = sum(1 for s in last_status.values() if "complete" in s)
                            # Show a compact progress summary
                            print(
                                f"\r[Docker] Progress: {complete} complete, {downloading} downloading, {extracting} extracting",
                                end="",
                                flush=True,
                            )

                print()  # Final newline after the progress line
                print(f"[Docker] Successfully pulled '{image_name}'")
                return True

            except Exception as e:
                print(f"\n[Docker] Error pulling image: {e}")
                return False
        except Exception as e:
            print(f"[Docker] Error checking image: {e}")
            return False
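

# Example: end-to-end deployment flow (illustrative sketch). A minimal driver
# showing how the controller methods compose. The model ID, runtime name, and
# parameter keys below are examples; real values depend on the runtimes
# registered in get_runtime_config and on the format expected by
# build_param_list.
if __name__ == "__main__":
    controller = DockerController(model_base_path="/mnt/data/models")
    service_id = controller.deploy_inference_service(
        task_name="demo-task",
        experiment_id=1,
        namespace="default",
        model_name="meta-llama/Llama-3.2-1B-Instruct",  # example HF model ID
        runtime_name="sglang",
        parameters={"tp_size": 1, "mem_frac": 0.85},  # example parameters
    )
    if service_id and controller.wait_for_ready(service_id, "default", timeout=600):
        print("Ready at:", controller.get_service_url(service_id, "default"))
        print("GPUs:", controller.get_gpu_info(service_id, "default"))
    elif service_id:
        # On failure, dump recent logs for diagnosis before cleanup
        logs = controller.get_container_logs(service_id, "default", tail=100)
        if logs:
            print(logs)
    if service_id:
        controller.delete_inference_service(service_id, "default")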