大家好,我是君哥,今天来聊聊RocketMQ的顺序消息实现机制。在某些场景下,使用MQ需要保证消息的顺序。比如在一个电商系统中,用户提交、支付、订单发货这三个消息应该是有顺序的,如下图所示:对于RocketMQ来说,主要是由Producer和Consumer来保证消息的顺序。1、Producer下面代码是Producer发送顺序消息的官方提示示例:publicstaticvoidmain(String[]args)throwsUnsupportedEncodingException{try{DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name");生产者.start();String[]tags=newString[]{"TagA","TagB","TagC","TagD","TagE"};for(inti=0;i<100;i++){intorderId=i%10;Messagemsg=newMessage("TopicTestjjj",tags[i%tags.length],"KEY"+i,("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResultsendResult=producer.send(msg,newMessageQueueSelector(){@OverridepublicMessageQueueselect(Listmqs,Messagemsg,Objectarg){整数id=(整数)arg;intindex=id%mqs.size();返回mqs.get(index);}},orderId);System.out.printf("%s%n",sendResult);}producer.shutdown();}catch(MQClientException|RemotingException|MQBrokerException|InterruptedExceptione){e.printStackTrace();}}与发送并发消息的区别在于发送消息时传入了MessageQueueSelector,这里可以指定消息发送到一个固定的MessageQueue注意:以上代码会将相同orderId的消息发送到同一个MessageQueue,这样对orderId相同的消息进行排序,也称为偏序。另一种对应的是全局排序,要求所有的消息都发送到同一个MessageQueue。下面再来看一下发送的代码:privateSendResultsendSelectImpl(Messagemsg,MessageQueueSelectorselector,Objectarg,finalCommunicationModecommunicationMode,finalSendCallbacksendCallback,finallongtimeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{//省战略部分分发topicPublishtopicPublishInfo=this.tryToFindTopicPublishInfo(msg.getTopic());if(topicPublishInfo!=null&&topicPublishInfo.ok()){MessageQueuemq=null;尝试{ListmessageQueueList=mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());消息userMessage=MessageAccessor.cloneMessage(msg);StringuserTopic=NamespaceUtil.withoutNamespace(userMessage.getTopic(),mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq=mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList,userMessage,arg));}catch(Throwablee){thrownewMQClientException("选择消息队列抛出异常。",e);}//省略一些逻辑if(mq!=null){returnthis.sendKernelImpl(msg,mq,communicationMode,sendCallback,null,timeout-costTime);}else{thrownewMQClientException("selectmessagequeuereturnnull.",null);}}//省略一些逻辑}可以看到在发送的时候,使用MessageQueueSelector选择一个MessageQueue,然后向这个MessageQueue发送消息,对于并发消息,这里没有传递MessageQueueSelector。如果发送方式没有指定MessageQueue,则按照默认策略选择一个。2.Consumer以RocketMQ的推送方式为例,Consumer会注册一个监听器来拉取消息并消费。下面的UML类图展示了调用关系:上图包含顺序消息和并发消息。处理。其中MessageListenerOrderly和ConsumeMessageOrderlyService处理顺序消息。与并发消息不同的是,顺序消息定义了一个MessageQueueLock类,里面存放着每个MessageQueue对应的锁。代码如下:privateConcurrentMapmqLockTable=newConcurrentHashMap();下面的代码是官方给出的顺序消费的例子:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_subicconsumer.subiconest_OFFmer")TagA||TagC||TagD");consumer.registerMessageListener(newMessageListenerOrderly(){AtomicLongconsumeTimes=newAtomicLong(0);@OverridepublicConsumeOrderlyStatusconsumeMessage(Listmsgs,ConsumeOrderlyContextcontext){context.setAutoCommit(;System.out.printf("%s接收新消息消息:%s%n",Thread.currentThread().getName(),msgs);this.consumeTimes.incrementAndGet();if((this.consumeTimes.get()%2)==0){returnConsumeOrderlyStatus.SUCCESS;}elseif((this.consumeTimes.get()%5)==0){context.setSuspendCurrentQueueTimeMillis(3000);返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}返回ConsumeOrderlyStatus.SUCCESS;}}。开始();System.out.printf("ConsumerStarted.%n");}再来看看时序消息的消费端处理逻辑(一)注册和监听上面的代码定义了时序消息监听器MessageListenerOrderly,并注册到DefaultMQPushConsumer,这个Registration也注册到了DefaultMQPushConsumerImpl。(2)PushConsumer初始化DefaultMQPushConsumerImpl类在初始化的时候会判断注册的MessageListener是否为MessageListenerOrderly,如果是则将consumeOrderly变量设置为true来标记是顺序消息拉取还是并发消息拉取。然后将ConsumeMessageService初始化为ConsumeMessageOrderlyService。(3)锁mq保证消息的顺序,需要保证同一个MessageQueue只能被同一个Consumer消费。ConsumeMessageOrderlyService在初始化时,会启动一个定时任务,周期性(默认20s)向Broker发送锁消息(请求类型为LOCK_BATCH_MQ)。Broker收到后会绑定MessageQueue、group和clientId,这样其他client就无法从这个MessageQueue中拉取消息。注意:broker的锁有一个过期时间,默认为60s,可以配置。锁过期后,可能会被其他Consumer消费。Broker端的锁结构如下图所示:(4)当拉取消息消费者启动时,消费者拉取线程PullMessageService被启动,其中的死循环不断的从Broker中拉取消息。这里调用了DefaultMQPushConsumerImpl类的pullMessage方法。这里拉取消息的逻辑和并发消息的逻辑是一样的。消息拉取后调用PullCallback的onSuccess方法处理结果。这里调用了ConsumeMessageOrderlyService的submitConsumeRequest方法,在线程池中提交ConsumeRequest线程。PullCallbackpullCallback=newPullCallback(){@OverridepublicvoidonSuccess(PullResultpullResult){if(pullResult!=null){pullResult=DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),pullResult,subscriptionData);.getPullStatus()){caseFOUND://省略if(pullResult.getMsgFoundList()==null||pullResult.getMsgFoundList().isEmpty()){}else{//省略booleandispatchToConsume=processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//省略了;省略}}}//省略};上面获取消息后,先将消息放入ProcessQueue,然后调用submitConsumeRequest方法。与并发消息处理方法不同,submitConsumeRequest方法不处理取回的消息,而是实际从ProcessQueue中处理。(5)处理消息处理消息的逻辑在ConsumeMessageOrderlyService的内部类ConsumeRequest中,是一个线程类,run方法如下:publicvoidrun(){//省略部分逻辑//1.获取MessageQueueLock对应的锁finalObjectobjLock=messageQueueLock.fetchLockObject(this.messageQueue);synchronized(objLock){if(MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())||(this.processQueue.isLocked()&&!this.processQueueed.isLock())){finallongbeginTime=系统.currentTimeMillis();for(booleancontinueConsume=true;continueConsume;){//省略延迟执行的逻辑finalintconsumeBatchSize=ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//2.从processQueue中拉取消息Listmsgs=this.processQueue.takeMessages(consumeBatchSize);如果(!msgs.isEmpty()){finalConsumeOrderlyContextcontext=newConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus状态s=空;//省略部分逻辑booleanhasException=false;尝试{//3。获取处理锁this.processQueue.getConsumeLock().lock();//4。执行消费处理逻辑status=messageListener.consumeMessage(Collections.unmodifiableList(msgs),context);}catch(Throwablee){log.warn(String.format("consumeMessageexception:%sGroup:%sMsgs:%sMQ:%s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,消息队列),e);有异常=真;}最后{//5。释放处理锁this.processQueue.getConsumeLock().unlock();}//省略一些逻辑continueConsume=ConsumeMessageOrderlyService.this.processConsumeResult(msgs,status,context,this);}else{continueConsume=false;}}}else{//省略一些逻辑ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue,this.processQueue,100);}}}总结以上代码,Consumer消费消息的逻辑如下:ForMessageQueueLock被锁住,这样只有一个线程在处理当前的MessageQueue,从ProcessQueue中拉取一批消息。获取ProcessQueue锁,保证只有当前线程可以处理消息,也可以防止Rebalance线程移除当前处理的MessageQueue。执行消费处理逻辑。释放ProcessQueue处理锁;6.processConsumeResult方法更新消息偏移量。注意:ProcessQueue中的锁是ReentrantLock。3、重试不同于并发消息。顺序消息消费失败后,消息不会发送给Broker,而是直接在Consumer端重试。如果重试次数超过最大重试次数(16次),则发送给Broker,Broker将消息推入死信队列。如下图所示:4.总结一下RocketMQ顺序消息的原理就是将Producer端需要保证顺序的一批消息发送到同一个MessageQueue中,Consumer端使用锁机制来保证消息消费的顺序。Broker端使用MessageQueue进行Locking,保证同一个MessageQueue只能被同一个Consumer消费。根据实现原理可以看出,RocketMQ的顺序消息可能存在两个陷阱:顺序消息需要发送到同一个MessageQueue,这可能会导致单个MessageQueue中的消息量很大,以及consumer在消费的时候只能在单线程消费。很可能导致当前MessageQueue上有消息积压。如果顺序消息的MessageQueue所在broker挂了,Producer只能将消息发送到其他Brokers的MessageQueue,如果新的MessageQueue被其他Consumers消费,则两个Consumers消费消息的顺序无法保证。如下图所示:Broker1发生故障,将订单发货消息发送给Broker2,由Consumer2消费。消息的顺序可能是乱序的。