本篇博客将从源码层面验证RocketMQ的基本概念分析,分析Producer底层源码中提到的结论,分别是:Broker在启动时,将自己注册到所有名称服务器。Broker启动后,每隔30S会向NameServer发送一次心跳。在上一篇文章中,我们了解了RocketMQ中的一些核心概念,如Broker、NameServer、Topic和Tag等,Producer从启动到发送消息的整个过程,从源码层面分析了Producer在发送时是如何获取Broker数据的向Broker发送消息,如何从多个MessageQueue中选择对应的Queue发送消息。但由于文章篇幅,文章开头提到的两个已知结论在之前的博客中并没有得到验证。这次,我们将从源码层面进行验证。一开始看到Broker主从架构相关的源码在之前的博客中有提到。Broker为了保证自身的高可用,会采用主从架构。即使MasterBroker因为意外原因挂掉了,SlaveBroker上仍然有完整的数据,Broker可以继续提供服务。isEnableDLegerCommitLog中提到的DLeger可以不用管,我们只需要知道默认返回的结果是false即可。所以Broker在第一次启动的时候,会执行If包裹的逻辑。RocketMQ本身具有主从架构,但功能并不完善。如果MasterBroker出现故障,需要手动将SlaveBroker切换为Master。有点类似于手动将一个Redis设置为另一个Redis的Slave节点。如果此时Redis的Master挂掉了,需要手动切换。为了解决这个问题,Redis开发了Sentinel,它可以在发生故障时自动实现故障转移。所以RocketMQ在4.5版本之后推出的Dleger几乎是一回事。此外,Dleger还可以实现多副本。在不使用Dleger的情况下,先总结一下如何同步主从数据。在RocketMQ的主从架构下,主从同步采用从主动拉取的方式。如果当前注册的Broker角色是Slave,它会使用ScheduledExecutorService启动一个周期性的定时任务,每10秒同步一次到Master。同步的数据包括Topic的相关配置、Consumer的消费offset、延迟消息的Offset、订阅组的相关数据和配置。下面简单介绍一下ScheduledExecutorService的作用和原理。Broker注册是在第一次启动时强制注册的,因为是第一次启动,所以参数forceRegister直接设置为true。通过入口使用ScheduledExecutorService启动一个定时任务后,Broker会启动一个定时任务并周期性地注册。ScheduledExecutorService底层是一个newSingleThreadScheduledExecutor,一个只有一个线程的线程池,它的关键参数corePoolSize值为1,然后按照指定的频率周期性的执行一个任务。ScheduledExecutorService有两个主要功能,即:ScheduledExecutorService以固定频率执行任务。ScheduledExecutorService执行完成后,会在指定的时间间隔后执行下一个任务。使用scheduleAtFixedRate实现心跳机制。这里我们使用scheduleAtFixedRate,如下图所示。至于执行的频率,我们可以配置的最大范围不能超过一分钟,也就是说这个范围在10-60秒之间,默认是30秒执行一次,这也验证了每30秒,Broker会向NameServer发送心跳。获取执行频率的判断有点意思,甚至显得有些简洁,但具体可配置的时间范围可能需要花些时间去理解。在实际业务代码中,个人建议不要这样写。我觉得需要把业务中代码的可读性和可维护性放在首位。值得注意的是,这里心跳的开始给出了10秒的延迟,因为在之前的逻辑中已经进行了一次注册,没有使用Dleger。如果没有延迟,几乎同时会有两次注册操作,这显然不符合预期;同时forceRegister通过函数isForceRegister由true变为acquisition。调用registerBrokerAll注册定时任务。注册完成后,后续每次触发都会执行registerBrokerAll方法进行注册。您可能有疑问。我现在不是Broker吗?为什么名称有后缀All?那是因为有很多NameServers。首先,Broker在启动时会向所有的NameServers注册自己。当然,文字是没有根据的,我们继续往下看。继续往里走,如果满足当前的注册条件,就会进行实际的注册操作。具体满足什么条件?它由变量forceRegister和needRegister方法确定。forceRegister默认为true,所以先执行这个逻辑的时候,肯定会进行注册操作。如果大家有兴趣通过数据版本的对比来判断当前Broker是否需要注册,可以继续关注文章了解needRegister是根据什么来判断是否需要注册的。首先,一旦Broker注册到NameServer上,由于Producer不断写入数据,Consumer也在不断消费数据,Broker也可能因故障导致MessageQueue等关键路由信息发生变化。NameServer中的数据和Broker中的实际数据会不一致。如果不及时更新,Producer拉取的路由数据可能是错误的。所以每次触发定时任务,都会对比NameServer和Broker的数据。如果发现数据版本不一致,Broker会重新注册并更新最新的数据到NameServer。说白了,就是做一个定时的数据更新。下图红框中的代码是数据对比的核心代码。当Broker和所有NameServer节点都一一完成数据比对后,才会进行结果判断。如果有节点数据不一致,需要重新注册并更新最新的数据到NameServer。核心判断逻辑也用红框标出。至此,我们实际上完成了Broker启动时会向所有NameServer注册的验证。不过由于后续还有值得关注的地方,我们继续阅读后续源码。使用CountDownLatch获取所有注册的异步任务的返回结果。此外,还值得注意的是,在needRegister中,对于与多个NameServer的交互,RocketMQ是通过线程池异步实现的,使用CountDownLatch等待所有的请求。完成后,将结果返回给主线程。既然说了CountDownLatch,那我就顺便提一下。假设我们有5个互不依赖的计算任务。如果我们快速计算结果并返回呢?当然,并发执行5个任务,需要开新线程来实现,不能一起返回结果。而CountDownLatch可以让主线程等待,等待这5个计算任务完成,唤醒主线程继续后面的逻辑。这就是CountDownLatch的作用。如果只是一个简单的CRUD函数,你可能连CountDownLatch是什么都不知道。这也是为什么大厂的面试会问这些问题,因为在大厂复杂的业务背景下,你必须会用。指定需要注册之后,接下来就是核心注册方法,核心逻辑由registerBrokerAll实现。Broker也会在每个NameServer节点上注册自己,为了前期执行的效率,它也采用异步的方式开启线程。在得到所有结果的时候,还要用到CountDownLatch。使用CopyOnWriteArrayList存储注册请求的返回另外,用于保存注册结果的list使用的是CopyOnWriteArrayList,面试过虐过的同学应该不陌生。我们知道这里启用了多线程来注册不同的NameServer。在写入注册结果时,多线程写入同一个列表会造成线程安全问题。而且我们知道ArrayList不是线程安全的,这也是为什么这里要用CopyOnWriteArrayList来保存注册结果。为什么CopyOnWriteArrayList可以保证线程安全?这是由于COW(写时复制),它在读取请求时共享相同的列表。写请求的时候会复制一个List,写数据的时候加排他锁。与直接对所有操作加锁相比,读写锁的形式将读写请求分开,互不影响,只对写请求加锁,减少了加锁的消耗,提高了整体操作的并发度。上面并发执行的注册操作到底做了什么?我们先看代码。以上就是单次注册的全部逻辑。可以看到request构造完成后,有一个oneway判断。oneway的值为false,表示单向通信。Broker不关心NameServer的返回,也不会触发任何回调函数。接下来,Broker会将请求体中写入的所有数据发送给NameServer。请求数据统一由一个名为TopicConfigSerializeWrapper的Wrapper进行包装。可以看成两部分:Broker节点上所有topic的数据版本,有了这些数据,Broker会同步调用invokeSync向NameServe发送请求,执行后触发回调函数实现特定的功能。EOF至此,我们就完成了对开篇提到的结论的验证。同时,我们也发现了RocketMQ的主从架构,Master和Slave同步数据的方式,心跳机制的实现等等,从源码上基本把Broker的启动全部看完了。过程。看这些老哥们写的源码还是挺有意思的,有空再看看NameServer端相关的源码。
