混合检索与重排序
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read · 403 words

混合检索与重排序

仅靠向量检索往往不够。混合检索结合了语义检索和关键词检索的优势,重排序则进一步提升结果的准确性。

为什么需要混合检索?

graph TB A[单一检索的局限] --> B[向量检索] A --> C[关键词检索] B --> B1[擅长语义匹配] B --> B2[不擅长精确匹配] B --> B3[数字/专有名词容易丢失] C --> C1[擅长精确匹配] C --> C2[不擅长语义理解] C --> C3[同义词容易遗漏] B1 --> D[混合检索] C1 --> D D --> E[取长补短
兼顾语义和精确匹配] style D fill:#c8e6c9,stroke:#388e3c,stroke-width:3px style E fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

具体对比

查询示例 向量检索结果 关键词检索结果 混合检索结果
"错误代码 E1001" 可能匹配"异常处理" 精确匹配 E1001 精确 + 相关上下文
"怎么安装这个软件" 匹配"安装指南" 需要关键词"安装" 全面覆盖
"性能很慢" 匹配"性能优化" 可能找不到 语义匹配成功

BM25 关键词检索

BM25(Best Matching 25)是最经典的关键词检索算法:

"""
BM25 关键词检索实现
"""
import math
import re
from collections import Counter
class BM25:
    """Okapi BM25 keyword-retrieval index over an in-memory document list."""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        """
        Args:
            k1: Term-frequency saturation parameter (typical range 1.2-2.0).
            b: Document-length normalization parameter (0-1).
        """
        self.k1 = k1
        self.b = b
        self.documents: list[str] = []
        self.doc_lengths: list[int] = []
        self.avg_doc_length = 0
        self.word_doc_freq: dict[str, int] = {}
        self.doc_word_counts: list[Counter] = []

    def index(self, documents: list[str]) -> None:
        """Build the BM25 index: per-document term counts and document frequencies."""
        self.documents = documents
        total = len(documents)
        for text in documents:
            tokens = self._tokenize(text)
            self.doc_lengths.append(len(tokens))
            counts = Counter(tokens)
            self.doc_word_counts.append(counts)
            # Each distinct token contributes once per document to the DF table.
            for token in counts:
                self.word_doc_freq[token] = self.word_doc_freq.get(token, 0) + 1
        self.avg_doc_length = sum(self.doc_lengths) / total if total > 0 else 0
        print(f"BM25 索引构建完成: {total} 篇文档, "
              f"{len(self.word_doc_freq)} 个词汇")

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Score every indexed document against *query* and return the best top_k.

        Returns dicts of the form {"index": int, "score": float, "doc": str},
        sorted by descending BM25 score.
        """
        query_tokens = self._tokenize(query)
        total = len(self.documents)
        ranked = []
        for idx, text in enumerate(self.documents):
            length = self.doc_lengths[idx]
            counts = self.doc_word_counts[idx]
            doc_score = 0
            for token in query_tokens:
                if token not in self.word_doc_freq:
                    continue  # unseen query term contributes nothing
                # Inverse document frequency (BM25+ style, always positive).
                df = self.word_doc_freq[token]
                idf = math.log((total - df + 0.5) / (df + 0.5) + 1)
                # Term frequency with document-length normalization.
                tf = counts.get(token, 0)
                denom = tf + self.k1 * (
                    1 - self.b + self.b * length / self.avg_doc_length
                )
                doc_score += idf * (tf * (self.k1 + 1)) / denom
            ranked.append({"index": idx, "score": doc_score, "doc": text})
        ranked.sort(key=lambda item: item["score"], reverse=True)
        return ranked[:top_k]

    def _tokenize(self, text: str) -> list[str]:
        """Naive mixed CJK/Latin tokenizer.

        Latin text is lowercased and split on \\w+ runs; runs containing CJK
        characters are split into single characters (production systems should
        use a proper segmenter such as jieba instead).
        """
        tokens: list[str] = []
        for chunk in re.findall(r"\w+", text.lower()):
            if re.search(r"[\u4e00-\u9fff]", chunk):
                tokens.extend(chunk)  # per-character split for Chinese
            else:
                tokens.append(chunk)
        return tokens
# Usage example: index four short documents and run an exact-match query.
bm25 = BM25()
docs = [
    "Python 是一门解释型编程语言,广泛用于数据科学和 Web 开发",
    "JavaScript 是前端开发的核心语言,也可用于后端 Node.js",
    "错误代码 E1001 表示数据库连接超时,请检查网络配置",
    "性能优化包括缓存策略、数据库索引和代码重构",
]
bm25.index(docs)
for hit in bm25.search("错误代码 E1001", top_k=2):
    print(f"  [{hit['score']:.2f}] {hit['doc'][:50]}...")

混合检索实现

"""
混合检索系统
结合向量检索和 BM25 关键词检索
"""
from dataclasses import dataclass
@dataclass
class SearchResult:
    """A single retrieval hit produced by the hybrid retrieval pipeline."""
    content: str  # chunk text (RRF fusion stores only the first 100 chars)
    score: float  # relevance score (fused RRF score for hybrid results)
    source: str  # origin identifier; left empty by the RRF fusion step
    method: str  # "vector" or "keyword" or "hybrid"
class HybridRetriever:
    """Hybrid retriever that fuses vector search with BM25 keyword search."""

    def __init__(
        self,
        vector_store,
        bm25_index: BM25,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.4
    ):
        """
        Args:
            vector_weight: Weight of the vector-search ranking (0-1).
            keyword_weight: Weight of the keyword-search ranking (0-1).
        """
        self.vector_store = vector_store
        self.bm25 = bm25_index
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight

    def search(self, query: str, top_k: int = 5) -> list[SearchResult]:
        """Run both retrievers and merge their rankings with weighted RRF."""
        # Vector leg — get_embedding is provided elsewhere in the project.
        embedding = get_embedding(query)
        # Over-fetch from each leg so the fusion has candidates to merge.
        vector_hits = self.vector_store.search(
            query_embedding=embedding,
            top_k=top_k * 2
        )
        # Keyword leg.
        keyword_hits = self.bm25.search(query, top_k=top_k * 2)
        # Reciprocal Rank Fusion, then truncate to the requested size.
        return self._rrf_fusion(vector_hits, keyword_hits)[:top_k]

    def _rrf_fusion(
        self,
        vector_results: list,
        keyword_results: list,
        k: int = 60
    ) -> list[SearchResult]:
        """
        Weighted Reciprocal Rank Fusion.

        RRF: score = sum(weight / (k + rank)), where k is a damping constant
        (conventionally 60) and rank is the 1-based position in each list.
        """
        fused: dict[str, float] = {}

        def accumulate(results: list, weight: float) -> None:
            # Results may be dicts with "content" (vector store) or "doc"
            # (BM25); keyed on the first 100 chars so the same chunk coming
            # from both legs merges into a single score.
            for rank, hit in enumerate(results, start=1):
                text = hit.get("content", hit.get("doc", ""))
                key = text[:100]
                fused[key] = fused.get(key, 0) + weight / (k + rank)

        accumulate(vector_results, self.vector_weight)
        accumulate(keyword_results, self.keyword_weight)

        ordered = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)
        return [
            SearchResult(content=key, score=val, source="", method="hybrid")
            for key, val in ordered
        ]

重排序(Reranking)

重排序是在初步检索后,使用更精确的模型对结果进行二次排序。

graph LR A[用户查询] --> B[初步检索
Top-20] B --> C[重排序模型] C --> D[精排结果
Top-5] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px style D fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

使用 Cohere Reranker

"""
使用 Cohere Reranker 进行重排序
pip install cohere
"""
import cohere
class CohereReranker:
    """Second-stage reranker backed by the Cohere Rerank API."""

    def __init__(self, api_key: str):
        self.client = cohere.Client(api_key)

    def rerank(
        self,
        query: str,
        documents: list[str],
        top_k: int = 5
    ) -> list[dict]:
        """
        Re-order *documents* by relevance to *query* using Cohere's rerank model.

        Args:
            query: User query.
            documents: Candidate documents from first-stage retrieval.
            top_k: Number of top results to return.

        Returns:
            Dicts with "content", "relevance_score" and "original_index",
            ordered by descending relevance.
        """
        response = self.client.rerank(
            model="rerank-v3.5",
            query=query,
            documents=documents,
            top_n=top_k,
            return_documents=True
        )
        return [
            {
                "content": hit.document.text,
                "relevance_score": hit.relevance_score,
                "original_index": hit.index,
            }
            for hit in response.results
        ]
# Usage
reranker = CohereReranker(api_key="your-api-key")
# Pretend first-stage retrieval returned 20 candidates.
initial_results = [
    "Python 支持多种设计模式的实现",
    "RAG 系统需要向量数据库支持",
    "单例模式确保类只有一个实例",
    # ... more results
]
reranked = reranker.rerank(
    query="什么是单例模式",
    documents=initial_results,
    top_k=3
)
for hit in reranked:
    print(f"  [{hit['relevance_score']:.4f}] {hit['content']}")

使用开源 Cross-Encoder

"""
使用 Cross-Encoder 本地重排序
适合私有化部署
"""
from sentence_transformers import CrossEncoder
class LocalReranker:
    """Local cross-encoder reranker, suitable for on-prem deployment."""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: list[str],
        top_k: int = 5
    ) -> list[dict]:
        """Score each (query, document) pair locally and return the best top_k.

        Returns dicts with "content", "score" and "original_index",
        ordered by descending cross-encoder score.
        """
        # Cross-encoders consume the query and document jointly.
        pairs = [(query, doc) for doc in documents]
        relevance = self.model.predict(pairs)
        ranked = sorted(
            (
                {"content": doc, "score": float(s), "original_index": i}
                for i, (doc, s) in enumerate(zip(documents, relevance))
            ),
            key=lambda item: item["score"],
            reverse=True,
        )
        return ranked[:top_k]
# Usage
reranker = LocalReranker()
results = reranker.rerank(
    query="如何优化数据库查询性能",
    documents=initial_results,  # defined in the Cohere example above
    top_k=3
)

查询改写

在检索前对用户查询进行改写或扩展,可以提升召回率:

"""
查询改写策略
"""
from openai import OpenAI
client = OpenAI()
def rewrite_query(original_query: str) -> list[str]:
    """
    Ask the LLM for three alternative phrasings of *original_query*.

    Returns the original query followed by the rewrites — a recall
    booster for multi-query retrieval.
    """
    prompt = f"""请将以下查询改写为3个不同的版本,保持原意但使用不同的表述方式。
