完整项目实战:智能客服 RAG 系统
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 287 words

完整项目实战:智能客服 RAG 系统

本章通过构建一个完整的智能客服系统,串联前面所有章节的知识。从零到生产级,一步步实现。

项目目标

```mermaid
graph LR
    A[用户提问] --> B[智能客服 RAG]
    B --> C[精准回答 + 引用来源]
    B --> D[文档管理]
    B --> E[多轮对话]
    B --> F[自动升级人工]
    style B fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
```

功能需求:

- 从产品文档中检索答案
- 多轮对话支持
- 无法回答时自动转人工
- 回答附带引用来源

项目结构

customer-service-rag/
├── app.py              # FastAPI 主应用
├── rag/
│   ├── __init__.py
│   ├── loader.py       # 文档加载
│   ├── chunker.py      # 文本切分
│   ├── indexer.py       # 向量索引
│   ├── retriever.py     # 检索器
│   ├── generator.py     # 生成器
│   └── pipeline.py      # RAG Pipeline
├── config.py            # 配置
├── docs/                # 产品文档
└── requirements.txt

第一步:文档加载与切分

"""
rag/loader.py - Document loading
"""
from pathlib import Path


class DocumentLoader:
    """Loads product documentation (Markdown files) from disk."""

    def load_directory(self, docs_dir: str) -> list[dict]:
        """Load every Markdown document under *docs_dir*, recursively.

        Returns a list of dicts with keys ``id`` (path relative to
        *docs_dir*), ``title``, ``content`` and ``source`` (absolute-ish
        file path as given). Files are visited in sorted order so ids
        are deterministic.
        """
        docs_path = Path(docs_dir)
        documents = []
        for md_file in sorted(docs_path.rglob("*.md")):
            text = md_file.read_text(encoding="utf-8")
            documents.append({
                "id": str(md_file.relative_to(docs_path)),
                "title": self._extract_title(text, md_file.stem),
                "content": text,
                "source": str(md_file),
            })
        print(f"加载了 {len(documents)} 个文档")
        return documents

    def _extract_title(self, text: str, fallback: str) -> str:
        """Return the first level-1 Markdown heading, or *fallback*."""
        return next(
            (line[2:].strip() for line in text.split("\n") if line.startswith("# ")),
            fallback,
        )
"""
rag/chunker.py - Text chunking
"""


class SmartChunker:
    """Splits documents into chunks along semantic (Markdown) boundaries.

    Text is first split on ``## `` headings; any section longer than
    ``chunk_size`` characters is further split on blank-line paragraph
    boundaries, carrying up to ``overlap`` trailing characters of the
    previous chunk into the next one for context continuity.
    """

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        # chunk_size: soft upper bound (in characters) for one chunk.
        # overlap: max trailing characters repeated between consecutive
        # chunks of the same long section.
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_documents(self, documents: list[dict]) -> list[dict]:
        """Chunk every document; returns chunk dicts with stable ids."""
        all_chunks = []
        for doc in documents:
            chunks = self._split_by_headers(doc["content"])
            for i, chunk_text in enumerate(chunks):
                all_chunks.append({
                    "id": f"{doc['id']}_chunk_{i}",
                    "text": chunk_text,
                    "source": doc["source"],
                    "title": doc["title"],
                    "chunk_index": i,
                })
        print(f"切分为 {len(all_chunks)} 个文本块")
        return all_chunks

    def _split_by_headers(self, text: str) -> list[str]:
        """Split *text* into sections at ``## `` headings."""
        sections = []
        current = []
        for line in text.split("\n"):
            if line.startswith("## ") and current:
                section_text = "\n".join(current).strip()
                if section_text:
                    sections.extend(self._split_long_section(section_text))
                current = [line]
            else:
                current.append(line)
        # Flush the trailing section after the last heading.
        if current:
            section_text = "\n".join(current).strip()
            if section_text:
                sections.extend(self._split_long_section(section_text))
        return sections

    def _split_long_section(self, text: str) -> list[str]:
        """Split an over-long section on paragraph (blank line) boundaries.

        Fix: the original stored ``overlap`` but never used it — it always
        carried the entire previous paragraph into the next chunk. Now at
        most ``self.overlap`` trailing characters of the finished chunk are
        carried over (none when overlap == 0).
        """
        if len(text) <= self.chunk_size:
            return [text]
        chunks = []
        paragraphs = text.split("\n\n")
        current_chunk = []
        current_len = 0
        for para in paragraphs:
            if current_len + len(para) > self.chunk_size and current_chunk:
                chunk_text = "\n\n".join(current_chunk)
                chunks.append(chunk_text)
                # Honor the configured overlap: keep only the tail of the
                # finished chunk, not a whole (possibly huge) paragraph.
                overlap_text = chunk_text[-self.overlap:] if self.overlap > 0 else ""
                current_chunk = [overlap_text, para] if overlap_text else [para]
                current_len = sum(len(c) for c in current_chunk)
            else:
                current_chunk.append(para)
                current_len += len(para)
        if current_chunk:
            chunks.append("\n\n".join(current_chunk))
        return chunks

