TopicTopic是消息的集合,是一个逻辑分区。为什么叫逻辑分区呢?因为最终的数据是存储在Broker上的,为了满足高可用,所以采用了分布式存储。这与Kafka中的实现完全相同。Kafka的Topic也是一个逻辑概念。每个Topic的数据会被分成很多部分,然后存储在不同的Broker上。这个“分区”叫做Partition。在RocketMQ中,Topic数据也会采用分布式的方式存储。这个“共享”称为MessageQueue。它的分布可以用下图表示。这样,如果某个Broker所在的机器意外挂掉,MessageQueue中的数据还没有持久化到磁盘,那么Topic下的这部分消息就会彻底丢失。如果此时有备份,MQ就可以继续对外提供服务。为什么还会出现没有持久化到磁盘的情况?在现在的OS中,程序将数据写入文件后,并不会马上写入磁盘,因为磁盘I/O是一个非常耗时的操作。观看是一个非常缓慢的操作。因此,写入文件的数据会先写入OS自己的缓存中,然后在合适的时候将Buffer中的数据异步刷新到磁盘中。通过多副本冗余机制,RocketMQ具有高可用的特点。另外,分布式存储可以处理后期业务中大量的数据存储。如果不使用分布式存储,那么随着后期业务的发展,消息量会越来越大,单机无论如何都满足不了RocketMQ消息的存储需求。如果不做任何处理,机器的磁盘将永远是满的。这时系统将不具备可扩展性的特点,无法满足业务的要求。但是这里的可扩展性与微服务中的服务可扩展性是不一样的。因为在微服务中,每个服务都是无状态的。Broker是有状态的,每个Broker上存储的数据是不同的,因为Producer在发送消息时会从MessageQueue列表中选择一个MessageQueue通过指定的算法发送消息。如果你对这种横向扩展不是很理解,可以把它想象成一个RedisCluster,通过一致性哈希在RedisCluster中选择一个特定的节点,然后将数据写入RedisMaster。如果此时方便扩容,只需要在RedisCluster中增加一个Master节点即可。因此,分布式数据存储本质上是一种数据分片机制。在此基础上,通过冗余的多副本实现高可用。BrokerBroker在我们的微服务中可以理解为一个服务的实例,因为我们在微服务中的服务一般都是用多个实例部署的,RocketMQ也是如此。多实例部署可以帮助系统处理更多的流量。它还在某些方面提高了系统的健壮性。在RocketMQ4.5之前,它采用的是master-slave架构,每个MasterBroker都有自己的SlaveBroker。RocketMQ的主从Broker是如何进行数据同步的?Broker启动时,会启动一个定时任务,定时从MasterBroker同步全量数据。这个你不用管,后面我们会通过源码来验证主从同步逻辑。上面说了Broker会部署很多实例,那么既然部署了多实例,肯定有一个问题,客户端怎么知道自己连接的是哪个服务器呢?如何知道对应Broker的IP地址和端口?如果一个Broker突然挂了怎么办?名称服务器需要名称服务器。什么是名称服务器?我们以SpringCloud为例——当SpringCloud中的服务启动时,它会向Eureka注册中心注册自己。服务实例启动时,会从Eureka中拉取完整的registry,然后周期性地从Eureka中进行增量同步,每30秒向Eureka发送心跳来续约。如果Eureka检测到某个服务超过90秒没有发送心跳,该服务就会宕机,并从注册中心中删除。在RocketMQ中,NameServer起着类似的作用。两者的功能也有一定的区别。Broker启动时会向NameServer注册自己,每30秒向NameServerv发送一次心跳。如果某个Broker超过120秒不发送心跳,则认为该Broker宕机,将从维护信息中移除。这块后面也会从源码层面进行验证。当然,NameServer中不仅保存了各个Broker的IP地址和端口,还保存了对应Topic的路由数据。什么是路由数据?即某个Topic下的哪个MessageQueue在哪个Broker上。Producer的整体流程接下来我们看一下Producer向Broker发送消息时会做些什么。整体流程如下。校验消息的合法性,整体来说其实是一个很简单的操作。和我们平时写代码一样。当提出请求时,首先验证请求是否合法。Producer启动时,会校验当前Topic数据的有效性。主题名称是否包含非法字符。主题名称的长度是否超过最大长度限制由常量TOPIC_MAX_LENGTH决定,其默认值为127。当前消息体是否为NULL或空消息。当前消息体是否超过最大限制由常量maxMessageSize决定,值为1024*1024*4,即4M。这些都是很常规的操作,类似于我们平时写的跳棋。获取主题的详细信息。通过消息有效性检查后,还需要继续往下走。此时,重点应该从消息是否合法转移到我想将消息发送给谁。此时需要通过当前消息所属的topic获取该topic的详细数据。上面已经给出了获取Topic的方法源码。首先,数据会从内存中维护的一个Map中获取。对了,这里的Map是ConcurrentHashMap,是线程安全的,类似于Golang中的Sync.Map。当然,如果是第一次发送,Map必须是空的。此时会调用NameServer接口,通过Topic获取详细的Topic数据。这时候就会在上面的方法中添加到Map中。这样,再次向Topic发送消息,直接从内存中获取。下面是缓存机制的简单实现。从方法名来看,路由数据是通过Topic获取的。实际上,该方法是通过调用NameServer提供的API更新两部分数据,即:Topic路由信息主题下Broker相关信息,这两部分数据来自同一个结构体TopicRouteData。它的结构如下。从源码可以看出,里面包含了MessageQueue相关的数据和Topic下所有Broker的地址信息。具体要发送的Queue此时,我们已经获取了需要发送给Broker的详细信息,包括地址和MessageQueue,那么问题的重点应该从“消息发送给谁”转移到“消息具体发送到哪里”。什么东西寄到哪里?开头提到了一个topic会被分到很多MessageQueue中,“发到哪里”是指发送到哪个MessageQueue中。MessageQueue选择机制的核心选择逻辑是先给出流程图的核心逻辑。通俗点说就是取一个随机数和MessageQueue的容量取模。这个随机数保存在ThreadLocal中,第一次计算的时候会直接随机一个数。之后直接从ThreadLocal取值,返回+1。得到MessageQueue个数和随机数这两个关键参数后,就会执行最终的计算逻辑。接下来我们看一下选择MessageQueue的SelectOneMessageQueue方法是干什么的。可以看出主要逻辑被变量sendLatencyFaultEnable分成了两部分。容错机制下的选择逻辑该变量表示发送延迟故障。它本质上是一种容错策略。在原有MessageQueue选择的基础上,过滤掉不可用的Broker,将之前失败的Broker回退一定时间。可以看出,如果在调用Broker信息时出现异常,则会调用updateFault方法更新Broker的Aviable状态。请注意,此参数isolation的值为true。下面我们从源码层面来验证一下上面提到的3000ms的backoff。可以看出,如果隔离值为真,则三元运算符计算的持续时间为30000,即30秒。所以我们可以得出结论,如果发送消息时抛出异常,那么Broker会在30秒内直接设置为不可用。而如果发送延迟高,则会根据延迟的具体时间,根据下图判断设置多少时间不可用。例如,如果最后一个请求的延迟超过550ms,则将退避3000ms;如果超过1000,则退回60000;一般情况下,如果不开启当前发送故障延时,会按照常规逻辑,同样会进入for循环计算。循环中拿到MessageQueue后,会判断是否和上次选中的MessageQueue属于同一个Broker。如果是同一个Broker,就会重新选择,直到不属于同一个Broker的MessageQueue被选中,或者直到循环结束。这也是为了均匀分布和存储消息,防止数据倾斜。选择具体的MessageQueue发送消息后,就会开始执行发送消息的逻辑,调用Netty底层接口发送出去。这块暂时没什么可看的。Broker的启动过程的主从同步上面说了。RocketMQ有自己的主从同步,但是有两个不同的版本。分水岭版本是4.5版。这两个版本有什么区别?4.5之前:有点类似于Redis,我们通过slave的命令手动将某台机器换成另外一台RedisSlave节点,这样就变成了比较原始的一主一从结构。为什么说原始?因为如果此时Master节点宕机,我们就需要人肉来做failover。RocketMQ的主从架构也是如此。4.5后:引入Dleger,可以实现一主多从,实现自动故障转移。这和Redis后来推出的Sentinel是一样的。德莱格也有类似的效果。下图是Broker启动代码中的源码。可以看到判断是否开启了Dleger,默认是不开启的。所以里面的逻辑就会被执行。我们刚刚看到有Rocket主从同步数据的相关代码。如果当前Broker节点的角色是Slave,则会启动周期性定时任务,周期性(即10秒)去MasterBroker同步全量数据。同步的数据包括:Topic相关配置、Cosumer的消费offset、延迟消息Offset、订阅组相关数据和配置,注册的broker完成主动同步定时任务启动后,会调用registerBrokerAll注册broker。这里可能会有一些疑问。我这里是启动Broker,只有一个Broker实例。全部是什么意思?All是指所有NameServer。当Broker启动时,它会向每个NameServer注册自己。为什么不只注册一个名称服务器就可以了?这样也可以提高效率。说到底还是高可用的问题。如果Broker只注册了一个NameServer,NameServer挂了怎么办?经纪人将不会对所有客户可见。实际上,Broker还在正常运行。去注册BrokerAll。可以看到,会判断是否需要注册。从上面的截图可以看出,此时forceRegister的值为true,是否注册就交给needRegister来决定了。为什么要判断是否注册?因为Broker一旦注册到NameServer上,由于Producer不断的在写数据,Consumers也在不断的消费数据,Broker也有可能因为故障而改变某些Topic下的MessageQueue等关键路由信息。这样NameServer中的数据就会和Broker中的数据不一致。判断是否需要注册的大致思路是Broker会从各个NameServer获取当前Broker数据,与当前Broker节点中的数据进行比较。每当一个NameServer的数据与当前Broker不一致时,就会进行注册操作。接下来,我们从源码层面来验证这个逻辑。我也在图中标出了关键逻辑。可以看出是通过比较Broker中的数据版本和NameServer中的数据版本来实现的。在这个版本中,在注册的时候,会写入注册数据,存储在NameServer中。由于这里有多个,RocketMQ使用线程池实现多线程操作,使用CountDownLatch等待所有的返回结果。经典的空间换时间,Golang也有类似的操作,就是sync.waitGroup。对于任何数据不匹配都会被重新注册,我们也从源代码层面进行了验证。可以看出,如果任何一个NameServer的数据发生变化,它就会中断并返回true。这里的结果列表是使用CopyOnWriteList实现的。因为这里是多线程执行的判断逻辑,普通链表不是线程安全的。CopyOnWriteArrayList之所以是线程安全的,是因为COW(CopyOnWrite),读取请求时共享同一个List。写请求的时候会复制一个List,写数据的时候加排他锁。与直接对所有操作加锁相比,读写锁的形式将读写请求分开,使它们互不影响,只对写请求加锁,减少了加锁次数,减少了加锁的消耗,并且提高整体操作的并发性。执行注册逻辑就是构建数据,然后多线程并发发送请求,使用CopyOnWriteArrayList保存结果。但是,正如我们上面提到的,当Broker注册时,它会将数据版本发送给NameServer并存储。这里我们可以看看发送给NameServer的数据结构。可以看出Topic的数据分为两部分,一部分是核心逻辑,另一部分是DataVersion,也就是我们刚才说的数据版本。Broker如何存储数据?我刚刚谈到了制片人。我最后提到的是向Broker发送消息就结束了。不知道大家有没有想过Broker是如何存储消息的?Commitlog先给出流程图,再给出结论。Producer发送的消息保存在一个叫做commitlog的文件中,Producer端每次写入的消息长度是不相等的。当CommitLog文件写入到1G时,会创建另一个新的CommitLog继续写入。这一次,采用顺序写入。那么问题来了,当Consumer来消费的时候Broker如何快速找到对应的消息呢?我们首先排除遍历文件搜索的方法,因为RocketMQ以高吞吐和高性能着称,运行很慢的情况肯定不可能采用这种方法。那么RocketMQ是怎么做到的呢?答案是ConsumerQueue什么是ConsumerQueue?这是一个文件。介绍的目的是什么?提高消费绩效。当Broker收到消息时,在写入CommitLog的同时,会将当前消息在commitlog中的offset、消息的大小、对应Tag的Hash写入consumer队列文件。每个MessageQueue都会有一个对应的ConsumerQueue文件存储在磁盘上。每个ConsumerQueue文件包含300,000条消息。每条消息的大小为20字节,包括8字节的CommitLogOffset和4字节的消息长度。,8字节的标签散列值。这样,每个ConsumerQueue的文件大小约为5.72M。当ConsumerQueue文件写满后,会新建一个ConsumerQueue文件继续写入。因此,ConsumerQueue文件可以看作是CommitLog文件的索引。负载均衡是什么意思?假设我们一共有6个MessageQueue,此时分布在3个Broker上,每个Broker包含两个队列。此时有3个Consumer,我们可以粗略的认为每个Consumer负责2个MessageQueues的消费。但是这里有个原则,就是一个MessageQueue只能被一个Consumer消费,一个Consumer可以消费多个MessageQueue。为什么?原因很简单。RocketMQ支持的顺序消费是指分区的顺序性,即在单个MessageQueue中,消息是顺序的。如果多个Consumer消费同一个MessageQueue,消费的顺序很难保证。由于消费多个MessageQueue的消费者较多,为了避免数据倾斜,合理分配和利用资源,Producer在发送消息时,需要尽可能均匀地将消息分发到多个MessageQueues中。同时,在上面一个Consumer消费两个MessageQueue的情况下,Consumer挂了怎么办?这两个MessageQueue不会没有人消费吗?以上两种情况分别是Producer端的负载均衡和Consumer端的负载均衡。.Producer端负载均衡关于Producer端的负载均衡,上面已经给出了流程图,并给出了源码的验证。首先是容错策略,会在一段时间内避免出现问题的Broker,如果最后一个被选中的Broker会重新被选中。Consumer端负载均衡首先,Consumer端负载均衡可以由两个对象触发:BrokerConsumer本身Consumer也会向所有Brokers发送心跳,发送消息的消费者组名、订阅关系集、消息通信方式和clientID,ETC。。Broker接收到消费者的心跳后,会存储在Broker维护的一个Manager中,命名为ConsumerManager。当Broker检测到Consumer数量发生变化时,会通知Consumer进行Rebalance。但是如果Broker通知Consumer进行Rebalance的消息丢失了怎么办?这就是为什么需要触发Consumer本身。Consumer在启动时会启动定时任务,周期性的执行rebalance操作。默认是每20秒执行一次。具体代码如下。具体流程首先,Consumer的Rebalance会获取本地缓存的所有Topic数据,然后向Broker发起请求,拉取该Topic和ConsumerGroup下的所有消费者信息。这里Broker的数据来源是Consumer之前心跳发送的数据。然后对Topic中的MessageQueue和consumerID进行排序,然后使用消息队列默认的分配算法进行分配。这里默认的分配策略是平均分配。首先,MessageQueue会按照类似分页的思路,平均分配给Consumer。如果分配不均匀,剩余的MessageQueue会按照排序的顺序从上到下依次分配。所以这里Consumer1分配给了4个MessageQueue,Consumer2分配给了3个MessageQueue。rebalance完成后,会将结果与Consumer缓存的数据进行比较,不在Rebalance结果中的MessageQueue会被移除,不存在的MessageQueue会被添加到缓存中。触发时机当Consumer启动时,启动后会立即重新平衡。Consumer在运行过程中会监听Broker发送的Rebalance消息,以及Consumer自身的定时任务触发的Rebalance。Consumer停止时不会直接调用Rebalance,而是通知Broker。当你下线时,Broker会通知其余的Consumers重新平衡。换个角度看,其实有两个方面,一个是队列信息变了,一个是消费者变了。源码验证然后给出核心代码验证,获取数据的逻辑如下,验证我们刚才说的获取本地Topic数据缓存,从Broker端拉取所有的ConsumerID。下一步就是验证刚才说的排序逻辑。下一步是查看源代码以确定结果是否已更改。可以看出,Consumer通知Broker策略,本质上是发送心跳,通过心跳将更新后的数据发送给所有的Broker。关于Consumer的更多细节可能是关于Consumer,我们用的多一点。比如我们知道可以设置集群消费和广播消息,分别对应RocketMQ中的CLUSTERING和BROADCASTING**。再比如,我们知道可以设置顺序消费和并发消费等,下面我们就通过源码来看一下这些功能在RocketMQ中是如何实现的。Consumer中的消费模型默认使用集群消费,这一点在Consumer的代码中也有所体现。不同的消费方式会影响管理抵消的具体实现。可以看出,当消费模型为广播模式时,Offset的持久化管理会使用LocalFileOffsetStorage;当消费模型为集群消费时,会使用RemoteBrokerOffsetStore。具体原因是什么?首先我们要知道广播模式和集群模式的区别:在广播模式下,ConsumerGroup中的每台机器都会消费一条消息。在集群模式下,一条消息只会被ConsumerGroup中的一台机器消费。所以在广播模式下,每个ConsumerGroup的消费进度是不一样的,所以Offset需要Consumer自己管理。在集群模式下,同一个ConsumerGroup下的消费进度其实是一样的,所以可以由Broker来管理。消费模式消费模式分为顺序消费和并发消费,分别对应MessageListenerOrderly和MessageListenerConcurrently两种实现方式。不同的消费方式会采用不同的底层实现,配置完成后会调用start。拉取消息接下来,我们来看一个与我们最相关的问题,即我们平时消费的消息是如何从Broker发送到Consumer的。在Rebalance开始附近,Consumer也启动了一个定期拉取消息的线程。这个线程是做什么的?它会不断从内存中维护的Queue中获取写入时构造的PullRequest对象,调用具体实现不断拉取消息。这里无论是否启用AutoCommit来处理消费结果,所做的处理都没有太大区别。大家都知道,唯一的区别就是是否自动提交Offset。处理成功的逻辑也是类似的,我们平时的业务逻辑中可能并不关心消费成功的消息。我们比较关心的是如果消费失败RocketMQ是怎么处理的呢?这是AutoCommit下消费失败的处理逻辑。失败的TPS会被记录下来,然后这里有一个很关键的逻辑,就是checkReconsumeTimes。如果当前消息的重试次数大于最大消费重试次数,则将消费发送回Broker。如何定义最大重试次数?如果值为-1,那么最大数为MAX_VALUE,即2147483647。这里有点奇怪。按照我们平时的认知,不就是重试16次吗?然后看到一个很搞笑的评论。-1表示16次,这段代码确实有点难以描述。然后,如果超过最大次数,消息将被调用到Prodcuer的默认实现,并发送到死信队列。当然,死信队列并不是一个特殊的存在,它只是一个单独的话题。通过getRetryTopic获取,默认为当前ConsumerGroup名称添加前缀。
