实战项目:数据管道 + API 服务
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 226 words

实战项目:数据管道 + API 服务

把所有知识点综合起来——构建一个完整的数据处理和 API 服务项目。

项目架构

graph LR
    SRC[数据源 CSV/JSON] --> PIPE[数据管道]
    PIPE --> CLEAN[清洗]
    CLEAN --> TRANSFORM[转换]
    TRANSFORM --> LOAD[存储]
    LOAD --> DB[(SQLite)]
    DB --> API[FastAPI]
    API --> LIST_E[列表查询]
    API --> STATS[统计分析]
    API --> EXPORT[数据导出]
    style SRC fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style PIPE fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style API fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

项目结构

"""
项目目录结构
"""
# Directory layout of the example project, printed for reference.
# The two entries under tests/ are indented so that they render as children
# of tests/ rather than as additional root-level entries.
STRUCTURE = """
sales-analytics/
├── pyproject.toml
├── README.md
├── data/
│   └── sales_raw.csv       # 原始数据
├── src/
│   └── sales_analytics/
│       ├── __init__.py
│       ├── pipeline.py      # 数据管道
│       ├── models.py        # 数据模型
│       ├── database.py      # 数据库操作
│       ├── api.py           # FastAPI 接口
│       └── config.py        # 配置
└── tests/
    ├── conftest.py
    └── test_pipeline.py
"""
print(STRUCTURE)

数据模型

"""
数据模型定义
"""
from dataclasses import dataclass, field
from datetime import date, datetime
from pydantic import BaseModel, Field
# === Internal data model (dataclass) ===
@dataclass
class SalesRecord:
    """One sales order line as handled inside the application.

    The order total is not stored; it is derived on access from
    ``quantity`` and ``unit_price``.
    """

    order_id: str
    product: str
    category: str
    quantity: int
    unit_price: float
    order_date: date
    region: str
    customer_id: str

    @property
    def total_amount(self) -> float:
        """Return ``quantity`` multiplied by ``unit_price``."""
        line_total = self.quantity * self.unit_price
        return line_total

    def is_valid(self) -> bool:
        """Return True when quantity and unit price are positive and
        ``order_id`` is non-empty."""
        # Checks run in the same order as a chained ``and`` would evaluate them.
        if not self.quantity > 0:
            return False
        if not self.unit_price > 0:
            return False
        return len(self.order_id) > 0
# === API models (Pydantic) ===
class SalesResponse(BaseModel):
    """Response schema for a single sales record."""

    # Identification and classification
    order_id: str
    product: str
    category: str

    # Quantities and money
    quantity: int
    unit_price: float
    total_amount: float

    # When and where the order was placed
    order_date: date
    region: str
class SalesStats(BaseModel):
    """Response schema for aggregated sales statistics."""

    # Overall figures
    total_revenue: float
    total_orders: int
    avg_order_value: float

    # Breakdowns
    top_products: list[dict]
    revenue_by_region: dict[str, float]
    revenue_by_category: dict[str, float]
class QueryParams(BaseModel):
    """Optional filters and paging settings for a sales query."""

    # Filters; ``None`` is the default for each of them.
    start_date: date | None = None
    end_date: date | None = None
    category: str | None = None
    region: str | None = None
    min_amount: float = 0

    # Paging: limit is constrained to 1..1000, offset to >= 0.
    limit: int = Field(
        default=100,
        ge=1,
        le=1000,
    )
    offset: int = Field(
        default=0,
        ge=0,
    )
# Demo: build one SalesRecord and print its order id, computed total
# and validity flag.
record = SalesRecord(
    order_id="ORD-001",
    product="Python 编程书",
    category="图书",
    quantity=2,
    unit_price=59.9,
    order_date=date(2024, 1, 15),
    region="华东",
    customer_id="C001",
)
print(f"订单: {record.order_id}, 总额: ¥{record.total_amount:.2f}")
print(f"有效: {record.is_valid()}")

数据管道

