尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

Spring Boot 4.1:用 MongoDB JobRepository 把 Agent 长任务做成可恢复批处理

Spring Boot 4.1:用 MongoDB JobRepository 把 Agent 长任务做成可恢复批处理
📅 发布时间:2026/6/24 10:47:40

同步 Agent 适合“回答问题”,Batch Agent 适合“完成一批可恢复任务”。当 Agent 开始处理成百上千个 item,Spring Batch 的 JobRepository、Step、Retry、Skip、Restart 比手写循环更接近生产系统需要的运行时。

分享日期:2026-06-23
主题:Java / Spring Boot 4.1.0 / Spring Batch 6.0.4 / MongoDB JobRepository / Spring AI 2.0.0 / Agent Job / Spring Cloud 2025.1.2
版本背景:截至 2026-06-23,Spring 官方项目页显示 Spring Boot4.1.0、Spring AI2.0.0、Spring Cloud2025.1.2为当前稳定入口;Spring Boot 4.1 文档已经把 Spring Batch 的 JobRepository 存储列为 In-memory、JDBC、MongoDB 三类。

1. 为什么今天值得关注

前几天我们连续看了 Spring AI 2.0 的 MCP、Agent 治理、可观测性、评估、递归 Advisors、Spring Modulith 事件一致性和 JSpecify 空安全。今天适合补一块更贴近生产运行的问题:Agent 的长耗时任务,应该怎么可靠地跑完、重试、恢复和审计。

很多 Agent Demo 都是一次 HTTP 调用:

String answer = chatClient.prompt() .user(userInput) .tools(agentTools) .call() .content();

这适合问答、摘要、代码建议、轻量工具调用。一旦进入企业场景,任务经常变成长流程:

  • 扫描一批需求文档,逐个生成测试用例。
  • 给几万条工单做分类、补全、去重和风险标注。
  • 对知识库做增量切片、Embedding、索引和质量评估。
  • 让 Agent 调用多个内部系统,生成迁移计划或巡检报告。
  • 对失败样本进行二次评估、人工复核和补偿执行。

这类任务有几个共同特征:耗时长、分片多、失败不可避免、需要断点续跑、必须能追溯。它们不应该只靠一个同步 Controller、一个@Async方法或一段手写 while 循环撑住。

Spring Boot 4.1 对 Spring Batch + MongoDB 的自动配置支持,给 Java 团队提供了一个新选择:如果团队本来已经把 Agent 运行态数据、文档片段、工具调用记录、向量元数据或审计事件放在 MongoDB 里,现在可以把 Batch 元数据也放进同一类基础设施,而不是为了 JobRepository 额外引入一套关系型数据库。

一句话:Agent 的“思考”和“工具调用”可以由 Spring AI 管,Agent 的“长任务生命周期”应该交给 Spring Batch 这类可恢复运行时来管。Boot 4.1 的 MongoDB JobRepository 让这件事更容易落到现有 NoSQL 技术栈里。

2. 版本坐标与事实边界

今天这篇分享基于下面几个官方事实:

  • Spring Boot 4.1.0:当前 Spring Boot 项目页展示的稳定版本。Boot 4.1 Release Notes 明确新增 Spring Batch 使用 MongoDB 的自动配置,并提供新的 Batch MongoDB starter。
  • Spring Batch 6.0.4:当前 Spring Batch 参考文档展示的稳定文档版本。Batch 文档说明JobRepository用来持久化JobExecution、StepExecution等批处理元数据,并提供 JDBC 与 MongoDB 两类数据库实现。
  • Spring AI 2.0.0:当前 Spring AI 项目页展示的稳定版本。Spring AI 工具调用文档说明,模型只会请求工具调用,真正执行工具的是客户端应用;ChatClient中的工具调用生命周期由ToolCallingAdvisor和ToolCallingManager管理。
  • Spring Cloud 2025.1.2:当前 Spring Cloud 项目页展示的稳定版本,对应2025.1.xOakwood release train。Spring Cloud 项目页也提醒,Cloud 版本要按 Boot 版本兼容矩阵选择。

这里要分清两件事:

第一,MongoDB JobRepository 解决的是 Batch 元数据持久化,不是替代业务库、向量库或 Agent Memory。它记录的是作业实例、作业执行、步骤执行、执行上下文、状态、失败和重启信息。

第二,Spring Batch 不是让 Agent 变“智能”的框架。它解决的是长任务的工程问题:切分、事务、重试、跳过、恢复、并行、监控和审计。模型推理、工具定义、结构化输出、MCP、RAG 仍然属于 Spring AI 和业务层的职责。

3. MongoDB JobRepository 到底改变了什么

过去很多 Spring Batch 项目默认使用 JDBC JobRepository。这个选择成熟、稳定、易审计,也适合多数交易型系统。但 Agent 应用的运行态数据经常天然偏文档模型:

  • 文档切片、解析结果、Embedding 元数据。
  • 工具调用输入输出摘要。
  • 模型响应、评估分数、引用来源。
  • Agent 计划、步骤状态、人工复核备注。
  • 多租户、多模型、多版本 Prompt 的执行上下文。

这些对象不一定适合拆成很多关系表。团队如果已经用 MongoDB 保存 Agent 运行态数据,再为了 Batch 元数据单独维护一套关系数据库,架构会更重。

Boot 4.1 的意义是把这个选择变成一等自动配置入口:当应用使用 Spring Batch 并引入对应 MongoDB 支持后,Boot 可以帮你自动配置 MongoDB-backed JobRepository,并提供 schema 初始化配置。

概念上可以这样理解:

flowchart LR Api[Agent API] --> Launcher[JobLauncher] Launcher --> Job[Spring Batch Job] Job --> Step1[Read Work Items] Step1 --> Step2[Call Spring AI Agent] Step2 --> Step3[Persist Result] Job --> Repo[(MongoDB JobRepository)] Step2 --> Audit[(Agent Audit Store)] Step3 --> Domain[(Business Data / Vector Store)]

JobRepository不保存所有业务结果,它保存“任务跑到哪里了”。业务结果、审计摘要、向量数据、工具调用详情可以放在独立集合或独立系统里。这样才能避免 Batch 元数据集合被业务大对象撑爆。

4. 适合用 Batch 承载的 Agent 场景

不是所有 Agent 请求都要变成 Batch Job。下面这些场景更适合:

场景为什么适合 BatchAgent 负责什么
知识库增量入库文件多、步骤固定、失败可重试摘要、标签、切片质量判断
工单批量分类记录多、可分片、需要跳过坏数据意图识别、优先级判断、原因解释
代码仓库巡检耗时长、需要报告和断点续跑规则解释、修复建议、风险分级
数据迁移评估多源读取、结果可汇总字段映射建议、异常解释
Agent 评估回归样本集固定、指标可聚合生成回答、LLM-as-a-judge 评分

不太适合的场景:

  • 用户等在页面上的低延迟聊天。
  • 一次性的小工具调用。
  • 状态必须由外部分布式工作流系统统一编排的跨团队流程。
  • 强事务、强一致、必须同步返回的业务动作。

一个实用判断标准:如果你已经在讨论“失败后从第几条继续”“跑一半怎么停”“重试几次算失败”“怎么查看每个分片的状态”,那就不要只靠普通异步任务了,应该考虑 Batch、Workflow 或队列式任务运行时。

5. 最小配置思路

在 Boot 4.1 中,Spring Batch 文档展示的 JobRepository 存储类型包括 In-memory、JDBC 和 MongoDB。MongoDB schema 初始化可以通过配置打开:

spring: batch: data: mongodb: schema: initialize: true

生产环境建议谨慎使用自动初始化:

  • 本地开发、测试环境可以打开,减少环境准备成本。
  • 预发和生产更建议由变更脚本或平台流程创建集合和索引。
  • 如果需要完全控制 Batch 配置,可以使用@EnableBatchProcessing或继承DefaultBatchConfiguration,让 Boot 自动配置退让,再按 Spring Batch API 显式配置。

一个 Agent 批处理应用的依赖边界通常是:

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-batch-data-mongodb</artifactId> </dependency> <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-starter-model-openai</artifactId> </dependency> </dependencies>

具体 artifact 名称以当前 Boot 4.1 依赖清单和 Initializr 生成结果为准。Release Notes 中提到的是新的 Batch MongoDB starter;项目落地时不要手写猜版本,优先让 Spring Boot BOM 管理依赖。

6. 把 Agent 调用放进 Step,而不是放进 Controller

常见错误是 Controller 里直接循环调用模型:

@PostMapping("/agent/import") String importDocuments(@RequestBody ImportRequest request) { for (String documentId : request.documentIds()) { agentService.analyze(documentId); } return "ok"; }

这段代码的问题不是不能跑,而是运行态不可治理:HTTP 超时、进程重启、部分失败、重复提交、审计补偿都会变复杂。更合理的形态是:Controller 只负责提交 Job,Batch Step 负责逐条处理。

