数据清洗与 ETL
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read203 words

数据清洗与 ETL

真实数据总是脏的——缺失值、重复行、格式不一致。掌握 Pandas 数据清洗流水线是数据分析的第一步。

ETL 流程

graph LR E[Extract 提取] --> T[Transform 转换] T --> L[Load 加载] E --> E1[CSV / Excel] E --> E2[API / JSON] E --> E3[数据库] T --> T1[清洗缺失值] T --> T2[去重] T --> T3[类型转换] T --> T4[特征工程] L --> L1[数据库] L --> L2[Parquet] L --> L3[数据仓库] style E fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style T fill:#fff3e0,stroke:#f57c00,stroke-width:2px style L fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

数据清洗实战

"""
Pandas 数据清洗完整流程
"""
import pandas as pd
import numpy as np
# === 1. 读取原始数据 ===
# 模拟脏数据
raw_data = pd.DataFrame({
"姓名": ["Alice", "Bob", "Charlie", "Alice", "Dave", None, "Eve"],
"年龄": [25, 30, None, 25, -5, 28, 200],
"邮箱": ["alice@test.com", "bob@test", "charlie@test.com",
"alice@test.com", "dave@test.com", "frank@test.com", "eve@test.com"],
"薪资": ["5000", "6000", "7,500", "5000", "N/A", "5500", "6000"],
"入职日期": ["2023-01-15", "2023/02/20", "2023-03-10",
"2023-01-15", "invalid", "2023-06-01", "2023-07-15"],
})
print("原始数据:")
print(raw_data)
print(f"形状: {raw_data.shape}")
# === 2. 数据诊断 ===
def diagnose(df: pd.DataFrame) -> dict:
"""数据质量诊断"""
return {
"总行数": len(df),
"重复行": df.duplicated().sum(),
"缺失值": df.isnull().sum().to_dict(),
"数据类型": df.dtypes.to_dict(),
}
report = diagnose(raw_data)
print(f"\n诊断报告: {report}")
# === 3. 去重 ===
df = raw_data.drop_duplicates(subset=["姓名", "邮箱"], keep="first")
print(f"\n去重后: {len(df)} 行")
# === 4. 处理缺失值 ===
# 删除姓名为空的行
df = df.dropna(subset=["姓名"])
# 年龄缺失用中位数填充
median_age = df["年龄"].median()
df["年龄"] = df["年龄"].fillna(median_age)
# === 5. 数据类型转换 ===
# 薪资:去掉逗号和非数字
df["薪资"] = pd.to_numeric(
df["薪资"].str.replace(",", "").str.strip(),
errors="coerce",  # 无法转换的变 NaN
)
# 日期:统一格式
df["入职日期"] = pd.to_datetime(df["入职日期"], errors="coerce")
# === 6. 异常值处理 ===
# 年龄范围约束
df["年龄"] = df["年龄"].clip(lower=18, upper=65)
# 薪资缺失填充中位数
df["薪资"] = df["薪资"].fillna(df["薪资"].median())
print("\n清洗后数据:")
print(df)

管道式处理

"""
Pandas 链式操作:方法链让数据清洗更清晰
"""
import pandas as pd
def clean_pipeline(df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗管道"""
return (
df
.drop_duplicates()
.dropna(subset=["姓名"])
.assign(
年龄=lambda x: x["年龄"].fillna(x["年龄"].median()).clip(18, 65),
薪资=lambda x: pd.to_numeric(
x["薪资"].astype(str).str.replace(",", ""),
errors="coerce",
),
入职日期=lambda x: pd.to_datetime(x["入职日期"], errors="coerce"),
)
.query("薪资 > 0")
.reset_index(drop=True)
)
# 自定义 pipe 函数
def validate_email(df: pd.DataFrame) -> pd.DataFrame:
"""验证邮箱格式"""
mask = df["邮箱"].str.contains(r"^[\w.+-]+@[\w-]+\.[\w.]+$", na=False)
return df[mask]
def add_features(df: pd.DataFrame) -> pd.DataFrame:
"""特征工程"""
return df.assign(
工龄=lambda x: (pd.Timestamp.now() - x["入职日期"]).dt.days // 365,
薪资等级=lambda x: pd.cut(
x["薪资"],
bins=[0, 5000, 8000, float("inf")],
labels=["初级", "中级", "高级"],
),
)
# 链式使用
# result = (
#     raw_data
#     .pipe(clean_pipeline)
#     .pipe(validate_email)
#     .pipe(add_features)
# )

批量文件处理

"""
批量 ETL:处理多个数据源
"""
from pathlib import Path
import pandas as pd
def etl_csv_folder(input_dir: str, output_path: str):
"""批量处理 CSV 文件"""
input_path = Path(input_dir)
dfs = []
for csv_file in sorted(input_path.glob("*.csv")):
print(f"处理: {csv_file.name}")
df = pd.read_csv(csv_file, encoding="utf-8")
df["来源文件"] = csv_file.stem
dfs.append(df)
if not dfs:
print("无文件可处理")
return
# 合并
combined = pd.concat(dfs, ignore_index=True)
print(f"合并: {len(combined)} 行")
# 导出为 Parquet(推荐:压缩好、读取快)
combined.to_parquet(output_path, index=False)
print(f"保存: {output_path}")
# 数据格式对比
FORMAT_COMPARISON = {
"CSV": {"大小": "大", "读取速度": "慢", "类型保留": "否", "适合": "通用交换"},
"Parquet": {"大小": "小", "读取速度": "快", "类型保留": "是", "适合": "分析存储"},
"JSON": {"大小": "大", "读取速度": "中", "类型保留": "部分", "适合": "API 数据"},
"Excel": {"大小": "中", "读取速度": "慢", "类型保留": "部分", "适合": "报表"},
}
for fmt, info in FORMAT_COMPARISON.items():
print(f"{fmt}: {info}")

清洗速查

问题 Pandas 方法 示例
重复行 drop_duplicates() df.drop_duplicates(subset=['id'])
缺失值删除 dropna() df.dropna(subset=['name'])
缺失值填充 fillna() df['age'].fillna(df['age'].median())
类型转换 pd.to_numeric() errors='coerce' 非法值变 NaN
日期解析 pd.to_datetime() errors='coerce'
异常值裁剪 clip() df['age'].clip(0, 120)
字符串清理 str.strip() df['name'].str.strip().str.lower()
分箱 pd.cut() pd.cut(df['score'], bins=3)

本章小结

知识点 要点
数据诊断 检查重复、缺失、类型
管道式清洗 assign + query + pipe 链式
缺失值 删除 / 填充中位数 / 前向填充
批量 ETL glob 遍历 + concat 合并
存储格式 Parquet > CSV(分析场景)

下一章:Web 开发——FastAPI 入门与实战。