多路召回与融合策略
单一检索方式往往无法覆盖所有相关文档。多路召回(Multi-Route Retrieval)通过并行使用多种检索策略,再将结果融合排序,显著提升检索质量。
多路召回架构
graph TB
A[用户查询] --> B[查询路由]
B --> C[向量检索]
B --> D[关键词检索]
B --> E[知识图谱检索]
B --> F[SQL 结构化检索]
C --> G[结果融合]
D --> G
E --> G
F --> G
G --> H[重排序]
H --> I[Top-K 文档]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style G fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style I fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
检索路由器
"""
智能检索路由器
"""
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
class RetrievalMode(Enum):
    """Tag identifying which retrieval route produced a result."""

    # Set on results built by VectorRetriever.
    VECTOR = "vector"
    # Set on results built by KeywordRetriever.
    KEYWORD = "keyword"
    # The remaining members are declared for other routes; no retriever in
    # this section emits them.
    KNOWLEDGE_GRAPH = "knowledge_graph"
    SQL = "sql"
    HYBRID = "hybrid"
@dataclass
class RetrievalResult:
    """One document hit returned by a retriever or by a fusion step.

    Attributes:
        doc_id: Identifier of the document; fusion uses it to merge hits.
        content: Text content of the document.
        score: Score assigned by whichever component produced this result.
        source: Retrieval mode that produced the hit.
        metadata: Free-form extra data; each instance gets its own empty
            dict when none is supplied.
    """

    doc_id: str
    content: str
    score: float
    source: RetrievalMode
    metadata: dict = field(default_factory=dict)
class BaseRetriever(ABC):
    """Abstract interface that every retrieval route implements."""

    @abstractmethod
    def search(self, query: str, top_k: int = 5) -> list[RetrievalResult]:
        """Return the results found for ``query``.

        Args:
            query: The user query text.
            top_k: Number of results requested from this route.

        Returns:
            A list of ``RetrievalResult`` objects (subclasses provide it;
            this base implementation returns nothing).
        """
class VectorRetriever(BaseRetriever):
    """Retriever that embeds the query and looks it up in a vector store."""

    def __init__(self, vector_store, embed_client):
        """Keep the collaborators used by ``search``.

        Args:
            vector_store: Object providing
                ``similarity_search(vector, top_k=...)``.
            embed_client: Object providing ``embed(text)``.
        """
        self.store = vector_store
        self.embedder = embed_client

    def search(self, query: str, top_k: int = 5) -> list[RetrievalResult]:
        """Embed ``query`` and wrap the store's hits as results.

        Args:
            query: The user query text.
            top_k: Forwarded to the store's ``similarity_search``.

        Returns:
            One ``RetrievalResult`` per hit, tagged ``RetrievalMode.VECTOR``.
            Each hit is read through its ``"id"``, ``"content"`` and
            ``"score"`` keys.
        """
        embedded_query = self.embedder.embed(query)
        hits = self.store.similarity_search(embedded_query, top_k=top_k)

        wrapped = []
        for hit in hits:
            wrapped.append(
                RetrievalResult(
                    doc_id=hit["id"],
                    content=hit["content"],
                    score=hit["score"],
                    source=RetrievalMode.VECTOR,
                )
            )
        return wrapped
class KeywordRetriever(BaseRetriever):
    """Keyword retriever (BM25) backed by a prebuilt index object."""

    def __init__(self, bm25_index):
        """Keep the index used by ``search``.

        Args:
            bm25_index: Object providing ``search(query, top_k=...)``.
        """
        self.index = bm25_index

    def search(self, query: str, top_k: int = 5) -> list[RetrievalResult]:
        """Query the index and wrap its hits as results.

        Args:
            query: The user query text.
            top_k: Forwarded to the index's ``search``.

        Returns:
            One ``RetrievalResult`` per hit, tagged ``RetrievalMode.KEYWORD``.
            Each hit is read through its ``"id"``, ``"content"`` and
            ``"score"`` keys.
        """

        def _wrap(hit) -> RetrievalResult:
            # Translate one raw index hit into the shared result type.
            return RetrievalResult(
                doc_id=hit["id"],
                content=hit["content"],
                score=hit["score"],
                source=RetrievalMode.KEYWORD,
            )

        return list(map(_wrap, self.index.search(query, top_k=top_k)))
