Spark学习记录SparkCore初步概念Spark是一个基于内存的快速、通用、可扩展的大数据分析计算引擎。包含的模块有SparkCore、SparkSQL、SparkStreaming、SparkMLib、SparkGraphXSpark提交示例Standalonebin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masterspark://spark111:7077\。/examples/jars/spark-examples_2.12-3.0.0.jar\1000Yarnbin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modecluster\./examples/jars/spark-examples_2.12-3.0.0.jar\100bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\。/examples/jars/spark-examples_2.12-3.0.0.jar\100SparkOnYarn两种模式的区别在于Driver程序运行在不同的节点上。Client模式在客户端启动Driver模块进行监控和调度,Cluster模式在集群中启动Driver模块。所以一般测试用Client模式,生产部署用Cluster模式。YarnClient模式下,本地启动driver后,driver会和ResourceManager通信,申请启动ApplicationMasterResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor资源.ResourceManager收到ApplicationMaster的申请后,分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程。Executor进程启动后,会反向注册到Driver中。所有的Executor启动后,Driver开始执行Action算子时的main函数,触发一个job,开始根据宽依赖划分stage。每个stage都会生成对应的TaskSet,然后Driver会把任务分发给各个Executor执行YarnCluster模式。YarnCluster模式下,任务提交后,会和ResourceManager通信申请启动ApplicationMaster,然后ResourceManger会分配Container,在合适的NodeManager上启动ApplicationMaster。此时ApplicationMaster就是DriverDriver启动后,向ResourceManager申请Exexutor资源。ResourceManager分配Contaioner,ApplicationMaster在对应的NodeManager上启动Excutor。Executor启动后,会向Driver反向注册。所有Executor启动后,Driver开始执行main函数并执行Action算子,触发一个job,根据宽依赖划分stage,在每个stage上生成对应的TaskSet。之后Driver将任务分发给各个Executor执行RDDRDD(ResilientDistributedDataset),弹性分布式数据集分区。与HadoopMapReduce相比,MapReduce有切片和分区,这是两个不同的概念。切片主要用在MapTask阶段,以TextInputFormat为例,切片是根据文件的块大小来确定的。默认情况下,切片大小等于块大小。当然,切片大小可以通过配置进行调整。一般块大小为128M或256M,由磁盘速度决定。一个文件会根据分片大小被分成多个逻辑上的小文件,所以分片的数量等于MapTasks的数量,即并行MapTasks的数量。MapReduce分区必须在代码中指定,默认为一个分区。,partitions的数量对应于ReduceTasks的数量。默认的分区器Partitioner是HashPartitioner,它的计算方式是(key&Long.MAX_VALUE)%numReduceTasks,也就是说在HashPartitioner的计算逻辑中,设置了多少numReduceTasks就会有多少个分区。用户可以通过继承Partioner来自定义分区器,实现指定的分区数。MapReduce的分区数也会用在MapTask阶段。数据处理map方法在进入ringbuffer之前,会对数据进行分区标记。在环形缓冲区的溢出排序和多个溢出文件的排序合并中,都会用到分区。在一个单元执行的RDD中,一个阶段的任务数是由该阶段最后一个算子的分区数决定的五个核心属性的分区列表。protecteddefgetPartitions:Array[Partition]分区计算函数defcompute(split:Partition,context:TaskContext):Iterator[T]RDD之间的依赖关系protecteddefgetDependencies:Seq[Dependency[_]]=depspartitioner(optional)@transientvalpartitioner:Option[Partitioner]=Nonepreferredlocation(optionalOptional)protecteddefgetPreferredLocations(split:Partition):Seq[String]=Nil分区和并行是相关的,但它们是不同的概念。多个partition会有多个task,但是如果只有一个Executor,就只能并发,不能并行。分区意味着写代码的人希望在资源充足的情况下实现分区个数的并行。分区内按顺序计算,分区间计算无序markRDD分区,从collection创建numSlices=loopi=0~3,filelength,start=((i*length)/numSlices).toIntend=(((i+1)*length)/numSlices).toInt[1,2,3,4,5]numSlices=2length=50=>[0,2)=>1,21=>[2,5)=>3,4,5defslice[T:ClassTag](seq:Seq[T],numSlices:Int):Seq[Seq[T]]={if(numSlices<1){thrownewIllegalArgumentException("需要正数分区")}//序列需要在同一组索引位置进行切片操作//像RDD.zip()那样表现得像预期的那样defpositions(length:Long,numSlices:Int):Iterator[(Int,Int)]={(0untilnumSlices).iterator.map{i=>valstart=((i*length)/numSlices).toIntvalend=(((i+1)*length)/numSlices).toInt(start,end)}}//...省略多行代码}textFile的分区采用Hadoop的文件读取方式,TextInputformat以行为单位读取。读取数据时,不会重复读取偏移量。1234567@@=>01234567889@@=>91011120=>1314/2=7[0,7]=>1234567@@[7,14]=>89@@0publicInputSplit[]getSplits(JobConfjob,intnumSplits)throwsIOException{//...省略more代码行longtotalSize=0L;//...省略多行代码totalSize+=file.getLen();//...省略多行代码longgoalSize=totalSize/(long)(numSplits==0?1:numSplits);longminSize=Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize",1L),this.minSplitSize);//...省略多行代码//blockSize,本地运行环境,32M,production128Mor256MlongblockSize=file.getBlockSize();longsplitSize=this.computeSplitSize(goalSize,minSize,blockSize);//...省略多行代码for(bytesRemaining=length;(double)b??ytesRemaining/(double)splitSize>1.1D;bytesRemaining-=splitSize){splitHosts=this.getSplitHostsAndCachedHosts(blkLocations,length-bytesRemaining,splitSize,clusterMap);splits.add(this.makeSplit(path,length-bytesRemaining,splitSize,splitHosts[0],splitHosts[1]));}//...省略多行代码}protectedlongcomputeSplitSize(longgoalSize,longminSize,longblockSize){returnMath.max(minSize,Math.min(goalSize,blockSize));}
