介绍要保证消息的可靠传递,无外乎保证以下三个阶段的正常执行。生产者成功地将消息传递给代理。代理在传递过程中保留消息。消费者可以消费从代理到消息发送者的消息。重试。producer向broker发送消息后,没有收到broker的ack,rocketmq会自动重试。.重试次数可以设置,默认2次DefaultMQProducerproducer=newDefaultMQProducer(RPODUCER_GROUP_NAME);//设置同步发送的重试次数为5次producer.setRetryTimesWhenSendFailed(5);//设置异步发送的重试次数5次producer.setRetryTimesWhenSendAsyncFailed(5);消息持久化我们先来了解一下消息的存储过程。这些知识对于后面分析消费者端的消息重试非常重要。有几种与消息相关的文件。CommitLog:存放消息的元数据ConsumerQueue:存放消息在CommitLog中的索引IndexFile:可以通过MessageKey和时间间隔快速找到消息。整个消息的存储过程如下。根据消息的队列信息,有线程写入相关的ConsumerQueue(minOffset为写入的起始位置,consumerOffset为当前消费位置,maxOffset为ConsumerQueue最新写入的位置)和IndexFileConsumerfromConsumerQueue的consumerOffset读取CommitLog中当前应该消费的消息的偏移量,在CommitLog中找到对应的消息。消费成功后,将consumerOffset刷入机制“异步刷入”:将消息写入内存的PAGECACHE,返回写入成功状态,当内存中的消息量累积到一定程度时,磁盘写操作统一触发,写入速度快。吞吐量高。当磁盘损坏时,消息将丢失。“同步刷新”:消息写入内存的PAGECACHE后,立即通知刷新线程刷新磁盘,然后等待刷新完成。刷新线程执行完毕后,唤醒等待线程,向应用返回消息写入成功状态。吞吐量低,但不会造成消息丢失。主从复制如果一个broker有一个master和一个slave,它需要将master上的消息复制到slave。复制“同步复制”有两种方式:master和slave都写入如果成功,则返回clientsuccess。master挂掉后,数据不会丢失,但同步复制会增加数据写入延迟,降低吞吐量。“异步复制”:master写入成功,成功返回给client。它具有低延迟和高吞吐量,但是当master发生故障时,可能会导致数据丢失。消费者端的消息重试。顺序消息的重试。对于顺序消息,当消费者消费消息失败时,消息队列RocketMQ版本会自动不断重试消息(每次间隔1秒)。此时,应用程序将被阻塞以进行消息消费。因此,一定要做好监控工作,避免堵塞现象的发生。"顺序消息消费失败后,不会再消费下一条消息,而是会不断重试这条消息。需要考虑的是,如果后续消息跨这条消息消费,对业务逻辑有影响""顺序消息暂时只支持集群消费模式,不支持广播消费模式》无序消息的重试对于无序消息(普通消息、定时消息、延迟消息、事务消息),当消费者消费消息失败时,可以实现消息重试结果通过设置返回状态。"乱序消息的重试只对集群消费方式有效;广播方式不提供失败重试特性,即消费失败后,失败的消息不会被重试,新的消息将继续被消耗。”"消费时间过后,重试的配置有如下三个方法"ReturnAction.ReconsumeLater(recommended)ReturnNull抛出异常publicclassMessageListenerImplimplementsMessageListener{@OverridepublicActionconsume(Messagemessage,ConsumeContextcontext){//消息处理逻辑抛出异常,消息将被重试。doConsumeMessage(message);//方法一:返回Action.ReconsumeLater,消息会被重试。returnAction.ReconsumeLater;//方法二:返回null,消息会被重试。returnnull;//方法三:直接抛出异常,消息会被重试。thrownewRuntimeException("ConsumerMessageexception");}}"消费失败后不重试的配置方法"在集群消费模式下,期望消息失败后不会重试,需要捕获可能被调用的异常扔进消费逻辑,最后回到Action。CommitMessage,之后将不会重试此消息。publicclassMessageListenerImplimplementsMessageListener{@OverridepublicActionconsume(Messagemessage,ConsumeContextcontext){try{doConsumeMessage(message);}catch(Throwablee){//捕获消费逻辑中的所有异常并返回Action.CommitMessage;returnAction.CommitMessage;}//消息正常处理,直接返回Action.CommitMessage;returnAction.CommitMessage;}}"MessageRetries""RocketMQ默认允许每条消息最多重试16次,每次消费失败,发送一条延迟消息到重试队列,同一条消息失败一次。增加延迟级别一次,然后放入重试队列,如果重试16次后仍未消费成功,则消息放入死信队列。《注意:重试队列和死信队列是根据ConsumerGroup划分的》重试队列主题名:%RETRY%+consumerGroup死信队列主题名:%DLQ%+consumerGroup《重试队列和死信队列为什么要按照ConsumerGroup划分?”“因为在RocketMQ使用的时候,必须保持订阅关系一致。即一个ConsumerGroup订阅的topic和tag必须完全一致,否则可能导致消费逻辑混乱,消息丢失。”以下任何一种情况都表明订阅关系不一致。同一个ConsumerGroup下的Consumer实例订阅了不同的主题。同一个ConsumerGroup下的Consumer实例订阅了同一个topic,只是订阅的标签不同。我们可以通过控制台查看各种类型的主题消息。每次重试的时间间隔如下:重试次数与上次重试的时间间隔。第一次重试和最后一次重试的间隔第一次重试和最后一次重试的间隔110秒97分钟230秒108分钟31分钟119分钟42分钟1210分钟53分钟1320分钟64minutes1430minutes75minutes151hour86minutes162hours《上面说到RocketMQ的消息重试是通过发送定时消息到重试队列来实现的》。RocketMQ支持18级定时Delay,每级定时消息的延时时间如下。//MessageStoreConfig.javaprivateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";消息重试只是去除了前2级的定时消息,我们可以在消费者端设置最大消息重试次数为每次发送下一级的定时消息时小于等于16次,即重试间隔与上表所述相同。最大重试次数大于16次,超过16次的重试间隔为每次2小时。Propertiesproperties=newProperties();//配置GroupID对应的消息最大重试次数为20次,最大重试次数为字符串类型。properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");Consumerconsumer=ONSFactory.createConsumer(properties);“那么重试队列中的消息是如何被消费的呢?”消息消费者启动时,会订阅正常的topictopic定时消息和重试队列的实现逻辑也比较简单,可以归纳为以下步骤:1.发送延迟消息1.1将topic替换为SCHEDULE_TOPIC_XXXX,queueId为消息延迟级别(如果不替换topic,直接发送到对应的consumeQueue,消息会立即被消费)1.2将消息的原topic和queueId放入消息扩展属性中1.3将消息的执行时间放入tagsCode,将消息的顺序写入CommitLog,将消息对应的信息分发到对应的ConsumerQueue中(主题为SCHEDULE_TOPIC_XXXX,一共有18个队列,对应18个延迟级别)定时任务不断判断消息是否到达投递时间,未到达则进行后续投递.如果到达传递时间,消息的内容将从commitLog中拉取并重置。消息的topic和queueId是原来的(原来的topic和queueId在消息扩展属性中),然后把消息投递到commitLog。这个时候,消息会被分发到对应的队列中,然后被消费。本文转载自微信公众号“Java知堂”,可通过以下二维码关注。转载本文请联系Java石塘公众号。
