A/B Testing and Continuous Monitoring

Moving an LLM from experimentation into production calls for A/B tests to validate changes and continuous monitoring to safeguard quality.

A/B Testing Framework

graph TB
    A[User request] --> B{Traffic split}
    B -->|50%| C[Model A - current version]
    B -->|50%| D[Model B - new version]
    C --> E[Collect metrics]
    D --> E
    E --> F[Statistical analysis]
    F --> G{Significant difference?}
    G -->|Yes| H[Switch to the better version]
    G -->|No| I[Continue the experiment]
    style B fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style F fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
"""
LLM A/B 测试框架
"""
import random
import time
import math
from dataclasses import dataclass, field
@dataclass
class ExperimentVariant:
"""实验变体"""
name: str
model: str
system_prompt: str = ""
temperature: float = 0.7
scores: list[float] = field(default_factory=list)
latencies: list[float] = field(default_factory=list)
@property
def mean_score(self) -> float:
return sum(self.scores) / len(self.scores) if self.scores else 0
@property
def mean_latency(self) -> float:
return sum(self.latencies) / len(self.latencies) if self.latencies else 0
class ABTestRunner:
"""A/B 测试运行器"""
def __init__(self):
self.variants: list[ExperimentVariant] = []
self.total_requests = 0
def add_variant(self, variant: ExperimentVariant) -> None:
"""添加实验变体"""
self.variants.append(variant)
def route_request(self) -> ExperimentVariant:
"""路由请求到变体"""
self.total_requests += 1
return random.choice(self.variants)
def record_result(
self, variant: ExperimentVariant, score: float, latency_ms: float
) -> None:
"""记录实验结果"""
variant.scores.append(score)
variant.latencies.append(latency_ms)
def analyze(self) -> dict:
"""统计分析"""
results = {}
for v in self.variants:
n = len(v.scores)
mean = v.mean_score
std = (
math.sqrt(sum((s - mean) ** 2 for s in v.scores) / (n - 1))
if n > 1 else 0
)
results[v.name] = {
"samples": n,
"mean_score": round(mean, 4),
"std": round(std, 4),
"mean_latency_ms": round(v.mean_latency, 1),
}
# 两组比较的显著性检验(简化 t-test)
if len(self.variants) == 2:
a, b = self.variants
if len(a.scores) > 1 and len(b.scores) > 1:
significant = self._is_significant(a.scores, b.scores)
winner = a.name if a.mean_score > b.mean_score else b.name
results["conclusion"] = {
"significant": significant,
"winner": winner if significant else "无显著差异",
"improvement": abs(a.mean_score - b.mean_score),
}
# 打印报告
print("\n=== A/B 测试结果 ===")
for name, r in results.items():
if isinstance(r, dict) and "samples" in r:
print(f"\n  [{name}]")
print(f"    样本数: {r['samples']}")
print(f"    平均分: {r['mean_score']} ± {r['std']}")
print(f"    平均延迟: {r['mean_latency_ms']}ms")
if "conclusion" in results:
c = results["conclusion"]
print(f"\n  结论: {'有' if c['significant'] else '无'}显著差异")
if c["significant"]:
print(f"    胜出: {c['winner']} (提升 {c['improvement']:.4f})")
return results
def _is_significant(
self, scores_a: list, scores_b: list, alpha: float = 0.05
) -> bool:
"""简化的显著性检验"""
n_a, n_b = len(scores_a), len(scores_b)
mean_a = sum(scores_a) / n_a
mean_b = sum(scores_b) / n_b
var_a = sum((x - mean_a) ** 2 for x in scores_a) / (n_a - 1)
var_b = sum((x - mean_b) ** 2 for x in scores_b) / (n_b - 1)
se = math.sqrt(var_a / n_a + var_b / n_b)
if se == 0:
return False
t_stat = abs(mean_a - mean_b) / se
# 简化: t > 2 约等于 p < 0.05
return t_stat > 2.0
# 使用
ab_test = ABTestRunner()
ab_test.add_variant(ExperimentVariant(
name="baseline",
model="gpt-4o-mini",
system_prompt="你是一个有用的助手。",
))
ab_test.add_variant(ExperimentVariant(
name="improved",
model="gpt-4o-mini",
system_prompt="你是一个专业的技术顾问。请用简洁、准确的语言回答问题。",
))
# 模拟实验
for _ in range(100):
variant = ab_test.route_request()
# 模拟评分(实际中用 LLM-as-a-Judge 或用户反馈)
score = random.gauss(3.5 if variant.name == "baseline" else 3.8, 0.5)
latency = random.gauss(500, 100)
ab_test.record_result(variant, score, latency)
ab_test.analyze()
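The `_is_significant` helper above uses a rough cutoff (t > 2.0 ≈ p < 0.05). If SciPy is available, a Welch's t-test gives an exact p-value; the sketch below is an optional drop-in alternative, not part of the framework above, and assumes `scipy` is installed.

# Optional: an exact Welch's t-test as a drop-in for ABTestRunner._is_significant.
# The function name is ours, introduced only for this sketch.
from scipy import stats

def is_significant_welch(
    scores_a: list[float], scores_b: list[float], alpha: float = 0.05
) -> bool:
    """Two-sided Welch's t-test; True if the difference is significant at alpha."""
    t_stat, p_value = stats.ttest_ind(scores_a, scores_b, equal_var=False)
    return p_value < alpha

# e.g. is_significant_welch(ab_test.variants[0].scores, ab_test.variants[1].scores)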

Production Monitoring System

"""
LLM 生产环境监控
"""
import time
from dataclasses import dataclass, field
from collections import deque
@dataclass
class MonitorConfig:
"""监控配置"""
latency_threshold_ms: float = 2000     # 延迟告警阈值
error_rate_threshold: float = 0.05     # 错误率告警阈值
quality_score_threshold: float = 3.0   # 质量分告警阈值
window_size: int = 100                 # 滑动窗口大小
class LLMMonitor:
"""LLM 实时监控"""
def __init__(self, config: MonitorConfig = None):
self.config = config or MonitorConfig()
self.latencies: deque = deque(maxlen=self.config.window_size)
self.errors: deque = deque(maxlen=self.config.window_size)
self.quality_scores: deque = deque(maxlen=self.config.window_size)
self.total_requests = 0
self.total_tokens = 0
self.alerts: list[dict] = []
def record_request(
self,
latency_ms: float,
success: bool,
tokens_used: int = 0,
quality_score: float = None,
) -> list[dict]:
"""记录一次请求"""
self.total_requests += 1
self.total_tokens += tokens_used
self.latencies.append(latency_ms)
self.errors.append(0 if success else 1)
if quality_score is not None:
self.quality_scores.append(quality_score)
# 检查告警
return self._check_alerts()
def _check_alerts(self) -> list[dict]:
"""检查是否触发告警"""
new_alerts = []
# 延迟告警
if len(self.latencies) >= 10:
avg_latency = sum(self.latencies) / len(self.latencies)
if avg_latency > self.config.latency_threshold_ms:
alert = {
"type": "HIGH_LATENCY",
"message": f"平均延迟 {avg_latency:.0f}ms 超过阈值 {self.config.latency_threshold_ms}ms",
"time": time.strftime("%H:%M:%S"),
}
new_alerts.append(alert)
# 错误率告警
if len(self.errors) >= 10:
error_rate = sum(self.errors) / len(self.errors)
if error_rate > self.config.error_rate_threshold:
alert = {
"type": "HIGH_ERROR_RATE",
"message": f"错误率 {error_rate:.1%} 超过阈值 {self.config.error_rate_threshold:.1%}",
"time": time.strftime("%H:%M:%S"),
}
new_alerts.append(alert)
# 质量告警
if len(self.quality_scores) >= 10:
avg_quality = sum(self.quality_scores) / len(self.quality_scores)
if avg_quality < self.config.quality_score_threshold:
alert = {
"type": "LOW_QUALITY",
"message": f"质量均分 {avg_quality:.2f} 低于阈值 {self.config.quality_score_threshold}",
"time": time.strftime("%H:%M:%S"),
}
new_alerts.append(alert)
self.alerts.extend(new_alerts)
return new_alerts
def get_dashboard(self) -> dict:
"""获取监控面板数据"""
dashboard = {
"total_requests": self.total_requests,
"total_tokens": self.total_tokens,
}
if self.latencies:
sorted_lat = sorted(self.latencies)
dashboard["latency"] = {
"avg": round(sum(self.latencies) / len(self.latencies), 1),
"p50": sorted_lat[len(sorted_lat) // 2],
"p95": sorted_lat[int(len(sorted_lat) * 0.95)],
"p99": sorted_lat[int(len(sorted_lat) * 0.99)],
}
if self.errors:
dashboard["error_rate"] = round(
sum(self.errors) / len(self.errors), 4
)
if self.quality_scores:
dashboard["quality_avg"] = round(
sum(self.quality_scores) / len(self.quality_scores), 2
)
dashboard["active_alerts"] = len(self.alerts)
return dashboard
def print_dashboard(self) -> None:
"""打印监控面板"""
d = self.get_dashboard()
print("\n╔══════════════════════════════════╗")
print("║     LLM Production Dashboard     ║")
print("╠══════════════════════════════════╣")
print(f"║  总请求数: {d['total_requests']:<20} ║")
print(f"║  总 Tokens: {d['total_tokens']:<19} ║")
if "latency" in d:
lat = d["latency"]
print(f"║  延迟 P50: {lat['p50']:.0f}ms{' '*(16-len(f'{lat[\"p50\"]:.0f}ms'))} ║")
print(f"║  延迟 P95: {lat['p95']:.0f}ms{' '*(16-len(f'{lat[\"p95\"]:.0f}ms'))} ║")
if "error_rate" in d:
print(f"║  错误率: {d['error_rate']:.2%}{' '*(18-len(f'{d[\"error_rate\"]:.2%}'))} ║")
if "quality_avg" in d:
print(f"║  质量均分: {d['quality_avg']:.2f}/5{' '*(15-len(f'{d[\"quality_avg\"]:.2f}/5'))} ║")
print(f"║  活跃告警: {d['active_alerts']:<20} ║")
print("╚══════════════════════════════════╝")
# 使用
monitor = LLMMonitor(MonitorConfig(
latency_threshold_ms=1000,
error_rate_threshold=0.1,
))
# 模拟请求
import random
for i in range(50):
latency = random.gauss(800, 300)
success = random.random() > 0.05
quality = random.gauss(4.0, 0.5) if success else 0
tokens = random.randint(100, 500)
alerts = monitor.record_request(latency, success, tokens, quality)
if alerts:
for a in alerts:
print(f"⚠️  [{a['type']}] {a['message']}")
monitor.print_dashboard()
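In production, `record_request` would wrap a real model call rather than simulated numbers. Below is a minimal sketch of such a wrapper; `call_llm`, `response.text`, and `response.total_tokens` are hypothetical placeholders for whatever client and response schema you actually use.

# A sketch of wiring the monitor into a real request path.
# call_llm and the response fields below are hypothetical placeholders.
def monitored_call(monitor: LLMMonitor, prompt: str) -> str | None:
    start = time.time()
    try:
        response = call_llm(prompt)  # hypothetical LLM client call
        latency_ms = (time.time() - start) * 1000
        monitor.record_request(
            latency_ms=latency_ms,
            success=True,
            tokens_used=getattr(response, "total_tokens", 0),
        )
        return response.text
    except Exception:
        monitor.record_request(
            latency_ms=(time.time() - start) * 1000,
            success=False,
        )
        return None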

User Feedback Collection

"""
用户反馈收集与分析
"""
class FeedbackCollector:
"""用户反馈收集器"""
def __init__(self):
self.feedbacks: list[dict] = []
def collect(
self,
request_id: str,
rating: int,
comment: str = "",
issue_type: str = None,
) -> None:
"""收集反馈"""
self.feedbacks.append({
"request_id": request_id,
"rating": rating,
"comment": comment,
"issue_type": issue_type,
"timestamp": time.time(),
})
def analyze(self) -> dict:
"""分析反馈"""
if not self.feedbacks:
return {"message": "暂无反馈"}
ratings = [f["rating"] for f in self.feedbacks]
issues = [f["issue_type"] for f in self.feedbacks if f["issue_type"]]
# 统计问题类型
issue_counts = {}
for issue in issues:
issue_counts[issue] = issue_counts.get(issue, 0) + 1
return {
"total_feedbacks": len(self.feedbacks),
"avg_rating": round(sum(ratings) / len(ratings), 2),
"rating_distribution": {
i: ratings.count(i) for i in range(1, 6)
},
"top_issues": dict(
sorted(issue_counts.items(), key=lambda x: -x[1])[:5]
),
"satisfaction_rate": round(
sum(1 for r in ratings if r >= 4) / len(ratings), 2
),
}
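A brief usage sketch, in the same spirit as the simulated runs above; the request IDs, comment, and issue labels are illustrative only.

# Illustrative usage (IDs and issue labels are made up for the example)
collector = FeedbackCollector()
collector.collect("req-001", rating=5)
collector.collect("req-002", rating=2, comment="Answer was off-topic", issue_type="irrelevant")
collector.collect("req-003", rating=4, issue_type="too_verbose")
print(collector.analyze())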

Chapter Summary

This chapter covered splitting traffic between model variants for A/B tests, checking results for statistical significance, monitoring latency, error rate, and quality in production, and closing the loop with user feedback.

Next chapter: a complete, hands-on LLM evaluation case study.