当前位置: 首页 > 后端技术 > Java

Java中的函数式编程

时间:2023-04-01 22:51:25 Java

在本系列的第一篇文章中,我们提到了函数式编程的一个优点是“易于并发编程”。Java作为一种多线程语言,通过Stream提供了并发编程的便利。题外话:严格来说,并发和并行是两个不同的概念。“并发”强调多个任务同时执行,通常涉及多个线程之间的上下文切换;“并行”强调的是将一个大任务分解成多个小任务后,再同时执行这些小任务,得到多个中间结果再聚合成一个最终结果。但是在多CPU和分布式时代,并发和并行的概念越来越接近了。至少在Java的Stream中,我们可以把并发和并行理解为同一个意思:基于多线程技术,将一个大任务拆分成多个小任务,分配给不同的线程执行,得到多个中间结果聚合成一个最终的结果。本文的示例代码可以从gitee获取:https://gitee.com/cnmemset/ja...Stream的并行编程并行编程是Stream的一个重要功能和特性。它的优点之一是可以通过并行流轻松实现并行编程,而不管数据源是否线程安全。Stream的并行编程,底层是基于ForkJoinPool技术实现的。ForkJoinPool是Java7引入的用于并行执行的任务框架。核心思想是将一个大任务拆分为多个小任务(即fork),然后将多个小任务的处理结果聚合为一个结果(即join)。此外,它还提供了基本的线程池功能,如设置最大并发线程数、关闭线程池等。在本系列的前几篇文章中,也时不时提到一些并行编程的知识点。本文对此做了比较系统的总结。并行流(parallelstream)Stream的并行操作是基于并行流(parallelstream)。生成并行流也很简单:通过Collection.parallelStream方法即可获取并行流。生成串行Stream后,可以通过BaseStream.parallel()方法将串行流(serialstream)转换为并行流。当然,我们也可以通过BaseStream.sequential()方法将并行流转换成串行流。BaseStream.isParallel()方法可用于判断流是否为并行流。不管数据源是否线程安全(如ArrayList、HashSet等不支持多线程),我们都可以使用parallelStream轻松实现并行编程,无需额外的线程同步操作,这是parallelStream最大的优势。顺序遇到顺序是指元素在Stream中出现的顺序。如果觉得遇到顺序过于抽象,可以简单理解为数据源的元素顺序。本节所说的有序或无序特指相遇顺序。一个Stream是否有遇到顺序的顺序取决于它的数据源(datasource)和中间操作(intermediateoperations)。例如,List或Array的Steam是有序的,但HashSet的Steam是无序的。中间操作Stream.sorted可以将无序Stream转换为有序Stream;中间操作Stream.unordered将有序Stream转换为无序Stream。有趣的是,一些终端操作忽略了遇到的顺序。这意味着什么?以最常见的Stream.forEach为例。并行执行时,即使数据源是List,forEach方法处理元素的顺序也是乱序的。要保证处理顺序,请使用Stream.forEachOrdered方法。示例代码:publicstaticvoidforEachExample(){ArrayListlist=newArrayList<>(Arrays.asList(1,2,3,4,5));System.out.println("===forEach====");//在并行流中,forEach方法是list.parallelStream().forEach(i->{System.out.println(i+":thread-"+Thread.currentThread)不管遇到orderoftheStream().getName());});System.out.println("===forEachOrdered====");//在并行流中,forEachOrdered方法可以保持遇到orderlist.parallelStream().forEachOrdered(i->{System.out.println(i+":thread-"+Thread.currentThread().getName());});上面代码的输出类似:===forEach====3:thread-main5:thread-ForkJoinPool.commonPool-worker-21:thread-main4:thread-ForkJoinPool.commonPool-worker-32:thread-ForkJoinPool.commonPool-worker-1===forEachOrdered====1:thread-ForkJoinPool.commonPool-worker-42:thread-ForkJoinPool.commonPool-worker-13:thread-ForkJoinPool.commonPool-worker-14:thread-ForkJoinPool。commonPool-worker-15:thread-ForkJoinPool.commonPool-worker-1可以看出在并行执行的时候,forEach忽略了Stream的相遇顺序,虽然forEachOrdered也是在多线程环境下执行的,但是依然可以保证Stream的相遇顺序。在Stream并行编程中,理解遇到顺序非常重要。因为对于大多数Stream操作来说,即使是并行执行,如果Stream是有序的,那么操作之后得到的Stream也会保持有序。例如,对于数据源为List[1,2,3]的有序Stream,执行map(x->x*x)操作后,结果一定是[1,4,9]。对于遇到顺序的顺序和无序,示例代码如下:publicstaticvoidunorderedExample(){//我们用TreeMap做实验,由于ArrayList的特殊性,很难表现出无序的特点//TreeSet中的元素是从小到大排序的,即[-7,-3,1,5,12]TreeSetset=newTreeSet<>(Arrays.asList(1,12,5,-7,-3));//按照相遇顺序打印集合,输出为:-7,-3,1,5,12System.out.println("集合的相遇顺序:");set.stream().forEachOrdered(s->System.out.print(s+""));System.out.println();//TreeSet是有序的,所以从TreeSet出来的Stream也是有序的//当Stream是有序的,执行操作limit(2)不管是串行还是并行,也不管执行多少次结果都是前两位[-7,-3]System.out.println("limitordered流:");set.stream()。parallel().limit(2).forEachOrdered(s->System.out.print(s+""));System.out.println();//我们使用unordered方法将Stream转换为无序。//当Stream无序时,并行执行limit(2)操作,会发现执行多次,输出的个数不一样(不确定)System.out.println("LimitunorderedStream:");System.out.print("第一次:");set.stream().unordered().parallel().limit(2).forEachOrdered(s->System.out.print(s+""));System.out.println();System.out.print("第二次:");set.stream().unordered().parallel().limit(2).forEachOrdered(s->System.out.print(s+""));System.out.println();}上面示例代码的输出类似:集合的遇到顺序:-7-31512LimitorderedStream:-7-3LimitunorderedStream:第一次:-35第二次:512你可以仔细体会。欢迎加入群讨论!!!纯函数操作回顾本系列文章的第一篇,纯函数(purelyfunction)是指不改变函数外的其他状态,换句话说,不改变定义在函数外的变量的值。纯函数不会导致“副作用”。在Stream的并行编程中,纯函数操作是非常关键的,否则还是需要考虑线程安全的问题。例如:publicstaticvoidunsafeParallelOperation(){Listprovinces=Arrays.asList("Guangdong","Jiangsu","Guangxi","Jiangxi","Shandong");//“副作用”引起的线程不是安全问题ArrayListresults=newArrayList<>();provinces.parallelStream()//过滤掉G开头的省份。filter(s->!s.startsWith("G"))//表示在lambda在公式中修改了results的值,//说明“s->results.add(s)”不是纯函数,//带来了不必要的“副作用”,//并行执行时,会引起线程不安全的问题。.forEach(s->results.add(s));System.out.println(results);}上面的示例代码存在线程不安全的问题——多个线程会同时修改ArrayList类型的结果,我们需要将变量锁定。正确的做法是:publicstaticvoidsafeParallelOperation(){Listprovinces=Arrays.asList("广东","江苏","广西","江西","山东");Listresults=provinces.parallelStream()//过滤掉以G开头的省份.filter(s->!s.startsWith("G"))//没有“副作用”.collect(Collectors.toList());System.out.println(results);}通过内置的Collectors.toList()方法,没有“副作用”,所以不需要考虑线程安全问题。Collectors和ConcurrentMap回想一下,我们在介绍Stream规范方法Stream.collect(Collector)时,提到了一个需求场景:按部门对员工进行分组。并行执行的实现代码是类似的:(Employee::getDepartment));System.out.println(map);}以上代码虽然可以实现功能,但是性能可能不尽如人意,因为并行执行时,需要将多个中间结果聚合成最终结果,但是,合并两个Map时,性能损失可能非常大(比如HashMap,底层是数组+红黑树实现的,合并的复杂度不低)。聪明的Java程序员自然会想到:如果并行执行得到的中间结果和最终结果使用同一个Map实例,那么就没有必要合并两个Map。当然,由于并行执行涉及到多线程,因此,ThisMap实例需要是线程安全的。典型的线程安全的Map,当然首选ConcurrentHashMap。这是Collectors工具类中ConcurrentMap相关方法的实现原理,主要包括:toConcurrentMap系列方法groupingByConcurrent系列方法,但是使用ConcurrentHashMap有个缺点:不能保证Stream的遇到顺序,所以只有当你确定元素的顺序不影响最终得到结果时,使用ConcurrentMap相关的方法。最后还需要注意的是,只有在并行编程中,才应该考虑使用toConcurrentMap或者groupingByConcurrent方法,否则会因为不必要的线程同步操作而影响性能。协议操作的注意事项在介绍协议操作的这一系列文章中,提到了很多并行编程的注意事项。本节对其进行总结,供您参考。reduce(T,BinaryOperator)reduce(T,BinaryOperator)的方法签名是:Treduce(Tidentity,BinaryOperatoraccumulator);其中T是Stream的通用类型。参数identity是reduce操作的初始值。参数累加器需要关联。参数累加器定义的函数必须满足结合律,否则在一些未定义或并行的场景下会导致结果不正确。另外,如果是并行执行,对参数identity还有一个要求:对于任意值t,必须满足accumulator.apply(identity,t)==t。否则,将导致错误的结果。publicstaticvoidreduceStream2(){Listlist=Arrays.asList(1,3,5,7,9);//这是正确的范式:因为数字0是累加操作的标识。sum=list.parallelStream().reduce(0,(x,y)->x+y);//输出为0+1+3+5+7+9=25System.out.println(sum);//这是一个错误的范式:因为数字5不是累加操作的恒等式。sum=list.parallelStream().reduce(5,(x,y)->x+y);//本意是输出5+1+3+5+7+9=30,但实际上会输出一个大于30的数System.out.println(sum);}reduce(U,BiFunction,BinaryOperator)具体方法签名为:Ureduce(Uidentity,BiFunction累加器,BinaryOperator组合器);其中U是返回值的类型,T是Stream的泛型类型。参数identity是reduce操作的初始值。参数accumulator是对Stream中单个元素的合并操作,相当于函数Uapply(Uu,Tt)。参数组合器是将并行执行得到的多个中间结果组合起来的操作,相当于函数Uapply(Uu1,Uu2)。在并行编程中,对这三个参数有一些特殊的要求:参数combiner必须满足associativeparameteridentity,对于任意值u,必须满足combiner.apply(identity,u)==uparameteraccumulator和combiner必须兼容,即对于任何值u和t,必须满足:combiner.apply(u,accumulator.apply(identity,t))==accumulator.apply(u,t)collect(Supplier,BiConsumer,BiConsumer)的collect方法的签名是:Rcollect(Suppliersupplier,BiConsumeraccumulator,BiConsumercombiner);其中R是返回值的类型,通常是容器类(例如Collection或Map)。T是Stream中的元素类型。参数供应商是用于创建容器实例的函数。参数accumulator是一个函数,它将Stream中的一个元素合并到容器中。参数combiner是将两个容器合并为一个容器的函数,仅在并行执行时使用。在并行执行的场景下,我们还有一些额外的要求:combiner函数满足结合律要求combiner和accumulator相容(compatible),即对于任意的r和t,combiner.accept(r,accumulator.accept(supplier.get(),t))==accumulator.accept(r,t)结论Stream提供了一个非常方便的并行编程API,但是它仍然存在很多问题,非常容易踩坑。其中,最受诟病的就是它的不可控性。因为ParallelStream底层是基于ForkJoinPool的,而ForkJoinPool的工作线程数是在虚拟机启动的时候指定的,如果Stream并行执行的任务数过多或者耗时过多,就会甚至影响应用程序中使用ForkJoinPool的其他功能。通常,除非您非常清楚自己在做什么,否则不要使用Stream的并行编程API。相反,我们可以直接使用Java中的多线程技术(比如线程池)来处理。