监控和维护
确保LLM应用稳定运行并持续优化。
监控概览
graph TB
A[LLM应用] --> B[监控数据]
B --> C[日志收集]
B --> D[指标监控]
B --> E[用户反馈]
B --> F[成本追踪]
C --> G[分析]
D --> G
E --> G
F --> G
G --> H[优化决策]
H --> A
style A fill:#e1f5ff
style H fill:#c8e6c9
日志管理
1. 结构化日志
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict
class StructuredLogger:
    """Structured logger: every record is emitted as one JSON object per line."""

    def __init__(self, name: str, level: int = logging.INFO):
        """
        Args:
            name: logger name (logging.getLogger returns the same logger
                for the same name across instances)
            level: minimum level to emit (default: logging.INFO)
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        # Guard against duplicate handlers: re-instantiating with the same
        # name would otherwise attach a second StreamHandler and double
        # every log line.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(console_handler)

    def log(self, level: str, message: str, **kwargs):
        """
        Emit one structured (JSON) log record.

        Args:
            level: log level ("info", "warning", "error")
            message: human-readable message
            **kwargs: extra fields merged into the JSON record
        """
        log_data = {
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # since Python 3.12 and produced a naive datetime.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level.upper(),
            "message": message,
            **kwargs
        }
        log_str = json.dumps(log_data, ensure_ascii=False)
        if level == "error":
            self.logger.error(log_str)
        elif level == "warning":
            self.logger.warning(log_str)
        else:
            self.logger.info(log_str)

    def info(self, message: str, **kwargs):
        """Log at INFO level."""
        self.log("info", message, **kwargs)

    def error(self, message: str, **kwargs):
        """Log at ERROR level."""
        self.log("error", message, **kwargs)

    def warning(self, message: str, **kwargs):
        """Log at WARNING level."""
        self.log("warning", message, **kwargs)
# Usage: emit one structured record with extra context fields
logger = StructuredLogger("llm-app")
logger.info(
    "API调用",
    endpoint="/generate",
    query="Python快速排序",
    user_id="12345"
)
2. 请求追踪
import uuid
from contextvars import ContextVar

# Context variable carrying the current request's ID (async-safe).
request_id_var: ContextVar[str] = ContextVar('request_id')

def get_request_id() -> str:
    """Return the current request ID, or "unknown" if none has been set."""
    # ContextVar.get accepts a fallback value, which replaces the
    # try/except LookupError dance.
    return request_id_var.get("unknown")

def set_request_id() -> str:
    """Generate a fresh request ID, store it in the context, and return it."""
    new_id = str(uuid.uuid4())
    request_id_var.set(new_id)
    return new_id
# Used inside the API endpoint
@app.post("/generate")
async def generate(request: QueryRequest):
    """Generate an answer; all log lines share one request ID."""
    # Attach a fresh request ID so log records of this call correlate.
    rid = set_request_id()
    logger.info(
        "收到请求",
        request_id=rid,
        query=request.query
    )
    try:
        response = llm.invoke(request.query)
    except Exception as exc:
        logger.error(
            "生成失败",
            request_id=rid,
            error=str(exc)
        )
        raise
    logger.info(
        "生成完成",
        request_id=rid,
        token_count=len(response.content)
    )
    return {"answer": response.content}
3. 日志轮转
from logging.handlers import RotatingFileHandler
import os
def setup_logging():
    """Configure root logging: rotating file output plus console output."""
    # Make sure the log directory exists before the file handler opens it.
    os.makedirs("logs", exist_ok=True)
    # File handler: rotate at 10 MB, keep 5 backups. Explicit UTF-8 so
    # non-ASCII (e.g. Chinese) log messages survive on platforms whose
    # default encoding is not UTF-8.
    file_handler = RotatingFileHandler(
        "logs/app.log",
        maxBytes=10*1024*1024,
        backupCount=5,
        encoding="utf-8"
    )
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'
    ))
    # Configure the root logger. force=True replaces any handlers already
    # attached to the root logger -- without it, basicConfig(handlers=...)
    # is a silent no-op when logging was configured earlier.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[file_handler, console_handler],
        force=True
    )

setup_logging()
指标监控
1. Prometheus指标
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time

# Metric definitions (Prometheus exposition names and label sets)
request_counter = Counter(
    'requests_total',
    'Total number of requests',
    ['endpoint', 'method']
)
request_duration = Histogram(
    'request_duration_seconds',
    'Request duration',
    ['endpoint']
)
# Number of in-flight requests (inc/dec in the middleware below)
active_connections = Gauge(
    'active_connections',
    'Number of active connections'
)
token_counter = Counter(
    'tokens_generated_total',
    'Total tokens generated',
    ['model']
)
# Errors labelled by exception class name
error_counter = Counter(
    'errors_total',
    'Total number of errors',
    ['error_type']
)
# Metrics middleware
from fastapi import Request, Response

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    """Record request count, latency, errors, and active connections."""
    started = time.time()
    path = request.url.path
    # One more in-flight request
    active_connections.inc()
    try:
        result: Response = await call_next(request)
    except Exception as exc:
        # Count the failure by exception class name
        error_counter.labels(error_type=type(exc).__name__).inc()
        raise
    else:
        # Count and time only requests that completed
        request_counter.labels(endpoint=path, method=request.method).inc()
        request_duration.labels(endpoint=path).observe(time.time() - started)
        return result
    finally:
        # One fewer in-flight request, success or not
        active_connections.dec()
# Prometheus scrape endpoint
@app.get("/metrics")
async def metrics():
    """Expose all collected metrics in the Prometheus text format."""
    return Response(generate_latest(), media_type="text/plain")
2. 自定义指标
from prometheus_client import Gauge

# LLM performance gauges
latency_gauge = Gauge('llm_latency_seconds', 'LLM generation latency')
throughput_gauge = Gauge('llm_throughput_tokens_per_second', 'LLM throughput')
cache_hit_rate = Gauge('cache_hit_rate', 'Cache hit rate')

# Cumulative API cost
api_cost_gauge = Gauge('api_cost_dollars', 'Total API cost in dollars')

def track_llm_call(start_time: float, token_count: int, cost: float):
    """Publish latency/throughput gauges and accumulate API cost."""
    elapsed = time.time() - start_time
    # Avoid division by zero on a (theoretical) zero-length call
    tokens_per_sec = 0 if elapsed <= 0 else token_count / elapsed
    latency_gauge.set(elapsed)
    throughput_gauge.set(tokens_per_sec)
    api_cost_gauge.inc(cost)
成本追踪
1. Token计数
import tiktoken
from typing import Dict
class CostTracker:
    """Track token usage and API cost per model."""

    def __init__(self):
        # Per-model encoding cache: tiktoken.encoding_for_model is
        # relatively expensive, so look it up once per model instead of
        # once per count_tokens call.
        self._encodings = {}
        # Kept for backward compatibility with callers reading .encoding.
        self.encoding = self._get_encoding("gpt-4o")
        # Pricing in USD per million tokens. Embedding models define no
        # "output" price; track_request handles that via .get(..., 0.0).
        self.pricing = {
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "text-embedding-3-small": {"input": 0.02}
        }
        # Running totals
        self.stats = {
            "input_tokens": 0,
            "output_tokens": 0,
            "costs": {}
        }

    def _get_encoding(self, model: str):
        """Return the tiktoken encoding for *model*, caching the lookup."""
        if model not in self._encodings:
            self._encodings[model] = tiktoken.encoding_for_model(model)
        return self._encodings[model]

    def count_tokens(self, text: str, model: str = "gpt-4o") -> int:
        """
        Count tokens in *text*.

        Args:
            text: input text
            model: model name (selects the tokenizer)
        Returns:
            Number of tokens.
        """
        return len(self._get_encoding(model).encode(text))

    def track_request(
        self,
        input_text: str,
        output_text: str,
        model: str = "gpt-4o-mini"
    ) -> float:
        """
        Record one request's token usage and return its cost.

        Args:
            input_text: prompt text
            output_text: completion text
            model: model name (falls back to gpt-4o-mini pricing if unknown)
        Returns:
            Cost in USD for this request.
        """
        input_tokens = self.count_tokens(input_text, model)
        output_tokens = self.count_tokens(output_text, model)
        # Compute cost; .get("output", 0.0) avoids a KeyError for
        # embedding-style models that only price input tokens.
        pricing = self.pricing.get(model, self.pricing["gpt-4o-mini"])
        input_cost = input_tokens * pricing["input"] / 1_000_000
        output_cost = output_tokens * pricing.get("output", 0.0) / 1_000_000
        total_cost = input_cost + output_cost
        # Update running totals
        self.stats["input_tokens"] += input_tokens
        self.stats["output_tokens"] += output_tokens
        if model not in self.stats["costs"]:
            self.stats["costs"][model] = 0
        self.stats["costs"][model] += total_cost
        return total_cost

    def get_summary(self) -> Dict:
        """Return aggregate token counts and costs across all models."""
        return {
            "input_tokens": self.stats["input_tokens"],
            "output_tokens": self.stats["output_tokens"],
            "total_tokens": self.stats["input_tokens"] + self.stats["output_tokens"],
            "costs_by_model": self.stats["costs"],
            "total_cost": sum(self.stats["costs"].values())
        }
# Usage: track one request and print per-call and cumulative cost
cost_tracker = CostTracker()
cost = cost_tracker.track_request(
    input_text="写一个快速排序算法",
    output_text="# 快速排序算法\ndef quicksort(arr): ...",
    model="gpt-4o-mini"
)
print(f"本次调用成本: ${cost:.6f}")
print(f"总成本: ${cost_tracker.get_summary()['total_cost']:.4f}")
健康检查
from fastapi import HTTPException
from langchain_openai import ChatOpenAI
import os

# Initialize the LLM client (reads the API key from the environment)
llm = ChatOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
@app.get("/health")
async def health_check():
    """
    Health-check endpoint.

    Verifies:
    1. LLM API connectivity (a minimal round-trip call)
    2. Required environment variables
    3. Importable dependencies

    Returns HTTP 200 when healthy, 503 otherwise.
    """
    # Local import: only this endpoint needs a custom-status response.
    from fastapi.responses import JSONResponse

    checks = {
        "api_connection": False,
        "environment": False,
        "dependencies": []
    }
    # Check API connectivity
    try:
        llm.invoke("test")
        checks["api_connection"] = True
    except Exception:
        # Narrowed from a bare `except:`, which would even swallow
        # KeyboardInterrupt/SystemExit.
        checks["api_connection"] = False
    # Check environment variables
    if os.getenv("OPENAI_API_KEY"):
        checks["environment"] = True
    # Check dependencies
    try:
        import langchain
        import openai
        checks["dependencies"].append({"name": "langchain", "status": "OK"})
        checks["dependencies"].append({"name": "openai", "status": "OK"})
    except ImportError as e:
        checks["dependencies"].append({"name": str(e), "status": "ERROR"})
    # Aggregate status
    all_healthy = (
        checks["api_connection"] and
        checks["environment"] and
        all(dep["status"] == "OK" for dep in checks["dependencies"])
    )
    status_code = 200 if all_healthy else 503
    # Returning a Flask-style `(dict, status_code)` tuple does NOT set the
    # HTTP status in FastAPI (the tuple would be serialized as a JSON
    # array). JSONResponse carries the status code correctly.
    return JSONResponse(
        status_code=status_code,
        content={
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": checks,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    )
告警系统
1. 错误率告警
from collections import deque
from datetime import datetime, timedelta
class ErrorRateMonitor:
    """Sliding-window error-rate monitor."""

    def __init__(self, window_minutes: int = 5, threshold: float = 0.1):
        """
        Args:
            window_minutes: sliding window length in minutes
            threshold: error rate above which should_alert() fires
        """
        self.window = timedelta(minutes=window_minutes)
        self.threshold = threshold
        self.errors = deque()   # timestamps of failed requests
        self.total = deque()    # timestamps of all requests

    def _prune(self, now):
        """Drop timestamps that have fallen out of the sliding window."""
        while self.errors and now - self.errors[0] > self.window:
            self.errors.popleft()
        while self.total and now - self.total[0] > self.window:
            self.total.popleft()

    def record(self, is_error: bool):
        """
        Record one request.

        Args:
            is_error: whether the request failed
        """
        # Timezone-aware now; datetime.utcnow() is deprecated.
        now = datetime.now(timezone.utc)
        self._prune(now)
        self.total.append(now)
        if is_error:
            self.errors.append(now)

    def get_error_rate(self) -> float:
        """Current error rate within the window (0.0 when no traffic)."""
        # Prune here too: the original only pruned inside record(), so the
        # reported rate went stale until the next request arrived.
        self._prune(datetime.now(timezone.utc))
        if not self.total:
            return 0.0
        return len(self.errors) / len(self.total)

    def should_alert(self) -> bool:
        """True when the current error rate exceeds the threshold."""
        return self.get_error_rate() > self.threshold
# Shared monitor instance
error_monitor = ErrorRateMonitor(window_minutes=5, threshold=0.1)

# Record request outcomes inside the endpoint
@app.post("/generate")
async def generate(request: QueryRequest):
    """Generate an answer while feeding the error-rate monitor."""
    try:
        result = llm.invoke(request.query)
    except Exception:
        error_monitor.record(is_error=True)
        # Notify (email, Slack, ...) once the rate crosses the threshold
        if error_monitor.should_alert():
            send_alert("错误率超过阈值!")
        raise
    error_monitor.record(is_error=False)
    return {"answer": result.content}
2. 成本告警
class CostAlert:
    """Daily API-cost budget alert."""

    def __init__(self, daily_limit: float = 10.0):
        """
        Args:
            daily_limit: daily spend limit in USD
        """
        self.daily_limit = daily_limit
        self.daily_cost = 0.0
        # Timezone-aware "today"; datetime.utcnow() is deprecated.
        self.last_reset = datetime.now(timezone.utc).date()

    def _roll_day(self):
        """Reset the accumulator when the UTC day has changed."""
        today = datetime.now(timezone.utc).date()
        if today != self.last_reset:
            self.daily_cost = 0.0
            self.last_reset = today

    def add_cost(self, cost: float):
        """Add one cost item; alert when the daily limit is exceeded."""
        self._roll_day()
        self.daily_cost += cost
        if self.daily_cost > self.daily_limit:
            send_alert(f"今日成本已超限!当前:${self.daily_cost:.2f}")

    def get_status(self) -> dict:
        """Return current spend, limit, remaining budget and usage percent."""
        return {
            "daily_cost": self.daily_cost,
            "daily_limit": self.daily_limit,
            "remaining": self.daily_limit - self.daily_cost,
            # Guard against ZeroDivisionError when the limit is zero
            # (treat a zero/disabled budget as fully used).
            "usage_percent": (
                self.daily_cost / self.daily_limit * 100
                if self.daily_limit > 0 else 100.0
            )
        }
用户反馈收集
from fastapi import Form
from pydantic import BaseModel
class FeedbackRequest(BaseModel):
    """User feedback payload for one query/answer pair."""
    query: str
    answer: str
    rating: int  # 1-5 stars
    comment: str = ""
@app.post("/feedback")
async def submit_feedback(feedback: FeedbackRequest):
    """Accept user feedback and persist it."""
    # Flatten the payload and stamp it with the submission time
    record = {
        "timestamp": datetime.utcnow().isoformat(),
        "query": feedback.query,
        "answer": feedback.answer,
        "rating": feedback.rating,
        "comment": feedback.comment
    }
    # Persist to disk (see save_feedback)
    save_feedback(record)
    return {"status": "success"}
def save_feedback(data: dict):
    """Append one feedback record to today's JSONL file and log it."""
    import json
    os.makedirs("feedback", exist_ok=True)
    # One file per UTC day, JSON-lines format
    day = datetime.utcnow().strftime('%Y%m%d')
    path = f"feedback/{day}.jsonl"
    line = json.dumps(data, ensure_ascii=False)
    with open(path, 'a', encoding='utf-8') as fp:
        fp.write(line + '\n')
    logger.info(
        "收到用户反馈",
        rating=data['rating'],
        has_comment=bool(data['comment'])
    )
