当前位置: 首页 > 科技观察

教你Golang的协程池设计

时间:2023-03-22 13:17:56 科技观察

本文转载自微信公众号《程序员小凡》,作者范米莉。转载本文请联系程序员小凡公众号。前言许多公司都在陆续构建golang语言栈。你有没有想过为什么会这样?一是因为go更适合做中间件,二是go有更好的并发支持,也就是我们通常所说的高并发,并发支持离不开协程。当然协程也不是乱用的。他们需要管理。管理协程的方式就是协程池,所以协程池并没有那么神秘。今天我们就来一步步揭开协程池的面纱。没接触过go的协程也没关系,我尽量写的详细点。Goroutine(协程)先看一个简单的例子funcgo_worker(namestring){fori:=0;i<5;i++{fmt.Println("Mynameis",name)time.Sleep(1*time.Second)}fmt.Println(name,"Completed")}funcmain(){go_worker("123")go_worker("456")fori:=0;i<5;i++{fmt.Println("我是主")time.Sleep(1*time.Second)}}当我们执行这段代码的时候,当然是执行go_worker("123")->go_worker("456")->我是主要执行输出如下我的名字是123我的名字是123我的名字是123我的名字是123我的名字是123123我的名字是456我的名字是456我的名字是456我的名字是456我的名字是456456我是主要的。我是主要的。我是主要的。我是主要的。我是主要的。这种执行是并行的,也就是说,一个任务执行完之后,才会开始下一个任务。如果某个任务比较慢的话,整个程序的效率可想而知,但是在go语言中,是支持协程的,所以我们可以修改上面的代码funcgo_worker(namestring){fori:=0;i<5;i++{英尺。Println("Mynameis",name)time.Sleep(1*time.Second)}fmt.Println(name,"执行完成")}funcmain(){gogo_worker("123")//协程gogo_worker("456")//coroutinefori:=0;i<5;i++{fmt.Println("Iammain")time.Sleep(1*time.Second)}}我们在不同的go_workers前面加了一个go,所以异步序列化所有任务,输出如下我的名字是456我的名字是123我的名字是123我是主要的我的名字是456我是主要的我的名字是456我的名字是123我是主要的我的名字是456我的名字是123我的名字是456我的名字是123我是main可以看到,这样一来,各个任务各干各的,互不影响,效率也大大提高了。这是goroutinechannel(pipeline)有了coordination之后,会带来一个新的问题,协程之间如何通信?于是引入了管道的概念。流水线其实很简单。无非是放入数据,取出数据。funcworker(cchanint){num:=<-c//读取管道中的数据,并输出fmt.Println("receivedparameterc:",num)}funcmain(){//创建channel,需要执行pipeline数据类型,这里我们是intc:=make(chanint)//开一个协程执行worker函数goworker(c)c<-2//write2fmt.Println("main")}到pipeline中,我们可以看到上面的例子,在main函数中,我们定义了一个pipeline,它是int类型的,往里面写了一个2,然后在worker中读取pipelinec,就可以得到由2个协程。golang中既然开启了协程,那么方便,那有没有坑呢?我们可以看看上图。在实际业务中,不同的业务开启不同的goroutine来执行,但是在cpu的微观层面,是串行执行一条指令的,只是执??行速度很快而已。如果指令过多,cpu的切换会增加,在切换过程中会消耗性能。所以协程池的主要作用就是管理goroutines,限制goroutines的数量。协程池的实现首先请求不同的任务,直接写入entryChannel,entryChannel与jobsChannel建立通信,然后我们固定开启三个协程(不一定三个,就用三个例子),固定从jobsChannel中读取数据给任务加工其实从本质上来说,通道就是一座桥梁,起到了中转的作用。我们之所以设计一个jobsChannel和entryChannel是为了解耦。entryChannel完全可以作为入口使用,jobsChannel可以做任务优先级或者加锁等更深入的任务,解锁等处理代码实现原理都清楚了,接下来我们详细看代码实现。首先,让我们处理任务任务。任务无非就是业务中的各种任务。他们需要能够得到加强和执行。代码如下//定义任务类型,每个任务Task可以抽象成一个函数类型Taskstruct{ffunc()error//一个任务必须包含具体的业务}//创建一个TaskfuncNewTask(arg_ffunc()error)*Task{t:=throughNewTaskTask{f:arg_f,}return&t}//Task还需要一个方法来执行业务func(t*Task)Execute(){t.f()//调用task中已经绑定的业务方法}next,let'sdefineCoroutinepool//定义池类型typePoolstruct{EntryChannelchan*TaskWorkerNumintJobsChanelchan*Task}//创建协程池funcNewPool(capint)*Pool{p:=Pool{EntryChannel:make(chan*Task),JobsChanel:make(chan*Task),WorkerNum:cap,}return&p}协程池需要创建一个worker,然后不断从JobsChannel的内部任务队列中取任务开始工作//协程池创建一个worker并开始工作func(p*Pool)worker(workerIdint){//worker不断从JobsChannel的内部任务队列中取taskfortask:=rangep.JobsChanel{task.Execute()fmt.Println("workerId",workerId,"执行任务成功")}}EntryChannel获取Task任务func(p*Pool)ReceiveTask(t*Task){p.EntryChannel<-t}//让协程池开始工作func(p*Pool)Run(){//1:首先根据协程池的worker数量开固定数量的workerfori:=0;i