当前位置: 首页 > 科技观察

Sync.Cond

时间:2023-03-14 16:49:14 科技观察

被遗弃在角落本文转载请联系Golang技术分享公众号。Go语言使用go关键字启用goroutine,让开发者可以轻松实现并发编程,而并发程序的有效运行往往离不开sync包的保驾护航。目前sync包的启用列表包括:sync.atomic下的原子操作、sync.Map并发安全映射、sync.Mutex和sync.RWMutex提供的互斥量和读写锁、sync.Pool复用对象池、sync.曾经是单例模式,sync.Waitgroup多任务协作模式,sync.Cond监听模式。当然除了sync包,还有封装级别更高的channel和context。要写出一个合格的Go程序,必须掌握以上并发原语。对于大部分Gopher来说,sync.Cond应该是最陌生的,本文一探究竟。了解sync.Condsync.Cond字面意思就是同步条件变量,它实现了一个监听模式。在并发编程(也称为并行编程)中,监视器是一种同步构造,它允许线程具有互斥和等待(阻塞)特定条件变为假的能力。对于Cond,它实现了一个条件变量,是goroutines之间等待和通知的点。条件变量与共享数据隔离,可以同时阻塞多个goroutine,直到另一个goroutine改变条件变量,通知一个或多个被阻塞的goroutine唤醒。刚接触它的读者可能不太理解,那么让我们来看看在GopherCon2018的?中的演示代码示例。1typeItem=int23typeQueuestruct{4items[]Item5itemAddedsync.Cond6}78funcNewQueue()*Queue{9q:=new(Queue)10q.itemAdded.L=&sync.Mutex{}//为Cond绑定锁11returnq12}1314func(q*Queue)Put(itemItem){15q.itemAdded.L.Lock()16deferq.itemAdded.L.Unlock()17q.items=append(q.items,item)18q.itemAdded.Signal()//当数据添加到排队成功,调用Signal发送通知19}2021func(q*Queue)GetMany(nint)[]Item{22q.itemAdded.L.Lock()23deferq.itemAdded.L.Unlock()24forlen(q.items)0;n--{37wg.Add(1)38gofunc(nint){39items:=q.GetMany(n)40fmt.Printf("%2d:%2d\n",n,items)41wg.Done()42}(n)43}4445fori:=0;i<100;i++{46q.Put(i)47}4849wg.Wait()50}in在这个例子中,Queue是一个存储数据Item的结构体,它控制数据th的输入输出Cond类型的roughitemAdded。可以注意到这里有10个goroutines消费数据,但是他们需要的数据量并不相等。我们可以称之为batch,就是1-10之间的顺序。之后逐渐往Queue中加入100条数据。最终我们可以看到,10个gotoutine都可以被唤醒,拿到自己想要的数据。程序运行结果如下16:[789101112]25:[5051525354]39:[141516171819202122]41:[13]52:[3334]64:[35363738]73:[394041]87:[0123456]98:[424344454964748:[23242526272829303132]当然程序每次运行的结果都不会相同,以上输出只是某种情况。sync.Cond在$GOPATH/src/sync/cond.go中实现,Cond的结构定义如下:1typeCondstruct{2noCopynoCopy3LLocker4notifynotifyList5checkercopyChecker6}其中noCopy和checker字段用于防止Cond在使用过程中被复制。《no copy 机制》文章。L为Locker接口,一般该字段的实际对象为*RWmutex或*Mutex。1typeLockerinterface{2Lock()3Unlock()4}notifyList根据票号记录通知列表。第一次看不懂注释没关系,连贯地读下面的内容。1typenotifyListstruct{2waituint32//记录下一个服务员的票号3notifyuint32//记录下一个需要通知的服务员的票号4lockuintptr//内锁5headunsafe.Pointer//指向服务员的队头6tailunsafe.指针//指向等待者队列的尾部7}其中head和tail是指向sudog结构体的指针,sudog代表等待列表中的goroutine,它本身就是一个双向链表。值得一提的是,sudog中有一个字段ticket,用于记录当前goroutine的票号。Cond实现的核心模型是票务系统。每个想要买票的goroutine(调用Cond.Wait())都被称为服务员。取票系统会给每个服务员分配一个取票码等,当票务供应商有取票码的号码时,就会叫醒服务员。卖票的goroutine有两种。第一个调用Cond.Signal(),唤醒一个根据票号买票的服务员(如果有的话)。第二个调用Cond.Broadcast()。它会通知唤醒所有阻塞的服务员。为了方便读者更容易理解票务系统,我们在下面给出一个图解示例。上面我们知道Cond字段中的notifyList结构是一个记录工单号的通知列表。这里把notifyList比作排队买电影票。当G1通过Wait买票时,发现此时没有票可买,只能在有票后阻塞等待通知。这个时候,他已经拿到了车票。专属取票码0。同理,G2和G3也没有票可买,分别获得了自己的取票码1和2。而G4是一个电影票提供者,它是一个售票者,它通过两个Signal带来了两张票,并通知G1和G2按照票号依次取票,并将notify更新为最新的1。G5也在买票。发现此时没有票可买。它拿着自己的取票码3,阻塞等待。G6是大票商,可以满足所有等待购票的买家通过广播购票。这时G3和G5都在等待,他直接唤醒G3和G5,更新notifyvalue等于waitvalue。了解了上述取票系统的运行原理后,我们再来看看Cond包下四个实际的外部方法函数的实现。NewCond方法1funcNewCond(lLocker)*Cond{2return&Cond{L:l}3}用于初始化Cond对象,也就是初始化控件锁。Cond.Wait方法1func(c*Cond)Wait(){2c.checker.check()3t:=runtime_notifyListAdd(&c.notify)4c.L.Unlock()5runtime_notifyListWait(&c.notify,t)6c.L.Lock()7}runtime_notifyListAdd在runtime/sema.go中的notifyListAdd中实现,用于自动增加服务员的waiterticketnumber,返回当前goroutine应该拿的ticketnumbert。runtime_notifyListWait的实现是runtime/sema.go中的notifyListWait,它会尝试将此时应该被goroutine拾取的票号t与notify中记录的当前应该通知的票号进行比较。如果t小于当前票号,那么可以直接退回,否则会等待并通知要取的号。同时这里需要注意的是,由于当前goroutine在进入runtime_notifyListWait时是通过c.L.Unlock()解锁锁的,这就意味着可能有多个goroutine来改变条件。那么当前goroutine并不能保证runtime_notifyListWait返回后条件一定为真,所以需要循环判断条件。正确的Wait姿势如下:1//c.L.Lock()2//for!condition(){3//c.Wait()4//}5//...makeuseofcondition...6//c.L.Unlock()Cond.Signal方法1func(c*Cond)Signal(){2c.checker.check()3runtime_notifyListNotifyOne(&c.notify)4}runtime_notifyListNotifyOne的详细实现在runtime/sema.go的notifyListNotifyOne中,其作用是通知服务员入住。具体操作是:如果上次通知取票后没有新的服务员取票,则函数直接返回。否则,将票号加1,通知叫醒等候取票的服务员。需要注意的是,在调用Signal方法时,不需要持有c.L锁。Cond.Broadcast方法1func(c*Cond)Broadcast(){2c.checker.check()3runtime_notifyListNotifyAll(&c.notify)4}runtime_notifyListNotifyAll的详细实现在runtime/sema.go的notifyListNotifyAll中,会通知并唤醒所有服务员。并将通知值设置为等于等待值。在调用Broadcast方法时,不需要持有c.L锁。讨论在$GOPATH/src/sync/cond.go下,我们可以发现它的代码量很小,但是只呈现了核心逻辑,其实现细节位于runtime/sema.go,依赖于runtime层调度原语,对细节感兴趣的读者可以深入学习。问题来了,为什么我们在日常开发中很少使用sync.Cond呢?Invalidwakeup上面我们说了,使用Cond.Wait的正确姿势是:1c.L.Lock()2for!condition(){3c。Wait()4}5...makeuseofcondition...6c.L.Unlock()以文章开头的例子为例,如果每次调用Put方法时都使用Broadcast方法唤醒所有等待者,那么有很大概率被唤醒的waiter醒来后发现不满足条件,会再次进入wait。虽然调用Signal方法会唤醒指定的waiter,但并不保证一定满足唤醒waiter的条件。因此,在实际使用中,我们需要尽可能保证唤醒操作有效。为了做到这一点,代码的复杂度必然会增加。饥饿问题还是以文章开头的例子为例。如果有多个goroutine同时执行G??etMany(3)和GetMany(3000),执行GetMany(3)和GetMany(3000)的goroutine被唤醒的概率是一样的,但是由于GetMany(3)只需要3个数据满足条件,所以如果GetMany(3)的goroutine一直存在,执行GetMany(3000)的goroutine永远拿不到数据,永远被无效唤醒。不能响应其他事件条件变量的目的是让goroutine进入睡眠状态等待某些条件发生。但是这样会让goroutine等待条件,可能会漏掉一些其他需要注意的事件。例如,调用Cond.Wait的函数包含上下文上下文。当上下文发送取消信号时,它并不能像我们预期的那样得到取消信号并退出。使用Cond可以防止我们同时选择条件和其他事件。可替代性通过对sync.Cond的几个外部方法的分析,不难看出其使用场景是可以被channel替代的,但是这样也会增加代码的复杂度。上面的例子可以使用channel重写如下。1typeItem=int23typewaiterstruct{4nint5cchan[]Item6}78typestatestruct{9items[]Item10wait[]waiter11}1213typeQueuestruct{14schanstate15}1617funcNewQueue()*Queue{18s:=make(chanstate,1)19s<-state{}20}return2223func(q*Queue)Put(itemItem){24s:=<-q.s25s.items=append(s.items,item)26forlen(s.wait)>0{27w:=s.wait[0]28iflen(s.items)=n{41items:=s.items[:n:n]42s.items=s.items[n:]43q.s<-s44returnitems45}4647c:=make(chan[]Item)48s.wait=append(s.wait,waiter{n,c})49q.s<-s5051return<-c52}最后,虽然上面的讨论中列出了sync.Cond的潜在问题,如果开发者在使用中能够考虑以上几点,对于monitormodel的代码实现而言,使用sync.Cond在代码的语义逻辑上比channel模式更容易理解和维护。请记住,易于理解的代码模型总是比深奥的炫耀更接地气。