当前位置: 首页 > 科技观察

面试官:你对Go语言的读写锁了解多少?

时间:2023-03-21 23:10:56 科技观察

读写锁介绍互斥锁我们都知道会锁住代码的临界区。当一个goroutine获得互斥锁后,任何goroutine都无法获得互斥锁。它只能等待goroutine释放互斥锁,无论读写操作都会加一把大锁,在读多写少的场景下效率会很低,所以大佬们设计了读写锁。顾名思义,读写锁就是把锁分为两部分:读锁和写锁,读锁允许多个线程同时获取,因为读操作本身是线程安全的,而写操作lock是互斥锁,不允许多个线程同时获得写锁,写操作和读操作也是互斥的,总结一下:读和读不互斥,读和write是互斥的,write和write是互斥的;有的朋友可能会有疑问,为什么要加读锁,为什么要加读锁,而且读操作不会修改数据,多线程同时读同一个资源是安全的,为什么要加读锁呢?举个例子,Golang中变量的赋值并不是并发安全的。比如对一个int变量进行count++操作,并发执行时会出现意想不到的结果,因为count++操作分为三部分:读取count值,count值加1,然后将结果赋值给数数。这不是原子操作。当多个线程在未加锁的情况下同时对变量执行count++操作时,会出现数据不一致的情况。通过加写锁可以解决这个问题,但是如果我们读的时候不加读锁呢?写个例子看看,只加写锁,不加读锁:packagemainimport"sync"constmaxValue=3typeteststruct{rwsync.RWMutexindexint}func(t*test)Get()int{index}func(t*test)Set(){t.rw.Lock()t.index++如果t.index>=maxValue{t.index=0}t.rw.Unlock()}funcmain(){t:=test{}sw:=sync.WaitGroup{}fori:=0;我<100000;i++{sw.Add(2)gofunc(){t.Set()sw.Done()}()gofunc(){val:=t.Get()如果val>=maxValue{print("取值错误|value=",val,"\n")}sw.Done()}()}sw.Wait()}运行结果:取值错误|value=3取值错误|value=3取值错误|value=3取值错误|value=3取值错误|value=3.....每次操作的结果是不固定的,因为我们没有加读锁。取到的数据可能处于中间状态,所以我们可以断定需要读锁。读锁可以防止读写中间值。读写锁的跳队策略在多个读操作同时进行时也是线程安全的,一个线程获得读锁后,另一个线程也可以获得读锁,因为读锁是共享的。如果一直有线程加读锁,后面又有线程加写锁,就会获取不到锁,造成阻塞。这时候就需要一些策略来保证锁的公平性,避免锁饥饿。那么,Go语言中读写锁采用什么切队列策略来避免饥饿问题呢?这里我们用一个例子来说明Go语言的跳队策略:假设有五个goroutine,分别是G1、G2、G3、G4、G5。现在G1和G2已经成功获取到读锁,还没有释放读锁。G3需要执行Write操作,如果获取写锁失败,会阻塞等待。当前阻塞写锁的读锁goroutine个数为2:当G4后面进来想获取读锁时,会判断当前有写锁的goroutine是否阻塞等待,以避免写锁饥饿,G4也会进入阻塞等待,然后G5会进来获取写锁,因为G3正在占用mutex,所以G5会进入自旋/睡眠阻塞等待;现在G1和G2释放读锁。Lock,当释放读锁时,判断如果阻塞写锁goroutine的读锁goroutine数为0且有写锁等待,则唤醒阻塞等待的写锁G3,G3将awakened:G3在处理完写操作后会释放写锁,这一步会唤醒同时等待读锁/写锁的goroutine。至于G4和G5,谁先拿到锁就看谁快,就像抢老婆一样,先得先得。读写锁的实现接下来我们深入分析一下源码。首先,让我们看一下RWMutex结构有什么:typeRWMutexstruct{wMutex//如果有待处理的写入者writerSemuint32//写入者等待完成读者的信号量readerSemuint32//读者等待完成的信号量writersreaderCountint32//挂起的读者数readerWaitint32//离开的读者数}w:多路复用mutex提供的能力;writerSem:写操作goroutine阻塞等待信号量,当阻塞写操作的读操作goroutine释放读锁时,通过信号量通知被阻塞的写操作goroutine;readerSem:读操作goroutine阻塞等待信号量,当写操作goroutine释放写锁时,通过信号量goroutine通知被阻塞的goroutine进行读操作;redaerCount:当前正在执行的读操作goroutines个数;readerWait:写操作阻塞时等待的读操作goroutines个数;readlock和readlock对应的方法如下:func(rw*RWMutex)RLock(){//原子操作readerCount,只要value不为负,如果是atomic.就说明读锁获取成功。AddInt32(&rw.readerCount,1)<0{//有一个等待的写锁,是为了避免阻塞饿死后进来的读锁Waitingforruntime_SemacquireMutex(&rw.readerSem,false,0)}}简化racedetection方法,读锁方法只有两行代码,逻辑如下:使用原子操作更新readerCount,并将readercount的值加1,只要原子操作完成,如果值不为负,则表示读锁添加成功。如果值为负数,表示已经有写锁,互斥量获取成功。writelockgoroutine处于等待或运行状态,所以为了避免饿死,会在后面执行。传入的读锁需要阻塞等待。调用runtime_SemacquireMutex阻塞等待非阻塞加读锁Go语言在1.18引入了非阻塞加读锁的方法:func(rw*RWMutex)TryRLock()bool{for{//readerCount值可以知道有没有当前是等待阻塞的写锁。如果该值为负数,那么后续的读锁将被阻塞。c:=atomic.LoadInt32(&rw.readerCount)ifc<0{ifrace.Enabled{race.Enable()}returnfalse}//尝试获取读锁,for循环不断尝试ifatomic.CompareAndSwapInt32(&rw.readerCount,c,c+1){ifrace.Enabled{race.Enable()race.Acquire(unsafe.Pointer(&rw.readerSem))}returntrue}}}因为读锁是共享的,所以多个线程可以获取它同时在没有写锁的时候阻塞等待,所以原子操作可能会失败。这里用一个for循环来增加尝试的次数,非常巧妙。释放读锁释放读锁的代码主要分为两部分,第一部分:func(rw*RWMutex)RUnlock(){//将readerCount的值减1,如果该值等于0,直接退出即可;否则进入rUnlockSlow处理ifr:=atomic.AddInt32(&rw.readerCount,-1);r<0{//outlinedslow-path允许内联fast-pathrw.rUnlockSlow(r)}}我们都知道readerCount的值代表当前正在执行的读操作goroutines的数量。自减后的值大于等于0表示当前没有异常场景或者写锁阻塞等待,直接退出即可,否则需要处理这两个逻辑:rUnlockSlow逻辑如下:func(rw*RWMutex)rUnlockSlow(rint32){//r+1等于0表示释放读锁而不加读锁,异常场景应该抛出异常//r+1==-rwmutexMaxReaders也就是不加读锁,释放读锁//因为写锁加锁成功后,rwmutexMaxReaders会减去readerCout的值ifr+1==0||r+1==-rwmutexMaxReaders{race.Enable()throw("sync:RUnlockofunlockedRWMutex")}//如果有写锁在等待读锁,readerWait的值会被更新,所以值rw.readerWait的值将一步递减。//如果原子操作后readerWait的值等于0,说明当前阻塞写锁的读锁已经释放,需要唤醒等待的写锁ifatomic.AddInt32(&rw.readerWait,-1)==0{//最后一个reader解除阻塞writer.runtime_Semrelease(&rw.writerSem,false,1)}}解读这段代码:r+1等于0,表示当前goroutine释放读锁,没有加areadlock,这是非法操作。r+1==-rwmutexMaxReaders表示写锁如果加锁成功,readerCount减去rwmutexMaxReaders会变成负数。如果之前没有加读锁,那么直接释放读锁会导致这个等式成立。同样是释放读锁而不加读锁的操作,属于非法操作;readerWait表示当写操作被阻塞时读操作的goroutines数量。如果有写锁在等待,readerWait的值将被更新。当读锁释放时,readerWait需要递减。如果自减后等于0,说明当前阻塞的写锁读锁已经释放,需要唤醒等待的写锁(见下面writelock代码呼应。)writelock和writelock对应的方法如下:constrwmutexMaxReaders=1<<30func(rw*RWMutex)Lock(){//首先解决与其他writers的竞争.//写锁也是互斥锁,能够复用互斥锁解决与其他写锁的竞争//如果写锁已经获取,其他goroutine在获取到写锁rw.w时会进入自旋或休眠。Lock()//设置readerCount为负值,告诉reader现在有一个写锁在等待运行(获取mutex成功)r:=atomic.AddInt32(&rw.readerCount,-rwmutexMaxReaders)+rwmutexMaxReaders//获取mutex排斥锁成功并不意味着goroutine成功获取了写锁。默认情况下,我们最多有2^30次读取操作。减去这个最大数后//如果还是不为0,说明前面还有读锁,需要等待读锁释放,写的时候更新等待读操作的goroutines个数操作被阻止;ifr!=0&&atomic.AddInt32(&rw.readerWait,r)!=0{runtime_SemacquireMutex(&rw.writerSem,false,0)}}代码量不是很大,但是还是有点复杂理解。我试着用文字来分析它。主要分为两部分:获取互斥量,写锁也是互斥量。这里我们重用了互斥量的锁定能力。当互斥锁加锁成功后,其他写锁goroutine会进入自旋休眠等待再次尝试获取锁;判断是否获取写锁成功,有一个变量rwmutexMaxReaders=1<<30表示最大支持2^30个并发读,成功锁定互斥锁后,假设有2^30个读操作释放读锁,将readerCount设置为负数并通过原子操作加2^30,如果此时r仍不为0如果正在进行读操作,则写锁需要等待。同时通过原子操作更新readerWait字段,即更新写操作阻塞时等待读操作goroutines的个数;readerWait会判断上面的读锁什么时候被释放。,递减,当前重新当aderWait递减为0时,写锁会被唤醒非阻塞写锁Go语言在1.18引入了非阻塞锁的方法:func(rw*RWMutex)TryLock()bool{//首先判断是否获取互斥量成功,如果不成功,直接返回falseif!rw.w.TryLock(){ifrace.Enabled{race.Enable()}returnfalse}//mutex获取成功,然后判断是否有读锁阻塞写锁,如果没有则更新readerCount直接为负数成功获取写锁;if!atomic.CompareAndSwapInt32(&rw.readerCount,0,-rwmutexMaxReaders){rw.w.Unlock()ifrace.Enabled{race.Enable()}returnfalse}returntrue}releaseWritelockfunc(rw*RWMutex)解锁(){//向读者宣布没有活跃的作家。//将readerCount恢复为正数,即释放读锁的mutexr:=atomic.AddInt32(&rw.readerCount,rwmutexMaxReaders)ifr>=rwmutexMaxReaders{race.Enable()throw("sync:UnlockofunlockedRWMutex")}//如果后面有readgoroutines,需要唤醒它们fori:=0;我<整数(r);i++{runtime_Semrelease(&rw.readerSem,false,0)}//释放mutex,写操作的goroutine和读操作的goroutine同时竞争rw.w.Unlock()}释放的逻辑比较writelock简单,释放writelock会唤醒后面的读写操作的goroutines,然后进行竞争;综上所述,因为我们上面分享了mutex的实现,所以看读写锁就容易多了。文末总结一下读写锁:读写锁提供四种操作:读锁,读解锁,写锁,写解锁;锁定规则为读写共享、写写互斥、读写互斥、写读互斥;读写锁必须存在于读写锁中,它们的目的也是为了避免原子性问题。只有写锁,没有读锁,才会导致我们读到中间值;Go语言的读写锁在设计上也避免了写锁饥饿的问题,由readerCount和readerWait字段控制。当写锁goroutine处于阻塞状态时,后面进来的想要获取读锁的goroutine也会被阻塞。当写锁释放时,会唤醒后面的读操作goroutine和写操作goroutine,让它们竞争剩下的;readlockLock获取过程:当锁空闲时,可以立即获取readlock。如果当前有写锁被阻塞,那么想要获取读锁的goroutine会休眠释放读锁的过程:如果没有异常场景或者写锁等待发生,则直接释放读锁.如果没有加读锁就释放了读锁,会抛出异常;在写锁被读锁阻塞的场景下,readerWait的值会递减。readerWait表示阻塞写操作goroutine的读操作goroutine数量,当readerWait减为0时,可以唤醒被写操作阻塞的goroutine;写锁获取过程写锁复用了互斥锁mutex的能力,首先尝试获取互斥锁,如果获取互斥锁失败,则进入自旋/睡眠;获取互斥量成功不代表写锁加成功。这时候如果还有goroutine在占用读锁,就会阻塞,否则写锁会成功释放。写锁进程释放写锁会将readerCount的负值变为正值,解除读锁的互斥,唤醒当前阻塞的所有读锁,释放互斥量。读写锁的代码量不多,因为它复用了mutex的设计,我在读写锁的功能上做的比较多,比mutex容易理解多了,你学会了吗?