性能优化技术
LLM 推理延迟是生产环境最大的痛点。本章涵盖从应用层到模型层的全链路优化。
延迟分析
graph LR
    A[用户请求] --> B[网络传输<br/>50-200ms]
    B --> C[Prompt处理<br/>100-500ms]
    C --> D[模型推理<br/>500-5000ms]
    D --> E[输出生成<br/>按token线性增长]
    E --> F[后处理<br/>10-50ms]
    style D fill:#ffcdd2,stroke:#c62828,stroke-width:2px
流式响应
"""
流式响应 - 降低首字延迟 (Time to First Token)
"""
import asyncio
from typing import AsyncGenerator
class StreamingLLM:
"""流式 LLM 响应"""
async def stream_generate(
self, prompt: str
) -> AsyncGenerator[str, None]:
"""
流式生成文本
优势:
- TTFT (首字延迟) 降低 80%
- 用户感知延迟大幅改善
- 内存占用更低
"""
# 模拟逐 token 生成
tokens = ["这是", "一段", "流式", "生成", "的", "文本", "。"]
for token in tokens:
await asyncio.sleep(0.05) # 模拟生成延迟
yield token
async def stream_with_timeout(
self,
prompt: str,
timeout: float = 30.0,
) -> AsyncGenerator[str, None]:
"""带超时的流式生成"""
try:
async with asyncio.timeout(timeout):
async for token in self.stream_generate(prompt):
yield token
except asyncio.TimeoutError:
yield "\n[生成超时]"
# FastAPI 流式端点示例结构
STREAMING_ENDPOINT = """
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
async for token in llm.stream_generate(request.prompt):
# SSE 格式
yield f"data: {json.dumps({'token': token})}\\n\\n"
yield "data: [DONE]\\n\\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
)
"""
批处理优化
"""
请求批处理 - 提升吞吐量
"""
import asyncio
import time
from dataclasses import dataclass
@dataclass
class BatchRequest:
"""批处理请求"""
id: str
prompt: str
future: asyncio.Future = None
class DynamicBatcher:
    """Dynamic request batcher.

    Dispatches queued requests as one batch when either max_batch_size
    requests have accumulated or max_wait_ms has elapsed since the first
    request of the current batch.

    Fix: the original accepted max_wait_ms but never started a timer, so
    a partial batch was never processed and submit() awaited forever
    unless the batch filled up or flush() was called explicitly.
    """

    def __init__(
        self,
        max_batch_size: int = 8,
        max_wait_ms: float = 100,
    ):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: list[BatchRequest] = []
        self._lock = asyncio.Lock()
        self._timer: asyncio.Task | None = None  # pending max_wait flush task

    async def submit(self, request_id: str, prompt: str) -> str:
        """Queue one request and await its batched result."""
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        req = BatchRequest(id=request_id, prompt=prompt, future=future)
        async with self._lock:
            self.queue.append(req)
            if len(self.queue) >= self.max_batch_size:
                # Batch is full: cancel any pending timer and fire now.
                if self._timer is not None:
                    self._timer.cancel()
                    self._timer = None
                batch = self.queue[:]
                self.queue.clear()
                asyncio.create_task(self._process_batch(batch))
            elif self._timer is None:
                # First request of a new batch: arm the max_wait timer.
                self._timer = asyncio.create_task(self._flush_after_wait())
        return await future

    async def _flush_after_wait(self) -> None:
        """Flush whatever has queued once max_wait_ms elapses."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        async with self._lock:
            self._timer = None
            if not self.queue:
                return
            batch = self.queue[:]
            self.queue.clear()
        await self._process_batch(batch)

    async def _process_batch(self, batch: list[BatchRequest]) -> None:
        """Process one batch and resolve each request's future."""
        print(f" 处理批次: {len(batch)} 个请求")
        start = time.time()
        # Batched inference (in production, forward to an engine such as
        # vLLM that supports true batched decoding).
        results = []
        for req in batch:
            results.append(f"Batch response for {req.id}")
        elapsed = (time.time() - start) * 1000
        print(f" 批次完成: {elapsed:.0f}ms")
        # Hand results back to the awaiting submitters.
        for req, result in zip(batch, results):
            if not req.future.done():
                req.future.set_result(result)

    async def flush(self) -> None:
        """Flush any queued requests immediately."""
        async with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            if not self.queue:
                return
            batch = self.queue[:]
            self.queue.clear()
        await self._process_batch(batch)
