可观测性与监控
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read425 words

可观测性与监控

生产环境的 RAG 系统是一个黑箱——用户提问,系统回答,但中间发生了什么?可观测性(Observability)让你看清每一步,从查询到检索到生成的完整链路。

RAG 可观测性全景

graph TB A[RAG 可观测性] --> B[链路追踪
Tracing] A --> C[指标监控
Metrics] A --> D[日志记录
Logging] A --> E[质量评估
Evaluation] B --> B1[查询 → 检索 → 生成
全链路 Span] C --> C1[延迟/吞吐/成本
Token 用量] D --> D1[错误日志
异常检索记录] E --> E1[忠实度/相关性
幻觉率检测] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px style C fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style D fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px style E fill:#ffebee,stroke:#c62828,stroke-width:2px

链路追踪

"""
RAG 链路追踪系统
"""
from dataclasses import dataclass, field
import time
import uuid
@dataclass
class Span:
"""追踪 Span"""
name: str
trace_id: str
span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
parent_id: str = ""
start_time: float = 0.0
end_time: float = 0.0
attributes: dict = field(default_factory=dict)
events: list[dict] = field(default_factory=list)
status: str = "ok"
@property
def duration_ms(self) -> float:
return (self.end_time - self.start_time) * 1000
class RAGTracer:
"""RAG 链路追踪器"""
def __init__(self):
self.spans: list[Span] = []
self._current_trace: str = ""
def start_trace(self, query: str) -> str:
"""开始一次追踪"""
trace_id = uuid.uuid4().hex[:16]
self._current_trace = trace_id
root_span = Span(
name="rag_pipeline",
trace_id=trace_id,
start_time=time.time(),
attributes={"query": query},
)
self.spans.append(root_span)
return trace_id
def start_span(self, name: str, parent_id: str = "") -> Span:
"""开始一个子 Span"""
span = Span(
name=name,
trace_id=self._current_trace,
parent_id=parent_id,
start_time=time.time(),
)
self.spans.append(span)
return span
def end_span(self, span: Span, **attributes) -> None:
"""结束 Span 并记录属性"""
span.end_time = time.time()
span.attributes.update(attributes)
def get_trace_summary(self, trace_id: str) -> dict:
"""获取追踪摘要"""
trace_spans = [s for s in self.spans if s.trace_id == trace_id]
return {
"trace_id": trace_id,
"total_spans": len(trace_spans),
"total_duration_ms": sum(s.duration_ms for s in trace_spans if s.end_time > 0),
"spans": [
{
"name": s.name,
"duration_ms": round(s.duration_ms, 2),
"status": s.status,
"attributes": s.attributes,
}
for s in trace_spans
],
}

核心监控指标

