当前位置: 首页 > 科技观察

一篇给大家带来Go并发编程的文章Singleflight

时间:2023-03-19 22:10:05 科技观察

这篇文章的内容在Week05:点评系统架构设计中的易用性设计中有提到,不过这个属于Go官方的扩展同步包(golang.org/x/sync/singleflight)放在这里是为了统一内容。SingleFlight我们为什么需要SingleFlight(使用场景)?一般我们在写对外服务的时候,都会有一层缓存作为缓存来减轻底层数据库的压力,但是在redis抖动或者其他情况下,可能会导致大量的缓存未命中。如下图,可能有1000个来自桌面端和移动端用户的并发请求。都是访问获取文章列表的接口,获取前20条信息。如果此时我们的服务直接访问redis,出现了cachemiss,那么我们会请求数据库1000次,可能会给数据库带来很大的压力(这里的1000只是一个例子,实际上可能会大很多超过这个值),导致我们的服务异常或者超时。这时候就可以使用singleflight库了。直译过来就是单飞的意思。这个库的主要功能是将一组相同的请求组合成一个请求。实际上,它只会请求一次,然后对所有请求返回相同的结果。如下图所示,使用singleflight后,我们实际上在一个请求时间段内只向底层数据库发起一次请求,大大减轻了数据库的压力。如何使用SingleFlight包(教程)?函数签名主要是一个Group结构和三个方法。具体见下面注释typeGroup//Doexecutethefunction。多次调用同一个key时,没有执行完的时候//只会执行一次fn调用,其他调用会阻塞等待这个调用返回//v,err是传入的fn的返回值//shared表示fn返回的结果是实际执行还是返回共享结果func(g*Group)Do(keystring,fnfunc()(interface{},error))(vinterface{},errerror,sharedbool)//DoChan和Do类似,只不过DoChan返回的是一个channel,这是同步的,和异步的区别func(g*Group)DoChan(keystring,fnfunc()(interface{},error))<-chanResult//forget用于通知Group删除某个key,这样后面调用该key的时候就不会被阻塞等待func(g*Group)Forget(keystring)用法示例接下来看看我们实际是怎么使用的。我们先用一个常见的例子。这是获取文章详细信息的函数。我们在函数中使用一个count来模拟不同并发下的耗时差异,并发请求越多,耗时越长AddInt32(&count,1)time.Sleep(time.Duration(count)*time.Millisecond)returnfmt.Sprintf("article:%d",id),nil}当我们使用singleflight时,只需要new(singleflight.Group)然后调用对应的Do方法,是不是很简单funcsingleflightGetArticle(sg*singleflight.Group,idint)(string,error){v,err,_:=sg.Do(fmt.Sprintf("%d",id),func()(interface{},error){returngetArticle(id)})returnv.(string),err}效果测试只是说说而已,不练假动作。写一个简单的测试代码。接下来,我们将启动1000个Goroutines来并发调用这两个。方法varcountint32funcmain(){time.AfterFunc(1*time.Second,func(){atomic.AddInt32(&count,-count)})var(wgsync.WaitGroupnow=time.Now()n=1000sg=&singleflight.Group{})fori:=0;i0}doCall有点意思。它使用两个defer巧妙地将运行时错误和我们传递给函数的panic结合起来。区别是为了避免传入函数panicfunc(g*Group)doCall(c*call,keystring,fnfunc()(interface{},error)){normalReturn:=falserecovered:=false//第一次defer检查运行时错误deferfunc(){}()//使用匿名函数执行func(){deferfunc(){if!normalReturn{//如果有panic,我们就恢复它,然后新一个panic错误//Re-panicifr:=recover();r!=nil{c.err=newPanicError(r)}}}()c.val,c.err=fn()//如果fn没有panic,则执行这一步,如果panic,则不执行这一步//所以可以通过这个变量来判断是否panicnormalReturn=true}()//如果normalReturn为false,说明我们的fnpanic//如果执行到这一步,也说明我们的fnrecover卡住了,不是直接runtimeexitif!normalReturn{recovered=true}}来看代码在第一个defer中deferfunc(){//如果既没有正常执行也没有recover,说明需要直接退出if!normalReturn&&!recovered{c.err=errGoexit}c.wg.Done()g.mu。Lock()deferg.mu.Unlock()//如果已经忘记了,不要重复删除这个keyif!c.forgotten{delete(g.m,key)}ife,ok:=c.err.(*panicError);ok{//如果返回panic错误,为了避免通道死锁,我们需要保证panic无法恢复iflen(c.chans)>0{gopanic(e)select{}//Keepthisgoroutinearoundsothatitwillappearinthecrashdump.}else{panic(e)}}elseifc.err==errGoexit{//准备退出,所以不需要做其他操作}else{//正常情况下向通道写入数据for_,ch:=rangec.chans{ch<-Result{c.val,c.err,c.dups>0}}}}()DoChanDochan和Do类似,其实一个是同步等待,一个是异步返回。主要实现是如果调用DoChan,就会给调用.chans添加一个channel,这样执行完第一次调用后,会循环往这些channel中写入数据func(g*Group)DoChan(keystring,fnfunc()(interface{},error))<-chanResult{ch:=make(chanResult,1)g.mu.Lock()ifg.m==nil{g.m=make(map[string]*call)}ifc,ok:=g.m[key];ok{c.dups++c.chans=append(c.chans,ch)g.mu.Unlock()return}c:=&call{chans:[]chan<-Result{ch}}c.wg.Add(1)g.m[key]=cg.mu.Unlock()gog.doCall(c,key,fn)return}Forgetforget用于手动释放一个key,下次调用不会阻塞和等待func(g*Group)Forget(keystring){g.mu.Lock()ifc,ok:=g.m[key];ok{c.forgotten=true}delete(g.m,key)g.mu.Unlock()}有什么注意事项(避坑指南)?好但是不要滥用,还是有一些坑1.一个阻塞,所有员工都在等待使用singleflight我们一般直接使用Do方法,但是这种极端的情况会导致整个程序挂掉,如果我们的代码跑出去问题,如果一个调用挂了,会导致所有的请求挂掉还是前面的例子,我们加一个select模拟阻塞funcsingleflightGetArticle(sg*singleflight.Group,idint)(string,error){v,err,_:=sg.Do(fmt.Sprintf("%d",id),func()(interface{},error){//模拟有问题,挂liveselect{}returngetArticle(id)})returnv.(string),err}执行会发现死锁fataleror:allgoroutinesareasleep-deadlock!goroutine1[select(nocases)]:这个时候我们可以使用DoChan结合select做超时控制funcsingleflightGetArticle(ctxcontext.Context,sg*singleflight.Group,idint)(string,error){result:=sg.DoChan(fmt.Sprintf("%d",id),func()(接口{},错误){//模拟有问题,挂liveselect{}returngetArticle(id)})select{caser:=<-result:returnr.Val.(string),r.Errcase<-ctx.Done():返回“”,CTX。调用Err()}}时只要传入一个带超时的上下文,执行时会返回超时错误?gorun./1.gopanic:contextdeadlineexceeded2。一个错误,全部错误这本身不是问题,因为singleflight是这样设计的,但是在实际使用中,如果我们调用1秒,我们的数据库请求或者下游服务可以支持10rps的请求,这就会导致一个提高我们的错误阈值,因为实际上我们可以在一秒内尝试10次,但是使用singleflight之后你只能尝试一次。只要有错误,这段时间内的所有请求都会受到影响。在这种情况下,我们可以启动一个Goroutine来定时遗忘,相当于将rps从1rps增加到10rpsgofunc(){time.Sleep(100*time.Millisecond)//logging.Forget(key)}()总结了这个文章从使用场景,到使用方法,到源码分析和可能存在的陷阱,将singleflight介绍给大家。希望大家可以有所收获,没事看看官方代码还是很有收获的,这次又学习了一个show操作,使用doubledefer避免死锁,你学够了吗?我们的下一篇文章将开始一个新的系列,Go可用性,敬请期待!文章博客地址:https://lailin.xyz

猜你喜欢