当前位置: 首页 > Linux

RocketMq的认知

时间:2023-04-06 21:38:49 Linux

MQ相关介绍消息队列是一种可以实现系统异步通信的中间件,常用于解决系统异步解耦和请求填峰(TPS)问题。即面向开发者,而不是最终用户可以直接使用的产品;三种消息协议JMS(JavaMessageService)JMS本质上是一个JAVAAPI。生产者、消费者和提供者在JMS中定义。Producer是消息的发送者,Consumer是消息的接收者,Provider是服务的提供者。Producer和Consumer统称为Client。JMS定义了两种消息模型:点对点和发布-订阅。在发布-订阅模型中,消息通过主题进行路由。生产者可以向指定的主题发送消息,消费者可以通过订阅该主题来接收生产者发送的消息。生产者可以向一个或多个主题发送消息,消费者也可以消费一个或多个主题中的消息。一个主题也可以有多个生产者或消费者。生产者和消费者只需要关联主题,而不管消息是谁发送或消费的。提供者为每个主题维护一个或多个队列来存储消息。消息在队列中排序并遵循先进先出的原则。不同队列之间的消息乱序。点对点模式没有topic的概念。生产者直接将消息发送到指定队列,消费者也指定队列进行消费。消息只能被一个消费者消费,不能被多个消费者消费。Kafka和RocketMQ都实现或部分实现了JMS协议。AMQP(AdvancedMessageQueuingProtocol)【高级消息队列协议】不同于JMS。AMQP是应用层网络传输协议,定义了消息格式,与开发语言无关。在AMQP中,同样有生产者和消费者两种角色,消息也存储在队列中。但是不同于JMS使用topic来路由消息,AMQP的路由方式是由exchange和binding决定的。客户端可以创建一个队列,并在创建队列时通知交换机该队列将接受什么条件。这个条件就是Bingdingkey。当生产者向交换器发送消息时,它会指定一个路由器密钥。exchange收到消息后,会和自己维护的Bingdingkey进行比对,发送到满足条件的队列中。消费者在消费时指定一个队列进行消费。RabbitMQ实现了AMQP协议。MQTT(MessageQueuingTelemetryTransport)MQTT协议是一种基于发布和订阅的轻量级协议。它同时支持TCP和UDP连接方法。主要应用于即时通讯、小型设备、移动应用等领域。MQTT中一共有三种角色:Publisher、Subscriber和Broker。Broker是服务提供者,发布者和前两个协议中的生产者一样,向Broker发送消息(Message),Subscribe从Broker获取消息并进行业务处理。MQTTMessage中的固定头(Fixedheader)只有2个字节,开销很小。此外,它还分为两部分:可变头(Variableheader)和消息体(payload)。固定头包含消息类型、消息级别、可变长度头的大小、消息体的总长度等信息。可变长度报头根据消息类别包含不同的标识信息。MQTT允许客户端动态创建主题。发布者与服务器建立会话后,可以通过Publish方法向服务器上相应的主题发送数据。订阅者通过Subscribe订阅主题后,服务器会将主题Push中的消息发送给相应的订阅者。RocketMq-架构组件NameServerNameServer是一个几乎无状态的节点,可以部署在集群中,节点之间不需要任何信息同步。它们是独立并行的,各自保留着一套完整的Broker、Topic等集群信息。非常简单的Topic路由注册中心;支持Broker动态注册和发现;Broker心跳检测;为Producer和Consumer集群提供Topic路由功能(维护Topic和Broker的映射关系);BrokerServerBroker主要负责消息的存储、传递和查询以及服务的高可用保证,包括四个主要模块。RemotingModule:整个Broker实体,负责处理来自客户端的请求;ClientManager:负责管理客户端(Producer/Consumer),维护ConsumerTopic订阅信息;StoreService:提供方便简单的API接口来处理消息并存储到物理硬盘和查询功能;HAService:高可用服务,提供MasterBroker和SlaveBroker之间的数据同步功能;IndexService:根据特定的Messagekey对传递给Broker的消息进行索引服务,提供消息的快速查询;Producer消息发布的角色,支持分布式集群部署,无状态信息。Consumer消息消费角色,支持分布式集群部署。它支持推送和拉取两种模式来消费消息。同时还支持集群模式和广播模式消费,提供实时消息订阅机制。网络部署特点NameServer是一个几乎无状态的节点,可以部署在集群中,节点之间不需要任何信息同步。Broker分为Master和Slave。一个Master可以对应多个Slave,但一个Slave只能对应一个Master。BrokerId为0表示Master,0以外的值表示Slave。每个Broker与NameServer集群中的所有节点建立持久连接,并定期向所有NameServers注册Topic信息。Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定时从NameServer获取Topic路由信息,与提供Topic服务的Broker节点中的Master建立长连接,并向其发送心跳大师定期。Producer是完全无状态的,可以集群部署。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定时从NameServer获取Topic路由信息,与提供Topic服务的Broker节点中的Master和Slave建立长连接,并定时向Master和Slave发送心跳。连接关系NameServer是一个集群;Broker是一个集群,分为主节点和从节点,主节点可以读写,从节点只能读取消息,定时向所有NameServer发送心跳信息,定时向NameServer中注册Topic信息;Producer是一个集群Consumer是一个集群Producer随机建立一个长连接NameServer,从中获取Topic信息,与Topic所在的Master建立长连接,发送消息,定时发送心跳信息。Consumer随机连接一个NameServer,从中获取Topic信息,与Topic所在的Master或Slaver建立长连接,发送消息,定时发送心跳信息。工作流启动NameServer,当NameServer唤醒后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心;Broker启动,与所有NameServer保持长连接,定时发送心跳包。心跳包中包含当前的Broker信息(IP+端口等),存储了所有的Topic信息。注册成功后,NameServer集群中的Topic和Broker之间就存在映射关系;在发送和接收消息之前,先创建一个Topic,在创建Topic的时候需要指定这个Topic存储在哪些Brokers上,或者在发送消息的时候自动创建一个Topic;Producer发送Message,启动时先与其中一个NameServer集群建立长连接,并从NameServer中获取当前发送的Topic存在于哪个Broker[Master]上,轮询从队列列表中选择一个队列,然后进行通信queue所在的Broker建立长连接向Broker发送消息;Consumer类似于Producer,与其中一个NameServer建立长连接,获取当前有哪些Broker订阅了Topic,然后直接与Broker建立连接通道,开始消费消息;消息存储整体结构CommitLog:消息主体和元数据存储体存放Producer写入的消息主体内容,消息内容不定长。消息主要按顺序写入日志文件。当文件满时,写入下一个文件;单个文件默认大小为1G,文件名长度为20位,左边补零,其余为起始偏移量。比如000000000000000000000代表第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件已满时,第二个文件为00000000001073741824,起始偏移量为1073741824,依此类推。ConsumeQueue:消息消费队列,引入的主要目的是提高消息消费的性能。由于RocketMQ是基于主题的订阅模式,消息消费是针对主题进行的。根据主题遍历commitlog文件来检索消息是非常低效的。.Consumer可以根据ConsumeQueue找到要消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,在CommitLog中保存指定主题下队列消息的起始物理偏移量offset、消息大小和消息Tag的HashCode值。consumequeue文件可以看作是一个基于topic的commitlog索引文件,所以consumequeue文件夹的组织结构为:topic/queue/file三层组织结构,具体存放路径为:$HOME/store/consumequeue/{主题}/{queueId}/{文件名}。同样,consumequeue文件采用定长设计。每个条目一共20个字节,分别是8字节的commitlog物理偏移量,4字节的消息长度,8字节的taghashcode。单个文件包含30W个条目。每个条目都可以像数组一样随机访问,每个ConsumeQueue文件的大小约为5.72M;IndexFile:IndexFile(索引文件)提供了一种通过key或者timeinterval来查询消息的方法。Index文件的存储位置为:$HOME/store/index/{fileName},文件名fileName以创建时的时间戳命名,固定单个IndexFile文件大小约400M,一个IndexFile可存2000Windexes,IndexFileHashMap的底层存储设计是为了实现文件系统中的HashMap结构,所以rocketmq索引文件的底层实现是哈希索引;从上面RocketMQ的消息存储整体架构图可以看出,RocketMQ采用了混合存储结构。即单个Broker实例下的所有队列共享一个日志数据文件(CommitLog)进行存储;RocketMQ的混合存储结构(多个Topics的消息实体内容存储在一个CommitLog中)分别用于Producer和Consumer。数据和索引部分分离的存储结构,Producer将消息发送给Broker端,然后Broker端使用同步或异步的方式将消息flush并持久化,保存到CommitLog;只要消息被刷新并持久化到磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer一定会有机会消费这条消息。当消息无法拉取时,可以等待下一次消息拉取。同时服务器也支持长轮询方式。如果一个消息拉取请求没有拉取消息,允许Broker等待30s,只要在这段时间内有新消息到达,就会直接返回给消费者。这里RocketMQ的具体做法是利用Broker端的后台服务线程——ReputMessageService不断分发请求,异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。Consumer消费是以“Topic”为粒度,而CommitLog是所有Topic消息的汇总存储。这个时候需要一个以Topic为维度的commitLog文件偏移量的索引,方便消费这个Topic下的数据,所以产生了ConsumeQueue。相当于一个Topic,有多个MessageQueue消息队列,然后将消息队列映射到ConsumeQueue消息消费队列,供Consumer快速消费这个Topic下的数据。RocketMQ术语Producer位于用户的进程中。Producer通过NameServer获取所有Broker的路由信息??,根据负载均衡策略选择将消息发送到哪个Broker,然后调用Broker接口提交消息。ProducerGroup生产者组,简单的说就是多个生产者发送相同类型的消息,称为一个生产者组。Consumer消息消费者,位于用户进程中。Consumer通过NameServer获取到所有broker的路由信息??后,向Broker发送Pull请求获取消息数据。Consumer可以有两种模式启动,广播(Broadcast)和集群(Cluster)。在广播模式下,消息将发送给所有消费者。在集群模式下,一条消息只会发送给一个消费者。ConsumerGroup消费者组,类似于生产者,多个消费同一类消息的Consumer实例组成一个消费者组。TopicTopic用于按主题划分消息,Producer将消息发送到指定的Topic,Consumer订阅该Topic后即可收到消息。主题与发送者或消费者没有很强的关系。发送者可以同时向多个主题发送消息,消费者也可以订阅多个主题的消息。在RocketMQ中,Topic是一个逻辑概念。消息存储不按主题分隔。Message代表一条消息,由MessageId唯一标识,发送时用户可以设置messageKey,方便后期查询跟踪。一个Message必须指定一个Topic,相当于邮寄地址。Message还有一个可选的Tag设置,这样消费者就可以根据Tag过滤消息。您还可以添加其他键值对。比如在Broker上查找消息需要一个业务key,方便开发时诊断问题。Tag标签可以认为是Topic的进一步细化。一般在同一个业务模块中通过引入标签来标记不同用途的标签。BrokerBroker是RocketMQ的核心模块,负责接收和存储消息,并提供Push/Pull接口向Consumers发送消息。Consumer可以选择从Master或者Slave读取数据。多个master/slave组成一个Broker集群,集群中Master节点之间没有数据交互。Broker还提供了消息查询的功能,可以通过MessageID和MessageKey查询消息。Borker会实时同步自己的Topic配置信息到NameServer。QueueTopic和Queue是一对多的关系。一个Topic可以包含多个Queue,主要用于负载均衡。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息??选择发送到哪个Queue。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue消息。OffsetRocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息索引文件,每个Queue对应一个Offset来记录当前Queue中的消息条数。NameServerNameServer可以看作是RocketMQ的注册中心,管理着两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其他模块通过Nameserv提供的接口获取最新的Topic配置和路由信息。Producer/Consumer:通过查询接口获取Topic对应Broker的地址信息Broker:将配置信息注册到NameServer,实时更新Topic信息到NameServer