文章目录1. 概述2. 应用场景与核心价值2.1 典型应用场景2.2 核心价值3. 启用前置条件4. 确定性与一致性重放规范4.1 重放运行逻辑4.2 强制封装操作类型4.3 编码设计准则5. 持久化配置方式5.1 编译阶段配置5.2 执行阶段配置6. 业务编码规范与示例优化6.1 不推荐写法6.2 规范优化写法7. 工作流恢复机制7.1 三大恢复应用场景7.2 异常恢复代码示例7.3 恢复起始规则8. 生产最佳实践9. 长耗时任务实战案例10. 文档总结1. 概述持久化执行是工作流核心运行保障技术流程运行至关键节点时自动留存执行进度、状态数据与节点位置信息支持任务暂停、异常中断、服务重启后从断点位置接续运行无需重复执行历史步骤。Spring AI Alibaba Graph依托内置持久化层与检查点机制自动保存每一步运行状态。无论故障类型为系统宕机、网络波动、LLM调用超时或是人工介入暂停流程均可依据历史快照精准恢复任务执行。功能开启提示项目中配置检查点器后即可自动启用持久化执行能力全程支持随时暂停、异常重试、断点续跑。2. 应用场景与核心价值2.1 典型应用场景人在回路交互流程暂停等待人工审核、数据修正、指令确认完成干预后继续流转长周期运行任务批量数据处理、多轮智能问答、复杂业务编排等耗时任务异常容错处理第三方接口超时、程序报错、服务重启等意外中断场景2.2 核心价值断点续执依托历史快照恢复流程规避重复运算节约系统资源超长时效留存间隔数日仍可正常恢复执行适配离线业务场景执行链路溯源完整记录运行轨迹支持状态回溯与问题排查高可靠运行抵御各类中断故障保障业务流程最终闭环灵活人工干预预留操作入口满足人工校验、业务调整需求3. 启用前置条件使用持久化执行功能必须满足三项基础要求配置检查点器注册状态存储实例负责快照数据持久化保存绑定唯一线程标识通过threadId区分独立任务实例隔离不同流程运行数据遵循确定性重放规则特殊业务操作统一封装至节点内部保障恢复执行结果一致4. 确定性与一致性重放规范4.1 重放运行逻辑任务恢复时不会从代码中断行继续而是以中断节点起始位置为起点依次重放后续所有流程步骤因此业务代码必须保证可稳定重复执行。4.2 强制封装操作类型以下操作必须独立封装至专属节点禁止零散编写非确定性操作随机数、时间戳、唯一标识生成类逻辑副作用操作API调用、文件读写、数据库增删改、消息推送等外部交互逻辑高耗时操作大模型调用、海量数据运算、第三方服务请求4.3 编码设计准则拆分独立职责单个节点仅处理单一业务逻辑多类副作用操作拆分为多个节点隔离不确定逻辑非固定结果代码统一收纳在节点内执行结果存入全局状态实现操作幂等性重复执行操作不会产生脏数据、重复订单、冗余文件等异常结果5. 持久化配置方式5.1 编译阶段配置构建编译图时注册检查点存储策略全局定义持久化规则importcom.alibaba.cloud.ai.graph.CompileConfig;importcom.alibaba.cloud.ai.graph.CompiledGraph;importcom.alibaba.cloud.ai.graph.checkpoint.config.SaverConfig;importcom.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;SaverConfigsaverConfigSaverConfig.builder().register(newMemorySaver()).build();CompiledGraphgraphstateGraph.compile(CompileConfig.builder().saverConfig(saverConfig).build());5.2 执行阶段配置任务发起时指定唯一线程ID绑定当前流程实例importcom.alibaba.cloud.ai.graph.CompiledGraph;importcom.alibaba.cloud.ai.graph.RunnableConfig;importjava.util.Map;importjava.util.UUID;RunnableConfigconfigRunnableConfig.builder().threadId(long-running-task-UUID.randomUUID()).build();graph.invoke(inputData,config);6. 业务编码规范与示例优化6.1 不推荐写法副作用操作直接内嵌节点任务恢复时会重复发起请求引发业务异常varcallApinode_async(state-{Stringurl(String)state.value(url).orElse();HttpClientclientHttpClient.newHttpClient();HttpRequestrequestHttpRequest.newBuilder().uri(URI.create(url)).build();HttpResponseStringresponseclient.send(request,HttpResponse.BodyHandlers.ofString());returnMap.of(result,response.body().substring(0,Math.min(100,response.body().length())));});6.2 规范优化写法抽取通用业务服务节点仅负责状态流转与逻辑编排满足重放与幂等要求importcom.alibaba.cloud.ai.graph.StateGraph;importstaticcom.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;importjava.net.http.*;importjava.net.URI;importjava.util.*;importjava.util.stream.Collectors;// 通用HTTP请求服务统一管理外部调用classHttpRequestService{privatefinalHttpClientclientHttpClient.newHttpClient();publicStringmakeRequest(Stringurl)throwsException{HttpRequestrequestHttpRequest.newBuilder().uri(URI.create(url)).build();HttpResponseStringresponseclient.send(request,HttpResponse.BodyHandlers.ofString());returnresponse.body().substring(0,Math.min(100,response.body().length()));}}// 定义状态更新策略KeyStrategyFactorykeyStrategyFactory()-{MapString,KeyStrategykeyStrategyMapnewHashMap();keyStrategyMap.put(urls,newReplaceStrategy());keyStrategyMap.put(results,newAppendStrategy());returnkeyStrategyMap;};// 标准化业务节点varcallApinode_async(state-{ListStringurls(ListString)state.value(urls).orElse(List.of());HttpRequestServicehttpServicenewHttpRequestService();ListStringresultsurls.stream().map(httpService::makeRequest).collect(Collectors.toList());returnMap.of(results,results);});// 构建并运行状态图StateGraphstateGraphnewStateGraph(keyStrategyFactory).addNode(call_api,callApi).addEdge(StateGraph.START,call_api).addEdge(call_api,StateGraph.END);7. 工作流恢复机制7.1 三大恢复应用场景人工暂停恢复流程手动中断修改状态参数后重启执行故障异常恢复接口报错、程序异常后复用线程标识重试任务服务重启恢复进程关闭重启依据历史快照接续业务流程7.2 异常恢复代码示例importcom.alibaba.cloud.ai.graph.RunnableConfig;importcom.alibaba.cloud.ai.graph.CompiledGraph;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;LoggerloggerLoggerFactory.getLogger(RecoveryDemo.class);StringthreadIderror-recovery-thread;RunnableConfigconfigRunnableConfig.builder().threadId(threadId).build();try{// 首次执行任务graph.invoke(inputData,config);}catch(Exceptione){logger.error(任务执行异常启动断点恢复,e);// 传入空参数自动读取历史状态继续运行graph.invoke(null,config);}7.3 恢复起始规则普通节点中断从当前异常节点起始位置恢复执行子流程中断回溯至调用子图的父节点重新运行快照生成时机单个节点全部逻辑执行完毕后才会保存检查点8. 生产最佳实践节点单一职责拆分复杂业务精简单个节点逻辑便于故障定位与重试严格保障幂等性外部交互接口、数据写入操作兼容重复执行规避业务隐患精简状态数据仅存储序列化基础数据剔除大文件、网络连接等无效对象完善异常捕获节点内部拦截处理报错避免整体流程无故终止分片处理海量任务大批量数据拆分批次执行每批次完成留存快照区分存储环境测试环境使用MemorySaver生产环境替换Redis、数据库持久化方案完备日志监控记录节点流转、状态变更信息跟踪任务整体运行进度9. 长耗时任务实战案例适用于批量数据清洗、离线统计、文档批量解析等场景支持中断后无缝续跑importcom.alibaba.cloud.ai.graph.*;importcom.alibaba.cloud.ai.graph.action.AsyncNodeAction;importcom.alibaba.cloud.ai.graph.action.EdgeAction;importcom.alibaba.cloud.ai.graph.checkpoint.config.SaverConfig;importcom.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;importjava.util.*;importjava.util.stream.IntStream;// 状态策略配置KeyStrategyFactorykeyStrategyFactory()-{MapString,KeyStrategykeyStrategyMapnewHashMap();keyStrategyMap.put(items,newReplaceStrategy());keyStrategyMap.put(processedCount,newReplaceStrategy());keyStrategyMap.put(results,newAppendStrategy());returnkeyStrategyMap;};// 分批数据处理节点varprocessDataAsyncNodeAction.node_async(state-{ListStringitems(ListString)state.value(items).orElse(List.of());intprocessedCount(int)state.value(processedCount).orElse(0);intbatchSize100;intendMath.min(processedCountbatchSize,items.size());ListStringbatchitems.subList(processedCount,end);ListStringprocessedResultsbatch.stream().map(item-Processed: item).toList();returnMap.of(processedCount,end,results,processedResults);});// 循环终止判断条件varcheckCompleteEdgeAction.edge_async(state-{intprocessedCount(int)state.value(processedCount).orElse(0);ListStringitems(ListString)state.value(items).orElse(List.of());returnprocessedCountitems.size()?StateGraph.END:process_data;});// 组装状态图StateGraphstateGraphnewStateGraph(keyStrategyFactory).addNode(process_data,processData).addEdge(StateGraph.START,process_data).addConditionalEdges(process_data,checkComplete,Map.of(StateGraph.END,StateGraph.END,process_data,process_data));// 配置持久化与编译SaverConfigsaverConfigSaverConfig.builder().register(newMemorySaver()).build();CompiledGraphgraphstateGraph.compile(CompileConfig.builder().saverConfig(saverConfig).build());// 启动长任务StringtaskThreadIdlong-running-task-UUID.randomUUID();RunnableConfigconfigRunnableConfig.builder().threadId(taskThreadId).build();ListStringlargeDataSetIntStream.range(0,10000).mapToObj(i-Item-i).collect(Collectors.toList());graph.invoke(Map.of(items,largeDataSet,processedCount,0),config);10. 文档总结持久化执行是Spring AI Alibaba Graph实现企业级稳定业务编排的核心能力以检查点快照为数据载体依托线程ID实现任务隔离配合确定性重放机制保障恢复结果无误。该能力完美适配人工交互、异常容错、超长任务三类核心业务场景开发过程中只需完成检查点配置、线程标识绑定、副作用节点封装三项基础操作即可让工作流具备断点续跑、故障恢复、状态溯源的工业级运行能力。