@RestController class AgentJobController { private final JobLauncher jobLauncher; private final Job documentAnalysisJob; AgentJobController(JobLauncher jobLauncher, Job documentAnalysisJob) { this.jobLauncher = jobLauncher; this.documentAnalysisJob = documentAnalysisJob; } @PostMapping("/agent/jobs/document-analysis") JobExecutionResponse launch(@RequestBody DocumentAnalysisRequest request) throws Exception { JobParameters parameters = new JobParametersBuilder() .addString("tenantId", request.tenantId()) .addString("batchId", request.batchId()) .addLong("submittedAt", System.currentTimeMillis()) .toJobParameters(); JobExecution execution = jobLauncher.run(documentAnalysisJob, parameters); return new JobExecutionResponse(execution.getJobId(), execution.getStatus().name()); } }

Batch Job 则负责读取任务项、调用 Agent、写入结果:

@Bean Job documentAnalysisJob(JobRepository jobRepository, Step analyzeDocumentStep) { return new JobBuilder("documentAnalysisJob", jobRepository) .start(analyzeDocumentStep) .build(); } @Bean Step analyzeDocumentStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, ItemReader<DocumentTask> reader, ItemProcessor<DocumentTask, DocumentAnalysis> processor, ItemWriter<DocumentAnalysis> writer) { return new StepBuilder("analyzeDocumentStep", jobRepository) .<DocumentTask, DocumentAnalysis>chunk(20, transactionManager) .reader(reader) .processor(processor) .writer(writer) .faultTolerant() .retry(TransientAiException.class) .retryLimit(3) .skip(BadDocumentException.class) .skipLimit(100) .build(); }

这里的关键点是:Agent 调用不再是一个不可恢复的循环,而是 Batch 的ItemProcessor。每条数据的成功、失败、重试和跳过都能进入 Batch 生命周期。

7. Agent Processor 的边界设计

ItemProcessor里可以调用 Spring AI,但不要把所有逻辑都塞进 Prompt。建议把边界拆清楚:

class DocumentAgentProcessor implements ItemProcessor<DocumentTask, DocumentAnalysis> { private final ChatClient chatClient; private final DocumentRepository documentRepository; private final AgentAuditService auditService; DocumentAgentProcessor(ChatClient chatClient, DocumentRepository documentRepository, AgentAuditService auditService) { this.chatClient = chatClient; this.documentRepository = documentRepository; this.auditService = auditService; } @Override public DocumentAnalysis process(DocumentTask task) { DocumentContent content = documentRepository.load(task.documentId()) .orElseThrow(() -> new BadDocumentException(task.documentId())); AgentPromptInput input = AgentPromptInput.from(task, content); auditService.beforeCall(task.batchId(), task.documentId(), input.summary()); DocumentAnalysisResult result = chatClient.prompt() .system(""" You are a document analysis agent. Return risk level, topic tags, summary, and citations. """) .user(input.promptText()) .call() .entity(DocumentAnalysisResult.class); auditService.afterCall(task.batchId(), task.documentId(), result.auditSummary()); return DocumentAnalysis.from(task, result); } }

这个 Processor 应该遵守几条规则:

  1. 读业务数据要显式失败,不要让模型猜缺失字段。
  2. Prompt 输入要可摘要审计,但不要默认记录全文敏感内容。
  3. 模型输出要结构化,不要靠自然语言解析状态。
  4. 可重试异常和不可重试异常要分开。
  5. 写入业务结果要幂等,避免 Job 重启后重复产生副作用。

8. Tool Calling 与 Batch 重试不要互相打架

Spring AI 文档强调,模型只能请求工具调用,客户端应用负责执行工具并返回结果。ChatClient使用ToolCallingAdvisor管理工具调用生命周期。这对 Batch 场景很重要,因为会出现两层重试:

  • Agent 内部重试:模型输出格式不合法、工具调用失败、Advisor 触发重试。
  • Batch 外部重试:整个 item 处理失败,Step 对该 item 重新执行。

如果不设计边界,就可能重复调用高风险工具。例如发送通知、创建工单、扣减额度、提交审批。

建议把工具分三类:

工具类型是否允许 Batch 重试处理建议
只读查询可以超时、限流、临时网络错误可重试
可幂等写入可以但要有幂等键使用batchId + itemId + actionType作为幂等键
非幂等高风险动作不建议自动重试先生成计划,交给人工或独立审批流程执行

示例:让 Agent 只生成“建议动作”,不要直接执行动作:

public record RemediationPlan( String itemId, String riskLevel, List<String> proposedActions, boolean requiresHumanApproval) { }

对于批量 Agent 作业,默认策略应该是“模型产出结构化计划,系统按策略执行”,而不是“模型想到什么工具就直接执行什么工具”。

9. JobParameters 要可追溯、可重放、可去重

