当前位置: 首页 > 科技观察

使用Redis实现延迟队列,研究了两种方案,发现不简单

时间:2023-03-13 02:49:53 科技观察

背景前段时间有个小项目需要使用延迟任务。说到延迟任务,第一个闪过我脑海的是使用消息队列,比如RabbitMQ的死信队列或者RocketMQ的延迟队列,不过这是一个小项目,MQ没有介绍,我也不了解因为一个延时任务想引入MQ,会增加系统的复杂度,所以这个方案直接通过了。基于MQ的方法虽然不行,但是本项目使用的是Redis,所以想知道是否可以用Redis代替MQ来实现延迟队列的功能,于是查了一下有没有现成的方案,更何况,还真的为我找到了两个解决方案,而且我也仔细研究对比了这两个方案,发现要很好地实现延迟队列并不容易。侦听过期密钥基于侦听过期密钥实现延迟队列是我找到的第一个解决方案。为了了解这个解决方案的实现细节,我去官网找了一些东西。一、Redis发布订阅说到发布订阅模式,首先想到的就是MQ,不过Redis也实现了一套,和MQ小偷差不多,如图:channel的概念图中类似于MQ中topic的概念,可以将channel理解为MQ中的topic。生产者在发送消息时需要指定将消息发送到哪个通道,消费者可以通过订阅这个通道来获取消息。2.Keyspacenotifications在Redis中,有很多默认的channel,但是向这些channel发送消息的生产者并不是我们写的代码,而是Redis本身。当消费者收听这些频道时,他们可以感知到Redis中数据的变化。这个功能Redis官方称为keyspacenotifications,字面意思是keyspacenotifications。这些默认通道分为两类:以__keyspace@__:为前缀,后面是key的名字,表示监听与这个key相关的事件。比如现在有一个消费者监听了__keyspace@0__:sanyou这个频道,而sanyou是Redis中的一个普通key,那么当keysanyou被删除或者其他事件发生时,消费者就会收到sanyoukey被删除或者其他事件的消息以__keyevent@__:为前缀,后面是消息事件类型,表示监听某个事件。比如现在有一个consumer监听__keyevent@0__:expiredchannel代表监听key的过期事件。那么当一个Redis的键过期(expired)时,消费者就可以收到这个键已经过期的消息。如果把expired换成del,那么监听的就是delete事件。支持哪些活动可以在官网查询。上面的db指的是具体的数据库。Redis不是默认分为16个库,序号是0-15,所以db是0到15的数字,例子中的0是指0对应的数据库。3.延迟队列的实现原理理解了以上两个概念之后,监控过期key的实现原理应该就一目了然了。其实当key过期的时候,Redis会发布一个key过期事件到__keyevent@__:expired这个通道,只要我们的服务监听这个通道,那么我们就可以知道过期的Key,这样延迟队列功能得以实现。所以这种方式实现延迟队列只需要两步:发送延迟任务,关键是延迟消息本身,过期时间就是延迟时间。监听通道__keyevent@__:expired并处理延时任务4.演示就绪。基本概念和核心原理说完了,就该给我看代码了。巧的是,Spring已经实现了监听通道__keyevent@*__:expired的功能,__keyevent@*__:expired中的*代表一个通配符,监听所有的数据库。所以demo写起来就很简单了,只需要3步即可引入pomorg.springframework.bootspring-boot-starter-data-redis2.2.5.RELEASEorg.springframework.bootspring-boot-starter-web2.2.5。RELEASE配置类@ConfigurationpublicclassRedisConfiguration{@BeanpublicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(connectionFactory);返回redisMessageListenerContainer;}@BeanpublicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerCon容器redisMessageListenerContainer){returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer);}}KeyExpirationEventMessageListener实现对__keyevent@*__:expiredchannel的监听当KeyExpirationEventMessageListener收到Redis释放的过期Key的消息时,会释放RedisKeyExpiredEventRedisKey事件所以我们只需要该事件就可以获取到过期消息的key,也就是延迟消息监听RedisKeyExpiredEvent事件。实现MyRedisKeyExpiredEventListener@ComponentpublicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{@OverridepublicvoidonApplicationEvent(RedisKeyExpiredEventSevent){byte(event.body);System.out.println("获取延迟消息:"+newString(body));}}整个项目目录也是用简单的代码写的。启动应用后,我直接通过Redis命令设置消息,并没有通过代码发送消息,消息的key是sanyou,value是task,value不重要,过期时间是5ssetsanyoutaskexpiresanyou5如果以上理论都正确的话,如果不出意外的话,MyRedisKeyExpiredEventListener应该可以在5s后监听到keysanyou的过期消息,相当于拿到了延迟任务,控制台会打印出延迟消息:三友。于是满怀希望,静静等待5s。.5,4,3,2,1,时间到了,我查看控制台,但是控制台并没有按预期打印上面这句话。为什么打印不出来?难道是代码写错了?正要查代码的时候,官网的一段话道出了真正的原因。上面这段话的内容我来给大家翻译一下。上面这段话主要讨论了key过期事件的时效性。首先,它提到了Redis中清除过期键的两种策略,也就是面试作文中经常背的两种:lazyclearing。当key过期后,访问时会定时清除Key。后台会定时检查一些key。如果任何密钥已过期,它将被清除。这个key直到在Redis中被清理掉,也就是真正被删除了,才会被释放。至此,我终于明白,即使我在上面的例子中设置了5s的过期时间,5s过去后,只要不满足两种清算策略,就不会有人访问sanyoukey,定时清算后台任务将不起作用。如果扫描了keysanyou,key过期事件就不会发布,自然也就不会被监听了。至于什么时候可以扫描到后台的定时清理任务,这个没有固定的时间,有可能一到过期时间就扫描,也有可能等到一定时间才扫描,这可能会导致客户端从发布到监听到的消息的时间差会大于等于过期时间,从而导致消息延迟一定时间,真的是有点坑。.5.坑除了上面测试demo时遇到的坑,经过我的深入研究,我还发现了一些比较离谱的坑。丢失消息太频繁了。Redis的丢消息和MQ不同,因为MQ有消息持久化机制。它可能只会在机器宕机时丢失一些消息,但是Redis丢失消息非常离谱。比如你的服务在重启的时候,消息会丢失消息。Redis实现的发布-订阅模式,没有消息的持久化机制。消息发布到通道时,如果没有客户端订阅该通道,消息就会丢失,不会像MQ那样持久化。当消费者订阅时,它会被消费者消费。所以,假设在服务重启的过程中,某个生产者或者Redis自己发布了一条消息到某个通道,而这个通道因为服务重启没有被监听,那么消息自然会丢失。消息消费只有广播模式Redis发布订阅模式的消息消费只有广播模式。所谓广播模式就是多个消费者订阅同一个频道,因此每个消费者都可以消费发布到这个频道的所有消息。如图所示,生产者发布一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou的消息。因此,如果通过监听通道获取延时任务,一旦有多个服务实例,就必须保证消息不能被重复处理,额外增加了代码开发量。接收所有key的某个事件不是Redis发布订阅模型的问题,而是Redis自身事件通知的问题。当消费者监听到以__keyevent@__:开头的消息时,所有的按键事件都会通知给消费者。比如消费者监听通道__keyevent@*__:expired,只要key过期,不管key是张三还是李四,消费者都能收到。所以如果只想消费某类消息的key,就得自己加一些标签,比如给消息的key加前缀。消费时,判断带前缀的key就是需要消费的任务。所以,综上所述,可以得出一个很重要的结论,就是监听Redis过期key实现延迟队列,不稳定,坑多!延迟队列有没有更靠谱的实现?这就不得不提到我研究的第二种方案了。Redisson实现了延迟队列。Redisson是Redis之子(Redisson)。它基于Redis实现了很多功能。最常用的是Redis分布式锁的实现。但是除了实现了Redis分布式锁之外,还实现了延迟队列功能的加入。先来个demo,再说说这个实现的原理。1、demo引入pomorg.redissonredisson3.13.1封装了一个RedissonDelayQueue类@Component@Slf4jpublic类RedissonDelayQueue{privateRedissonClientredissonClient;私人RDelayedQueuedelayQueue;私有RBlockingQueue阻塞队列;@PostConstructpublicvoidinit(){initDelayQueue();startDelayQueueConsumer();}privatevoidinitDelayQueue(){Configconfig=newConfig();SingleServerConfigserverConfig=config.useSingleServer();serverConfig.setAddress("redis://localhost:6379");redissonClient=Redisson.create(config);blockingQueue=redissonClient.getBlockingQueue("三友");delayQueue=redissonClient.getDelayedQueue(blockingQueue);}privatevoidstartDelayQueueConsumer(){newThread(()->{while(true){try{Stringtask=blockingQueue.take();log.info("收到延迟任务:{}",task);}catch(Exceptione){e.printStackTrace();}}},"三友-消费者").start();}publicvoidofferTask(Stringtask,longseconds){log.info("添加延迟任务:{}延迟时间:{}s",task,seconds);delayQueue.offer(任务,秒,TimeUnit.SECONDS);}}该类在创建时会初始化延迟队列,创建RedissonClient对象,然后通过RedissonClient对象获取RDelayedQueue和RBlockingQueue对象。传入队列的名字是SANYOU,名字无所谓。创建延迟队列后,将打开一个延迟任务的消费者线程。该线程会一直阻塞,通过take方法从RBlockingQueue中获取延迟任务。添加任务时,通过RDelayedQueue的offer方法添加。Controller类,通过接口添加任务,延迟时间为5s@RestControllerpublicclassRedissonDelayQueueController{@ResourceprivateRedissonDelayQueueredissonDelayQueue;@GetMapping("/add")publicvoidaddTask(@RequestParam("task")Stringtask){redissonDelayQueue.offerTask(task,5);}}启动项目,在浏览器中输入以下链接,添加任务http://localhost:8080/add?task=sanyou静静等待5秒,成功获取任务。2.实现原理见上面demo如下图所示。延迟队列在Redis内部使用的channel和数据类型SANYOU前面的前缀是固定的,在创建Redisson的时候加上。redisson_delay_queue_timeout:SANYOU,sortedset数据类型,存储所有延迟任务,按照延迟任务的过期时间戳(提交任务时的时间戳+延迟时间)排序,所以list的第一个元素就是整个delay的队列中最早执行的任务,这个概念很重要redisson_delay_queue:SANYOU,list数据类型,也存储所有任务,不过研究了一下好像没什么用。.SANYOU,列表数据类型,称为目标队列。这个队列中存储的任务是已经达到延迟时间的任务,可以被消费者获取。所以上面demo中RBlockingQueue的take方法就是从这个目标队列中获取的。任务的redisson_delay_queue_channel:SANYOU是用来通知客户端启动延迟任务的通道。有了这些概念,我们再来看看整体的运行示意图。生产者在提交任务时,将任务放入redisson_delay_queue_timeout:SANYOU,得分为提交任务的时间戳+延迟时间,也就是延迟任务的过期时间戳。客户端将有一个延迟任务。为了区分,后面会说是client延迟任务。这个延时任务会发送一个lua脚本给RedisServer,Redis执行lua脚本中的命令,lua脚本是原子的。这个lua脚本主要做了两件事:从redisson_delay_queue_timeout:SANYOU中取出已经达到延迟时间的任务,保存到SANYOU的目标队列中,获取redisson_delay_queue_timeout:SANYOU中当前最早到期的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU频道。当客户端监听到redisson_delay_queue_channel:SANYOUchannel的消息时,会再次提交一个客户端延时任务,延时时间为消息(过期时间最早的延时任务的过期时间戳)-当前时间戳。这个时间其实就是redisson_delay_queue_channel:SANYOU任务最早的过期时间剩余延迟。这里可以等10s再考虑。.这样,一旦时间到达上面提到的最早过期时间??的任务的过期时间戳,上面提到的redisson_delay_queue_timeout:SANYOU中提到的最早过期时间??的任务就已经过期了,客户端的延迟任务也在同时,于是开始执行lua脚本操作,将达到延迟时间的任务及时放入目标队列中。然后将剩余延迟任务中最早过期的任务的过期时间戳再次发布到channe中,并在本次循环中继续运行,保证redisson_delay_queue_timeout:SANYOU中的过期数据能够及时放入目标队列中。所以上面很多的主要作用就是保证到达延迟时间的任务能够及时的放入目标队列。这里还有两个比较特殊的情况,图中没有显示:第一个是如果redisson_delay_queue_timeout:SANYOU是新加入的任务(有没有队列前的任务)需要在队列中先执行,它也会发布消息到频道,然后按照上面的流程离开。添加任务代码如下,通过lua脚本的第二种特殊情况是项目启动时会执行一个客户端延时任务。项目重启时,因为没有客户端延迟执行任务,redisson_delay_queue_timeout:SANYOU队列可能过期但没有放入目标队列。重启执行一次,保证过期数据能及时释放到目标队列。3、与第一种方案的对比下面我们将第一种方案与Redisson的方案进行对比,看看第一种方案有没有什么坑。对于第一个任务延迟问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢的时候,可能会造成任务消费延迟延迟。第二个丢失消息的问题,Redisson的方案大大降低了丢失消息的可能性,因为所有的任务都存在于list和sortedset这两种数据类型中,而Redis有持久化机制,即使Redis宕机,也可能丢失一个一点数据。第三个广播消费任务的问题,这个不会出现,因为每个client都从同一个目标队列中获取任务。第四个问题是Redis内部通道发布事件的问题。如果与这种解决方案无关,它就更不可能存在。因此,从上面的对比可以看出,Redisson的实现更加靠谱。