治理 API 与系统集成
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read267 words

治理 API 与系统集成

治理平台需要通过标准化 API 与现有的 LLM 应用、监控系统和工单系统集成。

集成架构

graph TB A[LLM 应用] -->|HTTP API| B[治理网关] B --> C[审核服务] B --> D[安全服务] B --> E[规则引擎] C --> F[结果聚合] D --> F E --> F F --> G[响应返回] F --> H[日志服务] F --> I[告警服务] H --> J[Elasticsearch] I --> K[PagerDuty / 飞书] style B fill:#e3f2fd,stroke:#1565c0,stroke-width:2px style F fill:#fff9c4,stroke:#f9a825,stroke-width:2px style G fill:#c8e6c9,stroke:#43a047,stroke-width:2px

治理网关 API

from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import time
class Decision(Enum):
ALLOW = "allow"
BLOCK = "block"
MODIFY = "modify"
REVIEW = "review"
@dataclass
class GovernanceRequest:
"""治理检查请求"""
request_id: str
user_id: str
model_id: str
input_text: str
context: dict = field(default_factory=dict)
# 可选:预生成的输出(用于输出审核)
output_text: str | None = None
@dataclass
class GovernanceResponse:
"""治理检查响应"""
request_id: str
decision: Decision
modified_input: str | None = None
modified_output: str | None = None
reasons: list[str] = field(default_factory=list)
latency_ms: float = 0.0
metadata: dict = field(default_factory=dict)
class GovernanceGateway:
"""治理网关——统一的治理检查入口"""
def __init__(self):
self.pre_checks: list = []   # 输入检查器
self.post_checks: list = []  # 输出检查器
self.modifiers: list = []    # 内容修改器
def check_input(self, request: GovernanceRequest) -> GovernanceResponse:
"""输入侧治理检查"""
start = time.time()
reasons = []
decision = Decision.ALLOW
modified = request.input_text
for checker in self.pre_checks:
result = checker(request.input_text, request.context)
if result["blocked"]:
decision = Decision.BLOCK
reasons.append(result["reason"])
break
if result.get("modified"):
decision = Decision.MODIFY
modified = result["modified"]
reasons.append(f"输入已修改: {result['reason']}")
return GovernanceResponse(
request_id=request.request_id,
decision=decision,
modified_input=modified if decision == Decision.MODIFY else None,
reasons=reasons,
latency_ms=(time.time() - start) * 1000,
)
def check_output(self, request: GovernanceRequest) -> GovernanceResponse:
"""输出侧治理检查"""
start = time.time()
reasons = []
decision = Decision.ALLOW
modified = request.output_text or ""
for checker in self.post_checks:
result = checker(modified, request.context)
if result["blocked"]:
decision = Decision.BLOCK
reasons.append(result["reason"])
break
if result.get("modified"):
decision = Decision.MODIFY
modified = result["modified"]
reasons.append(result["reason"])
return GovernanceResponse(
request_id=request.request_id,
decision=decision,
modified_output=modified if decision == Decision.MODIFY else None,
reasons=reasons,
latency_ms=(time.time() - start) * 1000,
)
# 快捷集成函数
def governance_middleware(gateway: GovernanceGateway):
"""创建治理中间件——可嵌入任何LLM调用链"""
def middleware(func):
def wrapper(user_id: str, prompt: str, **kwargs):
# 输入检查
req = GovernanceRequest(
request_id=f"req_{int(time.time()*1000)}",
user_id=user_id,
model_id=kwargs.get("model", "default"),
input_text=prompt,
)
input_result = gateway.check_input(req)
if input_result.decision == Decision.BLOCK:
return {"error": "请求被治理系统拒绝",
"reasons": input_result.reasons}
actual_input = input_result.modified_input or prompt
# 调用原始函数
output = func(user_id, actual_input, **kwargs)
# 输出检查
req.output_text = output.get("text", "")
output_result = gateway.check_output(req)
if output_result.decision == Decision.BLOCK:
return {"error": "输出被治理系统过滤",
"reasons": output_result.reasons}
if output_result.modified_output:
output["text"] = output_result.modified_output
return output
return wrapper
return middleware

SDK 集成模式

@dataclass
class GovernanceSDKConfig:
"""SDK 配置"""
gateway_url: str = "http://governance:8080"
timeout_ms: int = 200
fallback_on_error: str = "allow"  # allow | block
async_logging: bool = True
class GovernanceSDK:
"""轻量级治理 SDK——供 LLM 应用集成"""
def __init__(self, config: GovernanceSDKConfig):
self.config = config
self._cache: dict[str, tuple[Decision, float]] = {}
self.cache_ttl = 60.0  # 缓存60秒
def check(self, user_id: str, text: str,
direction: str = "input") -> Decision:
"""快速检查(带缓存)"""
cache_key = f"{direction}:{hash(text)}"
if cache_key in self._cache:
decision, ts = self._cache[cache_key]
if time.time() - ts < self.cache_ttl:
return decision
# 模拟 API 调用(实际应为 HTTP 请求)
decision = self._call_gateway(user_id, text, direction)
self._cache[cache_key] = (decision, time.time())
return decision
def _call_gateway(self, user_id: str, text: str,
direction: str) -> Decision:
"""调用治理网关"""
# 实际实现中这里是 HTTP 调用
# 超时回退策略
return Decision.ALLOW

集成模式对比

模式 延迟影响 集成复杂度 可靠性 适用场景
同步网关 高(+50-200ms) 强一致 高风险场景
异步旁路 最终一致 日志/监控
SDK 嵌入 低(+5-20ms) 本地缓存 高性能场景
中间件 中(+20-50ms) 依赖链路 标准 Web 应用
Sidecar 代理 低(+10-30ms) 独立进程 微服务/K8s

本章小结

下一章:多租户治理