审计日志与合规报告
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read · 433 words

审计日志与合规报告

完善的审计日志是 LLM 治理合规的基石。本章构建结构化日志系统和自动化合规报告生成器。

审计日志架构

```mermaid
graph TB
    A[LLM 请求] --> B[请求拦截器]
    B --> C[日志采集]
    C --> D[结构化处理]
    D --> E[日志存储]
    E --> F[实时查询]
    E --> G[合规报告]
    E --> H[告警触发]
    F --> I[审计面板]
    G --> J[定期导出]
    H --> K[事件响应]
    style A fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style E fill:#fff9c4,stroke:#f9a825,stroke-width:2px
    style I fill:#c8e6c9,stroke:#43a047,stroke-width:2px
```

结构化审计日志

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json
import hashlib
class AuditAction(Enum):
    """Kinds of auditable events that can appear in the log."""

    REQUEST = "request"              # normal LLM request
    RESPONSE = "response"            # model response delivered
    BLOCK = "block"                  # request rejected by policy
    OVERRIDE = "override"            # manual policy override
    CONFIG_CHANGE = "config_change"  # governance configuration changed
    ACCESS_GRANT = "access_grant"    # access permission granted
class SensitivityLevel(Enum):
    """Data-sensitivity classification of a logged interaction."""

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class AuditLogEntry:
    """One structured audit record for a single LLM interaction.

    Only one-way hashes of the prompt/response are stored — never the
    raw text — so the log stays traceable without retaining sensitive
    content.  Enum-typed fields are annotated as forward references so
    this dataclass does not depend on import order.
    """

    timestamp: str                    # ISO-8601 UTC timestamp
    action: "AuditAction"
    user_id: str
    session_id: str
    model_id: str
    input_hash: str                   # hash only, never the original text
    output_hash: str
    token_count: int
    latency_ms: float
    sensitivity: "SensitivityLevel"
    blocked: bool = False
    block_reason: str = ""            # empty when the request was not blocked
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the entry to a single JSON line.

        `ensure_ascii=False` keeps non-ASCII (e.g. Chinese) text readable
        in the stored log.
        """
        return json.dumps(
            {
                "timestamp": self.timestamp,
                "action": self.action.value,
                "user_id": self.user_id,
                "session_id": self.session_id,
                "model_id": self.model_id,
                "input_hash": self.input_hash,
                "output_hash": self.output_hash,
                "token_count": self.token_count,
                "latency_ms": self.latency_ms,
                "sensitivity": self.sensitivity.value,
                "blocked": self.blocked,
                "block_reason": self.block_reason,
                "metadata": self.metadata,
            },
            ensure_ascii=False,
        )
class AuditLogger:
    """In-memory structured audit logger.

    Records one `AuditLogEntry` per LLM interaction.  Prompt/response
    text is hashed before storage, preserving traceability while
    protecting privacy.
    """

    def __init__(self) -> None:
        self.entries: list[AuditLogEntry] = []

    @staticmethod
    def hash_content(content: str) -> str:
        """One-way hash — protects privacy while keeping traceability.

        Truncated to 16 hex chars (64 bits): enough to correlate
        entries, not intended as a collision-resistant identifier.
        """
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def log_request(
        self,
        user_id: str,
        session_id: str,
        model_id: str,
        input_text: str,
        output_text: str,
        token_count: int,
        latency_ms: float,
        sensitivity: SensitivityLevel = SensitivityLevel.INTERNAL,
        blocked: bool = False,
        block_reason: str = "",
    ) -> AuditLogEntry:
        """Record one request/response pair and return the stored entry.

        When `blocked` is True the entry's action becomes BLOCK instead
        of REQUEST.  Only hashes of `input_text`/`output_text` are kept.
        """
        entry = AuditLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            action=AuditAction.BLOCK if blocked else AuditAction.REQUEST,
            user_id=user_id,
            session_id=session_id,
            model_id=model_id,
            input_hash=self.hash_content(input_text),
            output_hash=self.hash_content(output_text),
            token_count=token_count,
            latency_ms=latency_ms,
            sensitivity=sensitivity,
            blocked=blocked,
            block_reason=block_reason,
        )
        self.entries.append(entry)
        return entry

    def query(
        self,
        user_id: str | None = None,
        action: AuditAction | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
    ) -> list[AuditLogEntry]:
        """Filter entries; all criteria are optional and AND-combined.

        Timestamps are ISO-8601 UTC strings, so plain lexicographic
        comparison matches chronological order.
        """
        results = self.entries
        if user_id:
            results = [e for e in results if e.user_id == user_id]
        if action:
            results = [e for e in results if e.action == action]
        if start_time:
            results = [e for e in results if e.timestamp >= start_time]
        if end_time:
            results = [e for e in results if e.timestamp <= end_time]
        return results

合规报告生成器

from dataclasses import dataclass


@dataclass
class ComplianceReport:
    """Aggregated compliance metrics for one reporting period."""

    period: str                               # label, e.g. "monthly"
    total_requests: int
    blocked_requests: int
    block_rate: float                         # blocked / total; 0 when no requests
    top_block_reasons: list[tuple[str, int]]  # (reason, count) pairs, most frequent first
    user_activity: dict[str, int]             # user_id -> request count
    model_usage: dict[str, int]               # model_id -> request count
    sensitivity_distribution: dict[str, int]  # sensitivity level -> request count
    compliance_score: float                   # 0-100 heuristic score
class ReportGenerator:
    """Automated compliance report generator.

    Aggregates the entries held by an `AuditLogger` into a
    `ComplianceReport` and renders a human-readable text summary.
    Project types in signatures are forward-referenced strings so this
    class has no import-order dependency on its siblings.
    """

    def __init__(self, logger: "AuditLogger") -> None:
        self.logger = logger

    def generate(self, period: str = "monthly") -> "ComplianceReport":
        """Build a compliance report over all entries currently logged.

        `period` is a label only — no time filtering is applied here.
        """
        entries = self.logger.entries
        total = len(entries)
        blocked = [e for e in entries if e.blocked]

        # Tally block reasons; keep the five most frequent.
        reason_counts: dict[str, int] = {}
        for e in blocked:
            reason = e.block_reason or "unknown"
            reason_counts[reason] = reason_counts.get(reason, 0) + 1
        top_reasons = sorted(
            reason_counts.items(), key=lambda x: x[1], reverse=True
        )[:5]

        # Per-user activity.
        user_counts: dict[str, int] = {}
        for e in entries:
            user_counts[e.user_id] = user_counts.get(e.user_id, 0) + 1

        # Per-model usage distribution.
        model_counts: dict[str, int] = {}
        for e in entries:
            model_counts[e.model_id] = model_counts.get(e.model_id, 0) + 1

        # Sensitivity-level distribution.
        sens_counts: dict[str, int] = {}
        for e in entries:
            key = e.sensitivity.value
            sens_counts[key] = sens_counts.get(key, 0) + 1

        # Scoring heuristic: a lower block rate is better, but a
        # near-zero rate may mean the detectors are missing violations.
        block_rate = len(blocked) / total if total > 0 else 0
        if block_rate < 0.01:
            score = 85.0   # possibly under-detecting
        elif block_rate < 0.05:
            score = 95.0   # ideal range
        elif block_rate < 0.15:
            score = 80.0   # normal but elevated
        else:
            score = 60.0   # needs attention

        return ComplianceReport(
            period=period,
            total_requests=total,
            blocked_requests=len(blocked),
            block_rate=block_rate,
            top_block_reasons=top_reasons,
            user_activity=user_counts,
            model_usage=model_counts,
            sensitivity_distribution=sens_counts,
            compliance_score=score,
        )

    def format_text(self, report: "ComplianceReport") -> str:
        """Render the report as a plain-text summary."""
        lines = [
            f"=== LLM 治理合规报告 ({report.period}) ===",
            f"总请求数: {report.total_requests}",
            f"拦截数: {report.blocked_requests} ({report.block_rate:.1%})",
            f"合规评分: {report.compliance_score}/100",
            "",
            "Top 拦截原因:",
        ]
        for reason, count in report.top_block_reasons:
            lines.append(f"  - {reason}: {count} 次")
        return "\n".join(lines)

数据保留策略

数据类型 保留周期 存储方式 加密要求 访问权限
请求哈希 90 天 热存储 AES-256 审计员
拦截记录 1 年 热存储 AES-256 安全团队
合规报告 3 年 冷存储归档 AES-256 管理层
用户行为统计 30 天 热存储 脱敏 运维
配置变更记录 永久 版本控制 GPG 签名 管理员

合规框架对照

框架 关键要求 日志要求 报告频率
GDPR 数据最小化、被遗忘权 处理活动记录 年度 DPIA
SOC 2 安全性、可用性、隐私 完整审计追踪 年度审计
ISO 27001 信息安全管理 访问日志 + 变更日志 年度认证
AI Act (EU) 风险分级、透明度 高风险系统完整日志 持续合规
中国《算法推荐规定》 算法透明、用户权益 用户行为 + 推荐日志 备案审查

本章小结

下一章:告警与事件响应