通信协议与状态共享
High Contrast
Dark Mode
Light Mode
Sepia
Forest
2 min read327 words

通信协议与状态共享

多 Agent 系统的核心难题不是"每个 Agent 能做什么",而是"Agent 之间如何高效、可靠地协作"。通信协议和状态共享机制决定了系统的上限。

通信模式总览

graph TB A[Agent 间通信] --> B[直接调用] A --> C[消息队列] A --> D[共享状态] A --> E[事件驱动] B --> B1[同步,耦合高] B --> B2[适合主从架构] C --> C1[异步,解耦] C --> C2[适合高吞吐] D --> D1[共享内存/DB] D --> D2[需要锁机制] E --> E1[发布/订阅] E --> E2[适合复杂协作] style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style B fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style C fill:#fff3e0,stroke:#f57c00,stroke-width:2px style D fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px style E fill:#fce4ec,stroke:#c62828,stroke-width:2px

消息标准格式

统一消息格式是多 Agent 协作的第一步,避免每个 Agent 各说各话。

"""
Agent 通信标准消息格式
"""
from dataclasses import dataclass, field
from enum import Enum
import uuid
import time
class MessageType(Enum):
"""消息类型"""
TASK = "task"           # 任务分配
RESULT = "result"       # 结果返回
STATUS = "status"       # 状态更新
ERROR = "error"         # 错误通知
HEARTBEAT = "heartbeat" # 心跳
class Priority(Enum):
"""优先级"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
@dataclass
class AgentMessage:
"""Agent 间通信的标准消息"""
sender_id: str
receiver_id: str
msg_type: MessageType
payload: dict
message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
priority: Priority = Priority.NORMAL
correlation_id: str = ""    # 关联消息 ID(用于追踪请求-响应)
ttl_seconds: int = 300      # 消息有效期
def is_expired(self) -> bool:
return time.time() - self.timestamp > self.ttl_seconds
def reply(self, payload: dict, msg_type: MessageType = MessageType.RESULT) -> "AgentMessage":
"""创建回复消息"""
return AgentMessage(
sender_id=self.receiver_id,
receiver_id=self.sender_id,
msg_type=msg_type,
payload=payload,
correlation_id=self.message_id,
)
# 示例:创建任务消息
task_msg = AgentMessage(
sender_id="orchestrator",
receiver_id="researcher",
msg_type=MessageType.TASK,
payload={
"action": "research",
"topic": "向量数据库 2025 最新进展",
"max_sources": 5,
},
priority=Priority.HIGH,
)
print(f"消息 ID: {task_msg.message_id}")
print(f"发送方: {task_msg.sender_id} → 接收方: {task_msg.receiver_id}")
print(f"类型: {task_msg.msg_type.value}, 优先级: {task_msg.priority.name}")

共享状态管理

"""
多 Agent 共享状态管理器
"""
import threading
from typing import Any
from dataclasses import dataclass, field
@dataclass
class SharedState:
"""线程安全的共享状态"""
_data: dict = field(default_factory=dict)
_lock: threading.RLock = field(default_factory=threading.RLock)
_watchers: dict = field(default_factory=dict)  # key -> [callback]
def get(self, key: str, default: Any = None) -> Any:
with self._lock:
return self._data.get(key, default)
def set(self, key: str, value: Any, agent_id: str = "") -> None:
with self._lock:
old_value = self._data.get(key)
self._data[key] = value
# 通知观察者
if key in self._watchers and old_value != value:
for callback in self._watchers[key]:
try:
callback(key, old_value, value, agent_id)
except Exception as e:
print(f"Watcher 出错: {e}")
def watch(self, key: str, callback) -> None:
"""监听某个 key 的变化"""
with self._lock:
if key not in self._watchers:
self._watchers[key] = []
self._watchers[key].append(callback)
def snapshot(self) -> dict:
"""获取当前状态快照"""
with self._lock:
return dict(self._data)
def update_batch(self, updates: dict, agent_id: str = "") -> None:
"""批量更新"""
for key, value in updates.items():
self.set(key, value, agent_id)
# 使用示例:多 Agent 共享任务状态
state = SharedState()
# 初始化任务状态
state.update_batch({
"task_status": "pending",
"research_result": None,
"writer_result": None,
"review_passed": False,
})
# Agent 监听状态变化
def on_research_done(key, old, new, agent_id):
if new is not None:
print(f"[Writer] 检测到 research 完成,开始写作...")
state.watch("research_result", on_research_done)
# Researcher Agent 完成后更新状态
state.set("research_result", "向量数据库调研完成:...", agent_id="researcher")
state.set("task_status", "writing", agent_id="researcher")
print(f"当前状态: {state.snapshot()}")

发布/订阅事件总线

graph LR R[Researcher] -->|publish: research.done| BUS[事件总线] BUS -->|subscribe| W[Writer] BUS -->|subscribe| M[Monitor] W -->|publish: write.done| BUS BUS -->|subscribe| REV[Reviewer] style BUS fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
"""
Agent 事件总线(发布/订阅模式)
"""
from collections import defaultdict
from typing import Callable
class EventBus:
"""Agent 间事件总线"""
def __init__(self):
self._handlers: dict[str, list[Callable]] = defaultdict(list)
self._history: list[dict] = []
def subscribe(self, event: str, handler: Callable) -> None:
"""订阅事件"""
self._handlers[event].append(handler)
print(f"  [{handler.__name__}] 订阅了事件: {event}")
def publish(self, event: str, data: dict, source: str = "") -> None:
"""发布事件"""
record = {"event": event, "data": data, "source": source, "time": time.time()}
self._history.append(record)
handlers = self._handlers.get(event, [])
print(f"  事件 [{event}] 触发 {len(handlers)} 个处理器")
for handler in handlers:
try:
handler(data)
except Exception as e:
print(f"  处理器 {handler.__name__} 出错: {e}")
def get_history(self, event: str = "") -> list[dict]:
"""获取事件历史"""
if event:
return [h for h in self._history if h["event"] == event]
return self._history
# 使用示例
bus = EventBus()
import time
# 注册 Agent 的事件处理器
def on_task_started(data):
print(f"    → 收到任务: {data['task']}")
def on_research_completed(data):
print(f"    → 开始写作,基于 {len(data.get('sources', []))} 个来源")
def on_write_completed(data):
print(f"    → 开始审核: {data['content'][:50]}...")
bus.subscribe("task.started", on_task_started)
bus.subscribe("research.completed", on_research_completed)
bus.subscribe("write.completed", on_write_completed)
# 模拟 Agent 工作流
print("\n=== 开始多 Agent 工作流 ===")
bus.publish("task.started", {"task": "撰写 RAG 技术报告"}, source="orchestrator")
bus.publish("research.completed", {"sources": ["论文1", "论文2", "论文3"]}, source="researcher")
bus.publish("write.completed", {"content": "RAG 技术现状报告:本文综合..."}, source="writer")

架构选型对比

通信模式 延迟 可靠性 耦合度 适用场景
直接函数调用 极低 同进程主从
共享状态 状态驱动工作流
事件总线 复杂 Agent 协作
消息队列(Redis/RabbitMQ) 极低 跨服务 Agent
HTTP API 微服务 Agent

本章实践清单

下一章:学习高级主题——Agent 的自我反思与改进(Reflexion)。