当前位置: 首页 > 科技观察

拥抱Java8并行流并加速!

时间:2023-03-20 16:36:38 科技观察

前言在Java7之前,如果我们要并行处理一个集合,需要经过以下步骤:手动分成几个部分为每个部分创建线程在合适的时候合并。并且还需要注意多线程之间共享变量的修改。Java8为我们提供了并行流,可以一键开启并行模式。是不是很酷?让我们来看看并行流。识别并启用并行流。什么是并行流:并行流是将一个流的内容分成多个数据块,并使用不同的线程分别处理每个不同的数据块的流。比如有这样一个需求:有一个List集合,list中的每个apple对象只有weight。我们还知道,一个苹果的单价是5元/公斤。现在我们需要计算每个苹果的单价。传统的方式是这样的:ListappleList=newArrayList<>();//假装从库中检出数据for(Appleapple:appleList){apple.setPrice(5.0*apple.getWeight()/1000);}我们通过迭代器遍历列表中的苹果对象,完成对每个苹果价格的计算。该算法的时间复杂度为O(list.size()),随着列表大小的增加,时间消耗会线性增加。并行流可以大大减少这个时间。集合的并行流处理方法如下:appleList.parallelStream().forEach(apple->apple.setPrice(5.0*apple.getWeight()/1000));与普通流不同的是这里调用了parallelStream()方法。当然,普通流也可以通过stream.parallel()转换为并行流。推荐阅读:Java8创建Stream的10种方法,更多可以关注Java技术栈公众号回复java获取系列教程。并行流也可以通过sequential()方法转换为顺序流,但是要注意:流的并行和顺序转换不会对流本身做任何实际的改变,它只是一个标记。以及流在一个pipeline上的多次parallel/sequential转换,效果就是最后一个方法调用parallelstream这么方便,它的线程从哪里来?多少?如何配置?并行流在内部使用默认的ForkJoinPool线程池。默认的线程数是处理器的核心数,配置系统的核心属性:java.util.concurrent.ForkJoinPool.common.parallelism可以改变线程池的大小。但是,该值是一个全局变量。更改它会影响所有并行流。当前无法为每个流配置专用线程数。一般来说,用处理器核心数来测试并行流的性能是一个不错的选择。为了更容易测试性能,我们在每次计算完苹果价格后让线程休眠1s,表示这期间还有其他IO相关的操作,并输出程序执行耗时和顺序执行耗时:publicstaticvoidmain(String[]args)throwsInterruptedException{ListappleList=initAppleList();Datebegin=newDate();for(Appleapple:appleList){apple.setPrice(5.0*apple.getWeight()/1000);Thread.sleep(1000);}Dateend=newDate();log.info("苹果个数:{},耗时:{}s",appleList.size(),(end.getTime()-begin.getTime())/1000);}并行版本ListappleList=initAppleList();Datebegin=newDate();appleList.parallelStream().forEach(apple->{apple.setPrice(5.0*apple.getWeight()/1000);try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}});Dateend=newDate();log.info("苹果数量:{}个,耗时:{}s",appleList.size(),(end.getTime()-begin.getTime())/1000);耗时情况和我们预测的一致,我的电脑是四核I5处理器。开启并行后,4个处理器各执行一个线程,1秒完成任务!并行流能随便用吗?可拆分性影响流的速度。通过上面的测试,有些人很容易得出一个结论:并行流的速度非常快,我们完全可以放弃foreach/fori/iter的外部迭代,使用Stream提供的内部迭代来实现。真的是这样吗?并行流真的那么完美吗?答案当然是否定的。你可以复制下面的代码,在你自己的电脑上测试一下。经过测试,事实证明并行流并不总是最快的处理方式。对于iterate方法处理的前n个数,无论并行与否,总是比循环慢。非并行版本可以理解为没有循环的streaming操作导致的慢,更偏向于底层。为什么可并行版本更慢?这里有两点需要注意:2.Iterate生成装箱的对象,必须拆箱成数字才能求和。3、我们很难把iterate分成多个独立的block并行执行。这个问题很有意思。我们重要的是要认识到一些流操作比其他的更容易并行化。对于iterate,函数的每次应用都依赖于前一次应用的结果。那么在这种情况下,我们不仅不能有效地将流分成小块进行处理。相反,由于并行化,开销再次增加。4、对于LongStream.rangeClosed()方法,没有iterate的第二个痛点。它生成基本类型的值,无需拆箱操作。此外,它还可以直接将待生成的数1-n拆分为1-n/4、1n/4-2n/4、……3n/4-n等四部分。因此并进行状态下的rangeClosed()是快于用于跟随外部代理的packagelambdasinaction.chap7;importjava.util.stream.*;publicclassParallelStreams{publicstaticlongiterativeSum(lonng){longresult=0;for(longi=0;i<=n;i++){result+=i;}returnresult;}publicstaticlongsequentialSum(longn){returnStream.iterate(1L,i->i+1).limit(n).reduce(Long::sum).get();}publicstaticlongparallelSum(longn){returnStream.iterate(1L,i->i+1).limit(n).parallel().reduce(Long::sum).get();}publicstaticlongrangedSum(longn){returnLongStream.rangeClosed(1,n).reduce(Long::sum).getAsLong();}publicstaticlongparallelRangedSum(longn){returnLongStream.rangeClosed(1,n).parallel().reduce(Long::sum).getAsLong();}}packagelambdasinaction。chap7;importjava.util.concurrent.*;importjava.util.function.*;publicclassParallelStreamsHarness{publicstaticfinalForkJoinPoolFORK_JOIN_POOL=newForkJoinPool();publicstaticvoidmain(String[]args){System.out.println("IterativeSumdonein:"+measurePerf(ParallelStreams::迭代的Sum,10_000_000L)+"msecs");System.out.println("SequentialSumdonein:"+measurePerf(ParallelStreams::sequentialSum,10_000_000L)+"msecs");System.out.println("ParallelforkJoinSumdonein:"+measurePerf(ParallelStreams::parallelSum,10_000_000L)+"msecs");System.out.println("RangeforkJoinSumdonein:"+measurePerf(ParallelStreams::rangedSum,10_000_000L)+"msecs");System.out.println("ParallelrangeforkJoinSumdonein:"+measurePerf(ParallelStreams::parallelRangedSum,10_000_000L)+"msecs");}publicstaticlongmeasurePerf(Functionf,Tinput){longfastest=Long.MAX_VALUE;for(inti=0;i<10;i++){longstart=System.nanoTime();Rresult=f.apply(input);longduration=(System.nanoTime()-start)/1_000_000;System.out.println("Result:"+result);if(duration