介绍示例环境:springboot2.3.9+RocketMQ4.8.0RocketMQ组织和概念1消息模型(MessageModel)RocketMQ主要由Producer、Broker、Consumer组成。Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一个服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储在不同的Broker中。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储在多个MessageQueue中。ConsumerGroup由多个Consumer实例组成。2消息生产者(Producer)负责生产消息,一般由业务系统负责生产消息。消息生产者将业务应用系统中产生的消息发送给中介服务器。RocketMQ提供了多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步方式和异步方式都需要Broker返回确认信息,单向传输不需要。3消息消费者(Consumer)负责消费消息,通常后台系统负责异步消费。消息使用者将从Broker服务器拉取消息并将它们提供给应用程序。从用户应用的角度,提供两种消费形式:拉取消费和推送消费。4主题(Topic)表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。5代理服务器(BrokerServer)消息中转角色,负责存储和转发消息。在RocketMQ系统中,代理服务器负责接收和存储生产者发来的消息,同时为消费者的pullrequest做准备。代理服务器还存储消息相关的元数据,包括消费组、消费进度偏移量、主题和队列消息。如下图所示:6名称服务(NameServer)名称服务充当路由消息的提供者。生产者或消费者可以通过名称服务为每个主题查找对应的BrokerIP列表。多个Namesrv实例组成一个集群,但它们相互独立,不存在信息交互。7拉式消费者(PullConsumer)是Consumer消费的一种。应用通常主动调用Consumer的消息拉取方法从Broker服务器拉取消息,主动权由应用控制。一旦获取了一批消息,应用程序就开始消费过程。8推送消费者(PushConsumer)是Consumer消费的一种。在这种模式下,Broker收到数据后会主动推送给消费者。这种消费模式一般具有较高的实时性。9生产者组(ProducerGroup)是同类型Producer的集合,该类型Producer发送同类型消息,发送逻辑一致。如果发送事务性消息,原生产者发送后崩溃,Broker服务器会联系同一生产者组的其他生产者实例进行提交或回溯消费。10消费者组(ConsumerGroup)是同一类型消费者的集合。这些消费者通常消费相同类型的消息,具有一致的消费逻辑。消费组使得在消息消费方面很容易达到负载均衡和容错的目的。需要注意的是,消费者组的消费者实例必须订阅完全相同的主题。RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。11集群消费(Clustering)在集群消费模式下,同一个ConsumerGroup的每个Consumer实例均等地共享消息。12广播消费(Broadcasting)在广播消费模式下,同一个ConsumerGroup的每个Consumer实例都会接收到全量的消息。13正常有序消息(NormalOrderedMessage)在正常顺序消费模式下,消费者通过同一个消费队列接收到的消息是有序的,而不同消息队列接收到的消息可能是乱序的。14严格有序消息(StrictlyOrderedMessage)在严格有序消息模式下,消费者收到的所有消息都是有序的。15消息(Message)消息系统传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中的每条消息都有一个唯一的MessageID,可以携带一个带有业务标识的Key。系统提供了通过MessageID和Key查询消息的功能。16标签(Tag)是为消息设置的标志,用于区分同一主题下不同类型的消息。对于来自同一业务单元的消息,可以根据不同的业务目的,在同一主题下设置不同的标签。标签可以有效保持代码的清晰度和连贯性,优化RocketMQ提供的查询系统。消费者可以基于Tags对不同的subtopic实现不同的消费逻辑,以达到更好的扩展性。ActiveMQ、Kafka、RocketMQ对比:RocketMQ服务1下载RocketMQ2配置环境变量3启动NameServer4启动Broker5通过命令行收发消息设置环境变量:C:\Users\MSI-NB>setNAMESRV_ADDR=localhost:9876发送消息:C:\Users\MSI-NB>toolsorg.apache.rocketmq.example.quickstart.Producer收到消息:SpringBoot集成RocketMQ入口依赖:
