1。什么是Fork/JoinOracle官方的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以将一个大任务拆分成若干个小任务并发执行,充分利用可用资源,提高应用程序的执行效率。Fork/Join实现了ExecutorService,所以它的任务也需要在线程池中执行。它的不同之处在于它使用了工作窃取算法,空闲线程可以从满载线程中窃取任务来帮助执行。(我个人对workstealing的理解是:由于线程池中的每个线程都有一个队列,线程之间互不影响。那么线程每次都从自己的任务队列的头部取一个任务。如果在某个时刻,一个线程的任务队列为空,而其他线程的任务队列中有任务,那么这个线程就会从其他线程的任务队列中取出一个任务来帮助执行。这就像偷了别人的工作一样)的Fork/Join框架的核心是继承自AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。下面是引用Oracle官方定义的译文:fork/join框架是ExecutorService接口的实现,可帮助您利用多个处理器。它是为可以递归地分解成更小的部分的工作而设计的。目标是使用所有可用的处理能力来增强应用程序的性能。与任何ExecutorService实现一样,fork/join框架将任务分发给线程池中的工作线程。fork/join框架与众不同,因为它使用工作窃取算法。无事可做的工作线程可以从其他仍然忙碌的线程中窃取任务。fork/join框架的中心是ForkJoinPool类,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心工作窃取算法,可以执行ForkJoinTaskprocesses.2。Fork/Join的基础使用方法(1)Fork/Join基础类上文已经提到,Fork/Join就是要讲一个大的任务分解成若干小的任务,所以***步骤当然是要做任务的分解,大方法如下:if(这个任务足够小){执行要做的任务}else{将任将任务分成两个小部分,分别执行两个小部分,等待执行结果}实现FrokJoinTask,需要一个继承RecursiveTask或者RecursiveAction的基类,将上面的代码根据我们自己的业务情况RecursiveTask和RecursiveAction都继承了FrokJoinTask,它们的区别在于RecursiveTask有返回值而RecursiveAction没有。下面是我做的一个选择字符串列表中仍然有“a”的元素的Demo:@OverrideprotectedListcompute(){//当end和start的差值小于阈值时,开始真正的筛选if(end-this.starttemp=list.subList(this.start,end);returntemp.parallelStream().filter(s->s.contains("a")).collect(Collectors.toList());}else{//如果end和start的差值大于阈值//分解大任务分成两个小任务。intmiddle=(this.start+end)/2;ForkJoinTestleft=newForkJoinTest(list,this.start,middle,threshold);ForkJoinTestright=newForkJoinTest(list,middle,end,threshold);//并行执行两个“小任务”left.fork();right.fork();//合并两个“小任务”的结果Listjoin=left.join();join.addAll(right.join());returnjoin;}}(2)基类就绪后,执行类就可以开始调用了。调用时,我们首先需要Fork/Join线程池ForkJoinPool,然后向线程池提交一个ForkJoinTask,得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,提供了get方法获取执行结果。代码如下:ForkJoinPoolpool=newForkJoinPool();//提交可分解的ForkJoinTask任务ForkJoinTask>future=pool.submit(forkJoinService);System.out.println(future.get());//关闭线程池pool.shutdown();这样,我们就完成了一个简单的Fork/Join的开发。提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封装的方法也使用了Fork/Join。(细心的读者可能已经注意到,我在Fork/Join中也使用了stream,所以这个Fork/Join其实是多余的,因为stream已经实现了Fork/Join,但这只是一个demo展示,对任何没有用处没关系)引用官方文本:JavaSE8中引入的一个这样的实现被java.util.Arrays类用于其parallelSort()方法。这些方法类似于sort(),但通过fork/join框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快。fork/join框架的另一个实现由java.util.streams包中的方法使用,它是计划用于JavaSE8版本的ProjectLambda的一部分。附上完整代码供以后参考:1.定义一个抽象类(为了扩展,在本例中没有实际作用,所以不需要定义这个类):importjava.util.concurrent.RecursiveTask;/***Description:ForkJoinInterface*Designer:jack*Date:2017/8/3*Version:1.0.0*/publicabstractclassForkJoinServiceextendsRecursiveTask{@OverrideprotectedabstractTcompute();}2.定义基类importjava.util。List;importjava.util.stream.Collectors;/***说明:ForkJoin基类*Designer:jack*Date:2017/8/3*Version:1.0.0*/publicclassForkJoinTestextendsForkJoinService>{privatestaticForkJoinTestforkJoinTest;privateintthreshold;//阈值privateListlist;//待拆分ListprivateForkJoinTest(Listlist,intthreshold){this.list=list;this.threshold=threshold;}@OverrideprotectedListcompute(){//当end和start的差值小于阈值时,开始实际筛选if(list.size()s.contains("a")).collect(Collectors.toList());}else{//如果在end和start之间时difference大于阈值,将大任务分解为两个小任务intmiddle=list.size()/2;ListleftList=list.subList(0,middle);ListrightList=list.subList(middle,list.size());ForkJoinTestleft=newForkJoinTest(leftList,threshold);ForkJoinTestright=newForkJoinTest(rightList,threshold);//并行执行两个“小任务”left.fork();right.fork();//合并两个“小任务”的结果Listjoin=left.join();join.addAll(right.join());returnjoin;}}/***获取ForkJoinTest实例*@paramlist待处理List*@paramthresholdthreshold*@returnForkJoinTest实例*/publicstaticForkJoinService>getInstance(Listlist,intthreshold){if(forkJoinTest==null){synchronized(ForkJoinTest.class){if(forkJoinTest==null){forkJoinTest=newForkJoinTest(list,threshold);}}}returnforkJoinTest;}}3。执行类/***说明:fork/Join执行类*Designer:jack*Date:2017/8/3*Version:1.0.0*/publicclassTest{publicstaticvoidmain(Stringargs[])throwsExecutionException,InterruptedException{String[]strings={"a","ah","b","ba","ab","ac","sd","fd","ar","te","se","te","sdr","gdf","df","fg""gh","oa","ah","qwe","re??","ty","ui"};ListstringList=newArrayList<>(Arrays.asList(strings));ForkJoinPoolpool=newForkJoinPool();ForkJoinService>forkJoinService=ForkJoinTest.getInstance(stringList,20);//提供可解析的ForkJoinTask任务ForkJoinTask>future=pool.submit(forkJoinService);System.out.println(future.get());//关闭线程池pool.shutdown();}}