治理框架结构设计
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 192 words

治理框架结构设计

在本章中,我们将设计并实现LLM治理平台的核心框架结构。

治理架构概览

graph TB subgraph "接入层" A1[API Gateway] A2[认证中间件] A3[限流中间件] end subgraph "治理层" B1[内容审核器] B2[安全扫描器] B3[规则引擎] B4[监控器] end subgraph "服务层" C1[LLM服务] C2[审计服务] C3[告警服务] end subgraph "数据层" D1[向量数据库] D2[时序数据库] D3[审计日志] D4[规则存储] end subgraph "展示层" E1[管理后台] E2[监控仪表盘] end A1 --> B1 A2 --> B2 A3 --> B3 B1 --> C1 B2 --> C1 B3 --> C1 B4 --> C2 C1 --> C2 C2 --> C3 C1 --> D1 B1 --> D4 B2 --> D4 B4 --> D2 C2 --> D3 E1 --> B3 E2 --> B4 style A1 fill:#e1f5ff style E2 fill:#d4edda

数据模型设计

核心数据模型

创建 core/models.py

from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
class AuditLevel(str, Enum):
    """Audit verdict level for a piece of content."""
    PASS = "pass"        # content is acceptable
    WARNING = "warning"  # suspicious, but not blocked
    BLOCK = "block"      # content must be rejected
