数据分析自动化
不要花 3 小时做一份每周都一样的报表——自动化它,10 分钟搞定。
报表自动化架构
graph TD
SOURCE[数据源] --> ETL[ETL 抽取]
ETL --> CLEAN[清洗]
CLEAN --> ANALYZE[分析]
ANALYZE --> REPORT[报表生成]
REPORT --> DISTRIBUTE[分发]
SOURCE --> DB[数据库]
SOURCE --> API[API]
SOURCE --> FILE[Excel/CSV]
DISTRIBUTE --> EMAIL[邮件推送]
DISTRIBUTE --> BI[BI 看板]
DISTRIBUTE --> PDF[PDF 导出]
style ETL fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style REPORT fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
自动化报表生成
"""
自动化报表引擎
"""
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ReportConfig:
name: str
frequency: str # daily / weekly / monthly
data_source: str
metrics: list[str]
recipients: list[str]
@dataclass
class ReportEngine:
"""报表自动化引擎"""
configs: list[ReportConfig] = field(default_factory=list)
def add_report(self, config: ReportConfig):
self.configs.append(config)
def generate(self, config: ReportConfig) -> dict:
"""生成报表 (模拟)"""
# 模拟数据
data = {
"total_revenue": 1258000,
"order_count": 4230,
"avg_order_value": 297.4,
"conversion_rate": 0.034,
"return_rate": 0.052,
}
report = {
"报表名": config.name,
"生成时间": datetime.now().strftime("%Y-%m-%d %H:%M"),
"数据源": config.data_source,
"频率": config.frequency,
"指标": {},
}
metric_map = {
"revenue": ("总收入", f"¥{data['total_revenue']:,}"),
"orders": ("订单数", f"{data['order_count']:,}"),
"aov": ("客单价", f"¥{data['avg_order_value']:.0f}"),
"cvr": ("转化率", f"{data['conversion_rate']*100:.1f}%"),
"return": ("退货率", f"{data['return_rate']*100:.1f}%"),
}
for m in config.metrics:
if m in metric_map:
name, value = metric_map[m]
report["指标"][name] = value
report["收件人"] = config.recipients
return report
def schedule_summary(self) -> list[dict]:
"""排程汇总"""
return [
{
"报表": c.name,
"频率": c.frequency,
"收件人": len(c.recipients),
}
for c in self.configs
]
# 演示
engine = ReportEngine()
engine.add_report(ReportConfig(
"每日销售快报", "daily", "数据库",
["revenue", "orders", "aov"],
["ceo@company.com", "sales@company.com"],
))
engine.add_report(ReportConfig(
"每周运营报告", "weekly", "多数据源",
["revenue", "orders", "cvr", "return"],
["ops@company.com"],
))
for config in engine.configs:
report = engine.generate(config)
print(f"\n=== {report['报表名']} ===")
print(f" 频率: {report['频率']}")
for metric, value in report["指标"].items():
print(f" {metric}: {value}")
print(f" 收件人: {report['收件人']}")
BI 工具选型
| 工具 | 定价 | 适合 | 学习曲线 |
|---|---|---|---|
| Metabase | 免费(开源) | 中小团队 | 低 |
| Superset | 免费(开源) | 技术团队 | 中 |
| Power BI | $10/月 | Microsoft 生态 | 中 |
| Tableau | $70/月 | 数据分析师 | 中高 |
| Grafana | 免费(开源) | 运维/实时 | 中 |
| 帆软 FineBI | 询价 | 国内企业 | 中 |
Python 数据分析自动化
"""
Python 数据分析管道
"""
from dataclasses import dataclass
@dataclass
class AnalysisPipeline:
"""数据分析管道"""
@staticmethod
def example_pipeline() -> str:
"""完整管道代码模板"""
return '''
import pandas as pd
from pathlib import Path
from datetime import datetime
def daily_sales_report():
"""每日销售报表自动化"""
# 1. 读取数据
df = pd.read_csv("sales_data.csv")
# 2. 数据清洗
df["date"] = pd.to_datetime(df["date"])
df = df.dropna(subset=["amount"])
# 3. 分析计算
summary = {
"total_revenue": df["amount"].sum(),
"order_count": len(df),
"avg_order": df["amount"].mean(),
"top_product": df.groupby("product")["amount"]
.sum().idxmax(),
}
# 4. 按类目汇总
by_category = df.groupby("category").agg(
revenue=("amount", "sum"),
orders=("amount", "count"),
avg=("amount", "mean"),
).round(2)
# 5. 生成 Excel 报表
output = f"report_{datetime.now():%Y%m%d}.xlsx"
with pd.ExcelWriter(output) as writer:
by_category.to_excel(writer, sheet_name="类目汇总")
df.to_excel(writer, sheet_name="明细", index=False)
# 6. 发送邮件
# send_email(output, recipients)
return summary
'''
pipeline = AnalysisPipeline()
print("=== Python 报表管道 ===")
print(pipeline.example_pipeline())
定时任务调度
| 方式 | 适用 | 配置示例 |
|---|---|---|
| cron (Linux) | 服务器脚本 | 0 8 * * 1-5 (工作日8点) |
| Windows 任务计划 | 桌面 Python | 创建基本任务 → 触发器 |
| APScheduler | Python 应用 | CronTrigger(hour=8) |
| GitHub Actions | CI/CD 管道 | schedule: cron: '0 0 * * 1' |
| Airflow | 企业数据管道 | DAG 定义 |
下一章:流程设计与治理——让自动化安全、合规、可持续。