本文转载自微信公众号《Golang来了》,作者Seekload。本文转载请联系Golang公众号。四哥水平有限,如有翻译或理解错误,请帮忙指出,谢谢!今天给大家分享一篇关于workPool的文章,大家应该经常用到,一起来看看吧。原文如下:workerpool就是这样一个池子,创建指定数量的worker,这些worker可以获取任务并进行处理。允许同时处理多个任务,但需要维护固定数量的worker,避免过度使用系统资源。创建任务池通常有两种方式:一种是预先创建固定数量的worker;另一种是在需要的时候创建worker,当然会有数量限制;本文将与您讨论第一种方式。这种方式通常用在我们事先知道有很多任务需要同时运行,并且大概率会使用最大worker数量的情况下。为了演示,我们首先创建Worker结构,它获取任务并执行它们。import("fmt")//Worker...typeWorkerstruct{IDintNamestringStopChanchanbool}//开始...func(w*Worker)Start(jobQueuechanJob){w.StopChan=make(chanbool)successChan:=make(chanbool)gofunc(){successChan<-truefor{//takejobjob:=<-jobQueueifjob!=nil{job.Start(w)}else{fmt.Printf("worker%stobestopped\n",w.Name)w.StopChan<-truebreak}}}()//waitfortheworkertostart<-successChan}//Stop...func(w*Worker)Stop(){//waitfortheworkertostop,blocking_=<-w.StopChanfmt.Printf("worker%sstopped\n",w.Name)}Worker有一些属性用来保存当前状态,同时也声明了两个分别启动和停止worker的方法。在Start()方法中,创建了两个通道,分别用于worker的启动和停止。最重要的是在for循环中,worker会等待job执行完毕,直到任务队列关闭。Job是一个包含单一方法Start()的接口,因此只要实现Start()方法就可以拥有不同类型的作业。//Job...typeJobinterface{Start(worker*Worker)error}一旦确定了Worker,下一步就是创建一个pool来管理worker。import("fmt""sync")//Pool...typePoolstruct{NamestringSizeintWorkers[]*WorkerQueueSizeintQueuechanJob}//Initiualize...func(p*Pool)Initialize(){//maintainminimum1workerifp.Size<1{p.Size=1}p.Workers=[]*Worker{}fori:=1;i<=p.Size;i++{worker:=&Worker{ID:i-1,Name:fmt.Sprintf("%s-worker-%d",p.Name,i-1),}p.Workers=append(p.Workers,worker)}//maintainminqueuesizeas1ifp.QueueSize<1{p.QueueSize=1}p.Queue=make(chanJob,p.QueueSize)}//开始...func(p*Pool)Start(){for_,worker:=rangep.Workers{worker.Start(p.Queue)}fmt.Println("allworkersstarted")}//停止...func(p*Pool)Stop(){close(p.Queue)//关闭队列通道varwgsync.WaitGroupfor_,worker:=rangep.Workers{wg.Add(1)gofunc(w*Worker){deferwg.Done()w.Stop()}(worker)}wg.Wait()fmt.Println("allworkersstopped")}Pool包含worker切片和一个用于保存作业的队列。worker的数量可以在初始化时自定义。关键是Stop()的逻辑。调用时会先关闭作业队列,worker会从作业队列中读取nil,然后关闭对应的worker。然后在for循环中,等待worker并发停止,直到最后一个worker停止。为了演示整体逻辑,下面的示例显示了一个仅输出值的作业。import"fmt"funcmain(){pool:=&Pool{Name:"test",Size:5,QueueSize:20,}pool.Initialize()pool.Start()deferpool.Stop()fori:=1;i<=100;i++{job:=&PrintJob{Index:i,}pool.Queue<-job}}//PrintJob...typePrintJobstruct{Indexint}func(pj*PrintJob)Start(worker*Worker)error{fmt.Printf("job%s-%d\n",worker.Name,pj.Index)returnnil}如果你看了上面的代码逻辑,你会发现很简单,创建一个有5个worker的workpool和jobqueue。size为20,接下来模拟job的创建和处理过程:job一旦创建,就会被push到任务队列中,等待的worker会从队列中取出job并进行处理。类似下面这样的输出:allworkersstartedjobtest-worker-3-4jobtest-worker-3-6jobtest-worker-3-7jobtest-worker-3-8jobtest-worker-3-9jobtest-worker-3-10jobtest-worker-3-11jobtest-worker-3-12jobtest-worker-3-13jobtest-worker-3-14jobtest-worker-3-15jobtest-worker-3-16jobtest-worker-3-17jobtest-worker-3-18jobtest-worker-3-19jobtest-worker-3-20workertest-worker-3tobestoppedjobtest-worker-4-5jobtest-worker-0-1workertest-worker-3stoppedjobtest-worker-2-3workertest-worker-2tobestoppedworkertest-worker-2stoppedworkertest-worker-4tobestoppedworkertest-worker-4stoppedworkertest-worker-0tobestoppedworkertest-worker-0stoppedjobtest-worker-1-2workertest-worker-1tobestoppedworkertest-worker-1stoppedallworkersstoppedvia:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang作者:sonic0002