class RiskLevel(str, Enum):
    """Assessed risk level of audited content, from least to most severe."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class AuditRequest(BaseModel):
    """A single content-audit request submitted to an auditor."""
    request_id: str = Field(..., description="请求ID")
    user_id: Optional[str] = Field(None, description="用户ID")
    content: str = Field(..., description="待审核内容")
    content_type: str = Field(default="text", description="内容类型")
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="元数据")
class AuditResult(BaseModel):
    """Outcome of auditing one request."""
    request_id: str   # echoes AuditRequest.request_id
    level: AuditLevel
    risk_level: RiskLevel
    is_blocked: bool  # True when the request must be rejected
    reasons: List[str]  # human-readable justification(s)
    details: Optional[Dict[str, Any]] = None
    # NOTE(review): naive local time — consider a timezone-aware default.
    timestamp: datetime = Field(default_factory=datetime.now)
class SecurityScanRequest(BaseModel):
    """A prompt to be scanned for security attacks."""
    request_id: str
    prompt: str
    context: Optional[Dict[str, Any]] = None  # extra scanner context, if any
class SecurityScanResult(BaseModel):
    """Outcome of scanning one prompt for attacks."""
    request_id: str
    is_safe: bool
    attack_type: Optional[str] = None  # e.g. "prompt_injection"; None when safe
    confidence: float                  # detector confidence — presumably in [0, 1], TODO confirm
    mitigation: Optional[str] = None   # suggested remediation, if any
    timestamp: datetime = Field(default_factory=datetime.now)
class Rule(BaseModel):
    """A single governance rule evaluated by the rule engine."""
    rule_id: str
    name: str
    description: str
    type: str  # keyword, regex, pattern, ai
    pattern: Optional[str] = None  # comma-separated keywords or a regex, depending on type
    action: str  # allow, block, warn, modify
    priority: int = Field(default=0, description="优先级,数字越大越高")
    enabled: bool = True  # disabled rules are skipped by the engine
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)
class LLMRequest(BaseModel):
    """An inbound LLM generation request, optionally annotated with
    governance results collected along the pipeline."""
    request_id: str
    # Fix: without an explicit default, pydantic v2 treats Optional[str] as a
    # *required* field that merely accepts None.  Default to None for
    # consistency with AuditRequest.user_id.
    user_id: Optional[str] = None
    prompt: str
    model: str
    max_tokens: int = 1000
    temperature: float = 0.7
    audit_results: Optional[List[AuditResult]] = None        # input-side audits, if run
    security_results: Optional[SecurityScanResult] = None    # security scan, if run
class LLMResponse(BaseModel):
    """An LLM completion plus usage/latency bookkeeping."""
    request_id: str   # echoes LLMRequest.request_id
    response_id: str
    content: str
    model: str
    tokens_used: int
    latency_ms: int
    audit_results: Optional[AuditResult] = None  # output-side audit, if performed
    timestamp: datetime = Field(default_factory=datetime.now)
class MetricData(BaseModel):
    """One monitoring metric sample."""
    timestamp: datetime
    metric_name: str
    metric_value: float
    labels: Optional[Dict[str, str]] = None  # optional Prometheus-style labels
class Alert(BaseModel):
    """An alert raised when a monitored metric crosses its threshold."""
    alert_id: str
    severity: str
    metric_name: str      # metric that triggered the alert
    threshold: float      # configured limit
    current_value: float  # observed value at trigger time
    message: str
    timestamp: datetime = Field(default_factory=datetime.now)
    resolved: bool = False  # set to True once the alert clears

核心模块架构

模块依赖关系

graph LR A[BaseAuditor] --> B[KeywordFilter] A --> C[ModerationAPI] A --> D[RegexPattern] E[BaseSecurityScanner] --> F[PromptInjectionDetector] E --> G[JailbreakDetector] E --> H[OutputScanner] I[RuleEngine] --> A I --> E J[Monitor] --> K[MetricsCollector] J --> L[Logger] J --> M[AlertManager] style A fill:#e1f5ff style I fill:#fff3cd style J fill:#d4edda

基础审核器接口

创建 core/base_auditor.py

from abc import ABC, abstractmethod
from typing import List, Optional
from core.models import AuditRequest, AuditResult, AuditLevel, RiskLevel
class BaseAuditor(ABC):
    """Common interface for all content auditors.

    Subclasses implement :meth:`audit`; callers that must never see an
    exception should go through :meth:`_safe_audit` instead.
    """

    def __init__(self, name: str):
        # Identifier reported back in error details (see _safe_audit).
        self.name = name

    @abstractmethod
    async def audit(self, request: AuditRequest) -> AuditResult:
        """Audit one request and return the verdict.

        Args:
            request: the content-audit request.

        Returns:
            The audit result for ``request``.
        """
        pass

    def _create_result(
        self,
        request: AuditRequest,
        level: AuditLevel,
        risk_level: RiskLevel,
        is_blocked: bool,
        reasons: List[str],
        details: Optional[dict] = None,
    ) -> AuditResult:
        """Build an :class:`AuditResult` carrying over the request id."""
        payload = {} if not details else details
        return AuditResult(
            request_id=request.request_id,
            level=level,
            risk_level=risk_level,
            is_blocked=is_blocked,
            reasons=reasons,
            details=payload,
        )

    async def _safe_audit(self, request: AuditRequest) -> AuditResult:
        """Run :meth:`audit`, converting any exception into a WARNING result."""
        try:
            return await self.audit(request)
        except Exception as exc:
            # Fail open: surface the failure in the result instead of raising.
            return self._create_result(
                request=request,
                level=AuditLevel.WARNING,
                risk_level=RiskLevel.MEDIUM,
                is_blocked=False,
                reasons=[f"审核器异常: {str(exc)}"],
                details={"error": str(exc), "auditor": self.name},
            )

基础安全扫描器接口

创建 core/base_security_scanner.py

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from core.models import SecurityScanRequest, SecurityScanResult
class BaseSecurityScanner(ABC):
    """Common interface for all security scanners.

    Subclasses implement :meth:`scan`; callers that must never see an
    exception should go through :meth:`_safe_scan` instead.
    """

    def __init__(self, name: str):
        # Identifier of this scanner implementation.
        self.name = name

    @abstractmethod
    async def scan(self, request: SecurityScanRequest) -> SecurityScanResult:
        """Scan one request for attacks.

        Args:
            request: the security-scan request.

        Returns:
            The scan result for ``request``.
        """
        pass

    def _create_result(
        self,
        request: SecurityScanRequest,
        is_safe: bool,
        attack_type: Optional[str],
        confidence: float,
        mitigation: Optional[str] = None,
    ) -> SecurityScanResult:
        """Build a :class:`SecurityScanResult` carrying over the request id."""
        return SecurityScanResult(
            request_id=request.request_id,
            is_safe=is_safe,
            attack_type=attack_type,
            confidence=confidence,
            mitigation=mitigation,
        )

    async def _safe_scan(self, request: SecurityScanRequest) -> SecurityScanResult:
        """Run :meth:`scan`, converting any exception into an unsafe result."""
        try:
            return await self.scan(request)
        except Exception as exc:
            # Fail closed: a crashing scanner reports the input as unsafe.
            return self._create_result(
                request=request,
                is_safe=False,
                attack_type="scanner_error",
                confidence=1.0,
                mitigation=f"扫描器异常: {str(exc)}",
            )

规则引擎基础

创建 core/rule_engine.py

import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from loguru import logger

from config import settings
from core.models import AuditRequest, AuditResult, Rule
class RuleEngine:
    """Loads, persists and evaluates content-governance rules.

    Rules are stored as JSON at ``settings.rules_file``; a built-in default
    rule set is written (and activated) on first run.  :meth:`evaluate`
    returns the enabled rules, highest priority first, that match a
    request's content.
    """

    def __init__(self):
        self.rules: List[Rule] = []
        self.rules_file = Path(settings.rules_file)
        self._load_rules()

    def _load_rules(self):
        """Load rules from disk, creating the default rule set if absent."""
        if self.rules_file.exists():
            with open(self.rules_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.rules = [Rule(**rule) for rule in data.get("rules", [])]
            logger.info(f"✅ 加载了 {len(self.rules)} 条规则")
        else:
            logger.info("⚠️  规则文件不存在,使用空规则集")
            self.rules = []
            self._save_default_rules()

    def _save_default_rules(self):
        """Write the built-in default rule set to disk and activate it.

        Fix: the original only wrote the file, leaving ``self.rules`` empty
        until the *next* construction — so defaults were never enforced on
        first run.  Load them into memory as well.
        """
        default_rules = {
            "rules": [
                {
                    "rule_id": "default_block_violence",
                    "name": "阻止暴力内容",
                    "description": "检测并阻止暴力相关关键词",
                    "type": "keyword",
                    "pattern": "暴力,杀戮,恐怖,袭击",
                    "action": "block",
                    "priority": 10,
                    "enabled": True
                },
                {
                    "rule_id": "default_block_harassment",
                    "name": "阻止骚扰内容",
                    "description": "检测并阻止骚扰相关关键词",
                    "type": "keyword",
                    "pattern": "骚扰,辱骂,威胁,恐吓",
                    "action": "block",
                    "priority": 10,
                    "enabled": True
                }
            ]
        }
        self.rules_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.rules_file, "w", encoding="utf-8") as f:
            json.dump(default_rules, f, ensure_ascii=False, indent=2)
        # Activate the defaults immediately instead of waiting for a reload.
        self.rules = [Rule(**rule) for rule in default_rules["rules"]]
        logger.info("✅ 创建了默认规则文件")

    def add_rule(self, rule: Rule):
        """Append ``rule`` and persist the rule set."""
        self.rules.append(rule)
        self._save_rules()
        logger.info(f"✅ 添加规则: {rule.name}")

    def remove_rule(self, rule_id: str) -> bool:
        """Delete the rule with ``rule_id``.

        Returns:
            True if a rule was removed, False if no rule matched.
        """
        original_count = len(self.rules)
        self.rules = [r for r in self.rules if r.rule_id != rule_id]
        if len(self.rules) < original_count:
            self._save_rules()
            logger.info(f"✅ 删除规则: {rule_id}")
            return True
        return False

    def update_rule(self, rule_id: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` (attribute name -> new value) to one rule.

        Unknown attribute names are ignored.  Returns True if the rule
        exists, False otherwise.
        """
        for rule in self.rules:
            if rule.rule_id == rule_id:
                for key, value in updates.items():
                    if hasattr(rule, key):
                        setattr(rule, key, value)
                # Fix: keep the audit trail accurate — the model defines
                # updated_at but the original never refreshed it here.
                rule.updated_at = datetime.now()
                self._save_rules()
                logger.info(f"✅ 更新规则: {rule_id}")
                return True
        return False

    def get_enabled_rules(self) -> List[Rule]:
        """Return the enabled rules, sorted by descending priority."""
        return sorted(
            [r for r in self.rules if r.enabled],
            key=lambda x: x.priority,
            reverse=True
        )

    def _save_rules(self):
        """Persist the current rule set to the rules file."""
        data = {
            # NOTE(review): .dict() is the pydantic v1 API (deprecated in v2,
            # where model_dump() is preferred) — confirm the pinned version.
            "rules": [rule.dict() for rule in self.rules]
        }
        with open(self.rules_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    async def evaluate(self, request: AuditRequest) -> List[Rule]:
        """Evaluate a request against all enabled rules.

        Args:
            request: the audit request to evaluate.

        Returns:
            The matching rules, in descending priority order.
        """
        matched_rules = []
        for rule in self.get_enabled_rules():
            if self._rule_matches(rule, request):
                matched_rules.append(rule)
                logger.debug(f"规则匹配: {rule.name}")
        return matched_rules

    def _rule_matches(self, rule: Rule, request: AuditRequest) -> bool:
        """Return True when ``rule`` matches the request content.

        - keyword rules: comma-separated keywords, case-insensitive substring.
        - regex rules: case-insensitive search; an invalid pattern is logged
          and treated as a non-match.
        - other rule types (pattern, ai) are not handled here and never match.
        """
        content = request.content.lower()
        if rule.type == "keyword" and rule.pattern:
            keywords = rule.pattern.split(",")
            return any(kw.strip().lower() in content for kw in keywords)
        if rule.type == "regex" and rule.pattern:
            try:
                return bool(re.search(rule.pattern, content, re.IGNORECASE))
            except re.error:
                logger.error(f"正则表达式错误: {rule.pattern}")
                return False
        return False

监控器基础

创建 core/monitor.py

from typing import Dict, Optional
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from loguru import logger
from config import settings
class Monitor:
    """Prometheus metrics facade for the governance platform.

    All metrics are registered in prometheus_client's default registry at
    construction time, so a process should create at most one Monitor (the
    module-level ``monitor`` singleton below); constructing a second one
    re-registers the same metric names and raises a duplicate-timeseries
    error.
    """
    def __init__(self):
        # Counter of LLM requests, partitioned by model and final status.
        self.request_total = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )
        # Counter of audit decisions, partitioned by auditor name and level.
        self.audit_total = Counter(
            'llm_audits_total',
            'Total audits',
            ['auditor', 'level']
        )
        # Counter of blocked requests, partitioned by block reason.
        self.block_total = Counter(
            'llm_blocks_total',
            'Total blocked requests',
            ['reason']
        )
        # Request latency histogram (seconds), per model.
        self.request_latency = Histogram(
            'llm_request_latency_seconds',
            'Request latency',
            ['model']
        )
        # Token usage counter; the ``type`` label is "input" or "output".
        self.token_usage = Counter(
            'llm_tokens_total',
            'Total tokens used',
            ['model', 'type']  # input, output
        )
        # Gauge of requests currently in flight.
        self.active_requests = Gauge(
            'llm_active_requests',
            'Current active requests'
        )
    def record_request(self, model: str, status: str):
        """Count one completed request for ``model`` with the given status."""
        self.request_total.labels(model=model, status=status).inc()
    def record_audit(self, auditor: str, level: str):
        """Count one audit decision emitted by ``auditor``."""
        self.audit_total.labels(auditor=auditor, level=level).inc()
    def record_block(self, reason: str):
        """Count one blocked request."""
        self.block_total.labels(reason=reason).inc()
    def record_latency(self, model: str, latency: float):
        """Record one request latency observation, in seconds."""
        self.request_latency.labels(model=model).observe(latency)
    def record_tokens(self, model: str, token_type: str, count: int):
        """Add ``count`` tokens of ``token_type`` ("input"/"output") for ``model``."""
        self.token_usage.labels(model=model, type=token_type).inc(count)
    def increment_active(self):
        """Mark one more request as in flight."""
        self.active_requests.inc()
    def decrement_active(self):
        """Mark one in-flight request as finished."""
        self.active_requests.dec()
    def start_server(self, port: Optional[int] = None):
        """Expose metrics over HTTP (default port: ``settings.prometheus_port``)."""
        port = port or settings.prometheus_port
        try:
            start_http_server(port)
            logger.success(f"✅ Prometheus监控服务启动在端口 {port}")
        except OSError as e:
            # Typically "address already in use" — log and keep running.
            logger.error(f"❌ 无法启动Prometheus服务: {e}")
            logger.info(f"   端口 {port} 可能已被占用")
