动态规则更新
治理规则需要根据新威胁和业务变化动态调整,而不是每次都重新部署。本章构建热更新规则系统。
动态更新架构
graph TB
A[规则变更来源] --> B{来源类型}
B -->|管理后台| C[人工审批]
B -->|自动检测| D[自动推荐]
B -->|外部情报| E[威胁情报同步]
C --> F[规则版本控制]
D --> F
E --> F
F --> G[灰度发布]
G --> H{效果验证}
H -->|通过| I[全量生效]
H -->|失败| J[自动回滚]
style A fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style I fill:#c8e6c9,stroke:#43a047,stroke-width:2px
style J fill:#ffcdd2,stroke:#c62828,stroke-width:2px
规则版本管理
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
import json
import copy
class RuleStatus(Enum):
DRAFT = "draft"
REVIEWING = "reviewing"
APPROVED = "approved"
ACTIVE = "active"
DEPRECATED = "deprecated"
ROLLED_BACK = "rolled_back"
@dataclass
class RuleVersion:
version: int
rule_id: str
config: dict
status: RuleStatus
created_at: str = ""
created_by: str = ""
changelog: str = ""
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now(timezone.utc).isoformat()
class RuleVersionControl:
"""规则版本控制系统"""
def __init__(self):
self.versions: dict[str, list[RuleVersion]] = {}
self.active_versions: dict[str, int] = {}
def create_version(
self,
rule_id: str,
config: dict,
created_by: str,
changelog: str = "",
) -> RuleVersion:
"""创建新版本"""
history = self.versions.setdefault(rule_id, [])
version_num = len(history) + 1
version = RuleVersion(
version=version_num,
rule_id=rule_id,
config=config,
status=RuleStatus.DRAFT,
created_by=created_by,
changelog=changelog,
)
history.append(version)
return version
def approve(self, rule_id: str, version: int) -> bool:
"""审批规则版本"""
rv = self._get_version(rule_id, version)
if rv and rv.status == RuleStatus.REVIEWING:
rv.status = RuleStatus.APPROVED
return True
return False
def activate(self, rule_id: str, version: int) -> bool:
"""激活规则版本"""
rv = self._get_version(rule_id, version)
if rv and rv.status == RuleStatus.APPROVED:
# 将当前激活版本标记为已弃用
current = self.active_versions.get(rule_id)
if current:
old_rv = self._get_version(rule_id, current)
if old_rv:
old_rv.status = RuleStatus.DEPRECATED
rv.status = RuleStatus.ACTIVE
self.active_versions[rule_id] = version
return True
return False
def rollback(self, rule_id: str) -> RuleVersion | None:
"""回滚到上一个活跃版本"""
history = self.versions.get(rule_id, [])
deprecated = [
v for v in history if v.status == RuleStatus.DEPRECATED
]
if not deprecated:
return None
# 回滚当前版本
current = self.active_versions.get(rule_id)
if current:
cv = self._get_version(rule_id, current)
if cv:
cv.status = RuleStatus.ROLLED_BACK
# 恢复上一个版本
prev = deprecated[-1]
prev.status = RuleStatus.ACTIVE
self.active_versions[rule_id] = prev.version
return prev
def _get_version(self, rule_id: str, version: int) -> RuleVersion | None:
history = self.versions.get(rule_id, [])
for v in history:
if v.version == version:
return v
return None
灰度发布策略
from dataclasses import dataclass
import hashlib
class GradualRollout:
"""规则灰度发布——逐步扩大生效范围"""
STAGES = [
(0.01, "1% 金丝雀"),
(0.10, "10% 小流量"),
(0.50, "50% 半量"),
(1.00, "100% 全量"),
]
def __init__(self, rule_id: str):
self.rule_id = rule_id
self.current_stage = 0
self.metrics: list[dict] = []
def should_apply(self, user_id: str) -> bool:
"""基于用户ID哈希决定是否命中灰度"""
ratio = self.STAGES[self.current_stage][0]
hash_val = int(
hashlib.md5(
f"{self.rule_id}:{user_id}".encode()
).hexdigest(), 16
)
return (hash_val % 10000) / 10000 < ratio
def record_metrics(self, false_positive: int, true_positive: int,
total: int) -> None:
"""记录当前阶段的指标"""
self.metrics.append({
"stage": self.STAGES[self.current_stage][1],
"false_positive_rate": false_positive / total if total > 0 else 0,
"true_positive_rate": true_positive / total if total > 0 else 0,
"total": total,
})
def can_promote(self) -> bool:
"""判断是否可以进入下一阶段"""
if not self.metrics:
return False
latest = self.metrics[-1]
return (
latest["false_positive_rate"] < 0.02 and
latest["true_positive_rate"] > 0.8 and
latest["total"] >= 100
)
def promote(self) -> str:
"""晋升到下一阶段"""
if not self.can_promote():
return "指标不达标,无法晋升"
if self.current_stage >= len(self.STAGES) - 1:
return "已经是全量"
self.current_stage += 1
return f"已晋升至: {self.STAGES[self.current_stage][1]}"
规则生命周期
graph LR
A[Draft
草稿] --> B[Reviewing
审核中] B --> C[Approved
已批准] C --> D[Active
生效中] D --> E[Deprecated
已弃用] D --> F[Rolled Back
已回滚] F --> C style A fill:#e0e0e0,stroke:#616161,stroke-width:2px style B fill:#fff9c4,stroke:#f9a825,stroke-width:2px style C fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style D fill:#c8e6c9,stroke:#43a047,stroke-width:2px style E fill:#e0e0e0,stroke:#616161,stroke-width:2px style F fill:#ffcdd2,stroke:#c62828,stroke-width:2px
草稿] --> B[Reviewing
审核中] B --> C[Approved
已批准] C --> D[Active
生效中] D --> E[Deprecated
已弃用] D --> F[Rolled Back
已回滚] F --> C style A fill:#e0e0e0,stroke:#616161,stroke-width:2px style B fill:#fff9c4,stroke:#f9a825,stroke-width:2px style C fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style D fill:#c8e6c9,stroke:#43a047,stroke-width:2px style E fill:#e0e0e0,stroke:#616161,stroke-width:2px style F fill:#ffcdd2,stroke:#c62828,stroke-width:2px
规则更新策略对比
| 策略 | 变更延迟 | 风险等级 | 回滚能力 | 适用场景 |
|---|---|---|---|---|
| 立即全量 | 即时 | 高 | 需手动 | 紧急安全补丁 |
| 灰度发布 | 数小时~数天 | 低 | 自动回滚 | 常规规则更新 |
| 定时发布 | 下个维护窗口 | 中 | 计划内回滚 | 非紧急变更 |
| A/B 测试 | 数天~数周 | 最低 | 随时切换 | 优化型变更 |
| 审批流程 | 1-3 天 | 低 | 版本回滚 | 合规类变更 |
本章小结
- 版本控制——每次规则变更生成新版本,保留完整历史
- 审批流程——草稿 → 审核 → 批准 → 激活,防止未经审核的变更
- 灰度发布——1% → 10% → 50% → 100% 逐步扩大
- 自动回滚——指标不达标时自动回滚到上一个版本
- 用户哈希分桶——确保灰度分配一致且均匀
下一章:完整治理平台架构