实战项目：数据管道 + API 服务
把所有知识点综合起来——构建一个完整的数据处理和 API 服务项目。
项目架构
graph LR
%% Data flow: raw CSV/JSON source -> pipeline (clean -> transform -> load) -> storage -> FastAPI endpoints.
%% NOTE(review): the sample pipeline code in this chapter writes its output to data/output.json and the
%% API reads that JSON file; the SQLite node presumably corresponds to the database.py module listed in
%% the project structure, which is not shown here — confirm intent.
SRC[数据源 CSV/JSON] --> PIPE[数据管道]
PIPE --> CLEAN[清洗]
CLEAN --> TRANSFORM[转换]
TRANSFORM --> LOAD[存储]
LOAD --> DB[(SQLite)]
DB --> API[FastAPI]
%% The API exposes three kinds of endpoints: record listing, statistics, and export.
API --> LIST_E[列表查询]
API --> STATS[统计分析]
API --> EXPORT[数据导出]
%% Highlight the entry point (source), the processing stage (pipeline) and the serving layer (API).
style SRC fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style PIPE fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style API fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
项目结构
"""
项目目录结构
"""
# Box-drawing directory tree. The leading "│   " / "    " runs inside the string
# are what render the nesting level, so they must be kept intact.
STRUCTURE = """
sales-analytics/
├── pyproject.toml
├── README.md
├── data/
│   └── sales_raw.csv           # 原始数据
├── src/
│   └── sales_analytics/
│       ├── __init__.py
│       ├── pipeline.py         # 数据管道
│       ├── models.py           # 数据模型
│       ├── database.py         # 数据库操作
│       ├── api.py              # FastAPI 接口
│       └── config.py           # 配置
└── tests/
    ├── conftest.py
    └── test_pipeline.py
"""
print(STRUCTURE)
数据模型
"""
数据模型定义
"""
from dataclasses import dataclass, field
from datetime import date, datetime
from pydantic import BaseModel, Field
# === Internal data model (dataclass) ===
@dataclass
class SalesRecord:
    """A single sales order line as handled inside the application.

    ``total_amount`` is derived on demand rather than stored, and
    ``is_valid`` performs a minimal sanity check of the record.
    """

    order_id: str
    product: str
    category: str
    quantity: int
    unit_price: float
    order_date: date
    region: str
    customer_id: str

    @property
    def total_amount(self) -> float:
        """Revenue of this line: quantity multiplied by unit price."""
        return self.quantity * self.unit_price

    def is_valid(self) -> bool:
        """Return True when quantity and unit price are positive and order_id is non-empty."""
        # Written as "not (x > 0)" on purpose: a NaN price compares False
        # with everything, so it must fail this check rather than pass it.
        if not (self.quantity > 0 and self.unit_price > 0):
            return False
        return len(self.order_id) > 0
# === API models (Pydantic) ===
class SalesResponse(BaseModel):
    """Response schema for one sales record.

    Same fields as SalesRecord except customer_id, with total_amount
    carried as a plain field instead of a computed property.
    """
    order_id: str
    product: str
    category: str
    quantity: int
    unit_price: float
    total_amount: float
    order_date: date
    region: str
class SalesStats(BaseModel):
    """Aggregate sales statistics.

    Field names match the keys of the dict built by
    DataPipeline.generate_report.
    """
    total_revenue: float
    total_orders: int
    avg_order_value: float
    # In generate_report each entry is {"product": ..., "revenue": ...}.
    top_products: list[dict]
    revenue_by_region: dict[str, float]
    revenue_by_category: dict[str, float]
class QueryParams(BaseModel):
    """Filter and pagination parameters for querying sales records.

    NOTE(review): not referenced by the visible API handlers, which declare
    their own query parameters — confirm where this model is used.
    """
    start_date: date | None = None
    end_date: date | None = None
    category: str | None = None
    region: str | None = None
    # presumably a lower bound on total_amount — confirm against the caller
    min_amount: float = 0
    # Page size, validated to the range 1..1000 (default 100).
    limit: int = Field(default=100, ge=1, le=1000)
    # Number of records to skip; must be >= 0.
    offset: int = Field(default=0, ge=0)
