Monitoring and Observability

An LLM system without monitoring is flying blind. Observability is the "eyes" of a production system.

The Three Pillars of Observability

graph TB
    A[Observability] --> B[Metrics]
    A --> C[Logs]
    A --> D[Traces]
    B --> B1[Latency P50/P95/P99]
    B --> B2[Throughput QPS]
    B --> B3[Error rate]
    B --> B4[Token usage]
    B --> B5[Cost]
    C --> C1[Request logs]
    C --> C2[Error logs]
    C --> C3[Audit logs]
    D --> D1[Request path]
    D --> D2[Latency breakdown]
    D --> D3[Dependency calls]
    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px

Metrics Collection System

"""
LLM 指标收集
"""
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
class MetricsCollector:
"""指标收集器 (Prometheus 风格)"""
def __init__(self):
self.counters: defaultdict = defaultdict(int)
self.histograms: defaultdict = defaultdict(list)
self.gauges: dict = {}
def inc_counter(self, name: str, labels: dict = None, value: int = 1):
"""递增计数器"""
key = self._make_key(name, labels)
self.counters[key] += value
def observe_histogram(
self, name: str, value: float, labels: dict = None
):
"""记录直方图值"""
key = self._make_key(name, labels)
self.histograms[key].append(value)
def set_gauge(self, name: str, value: float, labels: dict = None):
"""设置仪表盘值"""
key = self._make_key(name, labels)
self.gauges[key] = value
def _make_key(self, name: str, labels: dict = None) -> str:
if labels:
label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
return f"{name}{{{label_str}}}"
return name
def get_histogram_stats(self, name: str, labels: dict = None) -> dict:
"""获取直方图统计"""
key = self._make_key(name, labels)
values = self.histograms.get(key, [])
if not values:
return {}
sorted_v = sorted(values)
n = len(sorted_v)
return {
"count": n,
"avg": round(sum(values) / n, 2),
"p50": sorted_v[n // 2],
"p95": sorted_v[int(n * 0.95)],
"p99": sorted_v[int(n * 0.99)],
"min": sorted_v[0],
"max": sorted_v[-1],
}
def export_prometheus(self) -> str:
"""导出 Prometheus 格式"""
lines = []
for key, value in self.counters.items():
lines.append(f"{key} {value}")
for key, values in self.histograms.items():
stats = self.get_histogram_stats(key)
lines.append(f"{key}_count {stats.get('count', 0)}")
lines.append(f"{key}_avg {stats.get('avg', 0)}")
for key, value in self.gauges.items():
lines.append(f"{key} {value}")
return "\n".join(lines)
# 全局指标实例
metrics = MetricsCollector()
class LLMRequestTracker:
"""LLM 请求追踪"""
def __init__(self, collector: MetricsCollector):
self.metrics = collector
def track_request(
self,
model: str,
latency_ms: float,
tokens_in: int,
tokens_out: int,
success: bool,
cached: bool = False,
):
"""追踪一次请求"""
labels = {"model": model}
# 请求计数
self.metrics.inc_counter("llm_requests_total", labels)
if not success:
self.metrics.inc_counter("llm_errors_total", labels)
if cached:
self.metrics.inc_counter("llm_cache_hits_total", labels)
# 延迟
self.metrics.observe_histogram(
"llm_request_duration_ms", latency_ms, labels
)
# Token
self.metrics.inc_counter(
"llm_tokens_input_total", labels, tokens_in
)
self.metrics.inc_counter(
"llm_tokens_output_total", labels, tokens_out
)
# 使用
tracker = LLMRequestTracker(metrics)
# 模拟请求
import random
for _ in range(100):
model = random.choice(["gpt-4o", "gpt-4o-mini"])
latency = random.gauss(500 if model == "gpt-4o" else 200, 100)
tracker.track_request(
model=model,
latency_ms=max(50, latency),
tokens_in=random.randint(100, 500),
tokens_out=random.randint(50, 300),
success=random.random() > 0.02,
cached=random.random() < 0.3,
)
# 查看统计
print("=== LLM 延迟统计 ===")
for model in ["gpt-4o", "gpt-4o-mini"]:
stats = metrics.get_histogram_stats(
"llm_request_duration_ms", {"model": model}
)
if stats:
print(f"\n  {model}:")
print(f"    请求数: {stats['count']}")
print(f"    P50: {stats['p50']:.0f}ms")
print(f"    P95: {stats['p95']:.0f}ms")
print(f"    P99: {stats['p99']:.0f}ms")

Structured Logging

"""
结构化日志系统
"""
import json
import time
import uuid
class StructuredLogger:
"""结构化日志"""
def __init__(self, service: str = "llm-service"):
self.service = service
self.logs: list[dict] = []
def log(
self,
level: str,
message: str,
request_id: str = None,
**extra,
) -> dict:
"""记录结构化日志"""
entry = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
"level": level,
"service": self.service,
"message": message,
"request_id": request_id or str(uuid.uuid4())[:8],
**extra,
}
self.logs.append(entry)
# 实际中写入文件或发送到日志系统
if level in ("ERROR", "WARN"):
print(json.dumps(entry, ensure_ascii=False))
return entry
def log_llm_request(
self,
request_id: str,
model: str,
prompt_tokens: int,
completion_tokens: int,
latency_ms: float,
status: str,
error: str = None,
) -> dict:
"""记录 LLM 请求日志"""
return self.log(
level="ERROR" if error else "INFO",
message=f"LLM request {'failed' if error else 'completed'}",
request_id=request_id,
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
latency_ms=round(latency_ms, 1),
status=status,
error=error,
)
logger = StructuredLogger()
# 成功请求
logger.log_llm_request(
request_id="req-abc123",
model="gpt-4o-mini",
prompt_tokens=150,
completion_tokens=80,
latency_ms=234.5,
status="success",
)
# 失败请求
logger.log_llm_request(
request_id="req-def456",
model="gpt-4o",
prompt_tokens=500,
completion_tokens=0,
latency_ms=5000,
status="error",
error="RateLimitError: 429 Too Many Requests",
)
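
In production you would usually route such entries through Python's standard logging module rather than printing by hand. A minimal sketch using only the standard library; the JsonFormatter class here is illustrative, not from any particular package:

import json
import logging

class JsonFormatter(logging.Formatter):
    """Illustrative formatter: render each record as one JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "message": record.getMessage(),
            # fields passed via extra={...} land as attributes on the record
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(entry, ensure_ascii=False)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
stdlib_logger = logging.getLogger("llm-service")
stdlib_logger.addHandler(handler)
stdlib_logger.setLevel(logging.INFO)
stdlib_logger.info("LLM request completed", extra={"request_id": "req-abc123"})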

Distributed Tracing

"""
LLM 请求链路追踪
"""
import time
import uuid
@dataclass
class Span:
"""追踪 Span"""
name: str
trace_id: str
span_id: str
parent_id: str = None
start_time: float = 0
end_time: float = 0
attributes: dict = field(default_factory=dict)
@property
def duration_ms(self) -> float:
return (self.end_time - self.start_time) * 1000
class SimpleTracer:
"""简易链路追踪"""
def __init__(self):
self.spans: list[Span] = []
def start_trace(self, name: str) -> Span:
"""开始追踪"""
span = Span(
name=name,
trace_id=str(uuid.uuid4())[:8],
span_id=str(uuid.uuid4())[:8],
start_time=time.time(),
)
self.spans.append(span)
return span
def start_child(self, parent: Span, name: str) -> Span:
"""创建子 Span"""
span = Span(
name=name,
trace_id=parent.trace_id,
span_id=str(uuid.uuid4())[:8],
parent_id=parent.span_id,
start_time=time.time(),
)
self.spans.append(span)
return span
def end_span(self, span: Span, **attrs) -> None:
"""结束 Span"""
span.end_time = time.time()
span.attributes.update(attrs)
def print_trace(self, trace_id: str) -> None:
"""打印追踪链路"""
trace_spans = [s for s in self.spans if s.trace_id == trace_id]
trace_spans.sort(key=lambda s: s.start_time)
print(f"\n=== Trace: {trace_id} ===")
for span in trace_spans:
indent = "  " if span.parent_id else ""
dur = span.duration_ms
print(f"{indent}[{span.name}] {dur:.1f}ms")
for k, v in span.attributes.items():
print(f"{indent}  {k}: {v}")
# 使用:追踪完整请求链路
tracer = SimpleTracer()
# 模拟一个完整请求
root = tracer.start_trace("handle_request")
# 1. 缓存查询
cache_span = tracer.start_child(root, "cache_lookup")
time.sleep(0.005)  # 模拟 5ms
tracer.end_span(cache_span, hit=False)
# 2. RAG 检索
rag_span = tracer.start_child(root, "rag_retrieval")
time.sleep(0.05)  # 模拟 50ms
tracer.end_span(rag_span, docs_found=3)
# 3. LLM 调用
llm_span = tracer.start_child(root, "llm_generate")
time.sleep(0.2)  # 模拟 200ms
tracer.end_span(llm_span, model="gpt-4o-mini", tokens=150)
# 4. 后处理
post_span = tracer.start_child(root, "post_process")
time.sleep(0.01)
tracer.end_span(post_span, filtered=False)
tracer.end_span(root, status="success")
tracer.print_trace(root.trace_id)
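
SimpleTracer mirrors the shape of real tracing APIs; the same request path maps directly onto OpenTelemetry. A minimal sketch, assuming the opentelemetry-sdk package is installed:

# Assumes: pip install opentelemetry-sdk
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
otel_tracer = trace.get_tracer("llm-service")

# Nested with-blocks give the same parent/child structure as start_child above
with otel_tracer.start_as_current_span("handle_request"):
    with otel_tracer.start_as_current_span("llm_generate") as span:
        span.set_attribute("model", "gpt-4o-mini")
        span.set_attribute("tokens", 150)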

Alerting Rules

"""
告警规则引擎
"""
class AlertRule:
"""告警规则"""
def __init__(
self,
name: str,
condition: str,
threshold: float,
severity: str = "warning",
):
self.name = name
self.condition = condition
self.threshold = threshold
self.severity = severity
def check(self, value: float) -> bool:
if self.condition == "gt":
return value > self.threshold
elif self.condition == "lt":
return value < self.threshold
elif self.condition == "gte":
return value >= self.threshold
return False
class AlertManager:
"""告警管理"""
def __init__(self):
self.rules: list[AlertRule] = []
self.fired_alerts: list[dict] = []
def add_rule(self, rule: AlertRule) -> None:
self.rules.append(rule)
def evaluate(self, metric_name: str, value: float) -> list[dict]:
"""评估告警规则"""
alerts = []
for rule in self.rules:
if rule.name.startswith(metric_name) and rule.check(value):
alert = {
"rule": rule.name,
"severity": rule.severity,
"value": value,
"threshold": rule.threshold,
"time": time.strftime("%H:%M:%S"),
}
alerts.append(alert)
self.fired_alerts.append(alert)
return alerts
# 常用告警规则
alert_mgr = AlertManager()
alert_mgr.add_rule(AlertRule(
"latency_p95_high", "gt", 2000, "warning"
))
alert_mgr.add_rule(AlertRule(
"latency_p99_high", "gt", 5000, "critical"
))
alert_mgr.add_rule(AlertRule(
"error_rate_high", "gt", 0.05, "critical"
))
alert_mgr.add_rule(AlertRule(
"cost_daily_high", "gt", 100, "warning"
))
print("已配置规则:")
for r in alert_mgr.rules:
print(f"  [{r.severity}] {r.name}: {r.condition} {r.threshold}")

Grafana Dashboard Panels

| Panel | Metric | Refresh | Alert |
|---|---|---|---|
| Request volume | llm_requests_total | 10s | QPS spike |
| Latency distribution | llm_request_duration_ms | 10s | P95 > 2s |
| Error rate | llm_errors_total / total | 10s | > 5% |
| Token usage | llm_tokens_*_total | 1m | Daily usage over budget |
| Cache hits | llm_cache_hits / total | 1m | Hit rate drop |
| Cost | Derived | 5m | 80% of daily budget |
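
If these metrics are scraped by Prometheus, each panel reduces to a short PromQL query. An illustrative sketch of panel definitions as plain Python data; the structure loosely follows Grafana's dashboard JSON model and is not a complete dashboard:

# Hypothetical panel definitions; "expr" holds the PromQL behind each panel
panels = [
    {
        "title": "Error rate",
        "expr": "rate(llm_errors_total[5m]) / rate(llm_requests_total[5m])",
    },
    {
        "title": "Requests per second",
        "expr": "sum(rate(llm_requests_total[1m])) by (model)",
    },
    {
        "title": "Cache hit rate",
        "expr": "rate(llm_cache_hits_total[5m]) / rate(llm_requests_total[5m])",
    },
]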

Chapter Summary

This chapter assembled an observability stack for an LLM service around the three pillars: Prometheus-style metrics (latency percentiles, throughput, error rate, token usage, cost), structured JSON request logs, and distributed traces over the cache → retrieval → LLM → post-processing path, topped with threshold-based alert rules and Grafana panels.

Next chapter: stability assurance and troubleshooting.