尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

OpenHarness源码研究-4-AgentLoop对话引擎与工具系统

OpenHarness源码研究-4-AgentLoop对话引擎与工具系统
📅 发布时间:2026/7/1 3:06:58

OpenHarness源码研究-4-AgentLoop对话引擎与工具系统

从debug说起

第2篇我们写了oh -p "你是谁"的 debug 配置。断点打在run_print_mode()里,跟着调用栈一路往下走,能看到这样一条链路:

run_print_mode() ui/app.py └─ build_runtime(...) ui/runtime.py └─ _resolve_api_client_from_settings() └─ QueryEngine(...) └─ start_runtime(bundle) └─ handle_line(bundle, "你是谁", ...) ui/runtime.py └─ engine.submit_message("你是谁") engine/query_engine.py └─ run_query(context, messages) engine/query.py

第3篇分析了 API Client 层(_resolve_api_client_from_settings那段),这篇讲剩下的:消息模型、Agent Loop、工具系统、Runtime 组装、System Prompt。

消息模型

不同 LLM 的 API 格式不一样,但引擎内部需要一套统一的消息表示。这就是engine/messages.py的职责。

# engine/messages.py 第14-61行 class TextBlock(BaseModel): type: Literal["text"] = "text" text: str class ImageBlock(BaseModel): type: Literal["image"] = "image" media_type: str data: str # base64编码 class ToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str # 工具调用的唯一ID name: str # 工具名 input: dict[str, Any] # 参数 class ToolResultBlock(BaseModel): type: Literal["tool_result"] = "tool_result" tool_use_id: str # 关联到哪个tool_use content: str # 执行结果文本 is_error: bool = False # 是否执行出错 ContentBlock = TextBlock | ImageBlock | ToolUseBlock | ToolResultBlock class ConversationMessage(BaseModel): role: Literal["user", "assistant"] content: list[ContentBlock]

四个 ContentBlock 覆盖了对话中的所有信息类型。role 只有 user 和 assistant 两种——system prompt 不在这里,它作为独立参数传给 API。

这套模型是 Anthropic Messages API 的原生格式。OpenAICompatibleClient 把它翻译成 OpenAI 格式(第3篇分析过),AnthropicApiClient 直接序列化:

# engine/messages.py 第92-97行 def to_api_param(self) -> dict[str, Any]: return { "role": self.role, "content": [serialize_content_block(block) for block in self.content], }

消息模型决定了整个引擎的设计。如果把 system prompt 也当作一条消息、把 tool 也当作一种 role,那代码复杂度会成倍增加——OpenAI 就是这么做的,而 Anthropic 的设计更简洁。OpenHarness 选择了 Anthropic 格式作为内部规范,翻译工作全部丢给 OpenAICompatibleClient。

AgentLoop:while循环+tool_use检测

engine/query.py的run_query()是整个项目的核心。它做的事说起来简单:

用户发一句话 → 模型回复(可能带 tool_use) → 有 tool_use?执行工具,把结果喂回去 → 模型再回复 → 还有 tool_use?继续循环 → 没有了,结束

用代码表达:

# engine/query.py 第88-143行(简化) turn_count = 0 while context.max_turns is None or turn_count < context.max_turns: turn_count += 1 # 上下文压缩检查 messages, was_compacted = await auto_compact_if_needed(...) # 调模型 async for event in context.api_client.stream_message(request): if isinstance(event, ApiTextDeltaEvent): yield AssistantTextDelta(text=event.text) # 逐字流式输出 elif isinstance(event, ApiMessageCompleteEvent): final_message = event.message # 模型说完了 messages.append(final_message) # 没有工具调用 → 结束 if not final_message.tool_uses: return # 有工具调用 → 执行 if len(tool_calls) == 1: result = await _execute_tool_call(...) # 单工具:顺序执行 tool_results = [result] else: results = await asyncio.gather(*[_run(tc) for tc in tool_calls]) # 多工具:并发 tool_results = list(results) # 把工具结果作为新的user消息追加 messages.append(ConversationMessage(role="user", content=tool_results)) # → 继续循环

几个关键设计决策:

1. 单工具顺序 vs 多工具并发的选择

如果模型一次返回了多个 tool_use(比如同时读3个文件),用asyncio.gather并发执行。只返回一个时走顺序路径。这个分支不是无谓的优化——并发时需要等所有工具都完成才能通知 UI 更新,顺序时则不需要等。两种场景的 UI 事件发送时序不同。

2. max_turns 的硬截断

