当前位置: 首页 > 科技观察

关于相同RocketMQClientID导致的消息堆积问题

时间:2023-03-18 13:56:27 科技观察

首先,导致该问题的RocketMQ官方BUG已经在3月16日的本次提交中得到修复,这里只是讨论之前问题的具体细节修复。更多上下文可以参考我之前写的《RocketMQ Consumer 启动时都干了些啥?》。本文介绍RocketMQ的Consumer在启动后做了哪些操作。有助于理解本次要说明的BUG。里面提到:不用说了,消息积累和重复消费,你的ClientID是一样的。本文重点讨论消息堆积的原因。文中提到,当Consumer初始化的时候,Rebalance策略也会被初始化。Rebalance策略可以大致理解为如何将一个Topic下的m个MessageQueue分配给一个ConsumerGroup下的n个Consumer实例的策略。可以看出Consumer默认采用的Rebalance策略是AllocateMessageQueueAverage()。默认Rebalance策略默认策略很好理解,MessageQueue平均分配给Consumers。比如假设有8个MessageQueue和2个Consumer,那么每个Consumer会分配到4个MessageQueue。如果分布不均怎么办?比如只有7个MessageQueue,但是还有2个Consumer。此时RocketMQ会将多出来的部分平均分配给已经排序好的消费者,一个一个分配给消费者,直到分配完成。比如刚才说的7个MessageQueue和2个ConsumerGroup的情况,第一个Consumer会分配给4个MessageQueue,第二个会分配给3个MessageQueue。YoucanfirstunderstandtheimplementationofAllocateMessageQueueAveragely.Asthedefaultrebalancestrategy,itsimplementationislocatedhere:theimplementationlocationofthedefaultstrategy.ItscoreisactuallytheallocatemethodintheimplementedAllocateMessageQueueStrategyinterface.实际上,RocketMQ对该接口总共有5种实现:AllocateMachineRoomNearbyAllocateMessageQueueAveragelyAllocateMessageQueueAveragelyByCircleAllocateMessageQueueByConfigAllocateMessageQueueByMachineRoomAllocateMessageQueueConsistentHash其默认的AllocateMessageQueueAveragely只是其中的一种实现而已,那执行allocate它需要什么参数呢?入参需要以下四个:ConsumerGroup消费者ThenameofthegroupcurrentCIDTheclientIDofthecurrentconsumermqAllAlltheMessageQueuesundertheTopicconsumedbythecurrentConsumerGroupConsumerinstance,thegranularityisByTopic.Sotheresthereisverysimple,nothingmorethanhowtoallocatethisbunchofMessageQueuetothisbunchofConsumers.HowaboutthiscorrespondstodifferentimplementationsofAllocateMessageQueueStrategy.Next,let'stakealookathowAllocateMessageQueueAveragelyallocatesMessageQueue.Beforetalkingaboutthesourcecode,Iusuallygostepbystep,combiningthesourcecodewiththediagram,butthesourcecodeistooshort,soIwillgiveitdirectly.publicListallocate(StringconsumerGroup,StringcurrentCID,ListmqAll,ListcidAll){if(currentCID==null||currentCID.length()<1){thrownewIllegalArgumentException("currentCIDIsempty");}if(mqAll==null||mqAll.isEmpty()){thrownewIllegalArgumentException("mqAllisnullorcidAllempty");}if(cidAll==null||cidAll.isEmpty()){thrownewIllegalArgumentException("cidAllisnullorcidAllempty");}Listresult=newArrayList();//判断当前客户端是否在cidAll集合中if(!cidAll.contains(currentCID)){log.info([BUG]ConsumerGroup:{}TheconsumerId:{}notincidAll:{}",consumerGroup,currentCID,cidAll);returnresult;}//获取当前消费者在所有消费者实例数组中的位置intindex=cidAll.indexOf(currentCID);//使用messageQueue的个数对消费者实例取余number,实际计算的是没有被平分的MessageQueues的个数。//比如有12个MessageQueues和5个消费者,12%5=2intmod=mqAll.size()%cidAll.size();intaverageSize=mqAll.size()<=cidAll.size()?1:(mod>0&&index0&&index0&&index

最新推荐
猜你喜欢