当前位置: 首页 > 后端技术 > Java

送你一个并发编程的奇淫技巧,要你舒服的起飞...

时间:2023-04-01 22:39:52 Java

送你一个绝妙的并发编程技巧,让你舒舒服服的起飞。不得不说,这真的是一个非常有野心的项目。在整理的过程中,我发现早期写的作品还是很多的,阅读量很是捉襟见肘。在整理文章的过程中,我发现其实有很多知识点是我之前写的,可能记不太清了。惭愧惭愧。但是这确实是一个重新学习和巩固的过程,非常好。再说说Future。这篇文章我们就来说说CompletionService。在说它之前,我们先回顾一下Future的用法。先问你,当你向线程池提交一组计算任务的时候,你想得到返回值。您应该使用哪种执行器提交方法?这个提交方法的重载类型是什么?什么?你不能回答?呸,你个没良心的人,你上周白嫖完了就忘了?里面有一篇文章写到:既然是一组计算任务,就想得到返回值来做事情。这个返回值封装在Future中。如何获得?调用Future的get方法,有不超时的舔狗式get,也有超时不死心的get,到点就放弃:我们一起看一个例子:publicclassJDKThreadPoolExecutorTest{publicstaticvoidmain(String[]args)throwsException{ExecutorServiceexecutorService=Executors.newCachedThreadPool();ArrayList>list=newArrayList<>();Futurefuture_15=executorService.submit(()->{TimeUnit.SECONDS.sleep(15);System.out.println("15s执行完毕。");return15;});list.add(future_15);Futurefuture_5=executorService.submit(()->{TimeUnit.SECONDS.sleep(5);System.out.println("执行时间5s执行完成。");return5;});list.add(future_5);Futurefuture_10=executorService.submit(()->{TimeUnit.SECONDS.sleep(10);System.out.println("执行时间10s执行完毕。");return10;});list.add(future_10);System.out.println("开始准备获取结果");for(Futurefuture:list){System.out.println("future.get()="+future.get());}Thread.currentThread().join();}}现在有三个任务,执行时间分别是15s/10s/5s。这三个Callable任务都是通过JDK线程池的submit方法提交的。你用眼睛编译,心里输出。你认为这段代码的输出是什么。首先,主线程将三个任务提交到线程池中去,将对应返回的Future存入List中,然后执行“开始准备获取结果”的输出语句。然后进入for循环,在循环中执行future.get()操作,阻塞等待。看看你认为的输出是不是这样的:从这个输出中,我们可以看出问题所在。明显的木桶效应。三个异步任务中,耗时最长的最先执行,所以先进入链表,所以在循环中获取任务结果时,get操作会一直阻塞,即使执行时间为5s/10s已经执行完毕。好吧,让我们举个例子。想象一个场景:假设你是海王星,你有很多普通的女性朋友。你同时请了三个女性朋友吃饭。分别告诉他们:你先化妆,化完了告诉我,我开车来接你。小红化妆需要2个小时。小花化妆需要1个小时。小媛化妆需要30分钟。自从你先告诉了小红,你就一直在小红家门口等着小红化完妆。当小红化完妆,你去车里接她的时候,另外两个朋友已经准备好了,等着你去家里接她。这不是一个合格的海王应该有的样子。这是future在这种情况下的局限性。根据上面的场景编码可以得到(代码可以直接复制粘贴使用,建议大家拿出来运行一下):executorService=Executors.newCachedThreadPool();ArrayList>list=newArrayList<>();System.out.println("约几个女生一起吃饭。");Futurefuture_15=executorService.submit(()->{System.out.println("小红:好的,兄弟,我需要补上2个小时,等一下。");TimeUnit.SECONDS.sleep(15);System.out.println("小红:我按时2小时融化完了,哥哥来接我。");return"小红做完了。";});list.add(future_15);Futurefuture_5=executorService.submit(()->{System.out.println("小源:好的,兄弟,我需要30分钟补课,等一个分钟哦。");TimeUnit.SECONDS.sleep(5);System.out.println("小元:我准时30分钟做完了,哥哥来接我。");return"小元做完了。";});list.add(future_5);Futurefuture_10=executorService.submit(()->{System.out.println("小花:好的,兄弟,我需要一个小时补上,稍等。");TimeUnit.SECONDS.sleep(10);System.out.println("小花:我1小时内按时完成,哥哥来接我。");return"小花完成了。";});list.add(future_10);TimeUnit.SECONDS.sleep(1);System.out.println("所有通知都结束了,稍等。");for(Futurefuture:list){System.out.println(future.get()+"我去接她。");}Thread.currentThread().join();}}输出结果如下:都是一样的普通朋友,为什么是不是要等化妆时间最长的小红?你为什么不接谁动作快的人?你看,你这么操作,小元和小花是怎么想的?我只能说:你是个好人。什么?你中央空调还问我“什么是海王星”?CompletionService保存Neptune还是上面那个场景,当我们引入CompletionService的时候,看起来就不一样了。先直接看用法:ExecutorServiceexecutorService=Executors.newCachedThreadPool();ExecutorCompletionServicecompletionService=newExecutorCompletionService<>(executorService);使用起来很方便,直接用ExecutorCompletionService把线程池包裹起来就可以了。然后在提交任务的时候使用competitionService的submit方法。代码如下:publicclassExecutorCompletionServiceTest{publicstaticvoidmain(String[]args)throwsException{ExecutorCompletionServicecompletionService=newExecutorCompletionService<>(executorService);System.out"println(请几个女生一起吃饭。");completionService.submit(()->{System.out.println("小红:好的,兄弟,我需要补2个小时,等一下。");TimeUnit.SECONDS.sleep(15);System.out.println("小红:我按时2小时融化完了,哥哥来接我。");return"小红完成了。";});completionService.submit(()->{System.out.println("小远:好的,兄弟,我化妆需要30分钟,等一下。");TimeUnit.SECONDS.sleep(5);System.out.println("小媛:我30分钟后准备好,哥哥来接我。");return"小媛说完了。";});completionService.submit(()->{System.out.println("小花:好的,兄弟,我要化妆一个小时,等一下。");TimeUnit.SECONDS.sleep(10);System.out.println("小花:我我准时一个小时结束了,过来接我。");return"小花结束了。";});TimeUnit.SECONDS.sleep(1);System.out.println("所有通知都over,justwait.");//循环3次因为上面提交了3个异步任务for(inti=0;i<3;i++){StringreturnStr=completionService.take().get();System.out.println(returnStr+"我要去接她");}Thread.currentThread().join();}}先用眼睛编译,再在心里输出……算了,别了不编译,直接给大家看结果,我等不及了A:谁先化妆就先去接他。在我写这篇文章的时候,当我看到这个输出时,我不禁鼓掌。真正的海王星应该是时间管理大师。先对比一下输出,都站起来,一起鼓掌:再对比两个版本代码的区别:变化不大,甚至可以忽略不计。执行提交方法的对象变为ExecutorCompletionService。获取任务结果的方法变为:StringreturnStr=completionService.take().get();先别看原理。您可以仔细品味获得结果的方法。completionService.take()取出东西,然后调用get方法。根据这个get,直觉告诉我take的结果一定是future对象。而这个未来的对象必须放在一个队列中。下一节,我就带大家确认一下。CompletionService的原理首先,CompletionService是一个接口:ExecutorCompletionService是这个接口的实现类:查看ExecutorCompletionService的构造方法:可以看到需要传入一个线程池对象,队列默认使用LinkedBlockingQueue。当然我们也可以指定使用什么队列:然后看它的任务提交方式:因为ExecutorCompletionService主要是用来优雅地处理返回值的。所以它支持两种提交类型的提交,这两种类型都有返回值。上面的时间管理大师版Neptune使用的是Callable类型的方法。我们先比较一下Executor直接提交和ExecutorCompletionService提交的区别:区别在于execute方法。ExecutorCompletionService提交这样的任务:executor.execute(newQueueingFuture(f));区别在于execute方法中的Runable:看看这个QueueingFuture是什么:奥秘基本就在里面。QueueingFuture继承自FutureTask。覆盖done方法,然后将任务放入队列中。这个方法的意思是当任务被执行时,会被放入队列中。也就是说,队列中的任务都是done任务,tasks都是futures。如果调用队列的task方法,就是阻塞等待。等待的一定是readyfuture,调用get可以立即得到结果。你说这组操作是做什么的?这不是在做脱钩吗?以前,提交任务后,需要直接关心每个任务返回的future。现在CompletionService会为您跟踪这些未来。caller和future的解耦就完成了。原理分析完了,再来说说一个需要注意的地方。当你的使用场景是不关心返回值的时候,不要闲着用CompletionService提交任务。为什么?因为前面说了,里面是有队列的。而当你不关心返回值的时候,你就不会去处理这个队列,导致这个队列中堆积的对象越来越多。最后,它爆炸了,OOM了。在开源框架中的应用中,说CompletionService是一个接口。除了JDK的ExecutorCompletionService实现了这个接口。开源框架中也有相应的实现。比如Redisson:看这个实现,思路和ExecutorCompletionService是一模一样的,只是实现略有不同。它把future放到queue上的时候,并没有重写done方法,而是使用了响应式编程的onComplete:CompletionService的核心思想是:Executor加Queue。这个想法让我想起了在Dubbo看到的一个类:这个类的doInvoker方法中的核心逻辑如下:首先在标①的地方定义了一个队列。标有②的地方在循环体中提交了异步任务。多个服务提供商需要多个周期。子线程将返回结果放入队列中标有③的地方。只要放入,在标有④的地方(规定时间内)就可以获取到,然后程序立即返回。这样就可以并行调用多个服务提供者,只要有一个服务提供者返回就立即返回的功能。我觉得这个思路和CompletionService的思路有一点共同点。我们必须学习CompletionService及其思想。