OpenHarness源码研究-4-AgentLoop对话引擎与工具系统
从debug说起
第2篇我们写了oh -p "你是谁"的 debug 配置。断点打在run_print_mode()里,跟着调用栈一路往下走,能看到这样一条链路:
run_print_mode() ui/app.py └─ build_runtime(...) ui/runtime.py └─ _resolve_api_client_from_settings() └─ QueryEngine(...) └─ start_runtime(bundle) └─ handle_line(bundle, "你是谁", ...) ui/runtime.py └─ engine.submit_message("你是谁") engine/query_engine.py └─ run_query(context, messages) engine/query.py第3篇分析了 API Client 层(_resolve_api_client_from_settings那段),这篇讲剩下的:消息模型、Agent Loop、工具系统、Runtime 组装、System Prompt。
消息模型
不同 LLM 的 API 格式不一样,但引擎内部需要一套统一的消息表示。这就是engine/messages.py的职责。
# engine/messages.py 第14-61行 class TextBlock(BaseModel): type: Literal["text"] = "text" text: str class ImageBlock(BaseModel): type: Literal["image"] = "image" media_type: str data: str # base64编码 class ToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str # 工具调用的唯一ID name: str # 工具名 input: dict[str, Any] # 参数 class ToolResultBlock(BaseModel): type: Literal["tool_result"] = "tool_result" tool_use_id: str # 关联到哪个tool_use content: str # 执行结果文本 is_error: bool = False # 是否执行出错 ContentBlock = TextBlock | ImageBlock | ToolUseBlock | ToolResultBlock class ConversationMessage(BaseModel): role: Literal["user", "assistant"] content: list[ContentBlock]四个 ContentBlock 覆盖了对话中的所有信息类型。role 只有 user 和 assistant 两种——system prompt 不在这里,它作为独立参数传给 API。
这套模型是 Anthropic Messages API 的原生格式。OpenAICompatibleClient 把它翻译成 OpenAI 格式(第3篇分析过),AnthropicApiClient 直接序列化:
# engine/messages.py 第92-97行 def to_api_param(self) -> dict[str, Any]: return { "role": self.role, "content": [serialize_content_block(block) for block in self.content], }消息模型决定了整个引擎的设计。如果把 system prompt 也当作一条消息、把 tool 也当作一种 role,那代码复杂度会成倍增加——OpenAI 就是这么做的,而 Anthropic 的设计更简洁。OpenHarness 选择了 Anthropic 格式作为内部规范,翻译工作全部丢给 OpenAICompatibleClient。
AgentLoop:while循环+tool_use检测
engine/query.py的run_query()是整个项目的核心。它做的事说起来简单:
用户发一句话 → 模型回复(可能带 tool_use) → 有 tool_use?执行工具,把结果喂回去 → 模型再回复 → 还有 tool_use?继续循环 → 没有了,结束用代码表达:
# engine/query.py 第88-143行(简化) turn_count = 0 while context.max_turns is None or turn_count < context.max_turns: turn_count += 1 # 上下文压缩检查 messages, was_compacted = await auto_compact_if_needed(...) # 调模型 async for event in context.api_client.stream_message(request): if isinstance(event, ApiTextDeltaEvent): yield AssistantTextDelta(text=event.text) # 逐字流式输出 elif isinstance(event, ApiMessageCompleteEvent): final_message = event.message # 模型说完了 messages.append(final_message) # 没有工具调用 → 结束 if not final_message.tool_uses: return # 有工具调用 → 执行 if len(tool_calls) == 1: result = await _execute_tool_call(...) # 单工具:顺序执行 tool_results = [result] else: results = await asyncio.gather(*[_run(tc) for tc in tool_calls]) # 多工具:并发 tool_results = list(results) # 把工具结果作为新的user消息追加 messages.append(ConversationMessage(role="user", content=tool_results)) # → 继续循环几个关键设计决策:
1. 单工具顺序 vs 多工具并发的选择
如果模型一次返回了多个 tool_use(比如同时读3个文件),用asyncio.gather并发执行。只返回一个时走顺序路径。这个分支不是无谓的优化——并发时需要等所有工具都完成才能通知 UI 更新,顺序时则不需要等。两种场景的 UI 事件发送时序不同。
2. max_turns 的硬截断
# engine/query.py 第42-45行 class MaxTurnsExceeded(RuntimeError): def __init__(self, max_turns: int) -> None: super().__init__(f"Exceeded maximum turn limit ({max_turns})")默认 200 轮。一次"turn" = 模型回复 + 可能的一次工具调用。200 轮对于大多数任务绰绰有余,但如果不设限,一个死循环的工具调用就能烧掉大量 token。这个截断是防御性的。
3. auto-compact:上下文太长怎么办
在每轮循环开始时检查:
# engine/query.py 第91-97行 messages, was_compacted = await auto_compact_if_needed( messages, api_client=context.api_client, model=context.model, system_prompt=context.system_prompt, state=compact_state, )上下文压缩分两步:先尝试 microcompact(把旧工具结果的 content 清掉换成占位文本,成本极低),如果还不够,再做 full compact(调 LLM 对旧消息做摘要,成本较高但有意义)。阈值是AUTOCOMPACT_BUFFER_TOKENS = 13000,给模型预留的输出空间。
流式事件-引擎怎么通知UI
引擎在执行过程中产生 6 种事件:
# engine/stream_events.py AssistantTextDelta → 模型正在逐字输出,这是下一个字 AssistantTurnComplete → 模型说完了,附完整消息+token用量 ToolExecutionStarted → 开始执行工具,告诉UI显示了 ToolExecutionCompleted → 工具执行完毕,附输出 ErrorEvent → 出错了 StatusEvent → 临时状态通知(重试中之类)UI 层只消费这些事件,不关心事件是怎么产生的。run_print_mode()和run_repl()处理同一套事件,只是渲染方式不同。这个设计和前端框架里的"状态管理"是一个思路——引擎是 store,事件是 action,UI 是 view。
工具系统-AI操控电脑的接口
tools/base.py定义了工具的契约:
class BaseTool(ABC): name: str description: str input_model: type[BaseModel] # Pydantic模型,自动生成JSON Schema async def execute(self, arguments, context) -> ToolResult: """执行工具""" def is_read_only(self, arguments) -> bool: """是否只读。权限检查用""" def to_api_schema(self) -> dict: """转为API要求的JSON Schema格式"""每个工具就是实现这 5 个东西。以文件读取为例:
# tools/file_read_tool.py class FileReadTool(BaseTool): name = "read_file" description = "Read a text file from the local repository." input_model = FileReadToolInput # path + offset + limit def is_read_only(self, arguments): return True # 读文件是只读操作 async def execute(self, arguments, context): # 读文件、编号行号、返回is_read_only是关键——它直接关联权限系统。读操作自动放行,写操作触发权限检查。
Bash 工具更复杂一些。它的is_read_only默认返回 False(没法静态判断一个 shell 命令是不是只读),所以总是需要权限确认,除非用户在 full_auto 模式下:
# tools/bash_tool.py 第30-79行 async def execute(self, arguments, context): process = await create_shell_subprocess(arguments.command, cwd=cwd, ...) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=...) # 超时则 kill 进程 # 输出截断到 12000 字符 return ToolResult(output=text, is_error=process.returncode != 0, ...)AgentTool 更有意思——工具里调子 Agent:
# tools/agent_tool.py 第43-94行 async def execute(self, arguments, context): executor = registry.get_executor("subprocess") config = TeammateSpawnConfig( name=arguments.subagent_type, prompt=arguments.prompt, cwd=str(context.cwd), ... ) result = await executor.spawn(config) return ToolResult(output=f"Spawned agent {result.agent_id} ...")工具不再是简单的文件操作,而是可以递归地启动新的 Agent 进程。这就从"工具调用"升级到了"多 Agent 协作"。
Runtime组装-build_runtime的12步
ui/runtime.py的build_runtime()把所有零件装到一起:
# ui/runtime.py 第163-301行 async def build_runtime(cwd, model, ...): # 1. 合并 CLI 覆盖到 settings settings = load_settings().merge_cli_overrides(...) # 2. 加载 plugins plugins = load_plugins(settings, cwd, ...) # 3. 创建 API Client resolved_api_client = _resolve_api_client_from_settings(settings) # 4. 连接 MCP servers mcp_manager = McpClientManager(load_mcp_server_configs(...)) await mcp_manager.connect_all() # 5. 创建 ToolRegistry(内置工具 + MCP 工具) tool_registry = create_default_tool_registry(mcp_manager) # 6. 初始化 AppState(UI 状态) app_state = AppStateStore(AppState(...)) # 7. 加载 Hooks hook_executor = HookExecutor(hook_reloader.current_registry(), ...) # 8. 构建 System Prompt system_prompt_text = build_runtime_system_prompt(settings, ...) # 9. 创建 QueryEngine engine = QueryEngine(api_client=..., tool_registry=..., ...) # 10. 恢复历史消息(如果有) if restore_messages: engine.load_messages(restored) # 11. 注册 slash 命令 commands = create_default_command_registry(...) # 12. 打包成 RuntimeBundle 返回 return RuntimeBundle(api_client=..., engine=..., ...)这 12 步的依赖关系是单向的:settings 在最前面,engine 在最后面。RuntimeBundle只是一个 dataclass,把所有东西打成一个包。后续handle_line()从这个包里取东西用。
handle_line()是交互的核心——它判断用户输入是 slash 命令还是普通对话:
# ui/runtime.py 第428-567行 async def handle_line(bundle, line, ...): parsed = bundle.commands.lookup(line) if parsed is not None: # slash命令 → 走 CommandHandler result = await command.handler(args, context) # 可能触发 prompt 提交或 pending continuation else: # 普通对话 → 直接送 engine async for event in bundle.engine.submit_message(line): await render_event(event) # UI层渲染每个事件 bundle.session_backend.save_snapshot(...) # 自动存档SystemPrompt-AI看到的第一段话
System Prompt 不是一段写死的文本,而是在build_runtime_system_prompt()里动态拼装的:
# prompts/context.py 第46-120行 def build_runtime_system_prompt(settings, cwd, ...): sections = [ build_system_prompt(), # 基础prompt(BASE_SYSTEM_PROMPT + 环境信息) f"Effort: {settings.effort}", # 推理设置 ] # Skills列表 skills = load_skill_registry(cwd, ...) if skills: sections.append(skills_section) # CLAUDE.md项目指令 claude_md = load_claude_md_prompt(cwd) if claude_md: sections.append(claude_md) # Issue/PR上下文(如果有) if issue_file.exists(): sections.append(issue_content) # 记忆系统 if settings.memory.enabled: memory_section = load_memory_prompt(cwd) sections.append(memory_section) relevant = find_relevant_memories(query, cwd) sections.append(relevant_section) return "\n\n".join(sections)基础 prompt 本身就包含了环境信息——OS、Shell、日期、Git 分支等,由get_environment_info()自动探测。这样 AI 不用问"你是什么系统"就能直接给出正确的命令。
CLAUDE.md是放在项目根目录的一个文件,用户在里面写项目约定和偏好。它会被原样注入 System Prompt。这和 GitHub Copilot 的.github/copilot-instructions.md是一个思路。
总结
- Agent Loop 的核心就是一个 while 循环:模型说一句 → 有 tool_use 就执行 → 结果喂回去 → 继续
- 单工具顺序执行,多工具
asyncio.gather并发。分支的原因不是性能,是 UI 事件时序不同 - max_turns=200 是防御性的硬截断,auto-compact 在每轮循环前检查是否需要压缩上下文
- BaseTool 的 5 个契约方法构成了工具系统的基础,
is_read_only直接关联权限 - RuntimeBundle 是会话的"零件包",12 步装配顺序是单向依赖
- System Prompt 是动态拼装的:base + 环境 + skills + CLAUDE.md + 记忆