动态规则更新
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read318 words

动态规则更新

治理规则需要根据新威胁和业务变化动态调整,而不是每次都重新部署。本章构建热更新规则系统。

动态更新架构

graph TB A[规则变更来源] --> B{来源类型} B -->|管理后台| C[人工审批] B -->|自动检测| D[自动推荐] B -->|外部情报| E[威胁情报同步] C --> F[规则版本控制] D --> F E --> F F --> G[灰度发布] G --> H{效果验证} H -->|通过| I[全量生效] H -->|失败| J[自动回滚] style A fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style I fill:#c8e6c9,stroke:#43a047,stroke-width:2px style J fill:#ffcdd2,stroke:#c62828,stroke-width:2px

规则版本管理

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json
import copy
class RuleStatus(Enum):
DRAFT = "draft"
REVIEWING = "reviewing"
APPROVED = "approved"
ACTIVE = "active"
DEPRECATED = "deprecated"
ROLLED_BACK = "rolled_back"
@dataclass
class RuleVersion:
version: int
rule_id: str
config: dict
status: RuleStatus
created_at: str = ""
created_by: str = ""
changelog: str = ""
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now(timezone.utc).isoformat()
class RuleVersionControl:
"""规则版本控制系统"""
def __init__(self):
self.versions: dict[str, list[RuleVersion]] = {}
self.active_versions: dict[str, int] = {}
def create_version(
self,
rule_id: str,
config: dict,
created_by: str,
changelog: str = "",
) -> RuleVersion:
"""创建新版本"""
history = self.versions.setdefault(rule_id, [])
version_num = len(history) + 1
version = RuleVersion(
version=version_num,
rule_id=rule_id,
config=config,
status=RuleStatus.DRAFT,
created_by=created_by,
changelog=changelog,
)
history.append(version)
return version
def approve(self, rule_id: str, version: int) -> bool:
"""审批规则版本"""
rv = self._get_version(rule_id, version)
if rv and rv.status == RuleStatus.REVIEWING:
rv.status = RuleStatus.APPROVED
return True
return False
def activate(self, rule_id: str, version: int) -> bool:
"""激活规则版本"""
rv = self._get_version(rule_id, version)
if rv and rv.status == RuleStatus.APPROVED:
# 将当前激活版本标记为已弃用
current = self.active_versions.get(rule_id)
if current:
old_rv = self._get_version(rule_id, current)
if old_rv:
old_rv.status = RuleStatus.DEPRECATED
rv.status = RuleStatus.ACTIVE
self.active_versions[rule_id] = version
return True
return False
def rollback(self, rule_id: str) -> RuleVersion | None:
"""回滚到上一个活跃版本"""
history = self.versions.get(rule_id, [])
deprecated = [
v for v in history if v.status == RuleStatus.DEPRECATED
]
if not deprecated:
return None
# 回滚当前版本
current = self.active_versions.get(rule_id)
if current:
cv = self._get_version(rule_id, current)
if cv:
cv.status = RuleStatus.ROLLED_BACK
# 恢复上一个版本
prev = deprecated[-1]
prev.status = RuleStatus.ACTIVE
self.active_versions[rule_id] = prev.version
return prev
def _get_version(self, rule_id: str, version: int) -> RuleVersion | None:
history = self.versions.get(rule_id, [])
for v in history:
if v.version == version:
return v
return None

灰度发布策略

from dataclasses import dataclass
import hashlib
class GradualRollout:
"""规则灰度发布——逐步扩大生效范围"""
STAGES = [
(0.01, "1% 金丝雀"),
(0.10, "10% 小流量"),
(0.50, "50% 半量"),
(1.00, "100% 全量"),
]
def __init__(self, rule_id: str):
self.rule_id = rule_id
self.current_stage = 0
self.metrics: list[dict] = []
def should_apply(self, user_id: str) -> bool:
"""基于用户ID哈希决定是否命中灰度"""
ratio = self.STAGES[self.current_stage][0]
hash_val = int(
hashlib.md5(
f"{self.rule_id}:{user_id}".encode()
).hexdigest(), 16
)
return (hash_val % 10000) / 10000 < ratio
def record_metrics(self, false_positive: int, true_positive: int,
total: int) -> None:
"""记录当前阶段的指标"""
self.metrics.append({
"stage": self.STAGES[self.current_stage][1],
"false_positive_rate": false_positive / total if total > 0 else 0,
"true_positive_rate": true_positive / total if total > 0 else 0,
"total": total,
})
def can_promote(self) -> bool:
"""判断是否可以进入下一阶段"""
if not self.metrics:
return False
latest = self.metrics[-1]
return (
latest["false_positive_rate"] < 0.02 and
latest["true_positive_rate"] > 0.8 and
latest["total"] >= 100
)
def promote(self) -> str:
"""晋升到下一阶段"""
if not self.can_promote():
return "指标不达标,无法晋升"
if self.current_stage >= len(self.STAGES) - 1:
return "已经是全量"
self.current_stage += 1
return f"已晋升至: {self.STAGES[self.current_stage][1]}"

规则生命周期

graph LR A[Draft
草稿] --> B[Reviewing
审核中] B --> C[Approved
已批准] C --> D[Active
生效中] D --> E[Deprecated
已弃用] D --> F[Rolled Back
已回滚] F --> C style A fill:#e0e0e0,stroke:#616161,stroke-width:2px style B fill:#fff9c4,stroke:#f9a825,stroke-width:2px style C fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style D fill:#c8e6c9,stroke:#43a047,stroke-width:2px style E fill:#e0e0e0,stroke:#616161,stroke-width:2px style F fill:#ffcdd2,stroke:#c62828,stroke-width:2px

规则更新策略对比

策略 变更延迟 风险等级 回滚能力 适用场景
立即全量 即时 需手动 紧急安全补丁
灰度发布 数小时~数天 自动回滚 常规规则更新
定时发布 下个维护窗口 计划内回滚 非紧急变更
A/B 测试 数天~数周 最低 随时切换 优化型变更
审批流程 1-3 天 版本回滚 合规类变更

本章小结

下一章:完整治理平台架构