RocketMQ消费端将学习如何使用RocketMQ消费消息。首先,简单地创建一个Maven项目并添加依赖项。org.apache.rocketmqrocketmq-client4.9.2启动消费者包mq.consumer;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;导入org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;导入org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;导入org.apache.rocketmq.client.consumer。listener.MessageListenerConcurrently;导入org.apache.rocketmq.client.exception.MQClientException;导入org.apache.rocketm.common.message.MessageExt;导入org.apache.rocketmq.common.protocol.heartbeat.MessageModel;导入java.util。List;publicclassBroadcastConsumer{publicstaticvoidmain(String[]args)throwsMQClientException{//创建推送模式的Consumer组DefaultMQPushConsumerpushConsumer=newDefaultMQPushConsumer("pushConsumer");pushConsumer.setNamesrvAddr("localhost:9876");//集群模式pushConsumer.setMessageModel(MessageModel.CLUSTERING);//订阅的主题标签pushConsumer.subscribe("topic_test01","Tag1||Tag2");pushConsumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf(Thread.currentThread().getName()+"接收新消息:"+msgs+"%n");返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("广播消费者开始.%n");}启动生成器packagemq.producer;importorg.apache.rocketmq.client.exception.MQBrokerException;importorg.apache.rocketmq.client.exception.MQClientException;导入org.apache.rocketmq.client.producer.DefaultMQProducer;导入org.apache.rocketmq.client.producer.SendResult;导入org.apache.rocketmq.common.message.Message;导入org.apache.rocketmq。remoting.common.RemotingHelper;importorg.apache.rocketmq.remoting.exception.RemotingException;importjava.io.UnsupportedEncodingException;publicclassSyncProducerV2{/***同步消息发送**@paramargs*@throwsMQClientException*@throwsMQBrokerException*@throwsRemotingException*@throwsInterruptedException*@throwsUnsupportedEncodingException*/publicstaticvoidmain(String[]args)throwsMQClientException,MQBrokerException,RemotingException,InterruptedException,UnsupportedEncodingException{System.out.println("SyncProducer开始......");DefaultMQProducerdefaultMQProducer=newDefaultMQProducer("pg_sync_01");defaultMQProducer.setNamesrvAddr("localhost:9876");defaultMQProducer.start();for(inti=0;i<10;i++){send(defaultMQProducer,i,i%3);}defaultMQProducer.shutdown();System.out.println("SyncProducer结束......");}privatestaticvoidsend(DefaultMQProducerdefaultMQProducer,Integeri,inttag)抛出MQClientException、RemotingException、MQBrokerException、InterruptedException、UnsupportedEncodingProducer。("topic_test01","Tag"+tag,("hellothisissyncmessage_"+i+"!").getBytes(RemotingHelper.DEFAULT_CHARSET)));System.out.println(sendResult);}}consumerconsumption可以看到消费带有Tag1和Tag2等标签的消息会被过滤掉。消费分类RocketMQ有两种消费模式:BROADCASTING(广播)和CLUSTERING(集群)。这两种模式有什么区别?广播:同一个消费组下的实例会重复消费同一个主题的消息。可以理解为大家做同样的工作,消费进度存储在客户端,可能会导致部分消息没有被消费。Cluster:同一个消费组下的实例,会负载均衡地消费同一个Topic的消息,可以理解为分工协作,消费进度存储在Bro在ker端,大部分系统会使用集群模式来消费信息。毕竟消费者可以横向扩展,承受更大的消费压力。广播模式用的比较少。一般是消息通知同步的场景。如果你想同步刷新缓存等这篇博文多发平台OpenWrite发布吧!