DefaultMQPushConsumer流控PullMessageService根据offset从broker中拉取一批消息先放入ProccessQueue处理队列,然后将这些消息消费任务提交给线程池返回。ProcessQueue对象的主要内容是一个TreeMap和一个读写锁。TreeMap中,以MessageQueue的Offset作为Key,以消息内容的引用作为Value,保存所有从MessageQueue中获取到但尚未处理的消息;读写锁控制多个线程对TreeMap对象的并发访问。有了ProcessQueue对象,流程控制就方便灵活多了。client在每次Pullrequest前都会做几次判断,取未处理的消息数,消息的总大小,Offset的跨度,任意值,如果大小超过设置的大小,则消息会被拉一段时间后,从而达到流量控制的目的。另外,ProcessQueue还可以辅助实现顺序消费的逻辑。该代码在DefaultMQPushConsumerImpl#pullMessage方法中实现。将消息任务提交到线程池是在该方法中的PullCallback回调中进行的。ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService有两种实现,ConsumeRequest是封装的任务。defaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest消息拉取方法在经过队列加载机制后,会分配给当前消费者的一些队列。注意一个消费者组可以订阅多个主题,如上面的pullRequestQueue,topic_test和topic_test2。分配了一个队列。依次从pullRequestQueue中取出一个PullRequest对象,根据对象中的拉取偏移量向Broker发起拉取请求。默认拉取32个,可以通过pullBatchSize参数更改。该方法不仅会返回消息列表,还会返回PullRequest对象中更改下一次拉取的偏移量。收到Broker返回的消息后,会先放入ProccessQueue(处理队列)中。队列内部结构为TreeMap,key存储消息在消息消费队列(consumequeue)中的偏移量,value为具体的消息对象。然后将拉取的消息提交给消费者组内部的线程池,立即返回,并将这个PullRequest对象放入pullRequestQueue中,再取出下一个PullRequest对象,继续重复拉取消息的过程。从这里可以看出,消息的获取和消息的消费是不同的线程。消息消费组的线程池处理完一条消息后,会将消息从ProccessQueue中删除,然后将消息消费的进度报告给Broker,以便下次重启时从上次消费的位置开始消费时间。消息消费进度的提交通过上面拉取消息的过程,我们知道消息消费组的线程池处理完一条消息后,会从ProccessQueue中移除消息,并将消息消费进度上报给Broker。那么请思考下面的问题:比如处理队列中有5条消息被线程池并发消费,那么如果消息偏移量为3(3:msg3)的消息在2之前。2的消息处理完毕,如何向Broker报告消息消费进度?如果msg3提交的offset是消息消费的进度,上报完成后,如果consumer出现内存溢出等问题导致JVM异常退出,而msg1的消息还没有处理完,那就重启consumer,因为消息消费进度文件中保存的信息是msg3的消息偏移量会从msg3开始继续消费,会造成消息丢失。RocketMQ采用的方式是在处理完msg3后,将msg3从消息处理队列中移除,但是在向Broker上报消息消费进度时,会取ProceeQueue中的最小offset作为消息消费进度,即上报的消息消费进度为0但是如果出现上面的情况,也就是0、1、3已经被消费掉了,并且从ProcessQueue中移除了,那么如果我们上报的消费进度为2,如果出现内存溢出等异常情况,消费者重启,则会从消息偏移量为2的消息继续消费,msg3会被多次消费(msg3会再次从broker获取),所以RocketMQ不保证消息的重复消费,所以消费的幂等性需要通过业务方面。我们来看一下提交消费进度的流程图:为了减少消费者和Broker之间的网络交互,提高性能,在提交消息消费进度时,会先存储到本地缓存表中,然后定时上报给Broker间隔。同样,Broker会先存储本地缓存表,然后定期刷新到磁盘。
