GPU 资源管理
GPU 是 LLM 推理的最大成本项。高效的 GPU 资源管理和调度是控制成本的关键。
GPU 资源管理架构
graph TB
A[GPU 资源管理] --> B[容量规划]
A --> C[调度策略]
A --> D[利用率优化]
A --> E[成本核算]
B --> B1[流量预测]
B --> B2[显存预算]
C --> C1[请求队列]
C --> C2[优先级调度]
D --> D1[模型共置]
D --> D2[动态批处理]
E --> E1[按模型计费]
E --> E2[部门分摊]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style D fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
GPU 集群管理器
"""
GPU 集群资源管理
"""
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
class GPUState(Enum):
    """Lifecycle states of a GPU node."""

    IDLE = "idle"          # no model loaded, ready for allocation
    LOADING = "loading"    # model weights are being loaded
    SERVING = "serving"    # actively serving inference traffic
    DRAINING = "draining"  # finishing in-flight work before shutdown
    ERROR = "error"        # node is unhealthy
@dataclass
class GPUNode:
    """A single GPU node in the cluster and its current load."""

    node_id: str
    gpu_type: str  # e.g. "A100-80GB", "H100"
    total_memory_gb: float
    used_memory_gb: float = 0.0
    state: GPUState = GPUState.IDLE
    loaded_models: list[str] = field(default_factory=list)
    utilization: float = 0.0  # fraction in [0.0, 1.0]

    @property
    def free_memory_gb(self) -> float:
        """GPU memory (GB) not yet reserved on this node."""
        return self.total_memory_gb - self.used_memory_gb

    @property
    def is_available(self) -> bool:
        """Whether the node can take new work (idle, or already serving)."""
        usable = (GPUState.IDLE, GPUState.SERVING)
        return self.state in usable
@dataclass
class ModelDeployment:
    """Deployment spec and live instance count for one model."""

    model_name: str
    memory_required_gb: float
    priority: int = 0  # higher value is scheduled first
    min_instances: int = 1
    max_instances: int = 4
    current_instances: int = 0
class GPUClusterManager:
    """Tracks GPU nodes and model deployments; allocates and releases capacity.

    Allocation prefers co-locating a request onto a node that already has
    the model loaded (weights are shared, so no extra memory is reserved),
    then falls back to reserving memory on the node with the most free space.
    """

    # Fraction reserved on top of the model's nominal memory requirement
    # (activations / KV cache / framework overhead).
    MEMORY_OVERHEAD_RATIO = 0.15

    def __init__(self) -> None:
        self.nodes: dict[str, GPUNode] = {}
        self.deployments: dict[str, ModelDeployment] = {}

    def add_node(self, node: GPUNode) -> None:
        """Register a GPU node with the cluster."""
        self.nodes[node.node_id] = node

    def register_model(self, deployment: ModelDeployment) -> None:
        """Register a model's deployment spec."""
        self.deployments[deployment.model_name] = deployment

    def _required_memory_gb(self, deployment: ModelDeployment) -> float:
        """Memory to reserve for one instance, overhead included."""
        return deployment.memory_required_gb * (1 + self.MEMORY_OVERHEAD_RATIO)

    def allocate(self, model_name: str) -> GPUNode | None:
        """Pick a GPU node for *model_name*; return None if nothing fits.

        A node already serving this model is returned as-is (co-location:
        no new reservation, no new instance). Otherwise a new instance is
        reserved on the node with the most free memory, respecting the
        deployment's max_instances ceiling.
        """
        deployment = self.deployments.get(model_name)
        if deployment is None:
            return None
        # Co-location fast path: route to an existing instance. No memory
        # or instance accounting changes here.
        for node in self.nodes.values():
            if model_name in node.loaded_models and node.is_available:
                return node
        # A new instance is needed — honor the declared ceiling.
        # BUGFIX: the original never enforced max_instances.
        if deployment.current_instances >= deployment.max_instances:
            return None
        required = self._required_memory_gb(deployment)
        candidates = [
            n for n in self.nodes.values()
            if n.is_available and n.free_memory_gb >= required
        ]
        if not candidates:
            return None
        best = max(candidates, key=lambda n: n.free_memory_gb)
        best.used_memory_gb += required
        best.loaded_models.append(model_name)
        best.state = GPUState.SERVING
        deployment.current_instances += 1
        return best

    def release(self, node_id: str, model_name: str) -> bool:
        """Release *model_name*'s reservation on *node_id*.

        Returns True when the model was loaded there and state was updated;
        False for an unknown node/model or a model not present on the node.
        """
        node = self.nodes.get(node_id)
        deployment = self.deployments.get(model_name)
        if node is None or deployment is None:
            return False
        if model_name not in node.loaded_models:
            return False
        node.loaded_models.remove(model_name)
        required = self._required_memory_gb(deployment)
        node.used_memory_gb = max(0.0, node.used_memory_gb - required)
        # BUGFIX: clamp at zero — the co-location path in allocate() does
        # not bump current_instances, so an unguarded decrement could have
        # driven the counter negative.
        deployment.current_instances = max(0, deployment.current_instances - 1)
        if not node.loaded_models:
            node.state = GPUState.IDLE
        return True

    def get_cluster_status(self) -> dict:
        """Aggregate cluster-wide stats for dashboards and monitoring."""
        nodes = self.nodes.values()
        total_memory = sum(n.total_memory_gb for n in nodes)
        used_memory = sum(n.used_memory_gb for n in nodes)
        avg_util = (
            sum(n.utilization for n in nodes) / len(self.nodes)
            if self.nodes else 0
        )
        return {
            "total_nodes": len(self.nodes),
            "serving_nodes": sum(
                1 for n in nodes if n.state == GPUState.SERVING
            ),
            "total_memory_gb": total_memory,
            "used_memory_gb": round(used_memory, 1),
            "utilization": f"{avg_util:.1%}",
            "loaded_models": list({
                m for n in nodes for m in n.loaded_models
            }),
        }
