《RocketMQ技术专题》帮你梳理RocketMQ相关消费问题及原理分析总结严重后果,如重复扣款等消息重复消费场景及解决方案RocketMQ在什么情况下会出现消息重复消费?Producer重复发送场景当系统的调用链路比较长时,比如系统A调用系统B,系统B再将消息发送给RocketMQ,当系统A调用系统B时,如果系统B处理成功,但返回失败成功调用结果给系统A,系统A会尝试重新向系统B发起请求,导致系统B重复处理并向RocketMQ发起多条消息,导致重复消费。消费者重复发送场景系统B向RocketMQ发送消息时,也可能会出现与上述相同的问题。消息发送超时,导致系统B重试,导致RocketMQ重复接收消息。消费者重复发送当RocketMQ成功接收到消息并交给消费者处理时,如果消费者在消费完成后还没有来得及将offset提交给RocketMQ,或者自己关机重启,那么RocketMQ会做没有收到offset,就会认为消费失败,会重新发送消息给消费者再次消费。消费者没有立即返回成功重复消费问题的一个可能的问题:消费者消费消息时出现异常,没有返回CONSUME_SUCCESS标志。由于消息处理异常导致的消息重新消费,RocketMQ可以很好的保持消息,一定消费成功!官方的consumerMessage方法不建议抛出异常,而不是在消费失败时返回ConsumeConcurrentlyStatus.RECONSUME_LATER无论如何,不??要抛出异常。如果需要重新消费,可以返回RECONSUME_LATER主动请求重新消费。catchException根据异常来捕获业务处理的异常:consumer.registerMessageListener(newMessageListenerConcurrently(){publicConsumeConcurrentlyStatusconsumeMessage(List
*后续重启从进度开始消费lastconsumer*/CONSUME_FROM_LAST_OFFSET,@DeprecatedCONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,@DeprecatedCONSUME_FROM_MIN_OFFSET,@DeprecatedCONSUME_FROM_MAX_OFFSET,/***一个新的订阅组第一次从队列的最前面位置开始消费
*后续重启开始消费从上次消费进度*/CONSUME_FROM_FIRST_OFFSET,/***新订阅组从指定时间点开始第一次消费
*后续重启按照上次消费进度开始消费
*时间点设置见DefaultMQPushConsumer.consumeTimestamp参数*/CONSUME_FROM_TIMESTAMP,}CONSUME_FROM_LAST_OFFSET:从最后一个offset开始消费,从消费者上次消费的位置开始。如果是新消费者,要根据客户所属消费群体的情况判断。如果你所属的消费群体是新上线的,订阅的消息从来不是最早的消息,RocketMQ的设计者认为你是一个新上线的业务,你会被强制从第一条消息开始消费。如果订阅的消息已经产生了过期消息,那么从我们的客户端启动开始就开始消费了。ConsumeFromWhere该参数仅在新消费者第一次启动时有效CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,CONSUME_FROM_TIMESTAMP:从某个时间开始消费。而判断是否是一个新的ConsumerGroup是在broker端判断的。消费哪个offset先存储在Consumer本地,消费offset会定时同步给broker。broker在判断是否是新的consumergroup时,就是查看broker端是否有这个consumergroup的offset记录。offsetinvalidation对于一个新的queue,这个参数也是无用的,消费从0开始。所以,这里有一个问题。我已经设置了CONSUME_FROM_LAST_OFFSET,为什么还是重复消费?可能你不是一个新的consumergroup,也可能是一个新的Queue。重试队列和死信队列的消费端永远不会返回消费结果。RocketMQ认为消息没有收到,下次consumer拉取的时候,broker还是会发送消息。任何异常都必须被捕获并返回:ConsumeConcurrentlyStatus.RECONSUME_LATERRocketMQ会被放入重试队列,TOPIC为:%RETRY%+COnsumerGroup名称。延时一定时间点后(默认10秒,业务可设置),再次Deliver给这个ConsumerGroup。而如果重复消费持续失败一定次数(默认16次),就会投递到DLQ死信队列中,此时需要人工干预。/**批量消费大小*/privateintconsumeMessageBatchMaxSize=1;/**批量拉取大小*/privateintpullBatchSize=32;consumeMessageBatchMaxSize为最大批量消费条数pullBatchSize为每次最大拉取条数privateStringonthe代理方消息延迟级别="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";参数是设置重试时间,即第一次1s后,第二次5s后,不改生产环境messageDelayLevel=5s5s5s5s5s5s5s5s5s5s5s5s5s5s5s5s5s5s16次后,还有一个名为:%DLQ%+consumergroup的附加主题。默认的16次是可以改的,但是只有在使用DefaultMQPullConsumer的时候才可以。DefaultMQPushConsumer不能修改这个值。consumeMessageBatchMaxSize这个大小是消费者注册的回调监听器一次处理的消息数,默认是1,不是每次拉取的消息数(默认是32),这个不要搞混了。
