安全与权限控制
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read396 words

安全与权限控制

RAG 系统直接暴露企业内部知识,安全问题不容忽视。一个泄露的查询接口可能让敏感信息流出,一次 Prompt Injection 可能绕过所有访问控制。

RAG 安全威胁

graph TB A[RAG 安全威胁] --> B[数据泄露] A --> C[Prompt Injection] A --> D[权限越权] A --> E[投毒攻击] B --> B1[敏感文档通过 RAG 泄露] C --> C1[恶意 Prompt 绕过限制] D --> D1[低权限用户访问高密文档] E --> E1[注入恶意文档影响检索] style A fill:#ffebee,stroke:#c62828,stroke-width:3px style B fill:#fff3e0,stroke:#e65100,stroke-width:2px style C fill:#fff3e0,stroke:#e65100,stroke-width:2px style D fill:#fff3e0,stroke:#e65100,stroke-width:2px style E fill:#fff3e0,stroke:#e65100,stroke-width:2px
威胁 风险等级 攻击方式 影响
数据泄露 构造查询提取敏感信息 隐私/合规违规
Prompt Injection 在文档或查询中注入指令 绕过安全策略
权限越权 低权限用户获取受限文档 信息不对称
投毒攻击 上传恶意文档污染知识库 误导回答
推理攻击 多次查询推断敏感信息 隐式泄露

文档级权限控制

