Building the Vector Index

Once the documents have been chunked, the next step is to build a vector index. This chapter walks through how to construct an efficient vector indexing system.

Index Construction Pipeline

```mermaid
graph LR
    A[Document chunks] --> B[Embedding encoding]
    B --> C[Create index]
    C --> D[Store in database]
    D --> E[Attach metadata]
    E --> F[Validate index]
    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style F fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
```

A Complete Index Construction System

"""
向量索引构建系统
支持批量处理、进度显示、错误重试
"""
import time
import json
from pathlib import Path
from dataclasses import dataclass, field
from openai import OpenAI
@dataclass
class ChunkWithEmbedding:
"""带向量的文档块"""
chunk_id: str
content: str
embedding: list[float]
metadata: dict = field(default_factory=dict)
class IndexBuilder:
"""向量索引构建器"""
def __init__(
self,
embedding_model: str = "text-embedding-3-small",
batch_size: int = 100,
max_retries: int = 3
):
self.client = OpenAI()
self.model = embedding_model
self.batch_size = batch_size
self.max_retries = max_retries
def build_index(self, chunks: list[dict]) -> list[ChunkWithEmbedding]:
"""
为文档块批量创建向量索引
Args:
chunks: 文档块列表,每个包含 content 和 metadata
Returns:
带向量的文档块列表
"""
results = []
total = len(chunks)
print(f"开始构建索引: {total} 个文档块")
print(f"使用模型: {self.model}")
print(f"批次大小: {self.batch_size}")
# 分批处理
for i in range(0, total, self.batch_size):
batch = chunks[i:i + self.batch_size]
batch_num = i // self.batch_size + 1
total_batches = (total + self.batch_size - 1) // self.batch_size
print(f"  处理批次 {batch_num}/{total_batches}...")
# 获取 Embedding
texts = [c["content"] for c in batch]
embeddings = self._get_embeddings_with_retry(texts)
# 组装结果
for j, (chunk, emb) in enumerate(zip(batch, embeddings)):
result = ChunkWithEmbedding(
chunk_id=f"chunk_{i + j}",
content=chunk["content"],
embedding=emb,
metadata=chunk.get("metadata", {})
)
results.append(result)
print(f"索引构建完成: {len(results)} 个向量")
return results
def _get_embeddings_with_retry(
self, texts: list[str]
) -> list[list[float]]:
"""带重试的 Embedding 获取"""
for attempt in range(self.max_retries):
try:
response = self.client.embeddings.create(
model=self.model,
input=texts
)
return [item.embedding for item in response.data]
except Exception as e:
if attempt < self.max_retries - 1:
wait = 2 ** attempt  # 指数退避
print(f"    重试 ({attempt + 1}/{self.max_retries}),等待 {wait}s...")
time.sleep(wait)
else:
raise RuntimeError(f"Embedding 失败: {e}")
# 使用示例
builder = IndexBuilder(batch_size=50)
chunks = [
{"content": "RAG 是检索增强生成的缩写", "metadata": {"source": "ch1.md"}},
{"content": "向量数据库用于存储高维向量", "metadata": {"source": "ch2.md"}},
# ... 更多文档块
]
indexed = builder.build_index(chunks)
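
Embedding calls are billed per token, so it is worth checkpointing the freshly built index to disk before loading it into a database; if the load fails, you avoid paying to re-embed everything. A minimal sketch reusing the `ChunkWithEmbedding` dataclass above — the helper names and file path are illustrative:

```python
# Checkpoint the built index to JSON so a failed database load
# does not force a re-embedding run.
import json
from dataclasses import asdict
from pathlib import Path


def save_checkpoint(indexed: list[ChunkWithEmbedding], path: str) -> None:
    """Serialize indexed chunks to a JSON file."""
    data = [asdict(c) for c in indexed]
    Path(path).write_text(json.dumps(data, ensure_ascii=False))


def load_checkpoint(path: str) -> list[ChunkWithEmbedding]:
    """Restore indexed chunks from a JSON checkpoint."""
    data = json.loads(Path(path).read_text())
    return [ChunkWithEmbedding(**item) for item in data]


save_checkpoint(indexed, "index_checkpoint.json")
```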

Storing the Index in a Vector Database

Using ChromaDB

"""
将索引存入 ChromaDB
适合本地开发和中小规模项目
"""
import chromadb
from chromadb.config import Settings
class ChromaStore:
"""ChromaDB 向量存储"""
def __init__(self, persist_dir: str = "./chroma_db"):
self.client = chromadb.PersistentClient(path=persist_dir)
def create_collection(self, name: str) -> None:
"""创建或获取集合"""
self.collection = self.client.get_or_create_collection(
name=name,
metadata={"hnsw:space": "cosine"}
)
print(f"集合 '{name}' 准备就绪")
def add_documents(self, indexed_chunks: list) -> None:
"""批量添加文档"""
self.collection.add(
ids=[c.chunk_id for c in indexed_chunks],
embeddings=[c.embedding for c in indexed_chunks],
documents=[c.content for c in indexed_chunks],
metadatas=[c.metadata for c in indexed_chunks]
)
print(f"已添加 {len(indexed_chunks)} 个文档到集合")
def search(self, query_embedding: list[float], top_k: int = 5) -> dict:
"""向量检索"""
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"]
)
return results
def count(self) -> int:
"""获取文档数量"""
return self.collection.count()
# 使用
store = ChromaStore("./my_knowledge_base")
store.create_collection("documents")
store.add_documents(indexed)
print(f"数据库中共有 {store.count()} 个文档块")

Using Qdrant

"""
将索引存入 Qdrant
适合生产环境,性能更好
"""
from qdrant_client import QdrantClient
from qdrant_client.models import (
VectorParams, Distance, PointStruct,
Filter, FieldCondition, MatchValue
)
class QdrantStore:
"""Qdrant 向量存储"""
def __init__(self, url: str = "localhost", port: int = 6333):
self.client = QdrantClient(url=url, port=port)
def create_collection(
self, name: str, vector_size: int = 1536
) -> None:
"""创建集合"""
self.collection_name = name
self.client.recreate_collection(
collection_name=name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE
)
)
print(f"Qdrant 集合 '{name}' 已创建")
def add_documents(self, indexed_chunks: list) -> None:
"""批量添加文档"""
points = [
PointStruct(
id=i,
vector=chunk.embedding,
payload={
"content": chunk.content,
"chunk_id": chunk.chunk_id,
**chunk.metadata
}
)
for i, chunk in enumerate(indexed_chunks)
]
# 分批上传
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch
)
print(f"已上传 {len(points)} 个向量到 Qdrant")
def search(
self,
query_vector: list[float],
top_k: int = 5,
filter_conditions: dict = None
) -> list[dict]:
"""向量检索,支持过滤"""
search_filter = None
if filter_conditions:
conditions = [
FieldCondition(
key=key,
match=MatchValue(value=value)
)
for key, value in filter_conditions.items()
]
search_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
limit=top_k,
query_filter=search_filter,
)
return [
{
"content": hit.payload["content"],
"score": hit.score,
"metadata": {
k: v for k, v in hit.payload.items()
if k != "content"
}
}
for hit in results
]
# 使用
store = QdrantStore()
store.create_collection("knowledge_base", vector_size=1536)
store.add_documents(indexed)
# 检索
results = store.search(
query_vector=query_embedding,
top_k=3,
filter_conditions={"source": "ch1.md"}
)

Index Optimization Strategies

Metadata Design

Well-designed metadata can substantially improve retrieval precision:

"""
元数据设计最佳实践
"""
# 推荐的元数据字段
metadata_schema = {
# 来源信息
"source": "docs/api-reference.md",
"title": "API 参考文档",
"url": "https://docs.example.com/api",
# 结构信息
"section": "认证",
"subsection": "OAuth 2.0",
"page": 15,
# 分类信息
"category": "技术文档",
"language": "zh",
"version": "v2.0",
# 时间信息
"created_at": "2026-01-15",
"updated_at": "2026-02-20",
# 自定义标签
"tags": ["api", "authentication", "oauth"],
}
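
One caveat: many stores restrict metadata values to scalars (ChromaDB, for example, accepts only strings, numbers, and booleans), so a list field like `tags` needs flattening before insertion. A minimal sketch:

```python
# Some stores (e.g. ChromaDB) only accept scalar metadata values,
# so list fields such as "tags" must be flattened before insertion.
def flatten_metadata(metadata: dict) -> dict:
    """Join list values into comma-separated strings."""
    return {
        k: ",".join(v) if isinstance(v, list) else v
        for k, v in metadata.items()
    }


chroma_safe = flatten_metadata(metadata_schema)
# {"tags": "api,authentication,oauth", ...}
```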

Incremental Updates

"""
增量更新索引
只处理新增和修改的文档,避免全量重建
"""
import hashlib
class IncrementalIndexer:
"""增量索引更新器"""
def __init__(self, store, builder):
self.store = store
self.builder = builder
self.hash_cache = {}  # 文件哈希缓存
def update(self, documents: list[dict]) -> dict:
"""增量更新索引"""
stats = {"added": 0, "updated": 0, "skipped": 0}
for doc in documents:
doc_hash = self._compute_hash(doc["content"])
source = doc.get("metadata", {}).get("source", "")
if source in self.hash_cache:
if self.hash_cache[source] == doc_hash:
stats["skipped"] += 1
continue
else:
# 内容已更新,删除旧索引
self._remove_by_source(source)
stats["updated"] += 1
else:
stats["added"] += 1
# 创建新索引
indexed = self.builder.build_index([doc])
self.store.add_documents(indexed)
self.hash_cache[source] = doc_hash
print(f"索引更新完成: "
f"新增 {stats['added']}, "
f"更新 {stats['updated']}, "
f"跳过 {stats['skipped']}")
return stats
def _compute_hash(self, content: str) -> str:
return hashlib.md5(content.encode()).hexdigest()
def _remove_by_source(self, source: str) -> None:
"""删除指定来源的所有文档"""
# 根据不同的向量数据库实现删除逻辑
pass
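```

The body of `_remove_by_source` depends on the backend. For the two stores above, the deletion calls look roughly like the following sketch, which assumes the `ChromaStore` and `QdrantStore` wrappers defined earlier:

```python
# Backend-specific deletion sketches for _remove_by_source.
from qdrant_client.models import (
    FieldCondition, Filter, FilterSelector, MatchValue
)


# ChromaDB: delete by metadata filter on the collection
def _remove_by_source_chroma(store: ChromaStore, source: str) -> None:
    store.collection.delete(where={"source": source})


# Qdrant: delete all points whose payload matches the source
def _remove_by_source_qdrant(store: QdrantStore, source: str) -> None:
    store.client.delete(
        collection_name=store.collection_name,
        points_selector=FilterSelector(
            filter=Filter(must=[
                FieldCondition(key="source", match=MatchValue(value=source))
            ])
        ),
    )
```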

Index Quality Validation

"""
索引质量检查工具
"""
class IndexValidator:
"""索引质量验证器"""
def validate(self, store, test_queries: list[dict]) -> dict:
"""
验证索引质量
Args:
store: 向量存储
test_queries: 测试查询列表,
每个包含 query 和 expected_source
"""
results = {
"total": len(test_queries),
"hit": 0,
"miss": 0,
"details": []
}
for tq in test_queries:
query = tq["query"]
expected = tq["expected_source"]
# 执行检索
search_results = store.search(
query_embedding=get_embedding(query),
top_k=3
)
# 检查期望的文档是否在结果中
found_sources = [
r.get("metadata", {}).get("source", "")
for r in search_results
]
hit = expected in found_sources
if hit:
results["hit"] += 1
else:
results["miss"] += 1
results["details"].append({
"query": query,
"expected": expected,
"hit": hit,
"top_result": found_sources[0] if found_sources else None
})
accuracy = results["hit"] / results["total"] * 100
print(f"索引质量: {accuracy:.1f}% "
f"({results['hit']}/{results['total']})")
return results
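
A usage sketch with hypothetical test queries, assuming `get_embedding` wraps the same embedding model used at index time and `store` is the `QdrantStore` from earlier:

```python
# Usage sketch: the test queries are hypothetical, and get_embedding
# is assumed to wrap the same embedding model used for indexing.
from openai import OpenAI


def get_embedding(text: str) -> list[float]:
    return OpenAI().embeddings.create(
        model="text-embedding-3-small",
        input=text
    ).data[0].embedding


validator = IndexValidator()
report = validator.validate(store, test_queries=[
    {"query": "What does RAG stand for?", "expected_source": "ch1.md"},
    {"query": "What are vector databases for?", "expected_source": "ch2.md"},
])
```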

Chapter Summary

This chapter covered the full index construction pipeline: batched embedding with retries, storage in ChromaDB and Qdrant, metadata design, incremental updates, and index quality validation.

Next chapter: retrieval optimization techniques, including hybrid retrieval and reranking.