当前位置: 首页 > 科技观察

Go官方设计了一个信号量库

时间:2023-03-12 22:40:56 科技观察

前言大家好,我是asong。上一篇写的时候,请不要滥用goroutine,我发现Go语言的扩展包提供了一个加权信号量库Semaphore。使用信号量,我们可以实现一个“工作池”来控制一定数量的goroutine并发工作。因为对源码抱着好奇的态度,所以周末仔细看了看这个库并分析了一下,记录在这里。什么是信号量如果想知道什么是事物,爱上百度百科搜索,输入“信号量”,很快就有答案。百度百科解释:信号量(Semaphore),有时也称为信号量,是一种在多线程环境下使用的设施,可以用来保证两个或多个关键代码段不会被并发调用。在进入代码的临界区之前,线程必须获得一个信号量;一旦代码的关键部分完成,线程必须释放信号量。其他想要进入关键代码段的线程必须等到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将AcquireSemaphoreVI和ReleaseSemaphoreVI放置在每个关键代码段的开头和结尾。确认信号量VI引用最初创建的信号量。通过这个解释,我们可以知道什么是信号量。实际上,信号量是一种变量或抽象数据类型,用于控制并发系统中多个进程对公共资源的访问。访问是原子的。信号量主要分为两类:二元信号量:顾名思义,信号量只有两种,0或1,相当于互斥量。当值为1时,资源可用。当值为0时,资源被锁定,进程Blocking无法继续执行。计数信号量:信号量是任意整数。开始时,如果计数器的计数值为0,则创建的信号量处于不可用状态。如果计数器的计数值大于0,则创建的信号量为可用状态,总的获取次数等于计数器的值。信号量的工作原理信号量由操作系统维护。信号量只能执行两种操作:等待和发送信号。综上所述,运算的核心是PV运算:P原语:P是荷兰语Proberen(test)的首字母。作为阻塞原语,它负责将当前进程从运行状态转换为阻塞状态,直到另一个进程将其唤醒。操作为:申请空闲资源(信号量减1),成功则退出;如果失败,则进程被阻塞;Vprimitive:V是荷兰语Verhogen(增加)的首字母。要唤醒原语,它负责唤醒阻塞的进程。它有一个参数表,里面存放着等待被唤醒的进程的信息。操作是:释放一个被占用的资源(信号量加1),如果发现有阻塞进程,选择一个将其唤醒。信号量的PV操作是一个原子操作,PV原语执行过程中不允许中断。PV原语对信号量的操作分为三种情况:将信号量视为某类共享资源的剩余个数,实现对某类共享资源的访问。信号量作为进程间的同步视觉信号,其值是锁标志,实现对共享变量的访问。在本文中,我们将不再继续分析。接下来我们重点关注Go语言提供的扩展包Semaphore,看看它是如何实现的。官方扩展包Semaphore我们在分析Go语言源码的时候总能看到这些函数:funcruntime_Semacquire(s*uint32)funcruntime_SemacquireMutex(s*uint32,lifobool,skipframesint)funcruntime_Semrelease(s*uint32,handoffbool,skipframesint)函数就是pv操作信号量,但它们都被Go内部使用。如果要使用信号量,可以使用官方的扩展包:Semaphore,它是一个加权信号量。接下来我们重点分析一下这个库。安装方法:goget-ugolang.org/x/sync数据结构typeWeightedstruct{sizeint64//设置一个最大权重curint64//标识当前使用的资源数musync.Mutex//提供临界区保护waiterslist.List//被阻塞等待的调用者列表}信号量库的核心结构是Weighted,主要有4个字段:size:这个代表最大权重,创建Weighted对象时,指定cur:相当于一个游标,用来记录当前使用的权重值mu:互斥锁,并发情况下的临界区保护waiter数据结构如下:typewaiterstruct{nint64//等待调用者的权重值readychan<-struct{}//closechannel是否唤醒}这里只有两个字段:n:这是调用者的权重值服务员的callerready:这是一个channel,利用channel的close机制唤醒信号量,return提供了创建Weighted对象的方法,初始化时需要给它一个最大权重://NewWeighted创建一个新的加权信号量对于并发访问,信号量具有给定的最大权重。funcNewWeighted(nint64)*Weighted{w:=&Weighted{size:n}returnw}块获取权重的方法-Acquire先直接看代码:func(s*Weighted)Acquire(ctxcontext.Context,nint64)error{s.mu.Lock()//锁定保护临界区//资源可用且不等待获取权重的Goroutinesifs.size-s.cur>=n&&s.waiters.Len()==0{s.cur+=n//Weighteds.mu.Unlock()//释放锁returnnil}//要获取的权重n大于最大权重ifn>s.size{//先释放锁,保证其他goroutine调用Acquire都是notblockedBlocks.mu.Unlock()//阻塞等待context返回<-ctx.Done()returnctx.Err()}//走到这里表示现在没有可用资源//创建通道用于通知唤醒ready:=make(chanstruct{})//创建服务员对象w:=waiter{n:n,ready:ready}//服务员按顺序入队elem:=s.waiters.PushBack(w)//释放thelock,Waitforwakeup,don'tblockothergoroutines.mu.Unlock()//阻塞并等待唤醒select{//contextclosecase<-ctx.Done():err:=ctx.Err()//先获取上下文的错误信息s.mu.Lock()select{case<-ready://在上下文关闭后被唤醒,然后尝试修复队列,假装我们没有取消err=nildefault://判断是否是第一个元素isFront:=s.waiters.Front()==elem//移除第一个元素s.waiters.Remove(elem)//如果是是第一个元素且资源可用,通知其他waiterifisFront&&s.size>s.cur{s.notifyWaiters()}}s.mu.Unlock()returner//被唤醒case<-ready:returnnil}}注释已添加到代码中。总结一下,该方法主要有3个流程:流程1:当资源可用且没有等待权重的goroutines时,走正常的权重流程;流程2:想要获取的权重n大于初始化设置的最大权重,这个goroutine永远获取不到信号量,所以阻塞等待上下文关闭;流程3:如果前两步都没有问题,说明系统没有可用资源,然后需要阻塞等待唤醒,阻塞等待唤醒这里有一个特殊的逻辑;特殊逻辑2:上下文关闭后,根据是否有可用资源决定通知等待唤醒的调用者。这样做的目的是为了避免当不同的上下文控制不同的goroutines时,未关闭的goroutine不会被阻塞,仍然会执行。我们来看这样一个例子(因为goroutine的抢占式调度,这个例子也会有偶然性):特殊逻辑1:如果是在上下文关闭后被唤醒,那么忽略这个cancel,尝试修复队列funcmain(){s:=semaphore.NewWeighted(3)ctx,cancel:=context.WithTimeout(context.Background(),time.Second*2)defercancel()fori:=0;i<3;i++{ifi!=0{gofunc(numint){iferr:=s.Acquire(ctx,3);err!=nil{fmt.Printf("goroutine:%d,erris%s\n",num,err.Error())return}time.Sleep(2*time.Second)fmt.Printf("goroutine:%drunover\n",num)s.Release(3)}(i)}else{gofunc(numint){ct,cancel:=context.WithTimeout(context.Background(),time.Second*3)defercancel()iferr:=s.Acquire(ct,3);err!=nil{fmt.Printf("goroutine:%d,erris%s\n",num,err.Error())return}time.Sleep(3*time.Second)fmt.Printf("goroutine:%drunover\n",num)s.Release(3)}(i)}}time.Sleep(10*time.Second)}上例中,goroutine:0使用ct对象进行控制,超时时间为3s。goroutine:1和goroutine:2对象使用ctx对象进行控制,超时时间为2s。这三个goroutine占用的资源等于最大资源数,也就是说只有一个goroutine可以运行成功,另外两个goroutine会被阻塞,因为goroutine是抢占式调度,所以我们不能确定是哪个goroutine会是第一个Execution,这里我们假设第一个拿到信号量的是goroutine:2,阻塞调用列表的顺序是:goroutine:1->goroutine:0,因为在goroutinetine:2有2s的延时,所以会触发ctx超时,ctx会发送Done信号,因为goroutine:2和goroutine:1都是由ctx控制的,所以goroutine:1会从waiter在队列中取消,但是因为goroutine:1属于队列的第一个成员,并且因为goroutine:2释放了资源,那么它会唤醒goroutine:0继续执行,画图说明:使用这个方法可以避免goroutine永久Insomnia不阻塞获取权重的方法-TryAcquirefunc(s*Weighted)TryAcquire(nint64)bool{s.mu.Lock()//Locking//有资源可用且没有goroutines等待获取资源success:=s.size-s.cur>=n&&s.waiters.Len()==0ifsuccess{s.cur+=n}s.mu.Unlock()returnsuccess}这个方法比较简单,可以得到权重为n的信号量没有阻塞,并且成功失败时返回true,失败时返回false并保持信号量不变。释放权重func(s*Weighted)Release(nint64){s.mu.Lock()//释放资源s.cur-=n//释放资源大于持有资源,panicifs.cur<0{s.mu.Unlock()panic("semaphore:releasedmorethanheld")}//通知其他等待调用者s.notifyWaiters()s.mu.Unlock()}这里是一个很常规的操作,主要是释放资源,同时进行安全判断,如果释放的资源大于持有的资源,就会发生panic。唤醒服务者在Acquire和Release方法中调用notifyWaiters。下面分析一下这个方法:func(s*Weighted)notifyWaiters(){for{//获取调用者队列中等待的团队成员next:=s.waiters.Front()//没有调用者通知ifnext==nil{break//Nomorewaitersblocked.}//断言服务员信息w:=next.Value.(waiter)ifs.size-s.cur