通信协议与状态共享
多 Agent 系统的核心难题不是"每个 Agent 能做什么",而是"Agent 之间如何高效、可靠地协作"。通信协议和状态共享机制决定了系统的上限。
通信模式总览
graph TB
A[Agent 间通信] --> B[直接调用]
A --> C[消息队列]
A --> D[共享状态]
A --> E[事件驱动]
B --> B1[同步,耦合高]
B --> B2[适合主从架构]
C --> C1[异步,解耦]
C --> C2[适合高吞吐]
D --> D1[共享内存/DB]
D --> D2[需要锁机制]
E --> E1[发布/订阅]
E --> E2[适合复杂协作]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style B fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style D fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style E fill:#fce4ec,stroke:#c62828,stroke-width:2px
消息标准格式
统一消息格式是多 Agent 协作的第一步,避免每个 Agent 各说各话。
"""
Agent 通信标准消息格式
"""
from dataclasses import dataclass, field
from enum import Enum
import uuid
import time
class MessageType(Enum):
    """Kinds of messages agents exchange; each member's value is a short string."""

    # Work assigned to another agent.
    TASK = "task"
    # Result returned for earlier work.
    RESULT = "result"
    # Status update.
    STATUS = "status"
    # Error notification.
    ERROR = "error"
    # Heartbeat.
    HEARTBEAT = "heartbeat"
class Priority(Enum):
    """Message priority levels; a larger value means a more urgent message.

    Members can be ordered with ``<``, ``<=``, ``>`` and ``>=`` (compared by
    their integer value), so messages can be sorted or queued by priority.
    Ordering against anything that is not a ``Priority`` raises ``TypeError``.
    Equality, hashing, ``str()`` and ``.name`` / ``.value`` are the plain
    ``Enum`` defaults.
    """

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

    # Rich comparisons are defined explicitly (rather than switching to
    # IntEnum) so that members still do not compare equal to bare integers
    # and their string formatting stays the same.
    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented

    def __le__(self, other):
        if self.__class__ is other.__class__:
            return self.value <= other.value
        return NotImplemented

    def __gt__(self, other):
        if self.__class__ is other.__class__:
            return self.value > other.value
        return NotImplemented

    def __ge__(self, other):
        if self.__class__ is other.__class__:
            return self.value >= other.value
        return NotImplemented
@dataclass
class AgentMessage:
    """Standard envelope for a message sent from one agent to another.

    ``sender_id``, ``receiver_id``, ``msg_type`` and ``payload`` are required;
    the remaining fields have defaults.
    """

    sender_id: str
    receiver_id: str
    msg_type: MessageType
    payload: dict
    # Short random identifier: the first 8 hex digits of a fresh UUID4.
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    # Creation time as returned by time.time().
    timestamp: float = field(default_factory=time.time)
    priority: Priority = Priority.NORMAL
    # ID of the related message (used to trace request/response pairs).
    correlation_id: str = ""
    # How long the message stays valid, in seconds.
    ttl_seconds: int = 300

    def is_expired(self) -> bool:
        """Return True once the message is older than ``ttl_seconds``."""
        age = time.time() - self.timestamp
        return age > self.ttl_seconds

    def reply(self, payload: dict, msg_type: MessageType = MessageType.RESULT) -> "AgentMessage":
        """Build a reply to this message.

        Sender and receiver are swapped and ``correlation_id`` is set to this
        message's ``message_id``. Every other field (ID, timestamp, priority,
        TTL) takes its default value.
        """
        return AgentMessage(
            payload=payload,
            msg_type=msg_type,
            correlation_id=self.message_id,
            receiver_id=self.sender_id,
            sender_id=self.receiver_id,
        )
# Example: the orchestrator sends a high-priority research task to the researcher.
task_msg = AgentMessage(
    msg_type=MessageType.TASK,
    priority=Priority.HIGH,
    sender_id="orchestrator",
    receiver_id="researcher",
    payload=dict(
        action="research",
        topic="向量数据库 2025 最新进展",
        max_sources=5,
    ),
)

