完整项目实战:智能客服 RAG 系统
本章通过构建一个完整的智能客服系统,串联前面所有章节的知识。从零到生产级,一步步实现。
项目目标
graph LR
A[用户提问] --> B[智能客服 RAG]
B --> C[精准回答 + 引用来源]
B --> D[文档管理]
B --> E[多轮对话]
B --> F[自动升级人工]
style B fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
功能需求:
- 从产品文档中检索答案
- 多轮对话支持
- 无法回答时自动转人工
- 回答附带引用来源
项目结构
customer-service-rag/
├── app.py # FastAPI 主应用
├── rag/
│ ├── __init__.py
│ ├── loader.py # 文档加载
│ ├── chunker.py # 文本切分
│ ├── indexer.py # 向量索引
│ ├── retriever.py # 检索器
│ ├── generator.py # 生成器
│ └── pipeline.py # RAG Pipeline
├── config.py # 配置
├── docs/ # 产品文档
└── requirements.txt
第一步:文档加载与切分
"""
rag/loader.py - 文档加载
"""
from pathlib import Path
class DocumentLoader:
    """Loads product documentation from a directory on disk."""

    def load_directory(self, docs_dir: str) -> list[dict]:
        """Recursively load every Markdown file under *docs_dir*.

        Returns one dict per file with keys ``id`` (path relative to the
        root), ``title`` (first ``# `` heading, falling back to the file
        stem), ``content`` (raw text) and ``source`` (absolute-ish path).
        """
        root = Path(docs_dir)
        documents = []
        # Sorted for a deterministic ordering across filesystems.
        for md_path in sorted(root.rglob("*.md")):
            content = md_path.read_text(encoding="utf-8")
            # The first level-1 heading wins; otherwise use the file name.
            heading = next(
                (ln[2:].strip() for ln in content.split("\n") if ln.startswith("# ")),
                md_path.stem,
            )
            documents.append({
                "id": str(md_path.relative_to(root)),
                "title": heading,
                "content": content,
                "source": str(md_path),
            })
        print(f"加载了 {len(documents)} 个文档")
        return documents
"""
rag/chunker.py - 文本切分
"""
class SmartChunker:
    """Splits documents on semantic (Markdown heading) boundaries.

    Sections longer than ``chunk_size`` characters are further split on
    paragraph boundaries, carrying up to ``overlap`` trailing characters
    of the previous chunk into the next one for context continuity.
    """

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        self.chunk_size = chunk_size
        # BUGFIX: `overlap` was previously stored but never used — the old
        # code carried the *entire* last paragraph into the next chunk,
        # which could nearly duplicate a chunk and exceed chunk_size.
        self.overlap = overlap

    def chunk_documents(self, documents: list[dict]) -> list[dict]:
        """Split all documents; each chunk keeps its source metadata."""
        all_chunks = []
        for doc in documents:
            chunks = self._split_by_headers(doc["content"])
            for i, chunk_text in enumerate(chunks):
                all_chunks.append({
                    "id": f"{doc['id']}_chunk_{i}",
                    "text": chunk_text,
                    "source": doc["source"],
                    "title": doc["title"],
                    "chunk_index": i,
                })
        print(f"切分为 {len(all_chunks)} 个文本块")
        return all_chunks

    def _split_by_headers(self, text: str) -> list[str]:
        """Split on ``## `` Markdown headings, each heading starting a section."""
        sections = []
        current = []
        for line in text.split("\n"):
            if line.startswith("## ") and current:
                section_text = "\n".join(current).strip()
                if section_text:
                    sections.extend(self._split_long_section(section_text))
                current = [line]
            else:
                current.append(line)
        # Flush the trailing section.
        if current:
            section_text = "\n".join(current).strip()
            if section_text:
                sections.extend(self._split_long_section(section_text))
        return sections

    def _split_long_section(self, text: str) -> list[str]:
        """Split an over-long section on paragraph (blank-line) boundaries."""
        if len(text) <= self.chunk_size:
            return [text]
        chunks = []
        current_chunk = []
        current_len = 0
        for para in text.split("\n\n"):
            if current_len + len(para) > self.chunk_size and current_chunk:
                chunks.append("\n\n".join(current_chunk))
                # Carry at most `overlap` trailing characters into the next
                # chunk. Guard overlap == 0: Python's s[-0:] is the whole
                # string, not the empty string.
                overlap_text = (
                    current_chunk[-1][-self.overlap:] if self.overlap > 0 else ""
                )
                current_chunk = [overlap_text, para] if overlap_text else [para]
                current_len = sum(len(c) for c in current_chunk)
            else:
                current_chunk.append(para)
                current_len += len(para)
        if current_chunk:
            chunks.append("\n\n".join(current_chunk))
        return chunks
