可能乍一看我们对RocketMQ的消费者的理解很简单。它只是一个消费消息的客户端。你只需要指定对应的Topic和ConsumerGroup,剩下的只需要:接收消息和处理消息就可以了。简化消费模型当然,在实际业务场景中可能确实是这样。但是如果我们不确切知道消费者启动后会做什么,以及底层实现的一些细节,那么在面对复杂的业务场景时,我们就会像大海捞针一样一头雾水。相反,如果你了解了细节,你在排查问题的时候就会更有来龙去脉,可能会提出更多的解决方案。RocketMQ的一些基本概念和一些底层实现已经写在RocketMQ基本概念分析&源码分析一文中。如果没有相关上下文,可以先填一个部分。一个简单例子的整体逻辑首先我们从一个简单的例子来看一下RocketMQConsumer的基本使用。从使用开始,一点一点了解细节。publicclassConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTesterListener.,"*");consumer.subscribe("TopicTesterListenernew.,"*");concurrentlyMessageListenerConsumer){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}};consumer.start();System.out.printf("ConsumerStarted.%n");}}代码一定很难看,下面的流程图和上面的代码在逻辑上是等价的,大家可以一起看。Consumer使用示例消费点策略,除了Topic、注册消息监听器等常规内容外,setConsumeFromWhere更值得关注。终止消费者将开始消费的地方。共有三个可选值:三个可选的ConsumeFromWhere值。其实在ConsumeFromWhere枚举类的源码中还有另外三个值,但是都已经弃用了。但是这个配置只对新的ConsumerGroup有效,已有的ConsumerGroup会按照上次消费的Offset继续消费。其实很好理解。假设有1000条消息,你的服务已经消费了500条,那你启动一个新的东西重启服务,然后又从头开始消费?这不是很荒谬吗?缓存订阅的Topic信息好像只有一行consumer.subscribe("TopicTest","*")。事实上,很多事情都是在幕后完成的。这里我先为大家画一个简单的流程。subscribe_topicsubscribe函数的第一个参数就是我们需要消费的Topic,这个不用说了。第二个参数,复杂的就叫过滤表达式字符串,简单的其实就是你要订阅的消息的Tag。每条消息都有自己的标签。如果你不知道这个,你可以考虑阅读上面的文章。这里我们传*,表示订阅所有类别的消息。当然我们也可以传入tagA||标签B||tagC,表示我们只消费带有这三个标签的消息。RocketMQ会根据我们传入的两个参数构造SubscriptionData,放入一个位于内存中的ConcurrentHashMap中进行维护。简单来说,一句话,订阅的Topic会被缓存。缓存完成后,会进行一个比较关键的操作,就是开始给所有的Broker发送心跳。Consumer客户端会:消费者的名称消费类型表示通过Push或Pull方式消费消息消费模型是指集群消费(CLUSTERING)或广播消费(BROADCASTING)消费点策略是消费者的订阅如CONSUME_FROM_LAST_OFFSETA数据收集一个消费者可以收听多个主题生产者的集合。当前实例上注册的生产集合是正确的。Consumer实例启动后,也会运行Producer的相关代码。另外,如果客户端既没有配置生产者也没有配置消费者,心跳逻辑不会被执行,因为它没有意义。启动消费者实例上面说的核心逻辑其实就到这里了,下面我们会详细讨论,简单的例子到此结束。进入启动核心逻辑在启动核心入口类中,一共处理了4个状态,分别是:CREATE_JUSTRUNNINGSTART_FAILEDSHUTDOWN_ALREADY但是既然我们刚刚创建了它,那么我们就来到CREATE_JUST的逻辑,我们重点关注一下它是做什么的消费者在刚开始的时候做。检查配置的基本操作,和我们平时写的业务代码没什么区别,检查配置中的各个参数是否合法。配置项太多,就不一一赘述了。你只需要知道RocketMQ在启动的时候会对配置中的参数进行校验即可。算了,列个表吧:消费组的名字是不是空的?消费组名称不能为RocketMQ保留的名称,即DEFAULT_CONSUMER消费模型(CLUSTERING、BROADCASTING)是否配置了消费点策略(如CONSUME_FROM_LAST_OFFSET)来判断消费方式是否合法。只能是顺序消费或者并发消费。消费组的最小消费线程数和最大消费线程数是否在指定范围内。这个范围指的是(1,1000),从左到右开。还有最小值不能大于最大值的判断……等等。所以你可以看到,即使是牛X的开源框架,也会有这么繁琐、通用的业务代码。更改实例名称instanceName会从系统的配置项rocketmq.client.name中获取,如果没有配置则设置为DEFAULT。,而消费模型是CLUSTERING(默认是),它会将DEFAULT改为${PID}#${System.nanoTime()}的字符串,这里是一个例子。instanceName="90762#75029316672643"为什么要单独提这个?这相当于给每个实例一个唯一的标识符。这个唯一标识符其实非常重要。如果一个消费组的instanceName相同,可能会造成重复消费,或者消息堆积的问题,消息堆积这个点比较有意思。有时间我应该单独写一篇文章来讨论一下。但是眼尖的同学可能已经看出来了,instanceName的组成不是PID,System.nanoTime?PID可能和Docker容器宿主机的PID是一样的,这是可以理解的。System.nanoTime呢?这可以重复吗?其实从RocketMQ的Github的commit记录来看,至少到2021年3月16日,这个问题可能还存在。RocketMQ官方Github提交记录3月16日RocketMQ官方提交修复了这个问题。给大家看看有什么变化:在原来的版本中,instanceName只是由PID组成,这可能会导致不同的Consumer实例具有相同的instanceName。熟悉RocketMQ的同学有疑问。clientID不是Broker端Consumer的唯一标识吗?没错,但是clientID是由clientIP和instanceName组成的。上面说到clientIP可能是因为Docker获取的,最终会导致clientID相同。好的,这就是更改实例名称的全部内容。真没想到会说这么多。用于实例化消费者的关键变量名为mQClientFactory。接下来,消费者实例将被实例化。上面修改实例名称中提到的clientID就是在这一步初始化的。这里就不给大家列出源码了,大家要知道这个地方会实例化一个consumer就OK了,不要太纠结细节。然后会为Rebalance的实现设置一些属性,比如消费者组的名称,消息模型,Rebalance采用的策略,刚刚实例化的消费者实例等。Rebalance默认的策略是:AllocateMessageQueueAveragely是一种将MesssageQueue平均分配给消费者的策略。更详细的可以参考我上面的文章。此外,还会初始化拉取消息的核心实现PullAPIWrapper。初始化offsetStore这里会根据不同的消息模型(即BROADCASTING或CLUSTERING)实例化不同的offsetStore实现。BROADCASTING采用的实现是LocalFileOffsetStore。CLUSTERING采用的实现是RemoteBrokerOffsetStore。不同的是,LocalFileOffsetStore在本地管理Offset,而RemoteBrokerOffsetStore将offset交给Broker,用于ConsumeMessageService的原始启动。它被缓存在实例中,具体是在内存中一个名为consumerTable的concurrentHashMap中。其实源码叫做registerConsumer:registerConsumer源码,但是我觉得给大家“翻译”成缓存更合理,因为它只是把构造好的consumer实例缓存到map中,仅此而已。哦对了,我还做了一个returnfalseifitexists,表示没有注册成功。那为什么需要返回false呢?如果存在,不执行缓存逻辑?即使在外面,您也需要根据这个false抛出MQClientException?如果注册失败,为什么会抛出异常?假设你的同事A已经使用了名称consumer_group_name_for_a,在线消费消息正常运行。是的,你添加了一个需要监控MQ的函数,也用到了consumer_group_name_for_a。想一想,如果RocketMQ不校验,你注册成功了,但是你的同事A估计要骂她了:“怎么回事?怎么就已经开始重复消费了?”启动mQClientFactory这个mQClientFactory就是在实例化消费者步骤中创建的消费者实例,最后调用mQClientFactory.start()。这是最终的核心逻辑。初始化NameServer地址初始化Netty客户端进行通信初始化Netty客户端启动一堆定时任务这一堆不夸张,确实有很多,例如:在刚才的步骤中,如果没有获取到NameServer,一个定时任务task会被启动每隔一段时间拉取一次比如定时任务会启动,每隔一段时间从NameServer中拉取指定Topic的路由数据。这个路由数据具体是指和MessageQueue相关的数据,比如写队列有多少,读队列有多少,以及Topic分发的Broker的brokerName、cluster、IP地址相关的数据。这些大致称为路由数据的另一个例子是启动发送心跳的定时任务。如果没有启动心跳,Broker可能会挂掉,对吧?客户端是否需要及时杀掉掉线的Broker?所以RocketMQ有一个cleanOfflineBroker方法专门用来干这个的,关键点就是持久化offset。这里因为是CLUSTERING消费,所以会定时向Broker汇报当前consumer的消费情况。