第二步:向量索引

"""
rag/indexer.py - Vector indexing
"""
import chromadb
from openai import OpenAI


class VectorIndexer:
    """Manages the vector index backed by a persistent ChromaDB collection."""

    def __init__(self, collection_name: str = "customer_service"):
        self.openai = OpenAI()
        # Persists to ./chroma_db; cosine distance for similarity search.
        self.chroma = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.chroma.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def index_chunks(self, chunks: list[dict]) -> None:
        """Embed and index all chunks, 100 at a time.

        One embedding request plus one ChromaDB write per batch.
        """
        batch_size = 100
        total = len(chunks)
        for start in range(0, total, batch_size):
            batch = chunks[start:start + batch_size]
            texts = [chunk["text"] for chunk in batch]
            self.collection.add(
                ids=[chunk["id"] for chunk in batch],
                embeddings=self._get_embeddings(texts),
                documents=texts,
                metadatas=[
                    {
                        "source": chunk["source"],
                        "title": chunk["title"],
                        "chunk_index": chunk["chunk_index"],
                    }
                    for chunk in batch
                ],
            )
            print(f"  已索引 {min(start + batch_size, total)}/{total}")
        print(f"索引完成,共 {self.collection.count()} 条记录")

    def _get_embeddings(self, texts: list[str]) -> list[list[float]]:
        """Embed *texts* with OpenAI's text-embedding-3-small model."""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts,
        )
        return [item.embedding for item in response.data]

第三步:检索器

"""
rag/retriever.py - Hybrid retrieval
"""


class HybridRetriever:
    """Retriever over the vector index.

    NOTE(review): despite the "hybrid (vector + keyword)" name, only
    vector retrieval is implemented below; keyword search is still a TODO.
    """

    def __init__(self, indexer):
        # Fix: OpenAI was used here but never imported in this module,
        # which would raise NameError at construction time. Import lazily
        # so merely importing the module does not require the package.
        from openai import OpenAI
        self.indexer = indexer
        self.openai = OpenAI()

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """Return the *top_k* most similar chunks for *query*.

        Each result dict has: id, text, score (cosine similarity —
        higher is better), metadata.
        """
        query_embedding = self._embed(query)
        results = self.indexer.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
        )
        documents = []
        for i in range(len(results["ids"][0])):
            documents.append({
                "id": results["ids"][0][i],
                "text": results["documents"][0][i],
                # ChromaDB returns cosine *distance*; convert to similarity.
                "score": 1 - results["distances"][0][i],
                "metadata": results["metadatas"][0][i],
            })
        return documents

    def _embed(self, text: str) -> list[float]:
        """Embed a single query string."""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

第四步:生成器