Batch 的JobParameters决定一个 JobInstance 的身份。Agent 长任务里不要只塞时间戳,否则每次提交都会变成全新作业,难以判断重复提交。

建议包含:

  • tenantId:租户或业务域。
  • batchId:业务批次 ID,来自上游提交。
  • agentProfile:Agent 配置版本,例如document-risk-v3。
  • promptVersion:Prompt 模板版本。
  • modelProfile:模型配置逻辑名,不一定直接暴露真实供应商。
  • inputSnapshot:输入数据快照 ID 或查询条件 hash。
  • submittedBy:提交人或系统来源。

示例:

JobParameters parameters = new JobParametersBuilder() .addString("tenantId", request.tenantId(), true) .addString("batchId", request.batchId(), true) .addString("agentProfile", "document-risk-v3", true) .addString("promptVersion", "2026-06-23", true) .addString("inputSnapshot", request.inputSnapshot(), true) .addLong("submittedAt", System.currentTimeMillis(), false) .toJobParameters();

这里最后一个布尔值表示参数是否用于识别 JobInstance。像submittedAt这类纯审计字段通常不应该参与身份识别,否则会破坏重复提交检测。

10. MongoDB 存储下的几个生产注意点

Spring Batch 文档提醒,MongoDB JobRepository 需要集合来保存批处理元数据,这些集合定义在 Spring Batch core jar 的schema-mongodb.jsonl中。文档还提醒 MongoDB 不推荐字段名里使用.,因此需要通过MongoTemplate的MappingMongoConverter做 map key 替换。

示例:

@Bean MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { MongoTemplate template = new MongoTemplate(mongoDatabaseFactory); MappingMongoConverter converter = (MappingMongoConverter) template.getConverter(); converter.setMapKeyDotReplacement("_"); return template; }

生产落地时再补几条:

  • Batch 元数据集合和业务结果集合分开命名、分开权限。
  • 给高频查询字段建立索引,例如 job name、status、create time、tenantId。
  • 限制 ExecutionContext 大小,不要把完整 Prompt、完整文档或大模型响应塞进去。
  • 对失败摘要、工具参数、模型输出做脱敏和保留期控制。
  • 对大批量并发任务压测 MongoDB 写入延迟,避免元数据写入成为瓶颈。
  • 明确备份恢复策略,因为 JobRepository 丢失会影响任务恢复语义。

MongoDB JobRepository 不是“随便存 JSON”的许可。Batch 元数据是运行时控制面,应该比普通业务日志更谨慎。

11. 可观测性:Batch 指标和 Agent 指标要能关联

Agent 长任务排障时,光看最终失败原因不够。建议建立三层观测:

第一层是 Batch 维度:

  • jobName
  • jobExecutionId
  • stepExecutionId
  • readCount
  • writeCount
  • skipCount
  • retryCount
  • status
  • durationMs

第二层是 Agent 维度:

  • conversationId
  • agentProfile
  • promptVersion
  • modelProfile
  • toolCallCount
  • inputTokens
  • outputTokens
  • estimatedCost
  • structuredOutputValid

第三层是业务维度:

  • tenantId
  • batchId
  • itemId
  • riskLevel
  • reviewRequired
  • resultVersion

关键是把jobExecutionId、stepExecutionId、batchId、itemId贯穿起来。否则 Batch 只能告诉你“第 327 条失败”,Agent 日志只能告诉你“某次模型调用失败”,两边对不上。

一个简单的审计对象可以这样设计:

public record AgentBatchAuditEvent( String tenantId, String batchId, Long jobExecutionId, Long stepExecutionId, String itemId, String agentProfile, String promptVersion, String modelProfile, int toolCallCount, boolean structuredOutputValid, String status, String errorCode, long durationMs) { }

注意这里仍然没有记录完整 Prompt 和完整回答。生产默认记录摘要、状态、引用 ID 和错误码,需要全文审计时再走受控采样或加密存储。

12. 和 Spring Cloud 的关系

Spring Cloud 不直接替你跑 Batch Job,但它对分布式 Agent 任务仍然有价值:

  • Spring Cloud Config 管理 Agent 配置、模型 profile、Prompt 版本开关。
  • Spring Cloud Gateway 提供统一入口、鉴权、限流和任务提交路由。
  • Spring Cloud Circuit Breaker 包住外部模型、MCP Server、业务工具服务。
  • Spring Cloud Stream 用于 Job 完成事件、人工复核事件、补偿事件的异步流转。
  • Spring Cloud Kubernetes 帮助在 Kubernetes 中治理配置、发现和滚动部署。