Reciprocal Rank Fusion(RRF)
RRF 是最常用的多路结果融合算法之一,简单高效,不需要分数归一化:每篇文档的融合得分等于它在各路结果中 1/(k + rank) 的累加,其中 rank 是该文档在对应结果列表中的名次(从 1 开始),k 是平滑常数(通常取 60)。
graph LR
A[向量检索排名] --> C[RRF 融合]
B[关键词检索排名] --> C
C --> D["score = Σ 1/(k + rank)"]
D --> E[统一排名]
style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style E fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
"""
Reciprocal Rank Fusion 实现
"""
from collections import defaultdict
class RRFFusion:
    """Reciprocal Rank Fusion (RRF) over several ranked result lists.

    A document's fused score is the sum, over every list it appears in, of
    ``1 / (k + rank)``, where ``rank`` is its 1-based position in that list.
    Only positions are used; the retrievers' raw scores are ignored.
    """

    def __init__(self, k: int = 60):
        """
        Args:
            k: RRF constant added to every rank; usually 60.
        """
        self.k = k

    def fuse(self, result_lists: list[list[RetrievalResult]]) -> list[RetrievalResult]:
        """Fuse the result lists of several retrievers into one ranking.

        Args:
            result_lists: One ranked list per retriever, best hit first.

        Returns:
            New result objects sorted by descending RRF score, one per
            distinct ``doc_id``. Each is a copy of the first occurrence of
            that document (so it keeps that occurrence's ``source``) with
            ``score`` replaced by the RRF score. The input objects are left
            untouched, so the retrievers' original scores stay available.
        """
        # Local import keeps this block self-contained.
        from dataclasses import replace

        rrf_scores: dict[str, float] = defaultdict(float)
        first_seen: dict[str, RetrievalResult] = {}

        for results in result_lists:
            for rank, result in enumerate(results, start=1):
                rrf_scores[result.doc_id] += 1.0 / (self.k + rank)
                first_seen.setdefault(result.doc_id, result)

        # sorted() is stable, so ties keep first-seen order.
        ranked_ids = sorted(rrf_scores, key=rrf_scores.get, reverse=True)

        # replace() makes a shallow copy: the caller's objects are not
        # mutated, but a ``metadata`` dict is still shared with the original.
        return [
            replace(first_seen[doc_id], score=rrf_scores[doc_id])
            for doc_id in ranked_ids
        ]
加权融合策略
当不同检索器质量差异较大时,可以使用加权融合。
"""
加权分数融合
"""
from dataclasses import dataclass
@dataclass
class RetrieverWeight:
    """Weight settings applied to the results of one retrieval mode.

    Attributes:
        mode: Retrieval mode these settings apply to.
        weight: Multiplier applied to that mode's normalized scores
            during weighted fusion.
        min_score: Threshold on the normalized score; results scoring
            below it are not counted for that mode. Defaults to 0.0.
    """

    mode: RetrievalMode
    weight: float
    min_score: float = 0.0
class WeightedFusion:
    """Weighted score fusion: min-max normalize each list, then sum by weight."""

    def __init__(self, weights: list[RetrieverWeight]):
        """
        Args:
            weights: Per-mode weight settings. If a mode appears more than
                once, the last entry wins.
        """
        self.weights = {w.mode: w for w in weights}

    def normalize_score(self, scores: list[float]) -> list[float]:
        """Min-max normalize ``scores`` into the range [0, 1].

        Returns:
            An empty list for empty input, all 1.0 when every score is equal
            (the range would otherwise be zero), else
            ``(s - min) / (max - min)`` for each score.
        """
        if not scores:
            return []
        lowest, highest = min(scores), max(scores)
        if highest == lowest:
            return [1.0] * len(scores)
        span = highest - lowest
        return [(s - lowest) / span for s in scores]

    def fuse(self, result_lists: list[list[RetrievalResult]]) -> list[RetrievalResult]:
        """Fuse several result lists by weighted, normalized score.

        Args:
            result_lists: One result list per retriever. Empty lists are
                skipped. The weight settings for a list are looked up from
                the ``source`` of its first element, so each list is
                expected to come from a single retriever.

        Returns:
            New result objects sorted by descending fused score, one per
            ``doc_id`` that passed at least one list's ``min_score``
            threshold. Each is a copy of the first accepted occurrence with
            ``score`` replaced by the fused score. The input objects are
            left untouched, so the retrievers' original scores stay
            available.
        """
        # Local import keeps this block self-contained.
        from dataclasses import replace

        doc_scores: dict[str, float] = defaultdict(float)
        doc_map: dict[str, RetrievalResult] = {}

        for results in result_lists:
            if not results:
                continue

            # Modes without settings fall back to weight 1.0 and no threshold.
            weight_config = self.weights.get(results[0].source)
            weight = weight_config.weight if weight_config else 1.0
            min_score = weight_config.min_score if weight_config else 0.0

            # Normalize within this list so routes with different score
            # scales become comparable.
            norm_scores = self.normalize_score([r.score for r in results])

            for result, norm_score in zip(results, norm_scores):
                if norm_score >= min_score:
                    doc_scores[result.doc_id] += norm_score * weight
                    doc_map.setdefault(result.doc_id, result)

        ranked_ids = sorted(doc_scores, key=doc_scores.get, reverse=True)

        # replace() makes a shallow copy: the caller's objects are not
        # mutated, but a ``metadata`` dict is still shared with the original.
        return [
            replace(doc_map[doc_id], score=doc_scores[doc_id])
            for doc_id in ranked_ids
        ]
