当前位置: 首页 > 科技观察

面试官:哥们,你对Go语言的读写锁了解多少?

时间:2023-03-20 17:02:53 科技观察

读写锁介绍互斥锁我们都知道会锁住代码的临界区。当一个goroutine获得互斥锁后,任何goroutine都无法获得互斥锁。它只能等待goroutine释放互斥锁,无论读写操作都会加一把大锁,在读多写少的场景下效率会很低,所以大佬们设计了读写锁。顾名思义,读写锁就是把锁分为两部分:读锁和写锁,读锁允许多个线程同时获取,因为读操作本身是线程安全的,而写操作lock是互斥锁,不允许多个线程同时获得写锁,写操作和读操作也是互斥的,总结一下:读和读不互斥,读和write是互斥的,write和write是互斥的。为什么会有读锁?有的朋友可能会疑惑为什么会有读锁。读取操作不会修改数据。多个线程同时读取同一个资源是安全的。为什么要加读锁?举个例子,Golang中变量的赋值并不是并发安全的。比如对一个int变量进行count++操作,并发执行时会出现意想不到的结果,因为count++操作分为三部分:读取count值,count值加1,然后将结果赋值给数数。这不是原子操作。当多个线程在未加锁的情况下同时对变量执行count++操作时,会出现数据不一致的情况。通过加写锁可以解决这个问题,但是如果我们读的时候不加读锁呢?写个例子看看,只加写锁,不加读锁:packagemainimport"sync"constmaxValue=3typeteststruct{rwsync.RWMutexindexint}func(t*test)Get()int{index}func(t*test)Set(){t.rw.Lock()t.index++如果t.index>=maxValue{t.index=0}t.rw.Unlock()}funcmain(){t:=test{}sw:=sync.WaitGroup{}fori:=0;我<100000;i++{sw.Add(2)gofunc(){t.Set()sw.Done()}()gofunc(){val:=t.Get()ifval>=maxValue{print("获取值error|value=",val,"\n")}sw.Done()}()}sw.Wait()}运行结果:getvalueerror|value=3取值错误|value=3取值错误|value=3取值错误|value=3取值错误|,因为我们没有加读锁,如果同时允许读和写,读到的数据可能在中间状态,所以我们可以得出结论,读锁是必要的。读锁可以防止在写中间读取值。读写锁的跳队策略在多个读操作同时进行时也是线程安全的。一个线程获得读锁后,另一个线程也可以获得读锁,因为读锁是共享的。如果一直有线程加读锁,后面又有线程加写锁,就会获取不到锁,造成阻塞。这时候就需要一些策略来保证锁的公平性,避免锁饥饿,那么Go语言中的读写锁是采用什么跳队策略来避免锁饥饿呢?这里我们用一个例子来说明Go语言的跳队策略:假设有五个goroutine,分别是G1、G2、G3、G4、G5。现在G1和G2已经成功获取到读锁,还没有释放读锁。G3需要执行Write操作,如果获取写锁失败,会阻塞等待。当前阻塞写锁的读锁goroutine个数为2:当G4后面进来想获取读锁时,会判断当前有写锁的goroutine是否阻塞等待,以避免写锁饥饿,G4也会进入阻塞等待,然后G5会进来获取写锁,因为G3正在占用mutex,所以G5会进入自旋/睡眠阻塞等待;现在G1和G2释放读锁。Lock,当释放读锁时,判断如果阻塞写锁goroutine的读锁goroutine数为0且有写锁等待,则唤醒阻塞等待的写锁G3,G3将awakened:G3在处理完写操作后会释放写锁,这一步会唤醒同时等待读锁/写锁的goroutine。至于G4和G5,谁先拿到锁就看谁快,就像抢老婆一样,先得先得。读写锁的实现接下来我们深入分析一下源码。首先,让我们看一下RWMutex结构有什么:typeRWMutexstruct{wMutex//如果有待处理的写入者writerSemuint32//写入者等待完成读者的信号量readerSemuint32//读者等待完成的信号量writersreaderCountint32//挂起的读者数readerWaitint32//离开的读者数}w:多路复用mutex提供的能力;writerSem:写操作goroutine阻塞等待信号量,当阻塞写操作的读操作goroutine释放读锁时,通过信号量通知被阻塞的写操作goroutine;readerSem:读操作goroutine阻塞等待信号量,当写操作goroutine释放写锁时,通过信号量goroutine通知被阻塞的goroutine进行读操作;redaerCount:当前正在执行的读操作goroutines个数;readerWait:写操作阻塞时等待的读操作goroutines的数量。readlock读锁对应的方法如下:func(rw*RWMutex)RLock(){//原子操作readerCount只要其值不为负就说明读锁获取成功如果atomic.AddInt32(&rw.readerCount,1)<0{//有一个等待的写锁,为了避免阻塞等待饿死后进来的读锁runtime_SemacquireMutex(&rw.readerSem,false,0)}}简化了race的方法检测,读锁方法只有两行代码,逻辑如下:使用原子操作更新readerCount,并将readercount值加1。只要原子操作后的值不为负,就说明加读锁成功。如果值为负数,说明已经有写锁成功获取互斥锁,写锁goroutine处于waiting或者Running状态,所以为了避免阻塞等待饿死后进来的读锁,调用runtime_SemacquireMutex阻止并等待。非阻塞读锁Go语言在1.18引入了非阻塞读锁的方法:func(rw*RWMutex)TryRLock()bool{for{//读取readerCount值可以知道当前是否有写锁等待阻塞,如果值为负,则随后的读取锁将被阻止c:=atomic.LoadInt32(&rw.readerCount)ifc<0{ifrace.Enabled{race.Enable()}returnfalse}//尝试获取读取锁,for循环不断尝试ifatomic.CompareAndSwapInt32(&rw.readerCount,c,c+1){ifrace.Enabled{race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))}returntrue}}}因为读锁是共享的,在没有写锁阻塞等待的情况下,多个线程可以同时获取,所以原子操作可能会失败。这里使用for循环来增加尝试次数,非常巧妙。释放读锁释放读锁的代码主要分为两部分,第一部分:func(rw*RWMutex)RUnlock(){//将readerCount的值减1,如果该值等于0,直接退出即可;否则进入rUnlockSlow处理ifr:=atomic.AddInt32(&rw.readerCount,-1);r<0{//outlinedslow-path允许内联fast-pathrw.rUnlockSlow(r)}}我们都知道readerCount的值代表当前正在执行的读操作goroutines的数量。自减后的值大于等于0表示当前没有异常场景或者写锁阻塞等待,直接退出即可,否则需要处理这两个逻辑:rUnlockSlow逻辑如下:func(rw*RWMutex)rUnlockSlow(rint32){//r+1等于0表示释放读锁而不加读锁,异常场景应该抛出异常//r+1==-rwmutexMaxReaders也就是不加读锁,释放读锁//因为写锁加锁成功后,rwmutexMaxReaders会减去readerCout的值ifr+1==0||r+1==-rwmutexMaxReaders{race.Enable()throw("sync:RUnlockofunlockedRWMutex")}//如果有写锁在等待读锁,readerWait的值会被更新,所以值rw.readerWait的值将一步递减。//如果原子操作后readerWait的值等于0,说明当前阻塞写锁的读锁已经释放,需要唤醒等待的写锁ifatomic.AddInt32(&rw.readerWait,-1)==0{//最后一个reader解除阻塞writer.runtime_Semrelease(&rw.writerSem,false,1)}}解读这段代码:r+1等于0,表示当前goroutine释放读锁,没有加a读锁,属于非法操作;r+1==-rwmutexMaxReaders表示写入如果加锁成功,readerCount减去rwmutexMaxReaders就会变成负数。如果之前没有加读锁,那么直接释放读锁会导致这个等式成立。也属于释放读锁而不加读锁的操作。非法经营;readerWait表示当写操作被阻塞时,读操作的goroutines数量。如果有写锁在等待,readerWait的值将被更新。当读锁释放时,readerWait需要递减。如果自减后等于0,说明当前写锁被阻塞,读锁已经释放,需要唤醒等待的写锁(见下面写锁代码回显)。写锁写锁对应的方法如下:constrwmutexMaxReaders=1<<30func(rw*RWMutex)Lock(){//首先解决与其他writers的竞争。//写锁也是互斥锁,复用互斥锁解决与其他写锁竞争的能力//如果已经获取到写锁,其他goroutine在获取写锁时会进入自旋或休眠rw.w.Lock()//设置readerCount为负值,告诉读锁现在有一个写锁等待运行(获取互斥量成功)r:=atomic.AddInt32(&rw.readerCount,-rwmutexMaxReaders)+rwmutexMaxReaders//获取成功mutex并不意味着goroutine成功获取了写锁,默认情况下,我们的最大读操作数为2^30。减去这个最大数后,如果仍然不为0,说明前面还有读锁,我们需要等待读锁释放,在写操作阻塞时更新等待的读操作goroutines。数字;ifr!=0&&atomic.AddInt32(&rw.readerWait,r)!=0{runtime_SemacquireMutex(&rw.writerSem,false,0)}}代码量不是很大,但是理解起来还是有点复杂,我试着用文字来分析一下,主要分为两部分:获取互斥量,写锁也是互斥量,这里我们复用互斥量mutex的加锁能力,当互斥量加锁成功后,其他的写锁当goroutine再次尝试获取锁时,会进入自旋休眠等待;判断写锁是否获取成功,有一个变量rwmutexMaxReaders=1<<30,表示最多支持2^30并发读。mutex加锁成功后,假设有2^30次读操作释放了读锁,将readerCount设置为负数,通过原子操作加上2^30,如果此时r仍然不为0,则还有读正在进行的操作,写锁需要等待,同时通过原子操作更新readerWait字段,即更新写操作阻塞时等待的读操作goroutines的数量;readerWait会在上面的读锁释放时进行判断并递减,当前readerWait为0时会递减。WakeupwritelockNon-blockingaddwritelockGo语言在1.18引入了非阻塞锁的方法:func(rw*RWMutex)TryLock()bool{//首先判断是否获取互斥量成功,如果不成功,直接returnfalseif!rw.w.TryLock(){ifrace.Enabled{race.Enable()}returnfalse}//mutex获取成功,然后判断是否有读锁阻塞写锁,如果不是直接更新readerCount为//负数即可成功获取写锁;if!atomic.CompareAndSwapInt32(&rw.readerCount,0,-rwmutexMaxReaders){rw.w.Unlock()ifrace.Enabled{race.Enable()}returnfalse}returntrue}释放写锁func(rw*RWMutex)Unlock(){//向读者宣布没有活跃的作者。//将readerCount恢复为正数,即释放读锁的mutexr:=atomic.AddInt32(&rw.readerCount,rwmutexMaxReaders)ifr>=rwmutexMaxReaders{race.Enable()throw("sync:UnlockofunlockedRWMutex")}//如果后面有readgoroutines,需要为i唤醒它们:=0;我<整数(r);i++{runtime_Semrelease(&rw.readerSem,false,0)}//释放互斥量,写操作的goroutine和读操作的goroutine同时竞争rw.w.Unlock()}释放写锁逻辑比较简单。释放写锁会唤醒后面的读写操作的goroutines,然后它们就开始竞争了。总结因为上面我们分享了互斥锁的实现,所以看读写锁就容易多了。文章最后总结一下读写锁:读写锁提供四种操作:读加锁,读解锁,写加锁,写解锁;锁定规则为读写共享、写写互斥、读写互斥、写读互斥;读写锁必须存在于读写锁中,它们的目的也是为了避免原子性问题,只有写锁没有读锁,才会导致我们读到中间值;Go语言的读写锁在设计上也避免了写锁饥饿的问题,由readerCount和readerWait字段控制。当写锁goroutine被阻塞时,后续想要获取读锁的goroutine也会被阻塞。当写锁释放后,后续的读操作goroutine和写操作goroutine会被唤醒,剩下的就交给他们自己去竞争;读锁获取锁流程:当锁空闲时,可以立即获取读锁如果当前有写锁阻塞,则休眠想要获取读锁的goroutine释放读锁流程:目前没有异常场景或者写锁阻塞等待发生,直接释放读锁。如果没有加读锁就释放了读锁,会抛出异常;在写锁被读锁阻塞的场景下,readerWait的值会递减,readerWait表示阻塞goroutine的读操作的goroutines个数,当readerWait减为0时,readerWait的goroutines被阻塞的写操作可以被唤醒;写锁获取过程复用了mutex互斥锁的能力,首先尝试获取互斥锁,获取互斥锁失败会进入自旋/睡眠;获取互斥量成功不代表写锁加成功。此时如果还有goroutine占用读锁,则阻塞,否则写锁成功释放。释放写锁会将负的readerCount变为正值,同时释放读锁的mutex,唤醒所有当前阻塞的读锁。释放互斥锁读写锁的代码量并不多,因为它复用了互斥锁的设计,在读写锁的功能上做了更多的工作,比互斥锁更容易理解。你学会了吗?