本文转载自微信公众号《Golang来了》,作者Seekload。本文转载请联系Golang公众号。大家好,我是四哥。信号量是并发编程中常用的同步机制。它在标准库的并发原语中被频繁使用,比如Mutex、WaitGroup等,这些并发原语的实现都有信号量的影子,所以我们只有通过实现原理来学习理解信号量量,做到“知其然,又知其所以然”,我们才能有更多的“武器”来应对我们面临的实际业务场景问题。今天我们要修复信号量。通过本文,您可以掌握:什么是信号量?有哪些操作?Go官方是如何实现信号量的?实际场景中如何使用信号量?使用信号量需要注意哪些问题?还有其他实现信号量的方法吗?什么是信号量?有哪些操作?维基百科对信号量的解释如下:在不同的操作系统中。在系统中,每个进程都会被赋予一个信号量,它代表了每个进程的当前状态。没有被控制的进程会被强制停在特定的地方,等待信号继续。下面用G来表示goroutine。通俗的解释是,信号量通常用一个整型变量S来表示一组资源。当G等待完信号量时,S会减1。当G完成释放信号量时,S会加1。当计数值为0时,G调用wait等待信号量会阻塞,除非S是大于0,等待的G会解除阻塞并成功返回。比如图书馆的10本书《Go 语言编程之旅》,10000人想看这本书,“僧多粥少”。所以,图书管理员会先让这10000人登记,按照登记的先后顺序借阅这本书。如果所有的书都借完了,其他想看书的人就得等了。如果有人还书,图书管理员会通知下一位同学借书。这里的资源是十本书《Go 语言编程之旅》。想看这本书的同学就是goroutines,图书管理员就是信号量。通过上面的解释,我们可以知道什么是信号量。实际上,信号量是一个变量或抽象数据类型,用于控制并发系统中多个进程对公共资源的访问。访问是原子的。信号量主要分为两类:计数信号量。上面提到的借书的例子就是计数信号量。它的计数可以是任何正整数;二值信号量实际上是一种特殊的计数信号量,它的值只有0或1,相当于一个互斥量。当值为1时,资源可用。当值为0时,资源被锁定,进程被阻塞,无法继续执行;PV操作信号量定义了两个操作P和V,P操作是减少信号量的计数值,V操作是增加信号量的计数值。通常,在初始化时,信号量S被赋值为n,就像一个池子里有n个资源。P操作相当于请求资源,如果资源可用,则立即返回;如果没有资源或者资源不够,G就会阻塞等待。V操作会释放持有的资源,并将资源返回给信号量。信号量的值只能通过P/V操作来改变,初始化操作除外。我们一般使用信号量来保护一组资源,比如数据库连接池,几台打印机资源等等。如果信号量转化为二进制信号量,那么它的P/V与互斥量的Lock/Unlock是一样的。信号量实现——官方扩展包Semaphore查看Go源码时,我们经常可以看到以下关于信号量的函数:s*uint32,handoffbool,skipframesint)这些函数就是信号量的PV操作。不幸的是,它们被Go运行时内部使用,并没有被打包和暴露为外部信号量并发源。语言,我们没有办法使用它。不过没关系,Go在它的扩展包中提供了一个semaphore信号量,只不过这个信号量的类型名不叫Semaphore,而是Weighted。这是一个加权信号量。接下来我们重点分析一下这个库。Weighted的实现思路:使用mutex+List实现。mutex实现了对其他字段的保护,而List实现了一个等待队列,waiter的通知是通过Channel的通知机制实现的。加权主要包括两种结构和几种常用的方法。StructuretypeWeightedstruct{sizeint64//最大资源数,初始化时指定curint64//计数器,当前使用的资源数musync.Mutex//mutex,保护字段waiterslist.List//wait等待者列表,当前阻塞等待的请求者goroutine}各字段含义见代码注释,其中waiters中存放的数据为waiter对象,waiter数据结构如下:typewaiterstruct{nint64//The调用者请求的资源数readychan<-struct{}//当调用者可以获得信号量资源时,关闭chan,调用者会收到通知,成功返回}字段含义见注释。这里是初始化资源个数NewWeighted的方法,很简单://CreateasemaphorewithnresourcesfuncNewWeighted(nint64)*Weighted{w:=&Weighted{size:n}returnw}方法一。阻塞获取资源方法——Acquire(),源码如下:func(s*Weighted)Acquire(ctxcontext.Context,nint64)error{s.mu.Lock()//有可用资源,如果s.size-s则直接返回。cur>=n&&s.waiters.Len()==0{s.cur+=ns.mu.Unlock()returnnil}//程序执行到这里表示没有足够的资源可以使用ifn>s.size{s.mu.Unlock()<-ctx.Done()returnctx.Err()}//资源不足,构造一个waiter,加入等待队列//就绪通道用于通知被阻塞的调用者资源可用,释放资源的goroutine负责关闭,相当于消息通知ready:=make(chanstruct{})w:=waiter{n:n,ready:ready}elem:=s.waiters.PushBack(w)//添加到等待队列s.mu.Unlock()//调用者陷入selectblock,除非收到外部ctx的取消信号或被通知资源可用select{case<-ctx.Done()://接收到外部ctx控制信号err:=ctx.Err()s.mu.Lock()select{case<-ready://再次检查是否可以被唤醒,如果被唤醒,忽略控制信号,并返回nil表示成功err=nildefault://收到控制信息后没有获取到资源,删除原来添加的waiterisFront:=s.waiters.Front()==elem//当前waiter是否为链表头元素s.waiters.Remove(elem)//删除waiterifisFront&&s.size>s.cur{//如果为链表头元素且资源可用,尝试唤醒链表中的第一个Waitingwaiters.notifyWaiters()}}s.mu.Unlock()returnerrcase<-ready://消息通知,请求资源的goroutine被释放资源的goroutine唤醒returnnil}}详见注释,Acquire()相当于P操作,可以一次获取多个资源。如果没有足够的资源,调用者将被阻塞。可以通过第一个参数Context增加超时或者取消机制。如果正常获取资源,则返回nil;否则返回ctx.Err(),信号量不变。2、非阻塞资源获取方法——TryAcquire,源码如下:func(s*Weighted)TryAcquire(nint64)bool{s.mu.Lock()success:=s.size-s.cur>=n&&s.waiters.Len()==0ifsuccess{s.cur+=n}s.mu.Unlock()returnsuccess}这个方法比较简单,非阻塞获取指定个数resources,如果没有空闲资源,直接returnfalse。3.通知服务员notifyWaiters,源码如下:func(s*Weighted)notifyWaiters(){for{next:=s.waiters.Front()//获取队列头元素ifnext==nil{//队列中没有元素break}w:=next.Value.(waiter)ifs.size-s.cur
