当前位置: 首页 > 科技观察

Go语言与WorkerPool中的并发

时间:2023-03-16 00:46:28 科技观察

本文转载自微信公众号《Golang来了》,作者Seekload。本文转载请联系Golang公众号。四哥水平有限,如有翻译或理解错误,请大家帮忙指出,谢谢!昨天分享了一篇关于workerPool的文章,后台有同学说昨天的demo刚好贴合项目的业务场景。真的很棒!所以我今天再来分享一篇文章。原文如下:在现代编程语言中,并发已经成为一个必不可少的特性。今天的大多数编程语言都有一些实现并发的方法。其中一些实现非常强大,可以将负载转移到不同的系统线程,比如Java等;有些在同一个线程上模拟这种行为,例如Ruby等。Golang有一个非常强大的并发模型,称为CSP(CommunicationSequentialProcesses),它将一个问题分解成更小的顺序进程,然后调度这些进程的实例(称为Goroutines).这些进程通过通道传递信息来进行通信。在本文中,我们将探讨如何利用golang的并发性以及如何在workerPool中使用它。在系列的第二篇文章中,我们将探索如何构建一个健壮的并发解决方案。一个简单的例子假设我们需要调用一个外部API接口,整个过程耗时100ms。如果我们需要同步调用这个接口1000次,需要100s。////model/data.gopackagemodeltypeSimpleDatastruct{IDint}////basic/basic.gopackagebasicimport("fmt""github.com/Joker666/goworkerpool/model""time")funcWork(allData[]model.SimpleData){start:=time.Now()fori,_:=rangeallData{Process(allData[i])}elapsed:=time.Since(start)fmt.Printf("Took===============>%s\n",elapsed)}funcProcess(datamodel.SimpleData){fmt.Printf("Startprocessing%d\n",data.ID)time.Sleep(100*time.Millisecond)fmt.Printf("Finishprocessing%d\n",data.ID)}////main.gopackagemainimport("fmt""github.com/Joker666/goworkerpool/basic""github.com/Joker666/goworkerpool/model""github.com/Joker666/goworkerpool/worker")funcmain(){//PreparethedatavarallData[]model.SimpleDatafori:=0;i<1000;i++{data:=model.SimpleData{ID:i}allData=append(allData,data)}fmt.Printf("Startprocessingallwork\n")//Processbasic.Work(allData)}StartprocessingallworkTook===============>1m40.226679665s上面的代码创建了一个模型包,里面包含一个结构体,这个结构体只有一个int类型的成员。我们同步处理数据,这显然不是最好的选择,因为这些任务是可以并发处理的。我们换个方案,用goroutine和channel来处理。异步////worker/notPooled.gofuncNotPooledWork(allData[]model.SimpleData){start:=time.Now()varwgsync.WaitGroupdataCh:=make(chanmodel.SimpleData,100)wg.Add(1)gofunc(){deferwg.Done()fordata:=rangedataCh{wg.Add(1)gofunc(datamodel.SimpleData){deferwg.Done()basic.Process(data)}(data)}}()fori,_:=rangeallData{dataCh<-allData[i]}close(dataCh)wg.Wait()elapsed:=time.Since(start)fmt.Printf("Took================>%s\n",elapsed)}////main.go//Processworker.NotPooledWork(allData)StartprocessingallworkTook===============>101.191534ms上面的代码,我们创建了一个容量100缓存通道,通过NoPooledWork()将数据推入通道。通道长度达到100后,在读取某些元素之前,我们无法向其中添加元素。用于范围读取通道,并生成goroutine处理。这里我们不限制派生的goroutines的数量,它可以处理尽可能多的任务。理论上,在给定所需资源的情况下,可以处理尽可能多的数据。执行代码,完成1000个任务只用了100ms。这很疯狂!不是全部,请继续阅读。问题除非我们拥有地球上的所有资源,否则在任何给定时间可以分配的资源数量都是有限的。一个goroutine占用的内存最小为2k,但也可以达到1G。上述并发执行所有任务的解决方案,假设有一百万个任务,将很快耗尽机器的内存和CPU。我们要么升级机器的配置,要么寻找其他更好的解决方案。计算机科学家很早以前就思考过这个问题,并想出了一个极好的解决方案——使用ThreadPool或WorkerPool。这个解决方案是使用一个workerpool,worker数量有限,来处理任务。Workers会按顺序一个接一个地处理任务,从而避免了CPU和内存使用率的快速增加。解决方案:工作池我们通过实施工作池解决了之前遇到的问题。////worker/pooled.gofuncPooledWork(allData[]model.SimpleData){start:=time.Now()varwgsync.WaitGroupworkerPoolSize:=100dataCh:=make(chanmodel.SimpleData,workerPoolSize)fori:=0;i%s\n",elapsed)}///main.go//Processworker.PooledWork(allData)StartprocessingallworkTook===============>1.002972449s以上代码,worker数量限制为100个,我们创建了对应的数量处理任务的goroutines。我们可以将通道视为队列,将工作协程视为消费者。多个goroutines可以监听同一个channel,但是channel中的每个元素只会被处理一次。Go语言的channel可以当做队列使用。这是一个更好的解决方案,执行代码,我们看到完成所有任务需要1s。虽然没有100ms那么快,但是已经可以满足业务需求了,我们已经有了更好的方案,可以将负载均匀的分散到不同的时间片上。处理错误不是我们所能做的。上面看起来像是一个完整的解决方案,但事实并非如此,我们没有处理错误情况。所以我们需要模拟错误的情况,看看我们需要如何处理。////worker/pooledError.gofuncPooledWorkError(allData[]model.SimpleData){start:=time.Now()varwgsync.WaitGroupworkerPoolSize:=100dataCh:=make(chanmodel.SimpleData,workerPoolSize)错误:=make(chanerror,1000)fori:=0;i%s\n",elapsed)}funcprocess(datamodel.SimpleData,errorschan<-error){fmt.Printf("开始处理%d\n",data.ID)time.Sleep(100*time.Millisecond)ifdata.ID%29==0{errors<-fmt.Errorf("erroronjob%v",data.ID)}else{fmt.Printf("完成处理%d\n",data.ID)}}////main.go//Processworker.PooledWorkError(allData)我们修改了process()函数来处理一些随机错误并将错误推送到错误通道。所以,为了处理并发错误,我们可以使用错误通道来保存错误数据。处理完所有任务后,可以检查错误通道中的数据。错误通道中的元素存储任务ID,以便在需要时可以处理这些任务。这显然是比之前不处理错误更好的解决方案。但我们可以做得更好。在下一篇文章中,我们将讨论如何编写强大的工作线程池包,并在工作线程数量有限的情况下处理并发任务。综上所述,Go语言的并发模型足够强大,只需要搭建一个workerpool就可以解决问题,不需要做太多的工作,这也是它没有被纳入标准库的原因。但是,我们可以构建一个解决方案来满足我们自己的需求。很快,我会在下一篇文章中讲到,敬请期待!点击【阅读原文】可直接进入代码仓库【1】。参考文献[1]代码仓库:https://github.com/Joker666/goworkerpool?ref=hackernoon.comvia:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-1-e9n31ao作者:哈桑