数据管道与仪表盘
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read333 words

数据管道与仪表盘

手动整理数据做报表是最消耗时间的办公活动之一。自动化数据管道 + 仪表盘可以让报表"自己跑"。

自动化数据管道架构

graph LR A[数据源] --> B[采集层] B --> C[清洗/转换] C --> D[存储] D --> E[可视化] A1[Excel] --> B A2[API] --> B A3[数据库] --> B A4[表单] --> B E --> E1[仪表盘] E --> E2[定时报告] E --> E3[告警] style C fill:#fff9c4,stroke:#f9a825,stroke-width:2px style D fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style E fill:#c8e6c9,stroke:#43a047,stroke-width:2px

轻量数据管道

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import csv
import io
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
@dataclass
class PipelineStep:
name: str
transform_fn: str   # 转换函数名(描述用)
status: StepStatus = StepStatus.PENDING
row_count_in: int = 0
row_count_out: int = 0
error: str = ""
@dataclass
class DataPipeline:
"""轻量 ETL 数据管道"""
name: str
steps: list[PipelineStep] = field(default_factory=list)
created_at: str = field(
default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M")
)
def add_step(self, name: str, transform_fn: str) -> None:
self.steps.append(PipelineStep(name=name, transform_fn=transform_fn))
def run(self, data: list[dict]) -> list[dict]:
"""依次执行管道步骤"""
current = data
for step in self.steps:
step.status = StepStatus.RUNNING
step.row_count_in = len(current)
try:
current = self._apply_step(step, current)
step.row_count_out = len(current)
step.status = StepStatus.SUCCESS
except Exception as e:
step.error = str(e)
step.status = StepStatus.FAILED
break
return current
def _apply_step(
self, step: PipelineStep, data: list[dict]
) -> list[dict]:
"""应用单步转换"""
fn_name = step.transform_fn
if fn_name == "remove_nulls":
return [
row for row in data
if all(v is not None for v in row.values())
]
elif fn_name == "deduplicate":
seen = set()
unique = []
for row in data:
key = tuple(sorted(row.items()))
if key not in seen:
seen.add(key)
unique.append(row)
return unique
elif fn_name == "normalize_numbers":
for row in data:
for k, v in row.items():
if isinstance(v, str) and v.replace(".", "").isdigit():
row[k] = float(v)
return data
else:
return data
def summary(self) -> str:
lines = [f"管道: {self.name}"]
for i, s in enumerate(self.steps, 1):
lines.append(
f"  步骤{i}: {s.name} [{s.status.value}] "
f"{s.row_count_in}→{s.row_count_out} rows"
)
return "\n".join(lines)
# 使用示例
pipeline = DataPipeline(name="月度销售清洗")
pipeline.add_step("去空值", "remove_nulls")
pipeline.add_step("去重", "deduplicate")
pipeline.add_step("数字标准化", "normalize_numbers")
raw_data = [
{"product": "A", "sales": "100.5"},
{"product": "B", "sales": None},
{"product": "A", "sales": "100.5"},
{"product": "C", "sales": "200"},
]
clean = pipeline.run(raw_data)
print(pipeline.summary())

仪表盘数据聚合

from dataclasses import dataclass
@dataclass
class DashboardMetric:
label: str
value: float
unit: str
trend: str = ""   # ↑ / ↓ / →
class DashboardBuilder:
"""仪表盘数据构建器"""
def aggregate_sales(self, records: list[dict]) -> list[DashboardMetric]:
"""从销售数据生成仪表盘指标"""
if not records:
return []
total = sum(float(r.get("amount", 0)) for r in records)
count = len(records)
avg = total / count if count else 0
# 按月分组统计
monthly: dict[str, float] = {}
for r in records:
month = r.get("date", "")[:7]
monthly[month] = monthly.get(month, 0) + float(r.get("amount", 0))
months = sorted(monthly.keys())
if len(months) >= 2:
last = monthly[months[-1]]
prev = monthly[months[-2]]
trend = "↑" if last > prev else "↓" if last < prev else "→"
else:
trend = "→"
return [
DashboardMetric("总销售额", total, "元", trend),
DashboardMetric("订单数", count, "笔"),
DashboardMetric("平均客单价", round(avg, 2), "元"),
DashboardMetric("月份数", len(months), "月"),
]
def to_table(self, metrics: list[DashboardMetric]) -> str:
"""输出为 Markdown 表格"""
lines = ["| 指标 | 数值 | 单位 | 趋势 |", "|------|------|------|------|"]
for m in metrics:
lines.append(f"| {m.label} | {m.value:,.2f} | {m.unit} | {m.trend} |")
return "\n".join(lines)

报表工具对比

工具 类型 数据源 自动化 适合场景 价格
Google Sheets 表格 API/手动 Apps Script 轻量报表 免费
Excel + Power Query 桌面 文件/数据库 VBA/宏 企业报表 Office 套件
Metabase BI SQL数据库 定时查询 团队仪表盘 开源免费
Grafana BI 多种 自动刷新 运维监控 开源免费
Power BI BI 多种 计划刷新 企业级 $10/月起

自动化调度模式

graph TB A[定时触发] --> B{数据变化?} B -->|是| C[运行管道] B -->|否| D[跳过] C --> E[生成报告] E --> F{超过阈值?} F -->|是| G[发送告警] F -->|否| H[仅保存] style A fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style G fill:#ffcdd2,stroke:#e53935,stroke-width:2px

本章小结

下一章:自动化监控与异常处理