介绍:典型的Spark作业读取位于OSS的Parquet表时,如何在源头判断并发度(任务/分区)?特别是在做TPCH测试的时候,有一些问题,比如在源头如何确定扫描文件的并发度?一个parquet文件是否对应一个分区?多个parquet文件对应一个分区?还是一个parquet文件对应多个分区?本文将从源码的角度来分析回答这些问题。简介典型的Spark作业读取位于OSS的Parquet表时,如何判断源的并发度(任务/分区)?特别是在做TPCH测试的时候,有一些问题,比如在源头如何确定扫描文件的并发度?一个parquet文件是否对应一个分区?多个parquet文件对应一个分区?还是一个parquet文件对应多个分区?本文将从源码的角度来分析回答这些问题。读取分析数据源对应的物理执行节点为FileSourceScanExec,读取数据代码块如下lazyvalinputRDD:RDD[InternalRow]={valreadFile:(PartitionedFile)=>Iterator[InternalRow]=relation.fileFormat。buildReaderWithPartitionValues(sparkSession=relation.sparkSession,dataSchema=relation.dataSchema,partitionSchema=relation.partitionSchema,requiredSchema=requiredSchema,filters=pushedDownFilters,options=relation.options,hadoopConf=relation.sparkSession.sessionState.newHadoopConfreadOptions(relation.options))bucketedScan){createBucketedReadRDD(relation.bucketSpec.get,readFile,dynamicallySelectedPartitions,relation)}else{createReadRDD(readFile,dynamicallySelectedPartitions,relation)}sendDriverMetrics()readRDD}主要针对非bucket处理,调用非bucket的createReadRDD方法定义-bucketscanning如下/**CreateanRDDfornon-bucketedreads.thebucketedvariantofthisfunc化是[[createBucketedReadRDD]]。*@paramreadFile一个函数来读取每个(部分ofa)file.@paramselectedPartitionsHive-style分区是read的一部分。@paramfsRelation[[HadoopFsRelation]]与读取相关联。*/privatedefcreateReadRDD(readFile:(PartitionedFile)=>Iterator[InternalRow],selectedPartitions:Array[PartitionDirectory],fsRelation:HadoopFsRelation):RDD[InternalRow]={//文件打开销售,每次打开文件最少需要读的字节valopenCostInBytes=fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes//最大切分片大小valmaxSplitBytes=FilePartition.maxSplitBytes(fsRelation.sparkSession,selectedPartitions)logInfo(s"计划用bin打包扫描,最大尺寸:$maxSplitBytesbytes,"+s"opencost被认为是扫描$openCostInBytes字节。")//如果可能的话,用桶修剪过滤文件一些(bucketSet)如果bucketingEnabled=>//如果bucket文件名不合法则不剪枝文件filePath=>BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)case_=>_=>true}//分割合并分区下的文件Sort从最大到最小valsplitFiles=selectedPartitions.flatMap{partition=>partition.files.flatMap{file=>//getPath()非常昂贵所以我们只想在这个块中调用它一次:valfilePath=file.getPathif(shouldProcess(filePath)){//文件是否可以拆分,parquet/orc/avro可以拆分valisSplitable=relation.fileFormat.isSplitable(relation.sparkSession,relation.options,filePath)//拆分文件PartitionedFileUtil.splitFiles(sparkSession=relation.sparkSession,file=file,filePath=filePath,isSplitable=isSplitable,maxSplitBytes=maxSplitBytes,partitionValues=partition.values)}else{Seq.empty}}}.sortBy(_.length)(implic[Ordering[Long]].reverse)valpartitions=FilePartition.getFilePartitions(relation.sparkSession,splitFiles,maxSplitBytes)newFileScanRDD(fsRelation.sparkSession,readFile,partitions)}可以看出,确定最大分割大小maxSplitBytes对于后续分割多少文件非常重要,其core逻辑如下defmaxSplitBytes(sparkSession:SparkSession,selectedPartitions:Seq[PartitionDirectory]):Long={//读取文件时,打包成最大分区大小,默认128MB,对应一个块大小valdefaultMaxSplitBytes=sparkSession.sessionState.conf。filesMaxPartitionBytes//打开每个文件的开销,默认为4MB。valopenCostInBytes=sparkSession.sessionState.conf.filesOpenCostInBytes//推荐的(不保证)最小文件分区数,默认不设置,从leafNodeDefaultParallelism获取//代码逻辑调用链SparkSession#leafNodeDefaultParallelism->SparkContext#defaultParallelism//->TaskSchedulerImpl#defaultParallelism->CoarseGrainedSchedulerBackend#defaultParallelism//->总核数max(执行核数之和,2),至少2valminPartitionNum=sparkSession.OrssionState.conf.filesMin(sparkSession.leafNodeDefaultParallelism)//总读取大小valtotalBytes=selectedPartitions.flatMap(_.files.map(_.getLen+openCostInBytes)).sum//单核读取的大小valbytesPerCore=totalBytes/minPartitionNum//计算出来的大小不会超过设定的128MBMath.min(defaultMaxSplitBytes,Math.max(openCostInBytes,bytesPerCore))}对于PartitionedFileUtil#splitFiles,其核心逻辑如下,比较简单,直接按照最大拆分大小拆分大文件defsplitFiles(sparkSession:SparkSession,file:FileStatus,filePath:Path,isSplitable:Boolean,maxSplitBytes:Long,partitionValues:InternalRow):Seq[PartitionedFile]={if(isSplitable){//分成多个分区(0L直到file.getLenbymaxSplitBytes).map{offset=>val剩余=file.getLen-offsetvalsize=if(remaining>maxSplitBytes)maxSplitByteselseremainingvalhosts=getBlockHosts(getBlockLocations(file),offset,size)PartitionedFile(partitionValues,filePath.toUri.toString,offset,size,hosts)}}else{顺序(getPartitionedFile(file,filePath,partitionValues))}}得到Seq[PartitionedFile]列表后,还没有完成文件分割,需要调用FilePartition#getFilePartitions做最后的处理,该方法的核心逻辑如下valcurrentFiles=newArrayBuffer[PartitionedFile]varcurrentSize=0L/**关闭当前分区并移动到下一个。*/defclosePartition():Unit={if(currentFiles.nonEmpty){//复制到一个新数组。//新建一个NewPartitionFilevalnewPartition=FilePartition(partitions.size,currentFiles.toArray)partitions+=newPartition}currentFiles.clear()currentSize=0}//打开文件开销,默认为4MBvalopenCostInBytes=sparkSession.sessionState.conf.filesOpenCostInBytes//使用“NextFitDecreasing”将文件分配给分区分区,表示一个Split任务读取的数据closePartition()}//添加给定的文件到thecurrentpartition.currentSize+=file.length+openCostInBytescurrentFiles+=file}//最后一次关闭分区,文件可能会变小会合并,生成一个maxSplitBytes大小的PartitionFile,可以避免拉起太多task,读取太多小文件生成的FileScanRDD(newFileScanRDD(fsRelation.sparkSession,readFile,partitions))的并发度为分区的长度,即Spark最后生成的Task个数overrideprotecteddefgetPartitions:Array[RDDPartition]=filePartitions.toArray整体流程图如下图所示,拆分合并的过程如下图所示。对于TPCH10G生成的客户parquet表https://oss.console.aliyun.co...一共8个Parquet文件,文件总大小113.918MBSpark作业配置如下,executor只有1个核心confspark.driver.resourceSpec=small;confspark.executor.instances=1;confspark.executor.resourceSpec=small;confspark.app.name=SparkSQLTest;confspark.adb.connectors=oss;使用tpcd;select*fromcustomerorderbyC_CUSTKEYdesclimit100;根据前面的公式计算defaultMaxSplitBytes=128MBopenCostInBytes=4MBminPartitionNum=max(1,2)=2totalBytes=113.918+8*4MB=145.918MB1845.PerCore9=9MBmaxSplitBytes=72.959MB=Math.min(defaultMaxSplitBytesopentes,Math.max(bytesPerCore))得到maxSplitBytes是72.959MB。从日志中我们也可以看到,对应大小的排序后的文件顺序为(00000,00001,00002,00003,00004,00006,00005,00007),再次合并后得到3个FilePartitioned,分别对应FilePartitioned1:00000、00001、00002FilePartitioned2:00003、00004、00006FilePartitioned3:00005、00007,即一共会生成3个Task。从SparkUI查看确实会生成3个任务。从日志中查看也生成了3个TasksTask更改了Spark作业配置,5个executorstotal10corename=SparkSQLTest;confspark.adb.connectors=oss;usetpcd;select*fromcustomerorderbyC_CUSTKEYdesclimit100;根据前面的公式计算defaultMaxSplitBytes=128MBopenCostInBytes=4MBminPartitionNum=max(10,2)=10totalBytes=113.918+8*5.9MB=8MBPer1Core1145.918MB/10=14.5918MBmaxSplitBytes=14.5918MB,defaultMats.maxSplitBytes(Math.max.Splityx,openCostInBytes,bytesPerCore))查看日志可以看到14.5918MB会分割源文件,会把00001,00002,00003,00004,00005,00006分成两部分,00007不会分割,因为它小于14.5918MB,经过PartitionedFileUtil#splitFiles,会有7*2+1=15PartitionedFile00000(0->14.5918MB),00000(14.5918MB->15.698MB)00001(0->14.5918MB),00001(14.5918MB->15.632MB)00002(0->14.5918MB),00002(14.5918MB->15.629MB)00003(0->14.5918MB),00003(14.5918MB2->415.)00004(0->14.5918MB),00004(14.5918MB->15.617MB)00005(0->14.5918MB),00005(14.5918MB->15.536MB)00006(0->14.5918MB),00006(14.59)->15.539MB)00007(0->4.634MB)排序后得到如下和合并后的10个FilePartitioned,对应FilePartitioned1:00000(0->14.5918MB)FilePartitioned2:00001(0->14.5918MB)文件分区3:00002(0->14.5918MB)文件分区4:00003(0->14.5918MB)文件分区5:00004(0->14.5918MB)文件分区6:00005(0->14.5918MB)文件分区7:0000->14.5918MB)文件分区8:00007(0->4.634MB),00000(14.5918MB->15.698MB)文件分区9:00001(14.5918MB->15.632MB),00002(14.5918MB->15.6009MB3),0(14.5918MB->15.624MB)文件分区10:00004(14.5918MB->15.617MB),00005(14.5918MB->15)。536MB),00006(14.5918MB->15.539MB)即一共会生成10个任务。通过SparkUI,还可以查看10个任务。检查日志,000004(14.5918MB->15.617MB),00005(14.5918MB->15.536MB),00006(14.5918MB->15.539MB)在同一个任务中00007(0->4.634MB),00000(14.5918MB)->15.698MB)00001(14.5918MB->15.632MB),00002(14.5918MB)->15.629MB),00003(14.5918MB->15.624MB)inthesameTaskSummary通过源码可以看出Spark会考虑到分区下所有文件的大小和打开每个文件的代价,还会涉及到大文件的切分和小文件的合并,最终得到一个相对合理的Partition
