企业知识库实战
企业知识库是 RAG 最经典的应用场景。从几百份内部文档到几十万份技术手册,RAG 让员工用自然语言检索到精准答案。
企业知识库架构
graph TB
A[数据源] --> B[文档处理管道]
A1[Confluence] --> A
A2[SharePoint] --> A
A3[Google Drive] --> A
A4[内部 Wiki] --> A
B --> C[向量索引]
B --> D[全文索引]
B --> E[元数据索引]
F[用户查询] --> G[混合检索]
C --> G
D --> G
E --> G
G --> H[权限过滤]
H --> I[LLM 生成]
I --> J[回答 + 引用]
style F fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style J fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
文档采集管道
"""
企业文档采集管道
"""
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import hashlib
import time
class DocSource(Enum):
CONFLUENCE = "confluence"
SHAREPOINT = "sharepoint"
LOCAL_FS = "local_filesystem"
GIT_REPO = "git_repository"
@dataclass
class RawDocument:
"""原始文档"""
doc_id: str
title: str
content: str
source: DocSource
source_url: str = ""
author: str = ""
updated_at: float = 0.0
metadata: dict = field(default_factory=dict)
@dataclass
class ProcessedDocument:
"""处理后的文档"""
doc_id: str
chunks: list[dict]
total_tokens: int
processing_time: float
class DocumentIngestionPipeline:
"""文档摄取管道"""
def __init__(self, chunker, embedder, vector_store):
self.chunker = chunker
self.embedder = embedder
self.store = vector_store
self._doc_hashes: dict[str, str] = {}
def ingest(self, documents: list[RawDocument]) -> dict:
"""批量摄取文档"""
stats = {"processed": 0, "skipped": 0, "failed": 0, "chunks_created": 0}
start_time = time.time()
for doc in documents:
# 增量更新:检查文档是否变化
content_hash = hashlib.md5(doc.content.encode()).hexdigest()
if self._doc_hashes.get(doc.doc_id) == content_hash:
stats["skipped"] += 1
continue
try:
# 切分
chunks = self.chunker.split(doc.content, metadata={
"doc_id": doc.doc_id,
"title": doc.title,
"source": doc.source.value,
"author": doc.author,
})
# 向量化
for chunk in chunks:
chunk["embedding"] = self.embedder.embed(chunk["text"])
# 存储(先删旧的,再插新的)
self.store.delete_by_doc_id(doc.doc_id)
self.store.upsert(chunks)
self._doc_hashes[doc.doc_id] = content_hash
stats["processed"] += 1
stats["chunks_created"] += len(chunks)
except Exception as e:
print(f" 文档处理失败 {doc.doc_id}: {e}")
stats["failed"] += 1
stats["duration_seconds"] = round(time.time() - start_time, 2)
return stats
def sync_from_confluence(self, base_url: str, space_key: str) -> list[RawDocument]:
"""从 Confluence 同步文档(框架示意)"""
# 实际需要 atlassian-python-api
print(f" 同步 Confluence: {base_url}/wiki/spaces/{space_key}")
return []
多租户知识库
"""
多租户知识库隔离
"""
from dataclasses import dataclass
@dataclass
class Tenant:
"""租户"""
tenant_id: str
name: str
namespace: str
max_documents: int = 10000
max_queries_per_day: int = 5000
class MultiTenantKnowledgeBase:
"""多租户知识库"""
def __init__(self, vector_store):
self.store = vector_store
self.tenants: dict[str, Tenant] = {}
def register_tenant(self, tenant: Tenant) -> None:
"""注册租户"""
self.tenants[tenant.tenant_id] = tenant
# 在向量库中创建命名空间
self.store.create_namespace(tenant.namespace)
def query(self, tenant_id: str, query: str, top_k: int = 5) -> list[dict]:
"""租户隔离的查询"""
tenant = self.tenants.get(tenant_id)
if not tenant:
raise ValueError(f"未知租户: {tenant_id}")
# 只在租户的命名空间中检索
return self.store.search(
query=query,
top_k=top_k,
namespace=tenant.namespace,
)
知识库质量维护
| 阶段 | 操作 | 频率 | 工具 |
|---|---|---|---|
| 采集 | 增量同步新文档 | 每日 | 定时任务 |
| 清洗 | 检测重复/过期文档 | 每周 | 哈希去重 |
| 评估 | 抽样评估检索质量 | 每周 | 评估集 |
| 更新 | 重新 Embedding 过时向量 | 每月 | 全量重建 |
| 审计 | 查看用户查询日志 | 每月 | 监控面板 |
本章小结
| 主题 | 要点 |
|---|---|
| 架构设计 | 多数据源采集 + 混合检索 + 权限过滤 |
| 增量更新 | 哈希对比避免重复处理 |
| 多租户 | 命名空间隔离,独立配额 |
| 质量维护 | 定期评估、清洗、重建索引 |
| 关键挑战 | 文档格式多样、权限复杂、时效性 |
下一章:智能法律助手