平台架构设计
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read · 120 words

平台架构设计

整合所有模块，构建完整的 LLM 治理平台。

平台整体架构

```mermaid
graph TB
    subgraph "客户端层"
        U1[Web界面]
        U2[API客户端]
        U3[管理后台]
    end
    subgraph "API网关层"
        A1[API Gateway]
        A2[认证中间件]
        A3[限流中间件]
    end
    subgraph "治理层"
        G1[内容审核器]
        G2[安全扫描器]
        G3[规则引擎]
    end
    subgraph "服务层"
        S1[LLM服务]
        S2[审计服务]
        S3[告警服务]
        S4[配置服务]
    end
    subgraph "数据层"
        D1[向量数据库]
        D2[时序数据库]
        D3[审计日志]
        D4[规则存储]
    end
    subgraph "监控层"
        M1[指标收集]
        M2[日志聚合]
        M3[告警管理]
    end
    U1 --> A1
    U2 --> A1
    U3 --> A1
    A1 --> A2
    A2 --> A3
    A3 --> G1
    A3 --> G2
    A3 --> G3
    G1 --> S1
    G2 --> S1
    G3 --> S2
    S1 --> D1
    S2 --> D3
    S3 --> D2
    S4 --> D4
    G1 --> M1
    G2 --> M1
    S1 --> M1
    M1 --> M2
    M1 --> M3
    style U1 fill:#e1f5ff
    style S1 fill:#fff3cd
    style M3 fill:#d4edda
```

请求处理流程

```mermaid
sequenceDiagram
    participant C as 客户端
    participant GW as API网关
    participant G as 治理服务
    participant CA as 内容审核
    participant SS as 安全扫描
    participant RE as 规则引擎
    participant LLM as LLM服务
    participant M as 监控
    C->>GW: POST /api/chat
    GW->>GW: 认证 & 限流
    GW->>G: 处理请求
    G->>CA: 审核输入
    CA-->>G: 审核结果
    alt 输入违规
        G->>M: 记录阻断
        G-->>GW: 返回阻断
    end
    G->>SS: 安全扫描
    SS-->>G: 扫描结果
    alt 安全风险
        G->>M: 记录风险
        G-->>GW: 返回风险
    end
    G->>RE: 规则评估
    RE-->>G: 匹配规则
    alt 规则阻断
        G->>M: 记录阻断
        G-->>GW: 返回阻断
    end
    G->>LLM: 生成内容
    LLM-->>G: 返回内容
    G->>CA: 审核输出
    CA-->>G: 审核结果
    alt 输出违规
        G->>M: 记录违规
        G-->>GW: 返回违规
    end
    G->>M: 记录成功
    G-->>GW: 返回结果
    GW-->>C: JSON响应
```

核心服务整合

创建 services/platform_service.py

