当前位置: 首页 > 科技观察

你真的知道如何实现延迟队列吗?

时间:2023-03-13 09:10:53 科技观察

前言延迟队列是我们在日常开发过程中经常接触和需要用到的技术方案。前段时间在开发业务需求的时候,也遇到了一个需要使用延迟消息队列的需求场景,所以也在网上调研了一系列延迟队列的不同实现方式,在此做一个总结,送给大家分享。延迟队列定义首先,队列的数据结构相信大家都不陌生。它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列的元素先被取出消费;延迟队列和普通队列最大的区别体现在它的delay属性上,普通队列的元素是先进先出的,按照入队顺序处理,而延迟队列中的元素会指定一个延迟入队时间,表示希望在指定时间后能够处理。从某种意义上说,延迟队列的结构不像队列,更像是一个以时间为权重的有序堆结构。应用场景我在开发业务需求时遇到的使用场景如下。用户可以在小程序中订阅不同的微信或QQ模板消息,产品同学可以在小程序的管理端新建消息推送方案。向时间节点所有订阅模板消息的用户推送消息。如果你只服务于单个小程序,那么也许设置一个定时任务,甚至手动定时执行可以是满足这个需求最方便快捷的方式,但是我们希望抽象出一个消息订阅模块服务给所有的业务使用,这时候就需要一个通用的系统解决方案,这时候就需要用到延迟队列。除了上面我遇到过的典型需求,延迟队列的应用场景其实也很广泛,比如以下场景:用户下新订单,如果15分钟内不付款,则自动取消。公司会议预约系统会在会议预约成功后,在会议开始前半小时通知所有已预约会议的用户。安全工单超过24小时未处理,自动拉企业微信群提醒相关责任人。用户下单外卖后,当距离超时还有10分钟时,会提醒外卖小哥超时即将结束。对于数据量比较小,时效性要求不是那么高的场景,比较简单的方式就是轮询数据库,比如每秒轮询一次数据库中的所有数据,处理所有过期的数据。比如我是一个内部会议预约系统的开发者,我可能会采用这个方案,因为整个系统的数据量不会很大,提前30分钟提醒和29分钟提醒的区别会议开始前规模不大。但是,如果要处理的数据量比较大,那么实时性要求就比较高。比如淘宝上每天所有的新订单,如果15分钟内没有付款,就会自动超时,数量级可达数百万甚至数千万。如果这时候敢去轮询数据库,可能是怕被老板打死,如果不被老板打死,很可能会被你的操作打死和维修同学。在这种场景下,就需要用到我们今天的主角——延迟队列。延迟队列为我们处理大量需要延迟消费的消息提供了一种高效的解决方案。那么废话不多说,下面我们就来看看几种常见的延迟队列方案以及各自的优缺点。RedisZSet的实现方案我们知道Redis有一个有序的集合数据结构ZSet,ZSet中的每一个元素都有一个对应的Score,ZSet中的所有元素都按照其Score进行排序。那么我们可以使用Redis的ZSet通过以下操作来实现一个延迟队列:Enqueue操作:ZADDKEY时间戳任务,我们将需要处理的任务根据其需要延迟处理的时间作为一个Score添加到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素的个数,所以我们可以比较高效的进行入队操作。定时(比如每秒)启动一个进程,通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素。具体操作是:ZRANGEBYSCOREKEY-inf+inflimit01WITHSCORES。查询结果有两种情况:查询得到的分数小于等于当前时间戳,说明任务需要执行,任务会被异步处理;b.查询得到的分数大于当前时间戳。查询操作提取得分最小的元素,所以说明ZSet中的所有任务都还没有到执行时间,休眠一秒后继续查询;同样,ZRANGEBYSCORE操作的时间复杂度是O(logN+M),其中N是ZSet中的元素个数,M是要查询的元素个数,所以我们的常规查询操作也更加高效。这里从网上转过来一套Redis实现延迟队列的后端架构。它在原有的RedisZSet实现上进行了一系列的优化,使得整个系统更加稳定健壮,能够应对高并发场景,并且具有更好的扩展性,是一个非常好的架构设计。其整体架构图如下:其核心设计思想:通过哈希算法将延迟消息任务路由到不同的RedisKey有两个优点:解决了当一个KEY存放更多延迟消息时,入队操作和查询操作速度变慢的问题(这两个操作的时间复杂度都是O(logN))。b.系统具有更好的横向扩展性。当数据量激增时,我们可以通过增加RedisKey的数量来快速扩展整个系统,以抵抗数据量的增长。每个RedisKey对应一个处理进程,称为Event进程,通过上面第2步中描述的ZRANGEBYSCORE方法轮询Key,检查是否有延迟消息需要处理。所有Event进程只负责分发消息,具体的业务逻辑通过额外的消息队列异步处理。这样做的好处也很明显:a.速度非常快,而且由于复杂的业务逻辑导致消息堆积的可能性较小。b.另一方面,采用额外的消息队列后,消息处理的扩展性会更好,我们可以通过增加消费者进程的数量来扩展整个系统的消息处理能力。Event进程采用Zookeeper选择master进程部署方式,避免Event进程宕机后RedisKey中消息堆积。一旦Zookeeper的Leader主机宕机,Zookeeper会自动选择一个新的Leader主机来处理RedisKey中的消息。从上面的讨论可以看出,通过RedisZset实现延迟队列是一种理解起来更直观,可以快速实现的方案。而我们可以依靠Redis自身的持久化来实现持久化。使用Redis集群来支持高并发和高可用是延迟队列的一个很好的实现。RabbitMQRabbitMQ本身并不直接提供对延迟队列的支持。我们依靠RabbitMQ的TTL和死信队列功能来实现延迟队列的效果。那我们先来了解一下RabbitMQ的死信队列和TTL功能。死信队列死信队列实际上是RabbitMQ的一种消息处理机制。RabbmitMQ在生产和消费消息时,遇到以下情况消息会变成“死信”:已被消费,即TTL已过期消息队列已达到最大长度一旦消息成为死信,将重新投递到死信交换(Dead-Letter-Exchange),然后死信letterswitch根据绑定规则转发到对应的死信队列,通过监听队列可以重新消费消息。消息生命周期TTLTTL(Time-To-Live)是RabbitMQ的一个高级特性,它表示一条消息的最大生命周期,以毫秒为单位。如果一个消息在TTL设置的时间内没有被消费,它就会变成死信,进入我们上面提到的死信队列。有两种不同的方法来设置消息的TTL属性。一种方式是在创建队列的时候直接设置整个队列的TTL过期时间。所有进入队列的消息都设置了统一的过期时间。一旦消息过期,将被立即丢弃并进入死信队列。参考代码如下:Mapargs=newHashMap();args.put("x-message-ttl",6000);channel。queueDeclare(queueName,durable,exclusive,autoDelete,args);当延迟队列的延迟时间为固定值时,这种方法比较适用。另一种方法是为单个消息设置它。参考代码如下。消息设置为6秒的过期时间:AMQP.BasicProperties.Builderbuilder=newAMQP.BasicProperties.Builder();builder.expiration("6000");AMQP。BasicPropertiesproperties=builder.build();channel.basicPublish(exchangeName,routingKey,mandatory,properties,"msgcontent".getBytes());如果需要为不同的消息设置不同的延迟时间,上述队列的TTL设置不能满足我们的需要,需要对单条消息使用此TTL设置。但是需要注意的是,这样设置TTL,消息可能不会按时消亡,因为RabbitMQ只会检查第一条消息是否过期。例如,在这种情况下,如果第一条消息的TTL为20秒,第二条消息的TTL为10秒,那么RabbitMQ将等到第一条消息过期后再让第二条消息过期。这个问题的解决方法也很简单,只需要安装一个RabbitMQ的插件:https://www.rabbitmq.com/community-plugins.html安装这个插件后,所有的消息都可以根据TTL已过期。RabbitMQ实现了延迟队列。介绍完RabbitMQ的死信队列和TTL这两个特性后,我们离延迟队列的实现只差一步了。聪明的读者可能已经发现,TTL不是延迟队列中的消息被延迟的时间吗?如果我们对需要延迟的消息设置TTL作为延迟时间,将其投递到RabbitMQ的普通队列中,并且永不消费,那么在TTL时间过后,消息会自动投递到死信队列中。这时候我们使用消费进程实时消费死信队列中的消息,实现了延迟队列的效果。从下图中,我们可以直观的看到使用RabbitMQ实现延迟队列的整体流程:使用RabbitMQ实现延迟队列,我们??可以很好的利用RabbitMQ的一些特性,比如消息的可靠发送,消息的可靠传递,以及死信队列保证消息至少被消费一次且没有被正确处理的消息不会被丢弃。另外通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单节点故障导致延迟队列不可用或者消息丢失。TimeWheelTimeWheel时间轮算法是一种巧妙高效的延迟队列实现算法,在Netty、Zookeeper、Kafka等各种框架中都有应用。时间轮如上图所示。时间轮是一个循环队列,用于存储延迟消息。它的底层是用数组实现的,可以高效的遍历。循环队列中的每个元素对应一个延迟任务列表,这是一个双向循环链表,链表中的每一项代表一个需要执行的延迟任务。时轮会有刻度盘指针,指示时轮当前时间。随着时间的推移,指针会不断前进,处理相应位置的延迟任务列表。添加延迟任务由于时间轮的大小是固定的,时间轮中的每个元素都是一个双向循环链表,我们可以向时间轮中添加延迟任务,时间复杂度为O(1)。如下图,比如我们有这样一个时间轮。当表盘指针指向当前时间2时,我们需要添加一个延迟3秒的新任务。我们可以快速计算出延迟任务在时间轮中对应的位置是5,并在5的位置添加到任务列表的末尾。多层时间轮到此为止已经很不错了,但是细心的同学可能有发现上面时间轮的大小是固定的,只有12秒。如果此时我们有一个任务需要延迟200秒,怎么办?是不是直接扩大了整个时间轮的体积?这显然是不可取的,因为如果我们这样做,我们需要维护一个非常非常大的时间轮,内存是无法接受的,而且底层数组很大时寻址效率会下降,影响性能。为此,Kafka引入了多层时间轮的概念。其实,多层时间轮的概念,和我们机械表上的时分秒针概念非常相似。当当前时间不能仅用秒针表示时,就用分针和秒针一起表示。同样,当任务的到期时间超过当前时间轮所代表的时间范围时,会尝试将其添加到上层时间轮,如下图:第一层时间轮为0-12秒,第二层时间轮每一格所能代表的时间范围,就是整个第一层时间轮所代表的范围,也就是12秒,所以时间范围整个秒级时间轮可以表示的就是12*12=144秒,以此类推。第三层时间轮所能代表的范围是1728秒,第四层是20736秒,以此类推。比如现在我们需要添加一条延迟消息,延迟200秒,发现已经超出了一级时间轮可以代表的时间范围,所以需要继续看上层时间轮加到二级时间轮200/12=17,然后我们发现17超出了二级时间轮的表示范围,那么我们需要继续往上查找加到的位置17/12=2的第三个时间轮位置。Kafka中时间轮算法添加延时任务,促进时间轮滚动的核心流程如下,其中Bucket是时间轮中的延时任务队列,Kafka引入的DelayQueue解决了时间轮效率低的问题大部分Buckets为空导致的滚动:时间轮实现的延迟队列可以支持大量任务的高效触发。并且在Kafka的时间轮算法的实现中,还引入了DelayQueue,通过DelayQueue来推动时间轮滚动,延时任务的增删都放在时间轮中。这种设计大大提高了整个延迟队列的性能。效力。总结延迟队列在我们的日常开发中被广泛使用。本文介绍三种不同的延迟队列实现方案。三种方案各有特点。比如Redis的实现方案是最容易理解的,可以快速实现,但是Redis毕竟是基于内存的。虽然有数据持久化的方案,但是仍然存在数据丢失的可能。RabbitMQ的实现,由于RabbitMQ本身消息可靠发送、消息可靠投递、死信队列等特性,可以保证消息至少被消费一次,没有被正确处理的消息不会被消费丢弃,从而提高消息的可靠性。保证。最后,Kafka的时间轮算法是三种实现方案中最难理解的,但也是一个非常巧妙的实现方案。最后希望以上内容可以帮助大家在实现自己的延迟队列时提供一些思路。