1.简介在Go语言中,我们可以使用errgroup库来处理goroutine中的错误。最近更新了errgroup库,增加了对并发数限制的支持。在本文中,我们介绍errgroup库的使用和实现原理。2.用法errgroup库使用起来非常简单。我们通过三个简单的示例代码来介绍三种使用方法。基本使用funcmain(){eg:=errgroup.Group{}eg.Go(func()error{fmt.Println("go1")returnnil})eg.Go(func()error{fmt.Println("go2")err:=errors.New("go2err")returnerr})err:=eg.Wait()iferr!=nil{fmt.Println("err=",err)}}阅读上面的段落代码,我们使用errgroup库的Go()方法启动两个goroutine,分别模拟errorgoroutine和normalgoroutine。然后,使用errgroup库的Wait()方法来确定是否有任何goroutine返回错误消息。附加取消函数funcmain(){例如,ctx:=errgroup.WithContext(context.Background())eg.Go(func()error{time.Sleep(1*time.Second)select{case<-ctx.Done():fmt.Println("go1cancel,err=",ctx.Err())默认值:fmt.Println("go1run")}returnnil})eg.Go(func()error{err:=errors.New("go2err")returnerr})err:=eg.Wait()iferr!=nil{fmt.Println("err=",err)}}阅读上面的代码,我们使用了errgroup库WithContext()函数可以添加一个取消函数。在第一个使用Go()方法启动的协程函数中,我们使用select...case...default来监听其他协程是否返回错误并进行相应的逻辑处理。限制并发数funcmain(){eg:=errgroup.Group{}eg.SetLimit(2)eg.TryGo(func()error{fmt.Println("go1run")returnnil})eg.TryGo(func()error{err:=errors.New("go2err")returnerr})eg.TryGo(func()error{fmt.Println("go3run")返回nil})err:=eg.Wait()iferr!=nil{fmt.Println("err=",err)}}看了上面的代码,我们使用了errgroup库中新增的限制并发数的功能。首先使用SetLimit()方法设置并发量,然后使用TryGo()方法代替Go()方法。3.实现原理我们通过阅读errgroup库的源码来简单介绍一下errgroup的实现原理。我们先看一下Group结构的源码。typeGroupstruct{cancelfunc()wgsync.WaitGroupsemchantokenerrOncesync.Onceerrerror}在源码中,我们可以发现Group结构中包含了5个字段,其中sem字段最近用于实现并添加了限制并发数的功能。通过Group结构体的字段,我们可以看出errgroup其实是对sync和context的封装。其中,cancel为使用context的取消方法;wg是使用sync.WairGroup的相关方法;sem是通过channel控制并发数;errOnce是利用sync.Once的特性,只保存第一个A返回的goroutineerror;err是goroutine返回的错误。func(g*Group)Go(ffunc()error){ifg.sem!=nil{g.sem<-token{}}g.wg.Add(1)gofunc(){deferg.done()如果错误:=f();err!=nil{g.errOnce.Do(func(){g.err=errifg.cancel!=nil{g.cancel()}})}}()}我们读了Go()方法错误组库。首先通过判断g.sem的值是否为nil,如果g.sem的值不为nil,则说明并发数已经设置好,然后传递给g.sem发送一个空结构令牌{}来抢占资源。如果资源被抢到,则启动一个goroutine,否则,它被阻塞并等待其他正在执行的goroutines释放资源。细心的读者可能已经发现,除了在Go()方法的开头添加逻辑代码判断g.sem是否为nil外,defer也发生了变化,从之前直接调用sync.WaitGroupDone()方法更改为调用errgroup库的新done()方法。done()方法的源码:func(g*Group)done(){ifg.sem!=nil{<-g.sem}g.wg.Done()}通过阅读done的源码()方法,我们可以发现在调用sync.WaitGroup的Done()方法之前,先判断g.sem的值是否为nil,如果不为nil,则释放资源。我们再读一遍Wait()方法的源码:func(g*Group)Wait()error{g.wg.Wait()ifg.cancel!=nil{g.cancel()}returng.err}通过阅读Wait()方法的源码,我们可以发现它其实封装了sync.WaitGroup的Wait()方法和context包的cancel,返回所有运行的goroutine返回的第一个错误。最后,我们阅读新增函数TryGo()方法和SetLimit()方法的源码,控制并发数:func(g*Group)TryGo(ffunc()error)bool{ifg.sem!=nil{select{caseg.sem<-token{}://注意:这允许闯入iff通道通常允许闯入。默认值:返回false}}g.wg.Add(1)gofunc(){deferg.done()iferr:=f();err!=nil{g.errOnce.Do(func(){g.err=errifg.cancel!=nil{g.cancel()}})}}()returntrue}通过阅读源码TryGo()方法,我们可以发现它和Go()方法的区别在于处理g.sem的值的逻辑。TryGo()方法在处理g.sem的值时,会使用select...case...default语句先尝试抢一次资源。当无法抢到资源时,不再阻塞,而是直接返回false,表示执行失败。SetLimit()方法源码:func(g*Group)SetLimit(nint){ifn<0{g.sem=nilreturn}iflen(g.sem)!=0{panic(fmt.Errorf("errgroup:modifylimitwhile%vgoroutinesinthegrouparestillactive",len(g.sem)))}g.sem=make(chantoken,n)}通过阅读SetLimit()方法的源代码,我们可以看到,当输入参数n小于0时,直接给g.sem赋值nil,表示没有并发数限制。调用SetLimit()方法时,g.sem必须是空通道,否则程序会panic。除去SetLimit()方法的判断逻辑代码,其实SetLimit()方法就是创建一个大小为n的缓冲通道。SetLimit()和TryGo()通常一起使用。4.小结本文介绍了Go方法提供的errgroup库,该库最近增加了控制并发数的功能。我们首先介绍了三种使用方法,然后通过阅读源码分析了其实现原理。
