当前位置: 首页 > 科技观察

浅谈Go流水线编程模式

时间:2023-03-17 01:34:51 科技观察

文末,本文转载自微信公众号《Golang技术分享》,作者:机器贝尔斩波。转载本文请联系Golang技术分享公众号。流水线工作模式在工业领域非常普遍。它将工作过程划分为多个环节,并根据工作强度为每个环节安排合适的人员数量。好的流水线设计力求平衡各环节的循环速度,使生产效率最大化。Go是一门实用的语言,管道工作模型与Go结合得很好,但我们通常用另一个词来表示管道:pipeline。pipeline管道是由多个环节组成的。具体来说,在Go中,链接通过通道进行通信,同一个链接任务可以同时被多个goroutine处理。pipeline流水线的核心是数据,通过channel保证数据的流动,各个环节的数据处理都由goroutine完成。除了起始阶段和结束阶段,每个阶段都有任意数量的输入通道和输出通道。开始阶段称为发送者或生产者,结束阶段称为接收者或消费者。我们来看一个简单的流水线例子,分为三个环节。第一个环节,generate函数:它充当生产者,将数据写入通道,并返回通道。写入所有数据后,关闭通道。funcgenerate(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}第二个环节,square函数:是数据处理的作用。它从起始链接的通道中获取数据,计算平方,将结果写入新通道,并返回新通道。计算完所有数据后,关闭新通道。funcsquare(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}main函数负责整理整个管道,并作为消费者:读取第二个链路的通道数据并打印出来。funcmain(){//Setupthepipeline.c:=generate(2,3)out:=square(c)//Consumetheoutput.forn:=rangeout{fmt.Println(n)}}上面的扇出,扇入In该示例中,数据通过非缓冲通道在链接之间传输,节点中的数据由单个goroutine处理和消费。这种工作模式效率不高,整个流水线的效率取决于最慢的环节。因为每个环节的任务量是不一样的,这就意味着我们需要的机器资源是不一样的。对于任务量小的环节,尽量占用少量的机器资源,对于任务量大的环节,需要更多的线程进行并行处理。以汽车装配为例,我们可以把装配轮胎的工作分配给4个人一起工作,当轮胎装配好后,再交给剩下的工序。多个goroutine可以从同一个通道读取数据,直到通道关闭,这称为扇出。这个名字比较形象,它把数据分散了,所以叫fan-out。扇出是一种分配任务的模式。扇出一个goroutine可以从多个输入通道读取,直到所有输入都关闭。具体做法是将输入通道多路复用到同一个通道上,当所有输入通道都关闭时,该通道也关闭,称为扇入(fan-in)。它聚合数据,因此称为扇入。扇入是一种整合任务结果的模式。fan-in以汽车组装为例,将轮胎任务分配给大家就是Fan-out,合并轮胎组装结果就是Fan-in。信道的复用扇出编码模型比较简单。本文不多研究,提供一个fan-in编程例子。创建一个生成器函数generate,通过interval参数控制消息生成的频率。生成器返回消息通道mc和停止通道sc,用于停止生成器任务。funcgenerate(messagestring,intervaltime.Duration)(chanstring,chanstruct{}){mc:=make(chanstring)sc:=make(chanstruct{})gofunc(){deferfunc(){close(sc)}()for{select{case<-sc:returndefault:time.Sleep(interval)mc<-message}}}()returnmc,sc}stopGenerating函数通过向sc传递一个空结构体来通知generate退出,并调用close(mc)关闭MessagechannelfuncstopGenerating(mcchanstring,scchanstruct{}){sc<-struct{}{}close(mc)}多路复用函数multiplex创建并返回集成消息通道并控制并发的wg。funcmultiplex(mcs...chanstring)(chanstring,*sync.WaitGroup){mmc:=make(chanstring)wg:=&sync.WaitGroup{}for_,mc:=rangemcs{wg.Add(1)gofunc(mcchanstring,wg*sync.WaitGroup){deferwg.Done()form:=rangemc{mmc<-m}}(mc,wg)}returnmmc,wg}在main函数中,创建两个消息通道,复用生成mmc,打印Every来自mmc的消息。此外,我们还实现了接收系统中断信号的优雅关闭机制(在终端执行CTRL+C发送中断信号)。funcmain(){//createtwosamplemessageandstopchannelsmc1,sc1:=generate("messagefromgenerator1",200*time.Millisecond)mc2,sc2:=generate("messagefromgenerator2",300*time.Millisecond)//multiplexmessagechannelsmmc,wg1:=multiplex(mc,mc2)//createerrschannelforgracefulshutdownerrs:=make(chanerror)//waitforinterruptorterminatesignalgofunc(){sc:=make(chanos.Signal,1)signal.Notify(sc,syscall.SIGINT,syscall.SIGTERM)errs<-fmt.Errorf("%ssignalreceived",<-sc)}()//waitformmultiplexedmessageswg2:=&sync.WaitGroup{}wg2.Add(1)gofunc(){deferwg2.Done()form:=rangemmc{fmt.Println(m)}}()//waitforerrorsiferr:=<-errs;err!=nil{fmt.Println(err.Error())}//stopgeneratorsstopGenerating(mc1,sc1)stopGenerating(mc2,sc2)wg1.Wait()//closemultiplexedmessageschannelclose(mmc)wg2.Wait()}总结本文简单介绍了流水线编程模型,它与我们熟悉的生产者-消费者模型非常相似。具体在Go编程实践中,pipeline将数据流分成多个环节,channel用于数据流转,goroutine用于处理数据。扇出用于分发任务,扇入用于数据集成。FAN模式可以让流水线更高并发。当然,还有一些细节需要注意,比如停止通知机制,可以参考本文通道复用章节例子中的stopGenerating函数;如何通过sync.WaitGroup做好并发控制,这些都需要读者在实际编码中去体会才能掌握。参考Go并发模式:管道和取消:https://go.dev/blog/pipelines在Go中多路复用通道:https://medium.com/@ermanimer/multiplexing-channels-in-go-a7dccdcc4134