可观测性与监控
生产环境的 RAG 系统是一个黑箱——用户提问,系统回答,但中间发生了什么?可观测性(Observability)让你看清每一步,从查询到检索到生成的完整链路。
RAG 可观测性全景
graph TB
    A[RAG 可观测性] --> B[链路追踪<br/>Tracing]
    A --> C[指标监控<br/>Metrics]
    A --> D[日志记录<br/>Logging]
    A --> E[质量评估<br/>Evaluation]
    B --> B1[查询 → 检索 → 生成<br/>全链路 Span]
    C --> C1[延迟/吞吐/成本<br/>Token 用量]
    D --> D1[错误日志<br/>异常检索记录]
    E --> E1[忠实度/相关性<br/>幻觉率检测]
    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
    style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style C fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
    style D fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    style E fill:#ffebee,stroke:#c62828,stroke-width:2px
链路追踪
"""
RAG 链路追踪系统
"""
from dataclasses import dataclass, field
import time
import uuid
@dataclass
class Span:
    """One timed unit of work inside a trace.

    ``start_time``/``end_time`` are Unix timestamps in seconds (0.0 means
    "not set"); ``duration_ms`` derives the elapsed time in milliseconds.
    """

    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    parent_id: str = ""
    start_time: float = 0.0
    end_time: float = 0.0
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"

    @property
    def duration_ms(self) -> float:
        """Elapsed wall-clock time in milliseconds (negative if not ended)."""
        elapsed_seconds = self.end_time - self.start_time
        return elapsed_seconds * 1000
class RAGTracer:
    """Collects spans for RAG pipeline traces.

    One trace is opened per user query; child spans (retrieval,
    generation, ...) are attached to the most recently started trace.
    """

    def __init__(self):
        # Flat span store; spans are grouped by trace_id on read.
        self.spans: list[Span] = []
        self._current_trace: str = ""

    def start_trace(self, query: str) -> str:
        """Open a new trace with a root span and return the trace id."""
        trace_id = uuid.uuid4().hex[:16]
        self._current_trace = trace_id
        self.spans.append(
            Span(
                name="rag_pipeline",
                trace_id=trace_id,
                start_time=time.time(),
                attributes={"query": query},
            )
        )
        return trace_id

    def start_span(self, name: str, parent_id: str = "") -> Span:
        """Open a child span under the current trace and return it."""
        child = Span(
            name=name,
            trace_id=self._current_trace,
            parent_id=parent_id,
            start_time=time.time(),
        )
        self.spans.append(child)
        return child

    def end_span(self, span: Span, **attributes) -> None:
        """Close *span* and merge any extra attributes into it."""
        span.end_time = time.time()
        span.attributes.update(attributes)

    def get_trace_summary(self, trace_id: str) -> dict:
        """Summarize every span recorded under *trace_id*.

        Spans that were never ended (end_time == 0) are excluded from the
        duration total but still listed.
        """
        matching = [s for s in self.spans if s.trace_id == trace_id]
        finished = (s.duration_ms for s in matching if s.end_time > 0)
        return {
            "trace_id": trace_id,
            "total_spans": len(matching),
            "total_duration_ms": sum(finished),
            "spans": [
                {
                    "name": s.name,
                    "duration_ms": round(s.duration_ms, 2),
                    "status": s.status,
                    "attributes": s.attributes,
                }
                for s in matching
            ],
        }
核心监控指标
| 类别 | 指标 | 说明 | 告警阈值(参考) |
|---|---|---|---|
| 延迟 | P50/P95/P99 | 端到端响应时间 | P95 > 5s |
| 检索 | 召回文档数 | 每次检索返回的文档 | 0 文档率 > 5% |
| 检索 | 相关文档占比 | 检索结果中相关的比例 | < 30% |
| 生成 | Token 输入/输出 | LLM 消耗的 Token | 单次 > 10K |
| 质量 | 幻觉率 | 回答中无来源支持的比例 | > 15% |
| 成本 | 每次查询成本 | API 费用 + 基础设施 | 单次 > $0.05 |
| 可用性 | 错误率 | 失败请求占比 | > 1% |
"""
RAG 指标收集器
"""
from dataclasses import dataclass, field
from collections import defaultdict
import time
@dataclass
class MetricPoint:
    """A single metric sample: name, value, capture time, and labels."""
    # Metric name, e.g. "latency_total".
    name: str
    # Observed value for this sample.
    value: float
    # Unix timestamp (seconds), captured when the point is constructed.
    timestamp: float = field(default_factory=time.time)
    # Free-form key/value dimensions attached to the sample.
    labels: dict = field(default_factory=dict)
