审计日志与合规报告
完善的审计日志是 LLM 治理合规的基石。本章构建结构化日志系统和自动化合规报告生成器。
审计日志架构
graph TB
A[LLM 请求] --> B[请求拦截器]
B --> C[日志采集]
C --> D[结构化处理]
D --> E[日志存储]
E --> F[实时查询]
E --> G[合规报告]
E --> H[告警触发]
F --> I[审计面板]
G --> J[定期导出]
H --> K[事件响应]
style A fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style E fill:#fff9c4,stroke:#f9a825,stroke-width:2px
style I fill:#c8e6c9,stroke:#43a047,stroke-width:2px
结构化审计日志
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json
import hashlib
class AuditAction(Enum):
    """Kinds of events that can be recorded in the audit trail.

    Each member's value is the lowercase string written to the serialized
    log. ``AuditLogger.log_request`` records ``BLOCK`` for blocked calls and
    ``REQUEST`` otherwise.
    """

    REQUEST = "request"
    RESPONSE = "response"
    BLOCK = "block"
    OVERRIDE = "override"
    CONFIG_CHANGE = "config_change"
    ACCESS_GRANT = "access_grant"
class SensitivityLevel(Enum):
    """Data-sensitivity label attached to an audit log entry.

    Each member's value is the lowercase string written to the serialized
    log.
    """

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class AuditLogEntry:
    """One structured record in the audit trail.

    The entry carries digests of the prompt and the response rather than
    the text itself.
    """

    timestamp: str  # log_request fills this with a UTC ISO-8601 string
    action: AuditAction
    user_id: str
    session_id: str
    model_id: str
    input_hash: str  # hash only; the raw text is never stored
    output_hash: str
    token_count: int
    latency_ms: float
    sensitivity: SensitivityLevel
    blocked: bool = False
    block_reason: str = ""
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the entry to a JSON object string.

        Enum fields are written as their string values. Non-ASCII
        characters are emitted unescaped (``ensure_ascii=False``).

        Raises:
            TypeError: if ``metadata`` holds a value ``json.dumps`` cannot
                serialize.
        """
        payload = {
            "timestamp": self.timestamp,
            "action": self.action.value,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "model_id": self.model_id,
            "input_hash": self.input_hash,
            "output_hash": self.output_hash,
            "token_count": self.token_count,
            "latency_ms": self.latency_ms,
            "sensitivity": self.sensitivity.value,
            "blocked": self.blocked,
            "block_reason": self.block_reason,
            "metadata": self.metadata,
        }
        return json.dumps(payload, ensure_ascii=False)
class AuditLogger:
    """In-memory audit log recorder.

    Entries are appended to ``self.entries`` in the order they are logged.
    Prompt and response text is reduced to a digest before it is stored.
    """

    def __init__(self):
        self.entries: list[AuditLogEntry] = []

    @staticmethod
    def hash_content(content: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of *content*.

        The text is UTF-8 encoded before hashing. The one-way digest lets
        identical content be correlated without storing the text itself.
        """
        # NOTE(review): keeping 16 hex chars leaves 64 bits of the digest,
        # so collisions are far more likely than with full SHA-256. Widen
        # this if the hash is ever used as evidence of exact content.
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def log_request(
        self,
        user_id: str,
        session_id: str,
        model_id: str,
        input_text: str,
        output_text: str,
        token_count: int,
        latency_ms: float,
        sensitivity: SensitivityLevel = SensitivityLevel.INTERNAL,
        blocked: bool = False,
        block_reason: str = "",
    ) -> AuditLogEntry:
        """Record one model call and return the stored entry.

        The entry is timestamped with the current UTC time in ISO-8601
        form. Its action is ``AuditAction.BLOCK`` when *blocked* is true and
        ``AuditAction.REQUEST`` otherwise. *input_text* and *output_text*
        are stored only as ``hash_content`` digests.
        """
        entry = AuditLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            action=AuditAction.BLOCK if blocked else AuditAction.REQUEST,
            user_id=user_id,
            session_id=session_id,
            model_id=model_id,
            input_hash=self.hash_content(input_text),
            output_hash=self.hash_content(output_text),
            token_count=token_count,
            latency_ms=latency_ms,
            sensitivity=sensitivity,
            blocked=blocked,
            block_reason=block_reason,
        )
        self.entries.append(entry)
        return entry

    def query(
        self,
        user_id: str | None = None,
        action: AuditAction | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
    ) -> list[AuditLogEntry]:
        """Return the entries that match every supplied filter.

        A filter left as ``None`` (or an empty string) is ignored.
        *start_time* and *end_time* are inclusive bounds compared against
        ``entry.timestamp`` as plain strings, so they should use the same
        UTC ISO-8601 format that ``log_request`` writes.

        The result is always a new list, in logging order. Callers may
        mutate it without touching the logger's internal ``entries``.
        """
        return [
            entry
            for entry in self.entries
            if (not user_id or entry.user_id == user_id)
            and (action is None or entry.action == action)
            and (not start_time or entry.timestamp >= start_time)
            and (not end_time or entry.timestamp <= end_time)
        ]
