平台架构设计
整合所有模块，构建完整的 LLM 治理平台。
平台整体架构
graph TB
%% Layered architecture of the LLM governance platform: clients, API gateway, governance checks, services, data stores, and monitoring.
subgraph "客户端层"
U1[Web界面]
U2[API客户端]
U3[管理后台]
end
subgraph "API网关层"
A1[API Gateway]
A2[认证中间件]
A3[限流中间件]
end
subgraph "治理层"
G1[内容审核器]
G2[安全扫描器]
G3[规则引擎]
end
subgraph "服务层"
S1[LLM服务]
S2[审计服务]
S3[告警服务]
S4[配置服务]
end
subgraph "数据层"
D1[向量数据库]
D2[时序数据库]
D3[审计日志]
D4[规则存储]
end
subgraph "监控层"
M1[指标收集]
M2[日志聚合]
M3[告警管理]
end
%% Request path: every client enters through the gateway, passes authentication and rate limiting, then fans out to the three governance components.
U1 --> A1
U2 --> A1
U3 --> A1
A1 --> A2
A2 --> A3
A3 --> G1
A3 --> G2
A3 --> G3
G1 --> S1
G2 --> S1
G3 --> S2
%% Service-to-storage edges.
%% NOTE(review): S3 (告警服务) and S4 (配置服务) have no incoming edges in this diagram, and PlatformService creates an AlertManager without ever calling it; confirm the intended alerting/config wiring.
S1 --> D1
S2 --> D3
S3 --> D2
S4 --> D4
%% Monitoring edges: governance components and the LLM service feed metric collection.
G1 --> M1
G2 --> M1
S1 --> M1
M1 --> M2
M1 --> M3
%% Highlight colors for selected nodes.
style U1 fill:#e1f5ff
style S1 fill:#fff3cd
style M3 fill:#d4edda
请求处理流程
sequenceDiagram
    %% Each `break` block ends the flow early, mirroring the early returns in PlatformService.process_chat.
    %% A plain `alt ... end` without `else` would wrongly imply processing continues after a block.
    participant C as 客户端
    participant GW as API网关
    participant G as 治理服务
    participant CA as 内容审核
    participant SS as 安全扫描
    participant RE as 规则引擎
    participant LLM as LLM服务
    participant M as 监控
    C->>GW: POST /chat
    GW->>GW: 认证 & 限流
    GW->>G: 处理请求
    G->>CA: 审核输入
    CA-->>G: 审核结果
    break 输入违规
        G->>M: 记录阻断
        G-->>GW: 返回阻断
        GW-->>C: JSON响应(blocked)
    end
    G->>SS: 安全扫描
    SS-->>G: 扫描结果
    break 安全风险
        G->>M: 记录风险
        G-->>GW: 返回风险
        GW-->>C: JSON响应(blocked)
    end
    G->>RE: 规则评估
    RE-->>G: 匹配规则
    break 规则阻断
        G->>M: 记录阻断
        G-->>GW: 返回阻断
        GW-->>C: JSON响应(blocked)
    end
    G->>LLM: 生成内容
    LLM-->>G: 返回内容
    break LLM调用失败
        G-->>GW: 返回错误
        GW-->>C: HTTP 500
    end
    G->>CA: 审核输出
    CA-->>G: 审核结果
    break 输出违规
        G->>M: 记录违规
        G-->>GW: 返回违规
        GW-->>C: JSON响应(blocked)
    end
    G->>M: 记录成功
    G-->>GW: 返回结果
    GW-->>C: JSON响应
