生产实战案例
两个完整的生产级 LLM 系统案例,综合运用前面所有章节的技术。
案例一:千万级用户对话系统
graph TB
subgraph 用户层
A[Web/Mobile 客户端]
end
subgraph 网关层
B[Nginx + CDN]
C[API Gateway]
D[认证 + 限流]
end
subgraph 服务层
E[对话服务 x3]
F[RAG 检索服务]
G[安全审核服务]
end
subgraph 模型层
H[GPT-4o-mini 主力]
I[GPT-4o 复杂任务]
J[本地模型 Fallback]
end
subgraph 数据层
K[Redis 缓存集群]
L[PostgreSQL 会话存储]
M[Qdrant 向量库]
end
subgraph 运维层
N[Prometheus]
O[Grafana]
P[PagerDuty]
end
A --> B --> C --> D
D --> E
E --> F --> M
E --> G
E --> H
E --> I
E --> J
E --> K
E --> L
N --> O --> P
style E fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
架构实现
"""
案例:高并发对话系统
"""
import asyncio
import time
import hashlib
from dataclasses import dataclass, field
@dataclass
class ConversationMessage:
    """A single conversation message: speaker role, text, and creation time."""

    role: str  # speaker role, e.g. "user" or "assistant"
    content: str  # raw message text
    # Wall-clock creation time; default_factory avoids one shared import-time timestamp.
    timestamp: float = field(default_factory=time.time)


class ConversationManager:
    """Keeps per-session message lists, bounded to a sliding window."""

    def __init__(self, max_history: int = 20):
        self.max_history = max_history
        self.sessions: dict[str, list[ConversationMessage]] = {}

    def add_message(self, session_id: str, role: str, content: str) -> None:
        """Append a message, trimming the session to the newest max_history entries."""
        history = self.sessions.setdefault(session_id, [])
        history.append(ConversationMessage(role=role, content=content))
        # Keep only the tail of the window.
        if len(history) > self.max_history:
            history[:] = history[-self.max_history:]

    def get_history(self, session_id: str) -> list[dict]:
        """Return the session's messages as role/content dicts (empty if unknown)."""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in self.sessions.get(session_id, [])
        ]

    def summarize_old_history(self, session_id: str, keep_recent: int = 5) -> str:
        """Compress everything older than the newest keep_recent messages.

        Strategy: keep the recent window verbatim, summarize the rest by
        truncating each message to 50 characters. Returns "" when there is
        nothing older than the recent window.
        """
        history = self.sessions.get(session_id, [])
        if len(history) <= keep_recent:
            return ""
        clipped = (f"{msg.role}: {msg.content[:50]}" for msg in history[:-keep_recent])
        return "之前的对话摘要: " + "; ".join(clipped)
class ChatService:
    """Conversation service: safety-screens input, tracks history, returns replies."""

    def __init__(self):
        self.conversation_mgr = ConversationManager()
        self.request_count = 0  # total handle_message calls, including rejected ones
        self.total_latency = 0  # accumulated latency (ms) of successful requests

    async def handle_message(self, session_id: str, user_message: str) -> dict:
        """Process one user turn; return the reply plus session and timing metadata."""
        started_at = time.time()
        self.request_count += 1

        # Reject obviously unsafe input before touching session state.
        if self._check_safety(user_message):
            return {"error": "不安全的输入"}

        # History is captured *before* the new user message is appended.
        history = self.conversation_mgr.get_history(session_id)
        self.conversation_mgr.add_message(session_id, "user", user_message)

        reply = await self._generate_response(user_message, history)
        self.conversation_mgr.add_message(session_id, "assistant", reply)

        elapsed_ms = (time.time() - started_at) * 1000
        self.total_latency += elapsed_ms
        return {
            "response": reply,
            "session_id": session_id,
            "latency_ms": round(elapsed_ms, 1),
        }

    def _check_safety(self, text: str) -> bool:
        """Return True when the text matches a known prompt-injection keyword."""
        lowered = text.lower()
        return any(keyword in lowered for keyword in ("忽略指令", "ignore instructions"))

    async def _generate_response(self, message: str, history: list) -> str:
        """Simulated LLM call (history is accepted but unused by this stub)."""
        await asyncio.sleep(0.1)  # stand-in for model latency
        return f"收到您的消息:{message[:30]}。这是一个模拟回复。"

    def get_stats(self) -> dict:
        """Aggregate service metrics; max() guards against divide-by-zero before traffic."""
        return {
            "total_requests": self.request_count,
            "avg_latency_ms": round(self.total_latency / max(self.request_count, 1), 1),
            "active_sessions": len(self.conversation_mgr.sessions),
        }
# Demo driver: run five sequential turns against a single session.
service = ChatService()


async def simulate():
    for turn in range(1, 6):
        result = await service.handle_message(
            session_id="session_001",
            user_message=f"这是第 {turn} 条测试消息",
        )
        print(f" 回复: {result['response'][:40]}... ({result['latency_ms']}ms)")
    print(f"\n服务统计: {service.get_stats()}")