合规报告生成器
from dataclasses import dataclass
@dataclass
class ComplianceReport:
    """Aggregated compliance figures for one reporting period.

    Field meanings below describe how ``ReportGenerator.generate``
    populates them.
    """

    period: str  # free-form label for the reporting period, e.g. "monthly"
    total_requests: int  # number of audit entries considered
    blocked_requests: int  # entries whose ``blocked`` flag is set
    block_rate: float  # blocked_requests / total_requests as a fraction
    top_block_reasons: list[tuple[str, int]]  # (reason, count), most frequent first
    user_activity: dict[str, int]  # entry count per user_id
    model_usage: dict[str, int]  # entry count per model_id
    sensitivity_distribution: dict[str, int]  # entry count per sensitivity value
    compliance_score: float  # heuristic score on a 0-100 scale
class ReportGenerator:
    """Automated compliance report generator backed by an ``AuditLogger``."""

    def __init__(self, logger: AuditLogger):
        self.logger = logger

    @staticmethod
    def _score_for_block_rate(block_rate: float) -> float:
        """Map a block rate (fraction) to the heuristic compliance score."""
        # A lower block rate is better, but a near-zero rate may mean the
        # detection layer is missing things, so it scores below the ideal band.
        if block_rate < 0.01:
            return 85.0  # possibly under-detecting
        if block_rate < 0.05:
            return 95.0  # ideal range
        if block_rate < 0.15:
            return 80.0  # on the high side of normal
        return 60.0  # needs attention

    def generate(self, period: str = "monthly") -> ComplianceReport:
        """Build a compliance report from every entry held by the logger.

        *period* is only used as the report's label. Entries are not
        filtered by time here.

        An empty log yields a block rate of ``0.0`` and therefore the
        "possibly under-detecting" score of ``85.0``.
        """
        from collections import Counter  # local: not imported at file level

        entries = self.logger.entries
        total = len(entries)
        blocked = [e for e in entries if e.blocked]

        # Top five block reasons; an empty reason is counted as "unknown".
        # Ties keep first-seen order.
        reason_counts = Counter(e.block_reason or "unknown" for e in blocked)
        top_reasons = reason_counts.most_common(5)

        # Plain dicts are returned so the report fields match their annotations.
        user_counts = dict(Counter(e.user_id for e in entries))
        model_counts = dict(Counter(e.model_id for e in entries))
        sens_counts = dict(Counter(e.sensitivity.value for e in entries))

        block_rate = len(blocked) / total if total else 0.0

        return ComplianceReport(
            period=period,
            total_requests=total,
            blocked_requests=len(blocked),
            block_rate=block_rate,
            top_block_reasons=top_reasons,
            user_activity=user_counts,
            model_usage=model_counts,
            sensitivity_distribution=sens_counts,
            compliance_score=self._score_for_block_rate(block_rate),
        )

    def format_text(self, report: ComplianceReport) -> str:
        """Render the headline figures and top block reasons as plain text.

        Returns a newline-joined string with no trailing newline.
        """
        lines = [
            f"=== LLM 治理合规报告 ({report.period}) ===",
            f"总请求数: {report.total_requests}",
            f"拦截数: {report.blocked_requests} ({report.block_rate:.1%})",
            f"合规评分: {report.compliance_score}/100",
            "",
            "Top 拦截原因:",
        ]
        lines.extend(
            f" - {reason}: {count} 次" for reason, count in report.top_block_reasons
        )
        return "\n".join(lines)
数据保留策略
| 数据类型 | 保留周期 | 存储方式 | 加密要求 | 访问权限 |
|---|---|---|---|---|
| 请求哈希 | 90 天 | 热存储 | AES-256 | 审计员 |
| 拦截记录 | 1 年 | 热存储 | AES-256 | 安全团队 |
| 合规报告 | 3 年 | 冷存储归档 | AES-256 | 管理层 |
| 用户行为统计 | 30 天 | 热存储 | 脱敏 | 运维 |
| 配置变更记录 | 永久 | 版本控制 | GPG 签名 | 管理员 |
合规框架对照
| 框架 | 关键要求 | 日志要求 | 报告频率 |
|---|---|---|---|
| GDPR | 数据最小化、被遗忘权 | 处理活动记录 | 高风险处理前开展 DPIA,风险变化时复核 |
| SOC 2 | 安全性、可用性、隐私 | 完整审计追踪 | 年度审计 |
| ISO 27001 | 信息安全管理 | 访问日志 + 变更日志 | 年度认证 |
| AI Act (EU) | 风险分级、透明度 | 高风险系统完整日志 | 持续合规 |
| 中国《互联网信息服务算法推荐管理规定》 | 算法透明、用户权益 | 用户行为 + 推荐日志 | 备案审查 |
本章小结
- 日志只存哈希——保护用户隐私,同时保留可追溯性
- 结构化审计条目——统一字段便于查询和报告
- 自动化报告生成——定期输出合规评分和关键指标
- 数据保留分级——不同类型数据不同保留周期和加密级别
- 对标合规框架——GDPR/SOC 2/ISO 27001/AI Act/中国《算法推荐规定》各有不同要求
下一章:告警与事件响应