类别 指标 说明 告警阈值(参考)
延迟 P50/P95/P99 端到端响应时间 P95 > 5s
检索 召回文档数 每次检索返回的文档 0 文档率 > 5%
检索 相关文档占比 检索结果中相关的比例 < 30%
生成 Token 输入/输出 LLM 消耗的 Token 单次 > 10K
质量 幻觉率 回答中无来源支持的比例 > 15%
成本 每次查询成本 API 费用 + 基础设施 单次 > $0.05
可用性 错误率 失败请求占比 > 1%
"""
RAG 指标收集器
"""
from dataclasses import dataclass, field
from collections import defaultdict
import time
@dataclass
class MetricPoint:
"""指标数据点"""
name: str
value: float
timestamp: float = field(default_factory=time.time)
labels: dict = field(default_factory=dict)
class RAGMetricsCollector:
"""RAG 指标收集器"""
def __init__(self):
self._metrics: dict[str, list[float]] = defaultdict(list)
self._counters: dict[str, int] = defaultdict(int)
def record_latency(self, stage: str, duration_ms: float) -> None:
"""记录延迟"""
self._metrics[f"latency_{stage}"].append(duration_ms)
def record_retrieval(self, total_docs: int, relevant_docs: int) -> None:
"""记录检索指标"""
self._metrics["retrieval_total"].append(total_docs)
self._metrics["retrieval_relevant"].append(relevant_docs)
if total_docs == 0:
self._counters["empty_retrieval"] += 1
def record_tokens(self, input_tokens: int, output_tokens: int) -> None:
"""记录 Token 用量"""
self._metrics["tokens_input"].append(input_tokens)
self._metrics["tokens_output"].append(output_tokens)
def record_error(self, error_type: str) -> None:
"""记录错误"""
self._counters[f"error_{error_type}"] += 1
def get_percentile(self, metric_name: str, percentile: int) -> float:
"""计算百分位数"""
values = sorted(self._metrics.get(metric_name, []))
if not values:
return 0.0
idx = int(len(values) * percentile / 100)
return values[min(idx, len(values) - 1)]
def get_summary(self) -> dict:
"""获取指标摘要"""
latencies = self._metrics.get("latency_total", [])
return {
"total_queries": len(latencies),
"latency_p50": self.get_percentile("latency_total", 50),
"latency_p95": self.get_percentile("latency_total", 95),
"latency_p99": self.get_percentile("latency_total", 99),
"avg_tokens_input": (
sum(self._metrics["tokens_input"]) / len(self._metrics["tokens_input"])
if self._metrics["tokens_input"] else 0
),
"empty_retrieval_count": self._counters["empty_retrieval"],
"error_counts": {
k: v for k, v in self._counters.items() if k.startswith("error_")
},
}

可观测性工具生态

graph LR A[LLM 可观测性] --> B[LangSmith] A --> C[LangFuse] A --> D[Phoenix/Arize] A --> E[Weights & Biases] B --> B1[LangChain 原生
Playground 调试] C --> C1[开源自托管
成本追踪] D --> D1[实时追踪
Embedding 可视化] E --> E1[实验管理
模型对比] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
工具 开源 自托管 特色 适合场景
LangFuse 成本追踪、评估集成 中小团队
LangSmith LangChain 深度集成 LangChain 用户
Phoenix Embedding 可视化 调试嵌入质量
OpenLLMetry OpenTelemetry 兼容 已有 APM 体系
Helicone 代理模式,零侵入 快速接入

实战:告警规则

"""
RAG 告警规则引擎
"""
from dataclasses import dataclass
from enum import Enum
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class AlertRule:
"""告警规则"""
name: str
metric: str
threshold: float
operator: str  # gt, lt, eq
severity: AlertSeverity
window_minutes: int = 5
class RAGAlertEngine:
"""RAG 告警引擎"""
DEFAULT_RULES = [
AlertRule("高延迟", "latency_p95", 5000, "gt", AlertSeverity.WARNING),
AlertRule("空检索过多", "empty_retrieval_rate", 0.05, "gt", AlertSeverity.WARNING),
AlertRule("高错误率", "error_rate", 0.01, "gt", AlertSeverity.CRITICAL),
AlertRule("高 Token 消耗", "avg_tokens_input", 8000, "gt", AlertSeverity.INFO),
]
def __init__(self, metrics: RAGMetricsCollector, rules: list[AlertRule] | None = None):
self.metrics = metrics
self.rules = rules or self.DEFAULT_RULES
def check_alerts(self) -> list[dict]:
"""检查所有告警规则"""
summary = self.metrics.get_summary()
alerts = []
for rule in self.rules:
value = summary.get(rule.metric, 0)
triggered = False
if rule.operator == "gt" and value > rule.threshold:
triggered = True
elif rule.operator == "lt" and value < rule.threshold:
triggered = True
if triggered:
alerts.append({
"rule": rule.name,
"severity": rule.severity.value,
"metric": rule.metric,
"current_value": value,
"threshold": rule.threshold,
})
return alerts

本章小结

主题 要点
链路追踪 记录查询到生成的每一步耗时和属性
核心指标 延迟 P95、检索相关率、幻觉率、成本
工具选择 LangFuse 开源自托管,LangSmith 深度集成
告警规则 基于阈值自动触发,分级处理
关键原则 先有可观测性,再谈优化

下一章:安全与权限控制