asyncio.run(simulate())
案例二:企业知识库问答
"""
案例:企业知识库问答系统
"""
class EnterpriseRAGService:
    """Enterprise RAG Q&A service with per-department access control and auditing."""

    def __init__(self):
        # department -> list of collection names that department may search
        self.access_control: dict[str, list[str]] = {}
        # Append-only audit trail of every query attempt (including denials).
        self.audit_log: list[dict] = []

    def configure_access(
        self,
        department: str,
        allowed_collections: list[str],
    ) -> None:
        """Grant a department read access to the given collections."""
        self.access_control[department] = allowed_collections

    async def query(
        self,
        user_id: str,
        department: str,
        question: str,
    ) -> dict:
        """Answer a question from the collections the department may access.

        Returns {"error": ...} when the department has no configured access;
        otherwise a dict with the answer (replaced by a refusal when the
        output safety check fails), source titles, confidence, and latency.
        """
        start = time.time()

        # 1. Permission check
        allowed = self.access_control.get(department, [])
        if not allowed:
            # Fix: denied attempts are audited too — previously unauthorized
            # access attempts left no trace in the audit log.
            self._audit(user_id, department, question, docs_used=0, safe=True, denied=True)
            return {"error": "无权限访问"}

        # 2. Retrieve relevant documents
        docs = self._retrieve(question, allowed)
        # 3. Generate an answer from the retrieved context
        answer = self._generate(question, docs)
        # 4. Output safety check (block sensitive-data leakage)
        safe = self._check_output(answer)
        # 5. Audit log
        self._audit(user_id, department, question, docs_used=len(docs), safe=safe)

        latency = (time.time() - start) * 1000
        return {
            "answer": answer if safe else "无法回答此问题",
            "sources": [d["title"] for d in docs],
            "confidence": 0.85,  # placeholder; a real system would compute this
            "latency_ms": round(latency, 1),
        }

    def _audit(
        self,
        user_id: str,
        department: str,
        question: str,
        *,
        docs_used: int,
        safe: bool,
        denied: bool = False,
    ) -> None:
        """Append one entry to the audit trail (question truncated to 100 chars)."""
        entry = {
            "user": user_id,
            "department": department,
            "question": question[:100],
            "docs_used": docs_used,
            "safe": safe,
            "time": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        if denied:
            # Only denial entries carry this flag, so successful-query
            # entries keep their original shape.
            entry["denied"] = True
        self.audit_log.append(entry)

    def _retrieve(
        self, query: str, collections: list[str]
    ) -> list[dict]:
        """Simulated retrieval: one stub document per collection (max 2)."""
        return [
            {"title": f"文档-{c}-1", "content": f"关于{query}的内容..."}
            for c in collections[:2]
        ]

    def _generate(self, question: str, docs: list) -> str:
        """Simulated generation: fold retrieved context into an answer string."""
        context = "\n".join(d["content"] for d in docs)
        return f"根据知识库,{question}的答案是:{context[:100]}"

    def _check_output(self, output: str) -> bool:
        """Return True when the answer contains no sensitive keywords."""
        sensitive = ["密码", "token", "secret"]
        return not any(s in output.lower() for s in sensitive)
# Demo: configure department permissions, then run a single query.
rag_service = EnterpriseRAGService()
for dept, collections in {
    "技术部": ["技术文档", "API文档"],
    "销售部": ["产品手册", "FAQ"],
}.items():
    rag_service.configure_access(dept, collections)


async def demo():
    result = await rag_service.query(
        user_id="user_001",
        department="技术部",
        question="如何部署微服务?",
    )
    print(f"回答: {result['answer'][:80]}...")
    print(f"来源: {result['sources']}")
    print(f"延迟: {result['latency_ms']}ms")


asyncio.run(demo())
运维经验总结
上线检查清单
# Phased go-live checklist, keyed by week relative to launch.
GO_LIVE_CHECKLIST = {
    "Week -4": [
        "确定 SLA 目标",
        "完成安全审查",
        "准备监控面板",
    ],
    "Week -2": [
        "压力测试通过",
        "制定回滚方案",
        "配置告警规则",
        "准备故障预案",
    ],
    "Week -1": [
        "灰度 10% 验证",
        "On-call 安排",
        "文档更新",
    ],
    "Go Live": [
        "灰度 → 50% → 100%",
        "实时监控",
        "随时准备回滚",
    ],
    "Week +1": [
        "收集用户反馈",
        "优化告警阈值",
        "总结上线报告",
    ],
}

print("=== 上线检查清单 ===")
for phase, items in GO_LIVE_CHECKLIST.items():
    print(f"\n{phase}:")
    # One join per phase produces the same output as item-by-item prints.
    print("\n".join(f" [ ] {item}" for item in items))
核心指标基准
| 指标 | 优秀 | 良好 | 需改进 |
|---|---|---|---|
| P50 延迟 | < 500ms | < 1s | > 2s |
| P99 延迟 | < 2s | < 5s | > 10s |
| 错误率 | < 0.1% | < 1% | > 5% |
| 缓存命中率 | > 50% | > 30% | < 10% |
| 可用性 | 99.99% | 99.9% | < 99% |
| 成本/请求 | < $0.005 | < $0.01 | > $0.05 |
本书总结
graph LR
A[Demo] --> B[架构设计]
B --> C[部署优化]
C --> D[成本控制]
D --> E[监控运维]
E --> F[安全合规]
F --> G[持续迭代]
G --> E
style A fill:#ffcdd2,stroke:#c62828
style G fill:#c8e6c9,stroke:#388e3c
从 Demo 到生产的关键心得:
- 先跑起来,再优化——不要过早优化
- 监控先行——看不到的问题无法修复
- 成本意识——80% 的请求不需要最强模型
- 缓存为王——最快的 LLM 调用是不调用
- 安全兜底——Prompt 注入防护是必须的
- 灰度发布——每次变更都控制爆炸半径
- 故障复盘——每次事故都是学习机会