"""
rag/generator.py - 回答生成
"""
from openai import OpenAI
SYSTEM_PROMPT = """你是一个专业的客服助手。请根据提供的参考资料回答用户问题。
规则:
1. 只根据参考资料回答,不要编造信息
2. 如果参考资料中没有相关信息,回答"抱歉,我无法在文档中找到相关信息,建议您联系人工客服"
3. 回答要简洁专业
4. 在回答末尾标注引用来源"""
class AnswerGenerator:
"""回答生成器"""
def __init__(self, model: str = "gpt-4o-mini"):
self.client = OpenAI()
self.model = model
self.confidence_threshold = 0.3
def generate(
self,
question: str,
documents: list[dict],
chat_history: list[dict] = None,
) -> dict:
"""生成回答"""
# 检查检索质量
if not documents or documents[0]["score"] < self.confidence_threshold:
return {
"answer": "抱歉,我无法在文档中找到相关信息,建议您联系人工客服。",
"sources": [],
"confidence": 0,
"escalate": True,
}
# 构造上下文
context = self._format_context(documents)
# 构造消息
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
# 加入对话历史
if chat_history:
messages.extend(chat_history[-6:])  # 最近 3 轮对话
messages.append({
"role": "user",
"content": f"参考资料:\n{context}\n\n用户问题:{question}",
})
# 调用 LLM
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.1,
max_tokens=500,
)
answer = response.choices[0].message.content
# 提取引用
sources = [
{"title": d["metadata"]["title"], "source": d["metadata"]["source"]}
for d in documents[:3]
]
return {
"answer": answer,
"sources": sources,
"confidence": documents[0]["score"],
"escalate": False,
}
def _format_context(self, documents: list[dict]) -> str:
"""格式化上下文"""
parts = []
for i, doc in enumerate(documents[:3], 1):
parts.append(f"[{i}] {doc['metadata']['title']}\n{doc['text']}")
return "\n\n---\n\n".join(parts)

第五步:完整 Pipeline

"""
rag/pipeline.py - End-to-end RAG pipeline
"""


class RAGPipeline:
    """Wires loader -> chunker -> indexer -> retriever -> generator."""

    def __init__(self, docs_dir: str = "./docs"):
        self.loader = DocumentLoader()
        self.chunker = SmartChunker(chunk_size=500, overlap=50)
        self.indexer = VectorIndexer()
        self.retriever = HybridRetriever(self.indexer)
        self.generator = AnswerGenerator()
        self.docs_dir = docs_dir
        # Flipped by initialize(); guards against re-indexing the corpus.
        self._initialized = False

    def initialize(self) -> None:
        """Load, chunk and index the document corpus (idempotent)."""
        if self._initialized:
            return
        print("=== 初始化 RAG Pipeline ===")
        documents = self.loader.load_directory(self.docs_dir)
        chunks = self.chunker.chunk_documents(documents)
        self.indexer.index_chunks(chunks)
        self._initialized = True
        print("=== 初始化完成 ===\n")

    def query(
        self, question: str, chat_history: list[dict] = None
    ) -> dict:
        """Answer one user question, initializing the index on first use."""
        if not self._initialized:
            self.initialize()
        hits = self.retriever.retrieve(question, top_k=5)
        return self.generator.generate(
            question=question,
            documents=hits,
            chat_history=chat_history,
        )

运行效果

"""
app.py - Entry point: demo conversation against the RAG pipeline.
"""


def main() -> None:
    """Build the pipeline and run a short scripted multi-turn demo."""
    pipeline = RAGPipeline(docs_dir="./docs")
    pipeline.initialize()

    # Simulated conversation.
    chat_history = []
    questions = [
        "你们的产品支持哪些文件格式?",
        "如何导出 PDF?",
        "价格方案有哪些?",
    ]
    for q in questions:
        print(f"\n用户: {q}")
        result = pipeline.query(q, chat_history)
        print(f"客服: {result['answer']}")
        print(f"  置信度: {result['confidence']:.2f}")
        print(f"  需要转人工: {result['escalate']}")
        # Keep the dialogue history so follow-up questions have context.
        chat_history.append({"role": "user", "content": q})
        chat_history.append({"role": "assistant", "content": result["answer"]})


if __name__ == "__main__":
    # Fix: the demo previously ran at module top level, so merely
    # importing app.py triggered indexing and API calls. Guard it.
    main()

项目总结

```mermaid
graph TB
    A[本书知识点总结] --> B[基础]
    A --> C[工程]
    A --> D[高级]
    A --> E[生产]
    B --> B1[Embedding 原理]
    B --> B2[向量检索]
    B --> B3[RAG 架构]
    C --> C1[文档处理]
    C --> C2[Chunk 策略]
    C --> C3[索引构建]
    D --> D1[混合检索]
    D --> D2[重排序]
    D --> D3[Self-RAG / GraphRAG]
    E --> E1[语义缓存]
    E --> E2[成本控制]
    E --> E3[可观测性]
    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
```

恭喜你完成了 RAG 检索增强生成实战指南的全部学习!

本章小结

感谢阅读!希望本书对你构建 RAG 系统有所帮助。