# engine/query.py 第42-45行 class MaxTurnsExceeded(RuntimeError): def __init__(self, max_turns: int) -> None: super().__init__(f"Exceeded maximum turn limit ({max_turns})")

默认 200 轮。一次"turn" = 模型回复 + 可能的一次工具调用。200 轮对于大多数任务绰绰有余,但如果不设限,一个死循环的工具调用就能烧掉大量 token。这个截断是防御性的。

3. auto-compact:上下文太长怎么办

在每轮循环开始时检查:

# engine/query.py 第91-97行 messages, was_compacted = await auto_compact_if_needed( messages, api_client=context.api_client, model=context.model, system_prompt=context.system_prompt, state=compact_state, )

上下文压缩分两步:先尝试 microcompact(把旧工具结果的 content 清掉换成占位文本,成本极低),如果还不够,再做 full compact(调 LLM 对旧消息做摘要,成本较高但有意义)。阈值是AUTOCOMPACT_BUFFER_TOKENS = 13000,给模型预留的输出空间。

流式事件-引擎怎么通知UI

引擎在执行过程中产生 6 种事件:

# engine/stream_events.py AssistantTextDelta → 模型正在逐字输出,这是下一个字 AssistantTurnComplete → 模型说完了,附完整消息+token用量 ToolExecutionStarted → 开始执行工具,告诉UI显示了 ToolExecutionCompleted → 工具执行完毕,附输出 ErrorEvent → 出错了 StatusEvent → 临时状态通知(重试中之类)

UI 层只消费这些事件,不关心事件是怎么产生的。run_print_mode()和run_repl()处理同一套事件,只是渲染方式不同。这个设计和前端框架里的"状态管理"是一个思路——引擎是 store,事件是 action,UI 是 view。

工具系统-AI操控电脑的接口

tools/base.py定义了工具的契约:

class BaseTool(ABC): name: str description: str input_model: type[BaseModel] # Pydantic模型,自动生成JSON Schema async def execute(self, arguments, context) -> ToolResult: """执行工具""" def is_read_only(self, arguments) -> bool: """是否只读。权限检查用""" def to_api_schema(self) -> dict: """转为API要求的JSON Schema格式"""

每个工具就是实现这 5 个东西。以文件读取为例:

# tools/file_read_tool.py class FileReadTool(BaseTool): name = "read_file" description = "Read a text file from the local repository." input_model = FileReadToolInput # path + offset + limit def is_read_only(self, arguments): return True # 读文件是只读操作 async def execute(self, arguments, context): # 读文件、编号行号、返回

is_read_only是关键——它直接关联权限系统。读操作自动放行,写操作触发权限检查。

Bash 工具更复杂一些。它的is_read_only默认返回 False(没法静态判断一个 shell 命令是不是只读),所以总是需要权限确认,除非用户在 full_auto 模式下:

# tools/bash_tool.py 第30-79行 async def execute(self, arguments, context): process = await create_shell_subprocess(arguments.command, cwd=cwd, ...) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=...) # 超时则 kill 进程 # 输出截断到 12000 字符 return ToolResult(output=text, is_error=process.returncode != 0, ...)

AgentTool 更有意思——工具里调子 Agent:

# tools/agent_tool.py 第43-94行 async def execute(self, arguments, context): executor = registry.get_executor("subprocess") config = TeammateSpawnConfig( name=arguments.subagent_type, prompt=arguments.prompt, cwd=str(context.cwd), ... ) result = await executor.spawn(config) return ToolResult(output=f"Spawned agent {result.agent_id} ...")

工具不再是简单的文件操作,而是可以递归地启动新的 Agent 进程。这就从"工具调用"升级到了"多 Agent 协作"。

Runtime组装-build_runtime的12步

ui/runtime.py的build_runtime()把所有零件装到一起:

# ui/runtime.py 第163-301行 async def build_runtime(cwd, model, ...): # 1. 合并 CLI 覆盖到 settings settings = load_settings().merge_cli_overrides(...) # 2. 加载 plugins plugins = load_plugins(settings, cwd, ...) # 3. 创建 API Client resolved_api_client = _resolve_api_client_from_settings(settings) # 4. 连接 MCP servers mcp_manager = McpClientManager(load_mcp_server_configs(...)) await mcp_manager.connect_all() # 5. 创建 ToolRegistry(内置工具 + MCP 工具) tool_registry = create_default_tool_registry(mcp_manager) # 6. 初始化 AppState(UI 状态) app_state = AppStateStore(AppState(...)) # 7. 加载 Hooks hook_executor = HookExecutor(hook_reloader.current_registry(), ...) # 8. 构建 System Prompt system_prompt_text = build_runtime_system_prompt(settings, ...) # 9. 创建 QueryEngine engine = QueryEngine(api_client=..., tool_registry=..., ...) # 10. 恢复历史消息(如果有) if restore_messages: engine.load_messages(restored) # 11. 注册 slash 命令 commands = create_default_command_registry(...) # 12. 打包成 RuntimeBundle 返回 return RuntimeBundle(api_client=..., engine=..., ...)

