本文转载自微信公众号《Golang来了》,作者Seekload。本文转载请联系Golang公众号。四哥水平有限,如有翻译或理解错误,请大家帮忙指出,谢谢!昨天分享了一篇关于workerPool的文章,后台有同学说昨天的demo刚好贴合项目的业务场景。真的很棒!所以我今天再来分享一篇文章。原文如下:在现代编程语言中,并发已经成为一个必不可少的特性。今天的大多数编程语言都有一些实现并发的方法。其中一些实现非常强大,可以将负载转移到不同的系统线程,比如Java等;有些在同一个线程上模拟这种行为,例如Ruby等。Golang有一个非常强大的并发模型,称为CSP(CommunicationSequentialProcesses),它将一个问题分解成更小的顺序进程,然后调度这些进程的实例(称为Goroutines).这些进程通过通道传递信息来进行通信。在本文中,我们将探讨如何利用golang的并发性以及如何在workerPool中使用它。在系列的第二篇文章中,我们将探索如何构建一个健壮的并发解决方案。一个简单的例子假设我们需要调用一个外部API接口,整个过程耗时100ms。如果我们需要同步调用这个接口1000次,需要100s。////model/data.gopackagemodeltypeSimpleDatastruct{IDint}////basic/basic.gopackagebasicimport("fmt""github.com/Joker666/goworkerpool/model""time")funcWork(allData[]model.SimpleData){start:=time.Now()fori,_:=rangeallData{Process(allData[i])}elapsed:=time.Since(start)fmt.Printf("Took===============>%s\n",elapsed)}funcProcess(datamodel.SimpleData){fmt.Printf("Startprocessing%d\n",data.ID)time.Sleep(100*time.Millisecond)fmt.Printf("Finishprocessing%d\n",data.ID)}////main.gopackagemainimport("fmt""github.com/Joker666/goworkerpool/basic""github.com/Joker666/goworkerpool/model""github.com/Joker666/goworkerpool/worker")funcmain(){//PreparethedatavarallData[]model.SimpleDatafori:=0;i<1000;i++{data:=model.SimpleData{ID:i}allData=append(allData,data)}fmt.Printf("Startprocessingallwork\n")//Processbasic.Work(allData)}StartprocessingallworkTook===============>1m40.226679665s上面的代码创建了一个模型包,里面包含一个结构体,这个结构体只有一个int类型的成员。我们同步处理数据,这显然不是最好的选择,因为这些任务是可以并发处理的。我们换个方案,用goroutine和channel来处理。异步////worker/notPooled.gofuncNotPooledWork(allData[]model.SimpleData){start:=time.Now()varwgsync.WaitGroupdataCh:=make(chanmodel.SimpleData,100)wg.Add(1)gofunc(){deferwg.Done()fordata:=rangedataCh{wg.Add(1)gofunc(datamodel.SimpleData){deferwg.Done()basic.Process(data)}(data)}}()fori,_:=rangeallData{dataCh<-allData[i]}close(dataCh)wg.Wait()elapsed:=time.Since(start)fmt.Printf("Took================>%s\n",elapsed)}////main.go//Processworker.NotPooledWork(allData)StartprocessingallworkTook===============>101.191534ms上面的代码,我们创建了一个容量100缓存通道,通过NoPooledWork()将数据推入通道。通道长度达到100后,在读取某些元素之前,我们无法向其中添加元素。用于范围读取通道,并生成goroutine处理。这里我们不限制派生的goroutines的数量,它可以处理尽可能多的任务。理论上,在给定所需资源的情况下,可以处理尽可能多的数据。执行代码,完成1000个任务只用了100ms。这很疯狂!不是全部,请继续阅读。问题除非我们拥有地球上的所有资源,否则在任何给定时间可以分配的资源数量都是有限的。一个goroutine占用的内存最小为2k,但也可以达到1G。上述并发执行所有任务的解决方案,假设有一百万个任务,将很快耗尽机器的内存和CPU。我们要么升级机器的配置,要么寻找其他更好的解决方案。计算机科学家很早以前就思考过这个问题,并想出了一个极好的解决方案——使用ThreadPool或WorkerPool。这个解决方案是使用一个workerpool,worker数量有限,来处理任务。Workers会按顺序一个接一个地处理任务,从而避免了CPU和内存使用率的快速增加。解决方案:工作池我们通过实施工作池解决了之前遇到的问题。////worker/pooled.gofuncPooledWork(allData[]model.SimpleData){start:=time.Now()varwgsync.WaitGroupworkerPoolSize:=100dataCh:=make(chanmodel.SimpleData,workerPoolSize)fori:=0;i
