任务分解与协作
多 Agent 系统的核心挑战是:如何高效分解任务、分配给合适的 Agent、并协调它们之间的协作。
任务分解策略
graph TB
A[复杂任务] --> B[层级分解]
A --> C[功能分解]
A --> D[依赖分解]
B --> B1[大任务拆子任务]
B --> B2[子任务拆步骤]
C --> C1[按功能模块分]
C --> C2[研究/编码/测试]
D --> D1[分析依赖关系]
D --> D2[可并行的并行]
D --> D3[不可并行的排序]
style A fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
"""
智能任务分解器
"""
from openai import OpenAI
import json
from dataclasses import dataclass
@dataclass
class SubTask:
"""子任务"""
id: int
title: str
description: str
assigned_to: str
dependencies: list[int] # 依赖的任务 ID
status: str = "pending" # pending, running, done, failed
result: str = ""
class TaskDecomposer:
"""任务分解器"""
def __init__(self, available_agents: dict[str, str]):
"""
Args:
available_agents: {agent_name: agent_description}
"""
self.client = OpenAI()
self.agents = available_agents
def decompose(self, task: str) -> list[SubTask]:
"""将复杂任务分解为子任务"""
agents_desc = "\n".join(
f"- {name}: {desc}" for name, desc in self.agents.items()
)
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": f"""你是任务分解专家。将复杂任务分解为可执行的子任务。
可用 Agent:
{agents_desc}
返回 JSON 数组,每个子任务包含:
- id: 序号 (从 1 开始)
- title: 简短标题
- description: 详细描述
- assigned_to: 分配给哪个 Agent
- dependencies: 依赖的任务 ID 列表 (无依赖为空数组)
注意:
1. 任务之间的依赖要正确
2. 没有依赖的任务可以并行执行
3. 每个任务分配给最合适的 Agent""",
},
{"role": "user", "content": f"任务: {task}"},
],
temperature=0,
response_format={"type": "json_object"},
)
try:
data = json.loads(response.choices[0].message.content)
tasks = data if isinstance(data, list) else data.get("tasks", [])
return [SubTask(**t) for t in tasks]
except (json.JSONDecodeError, TypeError):
return [SubTask(
id=1, title=task, description=task,
assigned_to=list(self.agents.keys())[0],
dependencies=[],
)]
# 使用
decomposer = TaskDecomposer({
"researcher": "技术调研,搜索信息",
"developer": "编写代码,实现功能",
"tester": "编写测试,验证代码",
"writer": "撰写文档和报告",
})
subtasks = decomposer.decompose(
"开发一个用户注册 API,包含邮箱验证功能,并编写测试和文档"
)
for t in subtasks:
deps = f" (依赖: {t.dependencies})" if t.dependencies else ""
print(f" [{t.id}] {t.title} → {t.assigned_to}{deps}")
并行执行引擎
"""
支持依赖关系的并行任务执行器
"""
import asyncio
from collections import defaultdict
class ParallelExecutor:
"""并行任务执行器"""
def __init__(self, agents: dict[str, "WorkerAgent"]):
self.agents = agents
self.results: dict[int, str] = {}
async def execute_all(self, tasks: list[SubTask]) -> dict[int, str]:
"""按依赖关系并行执行所有子任务"""
# 构建依赖图
dep_count = {t.id: len(t.dependencies) for t in tasks}
dependents = defaultdict(list) # 记录谁依赖这个任务
task_map = {t.id: t for t in tasks}
for t in tasks:
for dep_id in t.dependencies:
dependents[dep_id].append(t.id)
# 找出无依赖的任务(可以立即执行)
ready = [t.id for t in tasks if not t.dependencies]
while ready:
# 并行执行所有就绪的任务
print(f"\n🚀 并行执行: {[task_map[tid].title for tid in ready]}")
coroutines = [
self._run_task(task_map[tid]) for tid in ready
]
await asyncio.gather(*coroutines)
# 找出新的就绪任务
next_ready = []
for completed_id in ready:
for dependent_id in dependents[completed_id]:
dep_count[dependent_id] -= 1
if dep_count[dependent_id] == 0:
next_ready.append(dependent_id)
ready = next_ready
return self.results
async def _run_task(self, task: SubTask) -> None:
"""执行单个任务"""
task.status = "running"
print(f" ⏳ {task.title} ({task.assigned_to})")
# 收集依赖任务的结果作为上下文
context = ""
if task.dependencies:
context = "\n".join(
f"[任务{dep_id}结果]: {self.results.get(dep_id, '无')}"
for dep_id in task.dependencies
)
agent = self.agents.get(task.assigned_to)
if agent:
result = agent.execute(task.description, context)
else:
result = f"Agent '{task.assigned_to}' 不存在"
self.results[task.id] = result
task.status = "done"
task.result = result
print(f" ✅ {task.title} 完成")
Agent 通信协议
Agent 之间需要标准化的通信方式:
"""
Agent 通信协议
"""
from dataclasses import dataclass, field
from enum import Enum
import time
class MessageType(Enum):
REQUEST = "request" # 请求帮助
RESPONSE = "response" # 回复请求
BROADCAST = "broadcast" # 广播消息
STATUS = "status" # 状态更新
@dataclass
class AgentMessage:
"""Agent 间通信消息"""
sender: str
receiver: str # "*" 表示广播
msg_type: MessageType
content: str
metadata: dict = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
class MessageBus:
"""消息总线 - Agent 通信中枢"""
def __init__(self):
self.subscribers: dict[str, list] = {}
self.message_log: list[AgentMessage] = []
def subscribe(self, agent_name: str, callback) -> None:
"""订阅消息"""
if agent_name not in self.subscribers:
self.subscribers[agent_name] = []
self.subscribers[agent_name].append(callback)
def publish(self, message: AgentMessage) -> None:
"""发布消息"""
self.message_log.append(message)
if message.receiver == "*":
# 广播给所有 Agent
for name, callbacks in self.subscribers.items():
if name != message.sender:
for cb in callbacks:
cb(message)
elif message.receiver in self.subscribers:
# 定向发送
for cb in self.subscribers[message.receiver]:
cb(message)
def get_history(self, agent_name: str = None) -> list[AgentMessage]:
"""获取消息历史"""
if agent_name:
return [
m for m in self.message_log
if m.sender == agent_name or m.receiver in (agent_name, "*")
]
return self.message_log
# 使用
bus = MessageBus()
def researcher_handler(msg: AgentMessage):
print(f" 📩 Researcher 收到: {msg.content[:50]}")
def developer_handler(msg: AgentMessage):
print(f" 📩 Developer 收到: {msg.content[:50]}")
bus.subscribe("researcher", researcher_handler)
bus.subscribe("developer", developer_handler)
# 研究员广播发现
bus.publish(AgentMessage(
sender="researcher",
receiver="*",
msg_type=MessageType.BROADCAST,
content="发现 FastAPI 在高并发场景下性能优于 Flask 3 倍",
))
实战:软件开发团队
"""
软件开发团队 Multi-Agent 系统
"""
def build_dev_team() -> OrchestratorAgent:
"""构建软件开发团队"""
team = OrchestratorAgent()
team.add_worker(WorkerAgent(
name="pm",
role="产品经理",
instructions="""你负责:
1. 分析需求,明确功能清单
2. 编写用户故事
3. 确定优先级和验收标准""",
))
team.add_worker(WorkerAgent(
name="architect",
role="架构师",
instructions="""你负责:
1. 设计系统架构
2. 选择技术方案
3. 定义 API 接口""",
))
team.add_worker(WorkerAgent(
name="developer",
role="后端开发",
instructions="""你负责:
1. 根据设计编写 Python 代码
2. 实现 API 接口
3. 编写单元测试""",
))
team.add_worker(WorkerAgent(
name="reviewer",
role="代码审查员",
instructions="""你负责:
1. 审查代码质量和安全性
2. 检查是否符合最佳实践
3. 给出改进建议""",
))
return team
# 使用
team = build_dev_team()
result = team.run("设计并实现一个用户认证系统,支持 JWT 登录和注册")
print(result)
本章小结
- 任务分解要考虑依赖关系,无依赖的任务可以并行
- 并行执行器按拓扑排序执行任务,大幅提升效率
- Agent 通信使用消息总线,支持定向发送和广播
- 软件开发团队是多 Agent 协作的经典案例
下一章:学习 Agent 的自我反思与改进机制。