# 版本管理与监控
模型上线不是终点——你需要管理版本、监控质量、随时回滚。
## 模型运维架构
graph TD
    %% Release path: development -> model registry -> staging -> production.
    DEV[开发环境] -->|版本注册| REGISTRY[模型仓库]
    REGISTRY -->|部署| STAGING[预发布环境]
    STAGING -->|验证通过| PROD[生产环境]
    %% Rollback edge from production back to the registry.
    PROD -->|回滚| REGISTRY
    %% Production feeds the monitoring system, which fans out into three metric groups.
    PROD --> MONITOR[监控系统]
    MONITOR --> QUALITY[质量指标]
    MONITOR --> PERF[性能指标]
    MONITOR --> COST[成本指标]
    %% Only the quality metrics are wired to the alert node in this diagram.
    QUALITY -->|低于阈值| ALERT[告警]
    ALERT -->|自动/手动| ROLLBACK[回滚]
    %% Node colours: blue for DEV, green for PROD, red for ALERT.
    style DEV fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style PROD fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
    style ALERT fill:#ffcdd2,stroke:#c62828,stroke-width:2px
## 模型版本管理
"""
模型版本管理系统
"""
import json
from datetime import datetime
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class ModelVersion:
    """Metadata record describing a single version of a model.

    All fields are plain values supplied by the caller. ``created_at`` is
    filled in automatically when it is left empty, and ``status`` starts out
    as ``"draft"`` unless another value is passed.
    """

    name: str
    version: str
    base_model: str
    training_data: str
    metrics: dict
    created_at: str = ""
    status: str = "draft"

    def __post_init__(self):
        """Stamp the creation time when the caller did not provide one."""
        # A falsy value (the "" default) is replaced by the current naive
        # local time in ISO 8601 form; an explicit timestamp is kept as is.
        self.created_at = self.created_at or datetime.now().isoformat()
class ModelRegistry:
    """Model registry that tracks versions in memory and mirrors them to JSON.

    Versions live in ``self.versions`` in registration order. Every version
    also has a ``<name>-<version>.json`` metadata file under ``self.path``,
    which is rewritten whenever the registry changes that version's status.
    The registry keeps at most one ``"production"`` version per model name:
    promoting a version to production archives the one it replaces.
    """

    def __init__(self, registry_path: str = "./model-registry"):
        """Create the registry directory if needed and start with no versions.

        Args:
            registry_path: Directory that receives the JSON metadata files.
        """
        self.path = Path(registry_path)
        # parents=True: a nested path such as "a/b/registry" must not fail
        # just because its intermediate directories do not exist yet.
        self.path.mkdir(parents=True, exist_ok=True)
        self.versions: list["ModelVersion"] = []

    def _save_metadata(self, model: "ModelVersion") -> None:
        """Write the attributes of ``model`` to its JSON metadata file."""
        meta_path = self.path / f"{model.name}-{model.version}.json"
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(model.__dict__, f, ensure_ascii=False, indent=2)

    def _current_production(self, name: str) -> "ModelVersion | None":
        """Return the most recently registered production version of ``name``."""
        # Searching from the end means that if several entries are marked
        # "production" (e.g. constructed with that status), the newest wins.
        for candidate in reversed(self.versions):
            if candidate.name == name and candidate.status == "production":
                return candidate
        return None

    def register(self, model: "ModelVersion") -> str:
        """Register a new model version and save its metadata.

        Args:
            model: The version to add; it is appended to ``self.versions``.

        Returns:
            A human-readable confirmation message.
        """
        self.versions.append(model)
        self._save_metadata(model)
        return f"✅ 注册成功: {model.name}:{model.version}"

    def promote(self, name: str, version: str, stage: str) -> str:
        """Move a version to another stage: draft → staging → production.

        When ``stage`` is ``"production"``, any other version of the same model
        that is currently in production is archived first, so that exactly one
        version is live and ``rollback`` has a predecessor to return to.

        Args:
            name: Model name.
            version: Version string to promote.
            stage: New status; the value is stored as given, without validation.

        Returns:
            A success message, or a "not found" message when no registered
            version matches ``name`` and ``version``.
        """
        target = next(
            (v for v in self.versions if v.name == name and v.version == version),
            None,
        )
        if target is None:
            return f"❌ 未找到 {name}:{version}"

        if stage == "production":
            for other in self.versions:
                if (
                    other is not target
                    and other.name == name
                    and other.status == "production"
                ):
                    other.status = "archived"
                    self._save_metadata(other)

        target.status = stage
        # Keep the on-disk metadata in sync with the in-memory status.
        self._save_metadata(target)
        return f"✅ {name}:{version} → {stage}"

    def get_production(self, name: str) -> "ModelVersion | None":
        """Return the version of ``name`` currently in production, or ``None``."""
        return self._current_production(name)

    def rollback(self, name: str) -> str:
        """Roll back to the previous production version.

        The current production version is archived, and the closest version
        registered before it whose status is ``"archived"`` or ``"production"``
        becomes the production version again. Only earlier registrations are
        considered, so calling ``rollback`` repeatedly walks backwards and never
        re-activates the version that was just rolled back.

        Args:
            name: Model name.

        Returns:
            A success message naming both versions, or a failure message when
            there is no production version or nothing earlier to return to.
        """
        current = self._current_production(name)
        if current is None:
            return "❌ 没有可回滚的版本"

        # Collect rollback candidates registered before the live version.
        earlier = []
        for v in self.versions:
            if v is current:
                break
            if v.name == name and v.status in ("production", "archived"):
                earlier.append(v)
        if not earlier:
            return "❌ 没有可回滚的版本"

        previous = earlier[-1]
        current.status = "archived"
        previous.status = "production"
        self._save_metadata(current)
        self._save_metadata(previous)
        return f"✅ 回滚: {current.version} → {previous.version}"

    def list_versions(self, name: str) -> list[dict]:
        """List every registered version of ``name`` in registration order.

        Returns:
            One dict per version with the keys ``version``, ``status``,
            ``metrics`` and ``created_at``.
        """
        return [
            {
                "version": v.version,
                "status": v.status,
                "metrics": v.metrics,
                "created_at": v.created_at,
            }
            for v in self.versions
            if v.name == name
        ]
