本文转载自微信公众号《吴沁强的深夜食堂》,作者吴沁库里。转载本文请联系吴勤强深夜食堂公众号。业务场景在做任务开发的时候,肯定会遇到以下场景:场景一:在调用第三方接口时,一个需求需要调用不同的接口进行数据组装。场景二:一个应用首页可能依赖很多服务。这涉及到在加载页面时需要同时请求多个服务的接口。这一步往往被后端调用,将数据组装起来,然后返回给前端,也就是所谓的BFF(BackendForFrontend)层。对于以上两种场景,假设没有强依赖,选择串行调用,那么总耗时为:time=s1+s2+....就问候一下祖宗十八代吧。为了好的KPI,我们往往会选择并发调用这些依赖的接口。那么总的耗时是:time=max(s1,s2,s3...,sn)当然开始堆业务的时候可以先序列化,等上面人着急的时候,将展示他们的技巧。这样,年末PPT可以加一笔厚账:业务某个接口性能提升XXX个百分点,间接产生XXX价值。当然,这一切的前提是做老板不懂技术,做技术“懂”你。言归正传,如果修改成并发调用,可能会这样写,packagemainimport("fmt""sync""time")funcmain(){varwgsync.WaitGroupwg.Add(2)varuserInfo*UservarproductList[]Productgofunc(){deferwg.Done()userInfo,_=getUser()}()gofunc(){deferwg.Done()productList,_=getProductList()}()wg.Wait()fmt.Printf("用户信息:%+v\n",userInfo)fmt.Printf("产品信息:%+v\n",productList)}/*********用户服务************/typeUserstruct{NamestringAgeuint8}funcgetUser()(*User,error){time.Sleep(500*time.Millisecond)varuUseru.Name="wuqinqiang"u.Age=18return&u,nil}/********产品服务***********/typeProductstruct{TitlestringPriceuint32}funcgetProductList()([]Product,error){time.Sleep(400*time.Millisecond)varlist[]Productlist=append(list,Product{Title:"SHib",Price:10,})returnlist,nil}在实现上,需要多少个服务,多少个G就会开启,利用sync.WaitGroup的特性,达到并发编排的效果g任务。看来问题不大。但是随着代号996业务场景的增加,你会发现很多模块功能都差不多,但是对应的业务场景却不一样。那么我们是不是可以针对这个业务场景,抽象出一套工具,把具体的业务实现交给业务方。本着不重新发明轮子的原则,我搜索了开源项目,最终在go-zero中爱上了一个工具mapreduce。随意谷歌自己的术语。使用起来非常简单。让我们用它来修改上面的代码:func()(errerror){userInfo,err=getUser()returnerr},func()(errerror){productList,err=getProductList()returnerr})fmt.Printf("用户信息:%+v\n",userInfo)fmt.Printf("ProductInformation:%+v\n",productList)}//打印用户信息:&{Name:wuqinqiangAge:18}ProductInformation:[{Title:SHibPrice:10}]是不是比较舒服。但这里还有一件事需要注意。如果你调用的其中一个服务出错,你返回了err对应的错误,那么其他调用的服务就会被取消。比如我们修改getProductList,直接响应错误。funcgetProductList()([]Product,error){returnnil,errors.New("testerror")}//打印//用户信息://产品信息:[]然后连用户信息都会打印在end为空,由于服务错误,用户服务请求被取消。一般情况下,我们在请求服务错误时会有底线操作,服务错误不能影响其他请求的结果。所以具体的处理要看使用时的业务场景。既然源码都用上了,那我们就追源码吧。funcFinish(fns...func()error)error{iflen(fns)==0{returnnil}returnMapReduceVoid(func(sourcechan<-interface{}){for_,fn:=rangefns{source<-fn}},func(iteminterface{},writerWriter,cancelfunc(error)){fn:=item.(func()error)iferr:=fn();err!=nil{cancel(err)}},func(pipe<-chaninterface{},cancelfunc(error)){drain(pipe)},WithWorkers(len(fns)))}funcMapReduceVoid(generatorGenerateFunc,mapperMapperFunc,reducerVoidReducerFunc,opts...Option)error{_,err:=MapReduce(generator,mapper,func(input<-chaninterface{},writerWriter,cancelfunc(error)){reducer(input,cancel)drain(input)//我们需要写一个placeholdertoletMapReducetocontinueonreducerdone,//否则,allgoroutinesarewaiting.TheplaceholderwillbediscardedbyMapReduce.Writewrite(...)returnerr}对于MapReduceVoidfunction,主要检查三个闭包参数,第一个GenerateFunc用于生产数据,MapperFunc读取生成的数据并进行处理,这里的VoidReducerFunc表示mapper之后的数据不会被聚合返回内德。所以这个关闭对这个操作的影响几乎为零。funcMapReduce(generateGenerateFunc,mapperMapperFunc,reducerReducerFunc,opts...选项)(接口{},错误){source:=buildSource(generate)returnMapReduceWithSource(source,mapper,reducer,opts...)}funcbuildSource(generateGenerateFunc)chaninterface{}{source:=make(chaninterface{})//创建一个无缓冲通道threading.GoSafe(func(){deferclose(source)generate(source)//开始生成数据})returnsource//返回无缓冲通道}在buildSource函数中,返回一个无缓冲的通道。并打开一个G运行generate(source)以将数据插入无缓冲通道。这个generate(source)不就是一开始Finish传递的第一个闭包参数吗。returnMapReduceVoid(func(sourcechan<-interface{}){//就是这个for_,fn:=rangefns{source<-fn}})然后查看MapReduceWithSource函数,funcMapReduceWithSource(source<-chaninterface{},mapperMapperFunc,reducerReducerFunc,opts...Option)(interface{},error){options:=buildOptions(opts...)//任务执行结束通知信号output:=make(chaninterface{})//将mapper处理后的数据写入collectorcollector:=make(chaninterface{},options.workers)//取消操作.Do(func(){done.Close()close(output)})}cancel:=once(func(errerror){iferr!=nil{retErr.Set(err)}else{retErr.Set(ErrCancelWithNil)}drain(来源)finish()})gofunc(){deferfunc(){ifr:=recover();r!=nil{cancel(fmt.Errorf("%v",r))}else{finish()}}()reducer(collector,writer,cancel)drain(collector)}()//真正从generator通道取数据执行MappergoexecuteMappers(func(iteminterface{},wWriter){mapper(item,w,cancel)},source,收集器,done.Done(),选项s.workers)value,ok:=<-outputiferr:=retErr.Load();err!=nil{returnil,err}elseifok{returnvalue,nil}else{returnnil,ErrReduceNoOutput}}这段代码比较长,我们来说到这里的核心点。使用G调用executeMappers方法。goexecuteMappers(func(iteminterface{},wWriter){mapper(item,w,cancel)},source,collector,done.Done(),options.workers)funcexecuteMappers(mapperMapFunc,input<-chaninterface{},collectorchan<-interface{},done<-chanlang.PlaceholderType,workersint){varwgsync.WaitGroupdeferfunc(){//等待所有任务执行完毕wg.Wait()//关闭通道close(collector)}()//根据创建worker到指定数量的Poolpool:=make(chanlang.PlaceholderType,workers)writer:=newGuardedWriter(collector,done)for{select{case<-done:returncasepool<-lang.Placeholder://从返回的无缓冲通道获取数据bybuildSource()item,ok:=<-input//当通道关闭时,结束if!ok{<-poolreturn}wg.Add(1)//bettertosafelyruncallerdefinedmethodthreading.GoSafe(func(){deferfunc(){wg.Done()<-pool}()//真正运行闭包函数的地方//func(iteminterface{},wWriter){//mapper(item,w,cancel)//}mapper(item,writer)})}}}具体的逻辑已经注释了,代码通俗易懂。一旦executeMappers函数返回,关闭收集通道,执行reducer将不再阻塞。gofunc(){deferfunc(){ifr:=recover();r!=nil{cancel(fmt.Errorf("%v",r))}else{finish()}}()reducer(collector,writer,cancel)//这里的drain(collector)}()这里的reducer(collector,writer,cancel)其实就是MapReduceVoid传过来的第三个闭包函数。funcMapReduceVoid(generatorGenerateFunc,mapperMapperFunc,reducerVoidReducerFunc,opts...Option)error{_,err:=MapReduce(generator,mapper,func(input<-chaninterface{},writerWriter,cancelfunc(error)){reducer(input,cancel)//这里drain(input)//WeneedtowriteaplaceholdertoletMapReducetocontinueonreducerdone,//否则allgoroutinesarewaiting.TheplaceholderwillbediscardedbyMapReduce.writer.Write(lang.Placeholder)},opts...)returnerr}然后这个闭包函数执行reducer(input),can,reducer这里就是我们一开始解释的VoidReducerFunc,它来自于Finish()。等等,看到上面三个地方的drain(input)了吗?//draindrainsthechannel.funcdrain(channel<-chaninterface{}){//drainthechannelforrangechannel{}}其实就是对channel进行drain的操作,但是三个地方在同一个channel上做同样的操作也让我很困惑。还有更重要的一点。gofunc(){deferfunc(){ifr:=recover();r!=nil{cancel(fmt.Errorf("%v",r))}else{finish()}}()reducer(collector,writer,cancel)drain(collector)}()上面的代码,如果reducer被执行,writer被写入触发panic,那么drain(collector)就没有机会执行了。不过作者已经通过直接把drain(collector)放到defer中解决了这个问题。具体问题[1]。至此,关于Finish的源码就结束了。有兴趣的可以看看其他的源码。我非常喜欢go-zero中的一些工具,但是这些工具往往不是独立的,依赖于其他文件包,这导致即使你只想使用其中一个工具也需要安装整个包。所以最后的结果就是捡起源码,创建一个无依赖的库工具集,跟着MIT走。附录[1]https://github.com/tal-tech/go-zero/issues/676