class RAGMetricsCollector:
    """In-memory collector for RAG latency, retrieval, token and error metrics.

    Raw samples are kept as plain lists/counters; ``get_summary`` derives
    the aggregate view consumed by dashboards and the alert engine.
    """

    def __init__(self):
        # Raw sample series, keyed by metric name.
        self._metrics: dict[str, list[float]] = defaultdict(list)
        # Monotonic event counts (errors, empty retrievals, ...).
        self._counters: dict[str, int] = defaultdict(int)

    def record_latency(self, stage: str, duration_ms: float) -> None:
        """Record one latency sample (in ms) for a pipeline *stage*."""
        self._metrics[f"latency_{stage}"].append(duration_ms)

    def record_retrieval(self, total_docs: int, relevant_docs: int) -> None:
        """Record how many documents one retrieval returned and how many were relevant."""
        self._metrics["retrieval_total"].append(total_docs)
        self._metrics["retrieval_relevant"].append(relevant_docs)
        if total_docs == 0:
            self._counters["empty_retrieval"] += 1

    def record_tokens(self, input_tokens: int, output_tokens: int) -> None:
        """Record LLM token usage for one generation."""
        self._metrics["tokens_input"].append(input_tokens)
        self._metrics["tokens_output"].append(output_tokens)

    def record_error(self, error_type: str) -> None:
        """Count one error occurrence, bucketed by *error_type*."""
        self._counters[f"error_{error_type}"] += 1

    def get_percentile(self, metric_name: str, percentile: int) -> float:
        """Return the nearest-rank percentile of a metric series (0.0 if empty)."""
        values = sorted(self._metrics.get(metric_name, []))
        if not values:
            return 0.0
        idx = int(len(values) * percentile / 100)
        return values[min(idx, len(values) - 1)]

    @staticmethod
    def _rate(count: int, total: int) -> float:
        """Safe ratio helper: returns 0.0 when *total* is zero."""
        return count / total if total else 0.0

    def get_summary(self) -> dict:
        """Aggregate all collected metrics into a flat summary dict.

        Fix: also emits ``empty_retrieval_rate`` and ``error_rate`` —
        the alert engine's default rules key on exactly those names, but
        they were never produced here, so those alerts could never fire.
        """
        latencies = self._metrics.get("latency_total", [])
        total_queries = len(latencies)
        retrieval_count = len(self._metrics.get("retrieval_total", []))
        error_total = sum(
            v for k, v in self._counters.items() if k.startswith("error_")
        )
        input_tokens = self._metrics["tokens_input"]
        return {
            "total_queries": total_queries,
            "latency_p50": self.get_percentile("latency_total", 50),
            "latency_p95": self.get_percentile("latency_total", 95),
            "latency_p99": self.get_percentile("latency_total", 99),
            "avg_tokens_input": (
                sum(input_tokens) / len(input_tokens) if input_tokens else 0
            ),
            "empty_retrieval_count": self._counters["empty_retrieval"],
            # Rate of retrievals that returned zero documents.
            "empty_retrieval_rate": self._rate(
                self._counters["empty_retrieval"], retrieval_count
            ),
            # Errors relative to total queries (latency_total samples).
            "error_rate": self._rate(error_total, total_queries),
            "error_counts": {
                k: v for k, v in self._counters.items() if k.startswith("error_")
            },
        }
