监控与审计架构
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read · 312 words

监控与审计架构

完整的监控与审计系统是LLM治理平台的重要组成部分。

监控体系架构

graph TB
    A[监控体系] --> B[指标监控]
    A --> C[日志记录]
    A --> D[链路追踪]
    A --> E[审计追踪]
    B --> B1[系统指标]
    B --> B2[业务指标]
    B --> B3[安全指标]
    C --> C1[访问日志]
    C --> C2[错误日志]
    C --> C3[审计日志]
    D --> D1[请求追踪]
    D --> D2[性能分析]
    E --> E1[操作记录]
    E --> E2[合规审计]
    style A fill:#d4edda

核心指标设计

1. 系统指标

指标名称 说明 类型 单位
request_total 请求总数 Counter
request_duration 请求耗时 Histogram 秒
active_requests 活跃请求 Gauge
error_rate 错误率 Gauge %
queue_length 队列长度 Gauge

2. 业务指标

指标名称 说明 类型 单位
llm_requests_total LLM请求总数 Counter
llm_tokens_total Token消耗 Counter
audit_block_total 审核阻断数 Counter
content_pass_rate 内容通过率 Gauge %

3. 安全指标

指标名称 说明 类型 单位
injection_attempts 注入攻击尝试 Counter
jailbreak_attempts 越狱攻击尝试 Counter
violation_detected 违规检测数 Counter
risk_score_avg 平均风险评分 Gauge

实现监控指标

创建 core/metrics.py

