企业知识库实战
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read278 words

企业知识库实战

企业知识库是 RAG 最经典的应用场景。从几百份内部文档到几十万份技术手册,RAG 让员工用自然语言检索到精准答案。

企业知识库架构

graph TB A[数据源] --> B[文档处理管道] A1[Confluence] --> A A2[SharePoint] --> A A3[Google Drive] --> A A4[内部 Wiki] --> A B --> C[向量索引] B --> D[全文索引] B --> E[元数据索引] F[用户查询] --> G[混合检索] C --> G D --> G E --> G G --> H[权限过滤] H --> I[LLM 生成] I --> J[回答 + 引用] style F fill:#e3f2fd,stroke:#1976d2,stroke-width:2px style J fill:#c8e6c9,stroke:#388e3c,stroke-width:3px

文档采集管道

"""
企业文档采集管道
"""
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import hashlib
import time
class DocSource(Enum):
CONFLUENCE = "confluence"
SHAREPOINT = "sharepoint"
LOCAL_FS = "local_filesystem"
GIT_REPO = "git_repository"
@dataclass
class RawDocument:
"""原始文档"""
doc_id: str
title: str
content: str
source: DocSource
source_url: str = ""
author: str = ""
updated_at: float = 0.0
metadata: dict = field(default_factory=dict)
@dataclass
class ProcessedDocument:
"""处理后的文档"""
doc_id: str
chunks: list[dict]
total_tokens: int
processing_time: float
class DocumentIngestionPipeline:
"""文档摄取管道"""
def __init__(self, chunker, embedder, vector_store):
self.chunker = chunker
self.embedder = embedder
self.store = vector_store
self._doc_hashes: dict[str, str] = {}
def ingest(self, documents: list[RawDocument]) -> dict:
"""批量摄取文档"""
stats = {"processed": 0, "skipped": 0, "failed": 0, "chunks_created": 0}
start_time = time.time()
for doc in documents:
# 增量更新:检查文档是否变化
content_hash = hashlib.md5(doc.content.encode()).hexdigest()
if self._doc_hashes.get(doc.doc_id) == content_hash:
stats["skipped"] += 1
continue
try:
# 切分
chunks = self.chunker.split(doc.content, metadata={
"doc_id": doc.doc_id,
"title": doc.title,
"source": doc.source.value,
"author": doc.author,
})
# 向量化
for chunk in chunks:
chunk["embedding"] = self.embedder.embed(chunk["text"])
# 存储(先删旧的,再插新的)
self.store.delete_by_doc_id(doc.doc_id)
self.store.upsert(chunks)
self._doc_hashes[doc.doc_id] = content_hash
stats["processed"] += 1
stats["chunks_created"] += len(chunks)
except Exception as e:
print(f"  文档处理失败 {doc.doc_id}: {e}")
stats["failed"] += 1
stats["duration_seconds"] = round(time.time() - start_time, 2)
return stats
def sync_from_confluence(self, base_url: str, space_key: str) -> list[RawDocument]:
"""从 Confluence 同步文档(框架示意)"""
# 实际需要 atlassian-python-api
print(f"  同步 Confluence: {base_url}/wiki/spaces/{space_key}")
return []

多租户知识库

"""
多租户知识库隔离
"""
from dataclasses import dataclass
@dataclass
class Tenant:
"""租户"""
tenant_id: str
name: str
namespace: str
max_documents: int = 10000
max_queries_per_day: int = 5000
class MultiTenantKnowledgeBase:
"""多租户知识库"""
def __init__(self, vector_store):
self.store = vector_store
self.tenants: dict[str, Tenant] = {}
def register_tenant(self, tenant: Tenant) -> None:
"""注册租户"""
self.tenants[tenant.tenant_id] = tenant
# 在向量库中创建命名空间
self.store.create_namespace(tenant.namespace)
def query(self, tenant_id: str, query: str, top_k: int = 5) -> list[dict]:
"""租户隔离的查询"""
tenant = self.tenants.get(tenant_id)
if not tenant:
raise ValueError(f"未知租户: {tenant_id}")
# 只在租户的命名空间中检索
return self.store.search(
query=query,
top_k=top_k,
namespace=tenant.namespace,
)

知识库质量维护

阶段 操作 频率 工具
采集 增量同步新文档 每日 定时任务
清洗 检测重复/过期文档 每周 哈希去重
评估 抽样评估检索质量 每周 评估集
更新 重新 Embedding 过时向量 每月 全量重建
审计 查看用户查询日志 每月 监控面板

本章小结

主题 要点
架构设计 多数据源采集 + 混合检索 + 权限过滤
增量更新 哈希对比避免重复处理
多租户 命名空间隔离,独立配额
质量维护 定期评估、清洗、重建索引
关键挑战 文档格式多样、权限复杂、时效性

下一章:智能法律助手