当前位置: 首页 > 后端技术 > Java

RocketMQ学习九Broker初探

时间:2023-04-01 19:20:52 Java

1、Broker缓存数据Broker主要缓存路由信息,包括producer表、consumer表、consumerGroup表和topic表。这些信息在ProducerManager、ConsumerManager、SubscriptionGroupManager和TopicConfigManager类别中进行管理。ProducerManager//生产者列表HashMap>groupChannelTablegroupChannelTable:每个ProducerGroup中存活的Producer连接有哪些;每个连接的Producer最后一次发送心跳的时间ConsumerManager//Consumer列表ConcurrentMapconsumerTableconsumerTable:每个ConsumerGroup中有哪些consumer连接存活,订阅了哪些topic,过滤条件是什么(TAG)用于每个订阅的主题。SubscriptionGroupManager//ConsumerGroup表ConcurrentMapsubscriptionGroupTablesubscriptionGroupTable:各ConsumerGroup的消费行为特征,例如:消费失败后的最大重试次数;重试队列的数量;如果从MasterBroker消费慢,切换到哪个SlaveBroker消费TopicConfigManager//topiclistConcurrentMaptopicConfigTabletopicConfigTable:分布在当前Broker上的各个topic分片的配置信息,如:read个数/包括写队列;是否有读/写权限2.Broker启动设计和创建BrokerController在BrokerStartup#createBrokerController方法中创建BrokerController类。首先参考分析,创建BrokerController类,然后调用它的initialize方法。里面的逻辑主要包括:1)加载topic,consumer消费进度,订阅关系和consumer过滤配置,加载消息的log文件2)然后创建netty服务监听10909VIP端口3)初始化一个series的线程池,然后在registerProcessor方法中将这些线程池与处理器关联起来,以后针对不同的业务使用不同的线程池,也就是线程隔离4)启动一些定时任务,比如记录Broker状态,消费的持久化progress等5)最后进行权限验证初始化和Rpc调用hook相关服务。这些服务以JavaSPI的方式加载。StartBrokerpublicvoidstart()throwsException{//启动消息存储相关任务if(this.messageStore!=null){this.messageStore.start();}//启动代理服务器if(this.remotingServer!=null){this.remotingServer.start();}//启动消息发送者使用的netty服务if(this.fastRemotingServer!=null){this.fastRemotingServer.start();}//启动SSL连接文件监听服务if(this.fileWatchService!=null){this.fileWatchService.start();}//启动外部API客户端if(this.brokerOuterAPI!=null){this.brokerOuterAPI.start();}//启动pull模式相关服务if(this.pullRequestHoldService!=null){this.pullRequestHoldService.start();}//启动心跳检测服务if(this.clientHousekeepingService!=null){this.clientHousekeepingService.start();}//启动消息过滤服务if(this.filterServerManager!=null){this.filterServerManager.start();}//如果DLegerCommitLog没有启动,将Broker注册到NameServer如果(!messageStoreConfig.isEnableDLegerCommitLog()){startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());}/*向namesrv注册*/this.registerBrokerAll(true,false,true);this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.registerBrokerAll(true,false,brokerConfig.isForceRegister());}catch(Throwablee){log.error("registerBrokerAllException",e);}}},1000*10,Math.max(10000,Math.min(brokerConfig.getRegisterNameServerPeriod(),60000)),TimeUnit.MILLISECONDS);if(this.brokerStatsManager!=null){this.brokerStatsManager.start();}if(this.brokerFastFailure!=null){this.brokerFastFailure.start();}}messageStore服务:处理消息的存储相关的日志,比如CommitLog、ConsumeQueue等remotingServer服务:处理clientproducer&consumer请求fastRemotingServer服务:默认端口可能被多用途使用,可能造成业务拥堵,新开VIP端口进行消息处理。不过4.5版本以后,默认是关闭的,这是针对之前版本的makeup。fileWatchService服务:启动监听服务连接时使用的SSL连接文件服务brokerOuterAPI服务:RocketMQ控制台与客户端交互时的pullRequestHoldService服务Broker:处理推送方式消费或延迟消费的服务clientHousekeepingService服务:心跳连接filterServerManager的服务service:过滤消息服务transactionalMessageCheckService服务:定时检查处理事务消息服务slaveSynchronize服务:主从路由信息同步服务netty服务器启动这里可以参考第三大点和4小点的服务器创建前面的文章参考文章:Broker部分Broker启动过程BrokerStartup(2)