自动扩缩容
"""
基于负载的 GPU 自动扩缩容
"""
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ScalingPolicy:
    """Autoscaling thresholds and limits for one model."""

    model_name: str
    min_instances: int = 1
    max_instances: int = 8
    scale_up_threshold: float = 0.8    # scale up when utilization exceeds 80%
    scale_down_threshold: float = 0.3  # scale down when utilization drops below 30%
    cooldown_seconds: int = 300        # minimum gap between scaling actions
class AutoScaler:
    """Turns per-model utilization samples into scaling decisions."""

    def __init__(self, cluster_manager: GPUClusterManager):
        self.cluster = cluster_manager
        self.policies: dict[str, ScalingPolicy] = {}
        self._last_scale_time: dict[str, datetime] = {}

    def set_policy(self, policy: ScalingPolicy) -> None:
        """Install (or replace) the scaling policy for a model."""
        self.policies[policy.model_name] = policy

    def evaluate(self, model_name: str, current_utilization: float) -> str:
        """Decide a scaling action for *model_name*.

        Returns "scale_up", "scale_down", "cooldown", or "no_action".
        A scaling decision records its timestamp so the cooldown window
        suppresses subsequent decisions.
        """
        policy = self.policies.get(model_name)
        deployment = self.cluster.deployments.get(model_name)
        if policy is None or deployment is None:
            return "no_action"

        # Still inside the cooldown window? Hold off.
        last = self._last_scale_time.get(model_name)
        if last is not None:
            elapsed = (datetime.now() - last).total_seconds()
            if elapsed < policy.cooldown_seconds:
                return "cooldown"

        overloaded = current_utilization > policy.scale_up_threshold
        underloaded = current_utilization < policy.scale_down_threshold
        if overloaded and deployment.current_instances < policy.max_instances:
            self._last_scale_time[model_name] = datetime.now()
            return "scale_up"
        if underloaded and deployment.current_instances > policy.min_instances:
            self._last_scale_time[model_name] = datetime.now()
            return "scale_down"
        return "no_action"
GPU 成本对比
| GPU 型号 | 显存 | FP16 算力 | 按需价格/小时 | Spot 价格/小时 | 适用场景 |
|---|---|---|---|---|---|
| T4 | 16GB | 65 TFLOPS | $0.50 | $0.15 | 小模型推理 |
| L4 | 24GB | 121 TFLOPS | $0.80 | $0.30 | 中等模型 |
| A10G | 24GB | 125 TFLOPS | $1.00 | $0.40 | 通用推理 |
| L40S | 48GB | 362 TFLOPS | $2.50 | $1.00 | 大模型推理 |
| A100 | 80GB | 312 TFLOPS | $4.00 | $1.50 | 高吞吐 |
| H100 | 80GB | 990 TFLOPS | $6.00 | $2.50 | 极致性能 |
本章小结
| 主题 | 要点 |
|---|---|
| 资源分配 | 优先共置 → 选择空闲最大节点 |
| 自动扩缩 | 利用率阈值 + 冷却期 + min/max 防护 |
| 成本控制 | 按需 + Spot 混合、右尺寸选型 |
| 监控指标 | 显存使用率、GPU 利用率、排队长度 |
下一章:成本优化策略