多模态数据管道
多模态 AI 应用离不开高质量的数据供给。从原始图像、视频、音频的采集到入库,再到预处理和特征提取,每一步都需要工程化管道支撑。
多模态数据管道架构
graph TB
    A[数据源] --> B[采集层<br/>Ingestion]
    B --> C[预处理层<br/>Preprocessing]
    C --> D[特征提取层<br/>Feature Extraction]
    D --> E[存储层<br/>Storage]
    E --> F[服务层<br/>Serving]
    A --> A1[摄像头/麦克风<br/>用户上传<br/>爬虫采集]
    C --> C1[尺寸归一化<br/>音频降噪<br/>格式转换]
    D --> D1[Embedding生成<br/>元数据提取<br/>内容标注]
    E --> E1["对象存储(S3)<br/>向量数据库<br/>元数据库"]
    style D fill:#ede7f6,stroke:#5e35b1,stroke-width:2px
    style E fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
多模态数据管道实现
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Callable, Any
import time
import hashlib
import json
class MediaType(Enum):
    """Supported media categories; `.value` is used as the file_id prefix."""
    IMAGE = "image"
    VIDEO = "video"
    AUDIO = "audio"
    DOCUMENT = "document"
@dataclass
class MediaFile:
    """Metadata record for one media file flowing through the pipeline."""
    file_id: str                 # e.g. "image_3fa2b1c4d5e6" — media type + checksum prefix
    path: Path
    media_type: MediaType
    size_bytes: int
    checksum: str                # hex MD5 of file contents — dedup fingerprint, not a security hash
    metadata: dict = field(default_factory=dict)  # mutated by pipeline stages

    @classmethod
    def from_path(cls, path: Path, media_type: MediaType) -> "MediaFile":
        """Build a MediaFile from *path*, hashing the contents in chunks.

        Streams the file in 1 MiB chunks instead of loading it all with
        read_bytes(), so large video/audio files never need to fit in
        memory at once. Size is accumulated from the same pass.
        """
        digest = hashlib.md5()
        size = 0
        with path.open("rb") as fh:
            while chunk := fh.read(1024 * 1024):
                digest.update(chunk)
                size += len(chunk)
        checksum = digest.hexdigest()
        return cls(
            file_id=f"{media_type.value}_{checksum[:12]}",
            path=path,
            media_type=media_type,
            size_bytes=size,
            checksum=checksum,
        )
@dataclass
class PipelineStage:
    """Definition of one processing stage in the pipeline."""
    name: str  # stage identifier, reported in PipelineResult.stages_completed/failed
    # Receives the current MediaFile; returning None means "keep the current
    # file unchanged" (see MultimodalPipeline.process).
    processor: Callable[[MediaFile], MediaFile | None]
    enabled: bool = True  # disabled stages are skipped by process()
    # NOTE(review): declared but never enforced by MultimodalPipeline — confirm intent.
    timeout_sec: int = 30
@dataclass
class PipelineResult:
    """Outcome of running one MediaFile through the full pipeline."""
    file_id: str
    success: bool  # True iff no stage failed (skipped/disabled stages don't count)
    stages_completed: list[str]
    stages_failed: list[str]  # at most one entry — the pipeline stops at first failure
    processing_time_ms: int
    output_metadata: dict = field(default_factory=dict)  # metadata of the final MediaFile
class MultimodalPipeline:
    """Multimodal data-processing pipeline.

    Runs a MediaFile through an ordered list of PipelineStages (fail-fast)
    and keeps a history of results for aggregate statistics.
    """
    def __init__(self, name: str):
        self.name = name
        # Stages execute front-to-back in insertion order.
        self.stages: list[PipelineStage] = []
        # Every process() result, consumed by stats().
        self._results: list[PipelineResult] = []

    def add_stage(self, stage: PipelineStage) -> "MultimodalPipeline":
        """Append a stage; returns self to allow chained configuration."""
        self.stages.append(stage)
        return self

    def process(self, media_file: MediaFile) -> PipelineResult:
        """Run every enabled stage over *media_file* and record the result.

        A stage returning None leaves the current file unchanged; a stage
        raising any exception aborts the remaining stages (fail-fast).
        """
        # perf_counter is monotonic; time.time() can jump under NTP
        # adjustment and produce negative/garbage durations.
        start = time.perf_counter()
        completed: list[str] = []
        failed: list[str] = []
        current_file = media_file
        for stage in self.stages:
            if not stage.enabled:
                continue
            # NOTE(review): stage.timeout_sec is not enforced here — doing so
            # would require running processors in a worker thread/process.
            try:
                result = stage.processor(current_file)
                if result is not None:
                    current_file = result
                completed.append(stage.name)
            except Exception as e:
                print(f"[{stage.name}] 处理失败: {e}")
                failed.append(stage.name)
                break  # stop the pipeline on first failure
        elapsed = int((time.perf_counter() - start) * 1000)
        pipeline_result = PipelineResult(
            file_id=media_file.file_id,
            success=len(failed) == 0,
            stages_completed=completed,
            stages_failed=failed,
            processing_time_ms=elapsed,
            output_metadata=current_file.metadata,
        )
        self._results.append(pipeline_result)
        return pipeline_result

    def process_batch(self, files: list[MediaFile]) -> list[PipelineResult]:
        """Process files sequentially with a simple progress log."""
        results = []
        for i, f in enumerate(files):
            print(f"[Pipeline] 处理 {i+1}/{len(files)}: {f.file_id}")
            results.append(self.process(f))
        return results

    def stats(self) -> dict:
        """Return aggregate statistics over all results processed so far."""
        total = len(self._results)
        if total == 0:
            return {"total": 0}
        success = sum(1 for r in self._results if r.success)
        avg_time = sum(r.processing_time_ms for r in self._results) / total
        return {
            "total": total,
            "success": success,
            "failed": total - success,
            "success_rate": f"{success/total:.1%}",
            "avg_time_ms": round(avg_time, 1),
        }
