数据清洗与 ETL
真实数据总是脏的——缺失值、重复行、格式不一致。掌握 Pandas 数据清洗流水线是数据分析的第一步。
ETL 流程
graph LR
E[Extract 提取] --> T[Transform 转换]
T --> L[Load 加载]
E --> E1[CSV / Excel]
E --> E2[API / JSON]
E --> E3[数据库]
T --> T1[清洗缺失值]
T --> T2[去重]
T --> T3[类型转换]
T --> T4[特征工程]
L --> L1[数据库]
L --> L2[Parquet]
L --> L3[数据仓库]
style E fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style T fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style L fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
数据清洗实战
"""
Pandas 数据清洗完整流程
"""
import pandas as pd
import numpy as np
# === 1. 读取原始数据 ===
# 模拟脏数据
raw_data = pd.DataFrame({
"姓名": ["Alice", "Bob", "Charlie", "Alice", "Dave", None, "Eve"],
"年龄": [25, 30, None, 25, -5, 28, 200],
"邮箱": ["alice@test.com", "bob@test", "charlie@test.com",
"alice@test.com", "dave@test.com", "frank@test.com", "eve@test.com"],
"薪资": ["5000", "6000", "7,500", "5000", "N/A", "5500", "6000"],
"入职日期": ["2023-01-15", "2023/02/20", "2023-03-10",
"2023-01-15", "invalid", "2023-06-01", "2023-07-15"],
})
print("原始数据:")
print(raw_data)
print(f"形状: {raw_data.shape}")
# === 2. 数据诊断 ===
def diagnose(df: pd.DataFrame) -> dict:
"""数据质量诊断"""
return {
"总行数": len(df),
"重复行": df.duplicated().sum(),
"缺失值": df.isnull().sum().to_dict(),
"数据类型": df.dtypes.to_dict(),
}
report = diagnose(raw_data)
print(f"\n诊断报告: {report}")
# === 3. 去重 ===
df = raw_data.drop_duplicates(subset=["姓名", "邮箱"], keep="first")
print(f"\n去重后: {len(df)} 行")
# === 4. 处理缺失值 ===
# 删除姓名为空的行
df = df.dropna(subset=["姓名"])
# 年龄缺失用中位数填充
median_age = df["年龄"].median()
df["年龄"] = df["年龄"].fillna(median_age)
# === 5. 数据类型转换 ===
# 薪资:去掉逗号和非数字
df["薪资"] = pd.to_numeric(
df["薪资"].str.replace(",", "").str.strip(),
errors="coerce", # 无法转换的变 NaN
)
# 日期:统一格式
df["入职日期"] = pd.to_datetime(df["入职日期"], errors="coerce")
# === 6. 异常值处理 ===
# 年龄范围约束
df["年龄"] = df["年龄"].clip(lower=18, upper=65)
# 薪资缺失填充中位数
df["薪资"] = df["薪资"].fillna(df["薪资"].median())
print("\n清洗后数据:")
print(df)
管道式处理
"""
Pandas 链式操作:方法链让数据清洗更清晰
"""
import pandas as pd
def clean_pipeline(df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗管道"""
return (
df
.drop_duplicates()
.dropna(subset=["姓名"])
.assign(
年龄=lambda x: x["年龄"].fillna(x["年龄"].median()).clip(18, 65),
薪资=lambda x: pd.to_numeric(
x["薪资"].astype(str).str.replace(",", ""),
errors="coerce",
),
入职日期=lambda x: pd.to_datetime(x["入职日期"], errors="coerce"),
)
.query("薪资 > 0")
.reset_index(drop=True)
)
# 自定义 pipe 函数
def validate_email(df: pd.DataFrame) -> pd.DataFrame:
"""验证邮箱格式"""
mask = df["邮箱"].str.contains(r"^[\w.+-]+@[\w-]+\.[\w.]+$", na=False)
return df[mask]
def add_features(df: pd.DataFrame) -> pd.DataFrame:
"""特征工程"""
return df.assign(
工龄=lambda x: (pd.Timestamp.now() - x["入职日期"]).dt.days // 365,
薪资等级=lambda x: pd.cut(
x["薪资"],
bins=[0, 5000, 8000, float("inf")],
labels=["初级", "中级", "高级"],
),
)
# 链式使用
# result = (
# raw_data
# .pipe(clean_pipeline)
# .pipe(validate_email)
# .pipe(add_features)
# )
批量文件处理
"""
批量 ETL:处理多个数据源
"""
from pathlib import Path
import pandas as pd
def etl_csv_folder(input_dir: str, output_path: str):
"""批量处理 CSV 文件"""
input_path = Path(input_dir)
dfs = []
for csv_file in sorted(input_path.glob("*.csv")):
print(f"处理: {csv_file.name}")
df = pd.read_csv(csv_file, encoding="utf-8")
df["来源文件"] = csv_file.stem
dfs.append(df)
if not dfs:
print("无文件可处理")
return
# 合并
combined = pd.concat(dfs, ignore_index=True)
print(f"合并: {len(combined)} 行")
# 导出为 Parquet(推荐:压缩好、读取快)
combined.to_parquet(output_path, index=False)
print(f"保存: {output_path}")
# 数据格式对比
FORMAT_COMPARISON = {
"CSV": {"大小": "大", "读取速度": "慢", "类型保留": "否", "适合": "通用交换"},
"Parquet": {"大小": "小", "读取速度": "快", "类型保留": "是", "适合": "分析存储"},
"JSON": {"大小": "大", "读取速度": "中", "类型保留": "部分", "适合": "API 数据"},
"Excel": {"大小": "中", "读取速度": "慢", "类型保留": "部分", "适合": "报表"},
}
for fmt, info in FORMAT_COMPARISON.items():
print(f"{fmt}: {info}")
清洗速查
| 问题 | Pandas 方法 | 示例 |
|---|---|---|
| 重复行 | drop_duplicates() | df.drop_duplicates(subset=['id']) |
| 缺失值删除 | dropna() | df.dropna(subset=['name']) |
| 缺失值填充 | fillna() | df['age'].fillna(df['age'].median()) |
| 类型转换 | pd.to_numeric() | errors='coerce' 非法值变 NaN |
| 日期解析 | pd.to_datetime() | errors='coerce' |
| 异常值裁剪 | clip() | df['age'].clip(0, 120) |
| 字符串清理 | str.strip() | df['name'].str.strip().str.lower() |
| 分箱 | pd.cut() | pd.cut(df['score'], bins=3) |
本章小结
| 知识点 | 要点 |
|---|---|
| 数据诊断 | 检查重复、缺失、类型 |
| 管道式清洗 | assign + query + pipe 链式 |
| 缺失值 | 删除 / 填充中位数 / 前向填充 |
| 批量 ETL | glob 遍历 + concat 合并 |
| 存储格式 | Parquet > CSV(分析场景) |
下一章:Web 开发——FastAPI 入门与实战。