自动化监控与异常处理
自动化流程一旦投产,监控和异常处理比搭建更重要。没有监控的自动化就像无人驾驶没有传感器——迟早出事。
监控体系架构
graph TB
A[自动化流程] --> B[指标采集]
B --> C[状态检查]
C --> D{健康?}
D -->|是| E[记录日志]
D -->|否| F[触发告警]
F --> G[自动重试]
G --> H{恢复?}
H -->|是| E
H -->|否| I[人工介入]
style D fill:#fff9c4,stroke:#f9a825,stroke-width:2px
style F fill:#ffcdd2,stroke:#e53935,stroke-width:2px
style E fill:#c8e6c9,stroke:#43a047,stroke-width:2px
流程监控器
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
FAILING = "failing"
UNKNOWN = "unknown"
@dataclass
class FlowMetrics:
flow_name: str
success_count: int = 0
failure_count: int = 0
avg_duration_sec: float = 0.0
last_run: str = ""
last_error: str = ""
@property
def success_rate(self) -> float:
total = self.success_count + self.failure_count
return self.success_count / total if total > 0 else 0.0
@property
def health(self) -> HealthStatus:
if self.success_count + self.failure_count == 0:
return HealthStatus.UNKNOWN
rate = self.success_rate
if rate >= 0.95:
return HealthStatus.HEALTHY
elif rate >= 0.80:
return HealthStatus.DEGRADED
return HealthStatus.FAILING
class FlowMonitor:
"""自动化流程监控"""
def __init__(self):
self.flows: dict[str, FlowMetrics] = {}
self.alerts: list[dict] = []
def register_flow(self, name: str) -> None:
self.flows[name] = FlowMetrics(flow_name=name)
def record_run(
self,
flow_name: str,
success: bool,
duration_sec: float,
error: str = "",
) -> None:
"""记录一次运行结果"""
if flow_name not in self.flows:
self.register_flow(flow_name)
m = self.flows[flow_name]
if success:
m.success_count += 1
else:
m.failure_count += 1
m.last_error = error
# 滚动平均
total = m.success_count + m.failure_count
m.avg_duration_sec = (
(m.avg_duration_sec * (total - 1) + duration_sec) / total
)
m.last_run = datetime.now().strftime("%Y-%m-%d %H:%M")
# 检查是否需要告警
if m.health == HealthStatus.FAILING:
self._send_alert(flow_name, m)
def _send_alert(self, flow_name: str, metrics: FlowMetrics) -> None:
alert = {
"flow": flow_name,
"status": metrics.health.value,
"success_rate": f"{metrics.success_rate:.1%}",
"last_error": metrics.last_error,
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
}
self.alerts.append(alert)
def dashboard(self) -> str:
"""输出监控面板"""
lines = [
"| 流程 | 状态 | 成功率 | 平均耗时 | 最后运行 |",
"|------|------|--------|---------|---------|",
]
for m in self.flows.values():
lines.append(
f"| {m.flow_name} | {m.health.value} "
f"| {m.success_rate:.1%} | {m.avg_duration_sec:.1f}s "
f"| {m.last_run} |"
)
return "\n".join(lines)
异常重试策略
import time
from dataclasses import dataclass
@dataclass
class RetryConfig:
max_retries: int = 3
base_delay_sec: float = 1.0
backoff_factor: float = 2.0
max_delay_sec: float = 60.0
class RetryHandler:
"""指数退避重试"""
def __init__(self, config: RetryConfig | None = None):
self.config = config or RetryConfig()
def execute_with_retry(self, func, *args, **kwargs):
"""带重试执行"""
last_error = None
delay = self.config.base_delay_sec
for attempt in range(self.config.max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < self.config.max_retries:
sleep_time = min(delay, self.config.max_delay_sec)
print(
f"重试 {attempt+1}/{self.config.max_retries},"
f"等待 {sleep_time:.1f}s..."
)
time.sleep(sleep_time)
delay *= self.config.backoff_factor
raise RuntimeError(
f"重试 {self.config.max_retries} 次后仍失败: {last_error}"
)
# 重试策略对比
RETRY_STRATEGIES = {
"固定间隔": RetryConfig(max_retries=3, base_delay_sec=5, backoff_factor=1.0),
"指数退避": RetryConfig(max_retries=5, base_delay_sec=1, backoff_factor=2.0),
"快速失败": RetryConfig(max_retries=1, base_delay_sec=0, backoff_factor=1.0),
"长时容忍": RetryConfig(max_retries=10, base_delay_sec=2, backoff_factor=1.5),
}
常见异常分类
graph TB
A[异常类型] --> B[暂时性]
A --> C[永久性]
A --> D[资源性]
B --> B1[网络超时]
B --> B2[API 限流]
B --> B3[服务重启]
C --> C1[认证过期]
C --> C2[权限不足]
C --> C3[数据格式错误]
D --> D1[磁盘满]
D --> D2[内存不足]
D --> D3[配额耗尽]
style B fill:#fff9c4,stroke:#f9a825,stroke-width:2px
style C fill:#ffcdd2,stroke:#e53935,stroke-width:2px
style D fill:#e1bee7,stroke:#8e24aa,stroke-width:2px
| 异常类型 | 处理策略 | 重试 | 告警 |
|---|---|---|---|
| 网络超时 | 指数退避重试 | ✅ | 连续3次再告警 |
| API 限流 | 等待重置窗口 | ✅ | 频繁则告警 |
| 认证过期 | 刷新 token | ❌ | 立即告警 |
| 数据格式错误 | 跳过 + 记录 | ❌ | 累积告警 |
| 磁盘满 | 清理空间 | ❌ | 立即告警 |
本章小结
- 三级健康状态——healthy ≥ 95% / degraded ≥ 80% / failing < 80%
- 指数退避重试——暂时性错误用 1s → 2s → 4s 递增等待
- 异常分类处理——暂时性重试、永久性告警、资源性清理
- 监控面板——一张表看全部流程的成功率和耗时
- 告警节制——不是每次失败都告警,连续失败才触发
下一章:跨系统集成实战