日志与审计
LLM 应用面临独特的日志挑战:输入输出非结构化、Token 计费需追踪、对话历史需关联、敏感数据需脱敏。
日志架构
graph TB
A[LLM 应用] --> B[结构化日志层]
B --> C[敏感数据脱敏]
C --> D[日志聚合]
D --> E[实时监控
Grafana / DataDog] D --> F[审计存储
S3 / 数据库] D --> G[分析查询
ClickHouse / ES] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px
Grafana / DataDog] D --> F[审计存储
S3 / 数据库] D --> G[分析查询
ClickHouse / ES] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px
结构化日志实现
"""
LLM 专用结构化日志
"""
import json
import time
import uuid
import re
import logging
from dataclasses import dataclass, field, asdict
from typing import Any
@dataclass
class LLMLogEntry:
"""LLM 调用日志条目"""
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: float = field(default_factory=time.time)
# 调用信息
model: str = ""
provider: str = ""
operation: str = "" # chat / completion / embedding
# 输入输出
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
# 性能
latency_ms: float = 0.0
ttft_ms: float = 0.0 # Time To First Token
# 结果
status: str = "success" # success / error / timeout
error_type: str = ""
error_message: str = ""
# 成本
estimated_cost_usd: float = 0.0
# 上下文
user_id: str = ""
session_id: str = ""
trace_id: str = ""
# 元数据
metadata: dict = field(default_factory=dict)
class LLMLogger:
"""LLM 调用日志器"""
# 常见 PII 模式
PII_PATTERNS = [
(re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN]'),
(re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), '[EMAIL]'),
(re.compile(r'\b\d{13,19}\b'), '[CARD]'),
(re.compile(r'\b1[3-9]\d{9}\b'), '[PHONE]'),
]
def __init__(
self,
logger_name: str = "llm",
enable_pii_masking: bool = True,
log_prompts: bool = False, # 生产环境默认不记录完整 prompt
):
self.logger = logging.getLogger(logger_name)
self.enable_pii_masking = enable_pii_masking
self.log_prompts = log_prompts
def log_request(self, entry: LLMLogEntry, prompt: str = "", response: str = ""):
"""记录一次 LLM 调用"""
log_data = asdict(entry)
if self.log_prompts:
log_data["prompt_preview"] = self._safe_text(prompt[:500])
log_data["response_preview"] = self._safe_text(response[:500])
self.logger.info(json.dumps(log_data, ensure_ascii=False))
def log_error(self, entry: LLMLogEntry, error: Exception):
"""记录错误"""
entry.status = "error"
entry.error_type = type(error).__name__
entry.error_message = str(error)[:200]
self.logger.error(json.dumps(asdict(entry), ensure_ascii=False))
def _safe_text(self, text: str) -> str:
"""脱敏处理"""
if not self.enable_pii_masking:
return text
for pattern, replacement in self.PII_PATTERNS:
text = pattern.sub(replacement, text)
return text
审计追踪
"""
LLM 审计追踪系统
"""
from dataclasses import dataclass, field
from enum import Enum
import time
import uuid
class AuditAction(Enum):
"""审计动作类型"""
MODEL_CALL = "model_call"
PROMPT_CHANGE = "prompt_change"
MODEL_SWITCH = "model_switch"
SAFETY_BLOCK = "safety_block"
COST_ALERT = "cost_alert"
DATA_ACCESS = "data_access"
@dataclass
class AuditRecord:
"""审计记录"""
audit_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: float = field(default_factory=time.time)
action: AuditAction = AuditAction.MODEL_CALL
actor: str = "" # 用户/系统
resource: str = "" # 模型/prompt/数据
details: dict = field(default_factory=dict)
ip_address: str = ""
result: str = "success"
class AuditTrail:
"""审计追踪管理器"""
def __init__(self, storage_backend=None):
self._records: list[AuditRecord] = []
self._storage = storage_backend
def record(self, action: AuditAction, actor: str, resource: str, **details):
"""记录审计事件"""
record = AuditRecord(
action=action,
actor=actor,
resource=resource,
details=details,
)
self._records.append(record)
if self._storage:
self._storage.save(record)
def query(
self,
action: AuditAction | None = None,
actor: str | None = None,
start_time: float | None = None,
end_time: float | None = None,
limit: int = 100,
) -> list[AuditRecord]:
"""查询审计记录"""
results = self._records
if action:
results = [r for r in results if r.action == action]
if actor:
results = [r for r in results if r.actor == actor]
if start_time:
results = [r for r in results if r.timestamp >= start_time]
if end_time:
results = [r for r in results if r.timestamp <= end_time]
return results[-limit:]
def get_cost_summary(self, hours: int = 24) -> dict:
"""获取指定时间段的成本摘要"""
cutoff = time.time() - hours * 3600
calls = [
r for r in self._records
if r.action == AuditAction.MODEL_CALL and r.timestamp >= cutoff
]
total_cost = sum(r.details.get("cost_usd", 0) for r in calls)
return {
"period_hours": hours,
"total_calls": len(calls),
"total_cost_usd": round(total_cost, 4),
"avg_cost_per_call": round(total_cost / len(calls), 6) if calls else 0,
}
合规日志对比
| 合规要求 | 需记录的内容 | 保留期限 | 注意事项 |
|---|---|---|---|
| GDPR | 数据访问记录 | 至少 3 年 | 需支持删除权 |
| SOC 2 | 全部访问日志 | 1 年 | 需防篡改 |
| HIPAA | PHI 访问记录 | 6 年 | 需加密存储 |
| PCI DSS | 支付相关日志 | 1 年 | 卡号不可出现在日志 |
日志分层策略
graph LR
A[日志分层] --> B[热存储 7天
Elasticsearch] A --> C[温存储 90天
对象存储] A --> D[冷存储 3年+
归档存储] B --> E[实时查询] C --> F[按需查询] D --> G[合规审计] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
Elasticsearch] A --> C[温存储 90天
对象存储] A --> D[冷存储 3年+
归档存储] B --> E[实时查询] C --> F[按需查询] D --> G[合规审计] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
本章小结
| 主题 | 要点 |
|---|---|
| 结构化日志 | JSON 格式,包含请求 ID、Token、延迟、成本 |
| PII 脱敏 | 正则匹配 + 替换,生产环境默认不记录 Prompt |
| 审计追踪 | 模型调用/Prompt 变更/安全拦截全部记录 |
| 分层存储 | 热 → 温 → 冷,按合规要求保留 |
下一章:数据隐私与合规