当前位置: 首页 > 后端技术 > Java

RocketMQ学习14-消息的PUSH和PULL消费方式

时间:2023-04-01 15:35:46 Java

虽然RocketMQ中有PUSH和PULL两种消费方式,但实现机制其实是PULL模式。PUSH模式是伪推送,是对PULL模式的封装,每一批消息拉取后,提交给消费者端的线程池(异步),然后立即从Broker拉取消息,这达到类似“推”的效果。下面是消息拉取的示意图:在RocketMQ中的大多数场景下,通常会选择PUSH模式,具体原因在下文进行说明。下面介绍两者的关系。1.什么是PUSH和PULL?PULL表示客户端主动向服务器请求并拉取数据。二、PUSH和PULL的特点PUSH的特点之一是及时,一旦有数据,服务端会立即将数据推送给客户端;对客户端更友好,无需处理没有数据的情况;但是服务端并不知道客户端的处理能力,如果客户端的处理能力低,就会造成客户端消息堆积的问题。PULL是因为客户端主动去服务端拉取数据,所以不存在消息堆积问题;但有数据时客户端无法感知,拉取时间间隔不易控制,间隔长时消息消费不及时;间隔时间短会出现无效的pullrequest。PULL模式下,为了保证实时消费,采用长轮询消息服务器拉取消息。客户端每隔一定时间向服务器发起一次请求。如果有数据,它将被检索以供消费。如果服务器没有数据,客户端线程会阻塞,阻塞时间为15S,有数据时会被唤醒。长轮询还是由消费者发起,所以即使broker端有大量数据,也不会主动推送给消费者。长轮询的实现在PullRequestHoldService类中。三、PUSH和PULL的实现首先看一个PULL的使用例子:通用.message.MessageExt;导入org.apache.rocketmq.common.message.MessageQueue;导入java.util.HashMap;导入java.util.List;导入java.util.Map;导入java.util.Set;导入java。util.concurrent.CountDownLatch;导入java.util.concurrent.TimeUnit;公共类PullConsumerTest{publicstaticvoidmain(String[]args)throwsException{Semaphoresemaphore=newSemaphore();线程t=新线程(新任务(信号量));t.开始();CountDownLatchcdh=newCountDownLatch(1);try{//程序运行120scdh.await(120*1000,TimeUnit.MILLISECONDS);}最后{semaphore.running=false;}}/***消息拉取核心实现逻辑*/staticclassTaskimplementsRunnable{Semaphores=newSemaphore();公共任务(信号量s){this.s=s;}publicvoidrun(){try{DefaultMQPullConsumerconsumer=newDefaultMQPullConsumer("dw_pull_consumer");consumer.setNamesrvAddr("127.0.01:9876");消费者开始();MapoffsetTable=newHashMap();SetmsgQueueList=consumer.fetchSubscribeMessageQueues("TOPIC_TEST");//获取该主题的所有队列if(msgQueueList!=null!msgQueueList.isEmpty()){booleannoFoundFlag=false;while(this.s.running){if(noFoundFlag){// 没有找到消息,停止消费Thread.sleep(1000);}for(消息队列q:msgQueueList){PullResultpullResult=consumer.pull(q,"*",decivedPulloffset(offsetTable,q,consumer),3000);System.out.println("pullStatus:"+pullResult.getPullStatus());开关(pullResult.getPullStatus()){案例发现:doSomething(pullResult.getMsgFoundList());休息;案例NO_MATCHED_MSG:中断;案例NO_NEW_MSG:案例OFFSET_ILLEGAL:noFoundFlag=true;休息;默认值:继续;}//提交站点consumer.updateConsumeOffset(q,pullResult.getNextBeginOffset());}System.out.println("balacne队列为空:"+consumer.fetchMessageQueuesInBalance("TOPIC_TEST").isEmpty());}}else{System.out.println("结束,因为队列是空的");}consumer.shutdown();System.out.println("消费者关闭");}catch(Throwablee){e.printStackTrace();}}}/**具体处理逻辑 */privatestaticvoiddoSomething(Listmsgs){System.out.println("本次拉取消息数:"+msgs.size());}publicstaticlongdecivedPulloffset(MapoffsetTable,MessageQueuequeue,DefaultMQPullConsumerconsumer)throwsException{longoffset=consumer.fetchConsumeOffset(queue,false);}如果(偏移量<0){偏移量=0;}System.out.println("偏移量:"+偏移量);返回偏移量;}staticclassSemaphore{publicvolatilebooleanrunning=true;}}消息抓取的实现主要在任务Task的run方法中,重点:首先根据MQConsumer的fetchSubscribeMessageQueues方法获取Topic的所有队列信息然后遍历所有队列依次从Broker端拉取消息通过MQConsuemr的PULL方法消费拉取的消息。通过调用MQConsumer的updateConsumeOffset方法更新位置,但需要注意的是,该方法并不是实时提交给Broker的,而是客户端会开启线程,默认每5s向Broker上报一次.上面的例子逻辑已经很清晰了,但是我们在使用的时候需要考虑以下问题:从broker拉取一批消息后,需要多个消费者手动完成队列分配。上面的例子只是一个消费者组,组内只有一个消费者。如果有多个消费者,我们需要考虑队列的分布。消费消息后,我们需要主动上报消费进度,然后拉取下一批。如果遇到消息消费失败,需要通知Broker消息消费失败,需要稍后重试,通过手动调用sendMessageBack方法实现描述,在MQPullConsumer类中,有一个MessageQueueListener,它的作用就是当队列发生变化的时候,通知Consumer。也正是这个接口帮助我们实现了Pull模式下的负载均衡。/***MessageQueueListener由应用程序实现,可以在消息队列更改时指定*/publicinterfaceMessageQueueListener{/***@paramtopic消息主题*@parammqAll此消息主题中的所有队列*@parammqDividedcollection队列,分配给当前消费者*/voidmessageQueueChanged(finalStringtopic,finalSetmqAll,finalSetmqDivided);}我们再来看一下PUSH使用的示例publicstaticvoidmain(String[]args)抛出InterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("dw_test_consumer_6");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TOPIC_TEST","*");consumer.setAllocateMessageQueueStrategy(新分配teMessageQueueAveragelyByCircle());consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){try{System.out.printf("%sReceive:%NewMessage,Thread.currentThread().getName(),msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch(Throwablee){e.printStackTrace();returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}});consume.start();System.out.printf("消费者开始.%n");}管理流程:首先新建一个DefaultMQPushConsumer对象,指定一个消费者组名称,然后通过调用setConsumeFromWhere方法设置相关参数,如nameSrvAdd、失败重试次数、线程数等指定第一次启动时从哪里消费,默认消费最新消息。通过调用setAllocateMessageQueueStrategy指定队列加载机制,默认均匀分布。通过调用registerMessageListener设置消息监听器,即消息处理逻辑,最后返回CONSUME_SUCCESS(消费成功)或RECONSUME_LATER(需要重试)。与PULL方式相比,使用PUSH方式时,我们只需要指定相关策略,然后在MessageListener回调中处理消息即可。由于PUSH消息方法返回的是消息的状态,服务端会维护每个消费者的消费进度,内部记录消费进度,消息发送成功后更新消费进度。另外我们不需要过多干预队列的负载,这些问题都被封装了。本文主要介绍了两种消费方式:PULL和PUSH,同时也介绍了两者的特点,然后分别给出了相应的例子,然后简单总结一下:PULL:消费者订阅主题,然后在集群中自动发送消息队列动态加载自动拉取消息。准实时。PUSH:消费者无需订阅主题,业务方(应用)直接根据MessageQueue拉取消息。与PULL方法相比,PUSH方法封装的更多,使用起来更方便。大多数场景通常使用PUSH方式。参考文章:08消息消费API及版本变更说明【RocketMq实战第四篇??】——不同类型的消费者(DefaultMQPushConsumer&DefaultMQPullConsumer)RocketMQ消息消费模式Push-Pull模式RocketMQPush-Pull模式