《Stream学习笔记(一)》我们介绍了Stream常用的基本操作,今天再介绍一些。前言今天主要介绍Stream中的map和reduce方法,为什么要介绍这个呢?原因是之前看过大数据领域的一个框架叫MapReduce。这个大数据框架的核心关键词是Map和Reduce。同时,这两个关键字也是MapReduce框架中的两个关键功能。Map函数的作用是获取输入并作为键值对,作为函数的输入参数,经过Map函数处理后返回键值对。Reduce处理结果,即合并。下图演示了MapReduce的流程:input输入,split切分,map映射转换,combine组合,reduce是动词,减量减价。这让我很困惑。从示意图来看,reduce执行最后的合并过程。让我们将其理解为聚合。在上图中,combine想要聚合相同的键,最后将它们分成四组,而reduce阶段聚合每个小块本身。基本类似于Stream的reduce功能。reduceJava的Stream也有map和reduce。先说reduce。reduce函数比较难理解。Stream中reduce函数的重载有3种:1.Optionalreduce(BinaryOperatoraccumulator);2.Treduce(Tidentity,BinaryOperatoraccumulator);3.Ureduce(Uidentity,BiFunctionaccumulator,BinaryOperatorcombiner);理解难度从上到下递增。让我们一个一个地看,第一个有如下注解:booleanfoundAny=false;T结果=空;for(Telement:thisstream){if(!foundAny){foundAny=true;结果=元素;}elseresult=accumulator.apply(result,element);返回foundAny?Optional.of(result):Optional.empty();第一个函数相当于上面的代码,上面的代码首先从集合元素中找到第一个,然后对集合中的其他元素进行apply操作。得到结果后,再与其他元素进行运算,直到所有元素都参与运算。这么说可能有点抽象,一个简单的例子就是累加和求和,如下图,所以reduce函数的表达意思是:reduce()方法依次执行你提供的一个reducer函数流中的元素,每次运行reducer都会将前面元素的计算结果作为参数传入,最后将其结果聚合成一个单一的返回值。第一次执行reduce函数时,没有前面元素的计算结果,可以在外部指定(reduce函数的第二次重载)。如果外部不指定,则默认选择第一个元素作为上一次reduce的计算结果(对应reduce函数的第一次重载)。第三个重载的作用是什么,或者第三个函数的第三个参数是做什么用的?首先让我们看一下上面的注释:使用提供的标识、累加和组合函数对该流的元素执行归约。使用提供的初始值、累加和组合对流执行归约操作。等效于以下函数这等效于:Uresult=identity;for(Telement:thisstream)result=accumulator.apply(result,element)返回结果;但不限于按顺序执行。但不限于顺序流标识值必须是组合器函数的标识。这意味着对于所有u,combiner(identity,u)等于u。此外,组合器功能必须与累加器功能兼容(兼容);对于所有u和t,必须满足以下条件:combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t)必须将特征值用作组合函数的特征,这意味着对于任何元素u,combine(identity,u)等于u。此外,组合器必须与累加器功能兼容。也就是说,对于任意的u和t,都需要满足combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t)看看这个评论是不是有点云里雾里,没关系,我们看例子,分析combiner和accumulator在流程中是如何工作的。@TestpublicvoidreduceDemo03(){String[]arr={"lorem","ipsum","sit","amet"};列表<字符串>strs=Arrays.asList(arr);intijk=strs.stream().reduce(1,(identity,element)->{System.out.println("执行线程名"+Thread.currentThread().getName());System.out.println("accumulator,identity="+identity+",element="+element);returnidentity+element.length();},(leftResult,rightResult)->{System.out.println("执行线程名"+Thread.currentThread().getName());System.out.println("combine,identity="+leftResult+",element="+rightResult);returnleftResult*rightResult;});System.out.println(ijk);}在上面的例子中,我们传入的accumulator函数将传入的元素相加,combiner函数将传入的元素相乘执行结果如下:看来我们传入的combine函数根本没用,我们用并行流试试:@TestpublicvoidreduceDemo05(){String[]arr={"lorem","ipsum","坐","amet"};列表<字符串>strs=Arrays.asList(arr);intreduceResult=strs.parallelStream().reduce(1,(identity,element)->{System.out.println("累加器执行线程名称:"+Thread.currentThread().getName());System.out.println("Accumulator,identity="+identity+",element="+element);returnidentity+element.length();},(leftResult,rightResult)->{System.out.println("合并执行线程名"+Thread.currentThread().getName());System.out.println("组合,identity="+leftResult+",element="+rightResult);return左结果*左结果;});System.out.println("reduceResult:"+reduceResult);}输出结果如下:我们切换到parallelstream后,执行了combine函数。执行过程就是我们先传入的identity和stream。流中的元素由累加器操作,流中有四个元素。经过累加器运算,得到四个结果,最后通过combine函数将四个结果合并理解了这个过程,我们再看上面的注解:combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t),我们把上面的过程取值代入到看公式是不是N??ottrue,u是累加器运算后的值,所以我们选择u=6,t是流中的元素,我们选择ipsumcombiner.apply(6,accumulator.apply(1,5))=36accumulator.apply(6,5)=30combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t)不成立,如果成立会有什么效果?我们尝试重写上面的reduceDemo05,让combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t),看看结果会是什么:@TestpublicvoidreduceDemo05(){String[]arr={"lorem","ipsum","sit","amet"};列表<字符串>strs=Arrays.asList(arr);intparallelStreamReduceResult=strs.parallelStream().reduce(0,(identity,element)->{System.out.println("累加器执行线程名称:"+Thread.currentThread().getName());System.out.println("累加器,identity="+identity+",element="+element);returnidentity+element.length();},(leftResult,rightResult)->{System.out.println("合并执行线程名"+Thread.currentThread().getName());System.out.println("组合,identity="+leftResult+",element="+rightResult);returnleftResult+rightResult;});intstreamResult=strs.stream().reduce(0,(identity,element)->{System.out.println("AccumulatorexecuteThreadname:"+Thread.currentThread().getName());System.out.println("Accumulator,identity="+identity+",element="+element);returnidentity+element.length();},(leftResult,rightResult)->{System.out.println("合并执行线程名"+Thread.currentThread().getName());System.out.println("组合,identity="+leftResult+",element="+rightResult);返回左结果+右结果;});System.out.println("parallelStreamReduceResult:"+parallelStreamReduceResult);System.out.println("streamResult:"+streamResult);}结论是combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t)成立,那么reduce的第三个重载函数在parallelstream和serialstream下的结果是一样的如何在并行流中指定一个自定义的线程池我记得之前有人在微信群里问过一个问题,如何在并行流中指定一个自定义的ForkJoinPool,如果不指定,所有的并行流都会使用一个线程池,有时候我们想指定流中的线程池。我记得当时我的回答是否定的。我想我误解了我的孩子们。实际上,您可以通过以下方式使用自定义线程池:@TestpublicvoidforkJoinPool(){ForkJoinPoolforkJoinPool=newForkJoinPool(4);forkJoinPool.submit(()->{Stream.of("lorem","ipsum","sit","amet").parallel().forEach(e->{System.out.println(Thread.currentThread().getName());});});}输出结果:参考什么是MapReduce?https://www.talend.com/resour...Array.prototype.reduce()https://developer.mozilla.org...java源码中对Stream#reduce的解读https://www.bigbrotherlee.com...