第二步:向量索引
"""
rag/indexer.py - 向量索引
"""
import chromadb
from openai import OpenAI
class VectorIndexer:
    """Owns the persistent Chroma collection and its OpenAI embeddings."""

    def __init__(self, collection_name: str = "customer_service"):
        self.openai = OpenAI()
        self.chroma = chromadb.PersistentClient(path="./chroma_db")
        # Cosine distance to match the similarity conversion done downstream.
        self.collection = self.chroma.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def index_chunks(self, chunks: list[dict]) -> None:
        """Embed and store all chunks, 100 per embedding/add round-trip."""
        batch_size = 100
        total = len(chunks)
        for offset in range(0, total, batch_size):
            batch = chunks[offset:offset + batch_size]
            texts = [item["text"] for item in batch]
            self.collection.add(
                ids=[item["id"] for item in batch],
                embeddings=self._get_embeddings(texts),
                documents=texts,
                metadatas=[
                    {
                        "source": item["source"],
                        "title": item["title"],
                        "chunk_index": item["chunk_index"],
                    }
                    for item in batch
                ],
            )
            print(f" 已索引 {min(offset + batch_size, total)}/{total}")
        print(f"索引完成,共 {self.collection.count()} 条记录")

    def _get_embeddings(self, texts: list[str]) -> list[list[float]]:
        """Embed a batch of texts in a single API call."""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts,
        )
        return [item.embedding for item in response.data]
第三步:检索器
"""
rag/retriever.py - 混合检索
"""
class HybridRetriever:
    """Retriever over the vector index.

    NOTE(review): despite the name, only dense (vector) retrieval is
    implemented; keyword/BM25 search is not wired in yet.
    """

    def __init__(self, indexer):
        self.indexer = indexer
        # Kept for backward compatibility with existing callers. Query
        # embedding now goes through the indexer (see _embed) so index-time
        # and query-time vectors are guaranteed to use the same model.
        self.openai = OpenAI()

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """Return the *top_k* chunks most similar to *query*.

        Each hit has ``id``, ``text``, a similarity ``score`` and the
        stored ``metadata`` dict.
        """
        query_embedding = self._embed(query)
        results = self.indexer.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
        )
        documents = []
        for i in range(len(results["ids"][0])):
            documents.append({
                "id": results["ids"][0][i],
                "text": results["documents"][0][i],
                # Chroma returns cosine *distance*; convert to similarity.
                "score": 1 - results["distances"][0][i],
                "metadata": results["metadatas"][0][i],
            })
        return documents

    def _embed(self, text: str) -> list[float]:
        """Embed the query via the indexer, reusing its model and client.

        FIX: previously this duplicated the embedding call with its own
        OpenAI client and a hard-coded model name, which could silently
        drift from the model used to build the index.
        """
        return self.indexer._get_embeddings([text])[0]
第四步:生成器
"""
rag/generator.py - 回答生成
"""
from openai import OpenAI
# System prompt for the support assistant: answer strictly from the supplied
# reference material, refuse (and suggest human support) when the references
# don't cover the question, keep answers concise and professional, and cite
# sources at the end of every answer. The refusal sentence in rule 2 matches
# the hard-coded escalation answer used elsewhere — keep them in sync.
SYSTEM_PROMPT = """你是一个专业的客服助手。请根据提供的参考资料回答用户问题。
规则:
1. 只根据参考资料回答,不要编造信息
2. 如果参考资料中没有相关信息,回答"抱歉,我无法在文档中找到相关信息,建议您联系人工客服"
3. 回答要简洁专业
4. 在回答末尾标注引用来源"""
class AnswerGenerator:
    """Generates grounded answers from retrieved chunks via the chat API."""

    def __init__(self, model: str = "gpt-4o-mini"):
        self.client = OpenAI()
        self.model = model
        # Retrieval scores below this trigger escalation to a human agent.
        self.confidence_threshold = 0.3

    def generate(
        self,
        question: str,
        documents: list[dict],
        chat_history: list[dict] = None,
    ) -> dict:
        """Answer *question* from *documents*, escalating when retrieval is weak.

        Returns a dict with ``answer``, ``sources`` (top-3 citations),
        ``confidence`` (best retrieval score) and ``escalate``.
        """
        # Weak or empty retrieval → canned refusal + human handoff.
        low_quality = (
            not documents
            or documents[0]["score"] < self.confidence_threshold
        )
        if low_quality:
            return {
                "answer": "抱歉,我无法在文档中找到相关信息,建议您联系人工客服。",
                "sources": [],
                "confidence": 0,
                "escalate": True,
            }

        context = self._format_context(documents)

        messages = [{"role": "system", "content": SYSTEM_PROMPT}]
        if chat_history:
            # Only the most recent three exchanges (six messages) are kept.
            messages += chat_history[-6:]
        messages.append(
            {
                "role": "user",
                "content": f"参考资料:\n{context}\n\n用户问题:{question}",
            }
        )

        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.1,
            max_tokens=500,
        )

        return {
            "answer": completion.choices[0].message.content,
            "sources": [
                {"title": d["metadata"]["title"], "source": d["metadata"]["source"]}
                for d in documents[:3]
            ],
            "confidence": documents[0]["score"],
            "escalate": False,
        }

    def _format_context(self, documents: list[dict]) -> str:
        """Join the top three documents into a numbered reference string."""
        numbered = [
            f"[{i}] {doc['metadata']['title']}\n{doc['text']}"
            for i, doc in enumerate(documents[:3], 1)
        ]
        return "\n\n---\n\n".join(numbered)