"""
文档级别的访问控制
"""
from dataclasses import dataclass, field
from enum import Enum
class AccessLevel(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
@dataclass
class DocumentACL:
"""文档访问控制列表"""
doc_id: str
access_level: AccessLevel
allowed_roles: list[str] = field(default_factory=list)
allowed_users: list[str] = field(default_factory=list)
denied_users: list[str] = field(default_factory=list)
@dataclass
class UserContext:
"""用户上下文"""
user_id: str
roles: list[str]
department: str
access_level: AccessLevel
class ACLFilteredRetriever:
"""带权限过滤的检索器"""
def __init__(self, base_retriever, acl_store: dict[str, DocumentACL]):
self.retriever = base_retriever
self.acl_store = acl_store
def search(self, query: str, user: UserContext, top_k: int = 5) -> list[dict]:
"""带权限过滤的检索"""
# 先检索更多候选
candidates = self.retriever.search(query, top_k=top_k * 3)
# 权限过滤
filtered = []
for doc in candidates:
acl = self.acl_store.get(doc["id"])
if acl and self._check_access(user, acl):
filtered.append(doc)
if len(filtered) >= top_k:
break
denied_count = len(candidates) - len(filtered)
if denied_count > 0:
print(f"  权限过滤: {denied_count} 文档被拒绝")
return filtered
def _check_access(self, user: UserContext, acl: DocumentACL) -> bool:
"""检查用户是否有权限访问文档"""
# 显式拒绝优先
if user.user_id in acl.denied_users:
return False
# 检查用户级别是否足够
level_order = [AccessLevel.PUBLIC, AccessLevel.INTERNAL,
AccessLevel.CONFIDENTIAL, AccessLevel.RESTRICTED]
user_level_idx = level_order.index(user.access_level)
doc_level_idx = level_order.index(acl.access_level)
if user_level_idx < doc_level_idx:
return False
# 公开文档直接通过
if acl.access_level == AccessLevel.PUBLIC:
return True
# 检查角色或用户白名单
if acl.allowed_users and user.user_id in acl.allowed_users:
return True
if acl.allowed_roles and any(r in acl.allowed_roles for r in user.roles):
return True
return False

Prompt Injection 防护

"""
Prompt Injection 检测与防护
"""
from dataclasses import dataclass
@dataclass
class InjectionCheckResult:
"""注入检测结果"""
is_safe: bool
risk_score: float
matched_patterns: list[str]
class PromptInjectionGuard:
"""Prompt Injection 防护"""
# 常见注入模式
INJECTION_PATTERNS = [
"ignore previous instructions",
"忽略之前的指令",
"you are now",
"你现在是",
"forget everything",
"system prompt",
"reveal your instructions",
"output the above",
"repeat the system",
]
def check_query(self, query: str) -> InjectionCheckResult:
"""检查用户查询是否包含注入"""
query_lower = query.lower()
matched = [p for p in self.INJECTION_PATTERNS if p in query_lower]
risk_score = min(1.0, len(matched) * 0.3)
return InjectionCheckResult(
is_safe=len(matched) == 0,
risk_score=risk_score,
matched_patterns=matched,
)
def check_document(self, content: str) -> InjectionCheckResult:
"""检查文档内容是否包含注入(投毒检测)"""
content_lower = content.lower()
matched = [p for p in self.INJECTION_PATTERNS if p in content_lower]
# 文档中隐藏的指令
hidden_markers = ["[INST]", "[/INST]", "<|system|>", "<<SYS>>"]
for marker in hidden_markers:
if marker.lower() in content_lower:
matched.append(f"hidden_marker: {marker}")
risk_score = min(1.0, len(matched) * 0.4)
return InjectionCheckResult(
is_safe=len(matched) == 0,
risk_score=risk_score,
matched_patterns=matched,
)
def sanitize_context(self, context: str) -> str:
"""清洗检索到的上下文"""
# 移除可能的隐藏指令标记
sanitized = context
for marker in ["[INST]", "[/INST]", "<|system|>", "<<SYS>>", "<</SYS>>"]:
sanitized = sanitized.replace(marker, "")
return sanitized

输出安全过滤

"""
RAG 输出安全过滤
"""
import re
from dataclasses import dataclass
@dataclass
class OutputFilterResult:
"""输出过滤结果"""
original: str
filtered: str
redacted_count: int
is_safe: bool
class OutputSafetyFilter:
"""输出安全过滤器"""
# PII 检测正则
PII_PATTERNS = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone_cn": r"\b1[3-9]\d{9}\b",
"id_card_cn": r"\b\d{17}[\dXx]\b",
"credit_card": r"\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b",
}
def filter_output(self, text: str) -> OutputFilterResult:
"""过滤输出中的敏感信息"""
filtered = text
total_redacted = 0
for pii_type, pattern in self.PII_PATTERNS.items():
matches = re.findall(pattern, filtered)
for match in matches:
filtered = filtered.replace(match, f"[{pii_type.upper()}_REDACTED]")
total_redacted += 1
return OutputFilterResult(
original=text,
filtered=filtered,
redacted_count=total_redacted,
is_safe=total_redacted == 0,
)

安全架构最佳实践

graph TB A[安全 RAG 架构] --> B[输入层] A --> C[检索层] A --> D[生成层] A --> E[输出层] B --> B1[查询注入检测] B --> B2[速率限制] B --> B3[身份认证] C --> C1[文档级 ACL] C --> C2[行级权限] C --> C3[审计日志] D --> D1[上下文清洗] D --> D2[System Prompt 加固] D --> D3[温度控制] E --> E1[PII 脱敏] E --> E2[内容审核] E --> E3[引用溯源] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
层级 防护措施 优先级
输入层 Prompt Injection 检测 + 速率限制 P0
检索层 文档级 ACL + 审计日志 P0
生成层 上下文清洗 + System Prompt 加固 P1
输出层 PII 脱敏 + 内容审核 P1
基础设施 加密传输 + 密钥管理 + 数据隔离 P0

本章小结

主题 要点
核心威胁 数据泄露、Prompt Injection、权限越权
ACL 过滤 检索时实时过滤无权限文档
注入防护 模式匹配 + LLM 检测双重防线
输出过滤 PII 脱敏保护用户隐私
最佳实践 四层纵深防御,审计日志不可少

下一章:成本优化与缓存策略