什么是Fork/Join框架?Fork/Join框架是JDK7引入的线程池,用于并行执行。将一个大任务拆分成多个小任务并行执行,最后将每个小任务结果聚合SpecialTask,得到大任务结果。从它的命名也很容易看出,该框架主要分为两个阶段:Fork和Join。第一阶段,Fork是将一个大任务拆分成多个子任务并行执行。第二阶段Join就是合并这些子任务的所有执行。如此一来,大任务的结果终于得到了。这里不难发现它执行的主要流程:首先判断一个任务是否足够小,如果任务足够小,直接计算,否则,分成几个更小的任务,分别计算,这个过程可以被反复分成一系列的小任务。Fork/Join框架是一种分而治之的算法。通过将一个大任务拆分成多个独立的小任务,这些小任务并行执行,最后合并小任务的结果,得到大任务的最终结果。提高效率。Fork/Join框架使用示例下面我们通过一个计算列表中所有元素之和的例子来了解一下Fork/Join框架是如何使用的。大致思路是:把列表分成很多子列表,然后计算每个子列表的元素,然后,我们把这些值全部相加,得到原始列表的总和。Fork/Join框架中定义了ForkJoinTask,代表一个Fork/Join任务,它提供fork()和join()等操作。通常,我们不需要直接继承这个ForkJoinTask类,而是使用框架提供的两个。ForkJoinTask的子类:RecursiveAction用于表示不返回结果的Fork/Join任务。RecursiveTask用于表示返回结果的Fork/Join任务。显然,在这个例子中,需要返回结果。您可以定义SumAction类继承自RecursiveTask。代码如下:/***@authormghio*@since2021-07-25*/publicclassSumTaskextendsRecursiveTask{privatestaticfinalintSEQUENTIAL_THRESHOLD=50;私人最终列表<长>数据;publicSumTask(Listdata){this.data=data;}@OverrideprotectedLongcompute(){if(data.size()<=SEQUENTIAL_THRESHOLD){longsum=computeSumDirectly();System.out.format("%s的总和:%d\n",data.toString(),sum);返回总和;}else{intmid=data.size()/2;SumTaskfirstSubtask=newSumTask(data.subList(0,mid));SumTasksecondSubtask=newSumTask(data.subList(mid,data.size()));//先执行子任务Subtask.fork();secondSubtask.fork();//等待子任务完成并得到结果longfirstSubTaskResult=firstSubtask.join();longsecondSubTaskResult=secondSubtask.join();返回firstSubTaskResult+secondSubTaskResult;}}privatelongcomputeSumDirectly(){longsum=0;for(Longl:data){sum+=l;}返回总和;}publicstaticvoidmain(String[]args){Randomrandom=newRandom();Listdata=random.longs(1_000,1,100).boxed().collect(Collectors.toList());ForkJoinPoolpool=newForkJoinPool();SumTasktask=newSumTask(数据);pool.invoke(任务);System.out.println("总和:"+pool.invoke(task));}}这里,当list的size小于SEQUENTIAL_THRESHOLD变量的值(threshold)时,视为小任务,直接计算list元素求和结果,否则重新划分小任务,运行结果如下:通过这段示例代码可以发现,Fork/Join框架中的ForkJoinTask任务与通常的通用任务的主要区别在于ForkJoinTask需要实现抽象方法compute()来定义计算逻辑。该方法中大致的实现模板是先判断当前任务是否为小任务,如果是则执行该任务,如果不是小任务则再次拆分为两个子任务,然后当每个子任务调用fork()方法,会再次进入compute()方法检查当前任务是否需要拆分为子任务,如果已经是小任务则执行当前任务并返回结果,否则继续拆分,最后调用join()方法等待所有子任务完成并得到执行结果伪代码如下:if(problemissmall){directlysolveproblem.}else{Step1.将问题拆分成独立的部分。步骤2.fork新的子任务来解决每个部分。步骤3.加入所有子任务。Step4.composeresultfromsubresults.}Fork/Join框架设计Fork/Join框架的核心思想是将一个大任务拆分成若干个小任务,然后将各个小任务的结果进行聚合,最终得到结果的大任务。如果你设计这样一个框架,你会如何实现它?(建议想一想),Fork/Join框架的整个流程,顾名思义,分为两步:大任务切分需要这样一个类,用于将大任务拆分成子任务,以及可能一次拆分最终的子任务还是比较大,需要拆分多次,直到拆分的子任务满足我们定义的小任务。执行任务并合并任务结果第一步拆分出来的子任务存放在双端队列中(P.S.这里为什么使用双端队列请看下文),然后每个队列启动一个线程从队列中获取任务实现.这些子任务的执行结果会放在一个统一的队列中,然后会启动一个线程从这个队列中获取数据,最后将数据合并返回。Fork/Join框架使用下面两个类来完成上面的两个步骤:ForkJoinTask类在上面的例子中也提到了,它代表了ForkJoin任务。使用框架时,首先要先定义任务,通常只需要从ForkJoinTask类继承一个RecursiveAction(无返回结果)或RecursiveTask(有返回结果)的子类就足够了。ForkJoinPool从名字上也能猜出一二,就是用来执行ForkJoinTask的线程池。从大任务中拆分出来的子任务会加入到当前线程的双端队列的头部。如果你喜欢思考,你的脑海中一定会有这样的场景。当我们需要完成一个大任务的时候,我们会先把这个大任务拆分成多个独立的子任务,这些子任务会分别放在不同的队列中。并且为每个队列创建一个单独的线程来执行队列中的任务,即线程和队列之间是一对一的关系,那么当有的线程可能先完成自己队列中的任务,而一些线程没有完成执行,导致一些完成任务的线程等待。这是一个很好的问题。既然是做并发,就必须最大限度地发挥计算机的性能。对于这种场景,并发大师DougLea使用了工作窃取算法来应对。使用工作窃取算法后,先完成自己队列中任务的线程会转到其他线程的队列中。在“偷”任务执行的过程中,哈哈,一方有难,各方都会支持。但是此时线程和持有队列的线程会同时访问同一个队列,所以为了减少窃取任务的线程和窃取任务的线程之间的竞争,ForkJoin选择了数据结构双端队列的,这样它就可以按照这个规则执行任务:被偷任务的线程总是从队头获取任务并执行,而偷任务的线程使用获取到的任务从队列的尾部开始执行。在大多数情况下,该算法可以充分利用多线程进行并行计算,但在双端队列中只有一个任务等极端情况下,仍然会存在一定程度的竞争。Fork/Join框架实现原理Fork/Join框架的核心是ForkJoinPool类。此类的重要组件是ForkJoinTask数组和ForkJoinWorkerThread数组。执行这些任务。任务有以下四种状态:NORMALCompletedCANCELLEDCancelledSIGNALsignalEXCEPTIONAL发生异常我们来看看这两个类的核心方法的实现原理。首先,让我们看一下ForkJoinTask的fork()方法。源码如下:方法为ForkJoinWorkerThread类型线程会先调用ForkJoinWorkerThread的workQueue的push()方法异步执行任务,然后立即返回结果。继续跟进ForkJoinPool的push()方法,源码如下:该方法将当前任务添加到ForkJoinTask任务队列数组中,然后调用ForkJoinPool的signalWork方法创建或唤醒一个工作线程执行任务。然后我们看一下ForkJoinTask的join()方法。方法源码如下:方法首先调用doJoin()方法,返回当前任务的状态,根据返回的任务状态进行不同的处理:完成状态直接返回结果。如果状态为取消,则直接抛出异常(CancellationException)。如果出现异常状态,则直接抛出相应的异常继续跟进doJoin()方法。方法源码如下:该方法首先判断当前任务状态是否已经执行,执行完成则直接返回任务。状态。如果没有执行完成,则从任务数组(workQueue)中取出任务执行,如果任务执行完成则设置任务状态为NORMAL。如果发生异常,则记录异常并将任务状态设置为EXCEPTIONAL(在doExec()方法中)。总结本文主要介绍Java并发框架中Fork/Join框架的基本原理、工作窃取算法(work-stealing)、设计方法和部分实现源码。JDK官方标准库中也使用了Fork/Join框架。例如JDK1.8+标准库提供的Arrays.parallelSort(array)可以进行并行排序。它的原理是通过Fork/Join框架在内部对大数组进行并行排序,可以提高排序的速度,而集合中Collection.parallelStream()方法的底层也是基于Fork/Join框架实现的.最后,定义小任务的门槛往往只有通过测试验证才能给出,并保证程序能够达到最佳性能。