# Global monitor instance — import this; do not instantiate Monitor again.
monitor = Monitor()

创建测试

创建 tests/test_framework.py

import pytest
from core.models import AuditRequest, AuditResult, AuditLevel, RiskLevel
from core.base_auditor import BaseAuditor
from core.rule_engine import RuleEngine
from core.monitor import Monitor
class MockAuditor(BaseAuditor):
    """Test double: blocks content containing "暴力", passes everything else."""

    async def audit(self, request: AuditRequest) -> AuditResult:
        lowered = request.content.lower()
        if "暴力" not in lowered:
            return self._create_result(
                request=request,
                level=AuditLevel.PASS,
                risk_level=RiskLevel.LOW,
                is_blocked=False,
                reasons=["通过"],
            )
        return self._create_result(
            request=request,
            level=AuditLevel.BLOCK,
            risk_level=RiskLevel.HIGH,
            is_blocked=True,
            reasons=["包含暴力内容"],
        )
@pytest.mark.asyncio
async def test_auditor():
    """A benign message passes the mock auditor unblocked."""
    result = await MockAuditor("test").audit(
        AuditRequest(request_id="test-001", content="这是一条正常内容")
    )
    assert result.level == AuditLevel.PASS
    assert not result.is_blocked
@pytest.mark.asyncio
async def test_auditor_block():
    """A message containing the trigger keyword is blocked."""
    result = await MockAuditor("test").audit(
        AuditRequest(request_id="test-002", content="包含暴力内容")
    )
    assert result.level == AuditLevel.BLOCK
    assert result.is_blocked