这里有一个版本提醒:Spring Cloud 项目页当前把2025.1.xOakwood 标为 Boot4.0.x代际。真实项目升级时不要只看“最新版本”,要以 Spring Cloud BOM、release notes、start.spring.io 生成结果和兼容矩阵为准。Boot4.1.0是否能直接搭配某个 Cloud train,需要以官方兼容说明为准。

13. 推荐落地架构

一个相对稳妥的 Agent Batch 架构可以这样分层:

flowchart TB UI[Admin UI / API Client] --> Gateway[Spring Cloud Gateway] Gateway --> JobApi[Spring Boot Agent Job API] JobApi --> Batch[Spring Batch JobLauncher] Batch --> Repo[(MongoDB JobRepository)] Batch --> Worker[Batch Step Worker] Worker --> AI[Spring AI ChatClient] AI --> Tools[Tool Calling / MCP Tools] Worker --> Result[(Business Result Store)] Worker --> Audit[(Audit Events)] Audit --> Observe[Metrics / Traces / Logs] Batch --> Event[Job Completion Event]

职责划分:

  • Gateway 负责入口治理。
  • Job API 负责提交、查询、停止、重启。
  • Spring Batch 负责任务生命周期。
  • Spring AI 负责模型调用、工具调用、结构化输出。
  • MongoDB JobRepository 负责批处理元数据。
  • 业务库负责结果数据。
  • 审计和可观测系统负责排障、成本和合规。

14. 迁移建议:从@Async到 Batch

如果你现在已经有一批@AsyncAgent 任务,不建议一次性重写全部。可以按风险迁移:

第一步:把任务入口改成提交batchId,不要同步等待结果。

第二步:把原来的循环拆成ItemReader、ItemProcessor、ItemWriter。

第三步:把失败分类成可重试、可跳过、必须终止三类。

第四步:把模型输出改成结构化 DTO,并为 DTO 做校验。

第五步:把工具调用副作用加幂等键。

第六步:打通 Batch 执行 ID 和 Agent 审计 ID。

第七步:再考虑并行 step、partition、远程 worker 或事件驱动补偿。

一开始不要急着做复杂分布式。先让单 JVM、单 JobRepository、可恢复、可查询、可审计跑稳。Agent 长任务最先需要的是确定性和可恢复性,不是更复杂的调度拓扑。

15. 今日结论

Spring Boot 4.1 的 MongoDB Spring Batch 支持,不是一个孤立的存储选项,而是对 Agent 工程很实用的一块拼图。

它适合解决的问题是:

  • Agent 长任务如何持久化执行状态。
  • 失败后如何重试、跳过、恢复。
  • 大批量样本如何分片处理。
  • 批量模型调用如何审计和计量。
  • NoSQL 技术栈下如何减少额外关系库依赖。

但它不应该被误用为:

  • Prompt 全文仓库。
  • 工具调用明细仓库。
  • 向量数据库。
  • 高风险业务动作的自动执行许可。
  • 替代工作流平台的万能调度器。

参考资料

  • Spring Boot 项目页:当前版本 4.1.0
  • Spring Boot 4.1 Release Notes:MongoDB Support for Spring Batch
  • Spring Boot Reference:Spring Batch
  • Spring Batch Reference:Configuring a JobRepository
  • Spring AI 项目页:当前版本 2.0.0
  • Spring AI Reference:Tool Calling
  • Spring Cloud 项目页:当前版本 2025.1.2 与兼容矩阵

相关新闻

  • Linux 中断处理:从硬件信号到软中断的全链路剖析
  • 小程序商城哪个平台好
  • WarcraftHelper魔兽争霸III终极优化工具:3步解锁现代游戏体验完整指南

最新新闻

  • 移动开发中的工程伦理实践:从隐私保护到算法公平
  • 基于事件触发与神经网络的无人机机械臂自适应控制方案
  • NestJS模块化架构实战:DDD+AI驱动的学生画像系统设计
  • 基于LLM与多平台策略的社交媒体献血请求智能识别与响应系统设计
  • Vue3前端AI Agent实战:浏览器内运行WASM模型的智能开发助手
  • 神经网络量化训练:挑战、原理与LOTION框架

日新闻

  • 终极指南:如何用shadPS4在电脑上免费畅玩PS4游戏
  • 打造个性化Instagram Clone:主题定制与用户体验优化技巧
  • 未来展望:RoseTTAFold-All-Atom的发展路线图与社区支持资源汇总

周新闻

  • Visual C++运行库修复终极指南:5分钟快速解决Windows软件启动错误
  • 手把手教你构建统计局地区经济数据爬虫:从环境搭建到数据持久化全指南
  • 2026多Agent深度解析:用AI团队替代单一模型,四种架构实战落地

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号