微调数据飞轮
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read236 words

微调数据飞轮

微调不是一次性任务——建立"生产数据 → 筛选 → 训练 → 部署 → 收集"的闭环,模型越用越好。

数据飞轮架构

graph TB A[线上模型] --> B[收集用户交互
问答+反馈] B --> C[数据筛选
质量过滤] C --> D[构建训练集
格式化+增强] D --> E[微调训练
增量更新] E --> F[评估验证
A/B 测试] F --> A style A fill:#e8f5e9,stroke:#388e3c,stroke-width:3px style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px

自动数据收集

"""
生产环境数据自动收集
"""
from dataclasses import dataclass, field
import time
from typing import Any
@dataclass
class InteractionRecord:
"""交互记录"""
query: str
response: str
model: str
timestamp: float = field(default_factory=time.time)
user_feedback: int | None = None    # 1=👍, -1=👎
quality_score: float | None = None  # LLM-as-Judge 分数
latency_ms: float = 0.0
metadata: dict = field(default_factory=dict)
class DataCollector:
"""生产数据收集器"""
def __init__(
self,
min_query_length: int = 10,
max_query_length: int = 2000,
sample_rate: float = 0.1,       # 采样 10%
):
self.min_query_length = min_query_length
self.max_query_length = max_query_length
self.sample_rate = sample_rate
self._buffer: list[InteractionRecord] = []
def collect(self, record: InteractionRecord) -> bool:
"""收集一条交互记录"""
if not self._should_collect(record):
return False
self._buffer.append(record)
return True
def _should_collect(self, record: InteractionRecord) -> bool:
"""采样和过滤"""
# 长度过滤
if not (self.min_query_length <= len(record.query) <= self.max_query_length):
return False
# 随机采样
import random
return random.random() < self.sample_rate
def get_positive_samples(self, min_score: float = 0.8) -> list[InteractionRecord]:
"""获取高质量样本(用于训练)"""
return [
r for r in self._buffer
if (r.user_feedback == 1)
or (r.quality_score is not None and r.quality_score >= min_score)
]
def get_negative_samples(self) -> list[InteractionRecord]:
"""获取低质量样本(用于 DPO rejected)"""
return [
r for r in self._buffer
if r.user_feedback == -1
or (r.quality_score is not None and r.quality_score < 0.5)
]
def build_sft_dataset(self, min_score: float = 0.8) -> list[dict]:
"""构建 SFT 数据集"""
positive = self.get_positive_samples(min_score)
return [
{
"instruction": r.query,
"output": r.response,
}
for r in positive
]
def build_dpo_dataset(self) -> list[dict]:
"""构建 DPO 偏好数据集"""
pairs = []
positive = {r.query: r.response for r in self.get_positive_samples()}
negative = {r.query: r.response for r in self.get_negative_samples()}
for query in set(positive.keys()) & set(negative.keys()):
pairs.append({
"prompt": query,
"chosen": positive[query],
"rejected": negative[query],
})
return pairs

增量训练策略

"""
增量微调策略
"""
from dataclasses import dataclass
from enum import Enum
class UpdateStrategy(Enum):
FULL_RETRAIN = "full_retrain"       # 全量重训
INCREMENTAL = "incremental"         # 增量训练
MERGE_ADAPTERS = "merge_adapters"   # 合并多个 LoRA
@dataclass
class IncrementalConfig:
"""增量训练配置"""
base_adapter: str          # 上一版 LoRA 权重
new_data_path: str         # 新增数据
strategy: UpdateStrategy = UpdateStrategy.INCREMENTAL
# 增量训练参数
num_epochs: int = 1        # 增量只需 1 epoch
learning_rate: float = 1e-5  # 比初次训练更低
mix_ratio: float = 0.3     # 混入 30% 旧数据防遗忘
STRATEGY_GUIDE = {
UpdateStrategy.FULL_RETRAIN: {
"场景": "数据量翻倍或领域大变",
"优点": "质量最稳定",
"缺点": "成本高、耗时长",
},
UpdateStrategy.INCREMENTAL: {
"场景": "每周新增数据 < 20%",
"优点": "快速、低成本",
"缺点": "有灾难性遗忘风险",
},
UpdateStrategy.MERGE_ADAPTERS: {
"场景": "多任务各自训练后合并",
"优点": "模块化、灵活",
"缺点": "合并后可能性能下降",
},
}

飞轮节奏建议

阶段 数据量 训练频率 策略
启动期 < 1000 手动触发 全量训练
成长期 1K-10K 每周 增量训练
成熟期 10K+ 每日 自动化增量
稳定期 50K+ 按需 数据筛选 + 增量

本章小结

要点 说明
闭环收集 生产数据反哺训练,模型越用越好
正负样本 用户 👍 → SFT 数据;👍 vs 👎 → DPO 数据
增量训练 低学习率 + 混入旧数据 + 1 epoch
灾难性遗忘 新数据不超过总量 20%,混入 30% 旧数据

下一章:模型蒸馏与持续学习