1Kafkabroker启动(一)Kafkabroker,代表一个节点,包含多个分区,一个分区可能是leader也可能是follower。入口在核心包的Kafka类中,依次启动KafkaServerStartable和KafkaServer,然后启动几个组件。每个组件几乎都是一个封装好的线程启动,而且是统一风格的启动。(2)LogManager:磁盘日志文件操作组件。主要负责在加载时检查日志目录和加载日志文件。它还具有几个功能:a。删除旧的日志段,当它们超过时间阈值或大小时删除它们,b。物理刷机功能,默认操作系统控制闪盘,也提供配置定时刷盘。cCheckpointrecoveryCheckpointcheckpointcheckpoint记录最后一个闪存盘的偏移量,异常关机后可以恢复d分区目录删除当broker收到stopReplica请求时,删除分区和对应的段(3)SocketServer网络组件,它实现react模式,负责发送和接收请求。(4)ReplicaManager复制组件负责复制管理,包括复制写入数据、复制拉取数据等。(5)KafkaController负责配合zk、选举leader和isr、处理broker变更、分区分配处理等。(6)GroupCoordinator负责消费者组管理,包括消费者加入、心跳、离开等。(7)kafkaRequestHandlerPool是一个线程池,KafkaApis是一个用策略模式封装的工具类,可以处理各种类型的请求。2kafkabroker发送和接收请求的过程(1)发送和接收请求,核心是socketServer组件,它和kafkaserver启动一起启动,核心是创建一个Acceptor对象和Processor数组,从中可以看出这个名字,这一定是反应堆网络模型。(2)接受器是包含多个处理器(默认为三个)的线程。acceptor实际上是在nioSelector上注册accept事件,不断训练新的accept事件,等待建立连接。连接设置包括:非阻塞连接,禁用tcpdelay(减少延迟),keepalive为true保持连接,然后新的accept出来channel丢给某个处理器(多个处理器取模给进去)转动)。(3)Processor看上图,每个processor都有一个selector和一个connectionqueue。从Acceptor创建的新连接会被放入某个进程的连接队列中,然后处理器的选择器会不断地为自己的连接队列注册和处理事件。接收到的请求和处理后的返回会放在requestChannel列表中。核心是线程的run方法如下:其中:一个configureNewConnections(),这里是获取一个新的连接并注册读取事件。bprocessNewResponses,从名字就可以看出,就是在处理完请求后,再处理响应。这里有点跳转,因为下面是处理请求的过程,这里是处理返回,从requestChannel列表中获取response,通过sendResponse()将response发回给请求者。发送的过程是先获取到具体的channel,然后通过selector的send,将responshe放到kafkaChannel的发送对象中,同时注册op_write事件,接着就是网络发送函数networkClient,与生产者客户端相同。cpoll方法,kafkaselector的poll方法,与producer的发送原则是一致的。通过pollSelectionKeys处理有事件到达的连接,包括connect事件、read事件、write事件。Selector本身封装的一些队列是用来存放中间结果的,比如compeledReceives表示接受完成请求,completedSends表示完成请求,stageReceives是一个临时队列等,在producer使用的时候分析过。dprocessCompleteReceives从selector循环上面的compeledReceives,其中存放了read事件读请求,然后封装成一个request放入SocketServer的requestChannel中。这个requestChannel是一个队列,用于其他线程准备请求eprocessCompletedSends,处理完发送的response后,channel重新关注read事件。(4)上图右侧,kafkaServer在启动时,会启动requestHandlerPool,其中包含了KafkaApis工具类,可以操作各种类型的请求。这个池是一个线程池。请求从上面的requestChannel获取,由KafkaApis处理。这是一个策略模式下的句柄处理器,指向不同的方法。以produce类型为例,调用replicaManager.appendMessages处理producer发送的消息后,使用callback方法将response放回requestChannel。核心方法:requestChannel.sendResponse(newRequestChannel.Response(request,newResponseSend(request.connectionId,respHeader,respBody)))之后,网络组件就可以发送响应了。整个过程就完成了。