# 常用预处理函数
def resize_image_stage(target_size: tuple[int, int] = (1024, 1024)) -> PipelineStage:
"""图像尺寸归一化阶段"""
def process(mf: MediaFile) -> MediaFile:
from PIL import Image
img = Image.open(mf.path)
img.thumbnail(target_size, Image.LANCZOS)
output_path = mf.path.with_suffix(".resized.jpg")
img.save(output_path, "JPEG", quality=85, optimize=True)
mf.metadata["original_size"] = f"{img.size[0]}x{img.size[1]}"
mf.metadata["resized"] = True
return mf
return PipelineStage(name="resize_image", processor=process)
def extract_image_metadata_stage() -> PipelineStage:
    """Stage that extracts basic image attributes plus a small EXIF subset."""
    def process(mf: MediaFile) -> MediaFile:
        from PIL import Image
        from PIL.ExifTags import TAGS
        with Image.open(mf.path) as img:
            mf.metadata["format"] = img.format
            mf.metadata["mode"] = img.mode
            mf.metadata["size_wh"] = list(img.size)
            # getexif() is the public API; _getexif() is private and may
            # be removed in future Pillow releases.
            exif_data = img.getexif()
        for tag_id, value in exif_data.items():
            tag_name = TAGS.get(tag_id, tag_id)
            # Keep only a small allowlist of stable, useful tags.
            if tag_name in ("DateTime", "Make", "Model"):
                mf.metadata[f"exif_{tag_name}"] = str(value)
        return mf
    # BUG FIX: the original passed the factory function itself
    # (extract_image_metadata_stage) as the processor instead of the
    # inner `process` closure, so the stage never extracted anything.
    return PipelineStage(name="extract_metadata", processor=process)
def generate_embedding_stage(embedder=None) -> PipelineStage:
    """Stage that attaches an image embedding produced by *embedder*.

    Acts as a no-op when no embedder is supplied or the file is not an image.
    """
    def process(mf: MediaFile) -> MediaFile:
        is_image = mf.media_type == MediaType.IMAGE
        if embedder and is_image:
            record = embedder.embed_image(mf.path, mf.file_id)
            mf.metadata["embedding_dim"] = len(record.embedding)
            mf.metadata["embedding_generated"] = True
        return mf
    return PipelineStage(name="generate_embedding", processor=process)
# Example: assemble an e-commerce image pipeline from the reusable stage factories.
pipeline = MultimodalPipeline("电商图片处理管道")
for _stage in (
    resize_image_stage((1024, 1024)),
    extract_image_metadata_stage(),
    generate_embedding_stage(),
):
    pipeline.add_stage(_stage)
print(f"管道 '{pipeline.name}' 已配置 {len(pipeline.stages)} 个阶段")
print("阶段: " + " → ".join(s.name for s in pipeline.stages))
数据管道设计原则
| 原则 | 说明 | 违反后果 |
|---|---|---|
| 幂等性 | 同一文件处理多次结果相同 | 数据重复/不一致 |
| 可观测性 | 每阶段记录耗时和状态 | 故障难以定位 |
| 断点续传 | 失败后从中断阶段重启 | 大批量数据丢失进度 |
| Schema 校验 | 每阶段输出格式有验证 | 下游静默失败 |
| 成本追踪 | 记录 API 调用次数和费用 | 月账单超预算 |
本章小结
- 管道要分层设计——采集、预处理、特征提取、存储各自解耦
- MediaFile 作为统一数据容器——在各阶段间传递状态和元数据
- 批量处理要有进度日志——大批量任务必须知道当前进度
- 预处理阶段先做校验——无效文件要在管道入口就拦截
- Embedding 生成放在最后——最耗时的操作留给末尾,减少浪费
下一章:视觉Agent与高级应用