多 Agent 架构模式
当单个 Agent 无法胜任复杂任务时,多个 Agent 协作可以显著提升系统能力。
为什么需要多 Agent
graph TB
A[单 Agent 的局限] --> B[上下文窗口有限]
A --> C[角色冲突]
A --> D[复杂任务失败率高]
E[多 Agent 的优势] --> F[专业分工]
E --> G[并行处理]
E --> H[降低单点复杂度]
style A fill:#ffcdd2,stroke:#c62828,stroke-width:2px
style E fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
常见架构模式
模式 1:主从架构
一个 Orchestrator 负责分发任务给 Worker Agents。
graph TB
U[用户任务] --> O[Orchestrator Agent]
O --> W1[Researcher Agent]
O --> W2[Writer Agent]
O --> W3[Reviewer Agent]
W1 --> O
W2 --> O
W3 --> O
O --> R[最终结果]
style O fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style W1 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style W2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style W3 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
"""
主从架构 Multi-Agent 系统
"""
from openai import OpenAI
import json
class WorkerAgent:
"""工作 Agent"""
def __init__(self, name: str, role: str, instructions: str):
self.name = name
self.role = role
self.instructions = instructions
self.client = OpenAI()
def execute(self, task: str, context: str = "") -> str:
"""执行分配的任务"""
messages = [
{
"role": "system",
"content": f"你是 {self.role}。{self.instructions}",
},
{
"role": "user",
"content": f"任务: {task}\n\n上下文: {context}" if context else task,
},
]
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.3,
)
return response.choices[0].message.content
class OrchestratorAgent:
"""编排 Agent - 负责任务分解和协调"""
def __init__(self):
self.client = OpenAI()
self.workers: dict[str, WorkerAgent] = {}
def add_worker(self, worker: WorkerAgent) -> None:
"""注册工作 Agent"""
self.workers[worker.name] = worker
def run(self, task: str) -> str:
"""编排执行任务"""
# 1. 分解任务
plan = self._plan(task)
print(f"\n📋 任务计划 ({len(plan)} 步):")
for step in plan:
print(f" → {step['worker']}: {step['task']}")
# 2. 按顺序执行
results = {}
for step in plan:
worker_name = step["worker"]
sub_task = step["task"]
if worker_name not in self.workers:
results[worker_name] = f"未找到 Agent: {worker_name}"
continue
# 构建上下文(包括之前的结果)
context = "\n".join(
f"[{k} 的结果]: {v[:500]}"
for k, v in results.items()
)
print(f"\n🔧 {worker_name} 正在执行: {sub_task[:50]}...")
result = self.workers[worker_name].execute(sub_task, context)
results[worker_name] = result
print(f" ✅ 完成 ({len(result)} 字符)")
# 3. 汇总
return self._synthesize(task, results)
def _plan(self, task: str) -> list[dict]:
"""分解任务为子任务"""
workers_desc = "\n".join(
f"- {name}: {w.role}" for name, w in self.workers.items()
)
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": f"""你是任务规划器。将任务分解给以下 Agent:
{workers_desc}
返回 JSON 数组: [{{"worker": "agent名称", "task": "子任务描述"}}]""",
},
{"role": "user", "content": task},
],
temperature=0,
response_format={"type": "json_object"},
)
try:
data = json.loads(response.choices[0].message.content)
return data if isinstance(data, list) else data.get("steps", [])
except json.JSONDecodeError:
return [{"worker": list(self.workers.keys())[0], "task": task}]
def _synthesize(self, task: str, results: dict) -> str:
"""汇总所有结果"""
parts = "\n\n".join(
f"## {name} 的结果\n{result}" for name, result in results.items()
)
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "将各 Agent 的结果汇总为最终回答。"},
{"role": "user", "content": f"原始任务: {task}\n\n{parts}"},
],
)
return response.choices[0].message.content
# ==================
# 使用示例:技术调研团队
# ==================
orchestrator = OrchestratorAgent()
orchestrator.add_worker(WorkerAgent(
name="researcher",
role="技术调研员",
instructions="负责调研技术方案,收集优缺点,列出关键参考信息。",
))
orchestrator.add_worker(WorkerAgent(
name="analyst",
role="技术分析师",
instructions="负责对比分析不同方案,给出量化评估。",
))
orchestrator.add_worker(WorkerAgent(
name="writer",
role="技术文档撰写者",
instructions="将调研和分析结果整理为结构化的技术报告。",
))
result = orchestrator.run("调研 2025 年主流的向量数据库,推荐最适合生产环境的方案")
print(result)
模式 2:流水线架构
Agent 按顺序处理,像工厂流水线:
graph LR
A[输入] --> B[Agent 1 - 收集]
B --> C[Agent 2 - 分析]
C --> D[Agent 3 - 撰写]
D --> E[Agent 4 - 审核]
E --> F[输出]
style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style C fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style D fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style E fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
"""
流水线架构
"""
class Pipeline:
"""Agent 流水线"""
def __init__(self):
self.stages: list[WorkerAgent] = []
def add_stage(self, agent: WorkerAgent) -> "Pipeline":
"""添加流水线阶段"""
self.stages.append(agent)
return self
def run(self, input_data: str) -> str:
"""按顺序执行所有阶段"""
current = input_data
for i, agent in enumerate(self.stages):
print(f"\n[Stage {i + 1}/{len(self.stages)}] {agent.name}")
current = agent.execute(current)
print(f" 输出: {current[:100]}...")
return current
# 使用:代码审查流水线
pipeline = Pipeline()
pipeline.add_stage(WorkerAgent(
"parser", "代码解析器",
"分析代码结构,列出函数、类和关键逻辑。"
))
pipeline.add_stage(WorkerAgent(
"reviewer", "代码审查员",
"审查代码质量,找出潜在问题和改进点。"
))
pipeline.add_stage(WorkerAgent(
"reporter", "报告生成器",
"将审查结果整理为结构化的审查报告,包含严重等级。"
))
report = pipeline.run("def calc(x): return x*2+1 # 需要审查的代码")
模式 3:辩论架构
多个 Agent 针对同一问题给出不同观点,最终由 Judge 裁定:
graph TB
Q[问题] --> A1[Agent A - 正方]
Q --> A2[Agent B - 反方]
A1 --> D[辩论]
A2 --> D
D --> J[Judge Agent - 裁决]
J --> R[最终结论]
style J fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style A1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style A2 fill:#ffcdd2,stroke:#c62828,stroke-width:2px
"""
辩论架构 - 通过对抗提升回答质量
"""
class DebateSystem:
"""辩论系统"""
def __init__(self):
self.client = OpenAI()
def debate(self, question: str, rounds: int = 2) -> str:
"""执行辩论"""
positions = []
# 正方观点
pro = self._get_position(question, "supporter", positions)
positions.append(f"正方: {pro}")
print(f"正方: {pro[:100]}...")
# 反方观点
con = self._get_position(question, "critic", positions)
positions.append(f"反方: {con}")
print(f"反方: {con[:100]}...")
# 多轮辩论
for r in range(rounds - 1):
pro = self._get_position(question, "supporter", positions)
positions.append(f"正方回应: {pro}")
con = self._get_position(question, "critic", positions)
positions.append(f"反方回应: {con}")
# 裁决
verdict = self._judge(question, positions)
return verdict
def _get_position(
self, question: str, role: str, history: list[str]
) -> str:
"""获取立场"""
if role == "supporter":
instruction = "你是正方,积极支持这个观点。提供论据和证据。"
else:
instruction = "你是反方,批判性地审视这个观点。指出问题和风险。"
context = "\n".join(history) if history else "这是第一轮发言。"
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": instruction},
{"role": "user", "content": f"议题: {question}\n\n之前发言:\n{context}"},
],
max_tokens=300,
)
return response.choices[0].message.content
def _judge(self, question: str, positions: list[str]) -> str:
"""裁决"""
debate_log = "\n\n".join(positions)
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "你是公正的裁判。综合正反双方观点,给出客观平衡的结论。",
},
{"role": "user", "content": f"议题: {question}\n\n辩论记录:\n{debate_log}"},
],
)
return response.choices[0].message.content
架构选择指南
| 架构 | 适用场景 | 复杂度 |
|---|---|---|
| 主从架构 | 多种专业技能的任务 | 中 |
| 流水线 | 有明确处理顺序的任务 | 低 |
| 辩论架构 | 需要多角度分析的决策 | 中 |
本章小结
- 多 Agent 系统通过分工协作解决复杂问题
- 主从架构最灵活,Orchestrator 负责分发协调
- 流水线架构最简单,适合顺序处理任务
- 辩论架构通过对抗提升回答质量
- 选择架构时要考虑任务特点和系统复杂度
下一章:学习多 Agent 间的任务分解与协作机制。