原文:juejin.cn/post/6844904150703013901延时消息(定时消息)是指分布式异步消息场景,生产者发送一条消息,希望在指定的延迟时间或指定的时间点被消费者消费,而不是立即被消耗掉。延迟消息适用于广泛的业务场景。在分布式系统环境中,延迟消息的功能一般会下沉到中间件层。通常,此功能内置于MQ中或内聚形成公共基础服务。本文旨在探讨常见延迟消息的实现方案及方案设计的优缺点。实现方案是基于外部存储实现的。这里所说的外部存储,是指除了MQ本身自带的存储之外,引入的其他存储系统。基于外部存储的解决方案本质上是一个套路,这是MQ区别于延时模块的地方。延迟消息模块是一个独立的服务/进程。延迟消息先保留在其他存储介质中,等消息过期再投递给MQ。当然,还有一些细节设计,比如消息进入延时消息模块,已经过期,直接投递的逻辑,这里就不展开讨论了。以下解决方案的区别在于使用了不同的存储系统。它是基于关系数据库(如MySQL)延迟消息表的数据库(如MySQL)实现的。CREATETABLE`delay_msg`(`id`bigintunsignedNOTNULLAUTO_INCREMENT,`delivery_time`DATETIMENOTNULLCOMMENT'发送时间',`payloads`blobCOMMENT'消息内容',PRIMARYKEY(`id`),KEY`time_index`(`delivery_time`))通过定时线程定时扫描过期消息,然后进行投递。定时线程的扫描间隔理论上就是你延时消息的最小时间精度。推荐一个开源免费的SpringBoot最全教程:https://github.com/javastacks/spring-boot-best-practice优点:实现简单;缺点:B+Tree索引不适合大规模写入消息的场景;基于RocksDBRocksDB的最佳方案其实是在上述方案的基础上选择更合适的存储介质。RocksDB使用LSMTree,比较适合写量大的场景。滴滴开源的DDMQ中的延时消息模块Chronos就是采用了这种方案。简单来说,DDMQ项目就是在RocketMQ之外增加一个统一的代理层。在这个代理层中,可以进行一些功能维度的扩展。延时消息的逻辑是代理层实现延时消息的转发。如果是延迟消息,会先投递到RocketMQ中Chronos专用的topic。延迟消息模块Chronos消费延迟消息并转储到RocksDB中,其次是类似的逻辑,定时扫描过期消息,然后投递到RocketMQ。这个程序老实说是一个比较重的程序。因为如果是基于RocksDB实现的话,从数据可用性的角度来说,还需要自己处理多副本的数据同步等逻辑。优点:RocksDBLSMtree非常适合大规模写入消息的场景;缺点:实现方案比较重。如果采用这种方案,需要自己实现RocksDB的数据容灾逻辑;基于Redis,再说说Redis的方案。下面是一个更完整的解决方案。MessagesPool存储所有延迟消息,结构为KV结构,key为消息ID,value为具体消息(这里选择RedisHash结构主要是因为hash结构可以存储大量数据,并且当数据较多时,会进行渐进的rehash扩展,对于HSET和HGET命令,时间复杂度为O(1))DelayedQueue为16个有序队列(队列支持水平扩展),结构为ZSET,以及该值是消息池中的消息ID。score是过期时间(分成多个队列提高扫描速度)Worker代表处理线程,通过定时任务扫描DelayedQueue中的过期消息。在我看来,本方案选择Redis存储有几个方面的考虑。RedisZSET非常适合实现延迟队列性能问题。ZSET插入虽然是一个O(logn)的操作,但是Redis是基于内存操作的,内部做了很多性能优化。不过,这个方案其实也有一些考虑。上面的方案通过创建多个DelayedQueue满足了并发性能的要求,但是这也带来了多个DelayedQueue在多节点的情况下是如何均匀分布的,很可能会出现并发重复处理expired的情况消息,是否有必要引入分布式锁等并发控制设计?在体量不大的场景下,上述方案的架构其实可以转化为主从架构,只允许主节点处理任务,从节点仅用于容灾和备份。实施难度更小,更可控。定时线程检查的缺陷与改进上述方案中,所有过期消息都是通过线程定时扫描的方案获取的。定时线程方案在消息量较小时会造成资源浪费,而当消息量很大时,由于扫描间隔设置不合理,会导致延迟时间不准确。可以利用JDKTimer类中的思路,通过wait-notify来节省CPU资源。获取最新的延迟消息,然后等待(执行时间-当前时间),这样就不用浪费资源,时间到了自动响应。如果有新的消息进来,并且比我们等待的消息还小,那么直接notify唤醒,重新获取这个更小的消息,然后再次等待,以此类推。开源MQ中的实现方案先说说目前开源的带有延时消息功能的MQ,它们是如何实现的RocketMQ开源版的RocketMQ是支持延时消息的,但是只支持18级延时,不支持任意时间。只不过RocketMQ中可以自定义这一层,还好对于普通业务来说已经足够了。默认值为“1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h”,共18级。另外,搜索公众号Java后端栈,回复“面试”,即可获得惊喜礼包。通俗的说,设置了延迟级别的消息会被临时存储在一个名为SCHEDULE_TOPIC_XXXX的topic中,并根据级别存储在特定的队列中,queueId=delayTimeLevel-1,即一个队列只存储具有相同延迟时间的消息,确保可以顺序消费具有相同传递延迟的消息。broker会定时消费SCHEDULE_TOPIC_XXXX,将消息写入真实topic。下面是整个实现方案的示意图。红色代表延迟的消息传递,紫色代表由于scheduled调度导致的延迟消息:优点:层数固定,每一层都有自己的定时器,开销不大。同级别的消息放在同一个Queue中,保证同级别消息的顺序;不同的Level放在不同的Queue中,保证投递时间的准确性;通过只支持固定的Level,不同延迟的消息的顺序变成固定LevelTopic追加写操作的缺点:Level配置的修改代价太大,固定的Level不灵活,CommitLog会因为延迟消息的存在而变大。PulsarPulsar支持“任意时间”的延迟消息,但实现方式与RocketMQ不同。通俗地说,Pulsar的延迟消息会直接进入客户端指定的Topic,然后在堆外内存中创建一个基于时间的优先级队列来维护延迟消息的索引信息。延迟时间最短的会放在头上,延迟时间越长越靠后。在执行消费逻辑时,判断是否有消息过期需要投递。如果有则从队列中取出,根据延迟消息的索引查询对应的消息进行消费。如果一个节点挂了,这个broker节点上的Topic会被转移到其他可用的broker上,上面提到的优先级队列也会被重建。下面是Pulsar公众号中Pulsar延迟消息的示意图。乍一看,你可能会觉得这个方案其实很简单,而且还可以支持随时消息。但是这个方案有几个主要问题:内存开销:维护延迟消息索引的队列放在堆外内存中,这个队列由订阅组(Kafka中的消费者组)来维数。例如,如果你这个主题有N个订阅组,那么如果你的主题使用延迟消息,就会创建N个队列;并且随着延迟消息数量的增加和时间跨度的增加,每个队列的内存占用也会增加。(是的,在这个方案中,支持任意延迟消息可能会使这个缺陷更加严重)延迟消息索引队列在故障转移后重建时间开销:对于跨度较长的大规模延迟消息,重建时间可能会下降到小时级别。(摘自Pulsar官方公众号文章)存储开销:延迟消息的时间跨度会影响Pulsar中消耗消息数据的空间回收。比如你的topic在业务上需要支持1个月的延时消息,而你发送了1个月的延时消息,那么你的topic中的底层存储会保留整个月的消息数据。就算当月的正常消息已经消耗了99%。针对上面第一点和第二点的问题,社区也设计了解决方案,在队列中加入时间分区,Broker只将当前时间片比较近的队列加载到内存中,其余时间片分区持久化在磁盘上,如示例图所示如下图所示:但目前该方案没有对应的实现版本。在实际使用中,可以规定只使用时间跨度较小的延迟消息,以减少前两个缺陷的影响。另外,由于内存中并没有存储全量的延迟消息数据,而只存储了索引,因此可能需要数百万条延迟消息才会对内存产生重大影响。从这个角度来说,官方还没有完善前两个这个问题也是可以理解的。至于第三个问题,估计比较难解决。需要在数据存储层区分延迟消息和正常消息,将延迟消息分开存储。QMQQMQ随时提供延迟/定时消息,您可以指定在未来两年内的任何时间发送消息(可配置)。我把QMQ放在最后是因为我觉得QMQ是开源MQ中延迟消息设计最合理的。里面的设计核心很简单就是多级时间轮+延迟加载+延迟消息单独磁盘存储。QMQ的延迟/定时消息是使用两层哈希轮实现的。第一层位于磁盘上,每个小时为一个刻度(默认为一个小时,可以根据实际情况在配置中调整),每个刻度会生成一个日志文件(schedulelog),因为QMQ支持两年延迟消息(默认支持两年内,可修改配置),那么最多会生成2*366*24=17568个文件(如果要支持的最大延迟时间越短,文件越少生成)。第二层在内存中。当消息的投递时间到来时,该小时的消息索引(该索引包括消息在调度日志中的偏移量和大小)将从磁盘文件加载到内存中的哈希轮中。中的哈希轮是以500ms为尺度的。总结一下设计亮点:时间轮算法适用于延迟/定时消息场景,省去了对延迟消息的排序,插入和删除操作都是O(1)时间复杂度;通过多级时间轮设计,支持大时间跨度的Delayed消息;通过延迟加载,内存中只存储最新被消费的消息,延迟时间较长的消息存储在磁盘中,对内存友好;延迟消息单独存储(调度日志),不会影响正常消息的空间回收;本文总结了业界常见的延迟消息方案,并讨论了每种方案的优缺点。希望对读者有所启发。近期热点文章推荐:1.1000+Java面试题及答案(2022最新版)2.厉害了!Java协程来了。..3.SpringBoot2.x教程,太全面了!4.不要用爆破爆满画面,试试装饰者模式,这才是优雅的方式!!5.《Java开发手册(嵩山版)》最新发布,赶快下载吧!感觉不错,别忘了点赞+转发!
