大家好,我是树哥。本文将从一个简单的例子开始,解释为什么会有ForkJoinPool。然后介绍ForkJoinPool的基本信息和使用,最后讲解ForkJoinPool的基本原理。诞生原因对于线程池,我们经常使用ThreadPoolExecutor,它可以用来提高任务处理效率。一般来说,我们在使用ThreadPoolExecutor的时候,任务之间是没有联系的。但是在一些特殊情况下,我们处理的任务之间是有联系的,比如经典的斐波那契算法就是其中一种情况。对于斐波那契数列,我们知道F(N)=F(N-1)+F(N-2)。当前值的结果取决于以下值的结果。这个时候如果使用ThreadPoolExecutor,好像是不能解决问题的。虽然我们可以使用单线程的递归算法,但是它的计算速度慢,而且不能进行并行计算,不能发挥CPU多核的优势。ForkJoinPool旨在解决父子任务存在依赖的并行计算问题。快速排序、二分查找、集合运算等父子依赖的并行计算问题,都可以用ForkJoinPool来解决。对于斐波那契数列问题,如果使用ForkJoinPool实现,实现代码为:@Slf4jpublicclassForkJoinDemo{//1.Runentrypublicstaticvoidmain(String[]args){intn=20;//为了跟踪子线程的名称,需要重写ForkJoinWorkerThreadFactory方法worker.setName("我的线程"+worker.getPoolIndex()worker);返回};//创建分治任务线程池,可以追溯到线程名ForkJoinPoolforkJoinPool=newForkJoinPool(4,factory,null,false);//快速创建一个ForkJoinPool方法//ForkJoinPoolforkJoinPool=newForkJoinPool(4);//创建分而治之任务Fibonaccifibonacci=newFibonacci(n);//调用invoke方法启动分治任务Integerresult=forkJoinPool.invoke(fibonacci);log.info("斐波那契{}的结果是{}",n,result);}}//2.定义拆分任务,编写拆分逻辑@Slf4jclassFibonacciextendsRecursiveTask{finalintn;斐波那契(intn){this.n=n;}@OverridepublicIntegercompute(){//类似于递归,定义最小可计算单元if(n<=1){returnn;}//如果想查看子线程名的输出,可以打开下方注释//log.info(Thread.currentThread().getName());斐波那契f1=新斐波那契(n-1);//拆分成子线程Taskf1.fork();斐波那契f2=新斐波那契(n-2);//f1.join等待子任务执行结果returnf2.compute()+f1.join();}}如上代码所示,我们定义了一个Fibonacci类,它继承了RecursiveTask抽象类。在Fibonacci类中,我们定义了拆分逻辑,调用join()等待子线程的执行结果。运行程序,我们会得到如下结果:17:29:10.336[main]INFOtech.shuyi.javacodechip.forkjoinpool.ForkJoinDemo-TheresultofFibonacci20is6765上面代码中提到的fork()和join()为ForkJoinPool提供的API接口,主要用于执行任务和等待子线程的结果。关于它的详细用法,我们以后再说。ForkJoinPool除了用来处理父子任务有依赖的情况,还可以用来处理需要获取子任务执行结果的场景。例如:我们要计算1到1亿的总和。为了加快计算速度,我们很自然地想到了算法中的分治法则,将1亿个数分成10000个任务,每个任务计算10000个值的合成。利用CPU的并发计算性能,缩短计算时间。因为ThreadPoolExecutor也可以通过Future获取执行结果,所以ThreadPoolExecutor也是可行的。这时候我们有两种实现方式,一种是用ThreadPoolExecutor实现,一种是用ForkJoinPool实现。下面我们分别实现这两种方法,看看这两种实现方式的区别。不管是哪种实现方式,大体的思路是:根据线程池中线程的个数N,将1亿个数分成N等份,然后丢入线程池中进行计算。每个计算任务都使用Future接口获取计算结果,最后累加起来。我们首先使用ThreadPoolExecutor实现。首先,定义一个Calculator接口来表示计算数字总和的动作,如下所示。publicinterfaceCalculator{/***对传入的所有数字求和**@paramnumbers*@returnsum*/longsumUp(long[]numbers);}接下来,我们定义一个使用ThreadPoolExecutor类实现的线程池,如下图.包tech.shuyi.javacodechip.forkjoinpool;导入java.util.ArrayList;导入java.util.List;导入java.util.concurrent.Callable;导入java.util.concurrent.ExecutorService;导入java.util.concurrent.Executors;importjava.util.concurrent.Future;publicclassExecutorServiceCalculatorimplementsCalculator{privateintparallism;私有ExecutorService池;publicExecutorServiceCalculator(){//CPU的核心数默认就用cpu核心数了parallism=Runtime.getRuntime().availableProcessors();pool=Executors.newFixedThreadPool(parallism);}//1.处理计算任务的线路privatestaticclassSumTaskimplementsCallable{privatelong[]numbers;私人诠释来自;私人的;publicSumTask(long[]numbers,intfrom,intto){this.numbers=numbers;这个。从=从;这个.to=to;}@OverridepublicLongcall(){longtotal=0;for(inti=from;i<=to;i++){total+=numbers[i];}返回总数;}}//2.核心业务逻辑的实现@OverridepubliclongsumUp(long[]numbers){List>results=newArrayList<>();//2.1Numbersplitting//将任务分解成n个部分,交给n个线程处理。4个核心分为4个部分。//然后每个Throw一个SumTask线程进行处理intpart=numbers.length/parallelism;for(inti=0;if:results){尝试{total+=f.get();}赶上(除了ionignore){}}返回总计;}}如上面代码所示,我们实现了一个计算单个任务的类SumTask,在这个类中进行数值累加。其次,在sumUp()方法中,将1亿个数进行拆分,然后丢给线程池进行计算,最后阻塞等待计算结果,最后累加起来。当我们运行上面的代码,就可以顺利的得到最终的结果,如下图。耗时:10ms结果为:50000005000000然后我们使用ForkJoinPool来实现。我们首先实现SumTask继承RecursiveTask抽象类,在compute()方法中定义拆分逻辑和计算。最后在sumUp()方法中调用pool方法进行计算,代码如下。公共类ForkJoinCalculator实现计算器{privateForkJoinPool池;//1.定义计算逻辑privatestaticclassSumTaskextendsRecursiveTask{privatelong[]numbers;私人诠释来自;私人的;publicSumTask(long[]numbers,intfrom,intto){this.numbers=numbers;这个。从=从;这个.to=to;}//该方法是ForkJoin的核心方法:任务的拆分决定效率@OverrideprotectedLongcompute(){//当要计算的数字个数小于6时,直接使用for循环的方法计算结果if(to-from<6){longtotal=0;for(inti=from;i<=to;i++){total+=numbers[i];}返回总数;}else{//否则,将任务一分为二,递归拆分(注意这里有递归)需要多少个点看具体情况intmiddle=(from+to)/2;SumTasktaskLeft=newSumTask(numbers,from,middle);SumTasktaskRight=newSumTask(numbers,middle+1,to);taskLeft.fork();taskRight.fork();返回taskLeft.join()+taskRight.join();}}}publicForkJoinCalculator(){//你也可以使用公共线程池ForkJoinPool.commonPool()://pool=ForkJoinPool.commonPool()pool=newForkJoinPool();}@OverridepubliclongsumUp(long[]numbers){Longresult=pool.invoke(newSumTask(numbers,0,numbers.length-1));池。关闭();返回结果;}}运行上面的代码,结果为:耗时:860ms结果为:50000005000000对比ThreadPoolExecutor和ForkJoinPool的实现,可以发现它们都有任务拆分的逻辑和最后合并值的逻辑.但是,与ThreadPoolExecutor相比,ForkJoinPool做了一些实现封装,例如:不再手动获取子任务的结果,而是使用join()方法直接获取结果。任务拆分的逻辑封装在RecursiveTask实现类中,没有暴露出来。因此ForkJoinPool也可以用于不依赖父子任务但又想获取子任务执行结果的并行计算任务。这种情况下使用ForkJoinPool更方便代码实现,封装性更好。使用指南使用ForkJoinPool进行并行计算,主要分为两步:定义RecursiveTask或RecursiveAction的任务子类。初始化线程池和计算任务,丢入线程池处理,获取处理结果。首先我们需要定义一个RecursiveTask或者RecursiveAction的子类,然后在这个类的compute()方法中定义拆分逻辑和计算逻辑。这两个抽象类的区别在于前者有返回值,后者没有返回值。比如上面提到的1到1亿的叠加问题,定义的RecursiveTask实现类SumTask的代码如下:私人诠释来自;私人的;publicSumTask(long[]numbers,intfrom,intto){this.numbers=numbers;这个。从=从;这个.to=to;}@OverrideprotectedLongcompute(){//1.定义拆分退出逻辑if(to-from<6){longtotal=0;for(inti=from;i<=to;i++){total+=numbers[i];}返回总数;}else{//2.定义计算逻辑intmiddle=(from+to)/2;SumTasktaskLeft=newSumTask(numbers,from,middle);SumTasktaskRight=newSumTask(numbers,middle+1,to);taskLeft.fork();taskRight.fork();返回taskLeft.join()+taskRight.join();}}}对于compute()方法的实现,核心是想清楚:如何拆分成子任务?分裂什么时候结束?接下来,初始化ForkJoinPool线程池,初始化计算任务,最后将任务丢入线程池]numbers){Longresult=pool.invoke(newSumTask(numbers,0,numbers.length-1));池。关闭();returnresult;}通过以上两步,我们就完成了一个ForkJoinPool任务代码的编写。原理分析ForkJoinPool的设计思想是一种分而治之的算法,即不断将任务拆分(fork)成更小的任务,最后将各个任务的计算结果合并(join)。这样可以充分利用CPU资源,结合工作窃取算法(worksteal)提高整体执行效率。其简单流程如下图所示:图片来自思府用户“日工一兵”。从图中可以看出,ForkJoinPool必须先完成子任务,才能执行上层任务。所以ForkJoinPool最适合父子任务相互依赖的场景,其次是需要获取子任务执行结果的场景。例如:斐波那契数列、快速排序、二分查找等。源码实现了ForkJoinPool的主要实现类:ForkJoinPool和ForkJoinTask抽象类。ForkJoinTask实现了Future接口,可以用来获取处理结果。ForkJoinTask有两个抽象子类:RecursiveAction和RecursiveTask抽象类。不同的是前者没有返回值,后者有返回值。其类图如下所示。图片来自四福网友“日工一兵”。ForkJoinPool是具体的逻辑实现。由于暂时没有应用场景,了解的不是很深,这里就不深入分析了。有兴趣的朋友可以参考这篇文章:ForkJoinPool大型图文站(读到底vs直接收藏)——SegmentFault觉得不行。窃取算法我们知道ForkJoinPool的父子任务之间存在依赖关系,那么ForkJoinPool是如何实现的呢?答案是:使用不同的任务队列来执行。在ForkJoinPool中,有一个数组成员变量workQueue[],它对应一个队列数组,每个队列对应一个消费者线程。丢入线程池的任务按照特定的规则进行转发。图片来自四福网友“日工一兵”。于是就有了一个问题:有的队列可能任务多,有的队列任务少。这样会导致不同线程的负载不同,整体效率不高。我应该怎么办?答案是:使用窃取算法,空闲线程从尾部开始消费其他队列的任务。正常情况下,线程以LIFO(LastInputFirstOutput)的方式获取自己队列中的任务,类似于栈的操作方式。如下图所示,先把任务入队时,将任务推入队头(top),消费时再pop出队头(top)。图片来自Sifu用户“日工一兵”当某个线程对应的队列空闲时,该线程会去队列的底部(base)偷取(poll)任务到自己的队列,然后消费他们。那么问题来了:为什么不从最上面(top)获取任务,而是从最下面(base)获取任务呢?那是为了避免冲突!如果两个线程同时从顶层获取任务,就会出现多线程冲突,需要进行加锁操作,从而降低执行效率。参考资料线程池ForkJoinPool介绍-老K的Java博客ForkJoinPool大型图文站(读到底vs直接收藏)-SegmentFault思考(2篇)介绍ForkJoinPool的适用场景和实现原理_想运行,小胖子的博客-CSDN博客_forkjoinpoolForkJoinPool使用场景?ARLOOR