监控仪表板
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt

st.title("📊 LLM应用监控")

# Load the application log, cached for 5 minutes
@st.cache_data(ttl=300)
def load_logs():
    """Parse logs/app.log as JSON-lines into a DataFrame.

    NOTE(review): setup_logging() formats file records with an
    '%(asctime)s - %(name)s - ...' prefix, which is not valid JSON --
    confirm the file actually contains raw JSON lines.
    """
    logs = []
    try:
        with open("logs/app.log", 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    logs.append(json.loads(line))
                except json.JSONDecodeError:
                    # Skip non-JSON lines (e.g. plain-text log records)
                    continue
    except FileNotFoundError:
        # No log file yet: render an empty dashboard
        pass
    return pd.DataFrame(logs)

df = load_logs()

# Headline metrics -- guard against an empty log (the original divided
# by len(df) and indexed df['level'] unconditionally, which raises on
# an empty DataFrame).
total_requests = len(df)
error_count = int((df['level'] == 'ERROR').sum()) if 'level' in df.columns else 0
col1, col2, col3 = st.columns(3)
col1.metric("总请求数", total_requests)
col2.metric("错误数", error_count)
if total_requests:
    col3.metric("成功率", f"{((total_requests - error_count) / total_requests * 100):.1f}%")
else:
    col3.metric("成功率", "N/A")

# Cost summary
st.subheader("💰 成本统计")
if not cost_tracker.stats["costs"]:
    st.info("暂无成本数据")
else:
    costs = cost_tracker.stats["costs"]
    st.dataframe(
        pd.DataFrame([
            {"模型": model, "成本": f"${cost:.4f}"}
            for model, cost in costs.items()
        ])
    )

# Hourly request trend.
# Fix: st.subchart does not exist in the Streamlit API; st.subheader is
# the section-heading call used elsewhere in this dashboard.
st.subheader("📈 请求趋势")
if 'timestamp' in df.columns and not df.empty:
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour'] = df['timestamp'].dt.hour
    hourly_counts = df.groupby('hour').size()
    fig, ax = plt.subplots()
    hourly_counts.plot(ax=ax)
    st.pyplot(fig)
else:
    st.info("暂无请求数据")
学习要点
✅ 结构化日志便于分析和检索 ✅ Prometheus指标实现自动化监控 ✅ 成本追踪避免意外超支 ✅ 健康检查确保服务可用 ✅ 告警系统及时发现异常 ✅ 用户反馈帮助持续改进
下一步: 查看学习资源和社区 📚