每个版本一行,不要编号。
原始查询: {original_query}
改写版本:"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    )
    # One rewrite per line; drop blank lines and surrounding whitespace.
    raw = response.choices[0].message.content.strip()
    variants = [line.strip() for line in raw.split("\n") if line.strip()]
    return [original_query] + variants
def decompose_query(complex_query: str) -> list[str]:
    """Split a complex question into 2-3 independent sub-questions via the LLM."""
    prompt = f"""请将以下复杂问题分解为2-3个独立的子问题。
每个子问题一行,不要编号。
复杂问题: {complex_query}
子问题:"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3
    )
    # One sub-question per line; drop blanks.
    lines = response.choices[0].message.content.strip().split("\n")
    return [line.strip() for line in lines if line.strip()]
# Example
query = "如何在生产环境中优化 RAG 系统的检索速度和准确性?"
# Multi-query rewriting
rewrites = rewrite_query(query)
print("改写查询:")
for variant in rewrites:
    print(f"  - {variant}")
# Question decomposition
sub_queries = decompose_query(query)
print("\n子问题:")
for sub in sub_queries:
    print(f"  - {sub}")

完整检索管道

"""
完整的检索管道
组合:查询改写 + 混合检索 + 重排序
"""
class RetrievalPipeline:
    """End-to-end retrieval: query rewriting + hybrid search + reranking."""

    def __init__(self, vector_store, bm25, reranker=None):
        self.hybrid = HybridRetriever(vector_store, bm25)
        self.reranker = reranker

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        use_rewrite: bool = True,
        use_rerank: bool = True
    ) -> list[dict]:
        """
        Full retrieval flow:
            1. query rewriting (optional)
            2. hybrid search per query variant
            3. de-duplication
            4. reranking (optional)

        NOTE(review): when the reranker is skipped this returns SearchResult
        objects rather than dicts — callers should handle both shapes.
        """
        # Step 1: expand the query into variants (recall booster).
        queries = rewrite_query(query) if use_rewrite else [query]

        # Step 2: hybrid search for every variant, over-fetching 2x.
        candidates = []
        for variant in queries:
            candidates.extend(self.hybrid.search(variant, top_k=top_k * 2))

        # Step 3: de-duplicate on the first 100 chars of content (the same
        # key the hybrid retriever fuses on), keeping first occurrence.
        seen = set()
        deduped = []
        for candidate in candidates:
            fingerprint = candidate.content[:100]
            if fingerprint in seen:
                continue
            seen.add(fingerprint)
            deduped.append(candidate)

        # Step 4: optional second-stage rerank against the original query.
        if use_rerank and self.reranker:
            texts = [candidate.content for candidate in deduped]
            return self.reranker.rerank(query, texts, top_k=top_k)
        return deduped[:top_k]

本章小结

下一章:我们将学习如何构建高质量的 Prompt 来生成最终回答。