"""
ETL 数据管道
"""
import csv
import json
from datetime import date, datetime
from pathlib import Path
from dataclasses import asdict
class DataPipeline:
    """ETL pipeline for sales rows: extract CSV, clean, enrich, save, report.

    Attributes:
        records: Initialised to an empty list; no method of this class
            writes to it.
        errors: Rows rejected by the most recent ``clean`` call. Each entry
            holds the 1-based row number, the error message and the
            untouched input row.
        stats: Counters ``total_raw``, ``total_cleaned`` and
            ``total_errors`` updated by ``extract_csv`` and ``clean``.
    """

    def __init__(self):
        self.records: list[dict] = []
        self.errors: list[dict] = []
        self.stats = {
            "total_raw": 0,
            "total_cleaned": 0,
            "total_errors": 0,
        }

    # === Extract ===
    def extract_csv(self, filepath: str) -> list[dict]:
        """Read every row of a CSV file into a list of plain dicts.

        Args:
            filepath: Path of the CSV file; the first line is the header.

        Returns:
            One dict per data row, keyed by the header names.
        """
        # newline="" leaves line-ending handling to the csv module, so quoted
        # fields that contain line breaks are read back unchanged.
        # utf-8-sig drops a leading BOM if the file has one.
        with open(filepath, "r", encoding="utf-8-sig", newline="") as f:
            records = [dict(row) for row in csv.DictReader(f)]
        self.stats["total_raw"] = len(records)
        print(f"📥 提取: {len(records)} 条原始记录")
        return records

    # === Transform ===
    def clean(self, records: list[dict]) -> list[dict]:
        """Convert field types, trim text fields and drop invalid rows.

        The input rows are not modified; each accepted row is a converted
        copy. ``self.errors`` and the ``total_cleaned`` / ``total_errors``
        counters are replaced on every call, so they always describe the
        same batch.

        Args:
            records: Raw rows, typically with string values.

        Returns:
            The rows that passed conversion and validation, each with an
            added ``total_amount`` field.
        """
        cleaned = []
        errors = []
        for i, raw in enumerate(records):
            try:
                # Work on a copy so a rejected row is reported exactly as
                # it was received and the caller's data stays intact.
                row = dict(raw)

                # Type conversion
                row["quantity"] = int(row.get("quantity", 0))
                row["unit_price"] = float(row.get("unit_price", 0))

                # Date parsing
                date_str = row.get("order_date", "")
                row["order_date"] = datetime.strptime(
                    date_str, "%Y-%m-%d"
                ).date()

                # Text clean-up
                row["product"] = row.get("product", "").strip()
                row["category"] = row.get("category", "").strip()
                row["region"] = row.get("region", "").strip()

                # Derived field
                row["total_amount"] = row["quantity"] * row["unit_price"]

                # Validation. The chained comparison also rejects NaN and
                # infinite prices, which a plain ``<= 0`` test lets through.
                price_ok = 0 < row["unit_price"] < float("inf")
                if row["quantity"] <= 0 or not price_ok:
                    raise ValueError("数量或价格无效")
                if not row["product"]:
                    raise ValueError("产品名称为空")
            except (ValueError, TypeError, AttributeError) as e:
                # ValueError: unparsable number/date or failed validation.
                # TypeError / AttributeError: a field is None or not a
                # string (e.g. a CSV row shorter than the header).
                errors.append({
                    "row": i + 1,
                    "error": str(e),
                    "data": raw,
                })
            else:
                cleaned.append(row)

        self.errors = errors
        self.stats["total_cleaned"] = len(cleaned)
        self.stats["total_errors"] = len(self.errors)
        print(f"✅ 清洗: {len(cleaned)} 条有效, {len(self.errors)} 条错误")
        return cleaned

    def enrich(self, records: list[dict]) -> list[dict]:
        """Add ``month`` and ``tier`` fields to every row, in place.

        Args:
            records: Cleaned rows; each needs a date-like ``order_date``
                and a numeric ``total_amount``.

        Returns:
            The same list object that was passed in.
        """
        for row in records:
            # Month dimension, formatted as YYYY-MM
            row["month"] = row["order_date"].strftime("%Y-%m")
            # Amount tier: >= 1000, >= 200, everything else
            amount = row["total_amount"]
            if amount >= 1000:
                row["tier"] = "高"
            elif amount >= 200:
                row["tier"] = "中"
            else:
                row["tier"] = "低"
        print(f"🔧 增强: 添加 month, tier 字段")
        return records

    # === Load ===
    def load_json(self, records: list[dict], filepath: str):
        """Write the rows to ``filepath`` as indented UTF-8 JSON.

        ``order_date`` values that are ``date`` instances are written as
        ISO strings; the input rows themselves are left unchanged.
        """
        output = []
        for r in records:
            row = dict(r)
            if isinstance(row.get("order_date"), date):
                row["order_date"] = row["order_date"].isoformat()
            output.append(row)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        print(f"💾 保存: {len(records)} 条记录 → {filepath}")

    # === Report ===
    def generate_report(self, records: list[dict], top_n: int = 5) -> dict:
        """Summarise revenue overall and by category, region and product.

        Args:
            records: Rows carrying ``total_amount``, ``category``,
                ``region`` and ``product``.
            top_n: Number of best-selling products to include; values
                below zero are treated as zero.

        Returns:
            The report dict, or ``{"error": "没有数据"}`` when ``records``
            is empty.
        """
        if not records:
            return {"error": "没有数据"}

        total_revenue = sum(r["total_amount"] for r in records)
        avg_order = total_revenue / len(records)

        # Accumulate all three breakdowns in a single pass over the rows.
        by_category = {}
        by_region = {}
        product_revenue = {}
        for r in records:
            amount = r["total_amount"]
            cat, reg, prod = r["category"], r["region"], r["product"]
            by_category[cat] = by_category.get(cat, 0) + amount
            by_region[reg] = by_region.get(reg, 0) + amount
            product_revenue[prod] = product_revenue.get(prod, 0) + amount

        top_products = sorted(
            product_revenue.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:max(top_n, 0)]

        report = {
            "total_revenue": round(total_revenue, 2),
            "total_orders": len(records),
            "avg_order_value": round(avg_order, 2),
            "top_products": [
                {"product": p, "revenue": round(r, 2)} for p, r in top_products
            ],
            "revenue_by_category": {k: round(v, 2) for k, v in by_category.items()},
            "revenue_by_region": {k: round(v, 2) for k, v in by_region.items()},
        }
        print(f"\n📊 报告:")
        print(f"  总收入: ¥{report['total_revenue']:,.2f}")
        print(f"  总订单: {report['total_orders']}")
        print(f"  平均客单价: ¥{report['avg_order_value']:.2f}")
        return report

    # === Full run ===
    def run(self, input_file: str, output_file: str) -> dict:
        """Run extract, clean, enrich, save and report in sequence.

        Args:
            input_file: CSV file to read.
            output_file: JSON file to write the enriched rows to.

        Returns:
            The report produced by ``generate_report``.
        """
        print("=" * 50)
        print("🚀 数据管道启动")
        print("=" * 50)
        # Extract
        raw = self.extract_csv(input_file)
        # Transform
        cleaned = self.clean(raw)
        enriched = self.enrich(cleaned)
        # Load
        self.load_json(enriched, output_file)
        # Report
        report = self.generate_report(enriched)
        print("=" * 50)
        print("✅ 管道完成!")
        print(f"  处理统计: {self.stats}")
        return report
