当前位置: 首页 > 科技观察

深入理解JavaStream管道,学习了!

时间:2023-03-18 01:03:15 科技观察

我们之前学习过StreamAPI的使用方法,用起来真的很爽,但是简洁的方法下似乎隐藏着无穷无尽的秘密。如此强大的API是如何实现的呢?Pipeline是如何执行的,每个方法调用是否都会导致迭代?自动并行是如何实现的?线程数是多少?本节我们学习Stream管道的原理,这是Stream实现的关键。首先回顾一下容器执行Lambda表达式的方式。以ArrayList.forEach()方法为例,具体代码如下://ArrayList.forEach()publicvoidforEach(Consumeraction){...for(inti=0;modCount==expectedModCount&&is.startsWith("A")).mapToInt(String::length).max();确定以字母A开头的字符串的最大长度的一种直接方法是对每个函数调用执行一次迭代。这样可以实现功能,但是效率肯定是不行的。类库的实现通过使用管道(Pipeline)巧妙地避免了多次迭代。基本思想是在一次迭代中执行尽可能多的用户指定操作。为了方便说明,我们总结一下Stream的所有操作。Stream上的所有操作分为两类:中间操作和结束操作。中间操作只是一个标记,只有结束操作才会触发真正的计算。中间操作可以分为无状态(Stateless)和有状态(Stateful)。无状态中间操作意味着元素的处理不受前面元素的影响,而有状态中间操作必须等到所有元素都被处理完。最终的结果,比如排序是一个有状态的操作,需要读取所有元素才能确定排序结果;末端操作可分为短路操作和非短路操作。短路操作是指不处理所有元素就可以返回结果,比如找到第一个满足条件的元素。如此细粒度划分的原因是因为底层对每种情况的处理方式不同。一个简单的实现仍然考虑上述寻找最长字符串的程序。一种直接的流水线实现是对每个函数调用执行一次迭代,并将中间结果放入某种数据结构(如数组、容器等)中。具体是调用filter()方法后立即执行,选择所有以A开头的字符串放入一个列表list1中,然后将list1传递给mapToInt()方法并立即执行,生成的结果放入list2中,最后遍历list2,找出最大的数作为最终结果。程序的执行流程如下:实现起来非常简单直观,但是有两个明显的缺点:迭代次数大。迭代次数等于函数调用次数。经常产生中间结果。每个函数调用都会产生一个中间结果,存储开销是不可接受的。这些缺点使得效率低下且不可接受。如果不使用StreamAPI,我们都知道如何在一次迭代中完成上面的代码,大致是这样的形式:intlongest=0;for(Stringstr:strings){if(str.startsWith("A")){//1.filter(),保留A开头的字符串intlen=str.length();//2.mapToInt(),转为长度longest=Math.max(len,longest);//3.max(),保留Thelongestlength}}这样,我们不仅减少了迭代次数,还避免了存储中间结果。显然这是管道,因为我们在一次迭代中放置了三个操作。只要我们提前知道用户的意图,我们总能用上面的方法实现相当于StreamAPI的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。如何在不假设用户行为的情况下实现流水线是类库设计者需要考虑的问题。Streampipeline方案我们可以粗略的认为,应该用某种方式记录用户每一步的操作,当用户调用结束操作时,之前记录的操作会叠加执行一次迭代。沿着这个思路,有几个问题需要解决:如何记录用户的操作?操作如何叠加?叠加后如何进行运算?执行后的结果(如果有的话)在哪里?如何记录操作注意这里用了“操作”这个词,指的是“Stream中间操作”的操作。很多Stream操作都会需要回调函数(Lambda表达式),所以一个完整的操作就是<数据源,操作,回调函数>三元组。在Stream中使用Stage的概念来描述一个完整的操作,并使用某种实例化的PipelineHelper来表示Stage,将具有顺序顺序的阶段连接在一起,形成整个流水线。与Stream相关的类和接口的继承关系说明。还有图中没有显示的IntPipeline、LongPipeline、DoublePipeline。这三个类是专门为三个基本类型(不是封装类型)定制的,与ReferencePipeline是平行关系。图中的head用于表示第一个Stage,即调用Collection.stream()方法生成的Stage。显然,这个Stage不包含任何操作;StatelessOp和StatefulOp分别代表stateless和statefulStages,分别对应Stateless和stateful中间操作。Stream流水线组织结构示意图如下:图中通过Collection.stream()方法得到Head,也就是stage0,然后调用一系列的中间操作,不断产生新的Streams。这些Stream对象以双向链表的形式组织在一起,构成了整个流水线。由于每个Stage都记录了上一个Stage以及本次操作和回调函数,所以所有对数据源的操作都可以依赖这个结构建立起来。这就是Stream记录操作的方式。操作如何叠加上面只是解决了操作记录的问题。为了让流水线发挥应有的作用,我们需要一个解决方案,将所有的操作叠加在一起。你可能认为这很简单,你只需要从管道的头部开始依次执行每一步操作(包括回调函数)。这听起来可行,但是你忽略了前面的Stage,不知道后面的Stage进行了什么样的操作,回调函数是什么样的。换句话说,只有当前Stage本身知道如何执行它包含的动作。这就需要某种协议来协调相邻Stage之间的调用关系。该协议由Sink接口完成。Sink接口包含的方法如下表所示:有了上面的协议,相邻Stage之间的调用就非常方便了。每个Stage都会将自己的操作封装到一个Sink中。一个Stage只需要调用后面Stage的accept()方法即可,不需要知道它内部是如何处理的。当然,对于有状态的操作,Sink的begin()和end()方法也是必须要实现的。例如Stream.sorted()是一个有状态的中间操作,它对应的Sink.begin()方法可能会创建一个容器用于乘法和放置结果,而accept()方法负责向容器中添加元素,最后end()负责对容器进行排序。对于短路操作,还必须实现Sink.cancellationRequested()。例如Stream.findFirst()是一个短路操作。只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束搜索。Sink的四个接口方法经常相互配合完成计算任务。其实StreamAPI内部实现的本质就是如何重载Sink的四个接口方法。(Java知音公众号回复“面试题汇总”,我送你一套Java面试题集)有了Sink对操作的封装,解决了Stage之间的调用问题。执行的时候,只需要从流水线的头部开始。数据源依次调用每个Stage对应的Sink.{begin(),accept(),cancellationRequested(),end()}方法。一个可能的Sink.accept()方法流程如下:voidaccept(Uu){1.使用当前Sink封装的回调函数处理u2。将处理结果传递给管道下游的Sink}Sink接口的其他几个方法也是按照这种【处理->转发】模型实现的。下面结合具体的例子,看看Stream的中间操作是如何将自己的操作打包到Sink中,以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法://Stream.map(),调用这个方法会生成一个新的StreampublicfinalStreammap(Functionmapper){...returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){@Override/*opWripSink()方法返回Sink*/SinkopWrapSink(intflags,Sinkdownstream){returnnewSink.ChainedReference(下游){@Overridepublicvoidaccept(P_OUTu){Rr=mapper.apply(u);//1.使用当前Sink包裹的回调函数mapper处理udownstream.accept(r);//2.将处理结果传递给管道下游的Sink}};}};}上面的代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装成一个Sink。由于Stream.map()是一个无状态的中间操作,map()方法返回一个StatelessOp内部类对象(一个新的Stream),调用这个新的Stream的opWripSink()方法会得到一个当前回调函数的包装器Sink.让我们看一个更复杂的例子。Stream.sorted()方法将对Stream中的元素进行排序。很明显,这是一个有状态的中间操作,因为在读取所有元素之前无法得到最终的顺序。抛开模板代码,直接进入问题本质,sorted()方法是如何将操作封装成一个Sink的呢?可以封装的一个sorted()Sink代码如下://Stream.sort()方法使用的Sink实现classRefSortingSinkextendsAbstractRefSortingSink{privateArrayListlist;//存储用于排序的元素RefSortingSink(Sinkdownstream,Comparatorcomparator){super(downstream,comparator);}@Overridepublicvoidbegin(longsize){...//创建一个列表来存储排序后的元素list=(size>=0)?newArrayList((int)size):newArrayList();}@Overridepublicvoidend(){list.sort(comparator);//接收到所有元素后才能开始排序downstream.begin(list.size());if(!cancellationWasRequested){//下游Sink不包含短路操作list.forEach(downstream::accept);//2.将处理结果传递给管道下游的Sink}else{//下游Sink包含短路操作for(Tt:list){//每次调用cancellationRequested()询问是否可以结束处理。if(downstream.cancellationRequested())break;down??stream.accept(t);//2.将处理结果传递给管道下游的Sink}}downstream.end();list=null;}@Overridepublicvoidaccept(Tt){list.add(t);//1.使用当前Sink的wrapperaction来处理t,只需简单地将元素添加到中间列表即可}}上面的代码完美的展示了Sink的四个接口方法是如何协同工作的:首先,beging()方法告诉Sink涉及的元素个数排序时,便于确定中间结果容器的大小;然后通过accept()方法向中间结果添加元素,最后执行时调用者会继续调用这个方法,直到遍历完所有元素;最后end()方法告诉Sink所有元素都遍历完毕,开始排序步骤。排序完成后,将结果传递给下游的Sink;完成处理。叠加后如何执行操作Sink完美的封装了Stream的每一个操作,提供了[处理->转发]的方式进行叠加操作。这一系列的齿轮已经啮合,最后一步就是拨动齿轮开始执行。是什么开始了这个操作链?也许你已经想到,启动的原动力是终端操作(TerminalOperation)。一旦某个终端操作被调用,就会触发整个流水线的执行。结束操作后不能再有其他操作,所以结束操作不会创建新的流水线阶段(Stage)。直观来说,pipeline的链表后面不会再扩展了。endoperation会创建一个Sink来包装自己的operation,这也是pipeline中的最后一个Sink。这个Sink只需要处理数据,不需要把结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink是调用链的出口。让我们检查一下上游Sink是如何找到下游Sink的。一个可选方案是在PipelineHelper中设置一个Sink字段,找到pipeline中的下游Stage并访问Sink字段。但是Stream类库的设计者并没有这样做,而是设置了一个SinkAbstractPipeline.opWrapSink(intflags,Sinkdownstream)方法来获取Sink,这个方法的作用是返回一个包含当前Stage的新操作代表并可以将结果传递给下游的Sink对象。(Java知音公众号回复“面试题汇总”,送你Java面试题集)为什么要生成一个新的对象,而不是返回一个Sink字段?这是因为使用opWrapSink()可以将当前的操作和下游的Sink(上面的下游参数)组合成一个新的Sink。试想,只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法,直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),你可以获得管道上所有操作的代表。Sink,用代码表达是这样的://AbstractPipeline.wrapSink()//从下游到上游不断地包裹Sink。如果最初传入的sink代表操作结束,//函数返回时可以得到一个代表管道上所有操作的Sink。finalSinkwrapSink(Sinksink){...for(AbstractPipelinep=AbstractPipeline.this;p.depth>0;pp=p.previousStage){sink=p.opWrapSink(p.previousStage.combinedFlags,sink);}return(Sink)sink;}现在pipeline上从头到尾的所有操作都被打包到一个Sink中。执行这个Sink相当于执行整个pipeline,执行Sink的代码如下://AbstractPipeline.copyInto(),对spliterator代表的数据进行wrappedSink代表的操作。finalvoidcopyInto(SinkwrappedSink,Spliteratorspliterator){...if(!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())){wrappedSink.begin(spliterator.getExactSizeIfKnown());//Notification开始遍历spliterator.forEachRemaining(wrappedSink);//迭代wrappedSink.end();//Notification遍历结束}...}上面代码首先调用wrappedSink.begin()方法告诉Sink数据来了,然后然后调用spliterator.forEachRemaining()方法迭代数据(Spliterator是容器的迭代器,看),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑很清楚。执行后的结果在哪里最后一个问题是管道上的所有操作都执行完后,用户需要的结果(如果有的话)在哪里?首先要说明的是,并不是所有的Stream结束操作都需要返回结果。有些操作只是为了利用它们的副作用(Side-effects)。比如使用Stream.forEach()方法打印出结果就是一个常见的副作用场景(其实除了打印,其他场景应该避免使用副作用),真正需要的end操作的结果在哪里返回结果是否存在?特别提示:副作用不可滥用。也许你认为在Stream.forEach()中收集元素是一个不错的选择,就像下面的代码一样,但不幸的是,这种使用的正确性和效率无法得到保证。因为Stream可能会并行执行。大多数使用副作用的地方都可以使用reduce操作更安全、更高效地完成。//错误的集合方法ArrayListresults=newArrayList<>();stream.filter(s->pattern.matcher(s).matches()).forEach(s->results.add(s));//不必要的使用副作用!//正确的收集方法Listresults=stream.filter(s->pattern.matcher(s).matches()).collect(Collectors.toList());//无副作用!回到pipeline执行结果的问题,需要返回的pipeline结果存在哪里呢?这应该在不同的情况下讨论。下表显示了返回结果的各种Stream结束操作。对于表中返回boolean或Optional的操作(Optional是一个存储值的容器),由于值有返回值,所以只需要将这个值记录在对应的Sink中,执行结束时返回即可。对于reduce操作,最终结果放在调用时用户指定的容器中(容器类型由收集器指定)。collect()、reduce()、max()和min()都是归约操作。虽然max()和min()也返回一个Optional,但底层其实是调用reduce()方法实现的。对于return是数组的情况,会把未查询的结果放在数组中。这当然是对的,但是在最终返回数组之前,结果实际上是存储在一个叫做Node.js的数据结构中的。节点是一个多叉树结构,元素存储在树的叶子中,一个叶子节点可以存储多个元素。这样做是为了方便并行执行。关于Node的具体结构,我们会在下一节探讨Stream是如何并行执行的时候给出详细的说明。结语本文详细介绍了Stream管道的组织和执行过程。学习本文将帮助您理解原理并编写正确的Stream代码,同时打消您对StreamAPI效率的顾虑。如您所见,StreamAPI的实现非常巧妙,即使我们使用外部迭代手动编写等效代码,也未必效率更高。