大家好,我是花仔。接触kubernetes已经4年多了,但是大部分还是可以用的,对它的原理和源码不是很熟悉。对于平时执行的命令,其背后的流程和逻辑不是很清楚。所以最近打算看一下k8s各个模块的源码。一是加深对k8s各个模块的理解和理解;二是方便以后遇到问题分析问题根源。这个很难(硬。那么今天就简单说说pod创建的源码。文中有错误请指正,轻喷。首先,github上有k8s的源码。这次我看的是1.21.3。此外,很多翻译都是通过直译或翻译软件进行翻译的。敬请谅解。正文1.k8s源码中,pod的增删改查是在源码包/pkg/kubelet/kubelet.go中的syncLoop()中进行的。如下所示://syncLoopisthemainloopforprocessingchanges.Itwatchesforchangesfrom//三个通道(文件、api服务器和http)并创建它们的联合。对于//看到的任何新更改,将运行同步反对期望状态和运行状态。如果//没有看到配置的更改,将同步最后已知的期望//状态每个同步频率秒。永不返回。//同步循环是处理更改的主循环。它从三个渠道(文件、apiserver、http)感知pod的变化并聚合它们。发生任何变化,并且运行状态同步到期望的状态。否则,最后已知的期望状态在每个同步周期同步。func(kl*Kubelet)syncLoop(updates<-chankubetypes.PodUpdate,handlerSyncHandler){klog.InfoS("Startingkubeletmainsyncloop")在syncLoop()中,kl.syncLoopIteration()用于对pod进行特定的操作。kl.syncLoopMonitor.Store(kl.clock.Now())if!kl.syncLoopIteration(updates,handler,syncTicker.C,housekeepingTicker.C,plegCh){break}2、syncLoopIteration中有几个重要的参数,如下所示://Arguments://1.configCh:achanneltoreadconfigeventsfrom//2.handler:theSyncHandlertodispatchpodsto//3.syncCh:achanneltoreadperiodicsynceventsfrom//4.housekeepingCh:achanneltoreadhousekeepingeventsfrom//5.plegCh:achanneltoreadPLEGupdatesfrom//*configCh:dispatchthepodsfortheconfigchangetotheappropriate//handlercallbackfortheevent/*plegCh:updatetheruntimecache;syncpod//*syncCh:syncallpodswaitingforsync//*housekeepingCh:triggercleanupofpods//*healthmanager:syncpodsthathavefailedorinwhichoneormore//containershavefailedhealthchecksfunc(kl*Kubelet)syncLoopIteration(configCh<-chankubetypes.PodUpdate,handlerSyncHandler.syncCh<-,housekeepingCh<-chantime.Time,plegCh<-chan*pleg.PodLifecycleEvent)bool{select{caseu,open:=<-configCh://Updatefromaconfigsource;调度它到therighthandler//callback.if!open{klog.ErrorS(nil,"Updatechannelisclosed,exitingthesyncloop")returnfalse}SyncHandler是一个接口,它包含了几种用于pod上常见操作的方法。该接口由kubelet实现。如下://SyncHandlerisaninterfaceimplementedbyKubelet,fortestability#podcreate,update,delete...typeSyncHandlerinterface{HandlePodAdditions(pods[]*v1.Pod)HandlePodUpdates(pods[]*v1.Pod)HandlePodRemoves(pods[]*v1.Pod)HandlePodReconcile(pods[]*v1.Pod)HandlePodSyncs(pods[]*v1.Pod)HandlePodCleanups()error}3、可以对pod进行的操作如下,每个操作都有对应的方法。比如ADD,就会去执行HandlePodAdditions方法//TheseconstantsidentifythePodOperationsthatcanbemadeonapodconfiguration.const(//SETisthecurrentpodconfiguration.SETPodOperation=iota//ADDsignifiespodsthatarenewtothissource.ADD//DELETEsignifiespodsthataregracefullydeletedfromthissource.DELETE//REMOVEsignifiespodsthathavebeenremovedfromthissource.REMOVE//UPDATEsignifiespodshavebeenupdatedinthissource.UPDATE//RECONCILEsignifiespodsthathaveunexpectedstatusinthissource,//kubeletshouldreconcilestatuswiththissource.RECONCILE)switchu.Op{casekubetypes.ADD:klog.V(2).InfoS("SyncLoopADD","source",u.Source,"pods",format.Pods(u.Pods))//重启后,kubelet会通过//ADDasiftheyarenewpods.Thesepodthengothroughthe//admissionprocessand*may*berejected.Thiscanberesolved//oncewehavecheckpointing.handler.HandlePodAdditions(u.Pods)4.HandlePodAdditions如何创建一个pod?主要有以下操作:1.根据pod的创建时间排序sort.Sort(sliceutils.PodsByCreationTime(pods))2.将pod添加到podmanager。因为kubelet将依赖这个podmanager作为所需状态的凭证。如果在podmanager中查询不到一个,说明它已经被apiserver删除了,不需要其他了.podManager.AddPod(pod)3.判断pod是不是静止状态podmirrorPod,_:=kl.podManager.GetMirrorPodByPod(pod)4.通过dispatchWork分发任务kl.dispatchWork(pod,kubetypes.SyncPodCreate,mirrorPod,start)5.添加pod到probemanager,即健康检查。包括startupprobe、livenessprobe、readinessprobe。kl.probeManager.AddPod(pod)dispatchWork是做什么的?如下://Runthesyncinanasyncworker。在异步工作者中执行同步kl.podWorkers.UpdatePod(&UpdatePodOptions{Pod:pod,MirrorPod:mirrorPod,UpdateType:syncType,OnCompleteFunc:func(errerror){iferr!=nil{metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))}},})那么UpdatePod()做了什么?//创建一个新的podworker要么意味着这是一个新的pod,要么意味着kubelet刚刚重启。在任何一种情况下,kubelet都愿意相信//第一个podworker同步的pod状态。见相应的//commentinsyncPod.//创建一个新的podworker,意思是这是一个新的podgofu??nc(){Popruntime.Handlemand}Crageashod(){deferruntime.Handlemand}Crageashod()managePodLoop()来执行同步。forupdate:=rangepodUpdates{err:=func()error{podUID:=update.Pod.UID//这是一个阻塞调用,如果缓存//hasanentryforthepodthatisnewerthanminRuntimeCache//Time.Thisensurestheworkerdoesn'tstartsyncinguntil//afterthecacheisatleastnewerthanthefinishedtimeof//theprevioussync.podCache=status.,erraGetNewerThan(podUID,lastSyncTime)iferr!=nil{//Thisisthelegacyeventthrownbymanagepodloop//allothereventsarenowdispatchedfromsyncPodFnp.recorder.Eventf(update.Pod,v1.EventTypeWarning,events.FailedSync,"这里的错误确定状态:%/recorder",errr})Syncerr=p.syncPodFn(syncPodOptions{mirrorPod:update.MirrorPod,pod:update.Pod,podStatus:status,killPodOptions:update.KillPodOptions,updateType:update.UpdateType,})lastSyncTime=time.Now()returnerr}()5.最后,在pkg/kubelet/kuberuntime/kuberuntime_manager.go中调用SyncPod()创建pod//SyncPodssyncsotherrunningpodintothedesiredpodbyexecutingfollowingsteps://执行以下步骤将正在运行的pod同步到想要的状态//1.Computesandboxandcontainerchanges.//计算sanbox和容器的变化//2.Killpodsandboxifnecessary.//必要时删除podsandbox//3.Killaanycontainersthatshouldnotberunning.//删除不需要运行的容器//4.Createsandboxifnecessary.//Createifneedssandbox//5.Createephemeralcontainers.//创建一个临时容器//6.Createinitcontainers.//创建一个初始化容器//7.Createdenormalcontainers.//创建一个普通容器func(m*kubeGenericRuntimeManager)SyncPod()//第一步:Computesandboxandcontainerchanges.podContainerChanges:=m.computePodActions(pod,podStatus)klog.V(3).InfoS("computePodActionsgotforpod","podActions",podContainerChanges,"pod",klog.KObj(pod))ifpodContainerChanges.CreateSandbox{ref,错误:=ref{m.recorder.Eventf(ref,v1.EventTypeNormal,events.SandboxChanged,"Podsandboxchanged,itwillbekilledandre-created.")}else{klog.V(4).InfoS("SyncPodreceivednewpod,willcreateasandboxforit","pod",klog.KObj(pod))}}//Step2:Killthepodifthesandboxhaschanged.ifpodContainerChanges.KillPod{//Step3:killanyrunningcontainersinthispodwhicharenotkeep.forcontainerID,containerInfo:=rangepodContainerChanges.ContainersToKill{klog.V(3).InfoS(“Killingunwantedcontainerforpod”,“containerName”,containerInfo.name,“containerID”,containerID,“pod”,klog.KObj(pod))killContainerResult:=kubecontainer.NewSyncResult(kubecontainer.KillContainer,containerInfo.name)result.AddSyncResult(killContainerResult)iferr:=m.killContainer(pod,containerID,containerInfo.name,containerInfo.message,containerInfo.reason,nil);err!=nil{killContainerResult.Fail(kubecontainer.ErrKillContainer,err.Error())klog.ErrorS(err,"killContainerforpodfailed","containerName",containerInfo.name,"containerID",containerID,"pod",klog.KObj(pod))return}//Step4:Createasandboxforthepodifnecessary.podSandboxID:=podContainerChanges.SandboxIDifpodContainerChanges.CreateSandbox{varmsgstringvarerrerrorklog.V(4).InfoS("CreatingPodSandboxforpod","pod",klog.KObj(pod))createSandboxResult:=kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox,format.Pod(pod))result.AddSyncResult(createSandboxResult)podSandboxID,msg,err=m.createPodSandbox(pod,podContainerChanges.Attempt)//Step5:startephemeralcontainers//Thesearestarted"prior"toinitcontainerstoallowrunningephemeralcontainersevenwhenthere//areerrorsstartinganinitcontainer.Inpracticeinitcontainerswillstartfirstsinceephemeral//containerscannotbespecifiedonpodcreation.ifutilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers){for_,idx:=rangepodContainerChanges.EphemeralContainersToStart{start("ephemeralcontainer",ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))}}//Step6:starttheinitcontainer.ifcontainer:=podContainerChanges.NextInitContainerToStart;container!=nil{//Startthenextinitcontainer.iferr:=start("initcontainer",containerStartSpec(container));err!=nil{return}//成功启动容器;cle??artheentryinthefailureklog.V(4).InfoS("Completedinitcontainerforpod","containerName",container.Name,"pod",klog.KObj(pod))}//Step7:startcontainersinpodContainerChanges.ContainersToStart.for_,idx:=rangepodContainerChanges.ContainersToStart{start("container",containerStartSpec(&pod.Spec.Containers[idx]))}6.此外,podworker还需要做以下工作:#创建pod数据目录,volume,获取imagepullsecrets。.newPodWorkers(klet.syncPod--->pkg/kubelet/kubelet.go)//通过syncPodkubetypes.SyncPodKillkubetypes.SyncPodCreatepodStatus.IPs=append(podStatus.IPs,ipInfo.IP)runnable.Admitkubetypes.IsStaticPod(pod)kl.makePodDataDirs(pod)kl.volumeManager.WaitForAttachAndMount(pod)kl.getPullSecretsForPod(pod)kl.containerRuntime.SyncPod(pkg/kubelet/container/runtime.go)
