规则引擎设计与实现
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read157 words

规则引擎设计与实现

规则引擎是LLM治理的核心组件,支持动态配置和管理审核规则。

规则引擎架构

graph TB A[规则引擎] --> B[规则存储] A --> C[规则解析器] A --> D[规则执行器] A --> E[规则管理API] B --> B1[JSON文件] B --> B2[数据库] C --> C1[类型解析] C --> C2[优先级解析] C --> C3[条件解析] D --> D1[匹配引擎] D --> D2[动作执行] D --> D3[链式调用] E --> E1[CRUD操作] E --> E2[热更新] E --> E3[规则测试] style A fill:#d4edda

规则类型

graph LR A[规则类型] --> B[关键词规则] A --> C[正则规则] A --> D[模式规则] A --> E[组合规则] A --> F[条件规则] B --> B1[精确匹配] B --> B2[包含匹配] C --> C1[正则表达式] D --> D1[结构模式] E --> E1[AND逻辑] E --> E2[OR逻辑] E --> E3[NOT逻辑] F --> F1[大于] F --> F2[小于] F --> F3[等于] style A fill:#e1f5ff

完整规则引擎实现

创建 core/advanced_rule_engine.py

import re
import json
from typing import List, Dict, Any, Optional, Union
from datetime import datetime
from pathlib import Path
from enum import Enum
from loguru import logger
from pydantic import BaseModel, Field, validator
from dataclasses import dataclass
class RuleType(str, Enum):
"""规则类型"""
KEYWORD = "keyword"
REGEX = "regex"
PATTERN = "pattern"
COMPOSITE = "composite"
CONDITION = "condition"
class RuleAction(str, Enum):
"""规则动作"""
ALLOW = "allow"
BLOCK = "block"
WARN = "warn"
MODIFY = "modify"
LOG = "log"
class RuleOperator(str, Enum):
"""规则操作符"""
AND = "and"
OR = "or"
NOT = "not"
class ConditionOperator(str, Enum):
"""条件操作符"""
GT = ">"
LT = "<"
GTE = ">="
LTE = "<="
EQ = "=="
NEQ = "!="
CONTAINS = "contains"
STARTS_WITH = "starts_with"
ENDS_WITH = "ends_with"
MATCHES = "matches"
class RuleDefinition(BaseModel):
"""规则定义"""
rule_id: str = Field(..., description="规则ID")
name: str = Field(..., description="规则名称")
description: str = Field(default="", description="规则描述")
type: RuleType = Field(..., description="规则类型")
pattern: Optional[str] = Field(None, description="模式(关键词、正则等)")
operator: Optional[RuleOperator] = Field(None, description="组合操作符")
sub_rules: Optional[List[str]] = Field(default_factory=list, description="子规则ID")
conditions: Optional[List[Dict[str, Any]]] = Field(default_factory=list, description="条件列表")
action: RuleAction = Field(default=RuleAction.BLOCK, description="执行动作")
priority: int = Field(default=0, description="优先级(0-100,数字越大越高)")
enabled: bool = Field(default=True, description="是否启用")
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
@validator('priority')
def validate_priority(cls, v):
if not 0 <= v <= 100:
raise ValueError('优先级必须在0-100之间')
return v
class Config:
use_enum_values = True
class RuleResult(BaseModel):
"""规则执行结果"""
rule_id: str
rule_name: str
matched: bool
action: Optional[RuleAction] = None
reason: Optional[str] = None
details: Optional[Dict[str, Any]] = None
execution_time_ms: float = 0
class AdvancedRuleEngine:
"""高级规则引擎"""
def __init__(self, rules_file: str = "./data/rules.json"):
self.rules_file = Path(rules_file)
self.rules: Dict[str, RuleDefinition] = {}
self._load_rules()
# 执行统计
self.stats = {
'total_evaluations': 0,
'total_matches': 0,
'rule_hits': {}
}
def _load_rules(self):
"""加载规则"""
if self.rules_file.exists():
with open(self.rules_file, 'r', encoding='utf-8') as f:
data = json.load(f)
rules_list = data.get('rules', [])
for rule_data in rules_list:
try:
rule = RuleDefinition(**rule_data)
self.rules[rule.rule_id] = rule
except Exception as e:
logger.error(f"❌ 加载规则失败: {rule_data.get('rule_id', 'unknown')}: {e}")
logger.info(f"✅ 加载了 {len(self.rules)} 条规则")
else:
# 创建默认规则
self._create_default_rules()
def _create_default_rules(self):
"""创建默认规则"""
default_rules = [
RuleDefinition(
rule_id="rule_block_violence",
name="阻止暴力内容",
description="检测并阻止包含暴力关键词的内容",
type=RuleType.KEYWORD,
pattern="暴力,杀戮,恐怖,袭击,屠杀,酷刑,殴打,残害,处决,暗杀",
action=RuleAction.BLOCK,
priority=90,
enabled=True
),
RuleDefinition(
rule_id="rule_block_harassment",
name="阻止骚扰内容",
description="检测并阻止包含骚扰关键词的内容",
type=RuleType.KEYWORD,
pattern="骚扰,辱骂,威胁,恐吓,诅咒,人渣,废物,垃圾",
action=RuleAction.BLOCK,
priority=85,
enabled=True
),
RuleDefinition(
rule_id="rule_warn_potential",
name="警告潜在风险",
description="对潜在风险内容进行警告",
type=RuleType.KEYWORD,
pattern="可能,或许,不确定",
action=RuleAction.WARN,
priority=30,
enabled=True
)
]
for rule in default_rules:
self.rules[rule.rule_id] = rule
self._save_rules()
logger.info(f"✅ 创建了 {len(default_rules)} 条默认规则")
def _save_rules(self):
"""保存规则"""
self.rules_file.parent.mkdir(parents=True, exist_ok=True)
data = {
'rules': [rule.dict() for rule in self.rules.values()],
'last_updated': datetime.now().isoformat()
}
with open(self.rules_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"✅ 保存了 {len(self.rules)} 条规则")
def add_rule(self, rule: RuleDefinition) -> bool:
"""添加规则"""
if rule.rule_id in self.rules:
logger.warning(f"⚠️  规则ID已存在: {rule.rule_id}")
return False
self.rules[rule.rule_id] = rule
self._save_rules()
logger.info(f"✅ 添加规则: {rule.name} ({rule.rule_id})")
return True
def update_rule(self, rule_id: str, updates: Dict[str, Any]) -> bool:
"""更新规则"""
if rule_id not in self.rules:
logger.warning(f"⚠️  规则不存在: {rule_id}")
return False
rule = self.rules[rule_id]
for key, value in updates.items():
if hasattr(rule, key) and key != 'rule_id':
setattr(rule, key, value)
rule.updated_at = datetime.now()
self._save_rules()
logger.info(f"✅ 更新规则: {rule_id}")
return True
def remove_rule(self, rule_id: str) -> bool:
"""删除规则"""
if rule_id not in self.rules:
logger.warning(f"⚠️  规则不存在: {rule_id}")
return False
rule = self.rules[rule_id]
del self.rules[rule_id]
# 更新依赖该规则的组合规则
for r in self.rules.values():
if r.sub_rules and rule_id in r.sub_rules:
r.sub_rules.remove(rule_id)
self._save_rules()
logger.info(f"✅ 删除规则: {rule_id}")
return True
def get_rule(self, rule_id: str) -> Optional[RuleDefinition]:
"""获取规则"""
return self.rules.get(rule_id)
def get_all_rules(self, enabled_only: bool = True) -> List[RuleDefinition]:
"""获取所有规则"""
rules = list(self.rules.values())
if enabled_only:
rules = [r for r in rules if r.enabled]
# 按优先级排序
return sorted(rules, key=lambda x: x.priority, reverse=True)
def enable_rule(self, rule_id: str) -> bool:
"""启用规则"""
if rule_id not in self.rules:
return False
self.rules[rule_id].enabled = True
self.rules[rule_id].updated_at = datetime.now()
self._save_rules()
logger.info(f"✅ 启用规则: {rule_id}")
return True
def disable_rule(self, rule_id: str) -> bool:
"""禁用规则规则"""
if rule_id not in self.rules:
return False
self.rules[rule_id].enabled = False
self.rules[rule_id].updated_at = datetime.now()
self._save_rules()
logger.info(f"✅ 禁用规则: {rule_id}")
return True
async def evaluate(self, content: str, context: Optional[Dict[str, Any]] = None) -> List[RuleResult]:
"""
评估内容
Args:
content: 待评估内容
context: 上下文信息
Returns:
匹配的规则结果列表
"""
self.stats['total_evaluations'] += 1
results = []
enabled_rules = self.get_all_rules(enabled_only=True)
for rule in enabled_rules:
result = await self._evaluate_rule(rule, content, context)
results.append(result)
if result.matched:
self.stats['total_matches'] += 1
# 更新命中统计
if rule.rule_id not in self.stats['rule_hits']:
self.stats['rule_hits'][rule.rule_id] = 0
self.stats['rule_hits'][rule.rule_id] += 1
return results
async def _evaluate_rule(
self,
rule: RuleDefinition,
content: str,
context: Optional[Dict[str, Any]] = None
) -> RuleResult:
"""评估单条规则"""
import time
start_time = time.time()
matched = False
reason = None
try:
if rule.type == RuleType.KEYWORD:
matched, reason = self._evaluate_keyword(rule, content)
elif rule.type == RuleType.REGEX:
matched, reason = self._evaluate_regex(rule, content)
elif rule.type == RuleType.PATTERN:
matched, reason = self._evaluate_pattern(rule, content, context)
elif rule.type == RuleType.COMPOSITE:
matched, reason = await self._evaluate_composite(rule, content, context)
elif rule.type == RuleType.CONDITION:
matched, reason = self._evaluate_condition(rule, context)
except Exception as e:
logger.error(f"❌ 规则评估异常: {rule.rule_id}: {e}")
matched = False
reason = f"规则评估异常: {str(e)}"
execution_time = (time.time() - start_time) * 1000
return RuleResult(
rule_id=rule.rule_id,
rule_name=rule.name,
matched=matched,
action=rule.action if matched else None,
reason=reason if matched else None,
details={
'rule_type': rule.type,
'priority': rule.priority
},
execution_time_ms=execution_time
)
def _evaluate_keyword(self, rule: RuleDefinition, content: str) -> tuple:
"""评估关键词规则"""
keywords = rule.pattern.split(',') if rule.pattern else []
content_lower = content.lower()
for keyword in keywords:
keyword = keyword.strip()
if keyword and keyword in content_lower:
return True, f"匹配关键词: {keyword}"
return False, None
def _evaluate_regex(self, rule: RuleDefinition, content: str) -> tuple:
"""评估正则规则"""
if not rule.pattern:
return False, None
try:
if re.search(rule.pattern, content, re.IGNORECASE):
return True, f"匹配正则模式: {rule.pattern[:50]}..."
except re.error as e:
logger.error(f"❌ 正则表达式错误: {rule.pattern}: {e}")
return False, None
def _evaluate_pattern(self, rule: RuleDefinition, content: str, context: Optional[Dict]) -> tuple:
"""评估模式规则"""
# 这里可以实现更复杂的模式匹配
# 例如:结构化模式、语义模式等
return False, None
async def _evaluate_composite(
self,
rule: RuleDefinition,
content: str,
context: Optional[Dict]
) -> tuple:
"""评估组合规则"""
if not rule.sub_rules or not rule.operator:
return False, None
# 评估所有子规则
sub_results = []
for sub_rule_id in rule.sub_rules:
if sub_rule_id not in self.rules:
continue
sub_rule = self.rules[sub_rule_id]
result = await self._evaluate_rule(sub_rule, content, context)
sub_results.append(result.matched)
if not sub_results:
return False, None
# 根据操作符计算结果
if rule.operator == RuleOperator.AND:
matched = all(sub_results)
reason = f"所有子规则匹配 (AND)"
elif rule.operator == RuleOperator.OR:
matched = any(sub_results)
reason = f"任一子规则匹配 (OR)"
elif rule.operator == RuleOperator.NOT:
matched = not any(sub_results)
reason = f"子规则都不匹配 (NOT)"
else:
return False, None
return matched, reason
def _evaluate_condition(self, rule: RuleDefinition, context: Optional[Dict]) -> tuple:
"""评估条件规则"""
if not context or not rule.conditions:
return False, None
# 评估所有条件
for condition in rule.conditions:
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
if field not in context:
continue
field_value = context[field]
if operator == ConditionOperator.GT:
if not (field_value > value):
return False, None
elif operator == ConditionOperator.LT:
if not (field_value < value):
return False, None
elif operator == ConditionOperator.EQ:
if field_value != value:
return False, None
elif operator == ConditionOperator.NEQ:
if field_value == value:
return False, None
elif operator == ConditionOperator.CONTAINS:
if value not in field_value:
return False, None
return True, "所有条件满足"
def get_stats(self) -> Dict:
"""获取统计信息"""
return {
'total_rules': len(self.rules),
'enabled_rules': len(self.get_all_rules(enabled_only=True)),
'evaluations': self.stats['total_evaluations'],
'matches': self.stats['total_matches'],
'match_rate': self.stats['total_matches'] / self.stats['total_evaluations'] if self.stats['total_evaluations'] > 0 else 0,
'rule_hits': self.stats['rule_hits']
}
def test_rules(self, content: str) -> Dict[str, Any]:
"""测试规则"""
import asyncio
async def run_test():
results = await self.evaluate(content)
return {
'content': content,
'total_rules': len(self.get_all_rules()),
'matched_rules': len([r for r in results if r.matched]),
'results': [
{
'rule_id': r.rule_id,
'rule_name': r.rule_name,
'matched': r.matched,
'action': r.action.value if r.action else None,
'reason': r.reason,
'execution_time_ms': r.execution_time_ms
}
for r in results
]
}
return asyncio.run(run_test())

使用示例

from core.advanced_rule_engine import AdvancedRuleEngine, RuleDefinition, RuleType, RuleAction
# 创建规则引擎
engine = AdvancedRuleEngine()
# 测试内容
test_content = "这是一条包含暴力内容的信息"
# 评估
results = engine.evaluate(test_content)
# 打印匹配结果
for result in results:
if result.matched:
print(f"规则: {result.rule_name}")
print(f"动作: {result.action}")
print(f"原因: {result.reason}")
# 添加新规则
new_rule = RuleDefinition(
rule_id="rule_test",
name="测试规则",
type=RuleType.KEYWORD,
pattern="测试,实验",
action=RuleAction.WARN,
priority=50
)
engine.add_rule(new_rule)
# 获取统计
stats = engine.get_stats()
print(f"统计: {stats}")
# 测试规则
test_result = engine.test_rules("这是测试内容")
print(f"测试结果: {test_result}")

学习要点

✅ 实现了完整的规则引擎 ✅ 支持多种规则类型(关键词、正则、组合、条件) ✅ 实现了规则的CRUD操作 ✅ 支持规则优先级和链式执行 ✅ 提供了规则测试功能 ✅ 实现了执行统计


下一步: 实现 动态规则配置 ⚙️