当前位置: 首页 > 科技观察

Java8中的Stream这么强大,你知道它的原理是什么吗?

时间:2023-03-17 16:08:47 科技观察

Java8API添加了一个名为Stream的新抽象,可让您以声明方式操作数据。Stream以类似于使用SQL语句从数据库中查询数据的直观方式为Java集合操作和表示提供了高级抽象。StreamAPI可以大大提高Java程序员的生产力,让程序员写出高效、干净、简洁的代码。本文将分析Stream的实现原理。Stream的组成和特点Stream(流)是一个由数据源中的元素组成的队列,支持聚合操作:元素是特定类型的对象,组成一个队列。Java中的Stream不像集合那样存储和管理元素,而是按需计算。数据源流的来源可以是集合、数组、I/O通道、生成器等类似于SQL语句的聚合操作。比如filter,map,reduce,find,match,sorted等和之前的Collection操作是不一样的。流操作也有两个基本特征:流水线:中间操作将返回流对象本身。这样就可以将多个操作串联成一个pipeline,就像fluent风格一样。这样做可以优化操作,例如惰性评估和短路内部迭代:过去,集合遍历是通过Iterator或For-Each显式地在集合外进行的。称为外部迭代。Stream提供了内部迭代的方法,通过访问者模式(Visitor)实现。与迭代器不同的是,Stream可以并行操作,而迭代器只能串行和命令式操作。顾名思义,在串行模式下遍历时,先读取每一项,然后再读取下一项。使用并行遍历时,数据会被分成多个段,每个段在不同的线程中处理,结果一起输出。Stream的并行运行依赖于Java7引入的Fork/Join框架(JSR166y)来拆分任务,加快处理过程。Java的并行API的演变基本如下:java.lang.Thread5.0in1.0-1.4Phasersinjava.util.concurrent6.0etc.Fork/Joinin7.0FrameworkLambdaStreamin8.0具有并行处理能力,处理过程将是分而治之,即把一个大任务分成多个小任务,也就是说每个任务是一个操作:Listnumbers=Arrays.asList(1,2,3,4,5,6,7,8,9);numbers.parallelStream().forEach(out::println);可以看到一行简单的代码就可以帮助我们实现集合中元素并行输出的功能,但是由于并行执行的顺序是不可控的,所以每次执行的结果不一定相同。如果一定要相同,可以使用forEachOrdered方法进行终止操作:Listnumbers=Arrays.asList(1,2,3,4,5,6,7,8,9);numbers.parallelStream().forEachOrdered(out::println);这里有个问题,如果需要对结果进行排序,是不是违背了我们并行执行的初衷?是的,在这种场景下,显然不需要使用并行流,直接使用串行流来执行,否则性能可能会更差,因为所有的并行结果最后都被强制排序了。OK,我们先介绍一下Stream接口的相关知识。BaseStream接口Stream的父接口是BaseStream,是所有流实现的顶层接口,定义如下:publicinterfaceBaseStream>extendsAutoCloseable{Iteratoriterator();Spliteratorspliterator();booleanisParallel();Ssequential();Sparallel();Sunordered();SonClose(RunnablecloseHandler);voidclose();}其中T为流中元素的类型,S为实现类BaseStream的,里面的元素也是T,S也是自己:SextendsBaseStream是不是有点晕?其实很好理解。我们看一下S在接口中的使用:比如sequential()和parallel()这两个方法,它们都返回S实例,也就是说它们分别支持对当前流进行序列化。或者并行操作并返回“改变”的流对象。如果是并行,则必然涉及对当前流的拆分,即将一个流拆分为多个子流,子流必须与父流的类型相同。子流可以继续拆分子流,一直拆分……也就是说这里的S是BaseStream的一个实现类,它也是一个流,比如Stream、IntStream、LongStream等。Stream接口我们看一下Stream的接口声明:publicinterfaceStreamextendsBaseStream>参考上面的解释,这里就不难理解了:就是Stream可以继续拆分成Stream,我们可以通过它的一些方法来确认:Streamfilter(Predicatepredicate);Streammap(Functionmapper);StreamflatMap(Function>mapper);Streamsorted();Streampeek(Consumeraction);Streamlimit(longmaxSize);Streamskip(longn);...这些是操作流的中间操作,它们的返回结果必须是流对象本身。关闭流操作BaseStream实现了AutoCloseable接口,即关闭流时会调用close()方法。同时,BaseStream中还给我们提供了onClose()方法:/***Returnsanequivalentstreamwithanadditionalclosehandler.Close*handlersarerunwhenthe{@link#close()}method*iscalledonthestream,andareexecutedintheordertheywere*added.Allclosehandlersarerun,evenifearlierclosehandlersthrow*exceptions.Ifanyclosehandlerthrowsanexception,thefirst*exceptionthrownwillberelayedtothecallerof{@codeclose()},将*任何剩余的异常添加到该异常被抑制的异常*(除非剩余的异常与*第一个异常相同,因为异常无法抑制自身。)可能*返回自身。**

这是一个intermetamateps**@paramcloseHandlerAtasktoexecutewhenthestreamisclosed*@returnsstreamwithahandlerthatisrunifthestreamisclosed*/SonClose(RunnablecloseHandler);当调用AutoCloseable的close()接口时,会触发stream对象的onClose()方法,但是有几点需要注意:onClose()方法会返回stream对象本身,也就是说,mu可以对该对象进行多次调用。如果调用多个onClose()方法,它们将按照调用顺序触发,但如果一个方法如果有异常,只会向上抛出第一个异常。前面的onClose()方法抛出的异常不会影响后面的onClose()方法的使用。如果多个onClose()方法抛出异常,则只显示第一个异常。其他异常的栈会被压缩,只显示部分信息。,但最终以最后一次方法调用的返回结果为准。参考parallel()方法的说明:返回一个等价的并行流。可能返回自身,因为流已经是并行的,或者因为底层流状态被修改为并行。所以多次调用同一个方法不会产生新的流,而是直接复用当前的流对象。在以下示例中,总和是根据对parallel()的最后一次调用并行计算的:stream.parallel().filter(...).sequential().map(...).parallel().sum();ParallelStream背后的人:ForkJoinPoolForkJoin框架是JDK7的一项新功能。与ThreadPoolExecutor一样,它也实现了Executor和ExecutorService接口。它使用一个“无限队列”来保存需要执行的任务,线程数通过构造函数传入。如果没有将所需的线程数传递给构造函数,则将设置当前计算机可用的CPU数。是默认的线程数。ForkJoinPool主要用于解决使用分而治之算法的问题,典型应用如快速排序算法。这里的重点是ForkJoinPool需要使用相对较少的线程来处理大量的任务。比如对1000万条数据进行排序,这个任务会分为两个500万条排序任务和一个针对这两组500万条数据的合并任务。以此类推,对500万条数据进行同样的切分过程,最后会设置一个阈值,指定当数据量达到极限时,这种切分过程将停止。例如,当元素个数小于10时,停止分裂,改用插入排序对它们进行排序。那么最后所有任务加起来大概200万+。问题的症结在于,对于一??个任务来说,只有在它的所有子任务都完成后才能执行。想象一下合并和排序的过程。所以在使用ThreadPoolExecutor的时候,就出现了分而治之的问题,因为ThreadPoolExecutor中的线程不能再往任务队列中添加一个任务,等待任务完成再继续执行。使用ForkJoinPool时,其中的线程可以创建一个新任务并挂起当前任务。这时线程可以从队列中选择一个子任务执行。那么使用ThreadPoolExecutor和ForkJoinPool在性能上有什么区别呢?首先,使用ForkJoinPool可以用有限的线程数完成很多有“父子关系”的任务,比如用4个线程完成200万以上的任务。使用ThreadPoolExecutor时,无法完成,因为ThreadPoolExecutor中的Thread不能选择先执行子任务。当需要完成200万个具有父子关系的任务时,也需要200万个线程。显然,这是不可行的。WorkStealing原理:-(1)每个工作线程都有自己的工作队列WorkQueue;-(2)这是一个双端队列dequeue,线程私有;-(3)ForkJoinTask中fork的子任务会被放入运行该任务的工作线程的队列头中,工作线程会按照后进先出的顺序处理工作队列中的任务,即入栈方式;-(4)为了最大化CPU的利用率,空闲线程会从队列中的其他线程开始“偷”任务执行;-(5)但从工作队列的尾部窃取任务,减少与队列所属线程的竞争;-(6)双端队列操作:push()/pop()只在其属主工作线程中调用,其他线程窃取任务时调用poll();-(7)当只剩下最后一个任务时,仍然会有竞争,通过CAS实现;从ForkJoinPool的角度看ParallelStreamJava8在ForkJoinPool中增加了一个通用的线程池,用于处理没有显式提交给任何线程池的任务。它是ForkJoinPool类型的静态元素,其默认线程数等于正在运行的计算机上的CPU数。当调用添加到Arrays类的新方法时,会发生自动并行化。例如,用于对数组进行排序的并行快速排序用于并行遍历数组中的元素。Java8新增的StreamAPI中也使用了自动并行化。例如,下面的代码用于遍历列表中的元素并执行所需的操作::setUserIdUserInfo);对列表元素的操作是并行执行的。forEach方法会为每个元素的计算操作创建一个任务,由上面提到的ForkJoinPool中的commonPool处理。当然,上面的并行计算逻辑也可以使用ThreadPoolExecutor来完成,但是从代码可读性和代码大小来说,使用ForkJoinPool显然更好。对于ForkJoinPool通用线程池的线程数,通常使用默认值即可,即运行时计算机的处理器数。您还可以通过设置系统属性来调整ForkJoinPool中的线程数:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N(N是线程数)。值得注意的是,当前正在执行的线程也会被用来执行任务,所以最终线程数为N+1,1为当前主线程。这里有一个问题。如果在并行流的执行计算中使用I/O等阻塞操作,可能会导致一些问题:engines.add("http://www.google.com/?q=");engines.add("http://duckduckgo.com/?q=");engines.add("http://www.bing.com/search?q=");//getelementassoonasitisavailableOptionalresult=engines.stream().parallel().map((base)-{Stringurl=base+question;//openconnectionandfetchtheresultreturnWS.url(url).get();}).findAny();returnresult.get();}这个例子很典型,我们分析一下:这个并行流计算操作会被主线程和JVM默认的ForkJoinPool.commonPool()实现共享.map是一个阻塞方法,需要访问HTTP接口并得到它的响应,所以任何工作线程执行到这里都会阻塞等待结果。所以此时通过并行流在其他地方调用计算方法时,会受到这里阻塞等待方法的影响。ForkJoinPool的当前实现没有考虑补偿等待被阻塞等待新生成的线程的工作线程,因此最终ForkJoinPool.commonPool()中的线程将被免除并被阻塞等待。正如我们从上面例子的分析中了解到的,lambda的执行不是瞬时的,所有使用并行流的程序都可能成为阻塞程序的源头,程序的其他部分在执行期间将无法访问这些worker,这意味着当其他东西占用公共ForkJoinPool时,任何依赖并行流的程序都将变得不可预测并具有潜在危险。总结:在处理递归分治算法时考虑使用ForkJoinPool。仔细设置不再进行任务划分的阈值,这对性能有影响。Java8中的一些特性将使用ForkJoinPool中的通用线程池。在某些情况下,需要调整线程池中的默认线程数。Lambda应该尽量避免副作用,即避免基于堆状态的突变和任何IOlambda不应该相互干扰,即避免修改数据源(因为这可能会导致线程安全问题)避免访问状态在流操作生命周期中可能会发生变化并行流的性能并行流框架的性能受以下因素影响:数据大小:数据足够大,每个流水线的处理时间足够长,只有并行才有意义;源数据结构:每个流水线操作都是基于初始数据源,通常是一个集合,拆分不同的集合数据源会消耗一定的成本;装箱:处理基本类型比装箱类型快;core数:默认情况下,core数越多,底层fork/join线程池启动的线程越多;单元处理开销:流中每个元素花费的时间越长,并行操作带来的性能提升越明显;源数据结构分为以下三组:性能好的:ArrayList、array或IntStream。是)性能差:LinkedList(需要遍历链表,难分解成对半),Stream.iterate和BufferedReader.lines(未知长度,难分解)注:以下部分摘自:Thebehind-the-Streams的场景原理,顺便感谢作者BrianGoetz,写的这么清楚。NQ模型确定并行性是否会带来加速的最后两个因素是:可用数据量和对每个数据元素执行的计算量。在我们最初对并行分解的描述中,我们采用了拆分源的概念,直到一个片段足够小,以至于用顺序方法解决该片段上的问题会更有效。段的大小必须取决于要解决的问题,更准确地说,取决于每个元素完成的工作量。例如,计算字符串的长度比计算字符串的SHA-1散列涉及的工作少得多。每个元素完成的工作越多,“足够大以利用并行性”的阈值就越低。同样,您拥有的数据越多,在不违反“太小”阈值的情况下可以拆分的段就越多。一个简单但有用的并行性能模型是NQ模型,其中N是数据元素的数量,Q是对每个元素执行的工作量。N*Q的乘积越大,您获得并行加速的可能性就越大。对于小Q的问题,例如对数字求和,您通常可能希望看到N>10,000以实现加速;随着Q的增加,获得加速比所需的数据量减少。许多并行化的障碍(例如拆分成本、组合成本或遇到顺序敏感性)可以通过具有更高Q的操作来缓解。尽管拆分LinkedList特征的结果可能很糟糕,但仍然有可能获得并行加速只要因为Q足够大。遇到顺序遇到顺序是指源分发元素的顺序是否对计算至关重要。一些来源(例如基于散列的集合和地图)没有有意义的遭遇顺序。流标志ORDERED描述流是否具有有意义的相遇顺序。JDK集合的拆分器根据集合的规范设置此标志;一些中间操作可能会注入ORDERED(sorted())或清除它(unordered())。如果流没有遇到顺序,则大多数流操作必须遵守该顺序。对于顺序执行,遇到顺序是“自动保留的”,因为元素会按照遇到它们的顺序自然地处理。即使在并行执行中,许多操作(无状态中间操作和一些终止操作(如reduce()))尊重遇到顺序也不会产生任何实际成本。但是对于其他操作(有状态的中间操作、语义与遇到顺序相关的终止操作,例如findFirst()或forEachOrdered()),在并行执行中尊重遇到顺序的责任可能很重要。如果流有定义的遇到顺序,但该顺序对结果没有意义,您可以通过使用unordered()操作删除ORDERED标志来加速包含顺序敏感操作的管道的顺序执行。作为对遇到顺序敏感的操作示例,请考虑limit(),它以指定大小截断流。在顺序执行中实现limit()很简单:记录已看到的元素数量,然后丢弃所有元素。但是在并行执行中,实现limit()要复杂得多;您需要保留前N个元素。这个要求极大地限制了利用并行性的能力;如果输入被分成几个部分,那么在该部分之前的所有部分都完成之前,您不知道该部分的结果是否会包含在最终结果中。因此,实现通常会错误地选择不使用所有可用内核,或者缓存整个实验结果,直到达到目标长度。如果流不符合顺序,limit()操作可以自由选择任何N个元素,这使得执行效率更高。元素可以在已知时立即发送到下游,无需任何缓冲,线程之间唯一的协调是发送信号以确保不超过目标流长度。另一个不太常见的订单成本例子是排序。如果遇到顺序有意义,则sorted()操作实现稳定排序(相同元素出现在输出中的顺序与它们输入输入的顺序相同),而对于无序流,稳定性(有成本)是不需要的。distinct()有类似的情况:如果流有遇到顺序,那么对于多个相同的输入元素,distinct()必须发出第一个元素,而对于无序流,它可以发出任何元素——再次高效得多的并行可以获得实现。当您使用collect()进行聚合时,会出现类似的情况。如果对无序流执行collect(groupingBy())操作,则对应于任何键的元素必须按照它们在输入中出现的顺序提供给下游收集器。这个顺序一般对应用来说意义不大,没有顺序是没有意义的。在这些情况下,最好选择可以忽略遇到顺序并让所有线程直接收集到一个共享的并发数据结构(例如ConcurrentHashMap)的并发收集器(例如groupingByConcurrent()),而不是让每个线程都进入它的在合并中间地图之前拥有自己的中间地图(这可能很昂贵)。何时使用并行流?说了这么多,需要特别注意使用parallelStream的注意事项。它不是性能的灵丹妙药。相反,如果使用不当,会严重影响性能。我将在另一篇文章中单独讨论这个问题。