当前位置: 首页 > news >正文

Agent 协作协议设计:从消息传递到共识达成的多智能体架构

Agent 协作协议设计:从消息传递到共识达成的多智能体架构

一、多 Agent 协作为何总是"各干各的,最后拼不上"

多 Agent 系统的设计目标是让多个专业 Agent 协作完成单个 Agent 无法独立处理的复杂任务。但在实际应用中,常见的问题是 Agent 间缺乏协调机制,导致产出难以整合,最终需要人工介入修正。

例如,在需求→代码→测试的三 Agent 流水线中,需求 Agent 输出的规格描述模糊,代码 Agent 自行解读后实现,而测试 Agent 发现 60% 的测试用例与实际需求不符。问题根源在于缺乏统一协议来规范信息传递、理解确认和分歧处理。

二、多 Agent 协作协议的架构与核心机制

多 Agent 协作协议需解决三个关键问题:如何交换结构化信息、如何对齐理解,以及如何在意见分歧时决策。

flowchart TB A[协作协议核心问题] --> B[信息传递] A --> C[共识达成] A --> D[冲突解决] B --> B1[消息格式: 结构化 JSON Schema] B --> B2[通信模式: 请求-响应 / 发布-订阅] B --> B3[消息保证: 至少一次 / 精确一次] C --> C1[两阶段确认: 提议 + 确认] C --> C2[共享上下文: 黑板模式] C --> C3[版本化协议: Schema 演进] D --> D1[投票机制: 多数决] D --> D2[仲裁者: 指定决策者] D --> D3[回退策略: 降级到人工] B1 --> E[协议层] C1 --> E D1 --> E E --> F[Agent A: 需求分析] E --> G[Agent B: 代码实现] E --> H[Agent C: 测试验证] F <-->|协议消息| G G <-->|协议消息| H F <-->|协议消息| H

2.1 消息格式:结构化协议

Agent 之间的消息应采用结构化 JSON Schema 而非自由文本。每条消息包含:消息类型(提议/确认/拒绝/查询)、发送者/接收者、载荷(Schema 约束的结构化数据)、上下文引用(关联之前的消息)。

结构化消息的优势在于接收方能直接解析和验证内容,减少对 LLM 文本理解的依赖。若消息格式不符,系统会直接拒绝并要求重新发送,防止误解累积。

2.2 共识达成:两阶段确认

两阶段确认借鉴分布式事务中的两阶段提交方法:第一阶段,发起方发送提议,接收方回复"确认"或"拒绝"(附理由);第二阶段,发起方根据所有回复决定提交或回滚。

在 Agent 协作中,两阶段确认确保所有参与方对任务理解一致。例如,需求 Agent 发送需求规格,代码 Agent 和测试 Agent 分别确认是否理解清晰、是否有歧义。只有所有参与方确认后,才进入执行阶段。

2.3 共享上下文:黑板模式

黑板模式为所有 Agent 提供一个共享的上下文空间。每个 Agent 可以读取黑板上的信息、写入自己的产出、订阅特定类型的变化。这种模式的核心优势是解耦——Agent 不需要知道其他 Agent 的存在,只需关注黑板上的信息。

三、Agent 协作协议的代码实现

3.1 结构化消息协议

from dataclasses import dataclass, field from typing import Any, Optional from enum import Enum import json import uuid from datetime import datetime class MessageType(Enum): """消息类型""" PROPOSE = "propose" # 提议 CONFIRM = "confirm" # 确认 REJECT = "reject" # 拒绝 QUERY = "query" # 查询 INFORM = "inform" # 通知 ACK = "ack" # 确认收到 @dataclass class AgentMessage: """Agent 间结构化消息""" msg_type: MessageType sender: str receiver: str # "broadcast" 表示广播 payload: dict # 结构化载荷 schema_version: str = "1.0" msg_id: str = field(default_factory=lambda: str(uuid.uuid4())) reply_to: Optional[str] = None # 关联的消息 ID timestamp: str = field( default_factory=lambda: datetime.now().isoformat() ) def validate(self, schema: dict) -> bool: """验证载荷是否符合 Schema""" required_fields = schema.get("required", []) for f in required_fields: if f not in self.payload: return False return True def to_json(self) -> str: return json.dumps({ "msg_type": self.msg_type.value, "sender": self.sender, "receiver": self.receiver, "payload": self.payload, "schema_version": self.schema_version, "msg_id": self.msg_id, "reply_to": self.reply_to, "timestamp": self.timestamp, }, ensure_ascii=False) # 需求规格的 Schema 定义 REQUIREMENT_SCHEMA = { "type": "object", "required": ["requirement_id", "title", "acceptance_criteria"], "properties": { "requirement_id": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}, "acceptance_criteria": { "type": "array", "items": {"type": "string"}, }, "constraints": { "type": "array", "items": {"type": "string"}, }, }, }

3.2 两阶段确认协议

