首先,导致该问题的RocketMQ官方BUG已经在3月16日的本次提交中得到修复,这里只是讨论之前问题的具体细节修复。更多上下文可以参考我之前写的《RocketMQ Consumer 启动时都干了些啥?》。本文介绍RocketMQ的Consumer在启动后做了哪些操作。有助于理解本次要说明的BUG。里面提到:不用说了,消息积累和重复消费,你的ClientID是一样的。本文重点讨论消息堆积的原因。文中提到,当Consumer初始化的时候,Rebalance策略也会被初始化。Rebalance策略可以大致理解为如何将一个Topic下的m个MessageQueue分配给一个ConsumerGroup下的n个Consumer实例的策略。可以看出Consumer默认采用的Rebalance策略是AllocateMessageQueueAverage()。默认Rebalance策略默认策略很好理解,MessageQueue平均分配给Consumers。比如假设有8个MessageQueue和2个Consumer,那么每个Consumer会分配到4个MessageQueue。如果分布不均怎么办?比如只有7个MessageQueue,但是还有2个Consumer。此时RocketMQ会将多出来的部分平均分配给已经排序好的消费者,一个一个分配给消费者,直到分配完成。比如刚才说的7个MessageQueue和2个ConsumerGroup的情况,第一个Consumer会分配给4个MessageQueue,第二个会分配给3个MessageQueue。YoucanfirstunderstandtheimplementationofAllocateMessageQueueAveragely.Asthedefaultrebalancestrategy,itsimplementationislocatedhere:theimplementationlocationofthedefaultstrategy.ItscoreisactuallytheallocatemethodintheimplementedAllocateMessageQueueStrategyinterface.实际上,RocketMQ对该接口总共有5种实现:AllocateMachineRoomNearbyAllocateMessageQueueAveragelyAllocateMessageQueueAveragelyByCircleAllocateMessageQueueByConfigAllocateMessageQueueByMachineRoomAllocateMessageQueueConsistentHash其默认的AllocateMessageQueueAveragely只是其中的一种实现而已,那执行allocate它需要什么参数呢?入参需要以下四个:ConsumerGroup消费者ThenameofthegroupcurrentCIDTheclientIDofthecurrentconsumermqAllAlltheMessageQueuesundertheTopicconsumedbythecurrentConsumerGroupConsumerinstance,thegranularityisByTopic.Sotheresthereisverysimple,nothingmorethanhowtoallocatethisbunchofMessageQueuetothisbunchofConsumers.HowaboutthiscorrespondstodifferentimplementationsofAllocateMessageQueueStrategy.Next,let'stakealookathowAllocateMessageQueueAveragelyallocatesMessageQueue.Beforetalkingaboutthesourcecode,Iusuallygostepbystep,combiningthesourcecodewiththediagram,butthesourcecodeistooshort,soIwillgiveitdirectly.publicList
