当前位置: 首页 > 科技观察

Go分布式令牌桶限流+背包保证

时间:2023-03-12 22:00:11 科技观察

本文转载自微信公众号《微服务实战》,作者欧阳安。转载本文请联系微服务实践公众号。上一篇文章提到固定时间窗口限流无法应对突发的请求泛滥情况,而本文介绍的令牌桶线算法可以更好地应对这种场景。工作原理生产令牌在单位时间内以恒定速率放入桶中,直到达到桶容量上限。处理请求,每次尝试获取一个或多个token,获取到则处理请求,失败则拒绝请求。优缺点优点可以有效处理瞬时突发流量,桶内存令牌可以作为流量缓冲区,平滑处理突发流量。缺点是实现比较复杂。代码实现core/limit/tokenlimit.go在分布式环境下,考虑使用redis作为bucket和token的存储容器,使用lua脚本实现整个算法流程。redislua脚本--每秒生成的token个数就是token生成速度localrate=tonumber(ARGV[1])--bucketcapacitylocalcapacity=tonumber(ARGV[2])--当前时间戳localnow=tonumber(ARGV[3])--当前请求的token数量localrequested=tonumber(ARGV[4])--需要多少秒才能填满桶localfill_time=capacity/rate--向下舍入,ttl是填满时间的两倍localttl=math.floor(fill_time*2)--当前时间桶容量locallast_tokens=tonumber(redis.call("get",KEYS[1]))--如果当前桶容量为0,表示是第一次进入,默认容量为bucket最大容量iflast_tokens==nilthenlast_tokens=capacityend--最后一次刷新的时间locallast_refreshed=tonumber(redis.call("get",KEYS[2]))--第一次进入,settherefreshtimeto0iflast_refreshed==nilthenlast_refreshed=0end--距离上次请求的时间跨度localdelta=math.max(0,now-last_refreshed)--距离上次请求的时间跨度equest,总共可以生产的token数量,如果超过最大容量,多余的token会被丢弃是否足够localallowed=filled_tokens>=requested--剩余bucket数量localnew_tokens=filled_tokens--允许申请本次token,计算剩余数量,KEYS[1],ttl,new_tokens)--设置刷新时间redis.call("setex",KEYS[2],ttl,now)returnallowedtokenbucketcurrentlimiterdefinitiontypeTokenLimiterstruct{//每秒生产率rateint//桶容量burstint//存储容器store*redis.Redis//rediskeytokenKeystring//桶刷新timekeytimestampKeystring//lockrescueLocksync.Mutex//redis健康标识redisAliveuint32//redis失败使用进程内令牌桶限流器rescueLimiter*xrate.Limiter//redis监控检测任务标识monitorStartedbool}funcNewTokenLimiter(rate,burstint,store*redis.Redis,keystring)*TokenLimiter{tokenKey:=fmt.Sprintf(tokenFormat,key)timestampKey:=fmt.Sprintf(timestampFormat,key)return&TokenLimiter{rate:rate,burst:burst,store:store,tokenKey:tokenKey,timestampKey:timestampKey,redisAlive:1,rescueLimiter:xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)),burst),}}获取令牌func(lim*TokenLimiter)reserveN(nowtime.Time,nint)bool{//判断redis是否健康//redis失败时使用进程内限流器//包保ifatomic.LoadUint32(&lim.redisAlive)==0{returnlim.rescueLimiter.AllowN(now,n)}//执行脚本gettokenresp,err:=lim.store.Eval(script,[]string{lim.tokenKey,lim.timestampKey,},[]string{strconv.Itoa(lim.rate),strconv.Itoa(lim.burst),strconv.FormatInt(now.Unix(),10),strconv.Itoa(n),})//redisallowed==false//Luabooleanfalse->rNilbulkreply//key的特殊处理不存在iferr==redis.Nil{returnfalse}elseiferr!=nil{logx.Errorf("failtouseratelimiter:%s,usein-processlimiterforrescue",err)//执行异常,启动redis健康检测任务//同时进程内限流器作为底线lim.startMonitor()returnlim.rescueLimiter.AllowN(now,n)}code,ok:=resp.(int64)if!ok{logx.Errorf("failtoevalredisscript:%v,usein-processlimiterforrescue",resp)lim.startMonitor()returnlim.rescueLimiter.AllowN(now,n)}//redisallowed==true//Luabooleantrue->rintegerreplywithvalueof1returncode==1}redis的设计故障策略很贴心细节,当redis不可用时,启动单机版的ratelimit作为备用限流器,保证基本的ratelimit可用,服务不会被压垮//开启redis健康检测func(lim*TokenLimiter)startMonitor(){lim.rescueLock.Lock()deferlim.rescueLock.Unlock()//防止重复开启iflim.monitorStarted{return}//设置任务和健康标志lim.monitorStarted=trueatomic.StoreUint32(&lim.redisAlive,0)//健康检测golim.waitForRedis()}//redis健康检测定时任务func(lim*TokenLimiter)waitForRedis(){ticker:=time.NewTicker(pingInterval)//健康检测成功回调此函数内置健康检测命令iflim.store.Ping(){//健康检测成功,设置健康标志atomic.StoreUint32(&lim.redisAlive,1)return}}}项目地址https://github.com/zeromicro/go-zero欢迎来到go-zero和star支持我们!