# Demo: build one sample record, then print its total and whether it is valid.
demo_fields = {
    "order_id": "ORD-001",
    "product": "Python 编程书",
    "category": "图书",
    "quantity": 2,
    "unit_price": 59.9,
    "order_date": date(2024, 1, 15),
    "region": "华东",
    "customer_id": "C001",
}
record = SalesRecord(**demo_fields)
print(f"订单: {record.order_id}, 总额: ¥{record.total_amount:.2f}")
print(f"有效: {record.is_valid()}")
数据管道
"""
ETL 数据管道
"""
import csv
import json
from datetime import date, datetime
from pathlib import Path
from dataclasses import asdict
class DataPipeline:
    """ETL pipeline: read sales rows from CSV, clean and enrich them, save JSON, report.

    Attributes:
        records: initialised empty; no method of this class writes to it.
        errors: every row rejected by ``clean``, accumulated across calls. Each entry is
            ``{"row": <1-based data-row index>, "error": <message>, "data": <raw input row>}``.
        stats: counters describing the most recent ``extract_csv`` / ``clean`` calls.
    """

    def __init__(self):
        self.records: list[dict] = []
        self.errors: list[dict] = []
        self.stats = {
            "total_raw": 0,
            "total_cleaned": 0,
            "total_errors": 0,
        }

    # === Extract ===
    def extract_csv(self, filepath: str) -> list[dict]:
        """Read every data row of a CSV file as a dict of header -> string.

        The file is opened with ``newline=""`` as the csv module requires, so quoted
        fields that contain line breaks are returned verbatim. ``utf-8-sig`` strips a
        leading BOM so the first header name is not polluted by it.
        """
        with open(filepath, "r", encoding="utf-8-sig", newline="") as f:
            records = [dict(row) for row in csv.DictReader(f)]
        self.stats["total_raw"] = len(records)
        print(f"📥 提取: {len(records)} 条原始记录")
        return records

    # === Transform ===
    def clean(self, records: list[dict]) -> list[dict]:
        """Convert field types, trim strings and drop invalid rows.

        The input rows are never modified: each accepted row is a converted copy with an
        added ``total_amount``. Each rejected row is appended to ``self.errors`` together
        with its original, unconverted data.

        Args:
            records: raw rows, e.g. as returned by ``extract_csv``.

        Returns:
            The rows that passed validation, in input order.
        """
        import math  # only needed here, for the finiteness check

        cleaned = []
        batch_errors = []
        for i, raw in enumerate(records):
            try:
                row = dict(raw)  # work on a copy so a failure cannot corrupt caller data
                # Type conversion
                row["quantity"] = int(row.get("quantity", 0))
                row["unit_price"] = float(row.get("unit_price", 0))
                # Date parsing
                row["order_date"] = datetime.strptime(
                    row.get("order_date", ""), "%Y-%m-%d"
                ).date()
                # String cleanup
                row["product"] = row.get("product", "").strip()
                row["category"] = row.get("category", "").strip()
                row["region"] = row.get("region", "").strip()
                # Derived field
                row["total_amount"] = row["quantity"] * row["unit_price"]
                # Validation. float() happily parses "nan"/"inf"; NaN compares False with
                # everything, so an explicit finiteness check is needed to reject it.
                price_ok = math.isfinite(row["unit_price"]) and row["unit_price"] > 0
                if row["quantity"] <= 0 or not price_ok:
                    raise ValueError("数量或价格无效")
                if not row["product"]:
                    raise ValueError("产品名称为空")
            except Exception as e:  # per-row isolation: one bad row must not abort the batch
                batch_errors.append({
                    "row": i + 1,
                    "error": str(e),
                    "data": raw,
                })
            else:
                cleaned.append(row)
        self.errors.extend(batch_errors)
        self.stats["total_cleaned"] = len(cleaned)
        # Count only this call's rejects so the stats describe the current batch even
        # when the same pipeline instance is reused.
        self.stats["total_errors"] = len(batch_errors)
        print(f"✅ 清洗: {len(cleaned)} 条有效, {len(batch_errors)} 条错误")
        return cleaned

    def enrich(self, records: list[dict]) -> list[dict]:
        """Add derived dimensions to cleaned rows, in place, and return the same list.

        Adds ``month`` ("YYYY-MM" from ``order_date``) and ``tier``: "高" when
        ``total_amount >= 1000``, "中" when ``>= 200``, otherwise "低".
        """
        for row in records:
            row["month"] = row["order_date"].strftime("%Y-%m")
            amount = row["total_amount"]
            if amount >= 1000:
                row["tier"] = "高"
            elif amount >= 200:
                row["tier"] = "中"
            else:
                row["tier"] = "低"
        print("🔧 增强: 添加 month, tier 字段")
        return records

    # === Load ===
    def load_json(self, records: list[dict], filepath: str):
        """Write records to ``filepath`` as a UTF-8 JSON array.

        ``date`` values in ``order_date`` are serialised as ISO strings; the input
        records themselves are left unchanged.
        """
        output = []
        for r in records:
            row = dict(r)
            if isinstance(row.get("order_date"), date):
                row["order_date"] = row["order_date"].isoformat()
            output.append(row)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        print(f"💾 保存: {len(records)} 条记录 → {filepath}")

    # === Report ===
    @staticmethod
    def _sum_by(records: list[dict], key: str) -> dict:
        """Sum ``total_amount`` per distinct value of ``key``, in first-seen order."""
        totals: dict = {}
        for r in records:
            totals[r[key]] = totals.get(r[key], 0) + r["total_amount"]
        return totals

    def generate_report(self, records: list[dict], top_n: int = 5) -> dict:
        """Build and print summary statistics for cleaned records.

        Args:
            records: cleaned rows carrying ``total_amount``, ``category``, ``region``
                and ``product``.
            top_n: how many best-selling products to list (default 5).

        Returns:
            ``{"error": "没有数据"}`` for empty input; otherwise a dict with
            ``total_revenue``, ``total_orders``, ``avg_order_value``, ``top_products``,
            ``revenue_by_category`` and ``revenue_by_region``. Money values are rounded
            to 2 decimals only after summing.
        """
        if not records:
            return {"error": "没有数据"}
        total_revenue = sum(r["total_amount"] for r in records)
        avg_order = total_revenue / len(records)
        by_category = self._sum_by(records, "category")
        by_region = self._sum_by(records, "region")
        product_revenue = self._sum_by(records, "product")
        top_products = sorted(
            product_revenue.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:max(top_n, 0)]
        report = {
            "total_revenue": round(total_revenue, 2),
            "total_orders": len(records),
            "avg_order_value": round(avg_order, 2),
            "top_products": [
                {"product": p, "revenue": round(r, 2)} for p, r in top_products
            ],
            "revenue_by_category": {k: round(v, 2) for k, v in by_category.items()},
            "revenue_by_region": {k: round(v, 2) for k, v in by_region.items()},
        }
        print("\n📊 报告:")
        print(f" 总收入: ¥{report['total_revenue']:,.2f}")
        print(f" 总订单: {report['total_orders']}")
        print(f" 平均客单价: ¥{report['avg_order_value']:.2f}")
        return report

    # === Full run ===
    def run(self, input_file: str, output_file: str) -> dict:
        """Run extract -> clean -> enrich -> save JSON -> report; return the report."""
        print("=" * 50)
        print("🚀 数据管道启动")
        print("=" * 50)
        # Extract
        raw = self.extract_csv(input_file)
        # Transform
        cleaned = self.clean(raw)
        enriched = self.enrich(cleaned)
        # Load
        self.load_json(enriched, output_file)
        # Report
        report = self.generate_report(enriched)
        print("=" * 50)
        print("✅ 管道完成!")
        print(f" 处理统计: {self.stats}")
        return report
