当前位置: 首页 > 科技观察

阿里二面:Java8的Streamapi是迭代一次还是多次

时间:2023-03-20 00:27:31 科技观察

面试官:你用过java8新的streamapi吗?我:这个必须用。面试官:给你一个字符串数组。如果使用streamapi来实现的话,找出以字符'a'开头的最长字符串。如何使用streamapi来实现呢?{"abb","abcd","fegc","efe","adfes"}我:使用如下方法实现:publicstaticvoidmaxLength(Listlist){System.out.println(list.stream().filter(s->s.startsWith("a")).mapToInt(r->length(r)).max().orElse(0));;}面试官:这个操作是迭代一次还是两次?也就是先迭代一次,过滤掉以字符'a'开头的字符串数组,然后再迭代寻找最大长度,还是一次迭代完成?我:这是在一次迭代中完成的。如果是多次迭代,stream后面的操作函数很多,这样的话,效率会很低。我们可以加一个print来验证结果,代码如下:publicstaticvoidmain(String[]args){Listlist=Arrays.asList("abb","abcd","fegc","efe","adfes");intmaxLength=list.stream().filter(s->isStartWitha(s)).mapToInt(StreamTest1::length).max().orElse(0);System.out.println("字符开始字符aString最大长度:"+maxLength);}privatestaticbooleanisStartWitha(Stringa){System.out.println(a+"isstartwitha:"+a.startsWith("a"));returna.startsWith("a");}privatestaticintlength(Stringa){System.out.println("thelengthof"+a+":"+a.length());returna.length();}打印结果如下:abbisstartwitha:truethelengthofabb:3abcdisstartwitha:truelengthofabcd:4fegcisstartwitha:falseefeisstartwitha:falseadfesisstartwitha:truethelengthofadfes:5以字符a开头的字符串的最大长度:5面试官:你确定只迭代一次吗?还有其他情况吗?我可以。filter是一个无状态的中间操作。对于这个中间操作,流处理只需要迭代一次。但是对于有状态的中间操作,需要多次迭代。面试官:您刚才提到了有状态操作和无状态操作。你如何区分它们?我:streamapi中,无状态操作是指当前元素的操作不受前面元素的影响,主要包括以下方法:filter(),flatMap(),flatMapToInt(),flatMapToLong(),flatMapToDouble(),map(),mapToInt(),mapToDouble(),mapToLong(),peek(),unordered()和statefuloperations表示当前操作只有在所有元素都处理完后才能执行,主要包括以下方法:distinct(),limit(),skip(),sorted(),sorted()面试官:有状态的操作,能举个例子吗我?我:比如下面的代码:publicstaticvoidmain(String[]args){Listlist=Arrays.asList(5,2,3,1,4);ListnewArray=list.stream().map(StreamTest2::map1).sorted((o1,o2)->o1-o2).map(StreamTest2::map2).collect(Collectors.toList());System.out.println("NewhasSequencearray:"+newArray);}privatestaticIntegermap1(Integeri){intresult=i*10;System.out.println("Thread:"+Thread.currentThread().getName()+"方法map1输入参数:"+i+",输出:"+result);returnresult;}privatestaticIntegermap2(Integeri){intresult=i*10;System.out.println("线程:"+Thread.currentThread().getName()+"方法map2输入参数:"+i+",输出:"+结果);返回结果;}在上面的代码中,原始数组被迭代了两次。第一次迭代调用map1方法将所有数组元素乘以10,然后对新数组进行排序。第二次迭代对排序后的数组元素调用map2方法,即排序后的数组元素乘以10的方法输出如下:线程:main方法map1输入参数:5,输出:50线程:main方法map1输入参数:2,输出:20线程:main方法map1输入参数:3,输出:30个线程:main方法map1输入参数:1,输出:10个线程:main方法map1输入参数:4,输出:40个线程:main方法map2输入参数:10,输出:100个线程:main方法map2输入参数:20,输出:200线程:main方法map2输入参数:30,输出:300线程:main方法map2输入参数:40,输出:400线程:main方法map2输入参数:50,输出:500neworderedarray:[100,200,300,400,500]面试官:你了解底层原理了吗?我:先画一个Stream的UML类图:这个类图说明了以下几点:AbstractPipeline有基本类型的子类,比如LongPipeline和DoublePipeline,还有一个SubclassesReferencePipeline作为引用类型。无论是ReferencePipeline,还是LongPipeline、DoublePipeline等基本类型Pipeline,其内部都有3个类可以继承。StatelessOp对应无状态操作,StatefulOp对应有状态操作,Head对应Collection.stream()方法的返回结果。无论是StatelessOp、StatefulOp还是Head,都是Pipeline。这些Pipelines通过一个双向链表串联起来。每个Pipeline节点都看成是一个Stage,Head是链表的头节点。上面UML类图中AbstractPipeline类中的previousStage和nextStage分别表示双向链表的当前节点对前后节点的引用。如下图所示:面试官:所有的操作都是用一个双向链表串联起来的,这样所有的操作都可以从Head节点开始依次执行。但是这些操作如何叠加在一起呢?比如下面的代码有3个map方法,后面的方法依赖前面的计算结果:Listlist=Arrays.asList(5,2,3,1,4);ListnewArray=list.stream().map(StreamTest2::map1).map(StreamTest2::map2).map(StreamTest2::map3).collect(Collectors.toList());i:Stream提供了Sink接口来处理操作的叠加。上面代码的map方法将操作封装到了Sink中。当每个节点执行操作时,都会调用Sink的accept方法将操作结果传递给下一个节点的Sink。例如map方法的源码如下:,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){@Override//返回打包好的SinkSinkopWrapSink(intflags,Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidaccept(P_OUTu){//Downstream是下游节点的Sink,将当前节点的执行结果传递给下游节点downstream.accept(mapper.apply(u));}};}};}面试官:你能不能详细说说Sink?我:Sink主要是提供了以下4个方法//在执行操作之前调用这个方法voidbegin(longsize)//在执行操作之后调用这个方法voidend()//操作是否可以结束booleancancellationRequested()//操作执行函数voidaccept()是针对有状态的操作,必须实现begin和end两个方法,因为begin方法会创建一个容器用于存放中间结果,accept方法会将元素放入容器中,end方法是负责对容器中的元素进行处理,比如排序。面试官:什么时候使用cancellationRequested方法?我:这个方法用于短路操作,比如stream.findAny。采访者:您刚才提到了短路操作。如何区分短路操作和非短路操作?我:短路操作和非短路操作都是Stream的结束操作,结束操作是针对中间操作的。短路操作是指不处理所有元素就可以结束,包括以下方法:anyMatch()、allMatch()、noneMatch()、findFirst()、findAny()非短路操作是指所有元素都需要处理到结束,包括以下方法:forEach(),forEachOrdered(),toArray(),reduce(),collect(),max(),min(),count()总结Stream操作,如图下图:当遇到结束操作时,会将所有被Pipeline节点封装的Sinks串成一个链表,如下图:将Sinks串成链表的过程可以参考下面的源码:finalSinkwrapSink(Sinksink){Objects.requireNonNull(sink);for(@SuppressWarnings("rawtypes")AbstractPipelinep=AbstractPipeline.this;p.depth>0;p=p。previousStage){sink=p.opWrapSink(p.previousStage.combinedFlags,sink);}return(Sink)sink;}这样,从Head节点开始,四个方法各节点封装的Sink中的begin、accept、cancellationRequested、end的ds可以依次调用完成Steam流水线的执行。面试官:上面说了Sinks会形成一条链,那么对于有返回结果的操作,返回的结果存放在哪里呢?我:这里分三种情况:如果返回的结果是boolean(比如anyMatch、allMatch、noneMatch)和Optional(比如findFirst、findAny),则返回的结果存储在对应的Sink中。collect、reduce等规约操作,返回结果存放在用户指定的容器中,例如下面代码的返回结果放在Optional容器中:OptionalaccResult=Stream.of(1,2,3,4,5).reduce((sum,item)->{sum+=item;returnsum;});max和min也是归约操作,因为底层是调用reduce方法实现的。对于返回数组的情况,在返回数组之前,数据会存储在一个多叉树数据结构中。这种多叉树结构的元素存储在树的叶子中,一个叶子节点可以存储多个元素。面试官:你上面提到返回数组的时候用到了多叉树结构。这样做对Stream处理有什么好处?我:按照官方的说法,这样做是为了避免并行操作时不必要的数据拷贝。面试官:能简单介绍一下Stream的并行处理吗?我:Stream的并行处理使用了Fork/Join框架,如下图所示:在计算过程中,首先将任务拆解成子任务,进行并行计算。计算完成后,将子任务计算结果合并为一个结果集。面试官:Fork/Join框架和普通线程池相比有什么优势吗?我:fork/join框架的好处是,如果一个子任务需要等待另一个子任务完成后才能继续工作,处理线程会主动寻找其他未完成的子任务执行。与普通线程池相比,减少了等待时间。面试官:使用Stream并行流,会不会比串行快?我:不一定。使用时要考虑以下因素:处理的元素个数,数据越多,性能提升越明显。数据结构的可分性。Arrays和ArrayList支持随机读取,可分性好。HashSet和TreeSet虽然可以分,但是要平分并不容易。由于长度未知,LinkedList、Streams.iterate和BufferedReader.lines的可分性很差。.尝试使用原始类型并避免装箱和拆箱。单个子任务花费的时间越长,性能增益就越大。面试官:据说Streamapi相对于普通迭代有性能损失。你怎么认为?我:对于简单的处理操作,Streamapi的性能确实不如普通的迭代。但如果CPU性能好,使用Stream并行处理的性能在细节上会有提升。对于复杂的处理操作,无论是并行还是串行,Streamapi都有明显的优势。对于并行处理,应考虑CPU的核心数。

猜你喜欢