# Usage example
batcher = DynamicBatcher(max_batch_size=4)
语义缓存
"""
语义缓存 - 减少重复请求
"""
import hashlib
import time
from typing import Optional
class SemanticCache:
    """Response cache keyed on a normalized exact-match hash of the query.

    NOTE(review): despite the name, no embedding-based similarity lookup
    is implemented here — similarity_threshold is stored for a future
    semantic-matching layer but is currently unused; only queries that
    normalize (strip + lowercase) to the same text hit the cache.
    """

    def __init__(
        self,
        max_size: int = 10000,
        ttl_seconds: float = 3600,
        similarity_threshold: float = 0.85,
    ):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.threshold = similarity_threshold  # reserved, unused (see class note)
        self.cache: dict[str, dict] = {}
        self.hits = 0
        self.misses = 0

    def _hash_key(self, text: str) -> str:
        """Cache key: MD5 of the normalized text (not security-sensitive)."""
        normalized = text.strip().lower()
        return hashlib.md5(normalized.encode()).hexdigest()

    def get(self, query: str) -> Optional[str]:
        """Return the cached response, or None on a miss or TTL expiry."""
        key = self._hash_key(query)
        entry = self.cache.get(key)
        if entry is not None:
            if time.time() - entry["timestamp"] < self.ttl:
                self.hits += 1
                return entry["response"]
            # Expired: drop it and fall through to a miss.
            del self.cache[key]
        self.misses += 1
        return None

    def set(self, query: str, response: str) -> None:
        """Store a response, evicting the oldest entry only when needed."""
        key = self._hash_key(query)
        # Fix: the original evicted even when merely updating an existing
        # key at capacity, dropping an unrelated entry for nothing.
        if key not in self.cache and len(self.cache) >= self.max_size:
            oldest_key = min(
                self.cache, key=lambda k: self.cache[k]["timestamp"]
            )
            del self.cache[oldest_key]
        self.cache[key] = {
            "query": query,
            "response": response,
            "timestamp": time.time(),
        }

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups served from cache (0.0 when never queried)."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    def stats(self) -> dict:
        """Snapshot of cache size and hit statistics."""
        return {
            "size": len(self.cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": f"{self.hit_rate:.1%}",
        }
# Usage example
cache = SemanticCache(ttl_seconds=1800)
# Simulated query stream: exact repeats hit; paraphrases would require real
# semantic matching and therefore miss.
queries = [
    "什么是机器学习?",
    "什么是机器学习?",  # exact repeat -> hit
    "机器学习是什么?",  # paraphrase -> needs semantic matching
    "Python 怎么学?",
    "Python 怎么学?",  # hit
]
for query in queries:
    cached = cache.get(query)
    if cached is None:
        answer = f"关于「{query}」的回答..."
        cache.set(query, answer)
        print(f" 缓存未命中: {query}")
    else:
        print(f" 缓存命中: {query} -> {cached[:30]}...")
print(f"\n缓存统计: {cache.stats()}")
Prompt 压缩
"""
Prompt 压缩 - 减少 token 消耗
"""
class PromptCompressor:
    """Rule-based prompt compressor that strips common Chinese filler phrases.

    Compression is a fixed sequence of plain substring replacements, so it
    is deterministic and cheap, but purely lexical.
    """

    def __init__(self):
        # (pattern, replacement) pairs applied in order: common redundant
        # phrases, then excess-whitespace collapse.
        self.compression_rules = [
            ("请你", ""),
            ("请注意", "注意"),
            ("你需要", ""),
            ("以下是", ""),
            ("首先,", "1."),
            ("其次,", "2."),
            ("最后,", "3."),
            ("\n\n\n", "\n\n"),
            ("  ", " "),
        ]

    def compress(self, prompt: str) -> tuple[str, dict]:
        """Compress a prompt; return (compressed_text, stats).

        Fix: an empty prompt no longer raises ZeroDivisionError when
        computing the reduction percentage.
        """
        original_len = len(prompt)
        compressed = prompt
        for pattern, replacement in self.compression_rules:
            compressed = compressed.replace(pattern, replacement)
        compressed = compressed.strip()
        new_len = len(compressed)
        stats = {
            "original_chars": original_len,
            "compressed_chars": new_len,
            # Guard the division: original crashed on empty input here.
            "reduction": (
                "0.0%"
                if original_len == 0
                else f"{(1 - new_len / original_len) * 100:.1f}%"
            ),
            "original_tokens_est": original_len // 2,  # ~2 Chinese chars per token
            "compressed_tokens_est": new_len // 2,
        }
        return compressed, stats

    def truncate_context(
        self,
        context: str,
        max_tokens: int = 2000,
        strategy: str = "tail",
    ) -> str:
        """Truncate context to fit a rough token budget.

        strategy: "tail" keeps the end, "head" the start, "middle" both
        ends; any other value keeps the start without an ellipsis marker.
        """
        estimated_chars = max_tokens * 2  # rough chars-per-token estimate
        if len(context) <= estimated_chars:
            return context
        if strategy == "tail":
            return "..." + context[-estimated_chars:]
        elif strategy == "head":
            return context[:estimated_chars] + "..."
        elif strategy == "middle":
            half = estimated_chars // 2
            return context[:half] + "\n...\n" + context[-half:]
        return context[:estimated_chars]
# Usage example
compressor = PromptCompressor()
# Deliberately verbose prompt: exercises several compression rules at once.
verbose_prompt = """
请你帮我分析以下是一段代码,你需要首先,理解代码的功能,其次,找出潜在的问题,最后,给出改进建议。
请注意代码的性能和可读性。
"""
compressed, stats = compressor.compress(verbose_prompt)
print(f"原始: {stats['original_chars']} 字符")
print(f"压缩后: {stats['compressed_chars']} 字符")
print(f"压缩率: {stats['reduction']}")
优化效果对比
| 优化技术 | 延迟降低 | 成本降低 | 实现难度 | 优先级 |
|---|---|---|---|---|
| 流式响应 | TTFT -80% | 0% | 低 | ⭐⭐⭐ |
| 语义缓存 | -40~70% | -40~70% | 中 | ⭐⭐⭐ |
| 批处理 | 吞吐+3x | -30% | 中 | ⭐⭐ |
| Prompt 压缩 | -10~20% | -10~30% | 低 | ⭐⭐ |
| 模型量化 | +20% | 显存-50% | 高 | ⭐ |
本章小结
- 流式响应是用户体验的第一优化项
- 语义缓存是成本和延迟的双重优化
- 批处理在高并发场景下提升吞吐量
- Prompt 压缩低成本但有效
- 优化要按优先级顺序进行,避免过早优化
下一章:成本优化策略。