概述相信大家在开发过程中经常会用到Go中的并发工具通道。Channel是CSP并发模型中最重要的组件。两个独立的并发实体通过共享通信通道进行通信。大多数人只是使用这样的结构,很少有人讨论它的底层实现。这篇文章讲的是channel的底层实现。channelchannel的底层实现是一个结构体,源码如下:typehchanstruct{qcountuint//队列中的总数据dataqsizuint//循环队列的大小bufunsafe.Pointer//指向一个dataqsize的数组elementselemsizeuint16closeduint32elemtype*_type//元素类型sendxuint//发送索引recvxuint//接收索引recvqwaitq//recv等待者列表sendqwaitq//发送等待者列表//锁定保护hchan中的所有字段,因为以及sudogs中的几个//字段在此通道上被阻止。////在持有此锁的同时不要更改另一个G的状态//(特别是不要准备G),因为这可能会死锁//堆栈收缩。lockmutex}源码可能不是很容易看懂。这里我亲自画了一张图给大家看看。我在上面标记了不同的颜色并评论了它的功能。通道就像传送带或队列。它始终遵循先进先出的规则,保证发送和接收数据的顺序。channel是goroutines之间重要的通信方式,具有并发性和安全性。bufhchan结构体中的buf指向一个循环队列,用来实现循环队列,sendx是循环队列的尾指针,recvx是循环队列的队头指针,dataqsize是缓存通道的大小,qcount为记录通道数中的元素个数。在日常开发过程中,最常使用的方法是ch:=make(chanint,10)来创建通道。如果需要声明初始化,这个通道有一个buffer,也就是图中紫色的buf。buf是程序在make时创建的,它有元素大小*元素个数组成一个循环队列,可以看成是一个环形结构,buf是指向这个环形的指针。上图对应的代码是ch=make(chanint,6),buf指向环在堆上的地址。funcmakechan(t*chantype,sizeint)*hchan{elem:=t.elem//编译器会检查这个但要安全。ifelem.size>=1<<16{throw("makechan:invalidchannelelementtype")}ifhchanSize%maxAlign!=0||elem.align>maxAlign{throw("makechan:badalignment")}mem,overflow:=math.MulUintptr(elem.size,uintptr(size))如果溢出||内存>maxAlloc-hchanSize||size<0{panic(plainError("makechan:sizeoutofrange"))}//当buf中存储的元素不包含指针时,Hchan不包含GC感兴趣的指针。//buf指向相同的分配,elemtype是持久的。//SudoG是从其拥有的线程中引用的,因此无法收集它们。//TODO(dvyukov,rlh):重新思考收集器何时可以移动分配的对象。varc*hchanswitch{casemem==0://队列或元素大小为零。c=(*hchan)(mallocgc(hchanSize,零,true))//竞争检测器使用此位置进行同步。c.buf=c.raceaddr()caseelem.ptrdata==0://元素不包含指针。//在一次调用中分配hchan和buf。c=(*hchan)(mallocgc(hchanSize+mem,nil,true))c.buf=add(unsafe.Pointer(c),hchanSize)default://元素包含指针。c=new(hchan)c.buf=mallocgc(mem,elem,true)}c.elemsize=uint16(elem.size)c.elemtype=elemc.dataqsiz=uint(size)lockInit(&c.lock,lockRankHchan)ifdebugChan{print("makechan:chan=",c,";elemsize=",elem.size,";dataqsiz=",size,"\n")}returnc}上面是对应的代码实现,它会检查你这一系列参数是否合法,然后通过mallocgc在内存中开辟这块空间,然后返回sendx&recvx接下来我手动模拟一个环的实现代码://队列循环buffertypeCycleQueuestruct{data[]interface{}//一个存储元素的数组,准确的说是切片frontIndex,rearIndexint//frontIndex头指针,rearIndexTailpointersizeint//循环的大小}//NewQueueCircularQueuefuncNewQueue(sizeint)(*CycleQueue,error){ifsize<=0||size<10{returnnil,fmt.Errorf("初始化循环队列大小失败,%d不合法,size>=10",size)}cq:=new(CycleQueue)cq.data=make([]interface{},size)cq.size=sizereturncq,nil}//Pushadddatatoqueuefunc(q*CycleQueue)Push(valueinterface{})error{if(q.rearIndex+1)%cap(q.data)==q.frontIndex{returnerrors.New("循环队列已满")}q.data[q.rearIndex]=valueq.rearIndex=(q.rearIndex+1)%cap(q.data)returnnil}//pop返回队列afrontelementfunc(q*CycleQueue)Pop()interface{}{ifq.rearIndex==q.frontIndex{returnnil}v:=q.data[q.frontIndex]q.data[q.frontIndex]=nil//移除元素的位置并设置为空q.frontIndex=(q.frontIndex+1)%cap(q.data)returnv}循环队列一般采用freeunit的方式来解决font=rear在队空队满队时造成的歧义问题,但是这样会浪费一个unit。在golang的channel中,增加了qcount字段记录队列长度,解决歧义。一方面,它不会浪费存储单元。另一方面,在使用len函数查看队列长度时,可以直接返回qcount字段,一石二鸟。当我们需要读取数据时,直接从recvx指针上的元素中获取,从sendx位置写入元素,如图:sendq&recvq写数据时,如果buffer已满或者buffer为read没有更多数据时,会发生协程阻塞。如果写阻塞,当前协程会被加入到sendq队列中,直到一个recvq发起读操作,然后写队列才会被程序唤醒工作。当缓冲区满时,会将所有g-w加入sendq队列等待g-r操作,然后唤醒g-w继续工作。这种设计非常接近操作系统中线程的五种状态。可见go的设计者是在你可能参考了操作系统的线程设计。当然,以上只是对整个过程的简单描述。其实go也优化了其他的细节。当sendq不为空且没有buffer时,即没有buffer通道。这时候会从sendq的第一个协程开始取数据,有兴趣的gophers可以自行去查看源码。这篇文章也是作者最近看到这段源码的笔记总结。点击关注还没有关注的请点击关注!持续更新中...如果需要更多,可以关注我的同名微信公众号,分享一些Rust、Golang、SystemDesign相关的内容。