def test_rule_engine():
    """The engine exposes at least one enabled rule after loading."""
    engine = RuleEngine()
    enabled = engine.get_enabled_rules()
    # Non-empty list is truthy — equivalent to len(enabled) > 0.
    assert enabled
def test_monitor():
    """Smoke-test the metric recording API.

    Fix: instantiating ``Monitor()`` here would re-register the same metric
    names in prometheus_client's default CollectorRegistry — importing
    ``core.monitor`` already built the global ``monitor`` — and raise
    ``ValueError: Duplicated timeseries in CollectorRegistry``.  Reuse the
    module-level singleton instead.
    """
    from core.monitor import monitor
    monitor.record_request("gpt-4", "success")
    monitor.record_audit("keyword", "pass")
    monitor.record_block("violence")
    monitor.record_tokens("gpt-4", "input", 100)
    # The gauge starts at 0 and nothing in this test touches it.
    # (._value is a private prometheus_client attribute — fragile but common.)
    assert monitor.active_requests._value.get() == 0

学习要点

- ✅ 设计了完整的数据模型体系
- ✅ 实现了基础审核器接口
- ✅ 实现了基础安全扫描器接口
- ✅ 实现了规则引擎核心功能
- ✅ 集成了Prometheus监控


下一步: 创建第一个治理示例 Hello Governance 👋