本文转载请联系SH全栈笔记公众号。基本概念Broker首先,我们要知道我们在使用RocketMQ的时候都经历了什么。即生产者向RocketMQ发送消息,RocketMQ获取消息并持久化存储,然后消费者去MQ消费消息。RocketMQ运行上图中,RocketMQ被识别为单点,但实际上肯定不是这样。对于随时可以横向扩容的服务,生产者生产给MQ的消息数量也会相应发生变化,所以一个合格的成熟的MQ必须能够处理这种情况;并且MQ本身需要高可用,否则一旦这个单点宕机,MQ存储的所有消息都会丢失,无法找回。所以在实际生产环境中,肯定会部署一个MQ集群。在RocketMQ中,这个“实例”有一个专门的名词叫Broker。而且每个Broker都会部署一个SlaveBroker,MasterBroker会周期性的和SlaveBroker同步数据,形成Broker主从架构。那么问题来了。在微服务架构中,部署的服务也有多实例部署。服务之间的相互调用,通过注册中心获取对应服务的实例列表。以SpringCloud为例,服务通过Eureka注册中心获取一个服务的所有实例,然后交给Ribbon,Ribbon链接Eureka从Eureka获取服务实例列表,然后通过加载选择一个实例平衡算法,最后发起询问。同理,此时MQ中有多个Broker实例,那么生产者如何知道MQ集群中有多少个Broker实例呢?我应该连接到哪个实例?首先,我们直接排除代码中的HardCode。我认为不应该出于特定原因使用它。更多关于那个。RocketMQ是如何解决这个问题的?这就是我们接下来要介绍的NameServer。NameServerNameServer可以简单理解为上一节提到的注册中心。所有的Broker在启动时都会向NameServer注册并上报自己的信息。这些信息除了Broker的IP和端口相关数据外,还包括RocketMQ集群的路由信息??。路由信息我们后面再说。RocketMQ操作有一个NameServer。客户端启动后,会与NameServer进行交互,获取当前RocketMQ集群中的所有Broker信息和路由信息。这样producer就知道了自己需要连接的Broker信息,就可以进行消息的投递了。那么问题来了,如果一个Broker在运行过程中突然宕机,NameServer会如何处理呢?这就需要提到RocketMQ的合约续约机制和故障感知机制。Broker在NameServer完成注册后,每30秒向NameServer发送心跳续约;如果NameServer感知到某个Broker超过120秒没有发送心跳,它将认为该Broker不可用并将其从自己维护的信息中移除。这种机制与Eureka在SpringCloud中的实现如出一辙。Eureka中的Service在启动后也会向Eureka注册自己,以便其他服务向该服务发起请求并交换数据。服务将每30秒向Eureka发送一次心跳更新。如果一个Service超过90秒没有发送心跳,Eureka将认为该服务已关闭,并将其从Eureka维护的注册表中删除。在上图中,我讲了多实例部署。这种多实例部署和微服务中的多实例部署是不一样的。在微服务中,所有服务都是无状态的,可以水平扩展。在RocketMQ中,每个Broker存储的数据可能是不同的。下面来看看RocketMQ的简单使用。Messagemsg=newMessage("TopicTest","TagA",("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResultsendResult=producer.send(msg);可以看到Message的第一个参数就是当前Thismessage指定了一个Topic,那么什么是Topic呢?TopicTopic是发送给RocketMQ的消息的逻辑分类。比如我们的订单系统、积分系统、仓储系统都会用这个MQ来区分,我们可以为不同的系统创建不同的Topics。那为什么叫逻辑分区呢?因为RocketMQ在realstorage中不存储一个topic的数据,原因很简单。如果当前Broker宕机,甚至极端情况下磁盘坏了,那么这个topic的数据将永久丢失。所以在真实存储中,消息是分布存储在多个Broker上的。这些分散在多个Broker上的存储介质,我们称之为MessageQueue。如果熟悉Kafka的底层原理,就知道这和Kafka中的Partition是一样的。类似的实现。MessageQueue存储从上图可以看出,同一个Topic的数据被分成若干部分,分别存储在不同的Broker上。RocketMQ为什么要实现这个?第一,如果一个Topic中只有一个Queue,那么consumer的消费速度势必会受到影响;而如果一个Topic有很多Queue,那么Consumer可以同时进行消费操作,从而承载更多的并发。另外,单机的资源是有限的。一个topic的消息量可能非常大,很快就会把一台机器的磁盘塞满。所以RocketMQ将一个Topic的数据分布到多台机器上去中心化存储。它本质上是一种数据分片存储的机制。所以我们知道发送到一个Topic的数据是分布存储在多个Brokers中的MessageQueue上的。Broker消息存储原理那么Producer发送给Broker的消息是如何存储的呢?答案是提交日志。Broker收到消息后,会按顺序写入消息,追加到磁盘上的CommitLog中。文件中,每个CommitLog的大小为1G。如果1G满了,会新建一个CommitLog继续写入。CommitLog文件的特点是顺序写入和随机读取。Topicdetails这是最底层的存储方式,那么问题来了,当Consumer来拿消息的时候,Broker是怎么从一堆CommitLog中找到对应的数据呢?众所周知,说到磁盘I/O操作,你会想到耗时这个词,而RocketMQ的一大特点就是高吞吐量,这看起来很矛盾。RocketMQ是怎么做到的?答案是ConsumeQueue。Broker在写入CommitLog时,也会将当前消息在CommitLog中的Offset、消息的Size、对应Tag的Hash写入ConsumeQueue文件中。每个MessageQueue都会有一个相应的ConsumeQueue文件存储在磁盘上。和CommitLog一样,一个ConsumeQueue包含30W条消息,每条消息大小为20字节,所以每个ConsumeQueue文件的大小约为5.72M;满了会新建一个ConsumeQueue文件继续写入回车。ConsumeQueue不仅是一个逻辑队列,还是一个索引,让Consumer在消费时可以快速从磁盘文件中定位到这条消息。看到这里,你可能会想,上面说的Tag是什么?TagTag,标签,用于对同一Topic中的消息进行分类。为什么要对Topic的消息类型进行分类?举个极端的例子,某个新的服务需要消费订单系统的MQ,但是由于业务的特殊性,它只需要消费数码产品的订单消息,如果没有Tag,那么Consumer会判断,订单消息是否为数字产品,如果不是则丢弃,如果是则消费。这样Consumer端就做了很多无用的工作。引入tag后,Producer在生产消息时会打上顺序的tag,而Consumer在消费时,可以配置只消费指定Tag的消息。这样就不需要Consumer自己来做这个了,RocketMQ会帮我们实现这个过滤。过滤的原理是什么?首先在Broker端利用消息中保存的Tag的Hash值进行过滤,然后Consumer端在拉取消息时需要再次过滤。为什么在Broker上过滤完还要在Consumer端再过滤?因为Hash冲突,不同的Tags经过Hash算法后可能得到相同的值,所以Consumer端在拉取消息时会通过字符串进行二次过滤。Producer发送消息的源码分析流程概览。首先,给出整个消息发送过程的大概流程。先熟悉一下流程再看看源码,会比较清楚。初始化Prodcuer的整体流程还是按照下面的例子。Producer使用示例首先我们会初始化一个DefaultMQProducer,RocketMQ会给这个Producer一个默认的DefaultMQProducerImpl实现。然后producer.start()将启动一个线程池。有效性验证下一步是比较核心的producer.send(msg)。首先,RocketMQ会调用checkMessage检查发送的消息是否合法。发送消息的这些检查包括要发送的消息是否为空,Topic是否为空,Topic是否包含非法字符串,Topic的长度是否超过最大限制127,然后检查Body是否满足发送要求,比如msg的消息体是否为空,msg的消息体是否超过最大限制等,这里的消息体最大不能超过4M。检查消息合法性源码调用sendmessage对于msg主题,RocketMQ会用NameSpace对其进行包装,然后在DefaultMQProducerImpl中调用sendDefaultImpl的默认实现向Broker发送消息。发送消息的默认超时时间为3秒。发送消息是默认实现的。在发送消息时,MQ会再次调用checkMessage检查消息的合法性,然后尝试获取Topic的详细信息。所有的Topic信息都会存储在一个名为topicPublishInfoTable的ConcurrentHashMap中。此Map中的Key是Topic字符串,Value是TopicPublishInfo。这个TopicPublishInfo包含了从基本概念中提到的Broker中获取到的相应元数据,其中包含了关键的MessageQueue和集群元数据。基本结构如下。Topic详情messageQueueList包含了该Topic下所有的MessageQueue,每个MessageQueue所属的Topic,每个MessageQueue所在的Broker的名称,以及专属的queueId。topicRouteData包含了Topic下所有Queue和Broker相关的数据。获取topic详细数据在最终发送消息之前,需要获取topic详细信息,比如Broker地址等数据,在Producer中通过tryToFindTopicPublishInfo方法获取。我在下图中写了详细的注释。获取topic详情对于第一次使用的Topic,上面的Map肯定是不存在的。所以RocketMQ会把它添加到Map中,调用updateTopicRouteInfoFromNameServer方法从NameServer中获取Topic的元数据,写入到Map中。除了第一次,路由信息和Broker详情会分别放入topicRouteTable和brokerAddrTable中,两者都是Producer在内存中维护的ConcurrentHashMap。获取到topic的详细信息后,接下来就是确认timesTotal重试发送次数。假设timesTotal为N,那么如果发送消息失败,会重试N次。但是只有发送失败才会重试,其他情况不会重试,比如超时,或者没有选择合适的MessageQueue。重试次数timeTotal受参数communicationMode影响;CommunicationMode有三个值,分别是SYNC、ASYNC和ONEWAY。在RocketMQ的默认实现中,选择了SYNC同步。计算重试次数通过代码可以看出,如果communicationMode为SYNC,timesTotal的值为1+retryTimesWhenSendFailed,retryTimesWhenSendFailed的值默认为2,表示发送消息失败后的重试次数发送。这样,如果我们选择SYNC方式,那么Producer发送消息时默认的重试次数是3次。但是只有当且仅当发送失败时才会进行重试,其余情况不会。我们之前讲过MessageQueue的选择机制。一个topic的数据分片存储在一个或多个Broker上。底层存储介质是MessageQueue。上图中我们并没有展示Producer是如何选择发送到哪个MessageQueue的,这里我们通过源码来看一下。在Producer中,通过selectOneMessageQueue选择MessageQueue。该方法根据上次选择MessageQueue的Topic和Broker的详细元数据来确定下一次选择。核心选择逻辑什么是核心选择逻辑?通俗点说就是选择一个索引,然后对它取模,和当前Topic的MessageQueues个数。第一次选择这个索引的时候,一定要不存在,RocketMQ会生成一个随机数。然后在这个值的基础上+1,因为一般来说,从外层来说,这个索引上次已经用过了,所以每次拿到的时候,直接帮我+1就好了。核心选择机制上图是MessageQueue核心底层的原理机制。但是由于实际业务情况的复杂性,RocketMQ在实现上做了很多额外的事情。在实际的选择过程中,sendLatencyFaultEnable下的选择逻辑会判断当前sendLatencyFaultEnable是否开启。这由变量sendLatencyFaultEnable的值决定。默认值为false,表示默认不启用。从代码中,我找不到打开它的地方。但是我们可以谈谈当它打开时会发生什么。它还会开启一个for循环,次数就是MessageQueue的个数。计算得到某个Queue后,会使用内存中的一张表faultItemTable来判断当前Broker是否可用。每次发送消息时都会更新此表。.如果当前没有可用的Broker,就会触发它的自下而上的逻辑,然后选择一个MessageQueue出来。选择队列的源代码。传统的选择逻辑。如果不开启当前发送失败延时,则采用常规逻辑。在for循环中将计算相同的内容。循环中取出MessageQueue后,会判断与上次选择的MessageQueue是否属于同一个Broker,如果是同一个Broker,则重新选择,直到选择了一个不属于同一个Broker的MessageQueue,或者直到循环结束。这也是为了均匀分布和存储消息,防止数据倾斜。常规逻辑下的选择逻辑消息发送最终会调用Netty相关组件发送消息。EOF会在这里分析RocketMQ中的一些基本概念和RocketMQ的Producer发送消息的源码,其他部分的源码看完fate再分享。
