前天发完版本终于休息了,又遇到了一个问题。项目组反映RocketMQ的一个消费组的一条消息被消费了两次,但是两次之间的时间间隔超过10小时,现象如下图所示:这是为什么?两者相差10多个小时。这条消息是不是重复消费了16次,但是没有从日志中打印出来?发出了16个留言问题,只打印了两条留言。从日志来看,应该不会重复消费16次。基于不轻易相信日志的原则,我觉得应该去RocketMQ服务器查看这条消息存储的重试次数,从而推断消息是否消费失败,重试消费。正好项目组也提供了消息的Key。根据消息key和发送主题名称查找消息,只能找到一条消息:说明发送者确实只发送了一条消息。当时服务集群没有发现异常,consumer没有重启,queue没有rebalance,不满足RocketMQ会重复向客户端推送消息的场景,所以基本可以断定是它是消息消费失败重启导致的,但是RocketMQ的消息消费重试是间隔重试延时的。通常第一次重试只需要10秒就可以重试。不应该是10个小时吗?为此,我特意根据KeyRetry主题搜索了主题SCHEDULE_TOPIC_XXXX和%RETRY%,发现只有一条关于这个key的信息,说明这只是第一次重试,如下图:但是从这里可以看到,消息写入SCHEDULE_TOPIC_XXXX的时间是2022-05-2722:26:20,然后10s后通过调度机制进入重试topic开始消费,所以执行了第一次重试。扩展机制知识:RocketMQ并发消费的Consumer会在客户端消费失败后向服务端发送ACK,根据重试次数进入不同SCHEDULE_TOPIC_XXXX主题的队列。每个队列所代表的延迟时间不同,经过一定的延迟一定时间后再次调度到消费者组的重试topic会再次被消费者消费,实现定时重试,提高成功率重试。那为什么第一次消费是12:51,为什么这么久才进入Broker的SCHEDULE_TOPIC_XXXX?这就是问题的症结所在。我平时不太注意这个细节,所以接下来要分析这段代码,代码详细定义在ConsumeMessageConcurrentlyService的processConsumeResult方法中。原来如果client向Broker消费ACK失败,会加入msgBack失败队列,重新提交给consumer端消费,而这条消息的位置是不会提交的,因为有一个关键代码:consumeRequest.getMsgs().removeAll(msgBackFailed);longoffset=consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if(offset>=0&&!consumeRequest.getProcessQueue().isDropped()){这个。defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),offset,true);}RocketMQ消费站点采用最小站点提交,只要本地处理队列中存在消息,站点就不会提交,会触发消息积压。但尴尬的是,这个话题的留言很少,通过积压也没有发现问题。结合现象和代码来看,原因是client一直消费失败,但是一直向Broker提交ACK失败,直到晚上22:26,才触发重新消费。现在的关键是为什么ACK失败了?这次比较遗憾,因为项目组用的是容器,而这段代码没有采集日志集群,导致无法查看错误日志,所在的集群是正常的。等我分析完后,我会和你分享这个问题。
