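Hello everyone, I'm Jun Ge. Today I'd like to share a RocketMQ exception I recently ran into in production: "The broker[xxx] not exist".

First, a quick review of the RocketMQ architecture. The master and slave nodes of each Broker register with the NameServer cluster, which stores the Broker information. The RocketMQ client maintains the mapping from topics to Broker addresses locally. The Broker addresses themselves are cached in MQClientInstance#brokerAddrTable (Broker name → Broker ID → address).

Sending messages

When sending a message, the RocketMQ client first looks up the Broker address in the local cache (brokerAddrTable), using the Broker name of the target MessageQueue. If the address is not found there, the client fetches the route from the NameServer cluster.

```java
// DefaultMQProducerImpl
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 1. Look up the Broker address in the local cache
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // 2. Cache miss: fetch the topic route from the NameServer, then look again
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    if (brokerAddr != null) {
        // processing logic omitted
    }

    // 3. Neither the cache nor the NameServer knows this Broker
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
```

If neither the local cache nor the NameServer holds the Broker's information, the client throws the "broker not exist" exception. To fix this case, check when the Broker starts whether it registered with the NameServer successfully.

Getting the consume offset

The client may also throw this exception when it reads the consume offset (ConsumeOffset) of a message queue:

```java
// RemoteBrokerOffsetStore
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                // implementation omitted
            }
            case READ_FROM_STORE: {
                // omitted
                long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                // omitted
            }
            default:
                break;
        }
    }
    return -1;
}
```

The code supports three ways to get the offset:

- MEMORY_FIRST_THEN_STORE: read from memory first. If the offset is not found, request it from the Broker.
- READ_FROM_MEMORY: read directly from memory.
- READ_FROM_STORE: request it directly from the Broker.

The code that requests the offset from the Broker is:

```java
// RemoteBrokerOffsetStore
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
    InterruptedException, MQClientException {
    // Local-cache lookup (assumed; mirrors updateConsumeOffsetToBroker)
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        // Cache miss: refresh the topic route from the NameServer, then look again
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }

    if (findBrokerResult != null) {
        // processing logic omitted
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
```

This code works the same way as the message-sending code in the previous section. It first looks up the Broker address in local memory. If the address is not there, it fetches the route from the NameServer. If it still cannot find the address, it throws the "broker not exist" exception.

Other ways to get offsets

Besides the method above, the MQAdminImpl class offers three more offset-related methods:

- searchOffset: asks the Broker for the MessageQueue offset that corresponds to a given timestamp. The Broker lookup works like the method above.
- maxOffset: gets the maximum offset of a MessageQueue from the Broker.
- minOffset: gets the minimum offset of a MessageQueue from the Broker.

These methods are used in the rocketmq-tools module of the source code. One more offset-related method returns the store time of the earliest message:

```java
// MQAdminImpl
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    // processing logic omitted

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
```

This method returns the store time of the message with the smallest offset in the MessageQueue. Like the three methods above, it is used in the rocketmq-tools module.

Pulling messages

The core code for a normal message pull is:

```java
// PullAPIWrapper#pullKernelImpl (excerpt; method signature omitted)
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}

if (findBrokerResult != null) {
    // processing logic omitted
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
```

Before pulling messages from a Broker, the client first looks up the Broker address in the local cache. If it cannot find the address, it fetches it from the NameServer. If that also fails, it throws the "broker not exist" exception.

Illegal offset

If a pull returns an invalid offset (OFFSET_ILLEGAL), the client has to correct the offset. The client-side call chain is:

[Figure: client-side call chain for handling OFFSET_ILLEGAL]

This happens in the transactional-message scenario. Suppose the RocketMQ client pulls messages from a Broker and the Broker responds with PULL_OFFSET_MOVED. The client then tells the Broker to update the offset to nextPullOffset, the value the Broker returned in the previous pull. The code is:

```java
// RemoteBrokerOffsetStore
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }

    if (findBrokerResult != null) {
        // business logic omitted
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
```

Summary

Starting from an exception log I hit in production, this post walked through six scenarios that produce "The broker[xxx] not exist". All six work the same way. The client first looks up the Broker address in its local cache and, if the address is missing, fetches it from the NameServer. There are generally three causes:

- The Broker is down. A client scheduled task detects that the Broker is offline and removes it from the local cache (MQClientInstance#cleanOfflineBroker).
- The Broker has a network problem.
- A master-slave switchover is in progress on the Broker side and has not completed when the client looks up the Broker address.

In all of these cases, scheduled tasks refresh the local cache:

```java
// MQClientInstance#startScheduledTask (excerpt)
private void startScheduledTask() {
    // Periodically refresh topic routes, and the cached Broker addresses, from the NameServer
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // ... other scheduled tasks omitted

    // Periodically persist consumer offsets
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
```

The first task refreshes the routing cache every pollNameServerInterval milliseconds (30 seconds by default). Once the Broker is reachable again or the switchover completes, the client picks up the current address on a later refresh.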
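All six scenarios share one lookup pattern: check the local cache, fall back to the NameServer, and throw if both fail. The sketch below models that pattern in plain Java so it can run without a Broker. It is not RocketMQ code. BrokerAddrResolver, RouteSource and BrokerNotExistException are hypothetical names, and an in-memory map simulates the NameServer. The cache is also simplified to Broker name → address, whereas the real brokerAddrTable additionally keys by Broker ID.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Demo entry point (hypothetical example, not part of RocketMQ).
class BrokerLookupDemo {
    public static void main(String[] args) {
        // Simulated NameServer data: broker name -> master address.
        Map<String, String> nameServerData = new ConcurrentHashMap<>();
        nameServerData.put("broker-a", "10.0.0.1:10911");

        BrokerAddrResolver resolver =
            new BrokerAddrResolver(name -> Optional.ofNullable(nameServerData.get(name)));

        // Cache miss, resolved via the simulated NameServer.
        System.out.println(resolver.findBrokerAddr("broker-a")); // prints 10.0.0.1:10911

        // Unknown to both the cache and the NameServer.
        try {
            resolver.findBrokerAddr("broker-b");
        } catch (BrokerNotExistException e) {
            System.out.println(e.getMessage()); // prints The broker[broker-b] not exist
        }
    }
}

// Hypothetical stand-in for a NameServer route query.
interface RouteSource {
    Optional<String> queryBrokerAddr(String brokerName);
}

// Hypothetical exception carrying the same message text RocketMQ uses.
class BrokerNotExistException extends RuntimeException {
    BrokerNotExistException(String brokerName) {
        super("The broker[" + brokerName + "] not exist");
    }
}

// Hypothetical resolver: local cache -> NameServer -> exception.
class BrokerAddrResolver {
    // Plays the role of MQClientInstance#brokerAddrTable, simplified to name -> address.
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final RouteSource nameServer;

    BrokerAddrResolver(RouteSource nameServer) {
        this.nameServer = nameServer;
    }

    String findBrokerAddr(String brokerName) {
        // 1. Try the local cache.
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            // 2. Cache miss: ask the NameServer, cache whatever it returns, then look again.
            nameServer.queryBrokerAddr(brokerName)
                .ifPresent(a -> brokerAddrTable.put(brokerName, a));
            addr = brokerAddrTable.get(brokerName);
        }
        if (addr != null) {
            return addr;
        }
        // 3. Neither source knows the Broker.
        throw new BrokerNotExistException(brokerName);
    }

    // Drops a cached entry, loosely imitating what cleanOfflineBroker does for offline Brokers.
    void evict(String brokerName) {
        brokerAddrTable.remove(brokerName);
    }
}
```

The fallback runs only on a cache miss. An entry that is present but stale keeps being returned until something evicts or refreshes it. In the real client, cleanOfflineBroker and the periodic route refresh handle that.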

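The periodic refresh in the scheduled-task code above can be imitated with a plain ScheduledExecutorService. The imitation shows why a stale entry, for example one left behind by a master-slave switchover, eventually gets corrected. This is a hypothetical simplification, not RocketMQ's implementation. RouteCacheRefresher and its methods are invented for illustration. The sketch refreshes per Broker name from a lookup function, whereas the real client refreshes per topic from the NameServer and drops offline Brokers in cleanOfflineBroker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Demo entry point (hypothetical example, not part of RocketMQ).
class RouteRefreshDemo {
    public static void main(String[] args) throws InterruptedException {
        // Simulated NameServer view: broker name -> current master address.
        Map<String, String> nameServer = new ConcurrentHashMap<>();
        nameServer.put("broker-a", "10.0.0.1:10911");

        RouteCacheRefresher refresher = new RouteCacheRefresher(nameServer::get);
        refresher.cache().put("broker-a", "10.0.0.1:10911");

        // Simulate a master-slave switchover: the NameServer now reports a new master.
        nameServer.put("broker-a", "10.0.0.2:10911");

        refresher.start(100); // refresh every 100 ms
        Thread.sleep(500);    // give the task time to run at least once
        System.out.println(refresher.cache().get("broker-a"));
        refresher.shutdown();
    }
}

// Hypothetical cache refresher driven by a ScheduledExecutorService.
class RouteCacheRefresher {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Returns the current address for a Broker name, or null if the NameServer does not know it.
    private final Function<String, String> nameServerLookup;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "route-refresh");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });

    RouteCacheRefresher(Function<String, String> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    // One refresh pass: overwrite stale addresses and drop Brokers the NameServer no longer reports.
    void refreshAll() {
        for (String brokerName : cache.keySet()) {
            String latest = nameServerLookup.apply(brokerName);
            if (latest == null) {
                cache.remove(brokerName);
            } else {
                cache.put(brokerName, latest);
            }
        }
    }

    void start(long intervalMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            // Catch everything so one failed pass does not stop the schedule.
            try {
                refreshAll();
            } catch (Exception e) {
                System.err.println("route refresh failed: " + e);
            }
        }, 10, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    Map<String, String> cache() {
        return cache;
    }
}
```

The RocketMQ tasks above also catch exceptions inside the task. This matters because ScheduledExecutorService suppresses all later executions of a periodic task once one execution throws.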