当前位置: 首页 > 科技观察

深入理解RocketMQ延迟消息

时间:2023-03-20 21:25:59 科技观察

延迟消息在实际开发中是一个非常有用的功能。本文第一部分从整体上介绍了秒级精度延时消息的实现思路。第二部分结合RocketMQ延迟消息的实现,进行了详细的Explain,指出了关键部分的源码。第三步介绍延迟消息和消息重试的关系。1延迟消息介绍基本概念:延迟消息是指生产者发送消息后,不能立即被消费者消费,需要等待指定的时间才能消费。场景案例:用户下单后,需要在指定时间内(比如30分钟)付款,可以发送消息提醒用户在到期日之前付款。一些消息中间件的Broker端内置了延迟消息支持能力,比如:NSQ:这是一个go语言的消息中间件,通过内存中的优先级队列存储延迟消息,支持秒级精度,及以上延迟2小时。Java中也有相应的实现。例如,ScheduledThreadPoolExecutor实际上在内部使用了一个优先级队列。QMQ:用双时轮实现。可以参考:随时消息延迟原理详解:设计与实现RabbitMQ:需要安装一个rabbitmq_delayed_message_exchange插件。RocketMQ:RocketMQ开源版延时消息暂存在一个内部topic中,不支持任意时间精度,支持特定级别,如定时5s、10s、1m等。broker内置延时消息处理能力,以及核心实现思路是一样的:通过临时存储临时存储延迟消息,过期后投递到目标主题。如下图所示:步骤描述如下:生产者向某个主题发送延迟消息。broker判断为延迟消息后,将其暂存在临时存储中。Broker内部通过延迟服务(delayservice)检查消息是否过期,并将过期的消息投递到目标Topic。这个延迟服务的名字是delayservice,不同的消息中间件的延迟服务模块的名字可能不一样。消费者在目标主题中消费延迟传递的消息显然,临时存储模块和延迟服务模块是实现延迟消息的关键。上图中,暂存和延时服务都是在Broker内部实现的,对业务是透明的。另外,有些消息中间件本身并不支持延迟消息,比如Kafka。这种情况下可以选择改造Kafka,但是成本比较高。另一种方法是使用第三方临时存储并添加一层代理。第三方存储选型要求:对于第三方临时存储,需要满足以下特点:高性能:写入延迟要低。MQ的一个重要作用就是削峰填谷。选择临时存储时,写入性能一定要高,而关系型数据库(如Mysql)通常达不到要求。高可靠性:延时消息写入后,不会丢失。它需要持久化和备份。排序支持:支持按某个字段对消息进行排序。对于延迟消息,需要按时间排序。普通消息通常会先发送,会先被消费。延迟消息不同于普通消息,需要进行排序。比如先发送一条延时10s的消息,再发送一条延时5s的消息,那么后面发送的消息需要先消费。支持长期保存:部分业务的延迟消息需要延迟数月甚至更长时间,因此延迟消息必须长期保留。但是一般不建议延迟太久,存储成本比较高,而且业务逻辑可能发生了变化,所以没有必要去消费这些消息。比如滴滴开源的消息中间件DDMQ,在底层消息中间件的基础上增加了一层proxy,独立部署了延时服务模块,使用rocksdb进行暂存。rocksdb是一个高性能的KV存储,支持排序。此时延迟消息的流程如下图所示:描述如下:生产者将其发送给生产者代理,代理会判断为延迟消息并投递到缓冲的主题;延迟服务启动消费者从缓冲的topic中读取消费延迟消息以时间为key存储在rocksdb中;延迟服务判断消息过期后,将其投递到目标Topic。这种方式消费者消费目标topic中数据的好处在于,由于延迟服务的延迟传递能力是独立于broker实现的,不需要对broker做任何修改,它可以提供能力支持任何MQ类型的延迟消息。比如DDMQ为RocketMQ和Kafka都提供了秒级精度的延时消息传递能力,但是Kafka本身不支持延时消息,RocketMQ支持延时消息,但是不支持秒级精度。其实DDMQ还提供了很多其他的功能。从延迟消息的角度来看,完全没有必要使用这个代理。消息直接投递到buffer主题,然后通过延迟服务完成延迟投递逻辑。具体到延迟服务模块的实现,还有一些重要的细节:1.为了保证服务的高可用,延迟服务也需要部署多个节点。2、为了保证数据不丢失,每个延迟服务节点需要消费buffertopic中的全量数据,并保存在自己的持久化存储中,做到多备份,需要时间作为钥匙。但是因为是分开拉的,所以不能保证强一致性。如果一定要强一致,那么延迟服务不需要内置存储实现,可以使用其他支持强一致性的存储。3.为了避免重复投递,延迟服务需要选择一个master,可以借助zookeeper,etcd等实现,只有master可以通过producer投递到目标topic,其他节点都在待机状态。否则,如果每个节点都投递,延迟的消息会被投递多次,导致重复消费。4.master应该在共享存储中记录它当前的交付时间。如果master挂掉了,会从slave节点中选出一个新的master节点,从之前的记录时间开始继续投递。5.延迟消息的取消:有些延迟消息可能希望在到期前取消。通常取消逻辑实现是复杂且不精确的。对于那些即将过期的消息,在取消之前可能已经发送完了,所以需要在消费者端进行检查,以防万一。2RocketMQ中的延迟消息开源的RocketMQ支持延迟消息,但不支持秒级精度。默认支持18级延迟消息,由broker端的messageDelayLevel配置项决定,如下:messageDelayLevel=1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2hBroker启动时,会在内部创建一个topic:SCHEDULE_TOPIC_XXXX,以及相应数量的队列会根据延迟级别的个数来创建,也就是说18个级别对应18个队列。注意,这并不是说这个内部topic只会有18个队列,因为Broker通常是集群部署的,所以每个节点有18个队列。可以修改延迟级别的值以满足自己的业务需求,也可以修改/添加新的级别。例如:如果要支持延迟2天,修改最后一个level的值为2d,此时还是18个level;也可以加一个2d,此时一共19关。可以看出这里不支持秒级精度。按照《rocketmq developer guide》的说法,是为了避免在broker端对消息进行排序,造成性能影响。不过笔者认为之所以不支持应该是更多的商业考虑。生产者发送延迟消息:生产者发送延迟消息非常简单,只需要设置一个延迟级别,注意不是具体的延迟时间,如:Messagemsg=newMessage();msg.setTopic("TopicA");消息。setTags("Tag");msg.setBody("thisisadelaymessage".getBytes());//设置延迟级别为5,对应延迟1分钟msg.setDelayTimeLevel(5);producer.send(msg);如果延迟设置水平超过最大值,则最大值将被重置。Broker端存储延迟消息:RocketMQBroker端延迟消息的流程如下图所示:可以看到,一共有6个步骤,下面将对这6个步骤进行详细说明:修改message主题名称和队列信息,并将消息转发给延迟服务消费延迟主题CosumeQueue中的SCHEDULE_TOPIC_XXXX消息,将信息重新存储到CommitLog,将消息传递给目标主题中的消费者。消费者消费目标主题中的数据。第一步:修改消息主题名称和队列信息,生产者存放在RocketMQBroker端。写消息时,会先写入CommitLog。然后根据消息中的Topic信息和队列信息转发到目标Topic的指定队列(ConsumeQueue)。由于消息存入ConsumeQueue后消费者可以消费,但延迟的消息不能立即消费,所以这里将topic的名称改为SCHEDULE_TOPIC_XXXX,根据延迟级别决定投递队列。同时,消息的属性中会存储消息最初发送到的目标Topic和队列信息。相关源码如下:org.apache.rocketmq.store.CommitLog#putMessage第二步:将消息转发到delaytopic的CosumeQueue。CommitLog中的消息被异步转发到CosumeQueue。在转发过程中,会对延迟消息进行特殊处理,主要是计算延迟消息什么时候需要投递。投递时间=消息存储时间(storeTimestamp)+延迟级别对应的时间。需要注意的是,计算出的投递时间会作为消息Tag的哈希值存储在CosumeQueue中。CosumeQueue单个存储单元的结构如下图所示:其中:CommitLogOffset:记录在CommitLog中的位置。Size:记录消息的大小MessageTagHashCode:记录消息Tag的哈希值,用于消息过滤。特别地,对于延迟消息,该字段记录消息的传递时间戳。这就是为什么java中的hashCode方法返回的是int类型,只占4个字节,而这里MessageTagHashCode字段设计为8个字节。相关源码见:CommitLog#checkMessageAndReturnSize第三步:延时服务消费SCHEDULE_TOPIC_XXXX消息Broker内部有一个ScheduleMessageService类,作为延时服务,消费SCHEDULE_TOPIC_XXXX中的消息,投递到目标Topic。ScheduleMessageService启动时,会创建一个定时器Timer,并根据延迟级别的数量启动相应数量的TimerTask,每个TimerTask负责一个延迟级别的消费和传递。相关源码如下:ScheduleMessageService#start需要注意的是,每个TimeTask在检查消息是否过期时,首先会检查对应队列中的第一条消息是否还未送达。不会检查。如果过期,则投递,并检查后续消息是否过期。第四步:将信息恢复到CommitLog。消息过期后,需要投递到目标Topic。由于第一步已经记录了原来的Topic和队列信息,所以这里可以重新设置,存入CommitLog。另外,由于MessageTagHashCode字段之前存储了消息的投递时间,因此在存储之前需要重新计算标签的哈希值。参见源码:DeliverDelayedMessageTimerTask的messageTimeup方法。第五步:将消息投递到目标主题这一步与第二步类似,只是消息的主题名称已更改为目标主题。因此,消息会直接投递到目标Topic的ConsumeQueue中,然后由消费者消费消息。3延迟消息与消费重试的关系RocketMQ提供了消息重试的能力。在并发模式消费失败的情况下,可以返回一个枚举值RECONSUME_LATER,稍后重试消息。如:consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){//处理消息,失败,返回RECONSUME_LATER,重试returnConsumeConcurrentlyStatus.RECONSUME);默认重试;}16次。使用过RocketMQ消息重试功能的用户可能看到过下图:前几次重试和最后一次重试的间隔前几次重试和最后一次重试的间隔110秒97分钟230秒108分钟31Minute119Minute42Minute1210Minute53Minute1320Minute64Minute1430Minute75Minute151Hour86Minute162Hour细心的读者发现,这16级消息重试其实删除18级延迟消息中的前两级。其实RocketMQ的消息重试也是基于延迟消息。在消息消费失败的情况下,会作为延迟消息投递回Broker。回发时,将跳过前两级,因此只会重试16次。当然,消息重试还有一些其他的设计逻辑,后面的文章会分析。