本文转载自微信公众号《吴沁强的深夜食堂》,作者吴沁咖喱。转载本文请联系吴勤强深夜食堂公众号。之前写过一篇文章,有个响亮的名字:Handling1MillionRequestsperMinutewithGo使用Go来处理每分钟一百万个请求。这是国外的作者写的。我做了一个解释,从这个标题开始。出乎意料的是,阅读次数是我最好的一次。果然,文章都是以标题为准的……今天偶然看到另一篇文章(原文在文末[1])。两篇文章的原理类似:有一批工作任务(jobs),通过worker-pool(工作池)的方式,达到多个worker并发处理job的效果。它们之间还是有很多区别的,实现上的差别还是挺大的。首先,我在上一篇放了一张图,大概是上一篇的整体工作流程吧。每个worker只要处理完任务,就不关心结果,也不对结果做进一步处理。只要请求不停止,程序就不会停止,没有控制机制,除非宕机。本文的不同之处在于:首先,数据会从生成(生产数据)->并发处理数据->处理结果进行聚合。图片大致是这样的,然后可以通过context.context达到控制workingpool停止工作的效果。最后,通过代码,你会发现它不是传统意义上的worker-pool,这个后面会解释。下图可以清晰的表达出整体的流程。顺便说一句,本文中实现的代码比Go中每分钟处理数百万个请求的代码要简单得多。先看工作。这可以简化。最后,每一个job处理完之后,都会被打包成一个Result返回。下面这段是核心代码。整个WorkerPool结构非常简单。jobs是一个缓冲通道。每个task都会放入jobs中等待woker处理。results也是一个channel类型,它的作用是保存每个job处理的结果Result。首先通过New初始化一个worker-pool工作池,然后执行Run开始运行。初始化时传入worker数量,运行每个g对应的work(ctx,&wg,wp.jobs,wp.results),组成一个worker-pool。同时,通过sync.WaitGroup,我们可以等待所有worker完成他们的工作,这意味着工作池已经完成了它的工作。当然也有可能是因为任务处理完成,也有可能是停止了。每个职位的数据源是怎么来的?对应每个worker的工作,每个worker都试图从同一个job中获取数据,这是典型的fan-out模式。当相应的g获取到job进行处理时,会将处理结果发送到同一个resultschannel,这是另一种fan-in模式。当然我们可以通过context.Context来控制每个worker的停止和运行。最后对结果集进行处理,所以整体测试代码为:看了代码,我们知道这不是传统意义上的worker-pool。它不像上一篇文章那样初始化一个真正的工作池。一旦接收到作业,它会尝试从池中获取一个worker,并将相应的作业交给work进行处理。工作处理完毕后,重启进入workerpool,等待下一次利用。附录[1]https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71e2b0#fe56
