关键词过滤器
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 220 words

关键词过滤器

关键词过滤是最快速、成本最低的审核方法,适合拦截明显违规内容。

关键词过滤原理

graph LR
    A[输入文本] --> B[预处理]
    B --> C[分词]
    C --> D[标准化]
    D --> E[匹配检测]
    E --> F{匹配成功?}
    F -->|是| G[阻断/警告]
    F -->|否| H[通过]
    style G fill:#f8d7da
    style H fill:#d4edda

匹配策略

| 策略 | 说明 | 速度 | 准确率 | 误报率 |
| --- | --- | --- | --- | --- |
| 精确匹配 | 完全相同 | ⚡ 极快 | ⭐⭐ 低 | ⭐ 低 |
| 包含匹配 | 子串 | ⚡ 快 | ⭐⭐⭐ 中 | ⭐⭐ 中 |
| 模糊匹配 | 相似度 | 🐢 中等 | ⭐⭐⭐⭐ 高 | ⭐⭐⭐ 中 |
| 正则匹配 | 模式 | 🐌 较慢 | ⭐⭐⭐⭐⭐ 很高 | ⭐⭐ 中 |

实现关键词过滤器

创建 core/keyword_filter.py

import re
from typing import List, Set, Dict, Tuple
from collections import defaultdict
from core.base_auditor import BaseAuditor
from core.models import AuditRequest, AuditResult, AuditLevel, RiskLevel
from loguru import logger
class KeywordAuditor(BaseAuditor):
    """Keyword-based content auditor.

    Checks request content against categorized keyword sets using one of
    four strategies: "exact", "contains", "fuzzy" (Levenshtein similarity)
    or "regex".
    """

    def __init__(
        self,
        match_type: str = "contains",  # exact, contains, fuzzy, regex
        case_sensitive: bool = False,
        block_on_match: bool = True
    ):
        super().__init__("keyword_auditor")
        self.match_type = match_type
        self.case_sensitive = case_sensitive
        # When True, any match yields a BLOCK result; otherwise a WARNING.
        self.block_on_match = block_on_match
        # Keyword sets grouped by category.
        self.keywords: Dict[str, Set[str]] = {
            "violence": set(),
            "harassment": set(),
            "adult": set(),
            "illegal": set(),
            "spam": set()
        }
        self._load_keywords()

    def _load_keywords(self):
        """Load the built-in default keyword lists into each category."""
        # Violence
        self.keywords["violence"].update([
            "暴力", "杀戮", "恐怖", "袭击", "屠杀",
            "酷刑", "殴打", "残害", "处决", "暗杀"
        ])
        # Harassment / abuse
        self.keywords["harassment"].update([
            "骚扰", "辱骂", "威胁", "恐吓", "诅咒",
            "人渣", "废物", "垃圾", "白痴", "傻瓜"
        ])
        # Adult content
        self.keywords["adult"].update([
            "色情", "淫秽", "裸露", "性交", "色诱",
            "成人视频", "淫荡"
        ])
        # Illegal activity
        self.keywords["illegal"].update([
            "毒品", "走私", "黑客攻击", "洗钱", "诈骗",
            "赌博", "制造炸弹", "购买枪支"
        ])
        # Spam
        self.keywords["spam"].update([
            "免费领取", "点击领取", "中奖通知", "代开发票",
            "套现", "办证"
        ])
        logger.info(f"✅ 加载了 {sum(len(v) for v in self.keywords.values())} 个关键词")

    async def audit(self, request: AuditRequest) -> AuditResult:
        """
        Run keyword auditing on a request.

        Args:
            request: the audit request containing the content to check.
        Returns:
            An AuditResult; BLOCK/WARNING with reasons when keywords match,
            PASS otherwise.
        """
        content = request.content
        if not self.case_sensitive:
            # NOTE: only the content is lowercased, not the stored keywords;
            # this works for the default (Chinese) keywords, but custom
            # keywords containing uppercase letters would never match —
            # callers should add them lowercased.
            content = content.lower()
        matches = self._find_matches(content)
        if matches:
            risk_level = self._calculate_risk_level(matches)
            reasons = [f"检测到{cat}关键词: {kw}" for cat, kw in matches]
            logger.warning(f"🚫 关键词匹配: {len(matches)}个")
            return self._create_result(
                request=request,
                level=AuditLevel.BLOCK if self.block_on_match else AuditLevel.WARNING,
                risk_level=risk_level,
                is_blocked=self.block_on_match,
                reasons=reasons,
                details={
                    "matches": matches,
                    "match_type": self.match_type,
                    "auditor": self.name
                }
            )
        logger.debug(f"✅ 关键词审核通过")
        return self._create_result(
            request=request,
            level=AuditLevel.PASS,
            risk_level=RiskLevel.LOW,
            is_blocked=False,
            reasons=["关键词审核通过"],
            details={"auditor": self.name}
        )

    def _find_matches(self, content: str) -> List[Tuple[str, str]]:
        """
        Dispatch to the matcher selected by self.match_type.

        Returns:
            [(category, keyword), ...]; empty for an unknown match_type.
        """
        matches = []
        if self.match_type == "exact":
            matches = self._exact_match(content)
        elif self.match_type == "contains":
            matches = self._contains_match(content)
        elif self.match_type == "fuzzy":
            matches = self._fuzzy_match(content)
        elif self.match_type == "regex":
            matches = self._regex_match(content)
        return matches

    def _exact_match(self, content: str) -> List[Tuple[str, str]]:
        """Whole-token match: each \\b\\w+\\b token must equal a keyword.

        NOTE(review): for Chinese text there are no word boundaries inside a
        run of characters, so a keyword embedded in a longer run will NOT be
        matched — this is relied upon by the existing tests.
        """
        matches = []
        words = re.findall(r'\b\w+\b', content)  # tokenize
        for category, keywords in self.keywords.items():
            for word in words:
                if word in keywords:
                    matches.append((category, word))
        return matches

    def _contains_match(self, content: str) -> List[Tuple[str, str]]:
        """Substring match: a keyword matches if it occurs anywhere in content."""
        matches = []
        for category, keywords in self.keywords.items():
            for keyword in keywords:
                if keyword in content:
                    matches.append((category, keyword))
        return matches

    def _fuzzy_match(self, content: str, threshold: float = 0.8) -> List[Tuple[str, str]]:
        """Fuzzy match using Levenshtein similarity against each token."""
        matches = []
        content_words = re.findall(r'\b\w+\b', content)
        for category, keywords in self.keywords.items():
            for keyword in keywords:
                for word in content_words:
                    similarity = self._levenshtein_similarity(word, keyword)
                    if similarity >= threshold:
                        matches.append((category, keyword))
        return matches

    def _regex_match(self, content: str) -> List[Tuple[str, str]]:
        """Regex match using hard-coded per-category alternation patterns.

        NOTE(review): these patterns duplicate the default keyword lists and
        do not reflect keywords added via add_keyword() — confirm whether
        they should be built from self.keywords instead.
        """
        matches = []
        patterns = {
            "violence": r'暴力|杀戮|恐怖|袭击|屠杀|酷刑|殴打|残害|处决|暗杀',
            "harassment": r'骚扰|辱骂|威胁|恐吓|诅咒|人渣|废物|垃圾|白痴|傻瓜',
            "adult": r'色情|淫秽|裸露|性交|色诱|成人视频|淫荡',
            "illegal": r'毒品|走私|黑客攻击|洗钱|诈骗|赌博|制造炸弹|购买枪支',
            "spam": r'免费领取|点击领取|中奖通知|代开发票|套现|办证'
        }
        for category, pattern in patterns.items():
            found = re.findall(pattern, content, re.IGNORECASE)
            for match in found:
                matches.append((category, match))
        return matches

    def _levenshtein_similarity(self, s1: str, s2: str) -> float:
        """Return a similarity ratio in [0, 1] from the Levenshtein distance.

        1.0 means identical; 0.0 means one string is empty and the other is
        not (maximally different).
        """
        # Bug fix: two empty strings are identical, so similarity is 1.0.
        # The previous code returned 0.0 on every empty input, including the
        # both-empty case.
        if not s1 and not s2:
            return 1.0
        if not s1 or not s2:
            return 0.0
        # Classic Wagner–Fischer DP, kept to two rows to bound memory.
        prev = list(range(len(s2) + 1))
        for i in range(1, len(s1) + 1):
            curr = [i] + [0] * len(s2)
            for j in range(1, len(s2) + 1):
                cost = 0 if s1[i - 1] == s2[j - 1] else 1
                curr[j] = min(
                    prev[j] + 1,        # deletion
                    curr[j - 1] + 1,    # insertion
                    prev[j - 1] + cost  # substitution
                )
            prev = curr
        distance = prev[len(s2)]
        max_len = max(len(s1), len(s2))
        return 1.0 - (distance / max_len)

    def _calculate_risk_level(self, matches: List[Tuple[str, str]]) -> RiskLevel:
        """Map the number of matches to a risk level (5+ critical, 3+ high,
        2 medium, 1 low)."""
        if len(matches) >= 5:
            return RiskLevel.CRITICAL
        elif len(matches) >= 3:
            return RiskLevel.HIGH
        elif len(matches) >= 2:
            return RiskLevel.MEDIUM
        else:
            return RiskLevel.LOW

    def add_keyword(self, category: str, keyword: str):
        """Add a keyword, creating the category set if it does not exist."""
        if category not in self.keywords:
            self.keywords[category] = set()
        self.keywords[category].add(keyword)
        logger.info(f"✅ 添加关键词: [{category}] {keyword}")

    def remove_keyword(self, category: str, keyword: str) -> bool:
        """Remove a keyword; returns True if it was present."""
        if category in self.keywords and keyword in self.keywords[category]:
            self.keywords[category].remove(keyword)
            logger.info(f"✅ 删除关键词: [{category}] {keyword}")
            return True
        return False

    def get_stats(self) -> Dict[str, int]:
        """Return the number of keywords per category."""
        return {
            category: len(keywords)
            for category, keywords in self.keywords.items()
        }

