Integrating the Moderation API
The OpenAI Moderation API provides strong content-moderation capabilities out of the box and can detect a wide range of violation types.
Moderation API Overview
graph TB
A[Input content] --> B[OpenAI Moderation API]
B --> C[Moderation result]
C --> D1[violence]
C --> D2[hate]
C --> D3[sexual]
C --> D4[self-harm]
C --> D5[harassment]
D1 --> E{Flagged?}
D2 --> E
D3 --> E
D4 --> E
D5 --> E
E -->|Yes| F[Block]
E -->|No| G[Pass]
style F fill:#f8d7da
style G fill:#d4edda
Moderation Categories
| Category | Description | Examples |
|---|---|---|
| violence | Violent content | Descriptions of violence, weapon use |
| violence/graphic | Graphic violence | Gore, extreme brutality |
| hate | Hate speech | Racial discrimination, attacks on religion |
| hate/threatening | Threatening hate | Violent threats, intimidation |
| self-harm | Self-harm | Self-harm methods, suicide encouragement |
| sexual | Sexual content | Pornographic descriptions, sexual acts |
| sexual/minors | Sexual content involving minors | Any sexual content that involves minors |
| harassment | Harassment | Harassment, bullying, stalking |
| harassment/threatening | Threatening harassment | Threats, intimidation |

(The API also reports finer-grained self-harm subcategories, such as self-harm/intent and self-harm/instructions, which this chapter omits for brevity.)
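Before wrapping the API in an auditor class, it helps to see the raw response shape. Below is a minimal sketch (assuming openai>=1.0 and OPENAI_API_KEY in the environment) that moderates one string and prints the per-category scores; the SDK returns pydantic models, so we dump them with their API aliases to get dict keys like "hate/threatening":

import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

response = client.moderations.create(
    input="I will hurt you",
    model="text-moderation-latest",
)
result = response.results[0]

print(result.flagged)  # True if any category tripped
for category, score in result.category_scores.model_dump(by_alias=True).items():
    print(f"{category}: {score:.4f}")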
Implementing the Moderation Auditor
Create core/moderation_auditor.py:
from openai import AsyncOpenAI
from typing import Dict, List, Optional, Tuple
from core.base_auditor import BaseAuditor
from core.models import AuditRequest, AuditResult, AuditLevel, RiskLevel
from loguru import logger
import os


class ModerationAuditor(BaseAuditor):
    """Auditor backed by the OpenAI Moderation API"""

    # Human-readable display names for each category
    CATEGORY_NAMES = {
        "violence": "Violence",
        "violence/graphic": "Graphic violence",
        "hate": "Hate speech",
        "hate/threatening": "Threatening hate speech",
        "self-harm": "Self-harm",
        "sexual": "Sexual content",
        "sexual/minors": "Sexual content involving minors",
        "harassment": "Harassment",
        "harassment/threatening": "Threatening harassment"
    }

    # Risk level assigned to each category
    CATEGORY_RISK = {
        "violence": RiskLevel.HIGH,
        "violence/graphic": RiskLevel.CRITICAL,
        "hate": RiskLevel.HIGH,
        "hate/threatening": RiskLevel.CRITICAL,
        "self-harm": RiskLevel.HIGH,
        "sexual": RiskLevel.HIGH,
        "sexual/minors": RiskLevel.CRITICAL,
        "harassment": RiskLevel.MEDIUM,
        "harassment/threatening": RiskLevel.HIGH
    }

    def __init__(self, threshold: float = 0.7, model: str = "text-moderation-latest"):
        """
        Initialize the auditor

        Args:
            threshold: risk threshold (0-1); categories scoring at or above it count as violations
            model: moderation model to use
        """
        super().__init__("moderation_auditor")
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            logger.warning("⚠️ OPENAI_API_KEY is not set; the Moderation auditor cannot run")
            self.client = None
        else:
            self.client = AsyncOpenAI(api_key=api_key)
        self.threshold = threshold
        self.model = model

    async def audit(self, request: AuditRequest) -> AuditResult:
        """
        Run the audit

        Args:
            request: audit request

        Returns:
            Audit result
        """
        if self.client is None:
            logger.warning("⚠️ Moderation client is not configured; skipping audit")
            return self._create_result(
                request=request,
                level=AuditLevel.WARNING,
                risk_level=RiskLevel.LOW,
                is_blocked=False,
                reasons=["Moderation API not configured"],
                details={"auditor": self.name}
            )
        try:
            # Call the Moderation API
            response = await self.client.moderations.create(
                input=request.content,
                model=self.model
            )
            results = response.results[0]
            flagged = results.flagged
            # The SDK returns pydantic models; dump with aliases to get
            # plain dicts keyed like "hate/threatening"
            categories = results.categories.model_dump(by_alias=True)
            category_scores = results.category_scores.model_dump(by_alias=True)
            if flagged:
                # Collect the violating categories
                violations = self._get_violations(
                    categories,
                    category_scores
                )
                if violations:
                    logger.warning(f"🚫 Moderation flagged violations: {violations}")
                    # Take the highest risk level among the violations
                    max_risk = max(
                        [self.CATEGORY_RISK.get(cat, RiskLevel.MEDIUM)
                         for cat, _ in violations]
                    )
                    return self._create_result(
                        request=request,
                        level=AuditLevel.BLOCK,
                        risk_level=max_risk,
                        is_blocked=True,
                        reasons=[
                            f"Detected {self.CATEGORY_NAMES.get(cat, cat)} (score: {score:.2f})"
                            for cat, score in violations
                        ],
                        details={
                            "flagged": flagged,
                            "categories": {k: v for k, v in categories.items() if v},
                            "scores": category_scores,
                            "violations": violations,
                            "auditor": self.name
                        }
                    )
            # Passed (either not flagged, or flagged with every score below our threshold)
            logger.debug("✅ Moderation audit passed")
            return self._create_result(
                request=request,
                level=AuditLevel.PASS,
                risk_level=RiskLevel.LOW,
                is_blocked=False,
                reasons=["Passed Moderation API audit"],
                details={
                    "flagged": False,
                    "all_scores": category_scores,
                    "auditor": self.name
                }
            )
        except Exception as e:
            logger.error(f"❌ Moderation API call failed: {e}")
            return self._create_result(
                request=request,
                level=AuditLevel.WARNING,
                risk_level=RiskLevel.MEDIUM,
                is_blocked=False,
                reasons=[f"Moderation API error: {str(e)}"],
                details={
                    "error": str(e),
                    "auditor": self.name
                }
            )

    def _get_violations(
        self,
        categories: Dict[str, bool],
        scores: Dict[str, float]
    ) -> List[Tuple[str, float]]:
        """
        Collect the violating categories

        Args:
            categories: per-category flags
            scores: per-category scores

        Returns:
            [(category, score), ...] list of violations
        """
        violations = []
        for category, is_flagged in categories.items():
            if is_flagged and scores[category] >= self.threshold:
                violations.append((category, scores[category]))
        # Sort by score, descending
        violations.sort(key=lambda x: x[1], reverse=True)
        return violations

    async def batch_audit(
        self,
        requests: List[AuditRequest]
    ) -> List[AuditResult]:
        """
        Batch audit

        Args:
            requests: list of audit requests

        Returns:
            List of audit results
        """
        if self.client is None:
            return [
                self._create_result(
                    request=req,
                    level=AuditLevel.WARNING,
                    risk_level=RiskLevel.LOW,
                    is_blocked=False,
                    reasons=["Moderation API not configured"]
                )
                for req in requests
            ]
        try:
            # The API accepts a list of inputs; send up to 20 per call
            batch_size = 20
            all_results = []
            for i in range(0, len(requests), batch_size):
                batch = requests[i:i+batch_size]
                inputs = [req.content for req in batch]
                response = await self.client.moderations.create(
                    input=inputs,
                    model=self.model
                )
                for j, result in enumerate(response.results):
                    req = batch[j]
                    if result.flagged:
                        violations = self._get_violations(
                            result.categories.model_dump(by_alias=True),
                            result.category_scores.model_dump(by_alias=True)
                        )
                        max_risk = max(
                            [self.CATEGORY_RISK.get(cat, RiskLevel.MEDIUM)
                             for cat, _ in violations]
                        ) if violations else RiskLevel.MEDIUM
                        audit_result = self._create_result(
                            request=req,
                            level=AuditLevel.BLOCK,
                            risk_level=max_risk,
                            is_blocked=True,
                            reasons=[
                                f"Detected {self.CATEGORY_NAMES.get(cat, cat)}"
                                for cat, _ in violations
                            ],
                            details={
                                "flagged": True,
                                "violations": violations
                            }
                        )
                    else:
                        audit_result = self._create_result(
                            request=req,
                            level=AuditLevel.PASS,
                            risk_level=RiskLevel.LOW,
                            is_blocked=False,
                            reasons=["Passed Moderation API audit"]
                        )
                    all_results.append(audit_result)
            return all_results
        except Exception as e:
            logger.error(f"❌ Batch audit failed: {e}")
            return [
                self._create_result(
                    request=req,
                    level=AuditLevel.WARNING,
                    risk_level=RiskLevel.MEDIUM,
                    is_blocked=False,
                    reasons=[f"Batch audit error: {str(e)}"]
                )
                for req in requests
            ]
Cost Estimation
def estimate_moderation_cost(
    num_requests: int,
    avg_tokens: int = 1000
) -> dict:
    """
    Estimate Moderation API cost

    Pricing (at the time of writing):
    - text-moderation-latest: free
    - text-moderation-007: free

    Note: OpenAI's Moderation API is currently free to use
    """
    cost_per_1k_tokens = 0.0  # free to use
    total_tokens = num_requests * avg_tokens
    total_cost = total_tokens / 1000 * cost_per_1k_tokens
    return {
        "num_requests": num_requests,
        "avg_tokens_per_request": avg_tokens,
        "total_tokens": total_tokens,
        "cost_per_1k_tokens": cost_per_1k_tokens,
        "estimated_cost": total_cost
    }

# Example
print(estimate_moderation_cost(10000, 1000))
# {'num_requests': 10000, 'avg_tokens_per_request': 1000, 'total_tokens': 10000000,
#  'cost_per_1k_tokens': 0.0, 'estimated_cost': 0.0}
Performance Optimization
1. Caching
import hashlib
from collections import OrderedDict

class CachedModerationAuditor(ModerationAuditor):
    """Moderation auditor with an in-memory LRU cache"""

    def __init__(self, *args, cache_size: int = 1000, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache_size = cache_size
        # functools.lru_cache does not fit async instance methods,
        # so we keep an explicit LRU cache keyed by content hash
        self._cache: "OrderedDict[str, AuditResult]" = OrderedDict()

    def _get_content_hash(self, content: str) -> str:
        """Hash the content to use as a cache key"""
        return hashlib.md5(content.encode()).hexdigest()

    async def audit(self, request: AuditRequest) -> AuditResult:
        """Audit with caching: identical content reuses the previous result"""
        content_hash = self._get_content_hash(request.content)
        # Try the cache first
        cached = self._cache.get(content_hash)
        if cached is not None:
            self._cache.move_to_end(content_hash)  # mark as recently used
            logger.debug("✅ Using cached result")
            return cached
        # Call the API
        result = await super().audit(request)
        # Store in the cache (skip degraded results such as API errors)
        if result.level != AuditLevel.WARNING:
            self._cache[content_hash] = result
            if len(self._cache) > self.cache_size:
                self._cache.popitem(last=False)  # evict the least recently used entry
        return result
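A quick usage sketch: auditing the same content twice only costs one API call. One caveat of this simple scheme is that a cache hit returns the result object recorded for the first occurrence, including its request metadata, so downstream code should not rely on a cached result's request_id.

cached_auditor = CachedModerationAuditor(threshold=0.7, cache_size=5000)
request = AuditRequest(request_id="cache-demo", content="Hello there")

result1 = await cached_auditor.audit(request)  # hits the Moderation API
result2 = await cached_auditor.audit(request)  # served from the in-memory cache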
2. Async Batch Processing
import asyncio

async def parallel_moderation(
    contents: List[str],
    auditor: ModerationAuditor,
    batch_size: int = 20
) -> List[AuditResult]:
    """
    Run Moderation audits over several batches concurrently

    Args:
        contents: list of content strings
        auditor: the auditor to use
        batch_size: batch size

    Returns:
        List of audit results
    """
    requests = [
        AuditRequest(
            request_id=f"batch-{i}",
            content=content
        )
        for i, content in enumerate(contents)
    ]
    # Split into batches and audit them concurrently with asyncio.gather
    batches = [
        requests[i:i+batch_size]
        for i in range(0, len(requests), batch_size)
    ]
    batch_results = await asyncio.gather(
        *(auditor.batch_audit(batch) for batch in batches)
    )
    # Flatten back into a single list, preserving input order
    return [result for results in batch_results for result in results]
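For example, a short sketch of driving it from other async code (concurrent batches still count against your account's rate limits, so very large workloads may need a semaphore or backoff on top):

texts = ["Hello", "What's the weather like today?", "Tell me a story"]
results = await parallel_moderation(texts, ModerationAuditor(threshold=0.7))
for res in results:
    print(res.level, res.is_blocked)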
Testing
Create tests/test_moderation.py:
import pytest
import os
from core.moderation_auditor import ModerationAuditor
from core.models import AuditRequest, AuditLevel, RiskLevel

@pytest.mark.asyncio
async def test_safe_content():
    """Safe content should pass"""
    if not os.getenv("OPENAI_API_KEY"):
        pytest.skip("OPENAI_API_KEY is not set")
    auditor = ModerationAuditor(threshold=0.7)
    request = AuditRequest(
        request_id="test-001",
        content="Hello, what's the weather like today?"
    )
    result = await auditor.audit(request)
    assert not result.is_blocked
    assert result.level == AuditLevel.PASS

@pytest.mark.asyncio
async def test_violent_content():
    """Violent content should be blocked"""
    if not os.getenv("OPENAI_API_KEY"):
        pytest.skip("OPENAI_API_KEY is not set")
    auditor = ModerationAuditor(threshold=0.7)
    request = AuditRequest(
        request_id="test-002",
        content="How to build a bomb and carry out a terrorist attack"
    )
    result = await auditor.audit(request)
    assert result.is_blocked
    assert result.level == AuditLevel.BLOCK

@pytest.mark.asyncio
async def test_harassment():
    """Harassing content"""
    if not os.getenv("OPENAI_API_KEY"):
        pytest.skip("OPENAI_API_KEY is not set")
    auditor = ModerationAuditor(threshold=0.7)
    request = AuditRequest(
        request_id="test-003",
        content="You stupid idiot, I'm going to kill you"
    )
    result = await auditor.audit(request)
    # May be flagged as harassment or as a violent threat
    if result.is_blocked:
        assert any(
            "harassment" in r.lower() or "violence" in r.lower() or "threat" in r.lower()
            for r in result.reasons
        )

@pytest.mark.asyncio
async def test_batch_audit():
    """Batch audit"""
    if not os.getenv("OPENAI_API_KEY"):
        pytest.skip("OPENAI_API_KEY is not set")
    auditor = ModerationAuditor(threshold=0.7)
    requests = [
        AuditRequest(request_id=f"batch-{i}", content=f"Content {i}")
        for i in range(5)
    ]
    results = await auditor.batch_audit(requests)
    assert len(results) == 5
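Run the suite with `pytest tests/test_moderation.py -v`; the `@pytest.mark.asyncio` tests additionally require the pytest-asyncio plugin to be installed and enabled.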
Usage Examples
# Single audit
from core.moderation_auditor import ModerationAuditor
from core.models import AuditRequest

auditor = ModerationAuditor(threshold=0.7)
request = AuditRequest(
    request_id="test-001",
    content="This is a test message"
)
result = await auditor.audit(request)
print(f"Blocked: {result.is_blocked}")
print(f"Risk level: {result.risk_level}")
print(f"Reasons: {result.reasons}")

# Batch audit
requests = [
    AuditRequest(request_id=f"test-{i}", content=f"Content {i}")
    for i in range(10)
]
results = await auditor.batch_audit(requests)
for req, res in zip(requests, results):
    print(f"{req.request_id}: {res.level}")
Key Takeaways
✅ Integrated the OpenAI Moderation API
✅ Understood what each moderation category means
✅ Implemented batch auditing
✅ Added caching to improve performance
✅ Learned how to estimate API cost
Next up: implementing regex pattern matching 🔍