资源锁是通过对一个资源的CRUD操作完成的,再配合分布式锁的一些机制。在分布式环境中Leader节点的选举中,今天我们就来推测一下在K8s中是如何基于configMap来实现的。面向最终状态的锁基础知识在分布式系统中,通常会有各种各样的锁。我们先来看看主流锁具的共性以及它们是如何设计的。分布式系统中的锁分布式系统中锁的实现方式有很多种:基于CP模型,基于AP模型,但是这些锁机制都有一些通用的设计原则,我们先来看这部分。1.锁证锁证主要用来证明谁持有锁。在不同的系统中实现是不同的。比如在zookeeper中是一个临时的顺序节点,在redission中是uuid+threadID组成,在k8s中是LeaderElectionRecord,通过这个凭证来标识当前锁定了哪个client。2.锁超时当一个领导节点持有锁时,其余节点需要尝试竞争锁。在CP系统中,服务器通常维护它。即如果发现对应的节点没有心跳,则将该节点踢出,通过watch的机制回调,而在AP系统中,需要客户端自行维护,比如redission中的时间戳。3、时钟在分布式系统中,我们通常不能保证各个节点的物理时钟完全一致。通常,有一个逻辑时钟的概念。在raft、zab等很多系统中,其实是一个增量的全局计数器,但是在redission中是通过物理时钟的,也就是要保证大家的物理时钟尽量同步,锁超时不能超过时间。网络分区问题无论是CP还是AP,在分布式系统中,我们通常需要保证P的可用性,也就是分区。如果网络分区发生在持有锁的Leader节点上,则需要一个保护机制,即Leader节点需要主动退出。在zookeeper中,因为leader节点需要通过session来维护心跳,如果对应的leader节点分区了,session无法执行心跳就会退出,所以我们需要通知我们的主进程进行退出清理。资源锁的实现机制资源锁其实可以借助上面提到的锁思想,通过操作一个资源(顺序一致性)来实现分布式锁。第一个核心流程如下:通过资源对象存储锁凭证信息,识别当前Leader节点的信息放入对应的证书中,尝试竞争锁,尝试获取锁。锁超时K8s的锁超时机制比较有意思,就是不关心你的逻辑时钟,而是基于本地时钟的,也就是每个节点都会存储leader节点被观察到的时间,以及然后根据本地锁超时时间来检测是否重新发起leader竞争。核心源码分析由于篇幅原因,这里只介绍基于configMap的resourceLock,其他类似。在我的理解中,LeaderElectionRecord这个number结构的设计才是真正的锁(就好像我们可以买任何一把锁,锁上生活中的各种门一样)。通过这个锁,屏蔽了各种底层锁来实现系统的实现细节,但是注意这个锁严格来说并不是分布式互斥量。数据结构在锁的实现中,数据主要分为三类:身份凭证、时间戳和全局计数器。那我们就依次看看对应的设计思路。typeLeaderElectionRecordstruct{HolderIdentitystring`json:"holderIdentity"`LeaseDurationSecondsint`json:"leaseDurationSeconds"`AcquireTimemetav1.Time`json:"acquireTime"`RenewTimemetav1.Time`json:"renewTime"`LeaseDurationSecondsint`json:"leaderTransitions"`}身份凭证:HolderIdentity身份证书主要用于标识一个节点信息。在一些分布式协调系统中,通常是系统自身的机制,比如zookeeper中的session。这里的资源锁场景,主要用于后续流程验证当前节点是否获取到了锁。Timestamp:LeaseDurationSeconds,AcquireTime,RenewTime由于前面提到的时间同步问题,这里与时间相关的主要用于leader节点触发节点变化(也使用Lease类型),非leader节点根据当前记录来检测领导节点是否存活。LeaderTransitions计数器主要通过计数来记录leader节点切换的次数。ConfigMapLock所谓的资源锁,其实就是通过创建一个ConfigMap实例来保存我们的锁信息,并通过维护这个实例信息,来实现锁的竞争和释放。1、创建锁利用etcd的幂等操作,可以保证同一时刻只有一个leader节点成功创建锁,通过Annotations提交上述LeaderElectionRecord来提交锁。func(cml*ConfigMapLock)Create(lerLeaderElectionRecord)error{cml.cm,err=cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(&v1.ConfigMap{ObjectMeta:metav1.ObjectMeta{Name:cml.ConfigMapMeta.Name,Namespace:cml.ConfigMapMeta.Namespace,Annotations:map[string]string{LeaderElectionRecordAnnotationKey:string(recordBytes),},},})returnerr}2.获取锁func(cml*ConfigMapLock)Get()(*LeaderElectionRecord,[]byte,error){cml.cm,err=cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name,metav1.GetOptions{})recordBytes,found:=cml.cm.Annotations[LeaderElectionRecordAnnotationKey]iffound{iferr:=json.Unmarshal([]byte(recordBytes),&record);err!=nil{returnnil,nil,err}}返回&记录,[]byte(recordBytes),nil}3.更新锁func(cml*ConfigMapLock)Update(lerLeaderElectionRecord)error{cml.cm.Annotations[LeaderElectionRecordAnnotationKey]=string(recordBytes)cml.cm,err=cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(cml.cm)returnerr}LeaderElvectorLeaderElector的核心流程分为三个部分:竞争锁、超时检测、心跳维护。首先所有节点都会竞争资源锁,但最终只有一个节点成为Leader节点,然后核心进程会根据角色分为两个master。流程,来看看它的实现1.核心流程如果节点没有获取成功,会一直尝试,直到取消或者选举成功,leader节点执行回调成为leader节点(补充基于领导者的动物园管理员机制的实现)func(le*LeaderElector)Run(ctxcontext.Context){deferfunc(){runtime.HandleCrash()le.config.Callbacks.OnStoppedLeading()}()if!le.acquire(ctx){//Featuredlockreturn//ctxsignalleddone}//如果锁选举成功,leader节点会执行剩下的流程,非leader节点会继续尝试acquirex,cancel:=context.WithCancel(ctx)defercancel()gole.config.Callbacks.OnStartedLeading(ctx)le.renew(ctx)}2.RenewingthelockIfyouareelectedastheleadernode,youneedtorenewthelock,whichistoperiodicallyupdatethelockrecordinformation通过调用上面提到的更新锁操作,即LeaderElectionRecord,从而实现续约的目标。func(le*LeaderElector)renew(ctxcontext.Context){ctx,cancel:=context.WithCancel(ctx)defercancel()wait.Until(func(){timeoutCtx,timeoutCancel:=context.WithTimeout(ctx,le.config.RenewDeadline)defertimeoutCancel()err:=wait.PollImmediateUntil(le.config.RetryPeriod,func()(bool,error){done:=make(chanbool,1)gofunc(){deferclose(done)//锁更新完成<-le.tryAcquireOrRenew()}()选择{case<-timeoutCtx.Done():returnfalse,fmt.Errorf("failedtotryAcquireOrRenew%s",timeoutCtx.Err())caseresult:=<-done:returnresult,nil}},timeoutCtx.Done())cancel()},le.config.RetryPeriod,ctx.Done())//ifweholdthelease,giveitupifle.config.ReleaseOnCancel{//释放锁le.release()}}3.锁释放锁的释放比较好玩,就是更新对应的资源,去掉注解中的信息,这样在获取到锁的时候会尝试竞选,因为检测到当前资源没有被覆盖任何凭证信息。func(le*LeaderElector)release()bool{if!le.IsLeader(){returntrue}leaderElectionRecord:=rl.LeaderElectionRecord{LeaderTransitions:le.observedRecord.LeaderTransitions,}iferr:=le.config.Lock.Update(leaderElectionRecord);err!=nil{klog.Errorf("Failedtoreleaselock:%v",err)returnfalse}le.observedRecord=leaderElectionRecordle.observedTime=le.clock.Now()returntrue}4.锁的竞争分为四个部分两部分:1)获取锁2)创建锁3)检测锁4)更新锁,我们依次看一下对应的实现。要获取锁,它会首先尝试获取相应的锁。在获取锁的时候,会检查对应的注解是否存在。如果不存在,则oldLeaderElectionRecord将为空,即当前资源锁不被任何人持有。oldLeaderElectionRecord,oldLeaderElectionRawRecord,err:=le.config.Lock.Get()创建一个锁。如果检测到对应的锁不存在,则直接创建锁。如果创建成功,则说明当前节点获得了锁,成为了leader,执行leader的回调逻辑。iferr!=nil{if!errors.IsNotFound(err){klog.Errorf("errorretrievingresourcelock%v:%v",le.config.Lock.Describe(),err)returnfalse}//创建锁iferr=le.config.Lock.Create(leaderElectionRecord);err!=nil{klog.Errorf("errorinitiallycreatingleaderelectionrecord:%v",err)returnfalse}//记录当前选举记录,时钟le.observedRecord=leaderElectionRecordle.observedTime=le。clock.Now()returntrue}查看锁没有使用K8s中的逻辑时钟,而是使用本地时间。通过每次比较锁证书是否更新,更新本地的observedTime。如果leader在LeaseDuration信息内没有更新对应的锁证书,则当前节点会尝试成为leader。同时这里也保证了最终一致性锁,因为后面的renew其实也是按照这个逻辑来的。如果当前节点最初持有锁但被其他节点抢占,则当前节点会主动放弃锁。if!bytes.Equal(le.observedRawRecord,oldLeaderElectionRawRecord){le.observedRecord=*oldLeaderElectionRecordle.observedRawRecord=oldLeaderElectionRawRecordle.observedTime=le.clock.Now()//这里更新本地时钟}iflen(oldLeaderElectionRecord.HolderIdentity)>0&&le.observedTime.Add(le.config.LeaseDuration).After(now.Time)&&!le.IsLeader(){//如果当前Leader任期没有超时,则当前选举锁失败klog.V(4)。infof("lockishheldby%vandhasnotyetexpired",oldLeaderElectionRecord.HolderIdentity)returnfalse}更新锁的核心逻辑其实就是Lock.Update。这个设计很有趣。与强一致性锁不同,在K8s中我们可以让多个节点同时走。到此为止,但是因为更新etcd是一个原子操作,最终只会有一个节点更新成功,那么如何保证最终锁的语义,其实需要配合上面的检测锁,所以即面向最终状态的最终锁定机制。ifle.IsLeader(){leaderElectionRecord.AcquireTime=oldLeaderElectionRecord.AcquireTimeleaderElectionRecord.LeaderTransitions=oldLeaderElectionRecord.LeaderTransitions}else{leaderElectionRecord.LeaderTransitions=oldLeaderElectionRecord.LeaderTransitions+1}//updatethelockitselfiferr=le.UpdateRecorder.Upleader!nil{klog.Errorf("Failedtoupdatelock:%v",err)returnfalse}le.observedRecord=leaderElectionRecordle.observedTime=le.clock.Now()returntrue这个问题是因为最近在做系统设计的时候想到的一个问题。PaaS系统中通常有N个以上的Operator,那么如何解决一些冲突的场景呢?比如扩缩容、发布、容灾的controller需要在同一个app下操作pod,应该怎么处理?调度呢?其实我理解这个过程不可能完美的覆盖各种异常冲突,但是我们可以玩另外一个有意思的东西,比如我们可以加一个保护状态,因为对于生产稳定性来说是压倒性的。即相应的控制器关注当前状态是否处于稳定状态。如果处于不稳定状态,则应将自身冻结,等待当前应用处于未受保护状态后再运行,保证SLA不影响各种好玩的操作。