第五步:完整 Pipeline
"""
rag/pipeline.py - RAG 完整流水线
"""
class RAGPipeline:
    """End-to-end pipeline: load -> chunk -> index -> retrieve -> generate."""

    def __init__(self, docs_dir: str = "./docs"):
        self.loader = DocumentLoader()
        self.chunker = SmartChunker(chunk_size=500, overlap=50)
        self.indexer = VectorIndexer()
        self.retriever = HybridRetriever(self.indexer)
        self.generator = AnswerGenerator()
        self.docs_dir = docs_dir
        self._initialized = False

    def initialize(self) -> None:
        """Load, chunk and index the documentation. Idempotent."""
        if self._initialized:
            return
        print("=== 初始化 RAG Pipeline ===")
        corpus = self.loader.load_directory(self.docs_dir)
        pieces = self.chunker.chunk_documents(corpus)
        self.indexer.index_chunks(pieces)
        self._initialized = True
        print("=== 初始化完成 ===\n")

    def query(
        self, question: str, chat_history: list[dict] = None
    ) -> dict:
        """Answer one user question, building the index lazily on first use."""
        if not self._initialized:
            self.initialize()
        hits = self.retriever.retrieve(question, top_k=5)
        return self.generator.generate(
            question=question,
            documents=hits,
            chat_history=chat_history,
        )
运行效果
"""
app.py - 主程序
"""
# Build the pipeline once up front, then replay a scripted conversation.
pipeline = RAGPipeline(docs_dir="./docs")
pipeline.initialize()

# Rolling dialogue history fed back into every query for multi-turn context.
chat_history = []
questions = [
    "你们的产品支持哪些文件格式?",
    "如何导出 PDF?",
    "价格方案有哪些?",
]

for q in questions:
    print(f"\n用户: {q}")
    result = pipeline.query(q, chat_history)
    print(f"客服: {result['answer']}")
    print(f" 置信度: {result['confidence']:.2f}")
    print(f" 需要转人工: {result['escalate']}")
    # Record both sides of the turn so the next query sees the full exchange.
    chat_history.extend([
        {"role": "user", "content": q},
        {"role": "assistant", "content": result["answer"]},
    ])
项目总结
graph TB
A[本书知识点总结] --> B[基础]
A --> C[工程]
A --> D[高级]
A --> E[生产]
B --> B1[Embedding 原理]
B --> B2[向量检索]
B --> B3[RAG 架构]
C --> C1[文档处理]
C --> C2[Chunk 策略]
C --> C3[索引构建]
D --> D1[混合检索]
D --> D2[重排序]
D --> D3[Self-RAG / GraphRAG]
E --> E1[语义缓存]
E --> E2[成本控制]
E --> E3[可观测性]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
恭喜你完成了 RAG 检索增强生成实战指南的全部学习!
本章小结
- 完整 RAG 项目包含:加载 → 切分 → 索引 → 检索 → 生成 五大模块
- 使用 ChromaDB 作为向量存储,OpenAI Embedding 做向量化
- 通过置信度阈值判断是否需要转人工
- 多轮对话通过维护 chat_history 实现
- 生产环境需要添加缓存、监控、成本控制
感谢阅读!希望本书对你构建 RAG 系统有所帮助。