大家好,我是君哥,RocketMQ消息消费有PULL和PUSH两种模式。今天我们就来看看这两种模式的区别。PUSH模型首先看一段RocketMQ推模型的一个官方提示示例:publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{Tracertracer=initTracer();DefaultMQPushConsumer消费者=newDefaultMQPushConsumer("CID_JODIE_1");consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(newConsumeMessageOpenTracingHookImpl(tracer));consumer.subscribe("TopicTest","*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeTimestamp("20181109221800");consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),消息);返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});消费者开始();System.out.printf("消费erStarted.%n");}consumer会定义一个消息监听器,并将这个监听器注册到DefaultMQPushConsumer,同时也注册到DefaultMQPushConsumerIm-pl,当消息被拉取时,会使用这个监听器处理消息这个监听器什么时候被调用?看下面的UML类图:消费者真正拉取请求的类是DefaultMQPush-ConsumerImpl。该类的pullMessage方法调用了PullAPIWrapper的pullKernelImpl方法。该方法有一个参数为回调函数Pull-Callback,当PULL状态为PullStatus.FOU-ND时,表示拉取消息成功,处理逻辑如下:PullCallbackpullCallback=newPullCallback(){@OverridepublicvoidonSuccess(PullResultpullResult){if(pullResult!=null){pullResult=DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),pullResult,subscriptionData);switch(pullResult.getPullStatus()){caseFOUND://省略一些逻辑//省略一些逻辑休息;//省略其他情况default:break;}}}@OverridepublicvoidonException(Throwablee){//省略}};这个处理逻辑调用了ConsumeMessage-Service类的submitConsumeRequest方法,我们来看看并发消费消息的处理逻辑,代码如下:messageQueue,finalbooleandispatchToConsume){finalintconsumeBatchSize=this.defaultMQPushConsumer.getConsumeMessize(g)<=consumeBatchSize){ConsumeRequestconsumeRequest=newConsumeRequest(msgs,processQueue,messageQueue);}尝试{this.consumeExecutor.submit(consumeRequest);}catch(RejectedExecutionExceptione){this.submitConsumeRequestLater(consumeRequest);处理,符合上述逻辑}ConsumeRequest类是线程类,run方法调用消费者定义的消息处理方法。代码如下:publicvoidrun(){//省略逻辑MessageListenerConcurrentlylistener=ConsumeMessageConcurrentlyService.this.messageListener;//省略逻辑try{//调用消费方法status=listener.consumeMessage(Collections.unmodifiableList(msgs),context);}catch(Throwablee){//Omitlogic}//Omitlogic}下面以并发消费模式同步拉取消息为例,总结一下消费者消息处理流程:在MessageListenerConcurrently中定义消费者处理逻辑,注册到DefaultMQPushConsumer当消费者启动DefaultMQ-PushConsumerImpl消费者时,会启动消费拉取线程PullMessageService,里面的死循环不断的从Broker中拉取消息。这里调用了DefaultMQPushConsumerImpl类的pullMessage方法。DefaultMQPushConsumerImpl类的pullMessage方法调用PullAPIWrapper的pullKernelImpl方法真正发送PULL请求,传入PullCallback的回调函数。消息拉取后调用PullCallback的onSuccess方法处理结果。这里调用了ConsumeMessageConcurrentlyService的submitConsumeRequest方法,使用ConsumeRequest线程处理拉取的消息。ConsumeRequest在处理消息时,会调用消费者定义的消费逻辑,即Message-ListenerConcurrently的consumeMessage方法。PULL模式下面是一段来自官方PULL模式拉取消息的代码:.start();try{while(running){ListmessageExts=litePullConsumer.poll();System.out.printf("%s%n",messageExts);}}最后{litePullConsumer.shutdown();}这里我们看到PULL模式需要在处理逻辑中不断拉取消息,例如上面代码中写了一个死循环。PULL模式下poll函数是如何实现的?我们看下面的UML类图:从trace源码可以看出,消息拉取最终是从DefaultLitePullConsumerImpl类中的一个LinkedBlockingQueue中拉取的。什么时候将消息放入LinkedBlockingQueue?官方拉取消息代码中有一个subscribe方法,用于订阅Topic。这里相关的UML类图如下:这个subscribe方法最终调用了DefaultLite-PullConsumerImpl类的subscribe。代码如下:publicsynchronizedvoidsubscribe(Stringtopic,StringsubExpression)throwsMQClientException{try{//省略逻辑this.defaultLitePullConsumer.setMessageQueueListener(newMessageQueueListenerImpl());assignedMessageQueue.setRebalanceImpl(this.rebalanceExImpl);//ceptionlogic}eatch({thrownewMQClientException("subscribeexception",e);}}这里给DefaultLitePullConsumer类的messageQueueListener的监听器赋值。当监听器检测到MessageQueue发送的变化时,会启动线程Pull-TaskImpl来拉取消息。代码如下:publicvoidrun(){//省略一些逻辑if(!this.isCancelled()){longpullDelayTimeMills=0;尝试{PullResultpullResult=pull(messageQueue,subscriptionData,offset,defaultLitePullConsumer.getPullBatchSize());switch(pullResult.getPullStatus()){案例发现:finalObjectobjLock=messageQueueLock.fetchLockObject(messageQueue);同步){如果(pullResult.getMsgFoundList()!=null&&!pullResult.getMsgFoundList().isEmpty()&&assignedMessageQueue.getSeekOffset(messageQueue)==-1){processQueue.putMessage(pullResult.getMsgFoundList());submitConsumeRequest(newConsumeRequest(pullResult.getMsgFoundList(),messageQueue,processQueue));}}休息;//省略其他情况}}//省略catchif(!this.isCancelled()){//开始下一次拉取scheduledThreadPoolExecutor.schedule(this,pullDelayTimeMills,TimeUnit.MILLISECONDS);}else{log.warn("执行doPullTask??后取消PullTask,{}",messageQueue);}}}拉取消息成功后,调用submitConsume-Request方法将拉取的消息放入consumeRequestCache中,然后开始下一次拉取这清除了示例代码中轮询消息的逻辑。那么还有一个问题。监听器什么时候触发监听事件?当消费者启动时,将启动RebalanceService线程。该线程的run方法如下:publicvoidrun(){while(!this.isStopped()){this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}}下面的UML类图展示了doRebalance方法的调用关系:可以看到最后调了Rebalance-LitePullImpl的messageQueueChanged方法,代码如下:publicvoidmessageQueueChanged(Stringtopic,SetmqAll,SetmqDivided){MessageQueueListenermessageQueueListener=this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();如果(messageQueueListener!=null){try{messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);}catch(Throwablee){log.error("messageQueueChanged异常",e);}}}这里监听器终于被触发了。下面以并发消费模式同步拉取消息为例,总结一下消费者消息处理流程:消费者从DefaultLitePullConsumer启动并订阅Topic,这个订阅过程会向DefaultLitePullConsumer注册一个监听器。在消费者启动过程中,会启动Message-Queue再平衡线程Rebalance-Service。当rebalance进程发现ProcessQueueTable发生了变化,就会启动消息拉取线程。消息拉取线程拉取消息后,将消息放入consumeRequestCache,下次再拉取。consumer启动后,不断从consumeReq-uestCache中拉取消息进行处理。总结通过本文的解释可以看出,PUSH模式和PULL模式本质上是客户端主动拉取,RocketMQ并没有真正实现Broker推送消息的PUSH模式。RocketMQ中PULL模式和PUSH模式的区别如下:PULL模式是从Broker中拉取消息放入缓存,然后消费者不断从缓存中取消息执行客户端定义的处理逻辑,而PUSH模式是死循环不断的从Broker中拉取消息,拉取后调用回调函数进行处理,在回调函数中调用客户端定义的处理逻辑。PUSH方式的拉取消息依赖无限循环不断的触发业务,而PULL方式的拉取消息是通过MessageQueue的监听器触发消息拉取线程,拉取一次后消息拉取线程会进行下一次拉取。