可观测性工具生态
graph LR
    A[LLM 可观测性] --> B[LangSmith]
    A --> C[LangFuse]
    A --> D[Phoenix/Arize]
    A --> E[Weights & Biases]
    B --> B1[LangChain 原生<br/>Playground 调试]
    C --> C1[开源自托管<br/>成本追踪]
    D --> D1[实时追踪<br/>Embedding 可视化]
    E --> E1[实验管理<br/>模型对比]
    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
| 工具 | 开源 | 自托管 | 特色 | 适合场景 |
|---|---|---|---|---|
| LangFuse | ✅ | ✅ | 成本追踪、评估集成 | 中小团队 |
| LangSmith | ❌ | ❌ | LangChain 深度集成 | LangChain 用户 |
| Phoenix | ✅ | ✅ | Embedding 可视化 | 调试嵌入质量 |
| OpenLLMetry | ✅ | ✅ | OpenTelemetry 兼容 | 已有 APM 体系 |
| Helicone | ✅ | ✅ | 代理模式,零侵入 | 快速接入 |
实战:告警规则
"""
RAG 告警规则引擎
"""
from dataclasses import dataclass
from enum import Enum
class AlertSeverity(Enum):
    """Alert level, from informational to page-worthy."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass
class AlertRule:
    """Threshold rule evaluated against a metrics summary dict."""
    # Human-readable rule name, copied into the alert payload.
    name: str
    # Key looked up in the metrics summary (see RAGMetricsCollector.get_summary).
    metric: str
    # Value the metric is compared against.
    threshold: float
    operator: str  # comparison operator: gt, lt, eq
    severity: AlertSeverity
    # Evaluation window in minutes (declared here; not enforced by the engine).
    window_minutes: int = 5
class RAGAlertEngine:
    """Evaluates threshold rules against a metrics summary.

    Each rule compares one summary value with a threshold using one of
    the operators "gt", "lt" or "eq" (see AlertRule.operator).
    """

    DEFAULT_RULES = [
        AlertRule("高延迟", "latency_p95", 5000, "gt", AlertSeverity.WARNING),
        AlertRule("空检索过多", "empty_retrieval_rate", 0.05, "gt", AlertSeverity.WARNING),
        AlertRule("高错误率", "error_rate", 0.01, "gt", AlertSeverity.CRITICAL),
        AlertRule("高 Token 消耗", "avg_tokens_input", 8000, "gt", AlertSeverity.INFO),
    ]

    def __init__(self, metrics: RAGMetricsCollector, rules: list[AlertRule] | None = None):
        """Use *rules* if given, otherwise fall back to DEFAULT_RULES."""
        self.metrics = metrics
        self.rules = rules or self.DEFAULT_RULES

    def check_alerts(self) -> list[dict]:
        """Return one alert dict for every rule whose condition holds.

        Fix: AlertRule documents operators "gt, lt, eq", but "eq" was
        never evaluated here, so equality rules could silently never fire.
        Unknown operators are skipped rather than raising.
        """
        summary = self.metrics.get_summary()
        alerts = []
        for rule in self.rules:
            # Metrics absent from the summary are treated as 0.
            value = summary.get(rule.metric, 0)
            if rule.operator == "gt":
                triggered = value > rule.threshold
            elif rule.operator == "lt":
                triggered = value < rule.threshold
            elif rule.operator == "eq":
                triggered = value == rule.threshold
            else:
                triggered = False
            if triggered:
                alerts.append({
                    "rule": rule.name,
                    "severity": rule.severity.value,
                    "metric": rule.metric,
                    "current_value": value,
                    "threshold": rule.threshold,
                })
        return alerts
本章小结
| 主题 | 要点 |
|---|---|
| 链路追踪 | 记录查询到生成的每一步耗时和属性 |
| 核心指标 | 延迟 P95、检索相关率、幻觉率、成本 |
| 工具选择 | LangFuse 开源自托管,LangSmith 深度集成 |
| 告警规则 | 基于阈值自动触发,分级处理 |
| 关键原则 | 先有可观测性,再谈优化 |
下一章:安全与权限控制