安全与权限控制
RAG 系统直接暴露企业内部知识,安全问题不容忽视。一个泄露的查询接口可能让敏感信息流出,一次 Prompt Injection 可能绕过所有访问控制。
RAG 安全威胁
graph TB
A[RAG 安全威胁] --> B[数据泄露]
A --> C[Prompt Injection]
A --> D[权限越权]
A --> E[投毒攻击]
B --> B1[敏感文档通过 RAG 泄露]
C --> C1[恶意 Prompt 绕过限制]
D --> D1[低权限用户访问高密文档]
E --> E1[注入恶意文档影响检索]
style A fill:#ffebee,stroke:#c62828,stroke-width:3px
style B fill:#fff3e0,stroke:#e65100,stroke-width:2px
style C fill:#fff3e0,stroke:#e65100,stroke-width:2px
style D fill:#fff3e0,stroke:#e65100,stroke-width:2px
style E fill:#fff3e0,stroke:#e65100,stroke-width:2px
| 威胁 | 风险等级 | 攻击方式 | 影响 |
|---|---|---|---|
| 数据泄露 | 高 | 构造查询提取敏感信息 | 隐私/合规违规 |
| Prompt Injection | 高 | 在文档或查询中注入指令 | 绕过安全策略 |
| 权限越权 | 高 | 低权限用户获取受限文档 | 信息不对称 |
| 投毒攻击 | 中 | 上传恶意文档污染知识库 | 误导回答 |
| 推理攻击 | 中 | 多次查询推断敏感信息 | 隐式泄露 |
文档级权限控制
"""
文档级别的访问控制
"""
from dataclasses import dataclass, field
from enum import Enum
class AccessLevel(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
@dataclass
class DocumentACL:
"""文档访问控制列表"""
doc_id: str
access_level: AccessLevel
allowed_roles: list[str] = field(default_factory=list)
allowed_users: list[str] = field(default_factory=list)
denied_users: list[str] = field(default_factory=list)
@dataclass
class UserContext:
"""用户上下文"""
user_id: str
roles: list[str]
department: str
access_level: AccessLevel
class ACLFilteredRetriever:
"""带权限过滤的检索器"""
def __init__(self, base_retriever, acl_store: dict[str, DocumentACL]):
self.retriever = base_retriever
self.acl_store = acl_store
def search(self, query: str, user: UserContext, top_k: int = 5) -> list[dict]:
"""带权限过滤的检索"""
# 先检索更多候选
candidates = self.retriever.search(query, top_k=top_k * 3)
# 权限过滤
filtered = []
for doc in candidates:
acl = self.acl_store.get(doc["id"])
if acl and self._check_access(user, acl):
filtered.append(doc)
if len(filtered) >= top_k:
break
denied_count = len(candidates) - len(filtered)
if denied_count > 0:
print(f" 权限过滤: {denied_count} 文档被拒绝")
return filtered
def _check_access(self, user: UserContext, acl: DocumentACL) -> bool:
"""检查用户是否有权限访问文档"""
# 显式拒绝优先
if user.user_id in acl.denied_users:
return False
# 检查用户级别是否足够
level_order = [AccessLevel.PUBLIC, AccessLevel.INTERNAL,
AccessLevel.CONFIDENTIAL, AccessLevel.RESTRICTED]
user_level_idx = level_order.index(user.access_level)
doc_level_idx = level_order.index(acl.access_level)
if user_level_idx < doc_level_idx:
return False
# 公开文档直接通过
if acl.access_level == AccessLevel.PUBLIC:
return True
# 检查角色或用户白名单
if acl.allowed_users and user.user_id in acl.allowed_users:
return True
if acl.allowed_roles and any(r in acl.allowed_roles for r in user.roles):
return True
return False
Prompt Injection 防护
"""
Prompt Injection 检测与防护
"""
from dataclasses import dataclass
@dataclass
class InjectionCheckResult:
"""注入检测结果"""
is_safe: bool
risk_score: float
matched_patterns: list[str]
class PromptInjectionGuard:
"""Prompt Injection 防护"""
# 常见注入模式
INJECTION_PATTERNS = [
"ignore previous instructions",
"忽略之前的指令",
"you are now",
"你现在是",
"forget everything",
"system prompt",
"reveal your instructions",
"output the above",
"repeat the system",
]
def check_query(self, query: str) -> InjectionCheckResult:
"""检查用户查询是否包含注入"""
query_lower = query.lower()
matched = [p for p in self.INJECTION_PATTERNS if p in query_lower]
risk_score = min(1.0, len(matched) * 0.3)
return InjectionCheckResult(
is_safe=len(matched) == 0,
risk_score=risk_score,
matched_patterns=matched,
)
def check_document(self, content: str) -> InjectionCheckResult:
"""检查文档内容是否包含注入(投毒检测)"""
content_lower = content.lower()
matched = [p for p in self.INJECTION_PATTERNS if p in content_lower]
# 文档中隐藏的指令
hidden_markers = ["[INST]", "[/INST]", "<|system|>", "<<SYS>>"]
for marker in hidden_markers:
if marker.lower() in content_lower:
matched.append(f"hidden_marker: {marker}")
risk_score = min(1.0, len(matched) * 0.4)
return InjectionCheckResult(
is_safe=len(matched) == 0,
risk_score=risk_score,
matched_patterns=matched,
)
def sanitize_context(self, context: str) -> str:
"""清洗检索到的上下文"""
# 移除可能的隐藏指令标记
sanitized = context
for marker in ["[INST]", "[/INST]", "<|system|>", "<<SYS>>", "<</SYS>>"]:
sanitized = sanitized.replace(marker, "")
return sanitized
输出安全过滤
"""
RAG 输出安全过滤
"""
import re
from dataclasses import dataclass
@dataclass
class OutputFilterResult:
"""输出过滤结果"""
original: str
filtered: str
redacted_count: int
is_safe: bool
class OutputSafetyFilter:
"""输出安全过滤器"""
# PII 检测正则
PII_PATTERNS = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone_cn": r"\b1[3-9]\d{9}\b",
"id_card_cn": r"\b\d{17}[\dXx]\b",
"credit_card": r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b",
}
def filter_output(self, text: str) -> OutputFilterResult:
"""过滤输出中的敏感信息"""
filtered = text
total_redacted = 0
for pii_type, pattern in self.PII_PATTERNS.items():
matches = re.findall(pattern, filtered)
for match in matches:
filtered = filtered.replace(match, f"[{pii_type.upper()}_REDACTED]")
total_redacted += 1
return OutputFilterResult(
original=text,
filtered=filtered,
redacted_count=total_redacted,
is_safe=total_redacted == 0,
)
安全架构最佳实践
graph TB
A[安全 RAG 架构] --> B[输入层]
A --> C[检索层]
A --> D[生成层]
A --> E[输出层]
B --> B1[查询注入检测]
B --> B2[速率限制]
B --> B3[身份认证]
C --> C1[文档级 ACL]
C --> C2[行级权限]
C --> C3[审计日志]
D --> D1[上下文清洗]
D --> D2[System Prompt 加固]
D --> D3[温度控制]
E --> E1[PII 脱敏]
E --> E2[内容审核]
E --> E3[引用溯源]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
| 层级 | 防护措施 | 优先级 |
|---|---|---|
| 输入层 | Prompt Injection 检测 + 速率限制 | P0 |
| 检索层 | 文档级 ACL + 审计日志 | P0 |
| 生成层 | 上下文清洗 + System Prompt 加固 | P1 |
| 输出层 | PII 脱敏 + 内容审核 | P1 |
| 基础设施 | 加密传输 + 密钥管理 + 数据隔离 | P0 |
本章小结
| 主题 | 要点 |
|---|---|
| 核心威胁 | 数据泄露、Prompt Injection、权限越权 |
| ACL 过滤 | 检索时实时过滤无权限文档 |
| 注入防护 | 模式匹配 + LLM 检测双重防线 |
| 输出过滤 | PII 脱敏保护用户隐私 |
| 最佳实践 | 四层纵深防御,审计日志不可少 |
下一章:成本优化与缓存策略