这 12 步的依赖关系是单向的:settings 在最前面,engine 在最后面。RuntimeBundle只是一个 dataclass,把所有东西打成一个包。后续handle_line()从这个包里取东西用。

handle_line()是交互的核心——它判断用户输入是 slash 命令还是普通对话:

# ui/runtime.py 第428-567行 async def handle_line(bundle, line, ...): parsed = bundle.commands.lookup(line) if parsed is not None: # slash命令 → 走 CommandHandler result = await command.handler(args, context) # 可能触发 prompt 提交或 pending continuation else: # 普通对话 → 直接送 engine async for event in bundle.engine.submit_message(line): await render_event(event) # UI层渲染每个事件 bundle.session_backend.save_snapshot(...) # 自动存档

SystemPrompt-AI看到的第一段话

System Prompt 不是一段写死的文本,而是在build_runtime_system_prompt()里动态拼装的:

# prompts/context.py 第46-120行 def build_runtime_system_prompt(settings, cwd, ...): sections = [ build_system_prompt(), # 基础prompt(BASE_SYSTEM_PROMPT + 环境信息) f"Effort: {settings.effort}", # 推理设置 ] # Skills列表 skills = load_skill_registry(cwd, ...) if skills: sections.append(skills_section) # CLAUDE.md项目指令 claude_md = load_claude_md_prompt(cwd) if claude_md: sections.append(claude_md) # Issue/PR上下文(如果有) if issue_file.exists(): sections.append(issue_content) # 记忆系统 if settings.memory.enabled: memory_section = load_memory_prompt(cwd) sections.append(memory_section) relevant = find_relevant_memories(query, cwd) sections.append(relevant_section) return "\n\n".join(sections)

基础 prompt 本身就包含了环境信息——OS、Shell、日期、Git 分支等,由get_environment_info()自动探测。这样 AI 不用问"你是什么系统"就能直接给出正确的命令。

CLAUDE.md是放在项目根目录的一个文件,用户在里面写项目约定和偏好。它会被原样注入 System Prompt。这和 GitHub Copilot 的.github/copilot-instructions.md是一个思路。

总结

  • Agent Loop 的核心就是一个 while 循环:模型说一句 → 有 tool_use 就执行 → 结果喂回去 → 继续
  • 单工具顺序执行,多工具asyncio.gather并发。分支的原因不是性能,是 UI 事件时序不同
  • max_turns=200 是防御性的硬截断,auto-compact 在每轮循环前检查是否需要压缩上下文
  • BaseTool 的 5 个契约方法构成了工具系统的基础,is_read_only直接关联权限
  • RuntimeBundle 是会话的"零件包",12 步装配顺序是单向依赖
  • System Prompt 是动态拼装的:base + 环境 + skills + CLAUDE.md + 记忆

写到最后

相关新闻

  • 如何深度掌控AMD Ryzen处理器:专业硬件调试工具完全指南
  • 机器人-混合关节架构
  • How To Secure A Linux Server:一份持续更新的服务器安全加固手册

最新新闻

  • AI 编程这事,已经开始变味了
  • 低场MRI仿真系统设计与磁场不均匀性校正技术
  • 工业蒸汽流量计首选品牌:高精度与高稳定性双保障
  • Visual C++运行库终极修复指南:3分钟解决所有软件启动错误
  • ClickHouse 分布式表:从分片路由到副本同步,列式存储的分布式查询引擎
  • 工业级Modbus协议栈架构深度解析:FreeModbus V1.6主机模式技术实现全解

日新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

周新闻

  • Windows字体自定义终极方案:No!! MeiryoUI完全指南
  • Deepin Boot Maker:告别命令行,3分钟制作Linux启动盘的智能解决方案
  • Plain Craft Launcher 2:重新定义你的Minecraft游戏体验

月新闻

  • 2026年6月公司网站搭建最新热门渠道测评:四大低成本/零代码平台对比+避坑
  • 【Linux】Linux arm 编译QT程序,出现expected “}“报错
  • 【MATLAB例程】四基站二维AOA定位与距离辅助增强对比仿真。基于角度观测和测距修正的固定目标平面定位精度分析

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号