告警与事件响应
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read295 words

告警与事件响应

当治理系统检测到异常时,需要快速告警并执行标准化的事件响应流程。

告警与响应流程

graph TB A[异常检测] --> B[告警分级] B --> C{严重级别} C -->|P0 紧急| D[自动熔断 + 即时通知] C -->|P1 高| E[通知安全团队] C -->|P2 中| F[工单系统] C -->|P3 低| G[日志记录] D --> H[事件响应团队] E --> H H --> I[调查分析] I --> J[修复措施] J --> K[复盘总结] K --> L[更新规则] style D fill:#ffcdd2,stroke:#c62828,stroke-width:2px style E fill:#ffe0b2,stroke:#ef6c00,stroke-width:2px style F fill:#fff9c4,stroke:#f9a825,stroke-width:2px style G fill:#e0e0e0,stroke:#616161,stroke-width:2px

告警引擎

from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timezone
from typing import Callable
class Severity(Enum):
P0_CRITICAL = 0
P1_HIGH = 1
P2_MEDIUM = 2
P3_LOW = 3
class AlertType(Enum):
JAILBREAK_BURST = "越狱攻击突增"
HIGH_BLOCK_RATE = "拦截率异常升高"
DATA_EXFILTRATION = "数据泄露尝试"
MODEL_DEGRADATION = "模型质量下降"
COST_SPIKE = "成本突增"
LATENCY_SPIKE = "延迟飙升"
@dataclass
class Alert:
alert_type: AlertType
severity: Severity
message: str
timestamp: str = ""
source: str = ""
metric_value: float = 0.0
threshold: float = 0.0
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now(timezone.utc).isoformat()
@dataclass
class AlertRule:
"""告警规则定义"""
name: str
alert_type: AlertType
severity: Severity
condition: Callable[[float], bool]
message_template: str
cooldown_seconds: int = 300   # 同类告警冷却期
class AlertEngine:
"""告警引擎——基于规则的实时告警"""
def __init__(self):
self.rules: list[AlertRule] = []
self.alerts: list[Alert] = []
self.last_fired: dict[str, float] = {}
self.handlers: dict[Severity, list[Callable[[Alert], None]]] = {
s: [] for s in Severity
}
def add_rule(self, rule: AlertRule) -> None:
self.rules.append(rule)
def register_handler(
self, severity: Severity, handler: Callable[[Alert], None]
) -> None:
"""注册告警处理器(按严重级别)"""
self.handlers[severity].append(handler)
def evaluate(self, metric_name: str, value: float) -> list[Alert]:
"""评估指标值,触发匹配的告警规则"""
triggered = []
now = datetime.now(timezone.utc).timestamp()
for rule in self.rules:
if rule.name != metric_name:
continue
if not rule.condition(value):
continue
# 冷却期检查
last = self.last_fired.get(rule.name, 0)
if now - last < rule.cooldown_seconds:
continue
alert = Alert(
alert_type=rule.alert_type,
severity=rule.severity,
message=rule.message_template.format(value=value),
source=metric_name,
metric_value=value,
threshold=0,
)
self.alerts.append(alert)
self.last_fired[rule.name] = now
triggered.append(alert)
# 调用对应级别的处理器
for handler in self.handlers[rule.severity]:
handler(alert)
return triggered
# 配置示例
engine = AlertEngine()
engine.add_rule(AlertRule(
name="block_rate",
alert_type=AlertType.HIGH_BLOCK_RATE,
severity=Severity.P1_HIGH,
condition=lambda v: v > 0.15,
message_template="拦截率达到 {value:.1%},超过 15% 阈值",
cooldown_seconds=600,
))
engine.add_rule(AlertRule(
name="jailbreak_count_5min",
alert_type=AlertType.JAILBREAK_BURST,
severity=Severity.P0_CRITICAL,
condition=lambda v: v > 50,
message_template="5分钟内越狱尝试 {value:.0f} 次,触发紧急告警",
cooldown_seconds=300,
))

事件响应流程

from dataclasses import dataclass
from enum import Enum
class IncidentStatus(Enum):
OPENED = "opened"
INVESTIGATING = "investigating"
MITIGATING = "mitigating"
RESOLVED = "resolved"
POST_MORTEM = "post_mortem"
@dataclass
class Incident:
id: str
alert: Alert
status: IncidentStatus
assignee: str = ""
timeline: list[str] = field(default_factory=list)
root_cause: str = ""
resolution: str = ""
def update_status(self, new_status: IncidentStatus, note: str = "") -> None:
self.status = new_status
ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
self.timeline.append(f"[{ts}] {new_status.value}: {note}")
class IncidentManager:
"""事件管理器"""
PLAYBOOKS: dict[AlertType, list[str]] = {
AlertType.JAILBREAK_BURST: [
"1. 启用严格模式(提高检测灵敏度)",
"2. 临时限制可疑用户",
"3. 检查攻击模式是否为新型",
"4. 更新防御规则",
"5. 通知安全团队复查",
],
AlertType.DATA_EXFILTRATION: [
"1. 立即阻断相关会话",
"2. 评估泄露范围",
"3. 通知数据保护官(DPO)",
"4. 启动数据泄露应急预案",
"5. 72小时内向监管机构报告(GDPR)",
],
AlertType.HIGH_BLOCK_RATE: [
"1. 检查是否为误判导致的批量拦截",
"2. 分析被拦截请求的模式",
"3. 如果是误判,临时降低阈值",
"4. 更新关键词库或模型",
],
}
def __init__(self):
self.incidents: dict[str, Incident] = {}
self._counter = 0
def create_incident(self, alert: Alert) -> Incident:
self._counter += 1
incident_id = f"INC-{self._counter:04d}"
incident = Incident(
id=incident_id,
alert=alert,
status=IncidentStatus.OPENED,
)
incident.update_status(
IncidentStatus.OPENED,
f"由告警 {alert.alert_type.value} 自动创建"
)
self.incidents[incident_id] = incident
return incident
def get_playbook(self, alert_type: AlertType) -> list[str]:
"""获取标准化响应流程"""
return self.PLAYBOOKS.get(alert_type, ["无标准流程,请人工评估"])

告警级别定义

级别 响应时间 通知方式 自动操作 示例场景
P0 紧急 5 分钟 电话 + 短信 + 即时通讯 自动熔断 大规模数据泄露
P1 高 30 分钟 短信 + 即时通讯 限流 越狱攻击突增
P2 中 4 小时 即时通讯 + 邮件 记录 拦截率异常
P3 低 24 小时 邮件 仅日志 单次可疑请求

事件响应成熟度

graph LR A[Level 1
被动响应] --> B[Level 2
告警+手动] B --> C[Level 3
Playbook标准化] C --> D[Level 4
自动化响应] D --> E[Level 5
预测性防御] style A fill:#ffcdd2,stroke:#c62828,stroke-width:2px style C fill:#fff9c4,stroke:#f9a825,stroke-width:2px style E fill:#c8e6c9,stroke:#43a047,stroke-width:2px

本章小结

下一章:规则引擎设计