# Usage: create a pipeline instance and print a hint showing how to run it.
pipeline = DataPipeline()
print("数据管道准备就绪")
print("用法: pipeline.run('data/sales.csv', 'data/output.json')")

API 服务

"""
FastAPI 数据服务
"""
from fastapi import FastAPI, Query, HTTPException
from datetime import date
import json
# FastAPI application object; the @app.get route handlers are registered on it.
app = FastAPI(title="销售数据分析 API")
# Load the processed data
def load_data(filepath: str = "data/output.json") -> list[dict]:
    """Return the parsed JSON content of ``filepath``.

    A missing file is not an error: an empty list is returned instead.
    """
    try:
        handle = open(filepath, "r", encoding="utf-8")
    except FileNotFoundError:
        return []
    with handle:
        return json.load(handle)
@app.get("/api/sales")
async def get_sales(
    category: str | None = None,
    region: str | None = None,
    min_amount: float = 0,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """Return one page of sales records matching the optional filters.

    Filters that are empty (or, for ``min_amount``, not above zero) are
    skipped. ``total`` in the response counts all matches before paging.
    """
    matched = load_data()

    # Collect only the active filters, in category / region / amount order.
    checks = []
    if category:
        checks.append(lambda rec: rec["category"] == category)
    if region:
        checks.append(lambda rec: rec["region"] == region)
    if min_amount > 0:
        checks.append(lambda rec: rec["total_amount"] >= min_amount)

    # Apply them one after another, each narrowing the previous result.
    for keep in checks:
        matched = [rec for rec in matched if keep(rec)]

    window = matched[offset: offset + limit]
    return {
        "total": len(matched),
        "limit": limit,
        "offset": offset,
        "data": window,
    }
@app.get("/api/stats")
async def get_stats():
    """Return total revenue, order count, average order value and
    revenue per category.

    Raises:
        HTTPException: 404 when no data is available.
    """
    data = load_data()
    if not data:
        raise HTTPException(404, "没有数据")

    total_revenue = sum(r["total_amount"] for r in data)
    by_category = {}
    for r in data:
        cat = r["category"]
        by_category[cat] = by_category.get(cat, 0) + r["total_amount"]

    return {
        "total_revenue": round(total_revenue, 2),
        "total_orders": len(data),
        "avg_order_value": round(total_revenue / len(data), 2),
        # Rounded like the other monetary fields so accumulated float error
        # (e.g. 239.60000000000002) does not leak into the response.
        "revenue_by_category": {k: round(v, 2) for k, v in by_category.items()},
    }
@app.get("/api/export")
async def export_csv():
    """Return all loaded records as a downloadable CSV attachment.

    The column set is taken from the keys of the first record. With no
    data the response body is empty.
    """
    import csv
    import io
    from fastapi.responses import StreamingResponse

    rows = load_data()
    buffer = io.StringIO()
    if rows:
        columns = rows[0].keys()
        sheet = csv.DictWriter(buffer, fieldnames=columns)
        sheet.writeheader()
        sheet.writerows(rows)
    # Rewind so the response streams the buffer from its beginning.
    buffer.seek(0)

    download_headers = {"Content-Disposition": "attachment; filename=sales.csv"}
    return StreamingResponse(
        buffer,
        media_type="text/csv",
        headers=download_headers,
    )
# Hints printed at module level: the uvicorn start command and the docs URL.
print("API 服务准备就绪")
print("启动: uvicorn api:app --reload")
print("文档: http://localhost:8000/docs")

本书总结

恭喜你完成了 Python 实战全面指南!回顾一下学到的核心能力:

| 章节 | 核心能力 |
| --- | --- |
| 01 基础语法 | 变量、控制流、函数、模块 |
| 02 数据结构 | 列表、字典、集合、生成器 |
| 03 面向对象 | 类、继承、魔法方法、dataclass |
| 04 函数式编程 | 高阶函数、闭包、装饰器 |
| 05 并发异步 | threading、multiprocessing、asyncio |
| 06 文件 IO | JSON、CSV、pathlib |
| 07 数据分析 | NumPy、Pandas |
| 08 Web 开发 | FastAPI、RESTful API |
| 09 测试工程化 | pytest、mypy、ruff |
| 10 实战项目 | ETL 管道 + API 服务 |

继续学习:将这些知识应用到你的实际项目中!