尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

JUC高并发编程—Fork / Join

JUC高并发编程—Fork / Join
📅 发布时间:2026/6/20 19:54:34

一、Fork/Join 解决什么问题?

1. 传统 ThreadPoolExecutor 缺点

普通线程池:任务提交到阻塞队列,线程从队列抢任务,任务粒度不能拆分。 如果一个超大耗时任务(比如遍历 100 万条数据求和)丢给单一线程,其他线程全部空闲,CPU 利用率极低,无法充分多核并行。

2. Fork/Join 核心思想:分而治之

类比归并排序:

  1. Fork(拆分):大任务拆成若干小任务,直到小任务足够简单不用再拆;
  2. Join(合并):等待所有子小任务执行完成,收集所有子结果汇总成最终结果。

一句话通俗理解: 大活拆成一堆小活分给多个人干,所有人干完把结果汇总,最大化利用多核 CPU。

3. 适用场景 vs 不适用场景

✅ 适合:CPU 密集型、可递归拆分、计算量大

  • 大数据分段求和、数组归并排序、树形递归遍历、海量数据统计 ❌ 不适合:IO 密集型(数据库 / 网络等待),线程阻塞会严重降低效率

二、Fork/Join 底层核心原理(重中之重)

1. 核心类结构

plaintext

ExecutorService 线程池顶层接口 ↑ ForkJoinPool :Fork/Join专属线程池(替代ThreadPoolExecutor) ↓ ForkJoinTask<V> :任务抽象类(不能直接new) ├ RecursiveTask<V> :有返回值的拆分任务(最常用) └ RecursiveAction :无返回值,只执行逻辑不汇总结果

2. 最关键机制:工作窃取 Work-Stealing(ForkJoinPool 灵魂)

普通线程池:全局共享 1 个队列,多线程竞争锁抢任务,空闲线程只能等待。 ForkJoinPool:

  1. 每个工作线程自带双端队列 Deque;
  2. 线程自己 fork 拆分出来的子任务,放到自己队列队尾;
  3. 线程优先从自己队列尾部取任务执行;
  4. 如果某个线程自己队列为空(干完了),会去其他忙碌线程的队列头部偷任务执行;

优势:

  • 减少锁竞争,性能更高;
  • 不会出现部分线程忙死、部分线程闲置,CPU 充分打满;
  • 双端队列:自己取尾部,别人偷头部,互不冲突。

3. Fork、Join 底层流程

  1. fork():把拆分后的子任务提交到当前线程的本地双端队列,异步执行;
  2. join():阻塞等待当前子任务完成,并获取任务返回结果;
    • 如果子任务还没执行,当前线程会去偷其他任务执行,不会空等浪费 CPU;
  3. 递归拆分阈值(重点):必须设置阈值,否则无限递归拆分栈溢出 例:数组长度小于 1000 就不拆,直接串行计算;大于 1000 继续二分拆分。

4. ForkJoinPool 线程池参数

java

运行