# Usage example: the registry is created with its default path.
registry = ModelRegistry()
# Register a version; created_at and status are left at their defaults.
v1 = ModelVersion(
    name="customer-service-bot",
    version="1.0.0",
    base_model="Qwen2.5-7B-Instruct",
    training_data="cs-data-v1 (5000 samples)",
    metrics={"rouge_l": 0.42, "llm_judge": 7.5},
)
# Both calls return a status message, which is printed.
print(registry.register(v1))
print(registry.promote("customer-service-bot", "1.0.0", "production"))
## 线上监控
"""
模型线上监控
"""
class ModelMonitor:
    """Reference catalogue of production monitoring metrics for a model.

    The class holds data only:

    * ``METRICS`` maps a metric category to its metrics; each metric is a dict
      of descriptive strings with the keys "采集" (how it is collected, present
      on the quality metrics only), "告警" (alert condition) and "频率"
      (check frequency).
    * ``MONITORING_CODE`` is a sample Prometheus instrumentation snippet stored
      as text. It is never executed here; it refers to ``prometheus_client``
      and to a ``model`` object that the snippet itself does not define.
    """

    METRICS = {
        # Quality metrics
        "质量指标": {
            "用户满意度": {
                "采集": "用户点赞/点踩",
                "告警": "满意率 < 80%",
                "频率": "实时",
            },
            "回答质量": {
                "采集": "LLM-as-Judge 抽样评分",
                "告警": "均分 < 7.0",
                "频率": "每小时",
            },
            "拒绝率": {
                "采集": "模型拒绝回答的比例",
                "告警": "> 10%",
                "频率": "实时",
            },
            "幻觉率": {
                "采集": "事实核查抽检",
                "告警": "> 5%",
                "频率": "每天",
            },
        },
        # Performance metrics
        "性能指标": {
            "P50 延迟": {"告警": "> 500ms", "频率": "实时"},
            "P99 延迟": {"告警": "> 2000ms", "频率": "实时"},
            "吞吐量": {"告警": "< 预期 80%", "频率": "实时"},
            "GPU 利用率": {"告警": "> 95%", "频率": "实时"},
            "错误率": {"告警": "> 1%", "频率": "实时"},
        },
        # Cost metrics
        "成本指标": {
            "每请求成本": {"告警": "> 预算 120%", "频率": "每天"},
            "月度总成本": {"告警": "> 预算", "频率": "每天"},
            "Token 用量": {"告警": "异常增长 50%", "频率": "每天"},
        },
    }

    # The snippet keeps its own indentation inside the literal so that the
    # text is syntactically valid Python when printed or copied out.
    MONITORING_CODE = """
# Prometheus + Grafana 监控示例
from prometheus_client import (
    Counter, Histogram, Gauge, start_http_server,
)
# 定义指标
REQUEST_LATENCY = Histogram(
    "model_request_latency_seconds",
    "Model request latency",
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0],
)
REQUEST_COUNT = Counter(
    "model_request_total",
    "Total model requests",
    ["status"],
)
USER_SATISFACTION = Gauge(
    "model_user_satisfaction_rate",
    "User satisfaction rate (thumbs up ratio)",
)
# 使用
import time
@REQUEST_LATENCY.time()
def handle_request(prompt: str) -> str:
    try:
        result = model.generate(prompt)
        REQUEST_COUNT.labels(status="success").inc()
        return result
    except Exception as e:
        REQUEST_COUNT.labels(status="error").inc()
        raise
# 启动指标端口
start_http_server(9090)
"""
# Print the alert threshold of every metric, grouped by category.
monitor = ModelMonitor()
print("=== 监控指标体系 ===")
for category, metrics in monitor.METRICS.items():
    print(f"\n📊 {category}:")
    for name, info in metrics.items():
        # Fall back to an empty string when a metric has no "告警" entry.
        alert = info.get("告警", "")
        print(f" {name}: 告警阈值 {alert}")
