当前位置: 首页 > 后端技术 > PHP

一文看懂Gosync.Cond设计

时间:2023-03-30 02:42:22 PHP

Go语言通过go关键字启用goroutine,让开发者可以轻松实现并发编程,而并发程序的有效运行往往离不开sync包的保驾护航。目前sync包的启用列表包括:sync.atomic下的原子操作、sync.Map的并发安全映射、sync.Mutex和sync.RWMutex提供的互斥量和读写锁、sync.Pool复用对象池、sync.Once单例模式、sync.Waitgroup多任务协作模式、sync.Cond监听模式。当然除了sync包,还有封装级别更高的channel和context。要写出一个合格的Go程序,必须掌握以上并发原语。对于大部分Gopher来说,sync.Cond应该是最陌生的,本文一探究竟。认识sync.Condsync.Cond字面意思就是同步条件变量,它实现了一种监控(Monitor)模式。在并发编程(也称为并行编程)中,监视器是一种同步构造,它允许线程具有互斥和等待(阻塞)特定条件变为假的能力。对于Cond,它实现了一个条件变量,是goroutines之间等待和通知的点。条件变量与共享数据隔离,可以同时阻塞多个goroutine,直到另一个goroutine改变条件变量,通知一个或多个被阻塞的goroutine唤醒。刚接触的读者可能看不懂,那我们先看看GopherCon2018上《Rethinking Classical Concurrency Patterns》的demo代码示例。typeItem=inttypeQueuestruct{items[]ItemitemAddedsync.Cond}funcNewQueue()*Queue{q:=new(Queue)q.itemAdded.L=&sync.Mutex{}//Cond绑定锁returnq}func(q*Queue)Put(itemItem){q.itemAdded.L.Lock()deferq.itemAdded.L.Unlock()q.items=append(q.items,item)q.itemAdded.Signal()//数据成功添加到Queue后,调用Signal发送通知}func(q*Queue)GetMany(nint)[]Item{q.itemAdded.L.Lock()deferq.itemAdded.L.Unlock()forlen(q.items)0;n--{wg.Add(1)gofunc(nint){items:=q.GetMany(n)fmt.Printf("%2d:%2d\n",n,items)wg.Done()}(n)}对于我:=0;我<100;i++{q.Put(i)}wg.Wait()}在这个例子中,Queue是一个存储数据Item的结构体,它传递的是Cond类型可以注意到这里使用了10个goroutines来消费数据,但是他们需要的数据量并不相等。我们可以称之为batch,依次在1-10之间。之后逐渐往Queue中加入100条数据。最终我们可以看到,10个gotoutine都可以被唤醒,拿到自己想要的数据。程序运行结果如下6:[789101112]5:[5051525354]9:[141516171819202122]1:[13]2:[3334]4:[35363738]3:[394041]7:[0123456]8:[4243444546474849]10:[23242526272829303132]当然,程序每次运行的结果都不会相同,以上输出只是某种情况。sync.Cond在$GOPATH/src/sync/cond.go中实现,Cond的结构体定义如下:typeCondstruct{noCopynoCopyLLockernotifyListcheckercopyChecker}其中,使用了noCopy和checker字段防止Cond被使用,详见小菜岛文章《no copy 机制》。L为Locker接口,一般该字段的实际对象为*RWmutex或*Mutex。typeLockerinterface{Lock()Unlock()}notifyList记录了一个基于票号的通知列表。第一次看不懂注释没关系,跟着下面连贯阅读。typenotifyListstruct{waituint32//用来记录下一个服务员的票号notifyuint32//用来记录下一个应该通知的服务员的票号lockuintptr//内部锁头unsafe.Pointer//指向wait等待者队列的头部tailunsafe.Pointer//指向等待者队列的尾部}其中head和tail是指向sudog结构体的指针,sudog代表等待列表中的goroutine,本身就是一个双向链表。值得一提的是,sudog中有一个字段ticket,用于记录当前goroutine的票号。Cond实现的核心模型是票务系统。每个想要买票的goroutine(调用Cond.Wait())被称为服务员。取票系统会给每个服务员分配一个取票码等,当票务供应商有取票码的号码时,就会叫醒服务员。卖票的goroutine有两种。第一个是调用Cond.Signal(),会根据票号唤醒一个服务员买票(如果有的话)。第二个是调用Cond.Broadcast()。它会通知唤醒所有阻塞的服务员。为了方便读者更容易理解票务系统,我们在下面给出一个图解示例。上面我们知道Cond字段中的notifyList结构是一个记录工单号的通知列表。这里把notifyList比作排队买电影票。当G1通过Wait买票时,发现此时没有票可买,只能在有票后阻塞等待通知。这个时候,他已经拿到了车票。专属取票码0。同理,G2和G3也没有票可买,分别获得了自己的取票码1和2。而G4是一个电影票提供者,它是一个售票者,它通过两个Signal带来了两张票,并通知G1和G2按照票号依次取票,并将notify更新为最新的1。G5也在买票。发现此时没有票可买。它拿着自己的取票码3,阻塞等待。G6是大票商,可以满足所有等待购票的买家通过广播购票。这时G3和G5都在等待,他直接唤醒G3和G5,更新notifyvalue等于waitvalue。了解了上述取票系统的运行原理后,我们再来看看Cond包下四个实际的外部方法函数的实现。NewCond方法funcNewCond(lLocker)*Cond{return&Cond{L:l}}用于初始化Cond对象,也就是初始化控件锁。Cond.Wait方法func(c*Cond)Wait(){c.checker.check()t:=runtime_notifyListAdd(&c.notify)c.L.Unlock()runtime_notifyListWait(&c.notify,t)c.L.Lock()}runtime_notifyListAdd的实现runtime/sema.go中的notifyListAdd用于自动增加waiter的waiterticketnumber,返回当前goroutine应该拿的ticketnumbert。runtime_notifyListWait的实现是runtime/sema.go中的notifyListWait,它会尝试将此时应该被goroutine拾取的票号t与notify中记录的当前应该通知的票号进行比较。如果t小于当前票号,那么可以直接退回,否则会等待并通知要取的号。同时这里需要注意的是,由于当前goroutine在进入runtime_notifyListWait时是通过c.L.Unlock()解锁锁的,这就意味着可能有多个goroutine来改变条件。那么当前goroutine并不能保证runtime_notifyListWait返回后条件一定为真,所以需要循环判断条件。Wait的正确用法如下://c.L.Lock()//for!condition(){//c.Wait()//}//...利用条件...//c.L.Unlock()Cond.Signalmethodfunc(c*Cond)Signal(){c.checker.check()runtime_notifyListNotifyOne(&c.notify)}runtime_notifyListNotifyOne的详细实现在runtime/sema.go的notifyListNotifyOne中。其目的是通知服务员取票。具体操作是:如果上次通知取票后没有新的服务员取票,则函数直接返回。否则,将票号加1,通知叫醒等候取票的服务员。需要注意的是,在调用Signal方法时,不需要持有c.L锁。Cond.Broadcastmethodfunc(c*Cond)Broadcast(){c.checker.check()runtime_notifyListNotifyAll(&c.notify)}runtime_notifyListNotifyAll的详细实现在runtime/sema.go的notifyListNotifyAll中,会通知唤醒所有服务员,并设置通知值等??于等待值。在调用Broadcast方法时,不需要持有c.L锁。讨论在$GOPATH/src/sync/cond.go下,我们可以发现它的代码量很小,但是只呈现了核心逻辑,其实现细节位于runtime/sema.go,依赖于runtime层调度原语,对细节感兴趣的读者可以深入学习。问题来了,为什么我们在日常开发中很少用到sync.Cond呢?Invalidwakeup上面我们说了,使用Cond.Wait的正确姿势如下文章开头的意思就是,如果每次调用Put方法都使用Broadcast方法唤醒所有的waiter,那么很有可能被唤醒的waiters醒来后发现条件不对遇见了,又会进入等待。虽然调用Signal方法唤醒了指定的waiter,但并不保证唤醒waiter的条件一定满足。因此,在实际使用中,我们需要尽可能保证唤醒操作有效。为了做到这一点,代码的复杂度必然会增加。饥饿问题还是以文章开头的例子为例。如果有多个goroutine同时执行G??etMany(3)和GetMany(3000),执行GetMany(3)和GetMany(3000)的goroutine被唤醒的概率是一样的,但是由于GetMany(3)只需要3个数据满足条件,所以如果GetMany(3)的goroutine一直存在,执行GetMany(3000)的goroutine永远拿不到数据,永远被无效唤醒。不能响应其他事件条件变量的目的是让goroutine进入睡眠状态等待某些条件发生。但是这样会让goroutine等待条件,可能会漏掉一些其他需要注意的事件。例如,调用Cond.Wait的函数包含上下文上下文。当上下文发送取消信号时,它并不能像我们预期的那样得到取消信号并退出。使用Cond可以防止我们同时选择条件和其他事件。可替代性通过对sync.Cond的几个外部方法的分析,不难看出其使用场景是可以被channel替代的,但是这样也会增加代码的复杂度。上面的例子可以使用channel重写如下。类型Item=inttypewaiterstruct{nintcchan[]Item}typestatestruct{items[]Itemwait[]waiter}typeQueuestruct{schanstate}funcNewQueue()*Queue{s:=make(chanstate,1)s<-state{}return&Queue{s}}func(q*Queue)Put(itemItem){s:=<-q.ss.items=append(s.items,item)forlen(s.wait)>0{w:=s.wait[0]iflen(s.items)=n{items:=s.items[:n:n]s.items=s.items[n:]q.s<-s返回项目}c:=make(chan[]Item)s.wait=append(s.wait,waiter{n,c})q.s<-sreturn<-c}最后,虽然上面的讨论中列出了sync.Cond的潜在问题,如果开发者能够在使用中考虑以上几点,对于监控模型的实现,从代码的语义逻辑来看,sync.Con使用d会比channel模式更容易理解和维护。请记住,易于理解的代码模型总是比深奥的噱头更接地气。