大家好,我是君哥。今天分享RocketMQ中的一个关键知识点,消费者启动流程。在大多数消息队列中,消费者与Broker通信有两种方式,PUSH模式和PULL模式:PUSH模式:Broker主动向订阅的消费者推送消息。PULL模式:消费者主动从Broker拉取消息。注意RocketMQ并没有真正实现PUSH模式,RocketMQ中的PUSH。该模式本质上是PULL模式,只是consumer端封装了轮询过程,相当于启动一个定时线程不断从Broker拉取消息,拉取消息后唤醒本地业务线程处理消息.本文讲解PULL模式的启动过程。涉及到的启动过程如下:先看下图:从图中可以看出,消费者需要注册到NameServer,拉取消息时,可以从Broker主节点拉取,也可以从Broker从节点拉取节点选择。在RocketMQ的源码中,pull模式下有两个consumer相关的类,其中DefaultMQPullConsumer类已经被废弃,官方推荐使用DefaultLitePullConsumer类。以下代码来自官方示例:litePullConsumer.setConsumeFromWhere(ConsumeFromsubWhere.CONSUME_FROM_FIRST"TOFFSET(ConsumeFromsubWhere.CONSUME_FROM_FIRST"TOFFSET);litePull);//启动方法litePullConsumer.start();try{while(running){//这里可以看到在PULL模式下,消费者需要业务代码主动拉取消息ListmessageExts=litePullConsumer.poll();System.out.printf("%s%n",messageExts);}}最后{litePullConsumer.shutdown();}}上面代码中,消费者属于消费组lite_pull_consumer_test,订阅了topic[TopicTest]标签下的所有topic。让我们来看看如何开始。下图是消费者启动过程中的类调用关系图。图中中央的pullRequestQueue是核心。拉取请求会先发送到这个队列,然后循环拉取处理。检查启动配置当消费者启动时,它会首先检查配置。检查的配置项如下:消费组名称是否合法。检查项包括【不为空】、【长度小于等于255】、符合正则表达式【^[%|a-zA-Z0-9_-]+$】、【不等于"DEFAULT_CONSUMER"].消息模式不能为空,包括集群模式和广播模式。MessageQueue负载策略不能为空,包括:平均分配策略、round-robin分配策略、自定义分配策略、按机房平均分配策略、按最近机房分配策略、一致性HASH策略。在长轮询模式下,Consumer连接挂起时间不小于长轮询模式下Broker挂起时间。Broker暂停时间默认为20s,官方不建议修改。这部分源代码参见DefaultLitePullConsumerImpl#checkConfig。修改消费者实例名如果是集群模式,将实例名改为【进程ID+"#"+系统时间(纳秒)】,代码如下://ClientConfigclasspublicvoidchangeInstanceNameToPID(){if(this.instanceName.equals("DEFAULT")){this.instanceName=UtilAll.getPid()+"#"+System.nanoTime();}初始化MQ客户端创建MQClientInstance实例,然后将消费者注册到MQClientInstance。privatevoidinitMQClientFactory()抛出MQClientException{this.mQClientFactory=MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer,this.rpcHook);booleanregisterOK=mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(),this);if(!registerOK){this.serviceState=ServiceState.CREATE_JUST;thrownewMQClientException("消费组["+this.defaultLitePullConsumer.getConsumerGroup()+"]之前已经创建,请指定其他名称。"+FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}}初始化负载均衡器初始化RebalanceLitePullImpl实例,并为以下参数赋值:消费者名称。消息模型。MessageQueue负载均衡策略。MQ客户端,上一节初始化的MQClientInstance实例。负载均衡线程启动后,默认每20s进行一次负载均衡,见如下代码//RebalanceServiceclasspublicvoidrun(){while(!this.isStopped()){//waitInterval默认为20s,你可以配置this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}}初始化WrapperPullAPIWrapper这个Wrapper类是MQ-ClientInstance类的Wrapper类。类中的pullKernelImpl方法修饰了MQClientInstance类中的pullMessage方法。该装饰类主要增加以下功能:获取Broker地址。检查RocketMQ版本。如果Broker是从节点,则将sysFlag标志偏移量的位更改为0,(偏移量0x1)。封装请求头。获取filterServer地址(如果consumer通过filterServer从Broker拉取消息,这里是一个随机的filterServer地址)。代码如下://PullAPIWrapperpublicPullResultpullKernelImpl(//省略所有参数)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{//1.获取Broker地址FindBrokerResultfindBrokerResult=this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerAddressWodeFromBrokerProker()(mq),假);//省略从名称服务器更新本地Broker缓存逻辑if(findBrokerResult!=null){{//2.检查RocketMQ版本if(!ExpressionType.isTagType(expressionType)&&findBrokerResult.getBrokerVersion()