当前位置: 首页 > 科技观察

关于Semaphore原理与实现的文章

时间:2023-03-14 23:32:28 科技观察

本文转载自微信公众号《代码农桃花源》,作者曹春晖。转载本文请联系码农桃花源公众号。信号量数据结构//Go语言中暴露的信号量实现//具体用法是提供sleep和wakeup原语//以便在其他同步原语竞争的情况下使用//所以信号量和linux这里的futex目标是相同的//只是在语义上更简单////也就是说,不要将这些视为信号量//将这里的事物视为实现睡眠和唤醒的一种方式//每次睡眠都会与一个配对唤醒//即使发生竞争,唤醒也在睡眠之前////SeeMullenderandCox,``SemaphoresinPlan9,''//http://swtch.com/semaphore.pdf//是同步的。Mutex准备的异步信号量//semaRoot持有一棵不同地址的sudogs(s.elem)的平衡树//每个sudogs依次(通过s.waitlink)指向一个等待相同地址的其他sudogs//时间复杂度在同一地址的sudogs内部列表上的操作是O(1)。扫描顶级semaRoot列表的时间复杂度//是O(logn),其中n是散列到同一个semaRoot的不同地址的总数,一些goroutine将在每个地址上被阻塞。//访问golang.org/issue/17953可以看到引入secondarylist之前性能很差的一个程序样本,test/locklinear.go//有一个测试复现了这个样本typesemaRootstruct{lockmutextreap*sudog//rootofbalancedtreeofuniquewaiters。nwaituint32//Numberofwaiters.Readw/othelock.}//Primetonotcorrelatewithanyuserpatterns.constsemTabSize=251varsemtable[semTabSize]struct{rootsemaRootpad[sys.CacheLineSize-unsafe.Sizeof(semaRoot{})]root3Root*2*embyte*2addfun{return&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root}┌──────┬──────┬──────┬────────┬──────┬────────────────────────────┬──────┐│0│1│2│3│4│.....│250│└──────┴────────┴──────┴──────┴───────────────────────────────────────────────────────────────────────────────┴──────┘│││└──┐└─┐┐│││▼▼┌──────────┐┌──────────┐│结构││结构│├───────────────────────────────────────────────────────────────────────────────────────────────────┐│rootsemaRoot│──┐│rootsemaRoot│──┐├─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤││垫│││垫│└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────│:──────────────┘┌────────────────┘││││▼▼┌────────────┐┌────────────┐│semaRoot││semaRoot│├────────────┴──────────┐├────────────┴──────────┐│lockmutex││lockmutex│├──────────────────────────────┤├──────────────────────┤│treap*sudog││treap*sudog│├────────────────────────────┤├──────────────────────┤│nwaituint32││nwaituint32└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘树结构:┌──────────┐┌──┬──?│sudog│││├──────────────┴──────────────┐┌──────────────────────────────────┼──┼──│prev*sudog││││├────────────────────────────┤││││next*sudog│──────┐││├────────────────────────────┤││││parent*sudog││││├────────────────────────┤│││││elemunsafe.Pointer│││├──────────────────────────────────────────────────┤│││││ticketuint32│││││└──────────────────────────┘│││││││││││││││││││▼││▼┌────────────┐││┌──────────────┐│sudog││││sudog├──────────────┴──────────────┐││├────────────┴────────────────────┐┐│prev*sudog││││prev*sudog│├────────────────────────────────────────────────┤││├──────────────────────────────┤│下一个*sudog││││下一个*sudog│├────────────────────────────────────────┤││├────────────────────────────┤│parent*sudog│────┘└────────────────────────────────│parent*sudog│├──────────────────────────────────────────────────────────────────────────────────────────────────────────────┤│elemunsafe.Pointer││elemunsafe.Pointer│├──────────────────────────────────────────────────────────────────────────────────┤│ticketuint32││ticketuint32│└──────────────────────────────────────────────────────────────────────────────────────────────┘在这个treap结构体中,从elem(其实就是lock的addr)来看,这个结构体就是一个二叉查找树。从ticket的角度来看,整个结构就是一个小顶堆。这就是为什么它被称为陷阱。同样的addr,也就是g锁在同一个mutex上,会阻塞在同一个地址上。这些阻塞在同一个地址上的goroutines会被打包到sudog中,形成一个链表。使用sudog的waitlink连接:┌────────────┐┌────────────┐┌──────────────┐│sudog│┌───────?│SUDOG│┌────────────│Sudog│──────────────────────┐─────────────────────────────────────────────────────────────────────────────────────奥加西尼──┴────────────────┐┐├────────────┴──────────────────────────────────────────┐┐┐│waitlink*sudog│──────┘│waitlink*sudog│────────┘│waitlink*sudog│├──────────────────────────────────┤├──────────────────────────────┤├──────────────────────────────────────────────────┤│waittail*sudog││waittail*sudog││waittail*sudog│└─────────────────────────────────────────────────────────────────────────────────────────────────────┘└──────────────────────────┘中间元素的waittail会指向最后一个元素:┌────────────┐│sudog│├────────────┴──────────────┐│waitlink*sudog│├──────────────────────────┤│waittail*sudog│——————————————————————————————————————————————————───────────────────────────────────────────────────────────────┌│──────────────────┐│Sudog│├──────────────────────┐││waitlink*sudog││├──────────────────────────────────────┤││waittail*sudog│──────────────────────────────┤└──────────────────────────────────────────────────────────────────────────────────┐│sudog│├──────────────┴────────────────┐│waitlink*sudog│├────────────────────────────┤│waittail*sudog│└─────────────────────────────────────────────────────────────────────────────────────────────────────┘对外封装sema.go中实现的内容,使用go:linkname导出sync和poll库使用,链接期间也做一些tricks/poll.runtime_Semacquirefuncpoll_runtime_Semacquire(addr*uint32){semacquire1(addr,false,semaBlockProfile)}//go:linknamesync_runtime_Semreleasesync.runtime_Semreleasefuncsync_runtime_Semrelease(addr*uint32,handoffbool){semrelease1(addr,handoff)}//go:linknamesync_runtime_SemacquireMutexsync.runtime_SemacquireMutexfuncsync_runtime_SemacquireMutex(addr*uint32,lifobool){semacquire1(addr,lifo,semaBlockProfile|semaMutexProfile)}//go:linknamepoll_runtime_Semreleaseinternal/poll.runtime_Semreleasefuncpoll_runtime_Semrelease(addr*uint32){semrelease(addr)}实现sem本身常说的支持acquire和release,在其实就是OSP操作和V操作公共部分funccansemacquire(addr*uint32)bool{for{v:=atomic.Load(addr)ifv==0{returnfalse}ifatomic.Cas(addr,v,v-1){returntrue}}}获取过程类型semaProfileFlagsintconst(semaBlockProfilesemaProfileFlags=1<0{blockevent(s.releasetime-t0,3)}releaseSudog(s)}发布过程addr,1)//低成本情况:没有waiter?//这个原子检查必须在xadd之前发生,以避免误唤醒//(详见semacquire中的循环)ifatomic.Load(&root.nwait)==0{return}//高成本情况:搜索waiter并唤醒它lock(&root.lock)ifatomic.Load(&root.nwait)==0{//count值已经被另一个goroutine消费了//所以我们不需要唤醒其他goroutineunlock(&root.lock)return}s,t0:=root.dequeue(addr)ifs!=nil{atomic.Xadd(&root.nwait,-1)}unlock(&root.lock)ifs!=nil{//可能会很慢,所以先解锁off&&cansemacquire(addr){s.ticket=1}readyWithTime(s,5)}}funcreadyWithTime(s*sudog,traceskipint){ifs.releasetime!=0{s.releasetime=cputicks()}goready(s.g,traceskip)}treap结构sudog根据地址hash到251个bucket中的一个,每个bucket都是一个treap并且同一个sudog上addr将形成一个链表。为什么同一个地址的sudog不需要扩容放到treap中呢?显然,当sudog唤醒时,block在同一个addr上的goroutines说明他们都加了同一个锁,这些goroutines在被唤醒的时候必须一起被唤醒。是的,同地址的g不需要查找,只要判断是高级队列(fifo)的唤醒还是后向队列(lifo)的唤醒即可.//队列函数会将s添加到semaRoot上被阻塞的goroutines中//其实就是将s添加到其地址对应的treap中func(root*semaRoot)queue(addr*uint32,s*sudog,lifobool){s.g=getg()s.elem=unsafe.Pointer(addr)s.next=nils.prev=nilvarlast*sudogpt:=&root.treapfort:=*pt;t!=nil;t=*pt{ift.elem==unsafe.Pointer(addr){//Alreadyhaveaddrinlist.iflifo{//在treap中,在t的位置用s覆盖t*pt=ss.ticket=t.tickets.acquiretime=t.acquiretimes.parent=t。父母。prev=t.prevs.next=t.nextifs.prev!=nil{s.prev.parent=s}ifs.next!=nil{s.next.parent=s}//把t放到s的waitlist中第一个位置s.waitlink=ts.waittail=t.waittailifs.waittail==nil{s.waittail=t}t.parent=nilt.prev=nilt.next=nilt.waittail=nil}else{//把s是添加到t的等待列表的末尾(addr))s.ticket{ifs.parent.prev==s{root.rotateRight(s.parent)}else{ifs.parent.next!=s{panic("semaRootqueue")}根。rotateLeft(s.parent)}}}//dequeue会在addr地址的semaRoot中寻找第一个阻塞的goroutine//如果这个sudog需要profile,dequeue会返回它被唤醒的时间(现在),否则现在是0func(root*semaRoot)dequeue(addr*uint32)(found*sudog,nowint64){ps:=&root.treaps:=*psfor;s!=nil;s=*ps{ifs.elem==unsafe.指针(地址){gotoFound}ifuintptr(unsafe.Pointer(addr))