关键词管理工具

创建 tools/keyword_manager.py

import json
from pathlib import Path
from typing import Dict, List
from loguru import logger
class KeywordManager:
    """Manages categorized keyword lists persisted to a JSON file."""

    def __init__(self, file_path: str = "./data/keywords.json"):
        self.file_path = Path(file_path)
        # category -> ordered list of keywords
        self.keywords: Dict[str, List[str]] = {}
        self._load()

    def _load(self):
        """Load keywords from disk; create a default empty file if absent."""
        if self.file_path.exists():
            with open(self.file_path, "r", encoding="utf-8") as f:
                self.keywords = json.load(f)
            logger.info(f"✅ 从文件加载关键词: {self.file_path}")
        else:
            self.keywords = {
                "violence": [],
                "harassment": [],
                "adult": [],
                "illegal": [],
                "spam": []
            }
            self._save()

    def _save(self):
        """Persist the keyword map to disk, creating parent dirs as needed."""
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.file_path, "w", encoding="utf-8") as f:
            json.dump(self.keywords, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ 保存关键词到文件: {self.file_path}")

    def add_keyword(self, category: str, keyword: str):
        """Add one keyword (creating the category if needed) and save."""
        if category not in self.keywords:
            self.keywords[category] = []
        if keyword not in self.keywords[category]:
            self.keywords[category].append(keyword)
            self._save()
            logger.info(f"✅ 添加关键词: [{category}] {keyword}")

    def remove_keyword(self, category: str, keyword: str) -> bool:
        """Remove a keyword and save; returns True if it existed."""
        if category in self.keywords and keyword in self.keywords[category]:
            self.keywords[category].remove(keyword)
            self._save()
            logger.info(f"✅ 删除关键词: [{category}] {keyword}")
            return True
        return False

    def add_category(self, category: str):
        """Create an empty category if it does not already exist."""
        if category not in self.keywords:
            self.keywords[category] = []
            self._save()
            logger.info(f"✅ 添加分类: {category}")

    def remove_category(self, category: str) -> bool:
        """Delete an entire category; returns True if it existed."""
        if category in self.keywords:
            del self.keywords[category]
            self._save()
            logger.info(f"✅ 删除分类: {category}")
            return True
        return False

    def import_keywords(self, file_path: str, category: str):
        """Import keywords (one per line) from a text file into a category.

        Performance fix: the previous implementation called add_keyword()
        per line, rewriting the whole JSON file once per keyword; here the
        batch is merged in memory and saved exactly once.
        """
        import_path = Path(file_path)
        if not import_path.exists():
            logger.error(f"❌ 文件不存在: {file_path}")
            return
        with open(import_path, "r", encoding="utf-8") as f:
            keywords = [line.strip() for line in f if line.strip()]
        if category not in self.keywords:
            self.keywords[category] = []
        existing = self.keywords[category]
        for keyword in keywords:
            if keyword not in existing:
                existing.append(keyword)
        self._save()
        logger.info(f"✅ 导入了 {len(keywords)} 个关键词到 [{category}]")

    def export_keywords(self, category: str, file_path: str):
        """Export a category's keywords to a text file, one per line."""
        if category not in self.keywords:
            logger.error(f"❌ 分类不存在: {category}")
            return
        export_path = Path(file_path)
        export_path.parent.mkdir(parents=True, exist_ok=True)
        with open(export_path, "w", encoding="utf-8") as f:
            for keyword in self.keywords[category]:
                f.write(keyword + "\n")
        logger.info(f"✅ 导出了 {len(self.keywords[category])} 个关键词到 {file_path}")

    def get_all(self) -> Dict[str, List[str]]:
        """Return the full category -> keywords mapping."""
        return self.keywords

    def get_category(self, category: str) -> List[str]:
        """Return the keywords of one category ([] if unknown)."""
        return self.keywords.get(category, [])

    def get_stats(self) -> Dict[str, int]:
        """Return the number of keywords per category."""
        return {
            category: len(keywords)
            for category, keywords in self.keywords.items()
        }

CLI工具

创建 tools/keyword_cli.py

import argparse
from keyword_manager import KeywordManager
def main():
    """CLI entry point for managing the keyword JSON file."""
    parser = argparse.ArgumentParser(description="关键词管理工具")
    parser.add_argument("--file", default="./data/keywords.json", help="关键词文件路径")
    subparsers = parser.add_subparsers(dest="command", help="命令")

    # add keyword
    add_parser = subparsers.add_parser("add", help="添加关键词")
    add_parser.add_argument("--category", required=True, help="分类")
    add_parser.add_argument("--keyword", required=True, help="关键词")

    # remove keyword
    remove_parser = subparsers.add_parser("remove", help="删除关键词")
    remove_parser.add_argument("--category", required=True, help="分类")
    remove_parser.add_argument("--keyword", required=True, help="关键词")

    # list keywords
    list_parser = subparsers.add_parser("list", help="列出关键词")
    list_parser.add_argument("--category", help="分类(可选)")

    # Bug fix: the import/export subparsers previously declared --file with
    # the default dest "file", clobbering the top-level --file option, so
    # KeywordManager was opened on the import/export data path instead of
    # the keywords JSON. A distinct dest keeps the two paths separate while
    # leaving the command-line flags unchanged.
    import_parser = subparsers.add_parser("import", help="导入关键词")
    import_parser.add_argument("--category", required=True, help="目标分类")
    import_parser.add_argument("--file", dest="data_file", required=True, help="源文件路径")

    export_parser = subparsers.add_parser("export", help="导出关键词")
    export_parser.add_argument("--category", required=True, help="源分类")
    export_parser.add_argument("--file", dest="data_file", required=True, help="目标文件路径")

    # stats takes no extra arguments
    subparsers.add_parser("stats", help="统计信息")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    manager = KeywordManager(args.file)

    if args.command == "add":
        manager.add_keyword(args.category, args.keyword)
        print(f"✅ 添加关键词: [{args.category}] {args.keyword}")
    elif args.command == "remove":
        if manager.remove_keyword(args.category, args.keyword):
            print(f"✅ 删除关键词: [{args.category}] {args.keyword}")
        else:
            print(f"❌ 关键词不存在: [{args.category}] {args.keyword}")
    elif args.command == "list":
        if args.category:
            keywords = manager.get_category(args.category)
            print(f"\n分类 [{args.category}] ({len(keywords)} 个关键词):")
            for kw in keywords:
                print(f"  - {kw}")
        else:
            all_keywords = manager.get_all()
            print(f"\n所有分类:")
            for category, keywords in all_keywords.items():
                print(f"  [{category}] {len(keywords)} 个关键词")
    elif args.command == "import":
        manager.import_keywords(args.data_file, args.category)
    elif args.command == "export":
        manager.export_keywords(args.category, args.data_file)
    elif args.command == "stats":
        stats = manager.get_stats()
        print("\n关键词统计:")
        total = 0
        for category, count in stats.items():
            print(f"  [{category}] {count}")
            total += count
        print(f"\n总计: {total} 个关键词")


if __name__ == "__main__":
    main()

测试

创建 tests/test_keyword_filter.py

import pytest
from core.keyword_filter import KeywordAuditor
from core.models import AuditRequest, AuditLevel, RiskLevel
@pytest.mark.asyncio
async def test_exact_match():
    """Exact matching tokenizes on word boundaries, so a keyword embedded
    inside a longer Chinese run is not a whole-token match."""
    auditor = KeywordAuditor(match_type="exact")
    request = AuditRequest(
        request_id="test-001",
        content="这是一条暴力内容"
    )
    result = await auditor.audit(request)
    # "暴力" is only a substring of the token, not the whole token.
    assert not result.is_blocked
@pytest.mark.asyncio
async def test_contains_match():
    """Substring matching blocks content that contains the keyword "暴力"."""
    auditor = KeywordAuditor(match_type="contains")
    request = AuditRequest(
        request_id="test-002",
        content="这是一条暴力内容"
    )
    result = await auditor.audit(request)
    assert result.is_blocked
    assert "暴力" in str(result.reasons)
@pytest.mark.asyncio
async def test_multiple_keywords():
    """Two distinct keyword hits should escalate the risk level to MEDIUM."""
    auditor = KeywordAuditor(match_type="contains")
    request = AuditRequest(
        request_id="test-003",
        content="包含暴力和骚扰的内容"
    )
    result = await auditor.audit(request)
    assert result.is_blocked
    # Exactly 2 matches maps to MEDIUM in _calculate_risk_level.
    assert result.risk_level == RiskLevel.MEDIUM
@pytest.mark.asyncio
async def test_fuzzy_match():
    """Fuzzy matching on content containing a deliberate typo ("梡力")."""
    auditor = KeywordAuditor(match_type="fuzzy")
    request = AuditRequest(
        request_id="test-004",
        content="这是梡力内容"  # deliberate typo of "暴力"
    )
    result = await auditor.audit(request)
    # NOTE(review): this test asserts nothing — whether the similarity
    # threshold is met depends on how the Chinese content is tokenized, so
    # the result is only printed. Pin down the expected behaviour and add
    # an assertion.
    print(f"模糊匹配结果: {result}")
@pytest.mark.asyncio
async def test_regex_match():
    """Regex matching blocks content hitting the violence pattern."""
    auditor = KeywordAuditor(match_type="regex")
    request = AuditRequest(
        request_id="test-005",
        content="这是一条恐怖袭击相关内容"
    )
    result = await auditor.audit(request)
    assert result.is_blocked

使用示例

# Python API
import asyncio

from core.keyword_filter import KeywordAuditor
from core.models import AuditRequest


async def main():
    # Create the auditor.
    auditor = KeywordAuditor(match_type="contains")
    # Add a custom keyword.
    auditor.add_keyword("custom", "禁止词")
    # Audit a piece of content. `audit` is a coroutine and must be awaited
    # inside an async function — the original snippet used a bare top-level
    # `await`, which is a SyntaxError in a regular module.
    request = AuditRequest(
        request_id="test-001",
        content="这是一条测试内容"
    )
    result = await auditor.audit(request)
    print(f"是否阻断: {result.is_blocked}")
    print(f"原因: {result.reasons}")
    # Inspect keyword statistics.
    stats = auditor.get_stats()
    print(f"关键词统计: {stats}")


asyncio.run(main())

学习要点

✅ 实现了多种关键词匹配策略
✅ 支持关键词分类管理
✅ 实现了模糊匹配算法
✅ 提供了CLI管理工具
✅ 支持关键词导入导出


下一步: 集成 OpenAI Moderation API 🔌