当前位置: 首页 > 科技观察

Master在Worker上分配资源并启动Executor,逐行代码注释版

时间:2023-03-19 00:49:02 科技观察

本文转载自微信公众号《KK架构》,作者wangkai。转载请联系KKArchitecture公众号。1.回顾之前的内容。上次看了SparkContext初始化。在继续之前,让我们回顾一下之前的内容。这里有一个假设:Spark集群以Standalone启动,作业也提交到SparkStandalone集群。首先需要启动Spark集群,使用start-all.sh脚本依次启动Master(主备)和多个Worker。启动后,开始提交作业,使用spark-submit命令提交。首先在提交任务的机器上使用java命令启动一个虚拟机,以执行主类SparkSubmit的main方法为入口。然后根据提交到不同的集群,创建新的不同的客户端类。如果是standalone,则创建一个新的ClientApp;然后将javaDriverWrapper命令封装到RequestSubmmitDriver消息中,将消息发送给Master;Master随机找一个满足资源条件的Worker启动Driver,实际执行虚拟机中DriverWrapper的main方法;然后Worker开始启动Driver,执行启动时用户提交的java包中的main方法,然后开始初始化SparkContext。Driver中依次创建DAGScheduler、TaskScheduler、SchedulerBackend三个重要实例。并启动DriverEndpoint和ClientEndpoint与Worker和Master通信。2.Master处理申请的注册。ClientEndpoint上次启动后,会向Master发送RegisterApplication消息,Master会开始处理这条消息。然后看Matster类是在哪里处理RegisterApplication消息的:可以看到,用应用的描述和Driver的引用创建了一个Application,然后就开始注册Application了。RegisterApplication很简单,就是在Master的内存中添加各种信息。重点是在waitingApps结构中加入ApplicationInfo,然后schedule()方法会遍历这个链表,为Application分配资源,并进行调度。然后将Application信息写入zk,向Driver发送RegisteredApplication消息表示应用已经注册。然后启动schedule(),这个方法上次讲过,会遍历两个链表,一个是遍历waitingDrivers启动Driver,一个是遍历waitingApps启动Application。waitingDrivers列表在客户端请求启动Driver时已经处理完毕。本次重点关注这个方法:startExecutorsOnWorkers()3.Master调度资源的步骤如下:遍历waitingApps中的所有app;如果app需要的core数小于一个Executor所能提供的core数,则不会为app分配新的Executor;过滤掉仍有cpu和内存可调度的worker,按照core大小降序排列,作为usableWorkers;计算在所有可用的WorkersCPU上分配多少;然后遍历可用的Worker,分配资源并进行调度,启动Executor。源码从Master类的schedule()方法的最后一行startExecutorsOnWorkers()开始:该方法的主要作用是计算worker执行器的数量和分配的资源,并启动执行器。/***Scheduleandlaunchexecutorsonworkers*/privateefstartExecutorsOnWorkers():Unit={//RightnowthisisaverysimpleFIFOscheduler.Wekeepingtofitinthefirstapp//inthequeue,thenthesecondapp,etc.for(app<-waitingApps){valcoresPerExecutor=app.desc.coresPerExecutor.get/thePerftisecless(1),thecoresleftwillnotbeallocatedif(app.coresLeft>=coresPerExecutor){//1。剩余内存大于单个执行器所需内存//2.剩余核数大于单个执行器需要的核数//3.按照核数从大到小排序).reversevalappMayHang=waitingApps.length==1&&waitingApps。head.executors.isEmpty&&usableWorkers.isEmptyif(appMayHang){logWarning(s"App${app.id}requiresmoreresourcethananyofWorkerscouldhave.")}//计算每个Worker上可用的coresvalassignedCores=scheduleExecutorsOnWorkers(app,usableWorkers,spreadOutApps)//N既然我们已经决定为每个工作人员分配多少个核心,让我们为(pos<-0untilusableWorkers.lengthifassignedCores(pos)>0){allocateWorkerResourceToExecutors(app,assignedCores(pos),app.desc.coresPerExecutor,usableWorkers(pos)分配它们))}}}}(1)遍历waitingApps,如果app需要的cpu??核数大于每个executor的核数,继续分配(2)过滤可用worker,条件1:剩余内存worker大于单个executor所需的内存;条件2:worker剩余的cpu核数大于单个executor需要的核数;然后按照可用cpu核数从大到小排序。(3)下面两个方法是关键方法scheduleExecutorsOnWorkers(),用于计算每个Worker上可用的cpu核数;allocateWorkerResourceToExecutors()用于在Worker上实际分配Executors。4.scheduleExecutorsOnWorkers计算每个Worker可用的核心数。这个方法很长。先看方法注释,大致翻译一下:当显示并设置了executor分配的cpu核数(spark.executor.cores)时,如果worker有足够的核数和内存,那么每个worker都可以执行多个执行者;反之,不设置时,每个worker上只能启动一个executor;而这个executor会使用worker可以提供的尽可能多的core出来;appA和appB都有一个运行在worker1上的执行器。但是appA仍然需要一些cpu内核。当appB执行完,释放了worker1上的cores后,appA会在下一次调度时启动一个新的executor来获取worker1上所有可用的cores,所以appA在worker1上启动了多个executors。设置coresPerExecutor(spark.executor.cores)很重要,请考虑以下示例:集群有4个worker,每个worker有16个核心;用户请求3个执行器(spark.cores.max=48,spark.executor.cores=16)。如果不设置这个参数,那么每次分配1个cpucore,每个worker依次分配一个cpucore,最后4个executor给每个executor分配12个core,4个worker也分配48个core,但是在end,每个executor只有12个cores<16cores,所以最后没有executor启动。如果看我的翻译还是有难度,我会进一步简化:如果不设置spark.executor.cores,那么每个Worker只能启动一个Executor,这个Executor会占用所有Worker所能提供的cpu核数;如果显示设置了,则每个Worker可以启动多个Executor;源码在下面,每一句都一一注释,中间有一个判断Executor是否可以分配给这个Worker的方法。重点在中间方法后面的部分,遍历每个Worker分配cpu,如果不是SpendOut模式,就会分配到一个Worker上,直到分配完Worker资源。privatedefscheduleExecutorsOnWorkers(app:ApplicationInfo,usableWorkers:Array[WorkerInfo],spreadOutApps:Boolean):Array[Int]={//每个执行器的核心数valcoresPerExecutor=app.desc.coresPerExecutor//每个执行器的最小核心数为1valminCoresPerExecutor=coresPerExecutor.getOrElse(1)//给每个Worker分配一个Executor?这个参数可以控制这个行为valoneExecutorPerWorker=coresPerExecutor.isEmpty//每个Executor的内存valmemoryPerExecutor=app.desc.memoryPerExecutorMBvalresourceReqsPerExecutor=app.desc.resourceReqsPerExecutor//可用的Workers总数valnumUsable=usableWorkers.length//每个的coresWorkerNumbervalassignedCores=newArray[Int](numUsable)//Numberofcorestogivetoeachworker//每个Worker的新Executors数量valassignedExecutors=newArray[Int](numUsable)//NumberofnewExecutorsoneeachworker//app需要的core数和所有的coresworker可以提供Total,取最小值varcoresToAssign=math.min(app.coresLeft,usableWorkers.map(_.coresFree).sum)//判断指定的worker是否可以为这个app启动一个executor/**返回是否指定workercanlaunchexecutorforthisapp.*/defcanLaunchExecutorForApp(pos:Int):Boolean={//如果可以如果提供的core数大于executor需要的最小core数,则继续分配每个worker分配的cores大于每个executor的最小core数valenoughCores=usableWorkers(pos).coresFree-assignedCores(pos)>=minCoresPerExecutor//当前worker新分配的executor数valenoughCores=usableWorkers(pos).valenoughCores=usableWorkers(pos)。Canalwaysstartanewexecutor//如果这个worker上已经有一个executor,给这个executor更多的cores//如果我们允许每个worker有多个executor,那么我们总是可以启动新的executor。//否则,如果已经有一个executoronthisworker,justgiveitmorecores.//如果一个worker可以为每个worker启动多个executor,那么我们总是可以启动新的executor。executor或者worker还没有分配executorvallaunchingNewExecutor=!oneExecutorPerWorker||assignedExecutorNum==0if(launchingNewExecutor){//总分配内存vaassignedMemory=assignedExecutorNum*memoryPerExecutor//是否有足够的内存:当前worker的剩余内存减去已经存在的分配的内存大于每个执行器所需的内存unt*assignedExecutorNum}.toMapvalresourcesFree=usableWorkers(pos).resourcesAmountFree.map{case(rName,free)=>rName->(free-assignedResources.getOrElse(rName,0))}valenoughResources=ResourceUtils.resourcesMeetRequirements(resourcesFree,resourceReqsPerExecutor)//所有分配的核心数+应用需要的核心数小于应用的核心限制valunderLimit=assignedExecutors.sum+app.executors.sizevarkeepScheduling=truewhile(keepScheduling&&canLaunchExecutorForApp(pos)){coresToAssign-=minCoresPerExecutorassignedCores(pos)+=minCoresPerExecutor//如果我们在每个worker上启动一个executor,那么每次迭代都会为每个executor添加一个core//否则,每次迭代都会为一个新的executor分配cores//ifwearelaunchingoneexecutorperworker,theneveryiterationassigns1core//totheexecutor。否则,everyiterationassignscorestoanewexecutor.if(oneExecutorPerWorker){assignedExecutors(pos)=1}else{assignedExecutors(pos)+=1}//如果不使用Spreadingout方式,我们继续将执行程序安排在工作程序上,直到它的所有资源都被使用//否则,跳转到下一个工作程序//Spreadingoutanapplicationmeansspreadingoutitsexecutorsacrossas//manyworkersaspossible。false}}}freeWorkers=freeWorkers.filter(canLaunchExecutorForApp)}assignedCores}然后在Worker上实际启动Executor:在launchExecutor方法中:privatedelaunchExecutor(worker:WorkerInfo,exec:ExecutorDesc):Unit={logInfo("Launchingexecutor"+exec.fullId+"onworker"+worker.id)worker.addExecutor(exec)worker.endpoint.send(LaunchExecutor(masterUrl,exec.application.id,exec.id,exec.application.desc,exec.cores,exec.memory,exec.resources))exec.application.driver.send(ExecutorAdded(exec.id,worker.id,worker.hostPort,exec.cores,exec.memory))}向Worker发送LaunchExecutor消息然后ExecutorAdded消息发送给executor对应的Driver5.总结这次讲的是Master处理应用程序的注册,cpu核数,以及在Worker上启动executor。