(通信顺序过程)是指独立并发实体之间单独的通信管道(例如频道)之间的通信模型,并且不同语言中有不同的并发模型。
Java和C ++的并发模型通过实现实现。非常典型的方法是在访问共享数据(例如数组,地图等)时访问共享内存,因此得出了许多。
Golang借用了CSP模型的一些概念作为并发模型的理论支持。
不要通过共享记忆进行通信,而要通过通信实现内存共享。
在Golang,作为一个独立的并发实体,作为不同实体之间数据通信的管道。
本文主要在Golang中介绍
通道主要分为两种类型的通道:= make(通道,1)8两个通道之间的最大区别是有一个缓冲区通道是一个模型,而否延迟通道是模型。
当制作指定LEN为0时,它也是一个缓冲通道
这意味着通道只能读取或只能写入通道,通常使用频道
func main(){// ...开始背景goroutine ...
循环:for {select {case size,ok:= = <-fileSizes: if !ok { break loop // fileSizes was closed } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // final totals }
type hchan struct { qcount ? uint ? ? ? ? ? // channel中元素个数 dataqsiz uint ? ? ? ? ? // channel循环队列的长度 ,make channel中的len属性 ,即缓冲区大小 buf ? ? ?unsafe.Pointer // channel缓冲区数据指针; elemsize uint16 ? ? ? ? // ?channel元素的大小,是 elem元数据类型的大小 closed ? uint32 elemtype *_type // ?sendx ? ?uint ? // channel的send操作处理到的位置; recvx ? ?uint ? // channel的recv操作处理到的位 recvq ? ?waitq ?// recv 等待队列(即 <- channel ) sendq ? ?waitq ?// send 等待队列(即 channel <- )
lock mutex }
type waitq struct { first sudog last ?sudog }
type sudog struct { g *g // 指向goroutine结构题
next sudog // 前sudog prev sudog // 后sudog
}
func makechan(t chantype, size int) hchan { elem := t.elem
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") }
// 内存对齐相关 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 计算需要分配的内存大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 检查内存大小是否超过系统限制 && 是否堆内存溢出 ?if overflow || mem > Maxalloc-Hchansize ||尺寸 < 0 { panic(plainError("makechan: size out of range")) }
// 当buf不包含指针类型时,那么会为channel和底层数组分配一段连续的内存空间 // sodog会从其拥有的线程中引用该对象,因此该对象无法被gc收集(不会被gc回收)
var c hchan switch { case mem == 0: // 无缓冲区channel c = (hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // buf中元素不包含指针 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // buf中元素包含指针,为hchan和buf分配内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock()
// 1. 如果recvq队列中有等待者,直接进入send方法 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
if c.qcount < c.dataqsiz { // 计算下一个可以存储数据的位置 qp := chanbuf(c, c.sendx) // 参数 ep 放入上一步计算的 qp 对应的位置上 typedmemmove(c.elemtype, qp, ep) // 更新send index && qcount ?c.sendx++ // 环形队列 if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
// 3. 阻塞channel, 直到新的接收者从channel中读数据 // 获得当前运行的goroutine指针 gp := getg() // 分配sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 }
// dosomething
// 当前sudog入发送队列 c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1) // gopark ,goroutine变为 gwaiting 状态 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 为确保往channel里发送的数据不被gc回收,sodog一直引用该对象 KeepAlive(ep)
// dosomething
// 释放sudog releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
func send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. 当我们从空channel读数据,会调用gopark让出当前处理器占用 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } lock(&c.lock)
// 2. 当前channel已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
// 3. 如果发送队列中有goroutine被阻塞, if sg := c.sendq.dequeue(); sg != nil { // 调用recv方法 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
// 4. 如果channel缓冲区中有数据,直接从缓冲区中读数据 if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 更新 recv索引 和 环形队列长度 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false } // 5. 没有阻塞的发送者 && channel缓冲区为空 ,阻塞当前goroutine gp := getg() mysg := acquireSudog()
// dosomething
// 将当前sudog压入channel的接收队列 c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// gopark 让出处理器使用权 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0,2)}成功:= mysg.success gp.param = nil mysg.c = nil releseSudog(mysg)
// func recv(c hchan,sg sudog,ep unsafe.pointer,unlockf func(),skip int){如果c.dataqsiz == 0 {// chinder nif ep!= nil {//发送Chanel发送chaannelthe在队列中存储在goroutine中的数据被复制到recvdirect(c.Elemtype,sg,ep)}}} else {// e {//带有缓冲室chaannel //;将发送到队列头部的数据复制到缓冲区,释放一个阻止发件人qp:= chanbuf(c,c.recvx)如果EP!= nil {//将队列中的数据复制到接收器的内存地址typedMemmove(c.Elemtype,ep,Qp)} //将发送队列以将头部数据复制到缓冲区中,并将一个封锁复制发件人typedmemmove(c.Elemtype,qp,sg.elem)c.rcvx ++如果c.rcvx == c.rcvx = 0} c.sendx = c.recvx} sg.elem = nil gp:= nil gp:= sg.g unlockf()gp.param = unsafe.pointer(sg)sg.success = true如果sg.releasetime!= 0 {sg.Releasetime = cputicks()} /) /当前处理器的runnext设置为发送数据的goroutine。调度程序计划时,阻止发件人唤醒了Goready(GP,Skip+1)}
数据,确定:= <-CH //阻止
func forrange(){ch:= make(channel,1)go read(ch)go Write(ch)
time.sleep(time.second)log.println(“宿舍1s”)
去更多(ch)时间。
func Write(ch channel){for i:= 0;我<1;i ++ {ch <-i log.printf(“ send:[%d]”,i)break} func read(ch channel){for {data,ok ok ok ok = <-Ch // blocking nog ok oking {log {log {log。printf(“ recv:[%d]”,data} else {log.println(“ channel close”)break}}}}}}}}}}}}}}}}}}}}}}
func readv2(ch channel){{select {case data,ok ok:= <-CH:如果ok {log.printf(“ recv:[%v]”,data} else {log.printf(“ ok-false”(“ ok-false”))}默认值:log.println(“ int-default”)}}}}}}}}}}
func写入(ch chan int,times int){for i:= 0;我<时代;i ++ {ch <-i log.printf(“ send:[%d]”,i)break} func读取(ch channelint){for {data,ok ok ok = <-Ch // blocking如果ok ok {log {log {log {log.printf(“ recv:[%d]”,data)} else {log.println(“ channel close”)break}}}}}}}}}}}}}
原始:https://juejin.cn/post/7097617352740569102