# Print the generated ID, the routing, and the type/priority of the message.
print(
    f"消息 ID: {task_msg.message_id}",
    f"发送方: {task_msg.sender_id} → 接收方: {task_msg.receiver_id}",
    f"类型: {task_msg.msg_type.value}, 优先级: {task_msg.priority.name}",
    sep="\n",
)
共享状态管理
"""
多 Agent 共享状态管理器
"""
import threading
from typing import Any
from dataclasses import dataclass, field
@dataclass
class SharedState:
    """Lock-protected key/value store shared by several agents.

    Every read and write goes through a re-entrant lock. Callbacks registered
    with ``watch`` are invoked synchronously from ``set`` while that lock is
    held, so they should be short; because the lock is re-entrant, a callback
    may itself call back into this object from the same thread.
    """

    _data: dict = field(default_factory=dict)
    _lock: threading.RLock = field(default_factory=threading.RLock)
    _watchers: dict = field(default_factory=dict)  # key -> [callback]

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value stored under ``key``, or ``default`` if absent."""
        with self._lock:
            return self._data.get(key, default)

    def set(self, key: str, value: Any, agent_id: str = "") -> None:
        """Store ``value`` under ``key`` and notify that key's watchers.

        Watchers are called as ``callback(key, old_value, value, agent_id)``
        only when the new value differs from the old one. A missing key reads
        as ``None``, so the first write of ``None`` to a new key does not
        notify. An exception raised by a watcher is printed and does not stop
        the remaining watchers.
        """
        with self._lock:
            old_value = self._data.get(key)
            self._data[key] = value
            if key in self._watchers and old_value != value:
                # Iterate over a copy: a callback may register another watcher
                # for this key, and growing the list mid-iteration would make
                # the loop visit the newly added entries as well.
                for callback in tuple(self._watchers[key]):
                    try:
                        callback(key, old_value, value, agent_id)
                    except Exception as e:
                        print(f"Watcher 出错: {e}")

    def watch(self, key: str, callback) -> None:
        """Register ``callback`` to be called when the value of ``key`` changes."""
        with self._lock:
            self._watchers.setdefault(key, []).append(callback)

    def snapshot(self) -> dict:
        """Return a shallow copy of the current state."""
        with self._lock:
            return dict(self._data)

    def update_batch(self, updates: dict, agent_id: str = "") -> None:
        """Apply every ``key -> value`` pair in ``updates`` as one locked unit.

        The lock is held for the whole batch so that another thread cannot
        observe a partially applied update. ``set`` re-acquires the same
        re-entrant lock, which is safe from the owning thread.
        """
        with self._lock:
            for key, value in updates.items():
                self.set(key, value, agent_id)
# Usage example: several agents coordinate through one shared task state.
state = SharedState()

# Seed the task state before any agent starts working.
state.update_batch(
    dict(
        task_status="pending",
        research_result=None,
        writer_result=None,
        review_passed=False,
    )
)


def on_research_done(key, old, new, agent_id):
    """Watcher for the research result: announce that writing can begin."""
    if new is None:
        return
    print("[Writer] 检测到 research 完成,开始写作...")


# An agent subscribes to changes of the research result.
state.watch("research_result", on_research_done)

# The researcher agent publishes its result, then advances the task status.
state.set(key="research_result", value="向量数据库调研完成:...", agent_id="researcher")
state.set(key="task_status", value="writing", agent_id="researcher")
print(f"当前状态: {state.snapshot()}")
发布/订阅事件总线
graph LR
R[Researcher] -->|publish: research.completed| BUS[事件总线]
BUS -->|subscribe| W[Writer]
BUS -->|subscribe| M[Monitor]
W -->|publish: write.completed| BUS
BUS -->|subscribe| REV[Reviewer]
style BUS fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
"""
Agent 事件总线(发布/订阅模式)
"""
from collections import defaultdict
from typing import Callable
class EventBus:
    """In-process publish/subscribe bus for agents.

    Handlers are plain callables that receive the event's ``data`` dict and
    are invoked synchronously, in subscription order, from ``publish``. Every
    published event is also appended to an in-memory history.
    """

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    @staticmethod
    def _handler_name(handler: Callable) -> str:
        """Return a printable name for ``handler``.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; fall back to ``repr`` so that logging never raises.
        """
        return getattr(handler, "__name__", None) or repr(handler)

    def subscribe(self, event: str, handler: Callable) -> None:
        """Register ``handler`` to be called whenever ``event`` is published."""
        self._handlers[event].append(handler)
        print(f" [{self._handler_name(handler)}] 订阅了事件: {event}")

    def publish(self, event: str, data: dict, source: str = "") -> None:
        """Record the event and pass ``data`` to every handler subscribed to it.

        An exception raised by one handler is printed and does not prevent the
        remaining handlers from running.
        """
        record = {"event": event, "data": data, "source": source, "time": time.time()}
        self._history.append(record)
        # Dispatch over a copy: a handler may subscribe to this same event
        # while it runs, and that must not extend the current dispatch.
        handlers = tuple(self._handlers.get(event, ()))
        print(f" 事件 [{event}] 触发 {len(handlers)} 个处理器")
        for handler in handlers:
            try:
                handler(data)
            except Exception as e:
                print(f" 处理器 {self._handler_name(handler)} 出错: {e}")

    def get_history(self, event: str = "") -> list[dict]:
        """Return recorded events, optionally only those named ``event``.

        The result is always a new list, so callers cannot alter the bus's
        own history by mutating it. The record dicts themselves are shared.
        """
        if event:
            return [h for h in self._history if h["event"] == event]
        return list(self._history)
# Usage example: three handlers wired to one bus, then a simulated workflow.
import time

bus = EventBus()


def on_task_started(data):
    """Announce the task that was just started."""
    task = data['task']
    print(f" → 收到任务: {task}")


def on_research_completed(data):
    """Report how many sources the writing step will draw on."""
    source_count = len(data.get('sources', []))
    print(f" → 开始写作,基于 {source_count} 个来源")


def on_write_completed(data):
    """Show the first 50 characters of the draft that goes to review."""
    preview = data['content'][:50]
    print(f" → 开始审核: {preview}...")


# Register each agent's handler for the event it reacts to.
bus.subscribe("task.started", on_task_started)
bus.subscribe("research.completed", on_research_completed)
bus.subscribe("write.completed", on_write_completed)

# Simulate the workflow: orchestrator -> researcher -> writer.
print("\n=== 开始多 Agent 工作流 ===")
bus.publish(
    event="task.started",
    data={"task": "撰写 RAG 技术报告"},
    source="orchestrator",
)
bus.publish(
    event="research.completed",
    data={"sources": ["论文1", "论文2", "论文3"]},
    source="researcher",
)
bus.publish(
    event="write.completed",
    data={"content": "RAG 技术现状报告:本文综合..."},
    source="writer",
)
架构选型对比
| 通信模式 | 延迟 | 可靠性 | 耦合度 | 适用场景 |
|---|---|---|---|---|
| 直接函数调用 | 极低 | 高 | 高 | 同进程主从 |
| 共享状态 | 低 | 中 | 中 | 状态驱动工作流 |
| 事件总线 | 低 | 中 | 低 | 复杂 Agent 协作 |
| 消息队列(Redis/RabbitMQ) | 中 | 高 | 极低 | 跨服务 Agent |
| HTTP API | 高 | 高 | 低 | 微服务 Agent |
本章实践清单
- [ ] 定义统一的 `AgentMessage` 消息格式,包含 `message_id` 和 `correlation_id`
- [ ] 选择合适的通信模式(同进程用事件总线,跨服务用消息队列)
- [ ] 为共享状态加锁,避免竞态条件
- [ ] 实现消息 TTL 和过期清理机制
- [ ] 为关键事件建立历史记录,方便调试
- [ ] 测试 Agent 异常时,其他 Agent 的降级行为
下一章:学习高级主题——Agent 的自我反思与改进(Reflexion)。