## 灰度发布与 A/B 测试
"""
灰度发布策略
"""
class GradualRollout:
    """Reference catalogue of gradual-rollout strategies.

    ``STRATEGIES`` maps a strategy name to a dict with two keys: "流程", the
    ordered list of steps, and "切流速度", a short description of how fast
    traffic is shifted.
    """

    STRATEGIES = {
        # Canary release
        "金丝雀发布": {
            "流程": [
                "1. 新模型部署到 1 台机器",
                "2. 导入 5% 流量",
                "3. 监控 1-2 小时",
                "4. 指标正常则 25% → 50% → 100%",
                "5. 异常立即回滚",
            ],
            "切流速度": "慢(小时级)",
        },
        # A/B test
        "A/B 测试": {
            "流程": [
                "1. 流量 50/50 分配到新旧模型",
                "2. 收集 1000+ 请求数据",
                "3. 统计显著性检验",
                "4. 胜出模型全量",
            ],
            "切流速度": "中(天级)",
        },
        # Shadow mode
        "影子模式": {
            "流程": [
                "1. 线上用旧模型回答",
                "2. 同时用新模型影子预测",
                "3. 对比两者输出",
                "4. 新模型质量达标后切换",
            ],
            "切流速度": "慢(天-周级),但最安全",
        },
    }
# Print every rollout strategy with its traffic-shifting speed and its steps.
rollout = GradualRollout()
print("=== 灰度发布策略 ===")
for name, info in rollout.STRATEGIES.items():
    print(f"\n{name} ({info['切流速度']}):")
    for step in info["流程"]:
        print(f" {step}")
## 运维清单
| 阶段 | 检查项 | 负责人 | 频率 |
|---|---|---|---|
| 上线前 | 回归测试通过 | ML 工程师 | 每次 |
| 上线前 | 安全审查通过 | 安全团队 | 每次 |
| 上线中 | 灰度切流 | SRE | 每次 |
| 上线后 | 质量监控 | ML 工程师 | 持续 |
| 上线后 | 成本追踪 | 产品经理 | 每周 |
| 定期 | 模型重训练 | ML 工程师 | 每月 |
| 定期 | 数据漂移检测 | 数据工程师 | 每周 |
下一章:客服模型微调——从零开始的完整实战案例。