ofConcurrentProgrammingPackage转载请联系Golang公众号。四哥的水平有限。如有翻译或理解上的错误,请大家帮忙指出,谢谢!这是该系列的第二篇文章。点击此处查看第一篇文章。原文如下:基于goroutine和channel的并发特性,Go成为一门强大的并发语言。在上一篇文章中,我们讨论了如何构建workerPool来提高程序的并发性能,也就是避免耗尽系统资源。但这只是我们应该如何做的一个简单例子。基于我们从上一篇文章中学到的知识,在本文中,我们将构建一个可以在任何其他应用程序中使用的强大解决方案。网络上还有其他架构复杂的解决方案,比如使用调度器等。其实我们不需要这些复杂的设计,只需要使用共享通道就可以解决问题。让我们看看如何构建它?代码结构我们创建了一个通用的workerPool包,根据业务需要的并发度使用worker来处理任务。我们看一下目录结构:workerpool├──pool.go├──task.go└──worker.goworkerpool目录在项目的根目录下。任务是需要处理的单个工作单元;Worker是一个简单的worker函数,用于执行任务;而Pool用于创建和管理worker。先看Task代码://workerpool/task.gopackageworkerpoolimport("fmt")typeTaskstruct{ErrerrorDatainterface{}ffunc(interface{})error}funcNewTask(ffunc(interface{})error,datainterface{})*Task{return&Task{f:f,Data:data}}funcprocess(workerIDint,task*Task){fmt.Printf("Worker%dprocessestak%v\n",workerID,task.Data)task.Err=task.f(task.Data)}Task是一个简单的结构,它包含处理任务所需的所有数据。创建任务时,传入Data和要执行的函数f,process()函数会处理任务。在处理任务时,将Data作为参数传递给函数f,并将执行结果保存在Task.Err中。看看Worker是如何处理任务的://workerpool/worker.gopackageworkerpoolimport("fmt""sync")//WorkerhandlesalltheworktypeWorkerstruct{IDinttaskChanchan*Task}//NewWorkerreturnsnewinstanceofworkerfuncNewWorker(channelchan*Task,IDint)*Worker{return&Worker{ID:ID,taskChan:channel,}}//启动startstheworkerfunc(wr*Worker)Start(wg*sync.WaitGroup){fmt.Printf("Startingworker%d\n",wr.ID)wg.Add(1)gofunc(){deferwg.Done()fortask:=rangewr.taskChan{process(wr.ID,task)}}()}我们创建了一个小型Worker结构,其中包含workerID和一个用于保存待处理任务的通道。在Start()方法中,使用forrange从taskChan中读取任务并进行处理。可以想象多个工作人员可以并发执行任务。workerPool我们通过实现Task和Worker来处理任务,但是好像少了点什么,谁负责生成这些worker并向他们发送任务?答案是:WorkerPool。//workerpoo/pool.gopackageworkerpoolimport("fmt""sync""time")//PoolistheworkerpooltypePoolstruct{Tasks[]*Taskconcurrencyintcollectorchan*Taskwgsync.WaitGroup}//NewPoolinitializesanewpoolwiththegiventasksand//atthegivenconcurrency.func*concurrencyNewPool*(taskPool{return&Pool{Tasks:tasks,concurrency:concurrency,collector:make(chan*Task,1000),}}//Runrunsallworkwithinthepoolandblocksuntilit's//finished.func(p*Pool)Run(){fori:=1;i<=p.concurrency;i++{worker:=NewWorker(p.collector,i)worker.Start(&p.wg)}fori:=rangep.Tasks{p.collector<-p.Tasks[i]}close(p.collector)p.wg。Wait()}在上面的代码中,pool保存了所有挂起的任务,并生成了等于并发数的goroutines来并发处理任务。缓存通道——collector是worker之间共享的。因此,当我们运行这个workerpool时,我们可以根据需要生成任意多个worker,worker之间共享collectorchannel,然后使用forrange读取任务,将读取到的任务写入collector中。esync.WaitGroup实现协程之间的同步。现在我们有了一个不错的解决方案,让我们来测试一下。//main.gopackagemainimport("fmt""time""github.com/Joker666/goworkerpool/workerpool")funcmain(){varallTask??[]*workerpool.Taskfori:=1;i<=100;i++{task:=workerpool.NewTask(func(datainterface{})error{taskID:=data.(int)time.Sleep(100*time.Millisecond)fmt.Printf("Task%dprocessed\n",taskID)returnnil},i)allTask??=append(allTask??,task)}pool:=workerpool.NewPool(allTask??,5)pool.Run()}上面的代码创建了100个任务并以5个并发处理它们。输出如下:Worker3processestask98Task92processedWorker2processestask99Task98processedWorker5processestask100Task99processedTask100processedTook================>2.0056295s处理100个任务耗时2秒,我们如何将并发数增加到10,我们将看到它只大约需要1s。我们通过实现具有并发、错误处理、数据处理等功能的workerPool构建了一个健壮的解决方案。这是一个不耦合特定实现的通用包。我们可以用它来解决一些大问题。进一步扩展:后台处理任务其实我们可以进一步扩展上面的方案,让worker等待我们发布新的任务,在后台进行处理。为此,需要修改代码。Task结构保持不变,但Worker需要稍做改动。见如下代码://workerpool/worker.go//WorkerhandlesalltheworktypeWorkerstruct{IDinttaskChanchan*Taskquitchanbool}//NewWorker返回newinstanceofworkerfuncNewWorker(channelchan*Task,IDint)*Worker{return&Worker{ID:ID,taskChan:channel,quit:make(chanbool),}}....//StartBackgroundstartstheworkerinbackgroundwaitingfunc(wr*Worker)StartBackground(){fmt.Printf("Startingworker%d\n",wr.ID)for{select{casetask:=<-wr.taskChan:process(wr.ID,task)case<-wr.quit:return}}}//停止退出workerfunc(wr*Worker)Stop(){fmt.Printf("Closingworker%d\n",wr.ID)gofunc(){wr.quit<-true}()}Worker结构中增加了一个新的退出通道,并增加了两个新的方法。StartBackgorund()在for循环中使用select-case从taskChan队列中读取并处理任务,如果从quit读取到结束信号则立即返回。Stop()方法负责写入结束信号以退出。添加这两个新方法后,我们来修改Pool://workerpool/pool.gotypePoolstruct{Tasks[]*TaskWorkers[]*Workerconcurrencyintcollectorchan*TaskrunBackgroundchanboolwgsync.WaitGroup}//AddTaskaddsatasktothepoolfunc(p*Pool)AddTask(task*Task){p.collector<-task}//RunBackgroundrunsthepoolinbackgroundfunc(p*Pool)RunBackground(){gofunc(){for{fmt.Print("?Waitingfortaskstocomein...\n")time.Sleep(10*time.Second)}}()fori:=1;i<=p.concurrency;i++{worker:=NewWorker(p.collector,i)p.Workers=append(p.Workers,worker)goworker.StartBackground()}fori:=rangep.Tasks{p.collector<-p.Tasks[i]}p.runBackground=make(chanbool)<-p.runBackground}//Stopstopsbackgroundworkersfunc(p*Pool)Stop(){fori:=rangep.Workers{p.Workers[i].Stop()}p.runBackground<-true}Pool结构增加两个成员:Workers和runBackground,Workers保存所有worker,runBackground用于维护pool存活。新增三个方法,AddTask()方法用于向收集器添加任务;RunBackground()方法生成一个无限运行的goroutine,以便池保持活动状态,因为runBackground通道是空的,读取空通道会阻塞,因此池可以保持运行状态。接下来,启动协程中的worker;Stop()方法用于停止worker,向runBackground发送停止信号结束RunBackground()方法。让我们看看它是如何工作的。在真实的业务场景中,池会和HTTP服务器一起运行,消费任务。我们用一个for无限循环来模拟这个场景,如果满足某个条件,池就会停止。//main.go...pool:=workerpool.NewPool(allTask??,5)gofunc(){for{taskID:=rand.Intn(100)+20iftaskID%7==0{pool.Stop()}时间。睡眠(time.Duration(rand.Intn(5))*time.Second)task:=workerpool.NewTask(func(datainterface{})error{taskID:=data.(int)time.Sleep(100*time.Millisecond)fmt.Printf("Task%dprocessed\n",taskID)returnnil},taskID)pool.AddTask(task)}}()pool.RunBackground()上面代码执行的时候,我们会看到有随机的任务被传递给后台运行的worker,其中一个worker会读取任务并完成处理。当满足某个条件时,程序停止并退出。在上一篇文章的基础上总结了初步的解决方案,本文讨论通过workPool构建一个健壮的解决方案。同时,我们进一步扩展了解决方案,实现了后台运行的pool和处理交付任务。点击【阅读原文】可直接进入代码仓库【1】。参考文献[1]代码仓库:https://github.com/Joker666/goworkerpoolvia:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7作者:Hasan
