尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

【从0到1构建一个ClaudeAgent】并

【从0到1构建一个ClaudeAgent】并
📅 发布时间:2026/6/24 4:22:27

有些操作很慢,Agent 不能干等着。例如长时间编译/构建:make,mvn compile,gradle build或大数据处理:hadoop,spark-submit等的一些工作

Java实现代码

java

public class BackgroundTasksSystem { // --- 配置 --- private static final Path WORKDIR = Paths.get(System.getProperty("user.dir")); private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); // --- 后台任务管理器 --- static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 锁 private final Object lock = new Object(); static class TaskInfo { String taskId; String status; // running, completed, timeout, error String result; String command; long startTime; Thread thread; // 关联的执行线程 } static class TaskNotification { String taskId; String status; String command; String result; } /** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); task.thread = thread; thread.start(); // 立即返回,不阻塞 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); } /** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } } /** * 检查任务状态 * 如果指定 taskId,检查单个任务;否则列出所有任务 */ public String check(String taskId) { if (taskId != null && !taskId.isEmpty()) { TaskInfo task = tasks.get(taskId); if (task == null) { return "Error: Unknown task " + taskId; } return String.format("[%s] %s\n%s", task.status, task.command.substring(0, Math.min(task.command.length(), 60)), task.result != null ? task.result : "(running)"); } else { StringBuilder sb = new StringBuilder(); for (Map.Entry<String, TaskInfo> entry : tasks.entrySet()) { TaskInfo task = entry.getValue(); sb.append(String.format("%s: [%s] %s\n", task.taskId, task.status, task.command.substring(0, Math.min(task.command.length(), 60)))); } return sb.length() > 0 ? sb.toString().trim() : "No background tasks."; } } /** * 清空通知队列并返回所有待处理的通知 */ public List<TaskNotification> drainNotifications() { synchronized (lock) { List<TaskNotification> notifications = new ArrayList<>(); while (!notificationQueue.isEmpty()) { notifications.add(notificationQueue.poll()); } return notifications; } } /** * 获取所有任务 */ public Map<String, TaskInfo> getAllTasks() { return new HashMap<>(tasks); } } // 初始化后台管理器 private static final BackgroundManager BG_MANAGER = new BackgroundManager(); // --- 工具枚举 --- public enum ToolType { BASH("bash", "Run a shell command (blocking)."), READ_FILE("read_file", "Read file contents."), WRITE_FILE("write_file", "Write content to file."), EDIT_FILE("edit_file", "Replace exact text in file."), BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), // 新增 CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name = name; this.description = description; } } // --- 工具处理器映射 --- private static final Map<String, ToolExecutor> TOOL_HANDLERS = new HashMap<>(); static { // ... 省略基础工具注册 // 后台任务工具 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); }); } // ... 省略相同的工具实现 // --- Agent 主循环(集成后台任务通知)--- public static void agentLoop(List<Map<String, Object>> messages) { while (true) { try { // 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 异步结果注入:将后台任务结果插入到对话中 // 结构化格式:用XML标签包裹,便于LLM解析 } // 显示当前活动任务 Map<String, BackgroundManager.TaskInfo> activeTasks = BG_MANAGER.getAllTasks(); int runningTasks = (int) activeTasks.values().stream() .filter(t -> "running".equals(t.status)) .count(); if (runningTasks > 0) { System.out.printf("[Active background tasks: %d]\n", runningTasks); } // ... 省略相同的 LLM 调用和工具执行逻辑 } catch (Exception e) { System.err.println("Error in agent loop: " + e.getMessage()); e.printStackTrace(); return; } } } }

这段代码引入了后台任务系统,解决了 Agent 在执行长时间任务时的阻塞问题

关键洞察:Agent 可以在命令执行时继续工作,而不是被阻塞。

异步任务处理架构

核心思想:从同步阻塞的任务执行升级为异步非阻塞的并发处理,让Agent能够同时处理多个耗时任务,实现"并行计算"能力,大幅提升效率和响应性。

java

// 后台任务管理器 - 异步执行引擎 static class BackgroundManager { // 任务存储 private final Map<String, TaskInfo> tasks = new ConcurrentHashMap<>(); // 通知队列 private final Queue<TaskNotification> notificationQueue = new ConcurrentLinkedQueue<>(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter = new AtomicInteger(1); // 并发安全:使用线程安全集合 // 异步通信:通过队列传递任务结果 // 唯一标识:自动生成任务ID }
  • 解耦执行:任务提交和执行分离,立即返回控制权
  • 并发管理:多个后台任务可以同时运行
  • 结果异步收集:通过队列机制收集完成的任务结果
  • 线程安全:使用并发集合确保多线程安全

任务信息结构设计

java

// 任务信息实体 static class TaskInfo { String taskId; // 唯一标识 String status; // 状态:running, completed, timeout, error String result; // 执行结果 String command; // 执行的命令 long startTime; // 开始时间 Thread thread; // 关联的执行线程 // 完整状态跟踪:从启动到完成的全生命周期 // 线程关联:可以控制或监控执行线程 // 时间戳:支持超时和性能分析 } // 任务通知实体 static class TaskNotification { String taskId; String status; String command; String result; // 轻量传输:只包含必要信息 // 结构化:易于解析和处理 // 结果截断:避免过大的通知消息 }
  • 状态驱动:明确的任务状态生命周期
  • 结果持久:任务结果可以多次查询
  • 线程管理:可以跟踪和控制执行线程
  • 事件驱动:通过通知机制传递完成事件

异步任务启动机制

java

/** * 启动后台任务 * 立即返回任务 ID,不等待命令完成 */ public String run(String command) { String taskId = "task_" + taskIdCounter.getAndIncrement(); TaskInfo task = new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread = new Thread(() -> executeTask(task), "BackgroundTask-" + taskId); thread.setDaemon(true); // 守护线程,不会阻止JVM退出 task.thread = thread; thread.start(); // 立即返回,不阻塞调用者 return String.format("Background task %s started: %s", taskId, command.substring(0, Math.min(command.length(), 80))); // 异步启动:立即返回任务ID,不等待命令完成 // 守护线程:不会阻止程序正常退出 // 线程命名:便于调试和监控 }
  • 立即返回:不阻塞主线程,立即返回控制权
  • 守护线程:后台任务不会阻止JVM退出
  • 资源管理:线程自动清理,避免内存泄漏
  • 友好反馈:返回任务ID和简化的命令描述

任务执行与结果收集

java

/** * 线程目标:执行子进程,捕获输出,推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process = pb.start(); boolean finished = process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output = "Error: Timeout (300s)"; status = "timeout"; } else { output = new String(process.getInputStream().readAllBytes()).trim(); status = "completed"; } } catch (Exception e) { output = "Error: " + e.getMessage(); status = "error"; } // 更新任务状态 task.status = status; task.result = output.isEmpty() ? "(no output)" : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } }
  • 超时保护:防止长时间运行的任务阻塞
  • 异常安全:全面捕获执行异常
  • 内存管理:截断大结果,避免内存溢出
  • 事件驱动:完成后立即通知主线程

智能通知注入机制

java

// 在 LLM 调用前检查后台通知 List<BackgroundManager.TaskNotification> notifications = BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() && !messages.isEmpty()) { StringBuilder notifText = new StringBuilder(); notifText.append("<background-results>\n"); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format("[bg:%s] %s: %s\n", notif.taskId, notif.status, notif.result)); } notifText.append("</background-results>"); messages.add(Map.of( "role", "user", "content", notifText.toString() )); messages.add(Map.of( "role", "assistant", "content", "Noted background results." )); // 自动注入:自动将后台结果插入到对话中 // 结构化格式:XML标签明确标识内容类型 // 对话完整:添加assistant确认,保持对话结构 // 时机智能:在LLM调用前插入,确保LLM能看到最新结果 }
  • 自动同步:后台结果自动同步到主对话
  • 结构化格式:便于LLM识别和解析
  • 对话集成:无缝集成到现有对话流
  • 时机优化:在决策前注入,确保信息及时性

工具集成架构

java

// 后台任务工具集 public enum ToolType { BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."), CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all."); // 异步执行:立即返回,不阻塞 // 状态查询:支持单个和批量查询 // 语义清晰:工具名明确表示异步特性 } // 工具处理器映射 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -> { String command = (String) args.get("command"); return BG_MANAGER.run(command); // 委托执行:将命令转交给后台管理器 // 立即返回:不等待任务完成 }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -> { String taskId = (String) args.get("task_id"); return BG_MANAGER.check(taskId); // 灵活查询:支持单任务详查和列表概览 });
  • 接口统一:与同步工具相同的调用方式
  • 异步语义:工具名明确区分同步/异步
  • 灵活查询:支持多种查询方式
  • 无缝集成:与现有工具系统完全兼容

架构演进与价值

从 ContextCompactSystem 到 BackgroundTasksSystem 的升级:

维度ContextCompactSystemBackgroundTasksSystem
执行模式同步串行异步并行
吞吐量一次一个任务并发多个任务
响应性阻塞等待立即响应
资源利用单线程多线程并发
任务类型短任务为主长短任务混合

相关新闻

  • UVa 571 Jugs
  • SMUDebugTool终极指南:免费开源AMD Ryzen处理器调试工具完全教程
  • AI写专著必备攻略:4款工具助力,快速生成20万字专业专著!

最新新闻

  • 发现 VS Code 的隐藏宝藏:7 个你或许不知道却能让效率翻倍的功能
  • 【工具优化】Windows工具MobaXterm_Personal_20.3解除最多保存14个Session的限制_20260505
  • 如何让喜欢的角色住进桌面?5分钟快速上手DyberPet桌宠系统
  • 项目实训博客(四)从Vulkan到D3D12:注入与拦截架构演变
  • Lexical富文本编辑器图片处理全攻略:从拖拽上传到智能裁剪的完整方案
  • 实时消息传递_azure-messaging-webpubsubservice-py

日新闻

  • 终极指南:如何用shadPS4在电脑上免费畅玩PS4游戏
  • 打造个性化Instagram Clone:主题定制与用户体验优化技巧
  • 未来展望:RoseTTAFold-All-Atom的发展路线图与社区支持资源汇总

周新闻

  • Visual C++运行库修复终极指南:5分钟快速解决Windows软件启动错误
  • 手把手教你构建统计局地区经济数据爬虫:从环境搭建到数据持久化全指南
  • 2026多Agent深度解析:用AI团队替代单一模型,四种架构实战落地

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号