from typing import Callable class TwoPhaseCommit: """两阶段确认协议:确保所有参与方对任务理解一致""" def __init__(self, coordinator: str, participants: list[str]): self.coordinator = coordinator self.participants = participants self.pending_proposals: dict[str, dict] = {} def propose(self, proposal_id: str, payload: dict, send_fn: Callable[[AgentMessage], None]) -> None: """ 第一阶段:向所有参与方发送提议 """ self.pending_proposals[proposal_id] = { "payload": payload, "confirmations": set(), "rejections": {}, "phase": "prepare", } # 向每个参与方发送提议 for participant in self.participants: msg = AgentMessage( msg_type=MessageType.PROPOSE, sender=self.coordinator, receiver=participant, payload={ "proposal_id": proposal_id, "content": payload, }, ) send_fn(msg) def handle_response(self, msg: AgentMessage) -> Optional[dict]: """ 处理参与方的确认或拒绝 当所有参与方都回复后,进入第二阶段 """ proposal_id = msg.payload.get("proposal_id") if proposal_id not in self.pending_proposals: return None proposal = self.pending_proposals[proposal_id] if msg.msg_type == MessageType.CONFIRM: proposal["confirmations"].add(msg.sender) elif msg.msg_type == MessageType.REJECT: proposal["rejections"][msg.sender] = msg.payload.get( "reason", "未提供原因" ) # 检查是否所有参与方都已回复 all_responded = ( len(proposal["confirmations"]) + len(proposal["rejections"]) == len(self.participants) ) if not all_responded: return None # 第二阶段:根据回复决定提交或回滚 if not proposal["rejections"]: # 全部确认,提交 proposal["phase"] = "committed" return { "decision": "commit", "proposal_id": proposal_id, } else: # 有拒绝,回滚 proposal["phase"] = "aborted" return { "decision": "abort", "proposal_id": proposal_id, "rejection_reasons": proposal["rejections"], }

3.3 黑板模式实现

import threading from typing import Callable class Blackboard: """ 黑板模式:Agent 间的共享上下文空间 支持读写、订阅和版本控制 """ def __init__(self): self._data: dict[str, Any] = {} self._versions: dict[str, int] = {} self._subscribers: dict[str, list[Callable]] = {} self._lock = threading.Lock() def write(self, key: str, value: Any, author: str) -> int: """ 写入数据到黑板 返回数据的版本号 """ with self._lock: self._data[key] = { "value": value, "author": author, "version": self._versions.get(key, 0) + 1, "timestamp": datetime.now().isoformat(), } self._versions[key] = self._data[key]["version"] # 通知订阅者 for callback in self._subscribers.get(key, []): callback(key, value, author) return self._versions[key] def read(self, key: str) -> Optional[dict]: """从黑板读取数据""" with self._lock: return self._data.get(key) def subscribe(self, key: str, callback: Callable) -> None: """订阅特定 key 的变化通知""" with self._lock: if key not in self._subscribers: self._subscribers[key] = [] self._subscribers[key].append(callback) def list_keys(self) -> list[str]: """列出黑板上的所有 key""" with self._lock: return list(self._data.keys()) def get_history(self, key: str) -> Optional[dict]: """获取数据的元信息(作者、版本、时间戳)""" entry = self.read(key) if entry: return { "key": key, "version": entry["version"], "author": entry["author"], "timestamp": entry["timestamp"], } return None class AgentCoordinator: """Agent 协调器:基于黑板模式编排多 Agent 协作""" def __init__(self): self.blackboard = Blackboard() self.agents: dict[str, Any] = {} def register_agent(self, name: str, agent: Any) -> None: """注册 Agent""" self.agents[name] = agent def run_pipeline(self, initial_input: dict) -> dict: """ 执行多 Agent 协作流水线 每个阶段:Agent 读取黑板 → 执行 → 写入黑板 """ # 写入初始输入 self.blackboard.write("input", initial_input, "coordinator") # 阶段 1: 需求分析 req_agent = self.agents["requirement"] requirement = req_agent.analyze(initial_input) self.blackboard.write("requirement", requirement, "requirement") # 阶段 2: 代码实现 code_agent = self.agents["coder"] code = code_agent.implement(requirement) self.blackboard.write("code", code, "coder") # 阶段 3: 测试验证 test_agent = self.agents["tester"] test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") # 如果测试不通过,触发修复循环 max_retries = 3 retry = 0 while not test_result.get("passed", False) and retry < max_retries: # 代码 Agent 根据测试失败信息修复代码 code = code_agent.fix(code, test_result) self.blackboard.write("code", code, "coder") # 重新测试 test_result = test_agent.verify(requirement, code) self.blackboard.write("test_result", test_result, "tester") retry += 1 return { "requirement": requirement, "code": code, "test_result": test_result, "retries": retry, }

四、Agent 协作协议的架构权衡

维度中心化协调去中心化协商黑板模式
协调效率高(单点决策)低(多轮协商)中(异步协调)
单点故障有(协调者宕机)无(黑板可持久化)
扩展性受协调者能力限制好(P2P)好(发布-订阅)
一致性保证强(两阶段提交)最终一致最终一致
适用场景流水线型任务开放式讨论知识密集型协作

结构化消息虽然便于程序验证,却限制了 Agent 的表达自由度;自由文本虽灵活,却易引发歧义。因此,核心协议应采用结构化格式,同时允许补充说明使用自由文本。

