当前位置: 首页 > 后端技术 > Java

教你用Java7的Fork-Join框架开发高并发程序

时间:2023-04-01 20:15:16 Java

教你使用Java7的Fork/Join框架开发高并发程序将任务分成若干个小任务,将每个小任务的结果汇总后得出最终结果。本文分享自华为云社区《【高并发】如何使用Java7提供的Fork/Join框架实现高并发程序?》,作者:冰河。Fork/Join框架位于J.U.C(java.util.concurrent)中。它是Java7中提供的用于执行并行任务的框架。它可以将一个大任务分解成若干个小任务,最后汇总每个小任务的结果。最终结果。基本思想类似于Hadoop的MapReduce思想。主要使用work-stealing算法(一个线程从其他队列中窃取任务执行)。为什么并行分治计算中的工作窃取策略需要使用工作窃取算法?如果我们需要做一个比较大的任务,我们可以把这个任务分成几个独立的子任务。为了减少线程之间的竞争,我们把这些子任务放到不同的队列中,每个队列创建一个队列。创建一个单独的线程来执行队列中的任务。线程与队列一一对应。比如线程A负责处理队列A中的任务,但是有的线程会先完成自己队列中的任务,而其他线程对应的队列中还有任务等待处理。与其等待,已经完成工作的线程还不如帮助其他线程工作,于是它去其他线程的队列中偷一个任务来执行。这时候他们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常使用双端队列,被窃取任务线程总是从头部开始执行任务的双端队列。窃取任务的线程总是从双端队列的尾部开始执行任务。工作窃取算法的优点是充分利用线程进行并行计算,减少线程间的竞争。工作窃取算法的缺点是在某些情况下仍然存在竞争,例如双端队列中只有一个任务时。而且算法会消耗更多的系统资源,比如创建多个线程,多个双端队列。Fork/Join框架的局限对于Fork/Join框架,当一个任务正在等待它使用Join操作创建的子任务结束时,执行这个任务的工作线程会寻找其他未执行的任务并开始执行这些未执行的任务。以线程充分利用其运行时间来提高应用程序性能的方式执行的任务。为了实现这个目标,Fork/Join框架执行的任务有一些限制,如下所示。?任务只能使用Fork和Join操作来执行同步机制。如果使用其他同步机制,工作线程在同步操作期间不能执行其他任务。例如在Fork/Join框架中,如果任务被置于休眠状态,那么在休眠期间,正在执行该任务的工作线程将不会执行其他任务。?在Fork/Join框架中,拆分后的任务不应进行IO操作,如读写数据文件?任务不能抛出已检查的异常,这些异常必须通过必要的代码产生。Fork/Join框架的核心类Fork/Join框架的核心是两个类:ForkJoinPool和ForkJoinTask。ForkJoinPool负责实施工作窃取算法、管理工作线程以及提供有关任务的状态和执行信息。ForkJoinTask主要提供了一种在任务中进行Fork和Join操作的机制。示例代码示例代码如下:packageio.binghe.concurrency.example.aqs;导入lombok.extern.slf4j.Slf4j;导入java.util.concurrent.ForkJoinPool;导入java.util.concurrent.Future;importjava.util.concurrent.RecursiveTask;@Slf4jpublicclassForkJoinTaskExampleextendsRecursiveTask{publicstaticfinalintthreshold=2;私人int开始;私人意图结束;publicForkJoinTaskExample(intstart,intend){this.start=start;this.end=结束;}@OverrideprotectedIntegercompute(){intsum=0;//如果任务足够小,计算任务booleancanCompute=(end-start)<=threshold;if(canCompute){for(inti=start;i<=end;i++){sum+=i;}}else{//如果任务大于阈值,则拆分成两个子任务计算intmiddle=(start+end)/2;ForkJoinTaskExampleleftTask=newForkJoinTaskExample(start,middle);ForkJoinTaskExamplerightTask=newForkJoinTaskExample(中间+1,结束);//执行子任务leftTask.fork();rightTask.fork();//等待任务执行结束并合并其结果intleftResult=leftTask.join();intrightResult=rightTask.join();//组合子任务sum=leftResult+rightResult;}返回总和;}publicstaticvoidmain(String[]args){ForkJoinPoolforkjoinPool=newForkJoinPool();//生成计算任务,计算1+2+3+4ForkJoinTaskExampletask=newForkJoinTaskExample(1,100);//执行任务Futureresult=forkjoinPool.submit(task);尝试{log.info("result:{}",result.get());}catch(Exceptione){log.error("exception",e);}}}点击关注,第一时间了解华为云新技术~