当前位置: 首页 > 科技观察

通过SingleFlight模式学习Go并发编程

时间:2023-03-19 13:32:35 科技观察

本篇阅读源码位于core/syncx/singleflight.go。SingleFlight在go-zero中的作用是将并发请求合并为一个请求,以减轻底层服务的压力。应用场景1、查询缓存时,合并请求,提高服务性能。假设有一个IP查询服务。用户每次请求查询缓存中某个IP的归属地,如果缓存中有结果则直接返回,如果不存在则进行IP解析操作。如上图所示,n个用户请求查询同一个IP(8.8.8.8)会对应n个Redis查询。在高并发场景下,如果能将n个Redis查询合并为一个Redis查询,那么性能肯定会更好。改进了很多,使用SingleFlight实现请求合并。效果如下:2.防止缓存崩溃。缓存击穿问题是指:在高并发场景下,大量请求同时查询一个key。如果key恰好过期,就会向数据库发送大量请求,导致数据库连接数和负载增加。通过SingleFlight,可以合并对同一个Key的并发请求,只发送一个请求到数据库查询,其他请求共享同一个结果,可以大大提高并发能力。直接应用于代码:funcmain(){round:=10varwgsync.WaitGroupbarrier:=syncx.NewSingleFlight()wg.Add(round)fori:=0;我<圆;i++{gofunc(){deferwg.Done()//启用10个??协程模拟获取缓存操作val,err:=barrier.Do("get_rand_int",func()(interface{},error){time.Sleep(time.Second)returnrand.Int(),nil})iferr!=nil{fmt.Println(err)}else{fmt.Println(val)}}()}wg.Wait()}以上代码模拟10个协程请求Redis获取某个key的内容,代码很简单,就是执行Do()方法。其中,接收两个参数,第一个参数是获取资源的标识,可以是redis中缓存的key,第二个参数是一个匿名函数,封装了要做的业务逻辑。最终结果如下:从上面可以看出,10个协程都得到了相同的结果,即只有一个协程真正执行了rand.Int()得到了一个随机数,其他协程共享了这个结果。源码分析,先看代码结构:type(//定义接口,有Do和DoEx两个方法,其实逻辑是一样的,DoEx多了一个标识,主要看一下就够了DoSingleFlight接口的逻辑{Do(keystring,fnfunc()(interface{},error))(interface{},error)DoEx(keystring,fnfunc()(interface{},error))(interface{},bool,error)}//定义call结构体callstruct{wgsync.WaitGroup//用于实现一个调用通过,其他调用阻塞valinterface{}//表示调用的返回结果calloperationerrerror//表示调用操作出错}//整体控制结构,实现SingleFlight接口flightGroupstruct{callsmap[string]*call//不同的调用对应不同的keylocksync.Mutex//使用锁来控制请求})并查看核心Do方法的作用:func(g*flightGroup)Do(keystring,fnfunc()(interface{},error))(interface{},error){c,done:=g.createCall(key)如果完成{returnc.val,c.err}g.makeCall(c,key,fn)returnc.val,c.err}代码很简单,使用g.createCall(key)向key发起调用请求(其实就是做一件事),如果已经有其他协程已经阻塞了调用请求(doneistrue),等待结果后直接返回。如果done为false,表示当前协程是第一个发起调用的协程,然后执行g.makeCall(c,key,fn)真正发起调用请求(之后其他协程会阻塞在g.创建调用(键))。从上图可以看出,其实有两个关键步骤:判断第一个请求的协程(使用map)阻塞所有其他协程(使用sync.WaitGroup),看g.createCall(key)是如何实现的:func(g*flightGroup)createCall(keystring)(c*call,donebool){g.锁。Lock()如果c,ok:=g.调用[键];好的{g。锁。解锁()C。wg.Wait()returnc,true}c=new(call)c.wg.Add(1)g.calls[key]=cg.lock.Unlock()returnc,false}先看第一步:判断是第一个请求的协程(使用map)g.lock.Lock()ifc,ok:=g.calls[key];ok{g.lock.Unlock()c.wg.Wait()returnc,true}这里判断map中的key是否存在。如果已经存在,说明其他协程已经在请求了。当前协程只需要等待即可。等待是通过sync.WaitGroup的Wait()方法实现的,这里还是很巧妙的。需要注意的是map在Go中并不是并发安全的,所以需要加锁。再看第二步:阻塞所有其他协程(使用sync.WaitGroup)c=new(call)c.wg.Add(1)g.calls[key]=c因为它是第一个发起调用的协程,所以需要新的调用,然后wg.Add(1),对应上面的wg.Wait(),阻塞剩下的协程。然后将新呼叫放入地图中。注意此时只是完成了初始化,并没有真正执行调用请求。真正的处理逻辑在g.makeCall(c,key,fn)中。func(g*flightGroup)makeCall(c*call,keystring,fnfunc()(interface{},error)){deferfunc(){g.lock.Lock()delete(g.calls,key)g.lock.Unlock()c.wg.Done()}()c.val,c.err=fn()}这个方法做的事情很简单,就是执行传入的匿名函数fn()(即真正的调用请求做)。最后的处理(通过defer)也分为两步:删除map中的key,让下一次请求可以得到新的value。调用wg.Done()让所有之前阻塞的协程得到结果并返回。至此,SingleFlight的核心代码解析完毕。代码虽然不长,但是思路还是很棒的,可以在实际工作中借鉴。SummaryMap不是并发安全的,记得加锁。熟练使用sync.WaitGroup完成需要阻塞控制协程的应用场景。使用匿名函数fn来封装和传递具体的业务逻辑,在调用fn的上层函数中完成统一的逻辑处理。项目地址https://github.com/zeromicro/go-zero