前言延迟队列是我们在日常开发过程中经常接触和需要用到的技术方案。前段时间在开发业务需求的时候,也遇到了一个需要使用延迟消息队列的需求场景,所以也在网上调研了一系列延迟队列的不同实现方式,在此做一个总结,送给大家分享。延迟队列定义首先,队列的数据结构相信大家都不陌生。它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列的元素先被取出消费;延迟队列和普通队列最大的区别体现在它的delay属性上,普通队列的元素是先进先出的,按照入队顺序处理,而延迟队列中的元素会指定一个延迟入队时间,表示希望在指定时间后能够处理。从某种意义上说,延迟队列的结构不像队列,更像是一个以时间为权重的有序堆结构。应用场景我在开发业务需求时遇到的使用场景如下。用户可以在小程序中订阅不同的微信或QQ模板消息,产品同学可以在小程序的管理端新建消息推送方案。向时间节点所有订阅模板消息的用户推送消息。如果你只服务于单个小程序,那么也许设置一个定时任务,甚至手动定时执行可以是满足这个需求最方便快捷的方式,但是我们希望抽象出一个消息订阅模块服务给所有的业务使用,这时候就需要一个通用的系统解决方案,这时候就需要用到延迟队列。除了上面我遇到过的典型需求,延迟队列的应用场景其实也很广泛,比如以下场景:用户下新订单,如果15分钟内不付款,则自动取消。公司会议预约系统会在会议预约成功后,在会议开始前半小时通知所有已预约会议的用户。安全工单超过24小时未处理,自动拉企业微信群提醒相关责任人。用户下单外卖后,当距离超时还有10分钟时,会提醒外卖小哥超时即将结束。对于数据量比较小,时效性要求不是那么高的场景,比较简单的方式就是轮询数据库,比如每秒轮询一次数据库中的所有数据,处理所有过期的数据。比如我是一个内部会议预约系统的开发者,我可能会采用这个方案,因为整个系统的数据量不会很大,提前30分钟提醒和29分钟提醒的区别会议开始前规模不大。但是,如果要处理的数据量比较大,那么实时性要求就比较高。比如淘宝上每天所有的新订单,如果15分钟内没有付款,就会自动超时,数量级可达数百万甚至数千万。如果这时候敢去轮询数据库,可能是怕被老板打死,如果不被老板打死,很可能会被你的操作打死和维修同学。在这种场景下,就需要用到我们今天的主角——延迟队列。延迟队列为我们处理大量需要延迟消费的消息提供了一种高效的解决方案。那么废话不多说,下面我们就来看看几种常见的延迟队列方案以及各自的优缺点。RedisZSet的实现方案我们知道Redis有一个有序的集合数据结构ZSet,ZSet中的每一个元素都有一个对应的Score,ZSet中的所有元素都按照其Score进行排序。那么我们可以使用Redis的ZSet通过以下操作来实现一个延迟队列:Enqueue操作:ZADDKEY时间戳任务,我们将需要处理的任务根据其需要延迟处理的时间作为一个Score添加到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素的个数,所以我们可以比较高效的进行入队操作。定时(比如每秒)启动一个进程,通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素。具体操作是:ZRANGEBYSCOREKEY-inf+inflimit01WITHSCORES。查询结果有两种情况:查询得到的分数小于等于当前时间戳,说明任务需要执行,任务会被异步处理;b.查询得到的分数大于当前时间戳。查询操作提取得分最小的元素,所以说明ZSet中的所有任务都还没有到执行时间,休眠一秒后继续查询;同样,ZRANGEBYSCORE操作的时间复杂度是O(logN+M),其中N是ZSet中的元素个数,M是要查询的元素个数,所以我们的常规查询操作也更加高效。这里从网上转过来一套Redis实现延迟队列的后端架构。它在原有的RedisZSet实现上进行了一系列的优化,使得整个系统更加稳定健壮,能够应对高并发场景,并且具有更好的扩展性,是一个非常好的架构设计。其整体架构图如下:其核心设计思想:通过哈希算法将延迟消息任务路由到不同的RedisKey有两个优点:解决了当一个KEY存放更多延迟消息时,入队操作和查询操作速度变慢的问题(这两个操作的时间复杂度都是O(logN))。b.系统具有更好的横向扩展性。当数据量激增时,我们可以通过增加RedisKey的数量来快速扩展整个系统,以抵抗数据量的增长。每个RedisKey对应一个处理进程,称为Event进程,通过上面第2步中描述的ZRANGEBYSCORE方法轮询Key,检查是否有延迟消息需要处理。所有Event进程只负责分发消息,具体的业务逻辑通过额外的消息队列异步处理。这样做的好处也很明显:a.速度非常快,而且由于复杂的业务逻辑导致消息堆积的可能性较小。b.另一方面,采用额外的消息队列后,消息处理的扩展性会更好,我们可以通过增加消费者进程的数量来扩展整个系统的消息处理能力。Event进程采用Zookeeper选择master进程部署方式,避免Event进程宕机后RedisKey中消息堆积。一旦Zookeeper的Leader主机宕机,Zookeeper会自动选择一个新的Leader主机来处理RedisKey中的消息。从上面的讨论可以看出,通过RedisZset实现延迟队列是一种理解起来更直观,可以快速实现的方案。而我们可以依靠Redis自身的持久化来实现持久化。使用Redis集群来支持高并发和高可用是延迟队列的一个很好的实现。RabbitMQRabbitMQ本身并不直接提供对延迟队列的支持。我们依靠RabbitMQ的TTL和死信队列功能来实现延迟队列的效果。那我们先来了解一下RabbitMQ的死信队列和TTL功能。死信队列死信队列实际上是RabbitMQ的一种消息处理机制。RabbmitMQ在生产和消费消息时,遇到以下情况消息会变成“死信”:已被消费,即TTL已过期消息队列已达到最大长度一旦消息成为死信,将重新投递到死信交换(Dead-Letter-Exchange),然后死信letterswitch根据绑定规则转发到对应的死信队列,通过监听队列可以重新消费消息。消息生命周期TTLTTL(Time-To-Live)是RabbitMQ的一个高级特性,它表示一条消息的最大生命周期,以毫秒为单位。如果一个消息在TTL设置的时间内没有被消费,它就会变成死信,进入我们上面提到的死信队列。有两种不同的方法来设置消息的TTL属性。一种方式是在创建队列的时候直接设置整个队列的TTL过期时间。所有进入队列的消息都设置了统一的过期时间。一旦消息过期,将被立即丢弃并进入死信队列。参考代码如下:Map
