1.概述本文将首先介绍SparkAQESkewedJoin的基本原理以及字节跳动在实践中使用AQESkewedJoin遇到的一些问题;针对问题所做的相关优化和功能增强,以及字节跳动相关优化带来的收益;另外,我们会分享使用SkewedJoin的心得。2.背景首先简单介绍一下SparkAQESkewedJoin。SparkAdaptiveQueryExecution,简称SparkAQE,大意是动态优化和修改stage的物理执行计划。利用上游阶段执行后的统计信息(主要是数据量和记录数)来优化下游阶段的物理执行计划。SparkAQE在stage提交执行前,可以根据上游stage所有MapTasks的统计信息,计算出下游各个ReduceTasks的shuffleinput,从而SparkAQE可以自动发现有数据倾斜的Join并进行优化处理。这个函数就是SparkAQESkewedJoin。比如A表到B表的innerjoin,A表的第0个分区(A0)是一个偏斜分区,正常情况下A0会和B表的第0个分区(B0)join,因为A0是偏斜的这时,task0就会变成一个长尾任务。SkewedJoin在执行AJoinB之前,通过upstream阶段的统计,发现partitionA0明显高于平均值数倍,即判断AJoinB发生了数据倾斜,倾斜的分区是分区A0。SparkAQE会将A0的数据拆分成N份,使用N个任务来处理分区。每个任务只读取几个MapTasks的shuffle输出文件。如下图所示,A0-0只读取了Stage0#MapTask0中属于A0的数据。然后这N个task都读取B表分区0的数据做join。这N个任务的执行结果相当于A表中A0joinB0的结果,不难看出在这样的处理中,B表的partition0会被读取N次。虽然这增加了一定的额外成本,但是通过N个任务处理倾斜数据的收益还是大于这个成本的。Spark从3.0版本开始支持AQESkewedJoin函数,但是我们在实践中发现了一些问题。不准确的统计数据会导致Spark无法识别数据倾斜。不均匀的分割导致不理想的优化结果。不支持同字段连续join等复杂场景。我将在[优化增强]中详细介绍这些问题以及我们的优化和解决方案。3.优化增强3.1提高识别数据倾斜的能力SparkAQE处理数据倾斜的原理不难弄清楚。SparkAQE识别偏斜和分割数据偏斜的功能依赖于上游阶段的统计数据。统计数据越准确,对偏斜的识别就越准确容量和处理能力越高,直观的表现就是偏斜数据被分割的很均匀,分割数据的大小和中位数几乎一样,这最大限度地减少了长尾任务的影响。MapStage执行后,每个MapTask都会生成一个统计结果MapStatus,发送给Driver。MapStatus维护了一个Array[Long],记录了MapTask中下游各个ReduceTask的数据大小。Driver收集到所有MapTask的MapStatus后,可以计算出每个ReduceTask的输入数据量和属于每个上游MapTask的数据大小。SparkAQE可以根据每个ReduceTask的数据大小判断数据倾斜,并根据上游MapTask的统计信息,对ReduceTask进行合理拆分,尽可能保证拆分的均匀性。如下图所示,ReduceTask0的ShuffleRead(shuffle过程中读取的数据量)为200,明显大于ReduceTask1和ReduceTask2的100,出现数据倾斜。我们可以将ReduceTask0分成两部分。ReduceTask0-0读取MapTask0和MapTask1的数据,ReduceTask0-1读取MapTask2和MapTask3的数据。两个拆分任务的ShuffleRead都是100,我们可以看出统计信息大小的空间复杂度为O(M*R)。对于大任务,会占用大量的Driver内存,所以Spark原生对其进行了限制。对于MapTask,当下游的ReduceTasks数量大于某个阈值(spark.shuffle.minNumPartitionsToHighlyCompress,默认2000)时,会压缩MapStatus,所有小于spark.shuffle.accurateBlockThreshold(默认100M)的值都会被替换按平均值。比如下图是一个SkewedJoin没有生效的job。从运行指标来看,ShuffleRead存在严重的skew,符合SkewedJoin生效的场景,但实际运行时并没有生效。通过读取日志可以看出,SparkAQE在运行时,获取到的join两侧shufflepartitions的中值和最大值是相同的,所以没有识别出偏斜。这是由于压缩后的MapStatus统计数据不准确造成的。在实践中,我们会遇到很多由于统计数据不准确而无法识别倾斜的大型作业。然而,当我们试图增加这个阈值时,一些大型作业由于Driver内存使用量的增加而失败。为了解决这个问题,我们做了如下优化:Driver收到详细的MapStatus后,先用数据更新每个ReduceTask的累计输入数据,然后压缩MapStatus,使其不至于占用过多内存。这时候压缩后的MapStatus虽然不能让我们获取到ReduceTask准确的上游分布,但是可以获取到ReduceTask输入数据的准确总大小,这样我们就可以准确的识别出偏斜的ReduceTask。上面的优化增加了一个MapStatus的解压操作,MapStatus的解压是一个比较耗CPU的操作。对于大型作业,DriverCPU可能已满,无法处理Executor心跳,导致作业失败。对此,我们通过缓存来保证Driver端在消费MapStatus时,每个MapStatus只会解压一次,大大减少了优化带来的开销。通过以上优化,我们成功将线上默认阈值从2000调整为5000,确保线上96.6%的Spark作业能够准确识别数据倾斜(如果存在)。3.2提高切片数据的均匀性由于HighlyCompressMapStatus将所有低于spark.shuffle.accurateBlockThreshold的值都用平均值填充,每个ReduceTask通过压缩后的MapStatus累加计算得到的总数据量和数据分布与实际相差甚远大的。举个简单的例子:我们得到ReduceTask0实际总数据为1G,中位数为100M,所以我们的期望是将ReduceTask0分成10份,每份100M。此时上游MapStage共有100个MapTask。除了MapTask0中属于ReduceTask0的数据为100M外,其余99个MapStaks的数据均为10M。我们压缩完所有的MapStatus后,AQE得到的ReduceTask0的上游分布是MapTask0有100M(因为数据量大,所以保留),其他99个MapTask的数据都是1M(压缩时取平均值)。此时SparkAQE按照期望值100M进行分裂,只分裂为两个ReduceTasks:ReduceTask0-0(读取MapTask0)和ReduceTask0-1(读取剩下的99个MapTasks)。基于此,我们改进的方法是利用精确的ReduceTask数据量推导出每个MapperTask对应的数据量,从而得到尽可能准确的数据分布。同样是刚才的例子,我们知道ReduceTask0的实际总数据是1G,MapStatus压缩的阈值为100M,那么可以肯定的是MapTask0在ReduceTask0上的100M数据是准确保留的(因为大于或等于阈值),而其他99个MapTask数据不准确。此时,AQE将不会使用压缩后的数据。而其他99个MapTasks中属于ReduceTask0的数据通过1G的总数据倒过来就是10M。虽然也是有误差的平均值,但与压缩后的数据相比,准确的总量反演得到的平均值会更加准确。这时候如果Spark按照100M的预期值进行划分,会被划分为10个ReduceTasks,符合我们的预期。在实际应用中,使用新的方案,AQESkewedJoin可以更均匀地拆分和倾斜数据,优化效果得到显着提升。下图是一个歪斜处理效??果不理想的作业。SkewedJoin生效后,StageShuffleReadSize的中值和最大值分别为4M和9.9G。经过我们的优化,该阶段的ShuffleReadSize的中值和最大值分别为149M和1427M,倾斜分区的划分更加均匀,该阶段的运行时间也从原来的2h缩短为20m。3.3支持更多场景场景一:JoinWithAggOrWin下图为示例。Stage10虽然只有一个SortMergeJoin,但是join的一侧并不是Sort+Exchange的组合,而是一个Aggregate算子或者Window算子,所以社区没有实现。范围内。场景二:MultipleSkewedJoin在用户的业务逻辑中,经常会出现这样的场景:一个表的主键需要连续join多个表。这种场景体现在Spark的具体执行上,即continuousjoin存在于同一个Stage中。如下图,Stage21中有多个连续的SortMergeJoin,这个场景社区无法优化。场景三:JoinWithUnionStage中有一个Union算子,Union的children中有SMJ。此外,我们还支持ShuffleHashJoin、BucketJoin、MultipleJoinWithAggOrWin等更多场景。4、字节的实践上面介绍的SparkAQESkewedJoin的LAS优化功能,在字节跳动已经使用了一年左右。截至2022年8月,平均每天优化覆盖18000+个Spark作业,优化命中作业平均性能提升35%,优化后的Spark作业中有30%属于LAS自研支持的场景.您可以通过火山引擎激活LAS服务,体验这些优化功能。5.使用指南5.1AQESkewedJoin不支持哪些场景AQESkewedJoin函数不能处理所有数据倾斜的join,这是由它的实现逻辑决定的。首先,如果skewedpartition中的大部分数据来自同一个upstreamMapper,AQESkewedJoin无法处理,因为Spark不支持ReduceTask只读取upstreamMapper一个block的部分数据。其次,如果指定requiredChildDistribution的Join的偏斜侧有Agg或Window等算子,那么SkewedJoin优化无法处理,因为拆分partition会破坏RDD的outputPartitioning,导致不再满足requiredChildDistribution.第三,对于Outer/SemiJoin,AQESkewedJoin无法处理非Outer/Semi侧的数据倾斜。比如LeftOuterJoin,SkewedJoin无法处理右边的数据倾斜。第四,AQE无法处理倾斜的BroadcastHashJoin。5.2AQESkewedJoin优化效果不明显时的措施如果遇到满足应用场景但SkewedJoin不生效或者skew处理效果不理想的情况,有以下优化方法:增加spark.shuffle.minNumPartitionsToHighlyCompress,并保证该值大于等于shuffleconcurrency(当AQE开启时为spark.sql.adaptive.coalescePartitions.initialPartitionNum)。降低spark.shuffle.accurateBlockThreshold,比如4M。但是需要注意的是,这样会增加Driver的内存消耗,需要同时增加Driver的CPU和内存。降低spark.sql.adaptive.skewJoin.skewedPartitionFactor以降低定义偏斜的阈值。6.总结本文首先简单介绍了SparkAQE的基本思想和SkewedJoin函数的原理,然后提出了我们在应用SkewedJoin过程中遇到的一些问题。针对这些问题,我们引入了AQESkewedJoin的优化和增强——提高统计的准确性;提高倾斜数据分割的均匀性;支持更多场景。接下来本文介绍字节跳动AQESkewedJoin的使用,包括覆盖作业的日均优化和优化效果。其中,优化后的Spark作业中有30%属于字节跳动支持的场景。最后,我们分享了我们关于AQESkewedJoin的用户指南:AQESkewedJoin不支持哪些场景;AQESkewedJoin失效时可以采取什么措施。7.附录A:本文涉及的AQESkewedJoin优化相关参数。配置参数配置名称默认值参数含义spark.shuffle.minNumPartitionsToHighlyCompress2000决定Mapstatus使用的阈值是HighlyCompressedMapStatus还是CompressedMapStatus。如果huffle分区大于此值,则使用HighlyCompressedMapStatus。spark.shuffle.accurateBlockThreshold100MHighlyCompressedMapStatus记录了shuffle块的确切大小的阈值。当块小于该值时,取平均值。spark.sql.adaptive.skewJoin.skewedPartitionFactor10如果分区大于此因子乘以分区大小的中值,则该分区是倾斜的。
