# Production Deployment

Moving a RAG system from notebook experiments into production means engineering for performance, reliability, observability, and cost control.
## Production Architecture
```mermaid
graph TB
    subgraph "User Layer"
        U[User Request]
    end
    subgraph "API Gateway"
        GW[Load Balancer]
        CACHE[Semantic Cache]
    end
    subgraph "Service Layer"
        API[RAG API Service]
        QUEUE[Async Queue]
    end
    subgraph "Retrieval Layer"
        VDB[Vector DB Cluster]
        ES[Full-Text Search]
        RERANKER[Reranking Service]
    end
    subgraph "Generation Layer"
        LLM1[LLM Provider A]
        LLM2[LLM Provider B]
    end
    subgraph "Observability"
        LOG[Logging]
        METRIC[Metrics]
        TRACE[Tracing]
    end
    U --> GW
    GW --> CACHE
    CACHE -->|cache miss| API
    API --> QUEUE
    API --> VDB
    API --> ES
    VDB --> RERANKER
    ES --> RERANKER
    RERANKER --> LLM1
    RERANKER --> LLM2
    API --> LOG
    API --> METRIC
    API --> TRACE
    style GW fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style CACHE fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style API fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
```
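The two LLM providers in the generation layer are there for failover rather than load balancing: if the primary provider errors out or times out, the same request is retried against the backup. Below is a minimal sketch of that pattern, modeling each provider as an async callable; wire in your real client calls, and note that the 30-second timeout is an assumption:

```python
import asyncio
from collections.abc import Awaitable, Callable


async def generate_with_failover(
    prompt: str,
    providers: list[Callable[[str], Awaitable[str]]],
    timeout_s: float = 30.0,
) -> str:
    """Try each provider in order; return the first successful answer."""
    last_error: Exception | None = None
    for generate in providers:
        try:
            return await asyncio.wait_for(generate(prompt), timeout=timeout_s)
        except Exception as e:  # timeout, rate limit, provider outage ...
            last_error = e
    raise RuntimeError(f"All LLM providers failed: {last_error}")
```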
## FastAPI Production Service
"""
RAG 生产 API 服务
"""
import hashlib
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# 请求/响应模型
class QueryRequest(BaseModel):
question: str
top_k: int = 5
use_rerank: bool = True
stream: bool = False
class QueryResponse(BaseModel):
answer: str
sources: list[dict]
latency_ms: float
cached: bool = False
class RAGService:
"""RAG 核心服务"""
def __init__(self):
self.vector_store = None # 初始化向量存储
self.llm_client = None # 初始化 LLM 客户端
self.cache = {} # 简单内存缓存(生产用 Redis)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"avg_latency_ms": 0,
}
async def initialize(self):
"""异步初始化资源"""
print("正在初始化 RAG 服务...")
# 初始化向量数据库连接
# 初始化 LLM 客户端
# 预热缓存
print("RAG 服务初始化完成")
async def query(self, request: QueryRequest) -> QueryResponse:
"""处理 RAG 查询"""
start = time.time()
self.metrics["total_queries"] += 1
# 1. 检查缓存
cache_key = self._cache_key(request.question)
if cache_key in self.cache:
self.metrics["cache_hits"] += 1
cached = self.cache[cache_key]
cached["cached"] = True
return QueryResponse(**cached)
# 2. 检索
docs = await self._retrieve(request.question, request.top_k)
# 3. 重排序(可选)
if request.use_rerank and docs:
docs = await self._rerank(request.question, docs)
# 4. 生成
answer, sources = await self._generate(request.question, docs)
# 5. 构造响应
latency = (time.time() - start) * 1000
response = {
"answer": answer,
"sources": sources,
"latency_ms": round(latency, 2),
"cached": False,
}
# 6. 更新缓存
self.cache[cache_key] = response
return QueryResponse(**response)
def _cache_key(self, question: str) -> str:
"""生成缓存键"""
normalized = question.strip().lower()
return hashlib.md5(normalized.encode()).hexdigest()
async def _retrieve(self, question: str, top_k: int) -> list[dict]:
"""检索相关文档"""
# 调用向量数据库
return []
async def _rerank(self, question: str, docs: list[dict]) -> list[dict]:
"""重排序"""
return docs
async def _generate(
self, question: str, docs: list[dict]
) -> tuple[str, list[dict]]:
"""生成回答"""
return "示例回答", []
# 全局服务实例
rag_service = RAGService()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期"""
await rag_service.initialize()
yield
print("RAG 服务关闭")
app = FastAPI(title="RAG API", version="1.0.0", lifespan=lifespan)
@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""处理 RAG 查询"""
try:
return await rag_service.query(request)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/health")
async def health():
"""健康检查"""
return {
"status": "healthy",
"metrics": rag_service.metrics,
}
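Before putting the service behind a load balancer, it is worth smoke-testing the endpoints; one option is FastAPI's built-in test client. This sketch assumes the module above is saved as `main.py`:

```python
from fastapi.testclient import TestClient

from main import app  # assumes the service above is saved as main.py

# Using the client as a context manager ensures lifespan startup runs
with TestClient(app) as client:
    resp = client.get("/api/health")
    print(resp.json())

    resp = client.post("/api/query", json={"question": "What is RAG?", "top_k": 3})
    print(resp.json())  # the stubbed pipeline returns "Example answer"
```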
## Semantic Cache
"""
语义缓存: 相似问题复用已有回答
"""
import numpy as np
class SemanticCache:
"""基于语义相似度的缓存"""
def __init__(self, similarity_threshold: float = 0.92):
self.threshold = similarity_threshold
self.entries: list[dict] = []
# entries = [{"embedding": [...], "question": "...", "response": {...}}]
def get(self, question: str, embedding: list[float]) -> dict | None:
"""
查找语义相似的缓存项
"""
if not self.entries:
return None
q_vec = np.array(embedding)
best_score = 0
best_entry = None
for entry in self.entries:
cached_vec = np.array(entry["embedding"])
score = np.dot(q_vec, cached_vec) / (
np.linalg.norm(q_vec) * np.linalg.norm(cached_vec)
)
if score > best_score:
best_score = score
best_entry = entry
if best_score >= self.threshold and best_entry:
print(f"缓存命中! 相似度: {best_score:.4f}")
print(f" 原始问题: {best_entry['question']}")
return best_entry["response"]
return None
def put(
self, question: str, embedding: list[float], response: dict
) -> None:
"""写入缓存"""
self.entries.append({
"question": question,
"embedding": embedding,
"response": response,
})
def clear(self) -> None:
"""清空缓存"""
self.entries.clear()
@property
def size(self) -> int:
return len(self.entries)
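A minimal smoke test, with hand-crafted 3-dimensional vectors standing in for real embeddings; in practice both questions would go through the same embedding model:

```python
cache = SemanticCache(similarity_threshold=0.92)
cache.put(
    "What is RAG?",
    [0.9, 0.1, 0.0],  # stand-in embedding
    {"answer": "Retrieval-Augmented Generation ...", "cached": False},
)

# A near-identical vector hits the cache (cosine similarity ~1.0) ...
hit = cache.get("Explain RAG", [0.88, 0.12, 0.01])
print(hit is not None)  # True

# ... while an unrelated one misses.
miss = cache.get("How do I bake bread?", [0.0, 0.1, 0.95])
print(miss is None)  # True
```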
## Cost Control
| Optimization strategy | Effect | Effort |
|---|---|---|
| Semantic caching | 30-50% fewer LLM calls | Medium |
| Small-model pre-filtering | 20-30% fewer wasted queries | Low |
| Embedding batching | Fewer API calls | Low |
| Automatic downgrade (gpt-4o → gpt-4o-mini) | 50-80% lower token cost | Medium |
| Result caching + TTL | Avoids repeated computation | Low |
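The last strategy in the table is not implemented by the service above, whose in-memory cache grows without bound and never expires. A minimal sketch of a TTL wrapper follows; the 300-second default is an arbitrary assumption:

```python
import time


class TTLCache:
    """In-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, dict]] = {}  # key -> (stored_at, value)

    def get(self, key: str) -> dict | None:
        item = self._store.get(key)
        if item is None:
            return None
        stored_at, value = item
        if time.time() - stored_at > self.ttl:
            del self._store[key]  # expired: drop the entry and report a miss
            return None
        return value

    def put(self, key: str, value: dict) -> None:
        self._store[key] = (time.time(), value)
```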
"""
成本控制: 自动降级策略
"""
class CostController:
"""成本控制器"""
def __init__(self, daily_budget_usd: float = 50.0):
self.daily_budget = daily_budget_usd
self.today_cost = 0.0
self.model_costs = {
"gpt-4o": {"input": 2.5, "output": 10.0}, # $/1M tokens
"gpt-4o-mini": {"input": 0.15, "output": 0.6}, # $/1M tokens
}
def select_model(self, question: str) -> str:
"""
根据预算和问题复杂度选择模型
"""
remaining = self.daily_budget - self.today_cost
# 预算不足,降级
if remaining < self.daily_budget * 0.2:
print("预算不足 20%,降级到 gpt-4o-mini")
return "gpt-4o-mini"
# 简单问题用小模型
if len(question) < 50 and "?" not in question:
return "gpt-4o-mini"
return "gpt-4o"
def record_usage(
self, model: str, input_tokens: int, output_tokens: int
) -> float:
"""记录使用量"""
costs = self.model_costs.get(model, self.model_costs["gpt-4o"])
cost = (
input_tokens * costs["input"]
+ output_tokens * costs["output"]
) / 1_000_000
self.today_cost += cost
return cost
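A quick usage sketch; the token counts are invented for illustration:

```python
controller = CostController(daily_budget_usd=50.0)

model = controller.select_model("How do dense and sparse retrieval differ for RAG?")
print(model)  # "gpt-4o": the question contains "?", so it is not pre-filtered

# Record a hypothetical call: 1,200 input tokens, 400 output tokens
cost = controller.record_usage(model, input_tokens=1_200, output_tokens=400)
print(f"cost: ${cost:.4f}")  # 1200*2.5/1e6 + 400*10.0/1e6 = $0.0070
print(f"spent today: ${controller.today_cost:.4f}")
```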
## Observability
"""
RAG 链路追踪和指标收集
"""
import time
import uuid
class RAGTracer:
"""RAG 请求追踪器"""
def __init__(self):
self.traces: list[dict] = []
def start_trace(self, question: str) -> str:
"""开始追踪"""
trace_id = str(uuid.uuid4())[:8]
trace = {
"trace_id": trace_id,
"question": question,
"start_time": time.time(),
"steps": [],
}
self.traces.append(trace)
return trace_id
def add_step(
self, trace_id: str, step_name: str, duration_ms: float, metadata: dict = None
) -> None:
"""记录步骤"""
for trace in self.traces:
if trace["trace_id"] == trace_id:
trace["steps"].append({
"name": step_name,
"duration_ms": round(duration_ms, 2),
"metadata": metadata or {},
})
break
def end_trace(self, trace_id: str) -> dict:
"""结束追踪,输出汇总"""
for trace in self.traces:
if trace["trace_id"] == trace_id:
total_ms = (time.time() - trace["start_time"]) * 1000
trace["total_ms"] = round(total_ms, 2)
print(f"\n=== Trace {trace_id} ===")
print(f"问题: {trace['question'][:50]}...")
for step in trace["steps"]:
print(f" [{step['duration_ms']:.0f}ms] {step['name']}")
print(f" 总耗时: {total_ms:.0f}ms")
return trace
return {}
# 使用
tracer = RAGTracer()
trace_id = tracer.start_trace("什么是 RAG?")
tracer.add_step(trace_id, "embedding", 50.0)
tracer.add_step(trace_id, "vector_search", 30.0, {"top_k": 5, "results": 5})
tracer.add_step(trace_id, "rerank", 120.0, {"input": 5, "output": 3})
tracer.add_step(trace_id, "llm_generate", 800.0, {"model": "gpt-4o", "tokens": 500})
result = tracer.end_trace(trace_id)
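Passing durations in by hand is fine for a demo, but in a real pipeline you would measure them. A small context manager of our own (not part of any tracing library) can time a block and record it on the tracer created above; the model name in the metadata is illustrative:

```python
import time
from contextlib import contextmanager


@contextmanager
def traced_step(tracer: RAGTracer, trace_id: str, step_name: str, **metadata):
    """Time the wrapped block and record it as a trace step."""
    start = time.time()
    try:
        yield
    finally:
        tracer.add_step(trace_id, step_name, (time.time() - start) * 1000, metadata)


# The sleep stands in for a real embedding call
trace_id = tracer.start_trace("What is hybrid retrieval?")
with traced_step(tracer, trace_id, "embedding", model="text-embedding-3-small"):
    time.sleep(0.05)
tracer.end_trace(trace_id)
```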
## Deployment Checklist
```mermaid
graph LR
    A[Pre-deploy] --> B[Deploying] --> C[Post-deploy]
    A --> A1[Performance tests pass]
    A --> A2[Evaluation metrics met]
    A --> A3[Cost estimate done]
    B --> B1[Canary release]
    B --> B2[Health checks]
    B --> B3[Rollback plan]
    C --> C1[Monitoring and alerting]
    C --> C2[Log collection]
    C --> C3[Periodic evaluation]
    style A fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style B fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style C fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
```
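The canary release in the middle column is normally handled at the gateway, but the underlying mechanism is just weighted routing. A toy sketch of the idea, where the 5% canary weight is an arbitrary starting point:

```python
import random


def pick_backend(canary_weight: float = 0.05) -> str:
    """Route a request to the canary build with probability canary_weight."""
    return "canary" if random.random() < canary_weight else "stable"


# Rough check: about 5% of 10,000 requests should hit the canary
counts = {"stable": 0, "canary": 0}
for _ in range(10_000):
    counts[pick_backend()] += 1
print(counts)
```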
## Chapter Summary

- A production RAG system has to balance three dimensions: performance, reliability, and cost
- Semantic caching can cut LLM call costs substantially
- An automatic downgrade policy keeps spending within budget
- Request tracing makes the latency of every pipeline step observable
- Prefer a canary release, shifting traffic over gradually

Next chapter: a complete, hands-on RAG project that ties all of these pieces together.