import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime
from core.keyword_auditor import KeywordAuditor
from core.moderation_auditor import ModerationAuditor
from core.simple_security_scanner import SimpleSecurityScanner
from core.advanced_rule_engine import AdvancedRuleEngine
from core.simple_llm_service import SimpleLLMService
from core.metrics import MetricsCollector
from core.audit_log import AuditLogger
from core.alert_manager import AlertManager
from core.models import (
AuditRequest, AuditResult,
SecurityScanRequest, SecurityScanResult,
AuditLevel, RiskLevel
)
from loguru import logger
import time
class PlatformService:
    """LLM governance platform service.

    Wires the input auditors, security scanner, rule engine, LLM service,
    output auditor, metrics collector and audit logger into a single request
    pipeline (see ``process_chat``).
    """

    def __init__(self):
        # Content auditing: keyword auditor and moderation auditor.
        self.keyword_auditor = KeywordAuditor()
        self.moderation_auditor = ModerationAuditor()
        # Prompt security scanning.
        self.security_scanner = SimpleSecurityScanner()
        # Custom rule evaluation.
        self.rule_engine = AdvancedRuleEngine()
        # Text generation backend.
        self.llm_service = SimpleLLMService()
        # Metrics and audit trail.
        self.metrics = MetricsCollector()
        self.audit_logger = AuditLogger()
        # NOTE(review): alert_manager is constructed but not used anywhere in this class.
        self.alert_manager = AlertManager()

    async def start(self):
        """Start the service (starts the audit logger)."""
        await self.audit_logger.start()
        logger.info("✅ 平台服务启动")

    async def stop(self):
        """Stop the service (stops the audit logger)."""
        await self.audit_logger.stop()
        logger.info("✅ 平台服务停止")

    async def process_chat(
        self,
        prompt: str,
        user_id: Optional[str] = None,
        model: str = "gpt-4o-mini",
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run one chat request through the full governance pipeline.

        Steps: input audit (keyword, then moderation) -> security scan ->
        rule engine -> LLM generation -> output audit (keyword auditor).
        The first step that rejects the request short-circuits the rest.

        Args:
            prompt: User input.
            user_id: Optional caller id, written to the audit log.
            model: Model name passed to the LLM service.
            context: Optional extra context, forwarded to the input auditors
                (as ``metadata``), the security scanner and the rule engine.

        Returns:
            A dict that always has ``request_id`` and ``success``. On success
            it also has ``response``, ``model``, ``tokens_used``,
            ``latency_ms`` and a per-stage ``breakdown``. When blocked it has
            ``blocked``/``block_type``/``reason``/``details``. On failure it
            has ``error``. Exceptions are caught and returned as ``error``.
        """
        request_id = str(uuid.uuid4())
        # perf_counter is monotonic, so durations are immune to wall-clock jumps.
        start = time.perf_counter()
        self.metrics.increment_active()
        try:
            result = await self._run_pipeline(request_id, start, prompt, user_id, model, context)
        except Exception as e:
            # logger.exception keeps the traceback; logger.error would drop it.
            logger.exception(f"[{request_id}] 处理异常: {e}")
            try:
                await self._handle_error(request_id, str(e))
            except Exception:
                # Best effort: a failing audit logger must not replace the error response.
                logger.exception(f"[{request_id}] 审计日志写入失败")
            result = {
                "request_id": request_id,
                "success": False,
                "error": str(e)
            }
        finally:
            self.metrics.decrement_active()
        # Record every outcome (not only successes) so block and error rates are visible.
        # NOTE(review): assumes record_request accepts "blocked"/"error" status labels.
        self.metrics.record_request("/chat", self._outcome(result), time.perf_counter() - start)
        return result

    async def _run_pipeline(
        self,
        request_id: str,
        start: float,
        prompt: str,
        user_id: Optional[str],
        model: str,
        context: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Execute the pipeline steps; exceptions propagate to ``process_chat``."""
        # Only the first 500 characters of the prompt are stored in the request log.
        await self.audit_logger.log_request(request_id, {
            'user_id': user_id,
            'model': model,
            'prompt': prompt[:500],
            'timestamp': datetime.now().isoformat()
        })

        # === Step 1: input content audit ===
        logger.info(f"[{request_id}] 开始内容审核")
        audit_start = time.perf_counter()
        audit_request = AuditRequest(
            request_id=request_id,
            content=prompt,
            user_id=user_id,
            metadata=context or {}
        )
        keyword_result = await self.keyword_auditor.audit(audit_request)
        self.metrics.record_audit("keyword", keyword_result.level.value)
        if keyword_result.is_blocked:
            return await self._reject(request_id, "input_audit", keyword_result.reasons)

        moderation_result = await self.moderation_auditor.audit(audit_request)
        self.metrics.record_audit("moderation", moderation_result.level.value)
        if moderation_result.is_blocked:
            return await self._reject(request_id, "moderation", moderation_result.reasons)
        audit_ms = self._elapsed_ms(audit_start)
        logger.info(f"[{request_id}] 内容审核完成: {audit_ms:.0f}ms")

        # === Step 2: security scan ===
        logger.info(f"[{request_id}] 开始安全扫描")
        security_start = time.perf_counter()
        security_result = await self.security_scanner.scan(SecurityScanRequest(
            request_id=request_id,
            prompt=prompt,
            context=context
        ))
        if not security_result.is_safe:
            # Drop a missing mitigation so the reason falls back to the default text instead of None.
            mitigation = security_result.mitigation
            response = await self._reject(request_id, "security", [mitigation] if mitigation else [])
            self.metrics.record_injection(security_result.attack_type or "unknown")
            return response
        security_ms = self._elapsed_ms(security_start)
        logger.info(f"[{request_id}] 安全扫描完成: {security_ms:.0f}ms")

        # === Step 3: rule engine ===
        logger.info(f"[{request_id}] 规则引擎评估")
        rule_results = await self.rule_engine.evaluate(prompt, context)
        block_reasons = [
            r.reason for r in rule_results
            if r.matched and self._is_block_action(r.action)
        ]
        if block_reasons:
            return await self._reject(request_id, "rule_engine", block_reasons)

        # === Step 4: LLM call ===
        logger.info(f"[{request_id}] 调用LLM")
        llm_start = time.perf_counter()
        llm_result = await self.llm_service.generate(prompt, model)
        if llm_result.get("error"):
            await self._handle_error(request_id, llm_result["error"])
            return {
                "request_id": request_id,
                "success": False,
                "error": llm_result["error"]
            }
        llm_ms = self._elapsed_ms(llm_start)
        logger.info(f"[{request_id}] LLM生成完成: {llm_ms:.0f}ms")

        # === Step 5: output audit (keyword auditor only) ===
        logger.info(f"[{request_id}] 输出审核")
        content = llm_result["content"]
        output_result = await self.keyword_auditor.audit(AuditRequest(
            request_id=request_id,
            content=content,
            user_id=user_id,
            content_type="output"
        ))
        if output_result.is_blocked:
            return await self._reject(request_id, "output_audit", output_result.reasons)

        # === Success ===
        total_ms = self._elapsed_ms(start)
        tokens_used = llm_result["tokens_used"]
        # Only a total token count is available, so it is split evenly across the two
        # token arguments (presumably prompt/completion counts — an approximation).
        self.metrics.record_llm_request(model, "success", tokens_used // 2, tokens_used // 2)
        # Only the first 500 characters of the response are stored in the response log.
        await self.audit_logger.log_response(request_id, {
            'model': model,
            'content': content[:500],
            'tokens': tokens_used,
            'latency_ms': total_ms,
            'timestamp': datetime.now().isoformat()
        })
        logger.info(f"[{request_id}] 请求处理成功: {total_ms:.0f}ms")
        return {
            "request_id": request_id,
            "success": True,
            "response": content,
            "model": model,
            "tokens_used": tokens_used,
            "latency_ms": total_ms,
            "breakdown": {
                "audit_ms": audit_ms,
                "security_ms": security_ms,
                "llm_ms": llm_ms
            }
        }

    async def _reject(self, request_id: str, block_type: str, reasons: List[str]) -> Dict[str, Any]:
        """Record a block (audit log + metrics) and return the block response."""
        await self._handle_block(request_id, block_type, reasons)
        return self._block_response(request_id, block_type, reasons)

    @staticmethod
    def _elapsed_ms(since: float) -> float:
        """Return milliseconds elapsed since a ``time.perf_counter()`` reading."""
        return (time.perf_counter() - since) * 1000

    @staticmethod
    def _is_block_action(action: Any) -> bool:
        """Return True if a rule action means "block".

        Accepts the plain string ``"block"`` as well as enum members whose
        ``value`` is ``"block"``. The API layer builds rules with
        ``RuleAction(request.action)``, and a member of a plain (non-``str``)
        Enum never compares equal to the bare string.
        """
        return getattr(action, "value", action) == "block"

    @staticmethod
    def _outcome(result: Dict[str, Any]) -> str:
        """Classify a pipeline result as "success", "blocked" or "error" for metrics."""
        if result.get("success"):
            return "success"
        if result.get("blocked"):
            return "blocked"
        return "error"

    async def _handle_block(self, request_id: str, block_type: str, reasons: List[str]):
        """Write a block to the audit log (via log_violation) and record block metrics."""
        await self.audit_logger.log_violation(request_id, {
            'type': block_type,
            'reasons': reasons,
            'timestamp': datetime.now().isoformat()
        })
        self.metrics.record_audit(block_type, "block")
        self.metrics.record_audit_block(block_type, reasons[0] if reasons else "unknown")

    async def _handle_error(self, request_id: str, error: str):
        """Write an error to the audit log (as a violation entry of type 'error')."""
        await self.audit_logger.log_violation(request_id, {
            'type': 'error',
            'error': error,
            'timestamp': datetime.now().isoformat()
        })

    def _block_response(
        self,
        request_id: str,
        block_type: str,
        reasons: List[str]
    ) -> Dict[str, Any]:
        """Build the client-facing response for a blocked request.

        ``reason`` is the first reason, or a generic default when none is given.
        """
        return {
            "request_id": request_id,
            "success": False,
            "blocked": True,
            "block_type": block_type,
            "reason": reasons[0] if reasons else "内容违规",
            "details": {
                "reasons": reasons
            }
        }

    def get_metrics(self) -> Dict[str, Any]:
        """Return the metrics summary together with rule-engine statistics."""
        return {
            "metrics": self.metrics.get_summary(),
            "rules": self.rule_engine.get_stats()
        }

API接口定义

创建 api/platform_routes.py

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
# All platform endpoints are mounted under /api and grouped under the "platform" tag.
router = APIRouter(tags=["platform"], prefix="/api")
class ChatRequest(BaseModel):
    """Request body for POST /api/chat."""
    # User input run through the governance pipeline (and sent to the LLM if it passes).
    prompt: str
    # Optional caller id; forwarded to PlatformService.process_chat for audit logging.
    user_id: Optional[str] = None
    # Model name forwarded to the LLM service.
    model: str = "gpt-4o-mini"
    # Optional extra context forwarded to PlatformService.process_chat.
    context: Optional[Dict[str, Any]] = None
class RuleCreateRequest(BaseModel):
    """Request body for POST /api/rules."""
    # Rule identifier; returned in the creation response.
    rule_id: str
    name: str
    description: Optional[str] = ""
    # Must be a valid RuleType value; converted with RuleType(...) in create_rule.
    type: str
    # Rule pattern; how it is matched depends on the rule type (defined in the rule engine).
    pattern: Optional[str] = None
    # Must be a valid RuleAction value; converted with RuleAction(...) in create_rule.
    action: str = "block"
    # NOTE(review): ordering semantics (higher first?) are defined by the rule engine — confirm.
    priority: int = 50
    enabled: bool = True
# Process-wide PlatformService instance used by the routes; None until registered.
_platform_service = None


def set_platform_service(service) -> None:
    """Register (or clear, by passing None) the PlatformService used by the routes.

    Intended to be called from the application's startup hook after
    ``await service.start()``, and with ``None`` during shutdown.
    """
    global _platform_service
    _platform_service = service


def get_platform_service():
    """Return the registered PlatformService instance, or None if none is registered.

    Routes treat None as "service not started" and respond with HTTP 500.
    """
    return _platform_service
@router.post("/chat")
async def chat(request: ChatRequest):
    """Run a chat request through the governance pipeline.

    Successful and blocked requests return the PlatformService result as-is.
    A blocked request is a normal 200 response carrying ``blocked``/``reason``.

    Raises:
        HTTPException: 500 if the service is not registered or processing failed.
    """
    platform = get_platform_service()
    if not platform:
        raise HTTPException(status_code=500, detail="服务未启动")
    result = await platform.process_chat(
        prompt=request.prompt,
        user_id=request.user_id,
        model=request.model,
        context=request.context
    )
    # Test for the key, not its truthiness: an exception with an empty message
    # yields error == "", which must still be reported as a failure, not a 200.
    if "error" in result:
        raise HTTPException(status_code=500, detail=result["error"])
    return result
@router.get("/metrics")
async def metrics():
    """Return the platform metrics summary and rule-engine statistics.

    Raises:
        HTTPException: 500 if the service is not registered.
    """
    service = get_platform_service()
    if service:
        return service.get_metrics()
    raise HTTPException(status_code=500, detail="服务未启动")
@router.post("/rules")
async def create_rule(request: RuleCreateRequest):
    """Create a rule in the platform's rule engine.

    Raises:
        HTTPException: 500 if the service is not registered. 400 if ``type``
            or ``action`` is not a valid enum value, the rule definition is
            rejected, or the rule engine refuses to add the rule.
    """
    platform = get_platform_service()
    if not platform:
        raise HTTPException(status_code=500, detail="服务未启动")
    from core.advanced_rule_engine import RuleDefinition, RuleType, RuleAction
    try:
        rule = RuleDefinition(
            rule_id=request.rule_id,
            name=request.name,
            description=request.description,
            type=RuleType(request.type),
            pattern=request.pattern,
            action=RuleAction(request.action),
            priority=request.priority,
            enabled=request.enabled
        )
    except ValueError as e:
        # An unknown type/action string raises ValueError from the enum constructor.
        # (A pydantic ValidationError also subclasses ValueError, if RuleDefinition is a model.)
        # That is a client error; without this it surfaced as HTTP 500.
        raise HTTPException(status_code=400, detail=f"规则参数无效: {e}") from e
    success = platform.rule_engine.add_rule(rule)
    if not success:
        raise HTTPException(status_code=400, detail="规则创建失败")
    return {"message": "规则创建成功", "rule_id": rule.rule_id}
@router.get("/rules")
async def list_rules(enabled_only: bool = True):
    """List rules from the rule engine.

    Args:
        enabled_only: Forwarded to ``rule_engine.get_all_rules``; presumably
            restricts the result to enabled rules when True.

    Raises:
        HTTPException: 500 if the service is not registered.
    """
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")
    rule_list = service.rule_engine.get_all_rules(enabled_only=enabled_only)
    serialized = [r.dict() for r in rule_list]
    return {"total": len(rule_list), "rules": serialized}
@router.delete("/rules/{rule_id}")
async def delete_rule(rule_id: str):
    """Delete a rule by id.

    Raises:
        HTTPException: 500 if the service is not registered; 404 if the rule
            engine reports that nothing was removed.
    """
    service = get_platform_service()
    if not service:
        raise HTTPException(status_code=500, detail="服务未启动")
    if service.rule_engine.remove_rule(rule_id):
        return {"message": "规则删除成功"}
    raise HTTPException(status_code=404, detail="规则不存在")

学习要点

- ✅ 设计了完整的平台架构
- ✅ 实现了请求处理全流程
- ✅ 整合了所有治理模块
- ✅ 实现了监控和审计
- ✅ 定义了 RESTful API 接口


下一步: 整合所有模块 🔗