当前位置: 首页 > 科技观察

几种限流算法的Go语言实现

时间:2023-03-16 14:52:54 科技观察

1.漏桶算法算法思想和令牌桶是“逆向”算法。从桶中取出请求进行响应。如果桶满了,会直接返回错误码或者请求频率超过限制的页面。db可以承受访问数据库。不太适合在电商抢购、微博热点事件等场景限流。首先,应对突发流量不是很灵活。其次,它为每个user_id/ip维护一个队列(bucket),worker从这些队列中拉取。抓取任务会消耗大量资源。Go语言的实现通常是使用队列来实现的。在go语言中,可以通过缓冲通道快速实现。在通道中添加任务,开启一定数量的worker从通道中获取任务执行。packagemainimport("fmt""sync""time")//每次请求过来的时候,把需要执行的业务逻辑封装成一个Task,放到一个桶里,等待worker取出来执行ittypeTaskstruct{handlerfunc()Result//worker从桶中取出request对象后要执行的业务逻辑函数。resChanchanResult//等待worker执行并返回channeltaskIDint}//封装业务逻辑类型Resultstruct{}//模拟业务逻辑函数funchandler()Result{time.Sleep(300*time.Millisecond)returnResult{}}funcNewTask(idint)Task{returnTask{handler:handler,resChan:make(chanResult),taskID:id,}}//漏桶类型LeakyBucketstruct{BucketSizeint//桶的大小NumWorkerint//获取同时从桶中执行任务的worker个数bucketchanTask//存储任务的桶}funcNewLeakyBucket(bucketSizeint,numWorkerint)*LeakyBucket{return&LeakyBucket{BucketSize:bucketSize,NumWorker:numWorker,bucket:make(chanTask,bucketSize),}}func(b*LeakyBucket)validate(taskTask)bool{//如果桶满了,返回falseselect{caseb.bucket<-task:default:fmt.Printf("request[id=%d]isrefused\n",task.taskID)returnfalse}//等待f或orkerexecutes<-task.resChanfmt.Printf("request[id=%d]isrun\n",task.taskID)returntrue}func(b*LeakyBucket)Start(){//启动worker从bucketFetch任务执行gofunc(){fori:=0;我100ms,则添加(now-last)/100毫秒令牌。然后,如果token个数>0,则token个数-1继续执行后续业务逻辑,否则返回请求频率超过限制的错误码或页面。packagemainimport("fmt""sync""time")//并发访问同一个user_id/ip的记录需要加锁varrecordMumap[string]*sync.RWMutexfuncinit(){recordMu=make(map[string]*sync.RWMutex)}funcmax(a,bint)int{ifa>b{returna}returnb}typeTokenBucketstruct{BucketSizeint//桶中的容量:最多可以存储多少个令牌令牌率时间。Duration//多长时间生成一个token记录map[string]*record//user_id/ip的错误访问记录}//时间戳和上次访问的token个数typerecordstruct{lasttime.Timetokenint}funcNewTokenBucket(bucketSizeint,tokenRatetime.Duration)*TokenBucket{return&TokenBucket{BucketSize:bucketSize,TokenRate:tokenRate,records:make(map[string]*record),}}func(t*TokenBucket)getUidOrIp()字符串{//获取请求用户的user_id或ip地址return"127.0.0.1"}//获取user_id/ip上次访问的时间戳和token号func(t*TokenBucket)getRecord(uidOrIpstring)*record{ifr,好的:=t.records[uidOrIp];ok{returnr}return&record{}}//保存user_id/ip最近一次请求的时间戳和token数量func(t*TokenBucket)storeRecord(uidOrIpstring,r*record){t.records[uidOrIp]=r}//验证是否可以获得tokenfunc(t*TokenBucket)validate(uidOrIpstring)bool{//并发修改同一用户记录的写锁rl,ok:=recordMu[uidOrIp]if!ok{varmusync.RWMutexrl=&murecordMu[uidOrIp]=rl}rl.Lock()deferrl.Unlock()r:=t.getRecord(uidOrIp)now:=time.Now()ifr.last.IsZero(){//第一次访问初始化为最大令牌数r.last,r.token=now,t.BucketSize}else{ifr.last.Add(t.TokenRate).Before(now){//如果上次请求的间隔超过令牌速率//添加令牌并更新lastr.token+=max(int(now.Sub(r.last)/t.TokenRate),t.BucketSize)r.last=now}}varresultboolifr.token>0{//如果token个数大于1,取一个token,validate结果为truer.token--result=true}//保存最新记录t.storeRecord(uidOrIp,r)returnresult}//返回是否限流func(t*TokenBucket)IsLimited()bool{return!t.validate(t.getUidOrIp())}funcmain(){tokenBucket:=NewTokenBucket(5,100*time.Millisecond)fori:=0;我<6;i++{fmt.Println(tokenBucket.IsLimited())}time.Sleep(100*time.Millisecond)fmt.Println(tokenBucket.IsLimited())}3.滑动时间窗算法Algorithm滑动时间窗算法是对普通时间窗的计数进行优化。在使用普通时间窗时,我们会为每个user_id/ip维护一个KV:uidOrIp:timestamp_requestCount。假设每秒有1000个请求的限制,那么第100毫秒有一个请求,这个KV就变成了uidOrIp:timestamp_1,第200毫秒有一个请求。我们先比较距离记录的时间戳是否超过1秒。如果不是,则只更新计数。这时KV变成uidOrIp:timestamp_2。当1100ms有请求进来时,更新记录中的时间戳并重置计数,KV变为uidOrIp:newtimestamp_1正常时间窗口有问题,假设500个请求集中在前1s的最后100ms,并且500个请求集中在后面1秒的前100毫秒,200毫秒之前请求已经超过限制。但是,由于在时间窗后每1秒重置一次计数,此时无法识别请求已超过限制。对于滑动时间窗,我们可以将1ms的时间窗分成10个时隙,每个时隙统计某100ms的请求数。每隔100ms,一个新的时隙加入窗口,比当前时间早100ms的时隙在窗口外。窗口中最多可以维护10个时隙,对存储空间的消耗也比较低。适用场景与令牌桶一样,具有应对突发流量的能力。Go语言实现主要是实现滑动窗口算法。可以参考哔哩哔哩开源的kratos框架中断路器的实现,将时隙对象保存在一个循环链表中。它们实现的优点是不需要频繁创建和销毁时隙对象。下面给出了一个简单的基本实现:timeSlotstruct{timestamptime.Time//该timeSlot的时间起点countint//落入该timeSlot的请求数}funccountReq(win[]*timeSlot)int{varcountintfor_,ts:=rangewin{count+=ts.count}returncount}typeSlidingWindowLimiterstruct{SlotDurationtime.Duration//时间槽的长度WinDurationtime.Duration//滑动窗口的长度numSlotsint//窗口最大槽数windowsmap[string][]*timeSlotmaxReqint//winduration内允许的最大请求数}funcNewSliding(slotDurationtime.Duration,winDurationtime.Duration,maxReqint)*SlidingWindowLimiter{return&SlidingWindowLimiter{SlotDuration:slotDuration,WinDuration:winDuration,numSlots:int(winDuration/slotDuration),窗口:make(map[string][]*timeSlot),maxReq:maxReq,}}//获取user_id/ip时间窗口函数c(l*SlidingWindowLimiter)getWindow(uidOrIpstring)[]*timeSlot{win,ok:=l.windows[uidOrIp]if!ok{win=make([]*timeSlot,0,l.numSlots)}returnwin}func(l*SlidingWindowLimiter)storeWindow(uidOrIpstring,win[]*timeSlot){l.windows[uidOrIp]=win}func(l*SlidingWindowLimiter)validate(uidOrIpstring)bool{//相同的user_id/ip并发安全mu,ok:=winMu[uidOrIp]if!ok{varmsync.RWMutexmu=&mwinMu[uidOrIp]=mu}mu.Lock()defermu.Unlock()win:=l.getWindow(uidOrIp)now:=time.Now()//过期时间槽被移出时间窗口timeoutOffset:=-1fori,ts:=rangewin{ifts.timestamp.Add(l.WinDuration).After(now){break}timeoutOffset=i}iftimeoutOffset>-1{win=win[timeoutOffset+1:]}//判断请求是否超限varresultboolifcountReq(win)0{lastSlot=win[len(win)-1]iflastSlot.timestamp.Add(l.SlotDuration).Before(now){lastSlot=&timeSlot{时间戳:现在,计数:1}win=append(win,lastSlot)}else{lastSlot.count++}}else{lastSlot=&timeSlot{timestamp:now,count:1}win=append(win,lastSlot)}l.storeWindow(uidOrIp,赢)返回结果}func(l*SlidingWindowLimiter)getUidOrIp()string{return"127.0.0.1"}func(l*SlidingWindowLimiter)IsLimited()bool{return!l.validate(l.getUidOrIp())}funcmain(){limiter:=NewSliding(100*time.Millisecond,time.Second,10)fori:=0;我<5;i++{fmt.Println(limiter.IsLimited())}time.Sleep(100*time.Millisecond)fori:=0;我<5;i++{fmt.Println(limiter.IsLimited())}fmt.Println(limiter.IsLimited())for_,v:=rangelimiter.windows[limiter.getUidOrIp()]{fmt.Println(v.timestamp,v.count)}fmt.Println("一千年后...")time.Sleep(time.Second)fori:=0;我<7;i++{fmt.Println(limiter.IsLimited())}for_,v:=rangelimiter.windows[limiter.getUidOrIp()]{fmt.Println(v.timestamp,v.count)}}