System Architecture Design

The architecture of a production-grade LLM system determines its ceiling. Get the architecture right, and every subsequent optimization yields twice the result for half the effort.

Architecture Overview

graph TB
    subgraph "Client"
        A[Web/Mobile/API]
    end
    subgraph "API Gateway"
        B[Auth] --> C[Rate Limiting]
        C --> D[Routing]
    end
    subgraph "Application Layer"
        E[LLM Service]
        F[Cache Layer]
        G[RAG Retrieval]
    end
    subgraph "Model Layer"
        H[OpenAI API]
        I[Self-hosted Models]
        J[Fallback Models]
    end
    subgraph "Infrastructure"
        K[Redis]
        L[PostgreSQL]
        M[Vector Database]
        N[Message Queue]
    end
    subgraph "Observability"
        O[Prometheus]
        P[Grafana]
        Q[Logging]
    end
    A --> B
    D --> E
    E --> F
    E --> G
    F --> K
    G --> M
    E --> H
    E --> I
    E --> J
    E --> L
    E --> N
    E --> O
    O --> P
    style E fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
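Most components in the diagram get concrete code later in this chapter; the cache layer (F) is the exception, so here is a minimal sketch of an exact-match response cache sitting in front of the model call. The in-process dict is a stand-in for the Redis node (K) in the diagram, and the class name, key scheme, and TTL are illustrative assumptions rather than part of the original design.

import hashlib
import json
import time

class ResponseCache:
    """Exact-match response cache keyed by a hash of the request.

    In-process sketch only; a production deployment would back this
    with Redis and let Redis handle TTL-based eviction.
    """
    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self.entries: dict[str, tuple[float, str]] = {}

    def _key(self, model: str, prompt: str) -> str:
        # Canonical JSON keeps the key stable across dict orderings
        payload = json.dumps({"model": model, "prompt": prompt}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model: str, prompt: str) -> str | None:
        entry = self.entries.get(self._key(model, prompt))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at > self.ttl:
            return None  # expired
        return response

    def put(self, model: str, prompt: str, response: str) -> None:
        self.entries[self._key(model, prompt)] = (time.time(), response)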

A Production-Grade FastAPI Service

"""
生产级 LLM API 服务
"""
import asyncio
import time
import hashlib
import json
from dataclasses import dataclass, field
from collections import defaultdict
# === Rate limiter ===
class RateLimiter:
    """Token-bucket rate limiting."""

    def __init__(self, max_tokens: int = 60, refill_rate: float = 1.0):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # tokens added back per second
        self.buckets: dict[str, dict] = {}

    def allow(self, key: str) -> bool:
        """Check whether a request from `key` is allowed."""
        now = time.time()
        if key not in self.buckets:
            self.buckets[key] = {
                "tokens": self.max_tokens,
                "last_refill": now,
            }
        bucket = self.buckets[key]
        # Refill tokens for the time elapsed since the last check
        elapsed = now - bucket["last_refill"]
        refill = elapsed * self.refill_rate
        bucket["tokens"] = min(self.max_tokens, bucket["tokens"] + refill)
        bucket["last_refill"] = now
        # Consume one token if available
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False
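# Usage sketch (the limits here are illustrative, not prescribed above):
#   limiter = RateLimiter(max_tokens=10, refill_rate=1.0)
#   limiter.allow("user-42")  # True while the user's bucket has tokens
# After 10 rapid calls the bucket is empty and allow() returns False
# until the 1-token-per-second refill catches up.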

# === Circuit breaker ===
class CircuitBreaker:
    """Circuit-breaker pattern."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def can_execute(self) -> bool:
        """Check whether a call may go through."""
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            # Has the recovery window elapsed?
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        # HALF_OPEN: allow a single probe request
        return True

    def record_success(self) -> None:
        """Record a success."""
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        """Record a failure."""
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"
            print(f"⚡ Circuit breaker opened! Failure count: {self.failures}")

# === Retry strategy ===
class RetryStrategy:
    """Retry with exponential backoff."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    async def execute(self, func, *args, **kwargs):
        """Run `func`, retrying on failure."""
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries:
                    delay = min(
                        self.base_delay * (2 ** attempt),
                        self.max_delay,
                    )
                    print(f"  Retry {attempt + 1}/{self.max_retries}, waiting {delay}s")
                    await asyncio.sleep(delay)
        raise last_exception
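# Usage sketch (call_flaky_api is a hypothetical async function):
#   retry = RetryStrategy(max_retries=3, base_delay=1.0)
#   result = await retry.execute(call_flaky_api, prompt)
# Delays grow 1s -> 2s -> 4s, capped at max_delay; the last exception
# is re-raised once all attempts are exhausted.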

# === Multi-model fallback ===
@dataclass
class ModelConfig:
    """Model configuration."""
    name: str
    provider: str          # openai, anthropic, local
    model_id: str
    max_tokens: int = 4096
    temperature: float = 0.7
    priority: int = 1      # 1 = highest priority
    cost_per_1k_tokens: float = 0.01


class ModelRouter:
    """Multi-model routing with fallback."""

    def __init__(self):
        self.models: list[ModelConfig] = []
        self.circuit_breakers: dict[str, CircuitBreaker] = {}
        self.retry = RetryStrategy(max_retries=2)

    def register_model(self, config: ModelConfig) -> None:
        """Register a model."""
        self.models.append(config)
        self.circuit_breakers[config.name] = CircuitBreaker()
        # Keep models sorted by priority
        self.models.sort(key=lambda m: m.priority)

    async def generate(self, prompt: str) -> dict:
        """Generate a reply, falling back across models automatically."""
        for model in self.models:
            breaker = self.circuit_breakers[model.name]
            if not breaker.can_execute():
                print(f"  ⏭ Skipping {model.name} (circuit breaker open)")
                continue
            try:
                # Retry transient failures before falling back to the next model
                result = await self.retry.execute(self._call_model, model, prompt)
                breaker.record_success()
                return {
                    "content": result,
                    "model": model.name,
                    "provider": model.provider,
                }
            except Exception as e:
                breaker.record_failure()
                print(f"  ❌ {model.name} failed: {e}")
                continue
        raise RuntimeError("No model available")

    async def _call_model(
        self, model: ModelConfig, prompt: str
    ) -> str:
        """Call the model (wire in the real API in production)."""
        # Simulated API call
        await asyncio.sleep(0.1)
        return f"[{model.name}] Response to: {prompt[:50]}..."


# Usage
router = ModelRouter()
router.register_model(ModelConfig(
    name="gpt-4o", provider="openai",
    model_id="gpt-4o", priority=1,
    cost_per_1k_tokens=0.005,
))
router.register_model(ModelConfig(
    name="gpt-4o-mini", provider="openai",
    model_id="gpt-4o-mini", priority=2,
    cost_per_1k_tokens=0.00015,
))
router.register_model(ModelConfig(
    name="local-llama", provider="local",
    model_id="llama-3-8b", priority=3,
    cost_per_1k_tokens=0.0,
))
# asyncio.run(router.generate("Hello"))
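The heading above promises a FastAPI service, but the listing stops at framework-agnostic classes. Here is a sketch of how they might be wired into an actual endpoint, reusing the `router` and `RateLimiter` defined above; the route path, request schema, and per-IP rate-limit key are assumptions, not part of the original code.

from fastapi import FastAPI, HTTPException, Request

app = FastAPI(title="LLM Service")
limiter = RateLimiter(max_tokens=60, refill_rate=1.0)

@app.post("/v1/generate")
async def generate_endpoint(request: Request, body: dict):
    # Rate-limit per client IP; a real gateway would usually key on the
    # API key resolved during authentication instead.
    client_key = request.client.host if request.client else "anonymous"
    if not limiter.allow(client_key):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    try:
        return await router.generate(body.get("prompt", ""))
    except RuntimeError:
        # Every model failed or had its circuit breaker open
        raise HTTPException(status_code=503, detail="All models unavailable")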

High-Availability Design

"""
高可用设计模式
"""
class HealthChecker:
"""健康检查"""
def __init__(self):
self.checks: dict[str, callable] = {}
def register(self, name: str, check_fn) -> None:
self.checks[name] = check_fn
def run_all(self) -> dict:
"""运行所有健康检查"""
results = {}
for name, check_fn in self.checks.items():
try:
check_fn()
results[name] = {"status": "healthy"}
except Exception as e:
results[name] = {
"status": "unhealthy",
"error": str(e),
}
overall = all(
r["status"] == "healthy" for r in results.values()
)
return {
"status": "healthy" if overall else "degraded",
"checks": results,
}
# 健康检查示例
health = HealthChecker()
health.register("redis", lambda: True)  # 检查 Redis 连接
health.register("database", lambda: True)  # 检查数据库
health.register("model_api", lambda: True)  # 检查模型 API
print(health.run_all())
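In practice run_all() is exposed over HTTP so a load balancer or Kubernetes probe can poll it. A sketch continuing the FastAPI app from the previous section; the 503-on-degraded mapping is a common convention assumed here, not something the original code specifies.

from fastapi import Response

@app.get("/health")
async def health_endpoint(response: Response):
    report = health.run_all()
    if report["status"] != "healthy":
        # A non-200 status tells the load balancer to stop routing here
        response.status_code = 503
    return report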

Load-Balancing Strategies

| Strategy | Best for | Pros | Cons |
| --- | --- | --- | --- |
| Round robin | Homogeneous servers | Simple | Ignores actual load |
| Weighted round robin | Heterogeneous servers | Flexible | Weights need manual tuning |
| Least connections | Long-lived connections | Load-aware | Reacts with some lag |
| Consistent hashing | Cache affinity | High cache hit rate | Rebalancing cost on node changes |
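Consistent hashing is the least self-explanatory row in the table; a minimal ring sketch shows why it preserves cache affinity. The server names and virtual-node count below are illustrative choices.

import bisect
import hashlib

class ConsistentHashRing:
    """Maps keys to servers so that adding or removing a server only
    remaps roughly 1/N of the keys, keeping most caches warm."""

    def __init__(self, servers: list[str], vnodes: int = 100):
        # Each server gets `vnodes` positions on the ring to even out load
        self.ring = sorted(
            (self._hash(f"{server}#{i}"), server)
            for server in servers
            for i in range(vnodes)
        )
        self.hashes = [h for h, _ in self.ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def route(self, key: str) -> str:
        # First ring position clockwise from the key's hash
        idx = bisect.bisect(self.hashes, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["node-a", "node-b", "node-c"])
print(ring.route("session-123"))  # the same session always hits the same node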

Summary

This chapter laid out the architecture of a production LLM system: an API gateway handling auth and rate limiting; an application layer protected by circuit breakers, exponential-backoff retries, and multi-model fallback routing; health checks for high availability; and a choice of load-balancing strategies.

Next chapter: Model Deployment and Platform Selection.