核心服务整合
创建 services/platform_service.py：
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime
from core.keyword_auditor import KeywordAuditor
from core.moderation_auditor import ModerationAuditor
from core.simple_security_scanner import SimpleSecurityScanner
from core.advanced_rule_engine import AdvancedRuleEngine
from core.simple_llm_service import SimpleLLMService
from core.metrics import MetricsCollector
from core.audit_log import AuditLogger
from core.alert_manager import AlertManager
from core.models import (
AuditRequest, AuditResult,
SecurityScanRequest, SecurityScanResult,
AuditLevel, RiskLevel
)
from loguru import logger
import time
class PlatformService:
    """Platform service that wires every governance module into one chat pipeline.

    Lifecycle: ``await start()`` before handling requests and ``await stop()``
    on shutdown; both delegate to the audit logger.
    """

    def __init__(self):
        # Content auditing
        self.keyword_auditor = KeywordAuditor()
        self.moderation_auditor = ModerationAuditor()
        # Prompt security scanning
        self.security_scanner = SimpleSecurityScanner()
        # Rule engine
        self.rule_engine = AdvancedRuleEngine()
        # LLM backend
        self.llm_service = SimpleLLMService()
        # Monitoring and auditing.
        # NOTE(review): alert_manager is created but no method of this class
        # uses it yet; confirm whether alerting should be wired in.
        self.metrics = MetricsCollector()
        self.audit_logger = AuditLogger()
        self.alert_manager = AlertManager()

    async def start(self):
        """Start the service (starts the audit logger)."""
        await self.audit_logger.start()
        logger.info("✅ 平台服务启动")

    async def stop(self):
        """Stop the service (stops the audit logger)."""
        await self.audit_logger.stop()
        logger.info("✅ 平台服务停止")

    @staticmethod
    def _elapsed_ms(start: float) -> float:
        """Return milliseconds elapsed since ``start``, a ``time.perf_counter()`` value.

        ``perf_counter`` is monotonic, so measured latencies are not skewed
        by wall-clock adjustments the way ``time.time()`` differences are.
        """
        return (time.perf_counter() - start) * 1000

    @staticmethod
    def _split_tokens(total: int) -> "tuple[int, int]":
        """Split a total token count into an (input, output) pair.

        The LLM result only exposes ``tokens_used``, so this is an even-split
        approximation. Unlike using ``total // 2`` for both halves, the two
        parts always sum to ``total`` (odd counts no longer lose a token).
        """
        input_tokens = total // 2
        return input_tokens, total - input_tokens

    @staticmethod
    def _is_block_action(action: Any) -> bool:
        """Return True if a rule action means "block".

        Accepts the raw string or an Enum member whose ``value`` is that
        string. A plain (non-``str``) Enum member never compares equal to
        ``"block"`` directly, which would silently disable rule blocking.
        Presumably rule results carry a ``RuleAction`` member; confirm.
        """
        return getattr(action, "value", action) == "block"

    async def process_chat(
        self,
        prompt: str,
        user_id: Optional[str] = None,
        model: str = "gpt-4o-mini",
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Run one chat request through the full governance pipeline.

        Steps: input keyword audit, moderation audit, security scan, rule
        engine, LLM generation, output keyword audit. The first step that
        rejects the request short-circuits with a block response.

        Args:
            prompt: user input
            user_id: caller identifier (optional)
            model: model name passed to the LLM service
            context: extra context forwarded to the auditors, the security
                scanner and the rule engine

        Returns:
            On success, ``success=True`` with the response, token usage and a
            per-stage latency breakdown. When blocked, the dict built by
            ``_block_response``. On LLM failure or any exception,
            ``success=False`` with an ``error`` message.
        """
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Track in-flight requests; balanced by decrement_active() in finally.
        self.metrics.increment_active()
        try:
            # Request audit log; only the first 500 characters of the prompt are stored.
            await self.audit_logger.log_request(request_id, {
                'user_id': user_id,
                'model': model,
                'prompt': prompt[:500],
                'timestamp': datetime.now().isoformat()
            })

            # === Step 1: content audit ===
            logger.info(f"[{request_id}] 开始内容审核")
            audit_start = time.perf_counter()
            audit_request = AuditRequest(
                request_id=request_id,
                content=prompt,
                user_id=user_id,
                metadata=context or {}
            )

            # Keyword audit
            keyword_result = await self.keyword_auditor.audit(audit_request)
            self.metrics.record_audit("keyword", keyword_result.level.value)
            if keyword_result.is_blocked:
                await self._handle_block(request_id, "input_audit", keyword_result.reasons)
                return self._block_response(request_id, "input_audit", keyword_result.reasons)

            # Moderation API audit
            moderation_result = await self.moderation_auditor.audit(audit_request)
            self.metrics.record_audit("moderation", moderation_result.level.value)
            if moderation_result.is_blocked:
                await self._handle_block(request_id, "moderation", moderation_result.reasons)
                return self._block_response(request_id, "moderation", moderation_result.reasons)

            audit_duration = self._elapsed_ms(audit_start)
            logger.info(f"[{request_id}] 内容审核完成: {audit_duration:.0f}ms")

            # === Step 2: security scan ===
            logger.info(f"[{request_id}] 开始安全扫描")
            security_start = time.perf_counter()
            security_request = SecurityScanRequest(
                request_id=request_id,
                prompt=prompt,
                context=context
            )
            security_result = await self.security_scanner.scan(security_request)
            if not security_result.is_safe:
                await self._handle_block(request_id, "security", [security_result.mitigation])
                self.metrics.record_injection(security_result.attack_type or "unknown")
                return self._block_response(request_id, "security", [security_result.mitigation])

            security_duration = self._elapsed_ms(security_start)
            logger.info(f"[{request_id}] 安全扫描完成: {security_duration:.0f}ms")

            # === Step 3: rule engine ===
            logger.info(f"[{request_id}] 规则引擎评估")
            rule_results = await self.rule_engine.evaluate(prompt, context)
            blocked_rules = [
                r for r in rule_results
                if r.matched and self._is_block_action(r.action)
            ]
            if blocked_rules:
                reasons = [r.reason for r in blocked_rules]
                await self._handle_block(request_id, "rule_engine", reasons)
                return self._block_response(request_id, "rule_engine", reasons)

            # === Step 4: call the LLM ===
            logger.info(f"[{request_id}] 调用LLM")
            llm_start = time.perf_counter()
            llm_result = await self.llm_service.generate(prompt, model)
            if llm_result.get("error"):
                await self._handle_error(request_id, llm_result["error"])
                return {
                    "request_id": request_id,
                    "success": False,
                    "error": llm_result["error"]
                }

            llm_duration = self._elapsed_ms(llm_start)
            logger.info(f"[{request_id}] LLM生成完成: {llm_duration:.0f}ms")

            # === Step 5: output audit ===
            # Carries the same user/metadata as the input request so output
            # violations can be attributed in the audit trail.
            logger.info(f"[{request_id}] 输出审核")
            output_request = AuditRequest(
                request_id=request_id,
                content=llm_result["content"],
                user_id=user_id,
                metadata=context or {},
                content_type="output"
            )
            output_result = await self.keyword_auditor.audit(output_request)
            if output_result.is_blocked:
                await self._handle_block(request_id, "output_audit", output_result.reasons)
                return self._block_response(request_id, "output_audit", output_result.reasons)

            # === Success ===
            total_duration = self._elapsed_ms(start_time)
            tokens_used = llm_result["tokens_used"]
            # NOTE(review): assumes record_llm_request(model, status, input_tokens,
            # output_tokens); only the total is available, so it is split evenly.
            input_tokens, output_tokens = self._split_tokens(tokens_used)

            # Metrics (request duration is recorded in seconds)
            self.metrics.record_request("/chat", "success", total_duration / 1000)
            self.metrics.record_llm_request(model, "success", input_tokens, output_tokens)

            # Response audit log; content truncated to its first 500 characters.
            await self.audit_logger.log_response(request_id, {
                'model': model,
                'content': llm_result["content"][:500],
                'tokens': tokens_used,
                'latency_ms': total_duration,
                'timestamp': datetime.now().isoformat()
            })

            logger.info(f"[{request_id}] 请求处理成功: {total_duration:.0f}ms")

            return {
                "request_id": request_id,
                "success": True,
                "response": llm_result["content"],
                "model": model,
                "tokens_used": tokens_used,
                "latency_ms": total_duration,
                "breakdown": {
                    "audit_ms": audit_duration,
                    "security_ms": security_duration,
                    "llm_ms": llm_duration
                }
            }

        except Exception as e:
            # Top-level boundary: log with traceback, then return an error payload.
            logger.exception(f"[{request_id}] 处理异常: {e}")
            try:
                await self._handle_error(request_id, str(e))
            except Exception:
                # A failing audit sink must not mask the original error response.
                logger.exception(f"[{request_id}] 错误审计记录失败")
            return {
                "request_id": request_id,
                "success": False,
                "error": str(e)
            }
        finally:
            self.metrics.decrement_active()

    async def _handle_block(self, request_id: str, block_type: str, reasons: List[str]):
        """Record a blocked request as a violation in the audit log and in metrics."""
        await self.audit_logger.log_violation(request_id, {
            'type': block_type,
            'reasons': reasons,
            'timestamp': datetime.now().isoformat()
        })
        self.metrics.record_audit(block_type, "block")
        # The first reason is used as the metric label; fall back when none were given.
        self.metrics.record_audit_block(block_type, reasons[0] if reasons else "unknown")

    async def _handle_error(self, request_id: str, error: str):
        """Write a processing error to the audit log (as a violation of type 'error')."""
        await self.audit_logger.log_violation(request_id, {
            'type': 'error',
            'error': error,
            'timestamp': datetime.now().isoformat()
        })

    def _block_response(
        self,
        request_id: str,
        block_type: str,
        reasons: List[str]
    ) -> Dict[str, Any]:
        """Build the response dict for a blocked request.

        ``reason`` is the first entry of ``reasons`` (or a generic message
        when empty); the full list is kept under ``details``.
        """
        return {
            "request_id": request_id,
            "success": False,
            "blocked": True,
            "block_type": block_type,
            "reason": reasons[0] if reasons else "内容违规",
            "details": {
                "reasons": reasons
            }
        }

    def get_metrics(self) -> Dict[str, Any]:
        """Return the metrics summary together with rule-engine statistics."""
        return {
            "metrics": self.metrics.get_summary(),
            "rules": self.rule_engine.get_stats()
        }
API接口定义
创建 api/platform_routes.py：
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
# Every route below is registered with the "/api" prefix and grouped under
# the "platform" tag in the generated OpenAPI docs.
router = APIRouter(prefix="/api", tags=["platform"])
class ChatRequest(BaseModel):
    """Request body for POST /api/chat."""
    # User input, forwarded to PlatformService.process_chat as `prompt`.
    prompt: str
    # Optional caller identifier; process_chat writes it to the request audit log.
    user_id: Optional[str] = None
    # Model name; same default as process_chat's `model` parameter.
    model: str = "gpt-4o-mini"
    # Optional extra context, forwarded unchanged to process_chat.
    context: Optional[Dict[str, Any]] = None
class RuleCreateRequest(BaseModel):
    """Request body for POST /api/rules."""
    # Identifier passed to RuleDefinition and echoed back in the success response.
    rule_id: str
    # Human-readable rule name.
    name: str
    # Optional description; note that an explicit JSON null yields None, not "".
    description: Optional[str] = ""
    # Converted with RuleType(...) in create_rule, so it must be a valid RuleType value.
    type: str
    # Optional pattern; its interpretation depends on the rule type (not visible here).
    pattern: Optional[str] = None
    # Converted with RuleAction(...) in create_rule; defaults to "block".
    action: str = "block"
    # Priority forwarded to RuleDefinition; ordering semantics live in the rule engine (TODO confirm).
    priority: int = 50
    # Whether the rule is created in the enabled state.
    enabled: bool = True
# Process-wide PlatformService instance, registered at application startup.
# It stays None until set_platform_service() is called.
_platform_service: Optional[Any] = None


def set_platform_service(service: Optional[Any]) -> None:
    """Register (or clear, by passing None) the PlatformService used by the routes.

    Register the instance only after ``await service.start()``, for example
    from the application's startup/lifespan hook::

        service = PlatformService()
        await service.start()
        set_platform_service(service)
    """
    global _platform_service
    _platform_service = service


def get_platform_service():
    """Return the registered PlatformService instance, or None if none is registered.

    Every route treats None as "服务未启动" and responds with HTTP 500.
    """
    return _platform_service
@router.post("/chat")
async def chat(request: ChatRequest):
    """Run a chat prompt through the governance pipeline.

    Blocked requests are returned as a normal JSON body. Only results that
    carry a truthy ``error`` are turned into an HTTP 500.
    """
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")

    outcome = await service.process_chat(
        prompt=request.prompt,
        user_id=request.user_id,
        model=request.model,
        context=request.context,
    )

    failure = outcome.get("error")
    if failure:
        raise HTTPException(status_code=500, detail=failure)
    return outcome
@router.get("/metrics")
async def metrics():
    """Return the platform metrics summary together with rule-engine statistics."""
    service = get_platform_service()
    if service:
        return service.get_metrics()
    raise HTTPException(status_code=500, detail="服务未启动")
@router.post("/rules")
async def create_rule(request: RuleCreateRequest):
    """Build a rule from the request body and register it with the rule engine.

    Raises:
        HTTPException: 500 if the platform service is not available; 400 if
            ``type``/``action`` are not valid enum values, the rule cannot be
            constructed, or the rule engine refuses to add it.
    """
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")

    from core.advanced_rule_engine import RuleDefinition, RuleType, RuleAction

    try:
        rule = RuleDefinition(
            rule_id=request.rule_id,
            name=request.name,
            description=request.description,
            type=RuleType(request.type),
            pattern=request.pattern,
            action=RuleAction(request.action),
            priority=request.priority,
            enabled=request.enabled
        )
    except ValueError as exc:
        # Enum constructors raise ValueError for unknown values; that is a client
        # error, not a server fault. (If RuleDefinition is a pydantic model, its
        # ValidationError also subclasses ValueError; confirm the model type.)
        raise HTTPException(status_code=400, detail=f"规则参数无效: {exc}") from exc

    success = service.rule_engine.add_rule(rule)
    if not success:
        raise HTTPException(status_code=400, detail="规则创建失败")

    return {"message": "规则创建成功", "rule_id": rule.rule_id}
@router.get("/rules")
async def list_rules(enabled_only: bool = True):
    """List rules from the rule engine, optionally only the enabled ones."""
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")

    found = service.rule_engine.get_all_rules(enabled_only=enabled_only)
    count = len(found)
    serialized = [item.dict() for item in found]
    return {"total": count, "rules": serialized}
@router.delete("/rules/{rule_id}")
async def delete_rule(rule_id: str):
    """Remove a rule by id; responds 404 when the rule engine has no such rule."""
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")

    if service.rule_engine.remove_rule(rule_id):
        return {"message": "规则删除成功"}
    raise HTTPException(status_code=404, detail="规则不存在")
学习要点
✅ 设计了完整的平台架构
✅ 实现了请求处理全流程
✅ 整合了所有治理模块
✅ 实现了监控和审计
✅ 定义了 RESTful API 接口
下一步：整合所有模块 🔗