# Usage: a shared pipeline instance. run() takes the raw CSV path (data/sales_raw.csv
# in the project layout) and the JSON output path that the API's load_data() reads
# by default (data/output.json).
pipeline = DataPipeline()
print("数据管道准备就绪")
print("用法: pipeline.run('data/sales_raw.csv', 'data/output.json')")
API 服务
"""
FastAPI 数据服务
"""
from fastapi import FastAPI, Query, HTTPException
from datetime import date
import json
# FastAPI application instance; the route handlers below register on it via @app.get.
app = FastAPI(title="销售数据分析 API")
def load_data(filepath: str = "data/output.json") -> list[dict]:
    """Load processed sales records from a JSON file.

    A missing file is treated as "no data yet" and yields an empty list;
    any other I/O or JSON decoding error propagates to the caller.
    """
    try:
        source = open(filepath, "r", encoding="utf-8")
    except FileNotFoundError:
        return []
    with source:
        return json.load(source)
@app.get("/api/sales")
async def get_sales(
    category: str | None = None,
    region: str | None = None,
    min_amount: float = 0,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    start_date: date | None = None,
    end_date: date | None = None,
):
    """List sales records with optional filters and offset/limit pagination.

    Args:
        category: keep only records whose ``category`` equals this value.
        region: keep only records whose ``region`` equals this value.
        min_amount: keep only records with ``total_amount >= min_amount``;
            values <= 0 disable this filter.
        limit: page size (1-500).
        offset: number of matching records to skip.
        start_date: keep only records with ``order_date`` on or after this date.
        end_date: keep only records with ``order_date`` on or before this date.

    Returns:
        ``{"total", "limit", "offset", "data"}``. ``total`` counts all matches before
        pagination; ``data`` is the requested page.
    """
    filtered = load_data()
    if category:
        filtered = [r for r in filtered if r["category"] == category]
    if region:
        filtered = [r for r in filtered if r["region"] == region]
    if min_amount > 0:
        filtered = [r for r in filtered if r["total_amount"] >= min_amount]
    if start_date is not None or end_date is not None:
        # Dates are stored as ISO strings ("YYYY-MM-DD"); parse them before comparing.
        lower = start_date if start_date is not None else date.min
        upper = end_date if end_date is not None else date.max
        filtered = [
            r for r in filtered
            if lower <= date.fromisoformat(r["order_date"]) <= upper
        ]
    total = len(filtered)
    page = filtered[offset: offset + limit]
    return {
        "total": total,
        "limit": limit,
        "offset": offset,
        "data": page,
    }
