前言大家好,我是asong;前几天访问github的时候发现了一个有趣的并发库-conc。它的目标是:goroutineleaks更难处理panic,更友好,并发代码可读性高。从介绍来看,主要的封装功能如下:封装waitGroup,避免很多重复的代码,同时封装recover,更安全。提供panics。Catcher封装了恢复逻辑,统一捕获panics,打印调用堆栈的一些信息,并提供并发执行任务的workerpool,可以控制并发,goroutine可以复用,支持函数签名,提供stream方法保证结果有序。提供ForEach和map方法来优雅地处理切片。接下来,我们将通过区分模块来介绍库;仓库地址:https://github.com/sourcegraph/concWatiGroup封装Go语言标准库提供了sync.waitGroup控制等待goroutine,我们通常会写如下代码:funcmain(){varwgsync.WaitGroupfori:=0;我<10;i++{wg.Add(1)gofunc(){deferwg.Done()deferfunc(){//恢复恐慌err:=recover()iferr!=nil{fmt.Println(err)}}//dosomethinghandle()}}wg.Wait()}上面的代码需要一堆重复的代码,需要在每个func中分别处理recover逻辑,所以conc库对其进行了封装,简化了代码如下:funcmain(){wg:=conc.NewWaitGroup()fori:=0;我<10;i++{wg.Go(doSomething)}wg.Wait()}函数doSomething(){fmt.Println("test")}conc库封装也比较简单,结构如下:typeWaitGroupstruct{wgsync.WaitGrouppcpanics.Catcher}它实现了Catcher类型来封装恢复逻辑,封装思路如下:typeCatcherstruct{recoveredatomic.Pointer[RecoveredPanic]}recovered是原子指针类型,RecoveredPanic是捕获的recover包,里面封装了stack等信息:typeRecoveredPanicstruct{//原来恐慌的价值。值any//runtime.Callers在panic被恢复时返回的调用者列表。可用于通过//runtime.CallersFrames.Callers[]uintptr生成更详细的堆栈信息//来自恢复恐慌的goroutine的格式化堆栈跟踪。//比Callers更容易使用。Stack[]byte}提供了一个Try方法执行方法,只会记录第一个panic的gououtine信息:func(p*Catcher)Try(ffunc()){deferp.tryRecover()f()}func(p*Catcher)tryRecover(){如果val:=recover();val!=nil{rp:=NewRecoveredPanic(1,val)//只有第一个panic会被记录goroutine信息p.recovered.CompareAndSwap(nil,&rp)}}提供Repanic()方法重放捕获的panic:func(p*Catcher)Repanic(){如果val:=p.Recovered();val!=nil{panic(val)}}func(p*Catcher)Recovered()*RecoveredPanic{returnp.recovered.Load()}waitGroup还提供了Wait()和WaitAndRecover()方法:func(h*WaitGroup)Wait(){h.wg.Wait()//如果我们从子goroutine.h.pc.Repanic()}func(h*WaitGroup)WaitAndRecover()*panics.RecoveredPanic{h.wg.Wait()//如果我们从子goroutine中捕获到恐慌,则返回恢复的恐慌。returnh.pc.Recovered()}wait方法只要一个goroutine发生panic就会向上抛出panic,比较简单粗暴;waitAndRecover方法只有在一个goroutinepanic发生时才会返回第一个恢复的goroutine信息;总结:conc库对waitGrouop的封装总体来说还是不错的,可以减少重复代码;workerpoolconc提供了几种类型的workerpool:ContextPool:可以传递context的pool,如果一个goroutine出错,可以取消其他goroutineErrorPool:通过参数,可以控制只收集第一个error还是所有errorResultContextPool:如果一个goroutine发生错误,它会取消其他goroutines并收集错误。ResultPool:收集工作池中各个任务的执行结果,不能保证顺序。为了保证顺序,需要使用stream或者iter.map;让我们看一个简单的例子:import"github.com/sourcegraph/conc/pool"funcExampleContextPool_WithCancelOnError(){p:=pool.New().WithMaxGoroutines(4).WithContext(context.Background()).WithCancelOnError()fori:=0;我<3;i++{i:=ip.Go(func(ctxcontext.Context)error{ifi==2{returnerrors.New("我将取消所有其他任务!")}<-ctx.Done()返回nil})}err:=p.Wait()fmt.Println(err)//Output://Iwillcancelallothertasks!}创建池时,可以调用以下方法:p.WithMaxGoroutines()配置pool中最大goroutine数p.WithErrors:配置池中任务是否返回错误p.WithContext(ctx):配置池中运行的任务遇到第一个错误时取消p.WithFirstError:配置池中任务pooltoreturnonlythefirsterrorp.WithCollectErrored:configurethetaskinthepooltocollectall错误池的基本结构如下:typePoolstruct{handleconc.WaitGrouplimiterlimitertaskschanfunc()initOncesync.Once}是controller,使用chan来控制goroutine的个数:typelimiterchanstruct{}func(llimiter)limit()int{returncap(l)}func(llimiter)release(){ifl!=nil{<-l}}pool的核心逻辑也比较简单。如果没有设置limiter,那么就看它是否是freeworker,否则创建一个新的worker,然后deliver任务进来;如果设置了限制器,则达到如果达到limiterworker限制,则将任务交给空闲worker,如果没有空闲时间,则阻塞等待;func(p*Pool)Go(ffunc()){p.init()ifp.limiter==nil{//没有限制select{casep.tasks<-f://一个goroutine可以用来处理任务。default://没有可用的goroutine来处理任务。//生成一个新的并向其发送任务。p.handle.Go(p.worker)p.tasks<-f}}else{select{casep.limiter<-struct{}{}://如果我们低于限制,则产生一个新的worker//而不是等待一个可用。p.handle.Go(p.worker)//我们知道至少有一个worker在运行,所以等待它可用。这确保我们永远不会产生比任务数量更多的工人。p.tasks<-fcasep.tasks<-f://一个worker可用并且已经接受了任务。return}}}这里的工作使用了一个unbufferedchannel,这种多路复用的方式非常巧妙,如果goroutine执行的很快,可以避免创建过多的goroutine;使用pool处理任务不能保证顺序,conc库提供了Stream方法,返回结果可以保持顺序;StreamSteam的实现也依赖于pool,在此基础上做封装,保证结果的顺序,先看一个例子:funcExampleStream(){times:=[]int{20,52,16,45,4,80}stream:=stream2.New()for_,millis:=rangetimes{dur:=time.Duration(millis)*time.Millisecondstream.Go(func()stream2.Callback{time.Sleep(dur)//这将按照任务提交的顺序打印returnfunc(){fmt.Println(dur)}})}stream.Wait()//输出://20ms//52ms//16ms//45ms//4ms//80ms}流的结构如下:typeStreamstruct{poolpool.PoolcallbackerHandleconc.WaitGroupqueuechancallbackChinitOncesync.Once}queue是通道类型,callbackCh也是通道类型-chanfunc():typecallbackChchanfunc()提交goroutine时按顺序生成callbackCh投递结果:func(s*Stream)Go(fTask){s.init()//从缓存中获取通道.ch:=getCh()//将通道加入回调队列.s.queue<-ch//提交任务执行.s.pool.Go(func(){deferfunc(){//在f出现恐慌的情况下,我们不希望回调//饿死等待g用于此通道的回调,因此给它一个//空回调。如果r:=恢复();r!=nil{ch<-func(){}panic(r)}}()//运行任务,将其回调发送到任务的通道。callback:=f()ch<-callback})}varcallbackChPool=sync.Pool{新:func()any{returnmake(callbackCh,1)},}funcgetCh()callbackCh{returncallbackChPool.Get().(callbackCh)}funcputCh(chcallbackCh){callbackChPool.Put(ch)}ForEach和mapForEachconc库提供了ForEach方法来优雅地并发处理切片。看一下官方的例子:conc库是用泛型封装的。我们只需要关注handle代码即可,避免代码冗余。我们自己写一个例子:funcmain(){input:=[]int{1,2,3,4}iterator:=iter.Iterator[int]{MaxGoroutines:len(input)/2,}iterator.ForEach(input,func(v*int){if*v%2!=0{*v=-1}})fmt.Println(input)}ForEach内部实现为一个Iterator结构,其核心逻辑如下:typeIterator[Tany]struct{MaxGoroutinesint}func(iterIterator[T])ForEachIdx(input[]T,ffunc(int,*T)){ifiter.MaxGoroutines==0{//iter是一个值接收器和我因此可以安全地改变iter.MaxGoroutines=defaultMaxGoroutines()}numInput:=len(input)ifiter.MaxGoroutines>numInput{//并发任务数不超过输入项的数量。iter.MaxGoroutines=numInput}varidx原子。int64//通过原子控制只创建一个闭包任务task:=func(){i:=int(idx.Add(1)-1)for;我
