当前位置: 首页 > 后端技术 > Java

如何使用redis实现限流

时间:2023-04-02 02:03:33 Java

如何使用redis实现限流首发于Dale的博客背景在工作中,我们经常会遇到需要对某个接口或者某个调用进行限流的需求。在涉及到限流的情况下也会遇到redis数据的一些处理。在分布式场景下,需要操作的原子性。限流算法主流的限流算法有以下四种:计数器(固定窗口)、滑动窗口(分段计数器)、漏桶算法、令牌桶算法。对于算法的讲解,网上有很多不错的文章。流算法。在本文中,讨论前两个,即计数器和滑动窗口。业务解释限流是业务中经常遇到的场景。例如:接口限流,调用限流等。以接口限流为例,流程如下:请求打到服务器后,需要判断当前接口是否达到阈值,如果达到阈值,则请求结束。如果没有达到阈值,就计数++,继续调用下一步。限制电流的方法有很多。如果只是一个小型的单机部署应用,可以考虑在内存中计数运行。对于复杂项目和分布式部署项目,可以考虑使用redis进行统计。而且限流逻辑也不必局限于Java代码,你也可以在nginx中使用lua来操作,比如大名鼎鼎的openresty,其他网关服务也可以实现。分布式业务中的限流先分析业务场景。在分布式部署的API场景中,需要注意以下几点:使用网关对API进行负载均衡,部署在不同服务器上的进程之间难以共享内存。基于限流的业务是对整个系统的一个或部分接口进行限流,因此计数必须是不同进程可读的。计数的触发发生在请求到达服务器之后,因此需要考虑原子性。即:同一时间,只有一个请求可以触发计数。这就对计数服务的需求提出了高并发要求。分析nginx+lua的可行性。Nginx经常被用作请求的入口。使用其负载均衡后,可以将请求分发到不同的服务。使用lua对内存进行操作,貌似可以达到上面的要求(可行性有待验证)。但是在实际情况下,一个系统并不一定只部署一个nginx作为入口。一方面是单机风险,另一方面是地理位置不同,同一台机器的访问速度可能因网络不同而有很大差异。所以大家更喜欢用DNS或者其他方式实现多态的nginx先做一层负载均衡。所以单靠nginx+lua是不能满足我们的需求的。分析redis的可行性。Redis是一个基于内存的非关系型数据库。它的并发性是经得起考验的。同时也可以满足不同进程对同一数据的读取和修改的需求。关于原子性,redis的操作天生就支持原子性,string类型的INCR(原子累加)操作非常适合限流业务。Redis实现限流让我们回到最初的流程。计数限流操作包括:查询当前计数和累加当前计数。在分布式系统中,我们必须始终注意原子性。在单进程中,我们保持数据线程安全的方式就是加锁。不管是可重入锁还是synchronized,它的语义都是告诉其他线程,我现在征用了这个数据(代码块),你稍后再来。在分布式系统中,我们很自然地可以想到分布式锁。伪代码如下:Locklock=getDistributedLock();try{lock.lock();//从redis获取计数Integercount=getCountFromRedis();if(count>=limit){//如果超过阈值,return不会被调用为false;}//没有超过阈值,允许调用incrRedisCount();返回真;}catch{...}finally{lock.unlock();}乍一看,这个逻辑没问题,但实际上是个大问题:使用分布式锁显然会拖慢整个系统,浪费大量资源。redis的incr操作会返回累加值,所以不需要查询操作。伪代码如下:Integercount=incrRedisCount();if(count>=limit){returnfalse;}returntrue;是不是简单多了。但随之而来的还有其他问题。大部分业务并不要求我们只限制次数,更多的是要求我们限制一段时间内对接口的请求次数----滑动窗口。滑动窗口的实现顾名思义,滑动窗口就是把一个固定的窗口往上滑动。对于限流,在一段时间内进行计数,时间结束后立即开始新的计数。如何实现这个逻辑一段时间?其实很简单,我们可以使用timestamp来实现这个功能。//二级时间戳longtimestamp=System.currentTimeMillis()/1000;LongaLong=redisTemplate.opsForValue().increment(RedisKeyEnum.SYSTEM_FLOW_LIMIT.getKey()+时间戳);返回一个龙;这时候就会出现一个问题,如果按照上面的代码,如果每秒创建一个key,redis内存迟早会爆。我们需要一个策略来删除这个键。用一种愚蠢的方式,你可以记录这些键,然后异步删除它们。但是更好的方法是在第一次创建密钥时设置一个比窗口稍大的过期值。所以,代码如下:/***统计秒发送的消息数**@return*/publicLonggetSystemMessageCountAtomic(){//秒级时间戳longtimestamp=System.currentTimeMillis()/1000;LongaLong=redisTemplate.opsForValue().increment(RedisKeyEnum.SYSTEM_FLOW_LIMIT.getKey()+timestamp);if(aLong!=null&&aLong==1){redisTemplate.expire(RedisKeyEnum.SYSTEM_FLOW_LIMIT.getKey()+timestamp,2,TimeUnit.SECONDS);}返回一个龙;expire命令只会在第一次计数时执行。为什么需要设置一个比窗口稍大的时间呢?想象一下,如果你设置的时间和window一样,keykeyA是在时间a生成的,过期时间是一秒。然后b时刻,生成的key也是keyA(同一秒内),但是由于网络或者其他原因,b时刻的命令在一秒后发送到redis服务器。由于过期时间为一秒,此时旧的keyA已经过期,那么会在b时刻创建一个新的key。此时,需要考虑另一个问题,如果超过限制,上述代码将如何表现。假设,一秒内只允许100个请求。那么第101次,在redis中也会执行incr命令,执行后续的请求。其实这些命令的执行是没有意义的,因为在第101次的时候,这一秒的请求已经达到了极限,所以我们需要另外一个存储来记录上面的数据。我选择了AtomicLong来记录已达到其限制的窗口。分析是否可行。AtomicLong属于java.util.concurrent.atomic包,使用CAS和volatile保证数据的线程安全。对于以上需求,我们只需要在单机上记录flag,不考虑分布式情况。讨论可行,代码如下所示。privatefinalAtomicLongflag=newAtomicLong();/***系统全局流量限制*/publicvoidsystemFlowLimit(){//判断flag是否与当前秒相同if(flag.get()!=System.currentTimeMillis()/1000){//由于合并后flag.get和flag.set之间的所有操作都不是原子的,小于线程数的线程会进入这里。//意思是,当第一个线程设置flag为当前二级时间戳时,已经有线程执行了flag.get的判断逻辑//此时,有线程会继续redis操作和log操作longcount=systemLimitService.getSystemMessageCountAtomic();if(count>=systemProperties.getFlowLimit()){//超过后,将flag设置为当前秒flag.set(System.currentTimeMillis()/1000);LOGGER.warn("系统流量现在超出系统流量限制,在:{}",System.currentTimeMillis()/1000);抛出新的BusinessException(...);}}else{抛出新的BusinessException(...);}}以上整理总结除了一些使用redis做限流的方法外,常用的算法是滑动窗口,所以花了不少时间来讲解滑动窗口的实现。当然,我们也可以使用lua脚本来操作redis,实现限流等redis操作的配合。我经常遇到的一个场景是,向redis队列写入数据需要限流,当流量到达时,需要删除redis队列中的一些内容。此时使用lua脚本可以优雅的维护多个redis操作的原子性,也可以减少网络条件的开销。