@app.get("/api/stats")
async def get_stats():
    """Return aggregate revenue statistics over all stored records.

    The response carries the fields declared by the ``SalesStats`` model. Every money
    value is rounded to 2 decimals, and rounding happens only after summing so that
    errors do not compound.

    Raises:
        HTTPException: 404 when there is no data.
    """
    data = load_data()
    if not data:
        raise HTTPException(404, "没有数据")

    def sum_by(key: str) -> dict[str, float]:
        # Unrounded total_amount per distinct value of `key`, in first-seen order.
        totals: dict[str, float] = {}
        for r in data:
            totals[r[key]] = totals.get(r[key], 0) + r["total_amount"]
        return totals

    total_revenue = sum(r["total_amount"] for r in data)
    top_products = sorted(
        sum_by("product").items(), key=lambda kv: kv[1], reverse=True
    )[:5]
    return {
        "total_revenue": round(total_revenue, 2),
        "total_orders": len(data),
        "avg_order_value": round(total_revenue / len(data), 2),
        "top_products": [
            {"product": p, "revenue": round(v, 2)} for p, v in top_products
        ],
        "revenue_by_region": {k: round(v, 2) for k, v in sum_by("region").items()},
        "revenue_by_category": {k: round(v, 2) for k, v in sum_by("category").items()},
    }
@app.get("/api/export")
async def export_csv():
    """Download all stored records as a CSV attachment named sales.csv.

    The header is the union of keys across all records, in first-seen order. Rows with
    extra keys therefore no longer make ``csv.DictWriter`` raise, and missing values are
    written as empty cells. With no data, the response body is empty.
    """
    from fastapi.responses import StreamingResponse
    import csv
    import io

    data = load_data()
    output = io.StringIO()
    if data:
        # dict.fromkeys de-duplicates keys while preserving their first-seen order.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    output.seek(0)
    return StreamingResponse(
        output,
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=sales.csv"},
    )
# Startup hints: print the readiness message, the launch command and the docs URL.
print("\n".join((
    "API 服务准备就绪",
    "启动: uvicorn api:app --reload",
    "文档: http://localhost:8000/docs",
)))
本书总结
恭喜你完成了 Python 实战全面指南！回顾一下学到的核心能力：
| 章节 | 核心能力 |
|---|---|
| 01 基础语法 | 变量、控制流、函数、模块 |
| 02 数据结构 | 列表、字典、集合、生成器 |
| 03 面向对象 | 类、继承、魔法方法、dataclass |
| 04 函数式编程 | 高阶函数、闭包、装饰器 |
| 05 并发异步 | threading、multiprocessing、asyncio |
| 06 文件 IO | JSON、CSV、pathlib |
| 07 数据分析 | NumPy、Pandas |
| 08 Web 开发 | FastAPI、RESTful API |
| 09 测试工程化 | pytest、mypy、ruff |
| 10 实战项目 | ETL 管道 + API 服务 |
继续学习：将这些知识应用到你的实际项目中！