# The Fine-tuning Data Flywheel
Fine-tuning is not a one-off task. Close the loop of production data → curation → training → deployment → collection, and the model gets better the more it is used.
## Data Flywheel Architecture
```mermaid
graph TB
    A[Production model] --> B["Collect user interactions<br/>Q&A + feedback"]
    B --> C["Data curation<br/>quality filtering"]
    C --> D["Build training set<br/>formatting + augmentation"]
    D --> E["Fine-tuning<br/>incremental updates"]
    E --> F["Evaluation<br/>A/B testing"]
    F --> A
    style A fill:#e8f5e9,stroke:#388e3c,stroke-width:3px
    style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px
```
## Automatic Data Collection
"""
生产环境数据自动收集
"""
from dataclasses import dataclass, field
import time
from typing import Any
@dataclass
class InteractionRecord:
"""交互记录"""
query: str
response: str
model: str
timestamp: float = field(default_factory=time.time)
user_feedback: int | None = None # 1=👍, -1=👎
quality_score: float | None = None # LLM-as-Judge 分数
latency_ms: float = 0.0
metadata: dict = field(default_factory=dict)
class DataCollector:
"""生产数据收集器"""
def __init__(
self,
min_query_length: int = 10,
max_query_length: int = 2000,
sample_rate: float = 0.1, # 采样 10%
):
self.min_query_length = min_query_length
self.max_query_length = max_query_length
self.sample_rate = sample_rate
self._buffer: list[InteractionRecord] = []
def collect(self, record: InteractionRecord) -> bool:
"""收集一条交互记录"""
if not self._should_collect(record):
return False
self._buffer.append(record)
return True
def _should_collect(self, record: InteractionRecord) -> bool:
"""采样和过滤"""
# 长度过滤
if not (self.min_query_length <= len(record.query) <= self.max_query_length):
return False
# 随机采样
import random
return random.random() < self.sample_rate
def get_positive_samples(self, min_score: float = 0.8) -> list[InteractionRecord]:
"""获取高质量样本(用于训练)"""
return [
r for r in self._buffer
if (r.user_feedback == 1)
or (r.quality_score is not None and r.quality_score >= min_score)
]
def get_negative_samples(self) -> list[InteractionRecord]:
"""获取低质量样本(用于 DPO rejected)"""
return [
r for r in self._buffer
if r.user_feedback == -1
or (r.quality_score is not None and r.quality_score < 0.5)
]
def build_sft_dataset(self, min_score: float = 0.8) -> list[dict]:
"""构建 SFT 数据集"""
positive = self.get_positive_samples(min_score)
return [
{
"instruction": r.query,
"output": r.response,
}
for r in positive
]
def build_dpo_dataset(self) -> list[dict]:
"""构建 DPO 偏好数据集"""
pairs = []
positive = {r.query: r.response for r in self.get_positive_samples()}
negative = {r.query: r.response for r in self.get_negative_samples()}
for query in set(positive.keys()) & set(negative.keys()):
pairs.append({
"prompt": query,
"chosen": positive[query],
"rejected": negative[query],
})
return pairs
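A quick usage sketch of the collector. The queries, responses, and model tag below are made up for illustration; `sample_rate=1.0` disables sampling so the demo is deterministic:

```python
collector = DataCollector(sample_rate=1.0)  # collect everything for the demo

collector.collect(InteractionRecord(
    query="How should I schedule incremental LoRA updates?",
    response="Train one epoch at a low learning rate and mix in old data.",
    model="my-lora-v3",   # hypothetical model tag
    user_feedback=1,      # user clicked 👍
))
collector.collect(InteractionRecord(
    query="How should I schedule incremental LoRA updates?",
    response="Retrain everything from scratch every night.",
    model="my-lora-v3",
    user_feedback=-1,     # user clicked 👎
))

print(collector.build_sft_dataset())
# [{'instruction': 'How should I ...', 'output': 'Train one epoch ...'}]
print(collector.build_dpo_dataset())
# [{'prompt': ..., 'chosen': 'Train one epoch ...', 'rejected': 'Retrain ...'}]
```

Because both records share the same query, the 👍/👎 pair becomes a single DPO preference example.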
## Incremental Training Strategy
"""
增量微调策略
"""
from dataclasses import dataclass
from enum import Enum
class UpdateStrategy(Enum):
FULL_RETRAIN = "full_retrain" # 全量重训
INCREMENTAL = "incremental" # 增量训练
MERGE_ADAPTERS = "merge_adapters" # 合并多个 LoRA
@dataclass
class IncrementalConfig:
"""增量训练配置"""
base_adapter: str # 上一版 LoRA 权重
new_data_path: str # 新增数据
strategy: UpdateStrategy = UpdateStrategy.INCREMENTAL
# 增量训练参数
num_epochs: int = 1 # 增量只需 1 epoch
learning_rate: float = 1e-5 # 比初次训练更低
mix_ratio: float = 0.3 # 混入 30% 旧数据防遗忘
STRATEGY_GUIDE = {
UpdateStrategy.FULL_RETRAIN: {
"场景": "数据量翻倍或领域大变",
"优点": "质量最稳定",
"缺点": "成本高、耗时长",
},
UpdateStrategy.INCREMENTAL: {
"场景": "每周新增数据 < 20%",
"优点": "快速、低成本",
"缺点": "有灾难性遗忘风险",
},
UpdateStrategy.MERGE_ADAPTERS: {
"场景": "多任务各自训练后合并",
"优点": "模块化、灵活",
"缺点": "合并后可能性能下降",
},
}
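How `mix_ratio` turns into an actual training set is left implicit above. A minimal sketch, assuming it means old samples should make up roughly that fraction of the final mix (the function name is ours, not part of any library):

```python
import random


def build_incremental_mix(
    new_data: list[dict],
    old_data: list[dict],
    mix_ratio: float = 0.3,
    seed: int = 42,
) -> list[dict]:
    """Blend old samples into the new data to curb catastrophic forgetting.

    With mix_ratio=0.3, old samples account for ~30% of the returned set
    (assuming enough old data is available).
    """
    rng = random.Random(seed)
    # Solve old / (old + new) = mix_ratio for the number of old samples.
    n_old = int(len(new_data) * mix_ratio / (1 - mix_ratio))
    mixed = new_data + rng.sample(old_data, min(n_old, len(old_data)))
    rng.shuffle(mixed)
    return mixed
```

Fixing the seed keeps successive incremental runs reproducible, which matters when comparing adapter versions in A/B tests.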
## Recommended Flywheel Cadence
| Stage | Data volume | Training frequency | Strategy |
|---|---|---|---|
| Bootstrap | < 1,000 | manual trigger | full retrain |
| Growth | 1K-10K | weekly | incremental |
| Mature | 10K+ | daily | automated incremental |
| Stable | 50K+ | on demand | curation + incremental |
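For automation, the table can be encoded as a simple lookup. A sketch under that reading (the function name and the exact threshold boundaries are our interpretation of the table):

```python
def flywheel_stage(total_samples: int) -> dict:
    """Map total dataset size to the cadence recommendations above."""
    if total_samples < 1_000:
        return {"stage": "bootstrap", "trigger": "manual", "strategy": "full retrain"}
    if total_samples < 10_000:
        return {"stage": "growth", "trigger": "weekly", "strategy": "incremental"}
    if total_samples < 50_000:
        return {"stage": "mature", "trigger": "daily", "strategy": "automated incremental"}
    return {"stage": "stable", "trigger": "on demand", "strategy": "curation + incremental"}
```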
## Chapter Summary
| Takeaway | Details |
|---|---|
| Closed-loop collection | Production data feeds back into training; the model improves with use |
| Positive/negative samples | User 👍 → SFT data; 👍 vs 👎 on the same query → DPO pairs |
| Incremental training | Lower learning rate + mixed-in old data + 1 epoch |
| Catastrophic forgetting | Keep new data under 20% of the total; mix in 30% old data |
Next chapter: Model Distillation and Continual Learning