大家好,我是君哥,今天来聊聊RocketMQ的广播消息实现机制。RocketMQ有两种消费模式,集群模式和广播模式。集群模式是指RocketMQ中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer并发向TopicTest写入3条新消息,分别分配到3个队列MessageQueue1~MessageQueue3,然后Group中的3个Consumer分别消费一条消息:广播方式是消息在RocketMQ中会被consumergroup中的每个consumer消费一次,如下图所示:当使用RocketMQ的广播方式时,需要在consumer端进行定义。下面是一个官方的例子:publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_1");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.BROADCASTING);registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf("%s接收新消息:%s%n",Thread.currentThread().getName(),消息);返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});消费者开始();系统.out.printf("BroadcastConsumerStarted.%n");}从代码中我们可以看出,在定义Consumer时,通过messageModel属性指定消费模式。这里指定为BROADCASTING,同样启动广播模式消费者1.消费者启动以RocketMQ推模式为例,看消费者调用关系类图:DefaultMQPushConsumer为启动入口类,其start方法调用DefaultMQPushConsumerImpl类的启动方法。下面重点说一下这个方法。(1)复制订阅关系启动方法中调用了copySubscription方法,代码如下::休息;caseCLUSTERING:finalStringretryTopic=MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionDatasubscriptionData=FilterAPI.buildSubscriptionData(retryTopic,SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscription;Data);默认值:中断;}}catch(Exceptione){thrownewMQClientException("订阅异常",e);}}这里的代码需要注意一点:cluster模式会创建一个retryTopic订阅关系,而broadcast模式则不会创建这个订阅关系。也就是说广播模式不考虑重试。(2)初始化offset下面是初始化offset的代码:()){caseBROADCASTING:this.offsetStore=newLocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());休息;caseCLUSTERING:this.offsetStore=newRemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumerGroup(getConsumerGroup));休息;默认值:中断;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}从上面的代码可以看出,广播方式使用的是LocalFileOffsetStore,也就是说offset是保存在客户端本地的,除了在内存中会保存在文件,它也会保存在本地文件中。2.消息拉取ConsumeMessageService是真正拉取消息的地方。消费者初始化的时候会初始化ConsumeMessageService,这里会区分并发消息和顺序消息。(1)顺序消息在集群模式下,需要获取processQueue的锁来拉取消息,而在广播模式下,可以不获取锁直接拉取消息。判断逻辑如下://ConsumeMessageOrderlyService.ConsumeRequestfinalObjectobjLock=messageQueueLock.fetchLockObject(this.messageQueue);synchronized(objLock){if(MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()c(QueueModel.proc)|.isLocked()&&!this.processQueue.isLockExpired())){}}这里有个问题。对于顺序消息,需要加锁,保证一个processQueue只能被一个线程处理,从而保证消费顺序。那么对于广播模式,为什么不获取processQueue的锁呢?广播模式不支持顺序消息吗?(2)并发消息对于并发消息,广播方式的区别在于对消费结果的处理。集群模式消费失败后,需要将消息发回Broker等待再次拉取,而广播模式不需要重试。代码如下://ConsumeMessageConcurrentlyService.rocessConsumeResultswitch(this.defaultMQPushConsumer.getMessageModel()){案例广播:for(inti=ackIndex+1;imsgBackFailed=newArrayList(consumeRequest.getMsgs().size());对于(inti=ackIndex+1;imqSet=this.topicSubscribeInfoTable.get(topic);if(mqSet!=null){布尔值已更改=this.updateProcessQueueTableInRebalance(topic,mqSet,isOrder);//省略部分分发}else{}break;}caseCLUSTERING:{SetmqSet=this.topicSubscribeInfoTable.get(topic);ListcidAll=this.mQClientFactory.findConsumerIdList(topic,consumerGroup);//省策略部分分发if(mqSet!=null&&cidAll!=null){//省策略部分分发try{allocateResult=strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);}catch(Throwablee){返回;}SetallocateResultSet=newHashSet();if(allocateResult!=null){allocateResultSet.addAll(allocateResult);}booleanchanged=this.updateProcessQueueTableInRebalance(topic,allocateResultSet,isOrder);//省略一些逻辑}break;默认值:中断;}}在调用上面的updateProcessQueueTableInRebalance方法之前,需要获取需要消费的MessageQueue集合。获取订阅的Topic下的所有集合元素,而在集群模式下,需要通过负责均衡获取当前消费者要消费的MessageQueue集合。4.小结本文主要讲解RocketMQ广播消息的实现机制。要理解广播消息,必须掌握以下几点:1.偏移量存储在消费者的本地内存和文件中。2.广播消息不支持重试。3.从源码来看,广播模式不支持顺序消息。4.广播模式的消费者订阅Topic下的所有MessageQueue,不会再平衡。