public ForkJoinPool( int parallelism, // 并行度,默认CPU核心数,代表最大并发线程数 ForkJoinWorkerThreadFactory factory, // 线程工厂 UncaughtExceptionHandler handler, // 异常处理器 boolean asyncMode // 是否异步模式:true只从队列头部取,不用于递归合并 )

日常开发直接用默认构造new ForkJoinPool()即可,并行度等于 CPU 核心数。

三、两大任务类区分(RecursiveTask / RecursiveAction)

1. RecursiveTask<V> 有返回值

需要拆分计算后汇总结果:求和、求最大值、数据统计 重写compute()方法,返回泛型结果

2. RecursiveAction 无返回值

只做遍历、写入、打印,不需要合并结果:批量文件处理、批量数据更新 重写compute(),返回 void

四、完整代码示例 1:RecursiveTask 数组分段求和(最经典)

需求:超大数组 1~100000 求和,递归二分拆分,最后合并所有分段结果

java

运行

import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; // 拆分求和任务:有返回值,继承RecursiveTask class SumTask extends RecursiveTask<Long> { // 数组、计算区间 [start, end] private final long[] arr; private final int start; private final int end; // 阈值:区间长度小于该值不再拆分,直接计算 private static final int THRESHOLD = 1000; public SumTask(long[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Long compute() { // 1. 区间长度小于阈值,直接串行计算,终止递归 if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i <= end; i++) { sum += arr[i]; } return sum; } // 2. 区间过大,二分拆分 int mid = (start + end) / 2; SumTask leftTask = new SumTask(arr, start, mid); SumTask rightTask = new SumTask(arr, mid + 1, end); // fork:异步提交子任务到本地队列 leftTask.fork(); rightTask.fork(); // join:阻塞获取两个子任务结果 Long leftSum = leftTask.join(); Long rightSum = rightTask.join(); // 合并结果返回 return leftSum + rightSum; } } public class ForkJoinSumDemo { public static void main(String[] args) { // 构造10万长度数组,存放1~100000 long[] arr = new long[100000]; for (int i = 0; i < arr.length; i++) { arr[i] = i + 1; } // 创建ForkJoin线程池 ForkJoinPool pool = new ForkJoinPool(); // 提交根任务 SumTask rootTask = new SumTask(arr, 0, arr.length - 1); Long total = pool.invoke(rootTask); System.out.println("1~100000总和 = " + total); // 校验结果:公式 n*(n+1)/2 System.out.println("公式计算校验 = " + 100000L * 100001 / 2); pool.shutdown(); } }

执行流程拆解

  1. 根任务区间 0~99999,远超阈值 1000,拆左右两半;
  2. 两半各自继续 fork 拆分,直到所有子任务区间≤1000;
  3. 空闲线程自动窃取其他线程未执行的子任务;
  4. 底层子任务全部计算完成后,逐层 join 向上汇总;
  5. 最终得到全局总和。

五、代码示例 2:RecursiveAction 无返回值批量遍历打印

场景:只遍历处理数据,不需要汇总结果,继承 RecursiveAction

java

运行

import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; class PrintTask extends RecursiveAction { private final int start; private final int end; private static final int THRESHOLD = 20; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 区间小,直接打印 if (end - start <= THRESHOLD) { for (int i = start; i <= end; i++) { System.out.print(Thread.currentThread().getName() + "-" + i + " "); } System.out.println(); return; } // 拆分 int mid = (start + end) / 2; PrintTask left = new PrintTask(start, mid); PrintTask right = new PrintTask(mid + 1, end); // 两个子任务异步执行 left.fork(); right.compute(); // 优化:一个fork,一个当前线程直接执行,减少队列压力 left.join(); } } public class ForkJoinActionDemo { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); PrintTask task = new PrintTask(1, 200); pool.invoke(task); pool.shutdown(); } }

小优化技巧

不要全部都调用fork(),可以一个 fork、一个直接调用compute(): 减少队列任务数量,降低内存开销,性能更好。

六、关键 API 方法详解

1. 任务方法(ForkJoinTask)

  1. fork()将子任务提交到当前工作线程的本地双端队列,异步执行,无返回值。
  2. join()阻塞等待任务完成,返回计算结果;线程阻塞时会自动窃取其他任务,不浪费 CPU。
  3. invoke()同步执行当前任务,内部自动完成 fork+join,主线程调用阻塞等待最终结果(最常用提交方式)。
  4. invokeAll(task1,task2)批量 fork 多个任务,等价于多个 fork+join 组合。

2. ForkJoinPool 提交任务三种方式

  1. invoke(task):同步阻塞,直接返回任务结果(递归计算首选)
  2. submit(task):异步提交,返回 ForkJoinTask,后续可 join/get 获取结果
  3. execute(task):异步无返回,类似普通线程池 execute

3. get () 和 join () 核心区别

  • join():只能抛出未检查异常(RuntimeException),递归拆分场景专用;
  • get():继承 Future 方法,会抛出受检异常InterruptedException、ExecutionException,需要 try-catch 捕获; 递归分治代码优先使用join(),代码更简洁。

七、工作窃取机制演示对比(通俗举例)

假设 CPU4 核,4 条线程 T1/T2/T3/T4

  1. T1 分到超大任务,不断 fork 拆分,本地队列堆满子任务;
  2. T2/T3/T4 很快干完自己的小任务,队列为空;
  3. T2 去偷 T1 队列头部的子任务执行,T3、T4 同理;
  4. 所有 CPU 核心同时干活,不会出现资源闲置。

普通线程池做不到:如果 T1 持有大任务,其他线程只能空等全局队列,无法拆分窃取。

八、Fork/Join 典型业务应用场景

场景 1:大规模数值计算(最主流)

数组求和、求最大 / 最小值、矩阵运算、统计学批量计算。

场景 2:分治类算法并行化

归并排序、快速排序、二分查找并行优化。

场景 3:树形结构递归遍历

文件夹递归遍历、树形菜单批量数据处理、树节点统计。

场景 4:海量内存数据批量处理

千万级内存集合过滤、转换、聚合(非数据库 IO)。

九、ForkJoinPool 与 ThreadPoolExecutor 对比

表格

特性ForkJoinPool普通 ThreadPoolExecutor
任务模型支持递归拆分(fork/join)任务不可拆分,只能整体执行
队列模型每个线程独立双端队列,工作窃取全局共享单阻塞队列,竞争锁
适用负载CPU 密集、可分治计算IO 密集、独立零散小任务
线程等待join 阻塞时自动偷任务,无空闲get 阻塞线程原地等待,CPU 空转
结果合并原生支持子任务结果汇总需要手动维护集合汇总 Future

十、使用注意事项(避坑重点)

1. 必须设置合理阈值 THRESHOLD

阈值太小:拆分层级极深,创建大量任务对象,GC 压力大; 阈值太大:拆分太少,并发度不足,多核利用率低; 经验值:单任务串行执行 1~5ms 左右最合适。

2. 禁止在 ForkJoin 任务中执行 IO 阻塞

如果子任务出现数据库 / 网络阻塞,对应工作线程被占死,无法窃取任务,并行效率大幅下降。IO 场景使用普通线程池。

3. 避免无限递归拆分

区间划分逻辑出错会无限 fork,直接栈溢出 StackOverflowError。

4. 线程池尽量复用,不要频繁新建

ForkJoinPool 创建开销大,全局单例复用;不要在循环里 new ForkJoinPool。

5. JDK8 Stream 底层就是 ForkJoin

并行流list.parallelStream()底层使用 ForkJoinPool 实现分治并行,本质是对 Fork/Join 框架的封装。 示例:

// 并行流求和,底层ForkJoin long sum = LongStream.rangeClosed(1, 100000).parallel().sum();

十一、整体核心总结

  1. 核心设计:分治算法 + 工作窃取双端队列,专门优化多核 CPU 密集计算;
  2. 两大任务:RecursiveTask(有返回合并)、RecursiveAction(无返回纯执行);
  3. 核心方法:fork 拆分任务、join 阻塞合并结果、invoke 同步执行;
  4. 优势:线程空闲时自动窃取任务,CPU 利用率拉满;
  5. 局限:只适合纯内存计算,IO 阻塞场景不推荐,优先普通线程池。

相关新闻

  • 2026深圳全屋定制避坑指南:跑了6家店,这家轻高定让我直接签了合同 - 爱格研究所
  • 如何快速实现专业级音频转文字:免费开源智能字幕生成工具完整指南
  • 2026年6月最新真力时中国官方售后电话热线客服地址服务网点 - 亨得利官方服务中心

最新新闻

  • GPT-4o架构解析:低延迟语音与原生多模态统一建模
  • xray被动扫描器实战指南:从安装配置到精准漏洞挖掘
  • 2026合肥理工学校官方最全招生简章|办学详情、管理模式、升学数据、报名入口全公布 - 教育为先
  • 如何获得赞助:Instagram、活动赞助及其他赞助
  • 鸣潮自动化工具终极指南:基于YOLOv8图像识别的智能辅助解决方案
  • 2026帝王宫海鲜加工饭店排行榜:内行推荐这5家 - 官方资讯

日新闻

  • 信任的进化:技术实现详解——如何用JavaScript构建博弈论模拟器
  • Terrakube自定义工作流:如何集成OPA、Infracost等工具扩展IaC能力
  • grunt-concurrent快速入门:5分钟学会并行运行Grunt任务

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号