当前位置: 首页 > 科技观察

Spark的两个核心洗牌详解

时间:2023-03-11 21:11:28 科技观察

本文转载请联系五分钟学大数据公众号。在MapReduce框架中,Shuffle阶段是Map和Reduce之间的桥梁,Map阶段通过Shuffle过程将数据输出到Reduce阶段。由于Shuffle涉及到磁盘读写和网络I/O,所以Shuffle的性能直接影响到整个程序的性能。Spark也有Map阶段和Reduce阶段,所以Shuffle也会出现。SparkShuffleSparkShuffle分为两种:一种是Hash-basedShuffle;另一种是Sort-basedShuffle。先介绍一下它们的发展历程,有助于我们更好地理解Shuffle:在Spark1.1之前,Spark中只实现了一种Shuffle方法,即Hash-basedShuffle。Spark1.1引入了Sort-basedShuffle实现,Spark1.2之后,默认实现由Hash-basedShuffle改为Sort-basedShuffle,即使用的ShuffleManager由默认的hash改为sort。在Spark2.0中,不再使用HashShuffle方法。Spark之所以一开始就提供基于Hash的Shuffle实现机制,其主要目的之一就是避免不必要的排序。想想Hadoop中的MapReduce,排序是一个固定的步骤,有很多任务是不需要排序的,MapReduce也会对其进行排序,造成很多不必要的开销。在基于Hash的Shuffle实现中,每个Mapper阶段Task都会为每个Reduce阶段Task生成一个文件,通常是大量的文件(即对应M*R个中间文件,其中M代表Mapper阶段Task的数量,R代表Reduce阶段的Task数量)伴随着大量的随机磁盘I/O操作和大量的内存开销。为了缓解以上问题,在Spark0.8.1版本的Hash-basedShuffle实现中引入了ShuffleConsolidate机制(即文件合并机制),这是一种合并Shuffle生成的中间文件的处理机制映射器侧。通过配置属性spark.shuffie.consolidateFiles=true减少中间生成文件的数量。通过文件合并,可以修改中间文件的生成方式,使每个执行单元为Reduce阶段的每个Task生成一个文件。执行单元对应:每个Mapper端的Core数/每个Task分配的Core数(默认为1)。最后,文件数量可以从M*R改为E*C/T*R,其中E代表Executors数量,C代表可用Core数量,T代表Task分配的Core数量。Spark1.1版本引入SortShuffle:在Hash-basedShuffle的实现中,生成中间结果文件的数量将取决于Reduce阶段的Task数量,即Reduce端的并行度,因此文件仍然不可控,不能真正解决问题。为了更好的解决问题,在Spark1.1版本中引入了基于Sort的Shuffle实现,而在Spark1.2版本之后,默认的实现也由基于Hash的Shuffle改为了基于Sort的Shuffle实现,也就是说,使用ShuffleManager从默认哈希更改为排序。Sort-basedShuffle中,每个Mapper阶段Task不会为每个Reduce阶段Task生成单独的文件,而是将它们全部写入一个数据(Data)文件,同时生成一个索引(Index)文件,ReduceEach阶段中的任务可以通过索引文件获取相关数据。避免大文件的直接好处是减少了随机磁盘I/0和内存开销。最终生成的文件数量减少为2*M,其中M代表Mapper阶段的任务数量,Mapper阶段的每个任务生成2个文件(1个数据文件,1个索引文件),最终的数量为files有M个数据文件和M个索引文件。因此,最终的文件数量为2*M。从Spark1.4开始,Shuffle流程中也引入了基于Tungsten-Sort的Shuffie实现。通过Tungsten项目所做的优化,Spark在数据处理方面的性能可以有很大的提升。(Tungsten翻译成中文为钨丝)注:在某些特定的应用场景下,基于Hash的Shuffle机制的性能会超过基于Sort的Shuffle机制。一张图看懂SparkShuffle的迭代历史:SparkShuffle迭代历史为什么Spark最终放弃了HashShuffle,转而使用Sorted-BasedShuffle?我们可以从Spark最根本的优化和紧迫的问题中找到答案,使用HashShuffleSpark在Shuffle的时候会产生大量的文件。当数据量增大时,产生的文件量不可控,严重制约了Spark的性能和可扩展性,所以Spark必须解决这个问题,减少Mapper端ShuffleWriter产生的文件数量,让Spark从数百个集群的规模瞬间可以支持数千甚至数万个集群。但是使用Sorted-BasedShuffle就完美了吗?答案是不。Sorted-BasedShuffle也有缺点。缺点是它的排序特性。它强制在Mapper端先对数据进行排序,这导致了它的排序速度。有点慢。好在出现了Tungsten-SortShuffle,改进了排序算法,优化了排序速度。Tungsten-SortShuffle已经合并到Sorted-BasedShuffle中,Spark的引擎会自动识别程序是否需要Sorted-BasedShuffle或Tungsten-SortShuffle。下面详细分析各个Shuffle的底层执行原理:1.HashShuffle分析下面的讨论假设每个Executor有1个CPU核。1、HashShuffleManagershufflewritestage主要是在一个stage的计算完成后,对下一个stage执行shuffleoperators(比如reduceByKey),将各个task处理的数据按照key进行“划分”。所谓“分区”就是对同一个key执行hash算法,使同一个key写入同一个磁盘文件,每个磁盘文件只属于下游阶段的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲区,当内存缓冲区满后,再溢出到磁盘文件中。下一阶段有多少个任务,当前阶段为每个任务创建多少个磁盘文件。例如,如果下一阶段共有100个任务,则当前阶段的每个任务必须创建100个磁盘文件。如果当前阶段有50个任务,一共有10个Executor,每个Executor执行5个任务,那么每个Executor上总共会创建500个磁盘文件,所有Executor上都会创建5000个磁盘文件。可以看出,未经优化的shuffle写操作产生的磁盘文件数量极其惊人。随机读取阶段通常是一个阶段在开始时所做的。此时,本阶段的各个任务需要通过网络将上一阶段计算结果中所有相同的key从各个节点拉取到自己的节点,然后进行key聚合或连接等操作。在shufflewrite过程中,map任务会为下游阶段的每个reduce任务创建一个磁盘文件。因此,在shuffleread的过程中,每个reducetask只需要拉取属于自己磁盘文件之一的文件即可。shuffleread的拉取过程是边拉边聚合。每个shuffleread任务都会有自己的bufferbuffer,每次只能拉取与bufferbuffer相同大小的数据,然后通过内存中的一个Map进行聚合等操作。一批数据聚合完成后,拉取下一批数据,放入buffer缓冲区进行聚合。以此类推,直到最终拉取所有数据,得到最终结果。HashShuffleManager的工作原理如下图所示:未优化的HashShuffleManager的工作原理2.优化的HashShuffleManager为了优化HashShuffleManager,我们可以设置一个参数:spark.shuffle.consolidateFiles。此参数的默认值为false。设置为true开启优化机制,一般来说,如果我们使用HashShuffleManager,建议开启该选项。启用consolidate机制后,在shufflewrite过程中,任务不会为下游阶段的每个任务创建磁盘文件。这时候就会出现shuffleFileGroup的概念。每个shuffleFileGroup都会对应一批磁盘文件。磁盘文件数量与下游阶段的任务数量相同。一个Executor上有多少个CPU核,就可以并行执行多少个任务。第一批并行执行的每个任务都会创建一个shuffleFileGroup并将数据写入对应的磁盘文件。当Executor的cpu核执行完一批任务,再执行下一批任务时,下一批任务会复用已有的shuffleFileGroup,包括里面的磁盘文件,也就是说任务会把数据写入到一个现有的磁盘文件,而不是一个新的磁盘文件。因此,consolidate机制可以让不同的task复用同一批次的磁盘文件,使得多个task的磁盘文件在一定程度上得到有效的合并,从而大大减少磁盘文件的数量,提高shufflewrite的性能。假设第二阶段有100个任务,第一阶段有50个任务,那么总共还有10个Executor(Executor的CPU个数为1),每个Executor执行5个任务。那么在使用未优化的HashShuffleManager时,每个Executor会生成500个磁盘文件,所有Executor都会生成5000个磁盘文件。但是此时经过优化,每个Executor创建磁盘文件数的计算公式为:cpu核数*下一阶段任务数,即每个Executor只会创建100个磁盘文件这一次,AllExecutors只会创建1000个磁盘文件。该功能优势明显,但为什么Spark在基于HashShuffle的实现中没有将该功能设置为默认选项呢?官方说法是这个功能还不稳定。优化后的HashShuffleManager的工作原理如下图所示:优化后的HashShuffleManager的工作原理基于Hash的Shuffle机制的优缺点优点:可以省略不必要的排序开销。避免排序所需的内存开销。缺点:产生的文件太多,对文件系统造成压力。大量小文件的随机读写带来一定的磁盘开销。写入数据块所需的缓存空间也会相应增加,对内存造成压力。2.SortShuffle分析SortShuffleManager的运行机制主要分为三种:普通运行机制;bypass运行机制,当shuffleread任务数小于等于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)时,将启用bypass机制;TungstenSort运行机制,启用该运行机制需要设置配置项spark.shuffle.manager=tungsten-sort。开启这个配置并不能保证这个运行机制会被采用(稍后解释)。1.正常运行机制在这种模式下,数据会先写入一个内存数据结构,可以根据不同的shuffle算子选择不同的数据结构。如果是聚合类型的shuffle算子比如reduceByKey,那么会选择Map数据结构,通过Map进行聚合的同时写入内存;如果是join等常见的shuffle算子,那么会选择Array数据结构,直接写入内存。然后,每条数据写入内存数据结构后,都会判断是否达到某个临界阈值。如果达到临界阈值,它会尝试将内存数据结构中的数据溢出到磁盘,然后清除内存数据结构。在溢出到磁盘文件之前,内存数据结构中已有的数据会根据key进行排序。排序后,数据将批量写入磁盘文件。默认批号为10000,也就是说排序后的数据会以10000条数据为批次写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream完成的。BufferedOutputStream是Java的缓冲输出流。首先将数据缓存在内存中,当内存缓存满后,再次写入磁盘文件,可以减少磁盘IO次数,提高性能。当一个任务将所有数据写入内存数据结构时,会发生多次磁盘溢出操作,会产生多个临时文件。最后,所有以前的临时磁盘文件将被合并。这就是合并过程。这时候,之前的所有临时磁盘文件中的数据都会被读出,然后依次写入最终的磁盘文件中。另外,由于一个任务只对应一个磁盘文件,也就是说该任务为下游阶段的任务准备的数据都在这个文件中,所以会单独写入一个索引文件,用来标识下游任务。文件中数据的起始偏移量和结束偏移量。因为SortShuffleManager有一个磁盘文件合并过程,所以大大减少了文件数量。比如第一阶段有50个任务,一共有10个Executor,每个Executor执行5个任务,第二阶段有100个任务。由于每个task最后只有一个磁盘文件,所以此时每个Executor上只有5个磁盘文件,所有Executor上只有50个磁盘文件。普通运行机制的SortShuffleManager工作原理如下图所示:普通运行机制的SortShuffleManager工作原理2.旁路运行机制当Reducer端的任务数量比较少时,基于HashShuffle的实现机制是明显比基于SortShuffle的实现机制要快,所以在SortShuffle实现机制的基础上,提供了一种Hash风格的fallback方案,即bypass运行机制。当Reducer端任务数量小于配置属性spark.shuffle.sort.bypassMergeThreshold设置的数量时,使用Hash风格的回退计划。旁路运行机制的触发条件如下:shufflemap任务数小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。不是聚合类的洗牌操作符。此时,每个任务都会为每个下游任务创建一个临时磁盘文件,根据key对数据进行哈希处理,然后根据key的哈希值将key写入对应的磁盘文件中。当然,写入磁盘文件时,也是先写入内存缓冲区,等缓冲区满了再溢出到磁盘文件。最后,所有临时磁盘文件也合并为一个磁盘文件,并创建一个索引文件。这个过程的写盘机制其实和未优化的HashShuffleManager如出一辙,因为创建的磁盘文件数量惊人,但最后会进行一次磁盘文件合并。因此,少量的finaldiskfiles也使得该机制在shuffleread性能上优于未优化的HashShuffleManager。该机制与普通SortShuffleManager运行机制的区别在于:第一,写盘机制不同;其次,不会进行排序。也就是说,启用这种机制最大的好处就是在shufflewrite过程中,不需要进行数据排序操作,省去了这部分性能开销。旁路运行机制SortShuffleManager工作原理如下图所示:旁路运行机制SortShuffleManager工作原理3.TungstenSortShuffle运行机制TungstenSort是对普通Sort的优化。TungstenSort会排序,但内容本身不会排序。而是通过内容序列化后的字节数组的指针(元数据)将数据的排序改为指针数组的排序,实现了序列化后的二进制数据的直接排序。由于是直接对二进制数据进行操作,所以里面没有序列化和反序列化的过程。内存消耗大大降低,相应的GC开销也会大大降低。Spark提供了配置属性来选择具体的Shuffle实现机制,但需要注意的是,虽然Spark默认启用了基于SortShuffle的实现机制,但实际上参考Shuffle框架内核部分,我们可以看到基于SortShuffle的implementationSortShuffleManager机制和基于TungstenSortShuffle的实现机制都使用了SortShuffleManager,内部使用的具体实现机制通过提供的两个方法来判断:当不是基于TungstenSort时,使用SortShuffleWriter.shouldBypassMergeSort方法判断是否需要回滚到Hash风格的Shuffle实现机制,当不满足该方法返回的条件时,再使用SortShuffleManager.canUseSerializedShuffle方法判断是否需要使用TungstenSortShuffle实现机制,当两种方法都使用时返回false,即它们是不满足满足相应条件时,自动采用正常运行机制。所以设置spark.shuffle.manager=tungsten-sort时,不保证会采用基于TungstenSort的Shuffle实现机制。要实现TungstenSortShuffle机制,需要满足以下条件:Shuffle依赖中没有聚合操作或不需要对输出进行排序。Shuffle的序列化器支持序列化值的重定位(目前只支持KryoSerializerSparkSQL框架自定义序列化器)。Shuffle时outputpartition的个数小于16777216,其实在使用过程中还有一些其他的限制。例如引入Page形式的内存管理模型后,单条内部记录的长度不能超过128MB(具体内存模型请参考PackedRecordPointer类)。另外,分区数量的限制也是这种内存模型造成的。因此,使用基于TungstenSortShuffle的实现机制的条件比较苛刻。参考资料:《Spark大数据商业实战三部曲》https://spark.apache.org/docs/2.0.0/programming-guide.html#shuffle-operationshttps://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw