当前位置: 首页 > 科技观察

并发编程中的经典分治思想!!

时间:2023-03-17 10:56:48 科技观察

作者个人研发在高并发场景下提供了一个简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。开源半年多以来,已成功为十几家中小企业提供精准定时调度解决方案,经受住了生产环境的考验。为了造福更多的童鞋,这里提供一个开源的框架地址:https://github.com/sunshinelyz/mykit-delay早先在JDK中写的,它提供了这样一个功能:它可以把复杂的逻辑拆分成各个简单的逻辑是并行执行。各并行执行逻辑执行完毕后,将结果进行汇总,得到最终的结果数据。有点像Hadoop中的MapReduce。ForkJoin是JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是把一个复杂的计算按照设定的阈值分解成多个计算,然后对每个计算的结果进行聚合。相应地,ForkJoin将复杂的计算作为一个任务,分解后的多个计算作为子任务并行执行。Java并发编程的发展对于Java语言来说,天生就是支持多线程并发编程,它也在并发编程领域不断发展。Java在发展过程中对并发编程的支持越来越完善,恰恰印证了这一点。Java1支持线程,synchronized。Java5引入了线程池、阻塞队列、并发集合、锁、条件队列。Java7添加了fork-join库。Java8添加了并行流。并发与并行并发与并行在本质上还是有区别的。Concurrency并发是指在同一时刻,只有一个线程可以拿到CPU来执行任务,而多个线程是快速轮流执行的,这样就可以在宏观上产生多个线程同时执行的效果。并发不是真正的同时执行,并发可以用下图表示。并行是指在任何时刻,多个线程同时在多个CPU核上执行,是真正意义上的同时执行。分治法的基本思想是将一个大的问题分解成更小的子问题,然后分治,最后将子问题的解组合起来得到原问题的解。步骤①对原问题进行划分;②解决子问题;③将子问题的解合并为原问题的解。我们可以使用以下伪代码来表达此步骤。if(任务很小){直接计算结果}else{拆分成N个子任务调用子任务的fork()计算调用子任务的join()合并计算结果}在分治法中,子问题一般是相互独立的,因此往往通过递归调用算法来解决子问题。典型应用二分搜索大整数乘法斯特拉森矩阵乘法棋盘覆盖归并排序快速排序线性时间选择河内塔ForkJoin并行处理框架ForkJoin框架概述Java1.7引入了一个新的并发框架——Fork/Join框架,主要用于实现“分而治之”"算法,尤其是分而治之后递归调用的函数。ForkJoin框架的本质是一个并行执行任务的框架,可以将一个大任务拆分成若干个小任务,最后将各个小任务的结果汇总得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是为了取代ThreadPool。实际上,Java8中引入的并行流计算在内部是使用ForkJoinPool实现的。例如,下面实现了一个使用并行流打印元组数组的程序。publicclassSumArray{publicstaticvoidmain(String[]args){ListnumberList=Arrays.asList(1,2,3,4,5,6,7,8,9);numberList.parallelStream().forEach(System.out::println);}}这段代码后面用到了ForkJoinPool。说到这里,可能有读者会问:用线程池的ThreadPoolExecutor可以实现吗?为什么要使用ForkJoinPool?ForkJoinPool是什么鬼?!接下来,我们就来回答这个问题。ForkJoin框架原理ForkJoin框架是jdk1.7引入的新特性。与ThreadPoolExecutor一样,它也实现了Executor和ExecutorService接口。它使用一个无限队列来保存需要执行的任务,线程数通过构造函数传入。如果未将指定的线程数传递给构造函数,则将当前计算机可用的CPU数设置为默认的线程数。ForkJoinPool主要使用分而治之的算法来解决问题。一个典型的应用是快速排序算法。这里的重点是ForkJoinPool能够使用相对较少的线程处理大量任务。比如对1000万条数据进行排序,任务会分为两个500万条的排序任务和一个针对两组500万条数据的合并任务。以此类推,对500万条数据进行同样的切分过程,最后会设置一个阈值,指定当数据量达到极限时,这种切分过程将停止。例如,当元素个数小于10时,停止分裂,改用插入排序对它们进行排序。那么最后所有任务加起来大概200万+左右。问题的症结在于,只有当任务的所有子任务都完成时,任务才能执行。所以在使用ThreadPoolExecutor的时候,就出现了分而治之的问题,因为ThreadPoolExecutor中的线程不能再往任务队列中添加一个任务,等待任务完成再继续执行。使用ForkJoinPool可以解决这个问题,它可以让其中的线程创建一个新任务,并暂停当前任务,此时线程可以从队列中选择子任务执行。那么使用ThreadPoolExecutor和ForkJoinPool在性能上有什么区别呢?首先,使用ForkJoinPool可以用有限的线程数完成很多有父子关系的任务,比如用4个线程完成200万以上的任务。但是在使用ThreadPoolExecutor的时候是无法完成的,因为ThreadPoolExecutor中的Thread不能选择先执行子任务。当需要完成200万个具有父子关系的任务时,也需要200万个线程。显然,这是不可行的,也很不合理!!工作窃取算法如果我们需要做一个比较大的任务,我们可以把这个任务分成几个独立的子任务。为了减少线程之间的竞争,我们把这些子任务分别放到不同的队列中去,为每个队列创建一个单独的线程来执行队列中的任务。线程和队列一一对应。比如线程A负责处理队列A中的任务。但是有些线程会先完成自己队列中的任务,而其他线程对应的队列中还有任务等待处理。与其等待,已经完成工作的线程还不如帮助其他线程工作,于是它去其他线程的队列中偷一个任务来执行。这时候他们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常使用双端队列,被窃取任务线程总是从头部开始执行任务的双端队列。窃取任务的线程总是从双端队列的尾部开始执行任务。工作窃取算法的优点:充分利用线程进行并行计算,减少线程间的竞争。工作窃取算法的缺点:在某些情况下仍然存在竞争,比如双端队列中只有一个任务时。而且算法会消耗更多的系统资源,比如创建多个线程,多个双端队列。Fork/Join框架的局限性:对于Fork/Join框架来说,当一个任务正在等待它使用Join操作创建的子任务结束时,执行该任务的工作线程会寻找其他未执行的任务并开始执行这些未执行的任务。任务以线程充分利用其运行时间来提高应用程序性能的方式执行。为了实现这个目标,Fork/Join框架执行的任务有一些限制。(1)任务只能使用Fork和Join操作来执行同步机制。如果使用其他同步机制,工作线程在同步操作期间不能执行其他任务。例如在Fork/Join框架中,如果任务被置于休眠状态,那么在休眠期间,正在执行该任务的工作线程将不会执行其他任务。(2)在Fork/Join框架中,拆分任务不应该进行IO操作,比如读写数据文件。(3)任务不能抛出已检查的异常,必须通过必要的代码才能摆脱这些异常。ForkJoin框架的实现ForkJoin框架中的一些重要类如下所示。ForkJoinPool框架涉及的主要类如下。1、ForkJoinPool类实现了ForkJoin框架中的线程池。从类图中可以看出,ForkJoinPool类实现了线程池的Executor接口。从下图我们也可以看出ForkJoinPool的类图关系。其中,ForkJoinPool可以使用Executors.newWorkStealPool()方法创建。ForkJoinPool提供了以下提交任务的方法。publicvoidexecute(ForkJoinTasktask)publicvoidexecute(Runnabletask)publicTinvoke(ForkJoinTasktask)publicList>invokeAll(Collection>tasks)publicForkJoinTasksubmit(ForkJoinTasktask)publicForkJoinTasksubmit(Callabletask)publicForkJoinTasksubmit(Runnabletask,Tresult)publicForkJoinTasksubmit(Runnabletask)2.ForkJoinWorkerThread类实现了ForkJoin框架中的线程。3、ForkJoinTask类ForkJoinTask封装了数据及其对应的计算,支持细粒度的数据并行。ForkJoinTask比线程更轻量,ForkJoinPool中的少量工作线程可以运行大量的ForkJoinTask。ForkJoinTask类主要包括fork()和join()两个方法,分别实现了任务的拆分和合并。fork()方法类似于Thread.start(),但它不是立即执行任务,而是将任务放入工作队列。与Thread.join()方法不同,ForkJoinTask的join()方法不是简单地阻塞线程,而是使用工作线程来运行其他任务。当工作线程调用join()时,它将处理其他任务,直到它注意到Target子任务已完成。我们可以用下图来表示这个过程。ForkJoinTask有3个子类:RecursiveAction:没有返回值的任务。RecursiveTask:返回值的任务。CountedCompleter:任务完成后会触发其他任务。4.RecursiveTask类有一个返回结果的ForkJoinTask实现Callable。5、RecursiveAction类中不返回结果的ForkJoinTask实现Runnable。6、CountedCompleter类在任务完成后触发自定义钩子函数的执行。ForkJoin示例程序包io.binghe.concurrency.example.aqs;importlombok.extern.slf4j.Slf4j;importjava.util.concurrent.ForkJoinPool;importjava.util.concurrent.Future;importjava.util.concurrent.RecursiveTask;@Slf4jpublicclassForkJoinTaskExampleextendsRecursiveTask{publicstaticfinalintthreshold2;privateintstart;privateintend;publicForkJoinTaskExample(intstart,intend){this.start=start;this.end=end;}@OverrideprotectedIntegercompute(){intsum=0;//如果任务足够小,计算任务booleancanCompute=(end-start)<=threshold;if(canCompute){for(inti=start;i<=end;i++){sum+=i;}}else{//如果任务大于阈值,拆分成两个子任务计算intmiddle=(start+end)/2;ForkJoinTaskExampleleftTask=newForkJoinTaskExample(start,middle);ForkJoinTaskExamplerightTask=newForkJoinTaskExample(middle+1,end);//执行子任务leftTask.fork();rightTask.fork();//等待任务执行并合并结果inleftResult=leftTask.join();intrightResult=rightTask.join();//合并子任务sum=leftResult+rightResult;}returnsum;}publicstaticvoidmain(String[]args){ForkJoinPoolforkjoinPool=newForkJoinPool();//生成一个计算任务,计算1+2+3+4ForkJoinTaskExampletask=newForkJoinTaskExample(1,100);//执行一个任务Futureresult=forkjoinPool.submit(task);try{log.info("result:{}",result.get());}catch(Exceptione){log.error("exception",e);}}}本文转载自微信公众号“冰河科技”,关注二维码转载本文请联系冰川科技公众号。