from prometheus_client import Counter, Histogram, Gauge, Summary
from prometheus_client.metrics import MetricWrapperBase
from typing import Dict, List, Optional
from datetime import datetime
from loguru import logger
class MetricsCollector:
    """Create and update the Prometheus metrics of the governance service.

    The constructor builds every Counter / Gauge / Histogram object (no
    explicit ``registry`` argument is passed to them) and keeps a small
    in-process tally of requests that is used to derive the error-rate gauge.
    """

    def __init__(self):
        """Instantiate all metric objects and the local request tally."""
        # --- System metrics ---
        self.request_total = Counter(
            'llm_governance_requests_total',
            'Total number of requests',
            ['endpoint', 'status']
        )
        self.request_duration = Histogram(
            'llm_governance_request_duration_seconds',
            'Request duration',
            ['endpoint'],
            buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        )
        self.active_requests = Gauge(
            'llm_governance_active_requests',
            'Current number of active requests'
        )
        self.error_rate = Gauge(
            'llm_governance_error_rate',
            'Current error rate'
        )

        # --- Business metrics ---
        self.llm_requests = Counter(
            'llm_governance_llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )
        self.llm_tokens = Counter(
            'llm_governance_llm_tokens_total',
            'Total tokens consumed',
            ['model', 'type']  # input, output
        )
        self.audit_block = Counter(
            'llm_governance_audit_block_total',
            'Total audit blocks',
            ['auditor', 'reason']
        )
        self.content_pass_rate = Gauge(
            'llm_governance_content_pass_rate',
            'Content pass rate',
            ['type']  # input, output
        )

        # --- Security metrics ---
        self.injection_attempts = Counter(
            'llm_governance_injection_attempts_total',
            'Total prompt injection attempts',
            ['type']
        )
        self.jailbreak_attempts = Counter(
            'llm_governance_jailbreak_attempts_total',
            'Total jailbreak attempts',
            ['pattern']
        )
        self.violation_detected = Counter(
            'llm_governance_violation_detected_total',
            'Total violations detected',
            ['category', 'severity']
        )
        self.risk_score = Histogram(
            'llm_governance_risk_score',
            'Content risk score',
            buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
        )

        # Local tally used to compute the error-rate gauge.
        self._request_counts = {'total': 0, 'success': 0, 'error': 0}

    @staticmethod
    def _gauge_value(gauge) -> float:
        """Return the current value of an unlabelled gauge.

        Reads the first sample exposed by the metric's public ``collect()``
        method instead of reaching into its private ``_value`` attribute.
        Returns 0.0 if the metric exposes no sample.
        """
        for family in gauge.collect():
            for sample in family.samples:
                return sample.value
        return 0.0

    def record_request(self, endpoint: str, status: str, duration: float):
        """Record one finished request.

        Args:
            endpoint: Label value for the request counter and duration histogram.
            status: Label value for the request counter; the exact string
                ``'success'`` counts as a success, anything else as an error.
            duration: Value observed on the duration histogram (the metric
                name and bucket boundaries are expressed in seconds).
        """
        self.request_total.labels(endpoint=endpoint, status=status).inc()
        self.request_duration.labels(endpoint=endpoint).observe(duration)

        counts = self._request_counts
        counts['total'] += 1
        counts['success' if status == 'success' else 'error'] += 1

        # total was just incremented, so it is always >= 1 here.
        self.error_rate.set(counts['error'] / counts['total'])

    def record_llm_request(self, model: str, status: str, input_tokens: int, output_tokens: int):
        """Count one LLM call and add its input/output token usage."""
        self.llm_requests.labels(model=model, status=status).inc()
        self.llm_tokens.labels(model=model, type='input').inc(input_tokens)
        self.llm_tokens.labels(model=model, type='output').inc(output_tokens)

    def record_audit_block(self, auditor: str, reason: str):
        """Count one request blocked by ``auditor`` for ``reason``."""
        self.audit_block.labels(auditor=auditor, reason=reason).inc()

    def record_pass_rate(self, content_type: str, rate: float):
        """Set the pass-rate gauge for the given content type."""
        self.content_pass_rate.labels(type=content_type).set(rate)

    def record_injection(self, injection_type: str):
        """Count one prompt-injection attempt of the given type."""
        self.injection_attempts.labels(type=injection_type).inc()

    def record_jailbreak(self, pattern: str):
        """Count one jailbreak attempt matching ``pattern``."""
        self.jailbreak_attempts.labels(pattern=pattern).inc()

    def record_violation(self, category: str, severity: str):
        """Count one detected violation."""
        self.violation_detected.labels(category=category, severity=severity).inc()

    def record_risk_score(self, score: float):
        """Observe one content risk score on the risk-score histogram."""
        self.risk_score.observe(score)

    def increment_active(self):
        """Increase the active-requests gauge by one."""
        self.active_requests.inc()

    def decrement_active(self):
        """Decrease the active-requests gauge by one."""
        self.active_requests.dec()

    def get_summary(self) -> Dict:
        """Return a snapshot of the locally known request statistics.

        Returns:
            A dict with the keys ``timestamp`` (ISO-8601 string of the local
            time), ``requests`` (a copy of the total/success/error tally, so
            callers cannot mutate the collector's state), ``active`` and
            ``error_rate`` (current gauge values).
        """
        return {
            'timestamp': datetime.now().isoformat(),
            'requests': dict(self._request_counts),
            'active': self._gauge_value(self.active_requests),
            'error_rate': self._gauge_value(self.error_rate)
        }

实现审计日志系统

创建 core/audit_log.py

import json
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Any
from pathlib import Path
from loguru import logger
import aiofiles
class AuditLogger:
    """Buffered audit logger that appends JSON lines to one file per day.

    Entries are collected in ``self.buffer`` and written to
    ``<log_dir>/audit_<YYYY-MM-DD>.jsonl`` either when the buffer reaches
    ``buffer_size`` entries or by the periodic task started with ``start()``.
    """

    def __init__(self, log_dir: str = "./logs/audit"):
        """Create the log directory (if needed) and an empty buffer.

        Args:
            log_dir: Directory that receives the ``audit_<date>.jsonl`` files.
        """
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.current_date = datetime.now().strftime("%Y-%m-%d")
        self.current_file = self._get_log_file(self.current_date)
        # Pending entries that have not been written to disk yet.
        self.buffer: List[Dict] = []
        self.buffer_size = 100
        self._flush_interval = 5  # seconds
        # Handle of the periodic flush task; created by start().
        self._flush_task = None

    def _get_log_file(self, date: str) -> Path:
        """Return the path of the log file for ``date`` (``YYYY-MM-DD``)."""
        return self.log_dir / f"audit_{date}.jsonl"

    async def start(self):
        """Start the background task that flushes the buffer periodically."""
        self._flush_task = asyncio.create_task(self._auto_flush())
        logger.info("✅ 审计日志系统启动")

    async def stop(self):
        """Stop the periodic task and write out whatever is still buffered."""
        if self._flush_task:
            self._flush_task.cancel()
            try:
                # Wait for the task to actually finish so it cannot run
                # concurrently with the final flush below.
                await self._flush_task
            except asyncio.CancelledError:
                pass
            self._flush_task = None
        await self._flush()

    async def _auto_flush(self):
        """Flush the buffer every ``_flush_interval`` seconds until cancelled."""
        while True:
            try:
                await asyncio.sleep(self._flush_interval)
                await self._flush()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"❌ 自动刷新失败: {e}")

    @staticmethod
    def _serialize(entry: Dict) -> str:
        """Return ``entry`` as one JSON line.

        Values the JSON encoder cannot handle are stringified on purpose:
        otherwise a single such entry would make every later flush fail
        while the entries in front of it were written again each time.
        """
        try:
            return json.dumps(entry, ensure_ascii=False) + '\n'
        except TypeError:
            return json.dumps(entry, ensure_ascii=False, default=str) + '\n'

    async def _flush(self):
        """Append all buffered entries to the current log file.

        On a date change the pending entries go to the file that was current
        while they were buffered, and later flushes use the new day's file.
        If the write fails (or is cancelled) the entries are put back at the
        front of the buffer so a later flush can retry them.
        """
        if not self.buffer:
            return

        # Detach the pending entries before the first await so that entries
        # appended while the file is being written are neither lost nor
        # written twice.
        pending, self.buffer = self.buffer, []

        target = self.current_file
        today = datetime.now().strftime("%Y-%m-%d")
        if today != self.current_date:
            self.current_date = today
            self.current_file = self._get_log_file(today)

        written = False
        try:
            payload = ''.join(self._serialize(entry) for entry in pending)
            async with aiofiles.open(target, 'a', encoding='utf-8') as f:
                await f.write(payload)
            written = True
            logger.debug(f"✅ 写入了 {len(pending)} 条审计日志")
        except Exception as e:
            logger.error(f"❌ 写入审计日志失败: {e}")
        finally:
            if not written:
                self.buffer[:0] = pending

    async def _append(self, entry: Dict[str, Any]):
        """Buffer ``entry`` and flush once the buffer is full."""
        self.buffer.append(entry)
        if len(self.buffer) >= self.buffer_size:
            await self._flush()

    async def log_request(self, request_id: str, data: Dict[str, Any]):
        """Record a ``request`` entry; the keys of ``data`` are merged in last."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'request',
            'request_id': request_id,
            **data
        })

    async def log_audit(self, request_id: str, auditor: str, result: Dict[str, Any]):
        """Record an ``audit`` entry with the auditor name and its result."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'audit',
            'request_id': request_id,
            'auditor': auditor,
            'result': result
        })

    async def log_security(self, request_id: str, scanner: str, result: Dict[str, Any]):
        """Record a ``security`` entry with the scanner name and its result."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'security',
            'request_id': request_id,
            'scanner': scanner,
            'result': result
        })

    async def log_response(self, request_id: str, response: Dict[str, Any]):
        """Record a ``response`` entry; the keys of ``response`` are merged in last."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'response',
            'request_id': request_id,
            **response
        })

    async def log_violation(self, request_id: str, violation: Dict[str, Any]):
        """Record a ``violation`` entry; the keys of ``violation`` are merged in last."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'violation',
            'request_id': request_id,
            **violation
        })

    async def log_operation(self, operator: str, action: str, details: Dict[str, Any]):
        """Record an ``operation`` entry (who did what, with details)."""
        await self._append({
            'timestamp': datetime.now().isoformat(),
            'type': 'operation',
            'operator': operator,
            'action': action,
            'details': details
        })

    async def query(
        self,
        start_date: str,
        end_date: str,
        filters: Optional[Dict] = None
    ) -> List[Dict]:
        """Read the entries written to disk for a range of days.

        Only the log files are read; entries still in the buffer are not
        included.

        Args:
            start_date: First day, inclusive (YYYY-MM-DD).
            end_date: Last day, inclusive (YYYY-MM-DD).
            filters: Optional ``{key: value}`` mapping; an entry is returned
                only if every listed key has exactly that value.

        Returns:
            The matching entries in file order, oldest day first.

        Raises:
            ValueError: If a date string does not match ``YYYY-MM-DD``.
        """
        # Local import: the module header only imports `datetime`.
        from datetime import timedelta

        results: List[Dict] = []
        current = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        while current <= end:
            log_file = self._get_log_file(current.strftime("%Y-%m-%d"))
            if log_file.exists():
                async with aiofiles.open(log_file, 'r', encoding='utf-8') as f:
                    async for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            entry = json.loads(line)
                        except json.JSONDecodeError:
                            # A truncated line must not make the whole day unreadable.
                            logger.warning(f"⚠️ 跳过无法解析的审计日志行: {log_file}")
                            continue
                        if not filters or all(
                            entry.get(key) == value for key, value in filters.items()
                        ):
                            results.append(entry)
            current += timedelta(days=1)
        return results

实现告警系统

创建 core/alert_manager.py

from typing import Dict, List, Callable, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from loguru import logger
@dataclass
class AlertRule:
    """Declarative description of one alert condition.

    A rule compares the value of ``metric`` with ``threshold`` using the
    operator named by ``condition``.
    """

    # Identifier of the rule.
    name: str
    # Key of the metric value the rule is evaluated against.
    metric: str
    # Comparison operator: one of >, <, >=, <=, ==
    condition: str
    # Value the metric is compared with.
    threshold: float
    # One of: low, medium, high, critical
    severity: str
    # How long the condition has to hold, in seconds.
    duration: int
class AlertManager:
"""告警管理器"""
def __init__(self):
self.rules: List[AlertRule] = []
self.active_alerts: Dict[str, datetime] = {}
self.alert_handlers: List[Callable] = []
self._load_default_rules()
def _load_default_rules(self):
"""加载默认告警规则"""
default_rules = [
AlertRule(
name="high_error_rate",
metric="error_rate",
condition=">",
threshold=0.1,  # 10%
severity="high",
duration=60
),
AlertRule(
name="too_many_injections",
metric="injection_attempts",
condition=">",
threshold=10,
severity="critical",
duration=300
),
AlertRule(
name="high_risk_score",
metric="risk_score_avg",
condition=">",
threshold=0.8,
severity="medium",
duration=120
)
]
self.rules.extend(default_rules)
logger.info(f"✅ 加载了 {len(default_rules)} 个默认告警规则")
def add_rule(self, rule: AlertRule):
"""添加告警规则"""
self.rules.append(rule)
logger.info(f"✅ 添加告警规则: {rule.name}")
def remove_rule(self, name: str) -> bool:
"""删除告警规则"""
original_count = len(self.rules)
self.rules = [r for r in self.rules if r.name != name]
if len(self.rules) < original_count:
logger.info(f"✅ 删除告警规则: {name}")
return True
return False
def add_handler(self, handler: Callable):
"""添加告警处理器"""
self.alert_handlers.append(handler)
async def check_rules(self, metrics: Dict[str, float]):
"""
检查告警规则
Args:
metrics: 当前指标值
"""
for rule in self.rules:
metric_value = metrics.get(rule.metric, 0)
# 评估条件
should_alert = self._evaluate_condition(
metric_value,
rule.condition,
rule.threshold
)
alert_key = f"{rule.name}:{rule.metric}"
if should_alert:
# 记录告警开始时间
if alert_key not in self.active_alerts:
self.active_alerts[alert_key] = datetime.now()
# 检查是否持续足够长时间
duration = (datetime.now() - self.active_alerts[alert_key]).total_seconds()
if duration >= rule.duration:
await self._trigger_alert(rule, metric_value)
else:
# 清除告警
if alert_key in self.active_alerts:
del self.active_alerts[alert_key]
def _evaluate_condition(
self,
value: float,
condition: str,
threshold: float
) -> bool:
"""评估条件"""
if condition == ">":
return value > threshold
elif condition == "<":
return value < threshold
elif condition == ">=":
return value >= threshold
elif condition == "<=":
return value <= threshold
elif condition == "==":
return value == threshold
else:
return False
async def _trigger_alert(self, rule: AlertRule, current_value: float):
"""触发告警"""
alert = {
'name': rule.name,
'metric': rule.metric,
'threshold': rule.threshold,
'current_value': current_value,
'severity': rule.severity,
'timestamp': datetime.now().isoformat()
}
logger.warning(f"🚨 触发告警: {rule.name} (当前值: {current_value}, 阈值: {rule.threshold})")
# 调用处理器
for handler in self.alert_handlers:
try:
await handler(alert)
except Exception as e:
logger.error(f"❌ 告警处理器异常: {e}")
# Default alert handlers
async def email_alert_handler(alert: Dict):
    """Alert handler stub for e-mail notification.

    Currently only logs the alert's ``name``; the actual mail delivery is
    still to be implemented here.
    """
    message = f"📧 发送告警邮件: {alert['name']}"
    logger.info(message)
async def webhook_alert_handler(alert: Dict):
    """Alert handler stub for webhook notification.

    Currently only logs the alert's ``name``; the actual webhook call is
    still to be implemented here.
    """
    message = f"🔗 调用告警Webhook: {alert['name']}"
    logger.info(message)

使用示例

from core.metrics import MetricsCollector
from core.audit_log import AuditLogger
from core.alert_manager import AlertManager
import asyncio
async def main():
    """Wire the three components together and run one simulated request."""
    # The example's import header only pulls in AlertManager, so the two
    # default handlers are imported here.
    from core.alert_manager import email_alert_handler, webhook_alert_handler

    # Set up the components
    metrics = MetricsCollector()
    audit = AuditLogger()
    alerts = AlertManager()

    # Register the alert handlers
    alerts.add_handler(email_alert_handler)
    alerts.add_handler(webhook_alert_handler)

    # Start the periodic audit-log flush
    await audit.start()
    try:
        # Simulate one request
        metrics.record_request("/chat", "success", 0.5)
        await audit.log_request("req-001", {"user": "test", "content": "hello"})

        # Evaluate the alert rules against the current summary
        await alerts.check_rules(metrics.get_summary())
    finally:
        # Always stop the logger so buffered entries are written out.
        await audit.stop()


if __name__ == "__main__":
    asyncio.run(main())

学习要点

✅ 设计了完整的监控指标体系 ✅ 实现了审计日志系统 ✅ 创建了告警规则引擎 ✅ 实现了异步日志刷新 ✅ 支持日志查询和过滤


下一步: 实现 监控指标收集 📊