1背景最近在开发新项目时遇到一个比较有意思的问题,如何控制SpringBoot项目中RocketMQ消费线程数。如何设置单个主题消费线程的最小数量和最大数量,以区分不同主题的不同吞吐量。先介绍一下RocketMQ消息监听,再讲解RocketMQ消费线程。2RocketMQ消息监听设置组组组为组组组组组消费者设置设置消费者设置设置我的消费者组");5consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);6consumer.subscribe("TopicTest","*");7consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");8consumer.registerMessageListener(newMessageListenerConcurrently(){9@Override10publicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,11ConsumeConcurrentlyContextcontext){12System.out.printf("%s接收新消息:%s%n",Thread.currentThread().getName(),msgs);13返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;14}15});16consumer.start();17System.out.printf("ConsumerStarted.%n");18}19}3RocketMQ中的连接结构图4Consumer监听接口:org.apache.rocketmq.client。consumer.listener.MessageListener有两个子接口:-顺序消费:MessageListenerOrderly-并发消费:MessageListenerConcurrently4.1MessageListenerConcurrently作用:消息并发消费的消费者监听器比如快速启动中,就是使用的并发消费消息监听器:1consumer.registerMessageListener(newMessageListenerConcurrently(){2@Override3publicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,4ConsumeConcurrentlyContextcontext){5System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);6returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;7}8});该方法的返回值是一个枚举:1packageorg.apache.rocketmq.client.consumer.listener;23/**4*并发消费mq消息结果5*/6publicenumConsumeConcurrentlyStatus{78/**9*消费成功10*消费成功11*/12CONSUME_SUCCESS,1314/**15*消费失败,稍后尝试消费16*消费失败,稍后尝试消费17*18*19*如果{@linkMessageListener}返回的消费结果为RECONSUME_LATER,则需要将这些消息发送给BrokerfordelayMessage20*如果向broker发送消息失败,则延迟5s后提交到线程池消费。21*22*RECONSUME_LATER消息发送入口:MQClientAPIImpl#consumerSendMessageBack,23*命令码:{@linkorg.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}24*/25RECONSUME_LATER;26}画外音:目前,我们正在在具体开发中,肯定不会直接使用这种方法来写消费者。常用的消费者实现有:推式消费者:DefaultMQPushConsumer4.2MessageListenerOrderly作用:消息顺序消费的消费者监听器5消费者线程池5.1DefaultMQPushConsumer作用:推式消费者消费者5.2注册并发消息监听器org.apache.rocketmq。client.consumer.DefaultMQPushConsumer#registerMessageListener使用该方法注册消息监听器时,事件消息监听器实际上会被设置为org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner属性。5.3设置消费者消费服务有两种选择:并发消费服务和顺序消费服务消费者启动时,会根据MessageListener的具体实现类型来判断:MessageListener有两种类型:并发和顺序,所以也有两种类型的服务。1publicsynchronizedvoidstart()throwsMQClientException{2switch(this.serviceState){3caseCREATE_JUST:45//省略部分代码.........67//根据注册的监听类型[并发消息监听器/有序执行消息监听器]来决定使用哪个消费服务。8if(this.getMessageListenerInner()instanceofMessageListenerOrderly){9this.consumeOrderly=true;10this.consumeMessageService=newConsumeMessageOrderlyService(this,(MessageListenerOrderly)this.getMessageListenerInner());11}elseif(this.getMessageListenerInner()instanceofMessageListenerConcurrently){12this.consumeOrderly=false;13this.consumeMessageService=newConsumeMessageConcurrentlyService(这个,(MessageListenerConcurrently)14}15this.consumeMessageService.start();1617//省略部分代码......................18this.serviceState=ServiceState.RUNNING;19break;20caseRUNNING:21caseSTART_FAILED:22caseSHUTDOWN_ALREADY:23thrownewMQClientException("ThePushConsumerservicestatenotOK,maybestartedonce");24default:25break;26}2728//省略部分代码........29}如果使用并发消费,使用ConsumeMessageConcurrentlyService:实例化时会创建一个线程池:1//无界队列,不可配置Capacity。DefaultMQPushConsumer#consumeThreadMax配置没有意义。2this.consumeRequestQueue=newLinkedBlockingQueue();3this.consumeExecutor=newThreadPoolExecutor(4this.defaultMQPushConsumer.getConsumeThreadMin(),//默认MaxsugetConsumeThreadMin()205this.defaultMQPushConsumer.(),//默认6461000*60,7TimeUnit.MILLISECONDS,8this.consumeRequestQueue,9newThreadFactoryImpl("ConsumeMessageThread_"));消费线程池参数:默认最小消费线程数20默认最大消费线程数64keepAliveTime=60*1000单位:秒队列:newLinkedBlockingQueue<>()无界队列线程名:前缀为:ConsumeMessageThread_注意:因为线程池使用的是无界队列,所以设置的最大线程数其实没有意义5.4修改线程池的线程数上面我们已经知道了,设置线程池的最大线程数是没用的。那么我们其实可以通过设置线程池的最小线程数来修改消费者消费消息时线程池的大小。1publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{2DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_4");34消费者.setConsumeThreadMin(30);5消费者.setConsumeThreadMax(64);67消费者.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);8consumer.subscribe("TopicTest","*");9consumer.registerMessageListener(newMessageListenerConcurrently(){1011@Override12publicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,13ConsumeConcurrentlyContextcontext){14System.out.printf("%s接收新消息:%s%n",Thread.currentThread().getName(),msgs);15returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;16}17});18consumer.start();19System.out.printf("ConsumerStarted.%n");20}注意:消费者meThreadMin如果大于64,还需要设置consumeThreadMax参数,因为这里有个验证:-修改线程池的线程数-SpringBoot版本如果consumer是使用springboot集成的,可以设置数量像这样的消费者线程: