当前位置: 首页 > 科技观察

如何加快Java中大型集合的处理

时间:2023-03-23 12:15:32 科技观察

如何加速Java处理大型集合在此基础上,介绍了三种不同的改进算法,并通过比较,给出了一种可以带来更优越性能的方法。与许多其他编程语言一样,Java有一组数据结构对象,可以用来表示一些单个单元,以及它可以执行的一组操作。从处理大量数据的计算程序的角度来看,其典型操作涉及各种集合,例如对每个对象的转换。在本文中,我们将借用ETL(Extraction,TransformationandLoading)的基本概念,将提取/捕获的数据从其原始形式转换为指定形式,以便存储到另一个数据库中。当然,我们会在这里描述数据库元素的转换和抽象操作的概念,以便您更好地理解集合处理的本质。一、基础知识从Java1.2开始,我们主要依赖java.util.Collection根接口作为集合层次结构。在Java7发布之前,要显着提高大型集合的处理性能,唯一的方法就是并行化操作。然而,随着Java8的出现,新的java.util.stream包提供了一个StreamAPI,它支持对元素流进行函数式操作。StreamAPI集成到CollectionsAPI中以对集合执行批处理操作,例如顺序或并行map-reduce转换。从那时起,Java提供了一种本地方法来尝试提高应用于集合的转换操作的并行化性能。之所以称为“尝试”策略,是因为它只是简单地使用并行流操作,并不能保证会有更好的性能。毕竟,其他潜在因素也可能发挥作用。尽管如此,并行流还是为寻求提高处理性能提供了一个想法和起点。接下来,我将通过对大型Java集合的简单转换操作,从性能上比较原生顺序处理和并行处理的优劣,以及基于其他算法的三种并行流策略。2.转换操作对于转换操作,我们定义了一个函数式接口。如下代码片段所示,只需要将一个R类型的元素应用到转换操作中,就可以返回一个S类型的转换对象。}此操作旨在将作为参数提供的字符串转换为大写字母。下面两段代码分别创建了ElementConverter接口的两个实现。其中一个是将字符串转换为大写字符串:JavapublicclassUpperCaseConverterimplementsElementConverter{@OverridepublicStringapply(Stringparam){returnparam.toUpperCase();}}另一个是对集合执行相同的操作:JavapublicclassCollectionUpperCaseConverterimplementsElementConverter,List>{@OverridepublicListapply(Listparam){returnparam.stream().map(String::toUpperCase)。collect(Collectors.toList());}}除了上面的辅助方法,我们还为每一个并行处理策略写了一个专门的方法来实现AsynchronousExecutor类。详情请参考以下代码段:this.numThreads=threads;this.executor=Executors.newFixedThreadPool(this.numThreads);this.outputList=newArrayList<>();}//每个并行处理策略的方法publicvoidshutdown(){this.executor.shutdown();try{this.executor.awaitTermination(MINUTES_WAITING_THREADS,TimeUnit.MINUTES);}catch(InterruptedExceptione){Thread.currentThread().interrupt();thrownewRuntimeException(e);}}3.第一个子列表分区用于增强大集合转换操作的并行策略是基于java.util.AbstractList的扩展。简单来说,CollectionPartitioner会将源集合拆分成多个子列表。每个子列表的大小是根据处理期间使用的线程数计算的。首先,我们通过将获取源集合大小除以线程数来计算子列表块大小。然后,根据索引对(frommindex,toIndex)从源集合中复制每个子列表,完成值的同步计算。对应的代码段如下:JavafromIndex=threadid+chunksizetoIndex=MIN(fromIndex+chunksize,sourcecollectionsize)JavapublicfinalclassCollectionPartitionerextendsAbstractList>{privatefinalListlist;privatefinalintchunkSize;publicCollectionPartitioner(Listlist,intnumThreads){this.list=list;this.chunkSize=(list.size()%numThreads==0)?(list.size()/numThreads):(list.size()/numThreads)+1;}@OverridepublicsynchronizedListget(intindex){varfromIndex=index*chunkSize;vartoIndex=Math.min(fromIndex+chunkSize,list.size());if(fromIndex>toIndex){returnCollections.emptyList();//索引超出允许的区间}returnthis.list.subList(fromIndex,toIndex);}@Overridepublicintsize(){return(int)Math.ceil((double)list.size()/(double)chunkSize);}}一旦每个线程都将转换操作应用于其各自子列表中的所有对象,它必须同步修改后的对象,并将其添加到输出列表中。下面的代码片段显示这些步骤是由AsynchronousExecutor类的特定方法执行的。JavapublicclassAsynchronousExecutor{publicvoidprocessSublistPartition(ListinputList,ElementConverter,List>converter){varpartitioner=newCollectionPartitioner(inputList,numThreads);IntStream.range(0,numThreads).forEach(t->this.executor.execute(()->{varthOutput=converter.apply(partitioner.get(t));if(Objects.nonNull(thOutput)&&!thOutput.isEmpty()){synchronized(this.outputList){this.outputList.addAll(thOutput);}}}));}}4.浅分区第二种并行处理策略使用了浅拷贝的概念。实际上,每个进程中涉及的线程并不接收从源集合中复制的子列表。相反,每个线程使用子列表分区策略的相同值计算自己的索引对(fromIndex、toIndex),并直接在源集合上操作。但是,这一切都是建立在无法修改源集合的前提下的。即线程根据自己对源集合的分片读取各种对象,将新变换的对象存储在与原集合大小相同的新集合中。值得注意的是,该策略在转换操作过程中没有任何同步执行点。换句话说,所有线程都完全独立地执行它们的任务。当然,我们至少可以使用两种不同的方法来组装要输出的集合。5.List-basedshallowpartitioning这种方法在处理集合之前创建一个由各种默认元素组成的新列表。各种线程可以访问新列表中不相交的切片,由索引对(frommindex,toIndex)分隔。它们存储从相应切片生成的源集合中读取的每个新对象。这种方法使用了一个新的专用类——AsynchronousExecutor。请参考以下代码片段:JavapublicclassAsynchronousExecutor{publicvoidprocessShallowPartitionList(ListinputList,ElementConverterconverter){varchunkSize=(inputList.size()%this.numThreads==0)?(inputList.size()/this.numThreads):(inputList.size()/this.numThreads)+1;this.outputList=newArrayList<>(Collections.nCopies(inputList.size(),null));IntStream.range(0,numThreads).forEach(t->this.executor.execute(()->{varfromIndex=t*chunkSize;vartoIndex=Math.min(fromIndex+chunkSize,inputList.size());如果(fromIndex>toIndex){fromIndex=toIndex;}IntStream.range(fromIndex,toIndex).forEach(i->this.outputList.set(i,converter.apply(inputList.get(i))));}));}}6.Array-basedshallowpartitioning这种方法和前面的方法的区别在于每个线程使用数组而不是列表来存储转换后的新对象。毕竟,数组在线程完成操作后变成了输出列表。同样,将根据此策略将新方法添加到AsynchronousExecutor类。请参考以下代码片段:JavapublicclassAsynchronousExecutor{publicvoidprocessShallowPartitionArray(ListinputList,ElementConverterconverter)varchunkSize=(inputList.size()%this.numThreads==0)?(inputList.size()/this.numThreads):(inputList.size()/this.numThreads)+1;Object[]outputArr=newObject[inputList.size()];IntStream.range(0,numThreads)。forEach(t->this.executor.execute(()->{varfromIndex=t*chunkSize;vartoIndex=Math.min(fromIndex+chunkSize,inputList.size());if(fromIndex>toIndex){fromIndex=toIndex;}IntStream.range(fromIndex,toIndex).forEach(i->outputArr[i]=converter.apply(inputList.get(i)));}));this.shutdown();this.outputList=(List)Arrays.asList(outputArr);}}7.策略比较我们规定每个策略的CPU时间是通过取5次执行的平均值来计算的。并且每次执行都会生成100万和1000万个随机字符串的对象集合。以上代码在Ubuntu20.04LTS64位操作系统上运行。该系统的主机有12GBRAM和3.40GHzIntelXeonE3-1240V3CPU。CPU是4核双线程。结果如下图所示:我们可以看到第一行结果是原生序列流达到的最高CPU时间。事实上,它已被添加到建立初始性能参数的测试中。然后,我们干脆将策略改为原生并行流,100万对象的收集提升了约34.4%,1000万对象的收集也提升了44%。下面将以原生并行流策略的性能作为其他三种策略的参考。如上图所示,对于100万个对象的集合,我们没有观察到基于列表的浅分区策略在CPU时间上的减少(只有大约7%的小改进),而子列表分区的性能战略更糟糕。那么,重点是基于数组的浅分区。它大大减少了大约35.4%的CPU时间。另一方面,对于1000万个对象的集合,所有三种策略都优于并行流时间。其中,子列表分区实现了最好的性能提升,减少了约20.5%的CPU执行时间。当然,基于数组的浅分区策略也有类似的性能提升,将CPU时间增加了近20%。由于基于数组的浅分区策略在两种集合大小下都表现出了良好的性能,因此我们有必要分析一下它的“加速比”。这里的“加速比”是根据T(1)/T(p)的比值计算出来的。其中T表示运行p个线程的程序所使用的CPU时间。而T(1)对应的就是我们需要顺序执行程序的时间。下面是我针对此策略的线程数与加速比的图表。由于以上所有测试都是在4核2线程主机上进行的,我们预测这种策略的加速在8线程上会更加明显。但从上图可以看出,该算法的峰值加速比为4.4倍。同样,包含1000万个对象的集合实现了非常相似的加速。在这里,我不必重新绘制图表。这意味着该策略不会随着使用的线程数线性增加其整体性能。8.总结虽然原生顺序流的使用为我们提供了一个可靠的初始参考来加速大型集合的处理。但是尝试作为替代方案的并行化策略可以获得更好的性能。上面介绍的三种不同的算法可以为你带来更好的并行流性能。您可以在GitHub存储库中获取其完整源代码。它是一个Maven项目,对应的具体模块是dzone-async-exec。原文链接:https://dzone.com/articles/speeding-up-large-collections-processing-in-java管控内外部资源和风险,重点传播网络和信息安全知识和经验;持续以博文、专题、翻译等形式分享前沿技术和新知识;经常开展信息安全线上线下培训和教学。

最新推荐
猜你喜欢