RocketMQ有两种消费模式,集群模式和广播模式。集群模式是指RocketMQ中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer并发向TopicTest写入3条新消息,分别分配到3个队列MessageQueue1~MessageQueue3,然后Group中的3个Consumer分别消费一条消息:广播方式是消息在RocketMQ中会被consumergroup中的每个consumer消费一次,如下图所示:当使用RocketMQ的广播方式时,需要在consumer端进行定义。下面是一个官方的例子:publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_1");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.BROADCASTING);registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf("%s接收新消息:%s%n",Thread.currentThread().getName(),消息);返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});消费者开始();System.out.printf("BroadcastConsumerStarted.%n");}从代码中可以看出,在定义Consumer时,通过属性messageModel来指定消费模式,这里指定为BROADCASTING,即启动consumer在广播模式1Consumer启动以RocketMQ推送模式为例,看一下Consumer调用关系类图:DefaultMQPushConsumer作为启动入口类,其start方法调用DefaultMQPushConsumerImpl类的start方法,为关注这个方法。1.1在复制订阅关系启动方法中调用copySubscription方法,代码如下:privatevoidcopySubscription()throwsMQClientException{try{//复制订阅关系切换(this.defaultMQPushConsumer.getMessageModel()){caseBROADCASTING:break;caseCLUSTERING:finalStringretryTopic=MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionDatasubscriptionData=FilterAPI.buildSubscriptionData(retryTopic,SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscriptionData);;}}catch(Exceptione){thrownewMQClientException("订阅异常",e);}}这里的代码需要注意一点:cluster模式会创建一个retryTopic订阅关系,而broadcast模式不会创建这个订阅关系。也就是说广播模式不考虑重试。1.2初始化offset下面是初始化offset的代码:){caseBROADCASTING:this.offsetStore=newLocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());休息;caseCLUSTERING:this.offsetStore=newRemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumerGroup();break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}从上面的代码可以看出广播mode使用的是LocalFileOffsetStore,也就是说offset保存在客户端本地,除了在内存中保存,还会保存在本地文件中。2消息拉取ConsumeMessageService是真正拉取消息的地方,消费者初始化的时候,他们会初始化ConsumeMessageService,这里会区分并发消息和顺序消息2.1顺序消息集群模式下需要获取processQueue的锁才能拉取消息,广播模式下可以直接拉取消息没有获得锁。判断逻辑如下://ConsumeMessageOrderlyService.ConsumeRequestfinalObjectobjLock=messageQueueLock.fetchLockObject(this.messageQueue);synchronized(objLock){if(MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()c(QueueModel.proc)|.isLocked()&&!this.processQueue.isLockExpired())){}}这里有个问题。对于顺序消息,需要加锁,保证一个processQueue只能被一个线程处理,从而保证消费顺序。那么对于广播模式,为什么不获取processQueue的锁呢?广播模式不支持顺序消息吗?2.2并发消息对于并发消息,广播方式的区别在于对消费结果的处理。集群模式消费失败后,需要将消息发回Broker等待再次拉取,而广播模式不需要重试。代码如下://ConsumeMessageConcurrentlyService.rocessConsumeResultswitch(this.defaultMQPushConsumer.getMessageModel()){案例广播:for(inti=ackIndex+1;imsgBackFailed=newArrayList(consumeRequest.getMsgs().size());对于(inti=ackIndex+1;imqSet=this.topicSubscribeInfoTable.get(topic);if(mqSet!=null){布尔值已更改=this.updateProcessQueueTableInRebalance(topic,mqSet,isOrder);//省略部分分发}else{}break;}caseCLUSTERING:{SetmqSet=this.topicSubscribeInfoTable.get(topic);ListcidAll=this.mQClientFactory.findConsumerIdList(topic,consumerGroup);//省策略部分分发if(mqSet!=null&&cidAll!=null){//省策略部分分发try{allocateResult=strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);}catch(Throwablee){返回;}SetallocateResultSet=newHashSet();if(allocateResult!=null){allocateResultSet.addAll(allocateResult);}booleanchanged=this.updateProcessQueueTableInRebalance(topic,allocateResultSet,isOrder);//省略一些逻辑}break;默认值:中断;}}在调用上面的updateProcessQueueTableInRebalance方法之前,需要获取需要消费的MessageQueue集合。获取订阅的Topic下的所有集合元素,而在集群模式下,需要通过负责均衡获取当前消费者要消费的MessageQueue集合。4小结本文主要讲解RocketMQ广播消息的实现机制。要理解广播消息,必须掌握以下几点:1.偏移量存储在消费者本地内存和文件中;2.广播消息不支持重试;3.从源码来看,广播模式不支持顺序消息;4、广播模式消费者订阅该Topic下的所有MessageQueue,不会rebalance。