融合策略对比
| 策略 | 原理 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|---|
| RRF | 基于排名倒数求和 | 无需分数归一化,鲁棒 | 忽略分数绝对值 | 通用场景 |
| 加权融合 | 归一化后加权求和 | 可调整检索器贡献 | 需要调参 | 检索器质量差异大 |
| 交叉编码器 | Cross-Encoder 重排 | 精确度最高 | 计算开销大 | 高精度要求 |
| LLM 重排 | 让 LLM 判断相关性 | 语义理解最强 | 成本高、延迟大 | 低频高价值场景 |
完整多路召回管道
"""
完整多路召回管道
"""
from dataclasses import dataclass
@dataclass
class MultiRouteConfig:
    """Settings for the multi-route retrieval pipeline.

    Attributes:
        vector_top_k: Per-route result count for vector retrieval
            (default 10).
        keyword_top_k: Per-route result count intended for keyword
            retrieval (default 10).
        final_top_k: Number of fused results to keep (default 5).
        fusion_method: Name of the fusion strategy (default ``"rrf"``).
        rrf_k: Constant ``k`` handed to RRF fusion (default 60).
    """

    vector_top_k: int = 10
    keyword_top_k: int = 10
    final_top_k: int = 5
    fusion_method: str = "rrf"
    rrf_k: int = 60
class MultiRouteRetriever:
    """Runs every configured retriever for a query and fuses the results.

    Fusion always uses ``RRFFusion`` built from ``config.rrf_k``;
    ``config.fusion_method`` is not consulted by this class.
    """

    def __init__(
        self,
        retrievers: list[BaseRetriever],
        config: MultiRouteConfig | None = None,
    ):
        """
        Args:
            retrievers: Retrieval routes to query, in call order.
            config: Pipeline settings; a default ``MultiRouteConfig`` is
                used when omitted.
        """
        self.retrievers = retrievers
        self.config = config or MultiRouteConfig()
        self.fusion = RRFFusion(k=self.config.rrf_k)

    def _top_k_for(self, retriever: BaseRetriever) -> int:
        """Return the per-route result count to request from ``retriever``."""
        # Keyword routes have their own budget in the config; every other
        # route keeps using ``vector_top_k`` as before.
        if isinstance(retriever, KeywordRetriever):
            return self.config.keyword_top_k
        return self.config.vector_top_k

    def search(self, query: str) -> list[RetrievalResult]:
        """Query every route, fuse the rankings, and keep the top results.

        The retrievers are called one after another, in list order.

        Args:
            query: The user query text.

        Returns:
            At most ``config.final_top_k`` fused results, best first.
        """
        all_results = []
        for retriever in self.retrievers:
            results = retriever.search(query, top_k=self._top_k_for(retriever))
            all_results.append(results)
            print(f" {type(retriever).__name__}: {len(results)} 条结果")

        # Fuse the per-route rankings, then truncate to the final budget.
        fused = self.fusion.fuse(all_results)
        final = fused[: self.config.final_top_k]
        print(f" 融合后保留 {len(final)} 条")
        return final
本章小结
| 主题 | 要点 |
|---|---|
| 多路召回 | 向量 + 关键词 + 知识图谱 + SQL 结构化检索并行执行 |
| RRF | 基于排名倒数融合,无需归一化 |
| 加权融合 | Min-Max 归一化后按权重合并 |
| 路由策略 | 根据查询类型选择检索路径 |
| 实战建议 | 先 RRF 验证效果,再尝试加权调优 |
下一章:Graph RAG