以前的情况:本文不打算将所有人带到RocketMQ源代码,以分析其对负载平衡原理的实现,重点是触发负载平衡。
大概您已经了解了RocketMQ的设计理念。同时,只能由消费者保留相同的队列,但是同一消费者可以同时消费多个队列。为了有效地订阅和消费模型,Rocket始终希望分配队列分配是统一的。在日常使用中,消费者的向上和下线,队列的动态扩展能力可能会破坏分配平衡,因此Rocket为上述情况提供了完整的重新平衡机制。
Essencethe以下是开始()的主要代码:
看到众所周知的,与消费者负载平衡相关的所有操作都委托给rebalanceimpl对象。east消费者对象均具有rebalanceimpl实例,每个rebalanceimpl实例只能为消费者提供服务。两者之间的关系是互相抱住和循环参考的关系.LET查看此对象的关键成员属性:
其中,一个processqueue对象特别明显,因为我没有发表任何评论。如果您已经阅读了火箭源代码,则应该知道他是消费者消费消息过程中极为重要的一部分。可以认为TA是客户端的消息承载者。因为时间与负载时机无关,所以我不会在这里详细介绍。(作者强调它与负载时机无关,并且它是与负载平衡有直接关系)
负责平衡负载的是dorebalance()。实际上,真正的负载逻辑是rebalanceByTopic();
RebalanceByTopic()是负载平衡的最后一点,即系统中需要负载的所有调用最终都会到达这里。在这里,我仅分析集群消耗模型中实现实现的关键代码:
看似轻巧和现实,两个关键的词:
这两个句子的意义在于,所有消费者在消费类方面获得的所有消费者和消费者ID的顺序都是一致的。在保证分配视图的一致性的前提下,分配算法是相同的。通过这种方式,尽管每个消费者在负载平衡期间都不交换任何信息,但它不能相互干扰以使队列平衡。如何在ClasoCateMessageSuperategy中分配特定细节,RocketMQ也默认来支持各种分发算法,,,,,分发分发算法,,,,分配算法,更简单,我不想重复它们。在负载平衡实现之后,谁会称呼ta,如何打电话给ta以及何时呼叫ta时,问题在我们心中徘徊。在分析上述问题之前,我们不能绕过Rebalanceservice对象。
RocketMQ中有一类物体,该物体已超过和小牛。一个物体主导着一个字段。TA通常仅受操作系统的限制。操作系统似乎也更喜欢它们,因为它们会分配时间胶片并直接派遣其操作(实际上,原因很简单,稍后会有答案)。TA是传奇服务。河流和湖泊中的人们通常将TAS称为服务线程。
ServiceThread家族很繁荣。除了专门从事负载平衡的重新平衡外,还有一群兄弟姐妹:
每个都是Rocket稳定行动背后的英雄。以上三个实际上是与刷牙有关的服务线程。(涉及的面太宽,如果有人想看,我会尝试分析它)
在上述路面上,应充分理解Rebalanceservice。它可以单独使用一个线程以进行负载平衡。当然,TA不是无尽的负载处理。
了解上述重新平衡,您应该了解及时触发的逻辑,您只需要唤醒服务线程
每个Java服务单点只能启动RebalancesErvice Service实例,并且只会同时激活MQClientFactory实例
单个点中的所有消费者实例都将共享实例对象。
每次触发mqclientfactory.dorebalance()时,所有在JVM下持有负载平衡的消费者
当负载平衡显然更复杂时,经纪人发出了通知命令消费者更加复杂的,但与祖先密不可分。它不过是更多的RPC调用。它不过是网络传输。
消费者开始后,将立即发送心跳以告知经纪人。
根据requestCode.heart_beat,RPC的处理器是clientmanageProcessor clientmanageProcessor.heartBeat() - > complotermanager.registerconsumer()
摘录键代码:
致电DefaultConsumerSchangelistener.handle()查看如果是更改事件,请致电BROKER2CLIENT.NOTIFYCONSUMERIDSCHANGEDSCHANGED()
此RPC请求最终将交给clientremotingprocessor.notifyConsumerIdSchanged()
港口
switch(this.servicestate){
case create_just:
/ *检查配置 */
this.checkconfig();
/ *构造主题订阅信息-subscriptinata,并添加rebalanceimpl的订阅信息 */
this.copysubscription();
/ *初始化mqclientinstance */
this.mqclientfactory = mqclientmanager.getInstance()
.getorCreateMqClientInstance(this.defaultmqpushconsumer,this.rpchook);
/**
*丰富的rebalanceimpl对象属性,您是否注意到先前初始化的对象被拥挤?
*以前的rebalanceimpl对象确实
* rebalanceimpl是负载平衡的相关实现
*/
this.rebalanceimpl.setConsumerGroup(this.defaultmqpushconsumer.getConsumerGroup()););););););););););
this.rebalanceimpl.setMessAgeModel(this.defaultmqpushconsumer.getMessAgeModel();););););
this.rebalanceimpl.setallocatemequeuestrategy(this.defaultmqpushconsumer.getalocatemeuestrategy());
this.rebalanceimpl.setmqclientFactory(this.mqclientFactory);
/**
*用MQClientInstance注册消费者并启动MQClientInstance
* JVM中的所有消费者和生产商都拥有相同的MQClientInstance,MQClientInstance只能启动一次
*/
boolean registerok = mqclientfactory.registerconsumer(this.defaultmqpushconsumer.getConsumerGroup(),this);
休息;
案件
...;
默认:
休息;
}
/ *消费者成功开始,并立即向所有经纪人发送心跳 */
this.mqclientfactory.sendheartbeattoallbrokerwithlock();
/*
*请注意,消费者将立即触发负载平衡
*但这不是负载平衡的实现。这实际上正在唤醒相关的服务线程
*作者将重点关注以下简介
*/
this.mqclientfactory.rebalanceimdiaiapatery();
} 4作者:Yan Ruyu