当前位置: 首页 > 科技观察

ParallelStream的坑,不踩不知道,踩了吓一跳

时间:2023-03-11 22:23:39 科技观察

转载本文请联系味觉小姐姐公众号。很多同学喜欢用lambda表达式,可以让你定义短小精悍的函数,体现出你高超的编码水平。当然,这个功能对于一些以代码行数衡量工作量的公司来说是比较不利的。例如,下面的代码片段让人读起来就像一首诗。但是如果使用不好,那就是致命的。ListtransactionsIds=widgets.stream().filter(b->b.getColor()==RED).sorted((x,y)->x.getWeight()-y.getWeight()).mapToInt(Widget::getWeight).sum();这段代码有一个关键的功能,就是stream。通过它,可以将一个普通的列表转换为流,然后以类似管道的方式操作列表。总而言之,用过的都很好。对这些功能还不是很熟悉?可以参考:《到处是map、flatMap,啥意思?》问题是,如果我们把stream换成parallelStream会怎么样?按照字面意思,stream会从serial变成parallel。既然是并行,用屁股想想,就知道肯定有线程安全问题。但是我们这里讨论的不是让你使用线程安全的集合,这个话题太低级了。在这个阶段,知道如何在线程不安全的环境下使用线程安全的集合已经是基本功了。这次踩坑的地方是并行流的性能问题。我们用代码说话。下面的代码启动了8个线程,都是使用并行流进行数据计算。在执行逻辑中,我们让每个任务休眠1秒,这样可以模拟一些I/O请求的耗时等待。使用stream,30秒后程序会返回,但是我们期望程序在1秒以上返回,因为是并行流,对得起这个称号。测试发现我们等待了很久才执行任务。staticvoidparalleTest(){Listnumbers=Arrays.asList(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29);finallongbegin=System.currentTimeMillis();numbers.parallelStream().map(k->{try{Thread.睡眠(1000);System.out.println((System.currentTimeMillis()-begin)+"ms=>"+k+"\t"+Thread.currentThread());}catch(InterruptedExceptione){e.printStackTrace();}returnk;}).collect(Collectors.toList());}publicstaticvoidmain(String[]args){//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest().start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();newThread(()->paralleTest()).start();}坑其实这段代码在不同的机器上执行的时间是不一样的。既然是并行,就一定要有一定程度的并行。如果太低,则不能体现并行性能;如果太大,会浪费上下文切换的时间。我很沮丧地发现,很多高级研发开发人员死记硬背了线程池的各种参数,各种调优,还敢在I/O密集型业务中使用parallelStream视而不见。要理解这种并行度,我们需要看具体的构造方法。在ForkJoinPool类中找到这样的代码。try{//ignoreexceptionsinaccessing/parsingpropertiesStringpp=System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");if(pp!=null)parallelism=Integer.parseInt(pp);fac=(ForkJoinWorkerThreadFactory)newInstanceFromSystemProperty("java.util.concurrent.ForkJoinPool.common.threadFactory");handler=(UncaughtExceptionHandler)newInstanceFromSystemProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler");}catch(Exceptionignore){}if(fac==null){if(System.getSecurityManager()==null)fac=defaultForkJoinWorkerThreadFactory;else//usesecurity-manageddefaultfac=newInnocuousForkJoinWorkerThreadFactory();}if(parallelism<0&//default1lessthan#cores(parallelism=Runtime.getRuntime().availableProcessors()-1)<=0)并行度=1;if(并行度>MAX_CAP)并行度=MAX_CAP;可以看出,并行度是由以下参数控制的。如果获取不到该参数,则默认使用CPU个数-1的并行度。可见这个功能是为计算密集型业务设计的。如果你给它提供很多任务,它就会从并行执行退化为类似串行的效果。-Djava.util.concurrent.ForkJoinPool.common.parallelism=N即使您使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N设置初始大小,它仍然有问题。因为parallelism变量是final的,一旦设置,就不允许修改。也就是说,以上参数只会生效一次。张三可以使用如下代码设置并行度大小为20。System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");李斯可能用同样的方法将这个值设置为30。那么项目中实际使用的是哪个值,那你就得问JVM是怎么加载类信息的。这种方法不是很可靠。作为解决方案,我们可以通过提供外部的forkjoinpool,也就是改变提交方式来实现不同类型的任务分离。代码如下所示,可以通过显式提交代码实现任务分离。ForkJoinPoolpool=newForkJoinPool(30);finallongbegin=System.currentTimeMillis();try{pool.submit(()->numbers.parallelStream().map(k->{try{Thread.sleep(1000);System.out.println((System.currentTimeMillis()-begin)+"ms=>"+k+"\t"+Thread.currentThread());}catch(InterruptedExceptione){e.printStackTrace();}returnk;}).collect(Collectors.toList())).get();}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}这样,不同的场景可以有不同的并行度。这个方法类似于CountDownLatch,我们需要手动管理资源。使用这种方法,代码量增加,与优雅度关系不大。不仅不优雅,还很丑陋。白天鹅变成了丑小鸭,你还会爱它吗?作者简介:品味小姐姐(xjjdog),一个不允许程序员走弯路的公众号。专注于基础架构和Linux。十年架构,每天百亿流量,与你探讨高并发世界,给你不一样的滋味。我的个人微信xjjdog0,欢迎加好友进一步交流。