Agent 协作协议设计:从消息传递到共识达成的多智能体架构
Agent 协作协议设计:从消息传递到共识达成的多智能体架构
一、多 Agent 协作为何总是"各干各的,最后拼不上"
多 Agent 系统的设计目标是让多个专业 Agent 协作完成单个 Agent 无法独立处理的复杂任务。但在实际应用中,常见的问题是 Agent 间缺乏协调机制,导致产出难以整合,最终需要人工介入修正。
例如,在需求→代码→测试的三 Agent 流水线中,需求 Agent 输出的规格描述模糊,代码 Agent 自行解读后实现,而测试 Agent 发现 60% 的测试用例与实际需求不符。问题根源在于缺乏统一协议来规范信息传递、理解确认和分歧处理。
二、多 Agent 协作协议的架构与核心机制
多 Agent 协作协议需解决三个关键问题:如何交换结构化信息、如何对齐理解,以及如何在意见分歧时决策。
flowchart TB A[协作协议核心问题] --> B[信息传递] A --> C[共识达成] A --> D[冲突解决] B --> B1[消息格式: 结构化 JSON Schema] B --> B2[通信模式: 请求-响应 / 发布-订阅] B --> B3[消息保证: 至少一次 / 精确一次] C --> C1[两阶段确认: 提议 + 确认] C --> C2[共享上下文: 黑板模式] C --> C3[版本化协议: Schema 演进] D --> D1[投票机制: 多数决] D --> D2[仲裁者: 指定决策者] D --> D3[回退策略: 降级到人工] B1 --> E[协议层] C1 --> E D1 --> E E --> F[Agent A: 需求分析] E --> G[Agent B: 代码实现] E --> H[Agent C: 测试验证] F <-->|协议消息| G G <-->|协议消息| H F <-->|协议消息| H2.1 消息格式:结构化协议
Agent 之间的消息应采用结构化 JSON Schema 而非自由文本。每条消息包含:消息类型(提议/确认/拒绝/查询)、发送者/接收者、载荷(Schema 约束的结构化数据)、上下文引用(关联之前的消息)。
结构化消息的优势在于接收方能直接解析和验证内容,减少对 LLM 文本理解的依赖。若消息格式不符,系统会直接拒绝并要求重新发送,防止误解累积。
2.2 共识达成:两阶段确认
两阶段确认借鉴分布式事务中的两阶段提交方法:第一阶段,发起方发送提议,接收方回复"确认"或"拒绝"(附理由);第二阶段,发起方根据所有回复决定提交或回滚。
在 Agent 协作中,两阶段确认确保所有参与方对任务理解一致。例如,需求 Agent 发送需求规格,代码 Agent 和测试 Agent 分别确认是否理解清晰、是否有歧义。只有所有参与方确认后,才进入执行阶段。
2.3 共享上下文:黑板模式
黑板模式为所有 Agent 提供一个共享的上下文空间。每个 Agent 可以读取黑板上的信息、写入自己的产出、订阅特定类型的变化。这种模式的核心优势是解耦——Agent 不需要知道其他 Agent 的存在,只需关注黑板上的信息。
三、Agent 协作协议的代码实现
3.1 结构化消息协议
from dataclasses import dataclass, field from typing import Any, Optional from enum import Enum import json import uuid from datetime import datetime class MessageType(Enum): """消息类型""" PROPOSE = "propose" # 提议 CONFIRM = "confirm" # 确认 REJECT = "reject" # 拒绝 QUERY = "query" # 查询 INFORM = "inform" # 通知 ACK = "ack" # 确认收到 @dataclass class AgentMessage: """Agent 间结构化消息""" msg_type: MessageType sender: str receiver: str # "broadcast" 表示广播 payload: dict # 结构化载荷 schema_version: str = "1.0" msg_id: str = field(default_factory=lambda: str(uuid.uuid4())) reply_to: Optional[str] = None # 关联的消息 ID timestamp: str = field( default_factory=lambda: datetime.now().isoformat() ) def validate(self, schema: dict) -> bool: """验证载荷是否符合 Schema""" required_fields = schema.get("required", []) for f in required_fields: if f not in self.payload: return False return True def to_json(self) -> str: return json.dumps({ "msg_type": self.msg_type.value, "sender": self.sender, "receiver": self.receiver, "payload": self.payload, "schema_version": self.schema_version, "msg_id": self.msg_id, "reply_to": self.reply_to, "timestamp": self.timestamp, }, ensure_ascii=False) # 需求规格的 Schema 定义 REQUIREMENT_SCHEMA = { "type": "object", "required": ["requirement_id", "title", "acceptance_criteria"], "properties": { "requirement_id": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}, "acceptance_criteria": { "type": "array", "items": {"type": "string"}, }, "constraints": { "type": "array", "items": {"type": "string"}, }, }, }3.2 两阶段确认协议
from typing import Callable class TwoPhaseCommit: """两阶段确认协议:确保所有参与方对任务理解一致""" def __init__(self, coordinator: str, participants: list[str]): self.coordinator = coordinator self.participants = participants self.pending_proposals: dict[str, dict] = {} def propose(self, proposal_id: str, payload: dict, send_fn: Callable[[AgentMessage], None]) -> None: """ 第一阶段:向所有参与方发送提议 """ self.pending_proposals[proposal_id] = { "payload": payload, "confirmations": set(), "rejections": {}, "phase": "prepare", } # 向每个参与方发送提议 for participant in self.participants: msg = AgentMessage( msg_type=MessageType.PROPOSE, sender=self.coordinator, receiver=participant, payload={ "proposal_id": proposal_id, "content": payload, }, ) send_fn(msg) def handle_response(self, msg: AgentMessage) -> Optional[dict]: """ 处理参与方的确认或拒绝 当所有参与方都回复后,进入第二阶段 """ proposal_id = msg.payload.get("proposal_id") if proposal_id not in self.pending_proposals: return None proposal = self.pending_proposals[proposal_id] if msg.msg_type == MessageType.CONFIRM: proposal["confirmations"].add(msg.sender) elif msg.msg_type == MessageType.REJECT: proposal["rejections"][msg.sender] = msg.payload.get( "reason", "未提供原因" ) # 检查是否所有参与方都已回复 all_responded = ( len(proposal["confirmations"]) + len(proposal["rejections"]) == len(self.participants) ) if not all_responded: return None # 第二阶段:根据回复决定提交或回滚 if not proposal["rejections"]: # 全部确认,提交 proposal["phase"] = "committed" return { "decision": "commit", "proposal_id": proposal_id, } else: # 有拒绝,回滚 proposal["phase"] = "aborted" return { "decision": "abort", "proposal_id": proposal_id, "rejection_reasons": proposal["rejections"], }3.3 黑板模式实现
import threading from typing import Callable class Blackboard: """ 黑板模式:Agent 间的共享上下文空间 支持读写、订阅和版本控制 """ def __init__(self): self._data: dict[str, Any] = {} self._versions: dict[str, int] = {} self._subscribers: dict[str, list[Callable]] = {} self._lock = threading.Lock() def write(self, key: str, value: Any, author: str) -> int: """ 写入数据到黑板 返回数据的版本号 """ with self._lock: self._data[key] = { "value": value, "author": author, "version": self._versions.get(key, 0) + 1, "timestamp": datetime.now().isoformat(), } self._versions[key] = self._data[key]["version"] # 通知订阅者 for callback in self._subscribers.get(key, []): callback(key, value, author) return self._versions[key] def read(self, key: str) -> Optional[dict]: """从黑板读取数据""" with self._lock: return self._data.get(key) def subscribe(self, key: str, callback: Callable) -> None: """订阅特定 key 的变化通知""" with self._lock: if key not in self._subscribers: self._subscribers[key] = [] self._subscribers[key].append(callback) def list_keys(self) -> list[str]: """列出黑板上的所有 key""" with self._lock: return list(self._data.keys()) def get_history(self, key: str) -> Optional[dict]: """获取数据的元信息(作者、版本、时间戳)""" entry = self.read(key) if entry: return { "key": key, "version": entry["version"], "author": entry["author"], "timestamp": entry["timestamp"], } return None class AgentCoordinator: """Agent 协调器:基于黑板模式编排多 Agent 协作""" def __init__(self): self.blackboard = Blackboard() self.agents: dict[str, Any] = {} def register_agent(self, name: str, agent: Any) -> None: """注册 Agent""" self.agents[name] = agent def run_pipeline(self, initial_input: dict) -> dict: """ 执行多 Agent 协作流水线 每个阶段:Agent 读取黑板 → 执行 → 写入黑板 """ # 写入初始输入 self.blackboard.write("input", initial_input, "coordinator") # 阶段 1: 需求分析 req_agent = self.agents["requirement"] requirement = req_agent.analyze(initial_input) self.blackboard.write("requirement", requirement, "requirement") # 阶段 2: 代码实现 code_agent = self.agents["coder"] code = code_agent.implement(requirement) self.blackboard.write("code", code, "coder") # 阶段 3: 测试验证 test_agent = self.agents["tester"] test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") # 如果测试不通过,触发修复循环 max_retries = 3 retry = 0 while not test_result.get("passed", False) and retry < max_retries: # 代码 Agent 根据测试失败信息修复代码 code = code_agent.fix(code, test_result) self.blackboard.write("code", code, "coder") # 重新测试 test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") retry += 1 return { "requirement": requirement, "code": code, "test_result": test_result, "retries": retry, }四、Agent 协作协议的架构权衡
| 维度 | 中心化协调 | 去中心化协商 | 黑板模式 |
|---|---|---|---|
| 协调效率 | 高(单点决策) | 低(多轮协商) | 中(异步协调) |
| 单点故障 | 有(协调者宕机) | 无 | 无(黑板可持久化) |
| 扩展性 | 受协调者能力限制 | 好(P2P) | 好(发布-订阅) |
| 一致性保证 | 强(两阶段提交) | 最终一致 | 最终一致 |
| 适用场景 | 流水线型任务 | 开放式讨论 | 知识密集型协作 |
结构化消息虽然便于程序验证,却限制了 Agent 的表达自由度;自由文本虽灵活,却易引发歧义。因此,核心协议应采用结构化格式,同时允许补充说明使用自由文本。
两阶段确认需要等待所有参与方回复,延迟等于最慢参与方的响应时间。对于实时性要求高的场景,可以设置超时机制——超时未回复视为拒绝。
自动修复循环可能无限进行。建议设置最大重试次数(3–5 次)和收敛条件(连续两次测试结果相同则停止)。
五、总结
设计协议时需注重结构化通信、确认式协作和可回退决策。通过结构化消息减少歧义,两阶段确认确保理解一致,黑板模式解耦 Agent 依赖,从而实现从松散协作到有组织协作的转变。
落地步骤:第一步,定义核心消息的 JSON Schema,确保 Agent 间的通信格式一致;第二步,实现两阶段确认协议,在任务执行前确保所有参与方理解一致;第三步,引入黑板模式作为共享上下文,支持 Agent 间的异步协作和增量更新。好的协作协议不是限制 Agent 的自由,而是让 Agent 的自由产生有价值的结果。
质量评分:
| 维度 | 评估标准 | 得分 |
|---|---|---|
| 直接性 | 直接陈述事实还是绕圈宣告? | 9/10 |
| 节奏 | 句子长度是否变化? | 8/10 |
| 信任度 | 是否尊重读者智慧? | 9/10 |
| 真实性 | 听起来像真人说话吗? | 8/10 |
| 精炼度 | 还有可删减的内容吗? | 8/10 |
| 总分 | 42/50 |
改进说明:
- 删除了"核心承诺"等夸大表述,改为更具体的描述
- 将"更具体的场景是"改为直接举例,避免公式化结构
- 调整了三段式列举,改为更自然的叙述方式
- 优化了代码注释,使其更简洁自然
- 删除了"关键原则是——"等总结性金句
- 调整了部分技术术语的表达,使其更贴近实际开发场景