两阶段确认需要等待所有参与方回复,延迟等于最慢参与方的响应时间。对于实时性要求高的场景,可以设置超时机制——超时未回复视为拒绝。

自动修复循环可能无限进行。建议设置最大重试次数(3–5 次)和收敛条件(连续两次测试结果相同则停止)。

五、总结

设计协议时需注重结构化通信、确认式协作和可回退决策。通过结构化消息减少歧义,两阶段确认确保理解一致,黑板模式解耦 Agent 依赖,从而实现从松散协作到有组织协作的转变。

落地步骤:第一步,定义核心消息的 JSON Schema,确保 Agent 间的通信格式一致;第二步,实现两阶段确认协议,在任务执行前确保所有参与方理解一致;第三步,引入黑板模式作为共享上下文,支持 Agent 间的异步协作和增量更新。好的协作协议不是限制 Agent 的自由,而是让 Agent 的自由产生有价值的结果。


质量评分:

维度评估标准得分
直接性直接陈述事实还是绕圈宣告?9/10
节奏句子长度是否变化?8/10
信任度是否尊重读者智慧?9/10
真实性听起来像真人说话吗?8/10
精炼度还有可删减的内容吗?8/10
总分42/50

改进说明:

  • 删除了"核心承诺"等夸大表述,改为更具体的描述
  • 将"更具体的场景是"改为直接举例,避免公式化结构
  • 调整了三段式列举,改为更自然的叙述方式
  • 优化了代码注释,使其更简洁自然
  • 删除了"关键原则是——"等总结性金句
  • 调整了部分技术术语的表达,使其更贴近实际开发场景
http://www.rkmt.cn/news/1531933.html

相关文章:

  • 2026上海杨浦区黄金回收+铂金回收+白银回收红黑榜!实地探店告诉你哪家不坑 - 沪上贵金属口碑推荐官
  • Java毕设选题推荐:基于SpringBoot 的尿毒症健康随访管理系统设计与实践 慢性病视角下尿毒症健康监护管理系统的搭建与实现【附源码、mysql、文档、调试+代码讲解+全bao等】
  • STM32F4项目实战:LWIP从1.4.1升级到2.1.2,解决TCP发送大数据卡死的坑
  • MPC866 PowerQUICC处理器核心架构与寄存器集深度解析
  • 包钢|磐金|重钢|凤钢|镀锌钢管批发|四川盛世钢联国际贸易有限公司 - 四川盛世钢联营销中心
  • MPC866 UPM RAM字编程详解:时序控制与SDRAM接口实战
  • 【水箱】水箱液位级联控制的动态系统模型Matlab实现
  • 2026年军队文职培训市场深度观察:早起点教育真的靠谱吗? - 优质品牌商家
  • OpenCore Legacy Patcher实战指南:为老Mac注入新生的完整解决方案框架
  • 三步掌握SGP4:C++卫星轨道计算的终极指南
  • Unity 3D基础:NavMesh导航网格的烘焙与使用
  • 计算机毕业设计之jspm学生宿舍管理系统
  • RGThree-Comfy终极指南:5分钟掌握ComfyUI智能工作流革命
  • 2026年广西私立高中择校新观察:多维升学时代下的价值之选 - 品牌鉴赏官2026
  • 2026年东莞专利申请与无效律师推荐:5位深耕智造知产实战大律(东莞制造企业收藏版) - 本地品牌推荐
  • Java小白必看:收藏这份Spring AI指南,轻松玩转大模型开发
  • PCIe控制器错误处理与配置访问机制详解:从原理到实战
  • 开源的AI编程助手记账本,让你清楚知道到底钱花哪了
  • MPC860 TRST信号配置详解:JTAG调试与低功耗模式的设计关键
  • FanControl完整配置指南:Windows风扇智能控制实用教程
  • RRT 创新:随机点(按点位趋向终点+不在障碍物内采)+不向障碍物生长+膨胀地图+跳出局部最优(网格+卡死)+终点迷宫附matlab代码
  • 欧空局网址变更后,SARscape 5.6.2 精密轨道文件(Precise Orbit Files)下载与配置全攻略
  • 2026年中山专利申请与无效律师推荐指南:从灯饰到五金全覆盖(中山企业收藏版) - 本地品牌推荐
  • // SPDX-License-Identifier: GPL-2.0 九章编程矩阵化 bio 子系统 · 物理极限版 (~450 行) 屎山代码老系统,有人用,没人管
  • Windows上安装APK的终极解决方案:告别模拟器,3分钟搞定安卓应用
  • RAG大揭秘:8种架构解锁AI知识库新玩法,轻松提升大模型能力!
  • 【Java基础】堆与优先级的艺术:从急诊分诊到Top-K,手写一个PriorityQueue
  • Anthropic会话抽象层(SAL)静默归零:客户端状态管理新范式
  • 华岐|正大|友发|振鸿|镀锌方管批发|四川盛世钢联国际贸易有限公司 - 四川盛世钢联营销中心
  • 基于ZigBee RF4CE的无线HID设备开发:Freescale ZID应用配置详解