1你了解Kafka的超高并发网络架构是如何设计的吗?我们知道Kafka的网络通信架构采用了JavaNIO和Reactor设计模式。我们整体来看一下完整的网络通信层架构,如下图所示:1)从上图中可以看出,Kafka网络通信架构中使用的组件主要由两部分组成:SocketServer和RequestHandlerPool。2)SocketServer组件是Kafka超高并发网络通信层最重要的子模块。它包含Acceptor线程、Processor线程和RequestChannels等对象,它们都是网络通信的重要组成部分。3)RequestHandlerPool组件就是我们常说的I/O工作线程池,它定义了若干个I/O线程,主要用于执行真正的请求处理逻辑。01Accept线程在经典的Reactor设计模式中具有“Dispatcher”的作用,主要用于接收外部请求,分发给下面的实际处理线程。在Kafka网络架构设计中,这个Dispatcher就是“Acceptor线程”,用来接收和创建外部TCP连接。Broker端的每个SocketServer实例只会创建一个Acceptor线程。它的主要功能是创建连接,将接收到的Request传递给下游的Processor线程进行处理。1)我们可以看到Acceptor线程主要是利用JavaNIO的Selector和SocketChannel来循环轮询就绪的I/O事件。2)将ServerSocketChannel通道注册到nioSelector,注意网络连接创建事件:SelectionKey.OP_ACCEPT。3)事件注册后,一旦后续收到连接请求,Acceptor线程会指定一个Processor线程,将请求交给它,并创建网络连接进行后续处理。02Processor线程Acceptor只处理请求入口连接,那么,实际的网络连接的创建和网络请求的分发都是由Processor线程完成的。而每个Processor线程在创建的时候都会创建3个队列。1)newConnections队列:主要用于保存新创建的连接信息,即SocketChannel对象。目前硬编码的队列长度为20,每当Processor线程收到新的连接请求时,都会将对应的SocketChannel对象放入队列中,稍后创建连接时,会从队列中获取SocketChannel,并且然后注册新连接。2)inflightResponse队列:是一个临时的Response队列。Processor线程返回Repsonse给Client后,必须将Response放入队列中。它存在的意义:由于有些Response回调逻辑只有在Response返回给Request发送方后才能执行,所以需要暂时存放在一个临时队列中。3)ResponseQueue队列:主要存放所有需要返回给Request发送者的Response对象。每个处理器线程维护自己的响应队列。03RequestHandlerPool线程池Acceptor线程和Processor线程只是请求和响应的“搬运工”,而“真正处理Kafka请求”的是KafkaRequestHandlerPool线程池。在上面的网络超高并发通信架构图中,有两个参数与整个流程相关,分别是“num.network.threads”和“num.io.threads”。其中,num.io.threads是I/O工作线程池的大小配置。下面结合Kafka的超高并发网络架构图来解释下一个完整的请求处理核心流程:1)Clients向Acceptor线程发送请求。2)Acceptor线程会创建一个NIOSelector对象,创建一个ServerSocketChannel通道实例,然后将Channel和OP_ACCEPT事件绑定到Selector多路复用器上。3)Acceptor线程默认创建3个Processor线程参数:num.network.threads,将请求对象SocketChannel轮询到连接队列中。4)此时连接队列有源源不断的请求数据,然后马不停蹄的执行NIOPoll,获取相应SocketChannel上准备好的I/O事件。5)Processor线程向SocketChannel注册OP_READ/OP_WRITE事件,这样客户端发送的请求就会被SocketChannel对象获取到,具体是processCompleteReceives方法。6)此时客户端可以不断的发送请求,服务端不断的通过SelectorNIOPoll获取就绪的I/O事件。7)然后根据在Channel中获取到的完成的Receive对象,构造一个Request对象,存放到RequestChannel的RequestQueue请求队列中。8)这时候就到了I/O线程池发挥作用的时候了。KafkaRequestHandler线程循环从请求队列RequestQueue中获取Request实例,然后交给KafkaApis的handle方法执行真正的请求处理逻辑,最后将数据存入磁盘。9)请求处理完成后,KafkaRequestHandler线程会将Response对象放入Processor线程的Response队列中。10)然后Processor线程不断通过Request中的ProcessorID从Response队列中定位并取出Response对象,返回给Request发送方。2你了解Kafka的高吞吐日志存储架构是如何设计的吗?对于Kafka来说,主要是用来处理海量的数据流。该场景的特点主要包括:1)写操作:写并发要求非常高,基本达到百万级TPS,只需顺序添加和写入日志,无需考虑更新操作。2)读操作:相对于写操作,比较简单,只要能按照一定规则高效查询即可,支持(偏移量或时间戳)读。根据以上两点分析,对于写操作,直接采用“日志顺序追加”的方式,可以满足Kafka百万TPS写效率的要求。如何高效的查询这些日志呢?我们可以想象,将消息的Offset设计成一个有序的字段,使得消息有序的存储在日志文件中,不需要引入额外的哈希表结构,可以直接将消息分成几个块。对于每一个区块,我们只需要索引当前区块的第一条消息的Offset即可。这有点像二分查找算法吗?即先根据Offset的大小找到对应的block,然后从block开始依次查找。如下图所示:这样就可以快速定位到要搜索的消息所在的位置。在Kafka中,我们称这种索引结构为“稀疏哈希索引”。以上是Kafka最终的存储实现方案,基于顺序追加日志+稀疏哈希索引。接下来我们看一下Kafka的日志存储结构:从上图可以看出,Kafka是按照“topic+partition+copy+segment+index”的结构来存储日志的。了解了整体的日志存储架构之后,我们再来看看Kafka的日志格式。Kafka日志格式也经历了多个版本迭代。这里我们主要看一下V2版本的日志格式:从上图可以看出,V2版本的日志格式主要是通过变长来提高消息格式的空间利用率,将一些字段提取到消息批处理(RecordBatch)。同时消息批处理可以存储多条消息,这样在批量发送消息的时候,可以有很大的提升。节省了磁盘空间。接下来我们看一下日志消息写入磁盘的整体流程,如下图所示:3Kafka在线集群部署方案是怎么做的?这里我们从架构师必备的能力出发,以电商平台为例,介绍如何做Kafka生产级能力评估方案?如何得到公司领导和运维部门的认可,你的方案被认可。详情可深入阅读:八步带你深入剖析Kafka生产级能力评估方案4如何监控Kafka在线系统?Kafka作为大型系统架构的重要组成部分,起着举足轻重的作用,因此Kafka集群Kafka集群的稳定性尤为重要。我们需要全方位监控生产Kafka集群。一般可以从以下五个维度对线上系统进行监控:01主机节点监控所谓主机节点监控,就是监控Kafka集群Broker所在节点机器的性能。主机节点监控对于Kafka来说是最重要的,因为很多线上环境问题首先是主机的一些性能问题导致的。因此,对于Kafka来说,主机监控通常是发现问题的第一步。主要性能指标如下:“机器负载(Load)”、“CPU使用率”、“内存使用率”、“磁盘I/O使用率”、“网络I/O利用率”、“TCP连接数”、“打开文件数”和“索引节点使用情况”。如果你想更好的监控宿主机的性能,有以下两个教程可以学习参考:02JVM监控另外一个重要的监控维度是JVM监控。监控JVM进程主要是为了让大家全面了解KafkaBroker进程。监控JVM进程需要关注三个指标:“监控FullGC的频率和持续时间”、“监控堆上活跃对象的大小”、“监控应用线程总数”03另一个重要的Kafka集群监控的监控维度是KafkaBroker集群,监控各种客户端主要有3种方式:1)查看Broker端的重要日志:主要包括Broker端的server日志server.log,controller日志controller.log和主题分区状态更改日志state-change.log。其中server.log最为重要。如果你的Kafka集群出现故障,你应该尽快查看server.log,定位故障原因。2)查看Broker端关键线程的运行状态,例如:LogCompaction线程:logcompactionandcleanup。一旦挂掉,所有的compaction操作都会中断,但用户通常不会意识到这一点。副本拉消息线程:主要执行从Follower副本拉消息到Leader副本的逻辑。如果他们挂了,系统会显示Follower副本延迟,Leader副本越来越大。3)查看Broker端JMX关键性能指标:主要有BytesIn/BytesOut、NetworkProcessorAvgIdlePercent、RequestHandlerAvgIdlePercent、UnderReplicatedPartitions、ISRShrink/ISREpand、ActiveControllerCount。04Kafka客户端监控客户端监控主要是对生产者和消费者的监控。生产者向Kafka发送消息。这时候我们就需要了解client机器和Broker机器之间的往返时延RTT。对于跨数据中心或者远程集群,RTT会比较大,很难支撑大的TPS。生产者视角:request-latency是一个需要关注的JMX指标,即消息生产请求的延迟;另外,Sender线程的运行状态也很重要。如果Sender线程挂了,用户是察觉不到的,表象只出现在Producer端。消息发送失败。Consumer视角:对于ConsumerGroup,需要关注joinrate和syncrate指标,这些指标表示rebalance的频率。此外,还包括消息消费偏移量、消息累积数量等。05Brokers之间的监控最后一个监控维度是Brokers之间的监控,主要是指replicapull的性能。Follower副本实时拉取Leader副本的数据。这个时候我们希望拉取的过程越快越好。Kafka提供了一个特别重要的JMX指标,叫做“underreplicatedpartitions”,意思就是比如我们规定这条消息应该保存在两个Brokers上。假设只有一个Broker保存消息,那么消息所在的partition在replicatedpartitions下被调用。这种情况特别值得关注,因为它可能会导致数据丢失。另一个重要指标是“activecontrollercount”。在整个Kafka集群中,要保证只有一台机器的index为1,其他的都应该为0,如果发现有一台机器的index大于1,那肯定是脑裂了。这时候应该检查是否存在网络分区。Kafka本身无法对抗裂脑,完全依赖于Zookeeper。但是,如果确实发生了网络分区,就没有办法处理了。它应该很快失败并重新启动。5如何调优Kafka在线系统?对于Kafka来说,“吞吐量”和“延迟”是非常重要的优化指标。ThroughputTPS:指Broker或Client每秒可以处理的消息数,越大越好。Delay:表示从Producer端发送消息到Broker端完成持久化,再到Consumer端消费成功的时间间隔。与吞吐量TPS相反,延迟越低越好。简而言之,高吞吐量和低延迟是我们调优Kafka集群的主要目标。01提高吞吐量,首先要提高吞吐量参数和措施:Brokernum.replica.fetchers:表示Follower副本使用多少个线程来获取消息,默认为1个线程。如果Broker端的CPU资源充足,适当增加这个值“但不要超过CPU核数”,以加快Follower副本的同步。这是因为在生产环境中,配置acks=all的Producer端影响吞吐量的首要因素是副本同步性能。适当增加该值后,通常可以看到Producer的吞吐量有所增加。replica.lag.time.max.ms:在ISR中,如果Follower长时间不向Leader发送通信请求或同步数据,则Follower将被踢出ISR,默认值为30s。num.network.threads:单个Acceptor创建一个Processor处理器的线程数,默认值为3,可适当增加到9。num.io.threads:服务器用来处理请求的线程数,可能包括磁盘I/O。默认值为8,可以适当增加到32。调优参数避免频繁FullGCProducerbatch.size:表示消息批大小,默认为16kb。batch设置太小会导致网络请求频繁,吞吐量下降;如果批量设置过大,会导致消息在发送前等待很长时间,增加网络延迟。因此,适当增加会增加吞吐量。建议从默认的16kb增加到512kb或1M。buffer.memory:RecordAccumulator发送的消息的缓冲区总大小。默认值为32M,可以增加到64M。linger.ms:表示批量缓存时间。如果数据还没有达到batch.size,sender会等待linger.ms后发送数据。单位为毫秒,默认值为0,表示必须立即发送消息。如果设置得太短,会导致网络请求频繁,吞吐量下降;如果设置的太长,会导致消息在发送前等待很长时间,增加网络延迟。因此,适当增加会提高吞吐量,建议为10~100毫秒。compression.type:默认为none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩后可以减少数据量,提高吞吐量,但是会增加producer端的CPU开销。支持的压缩类型:none、gzip、snappy、lz4和zstd。设置acks=0/1,retries=0,优化目标是吞吐量,不设置acks=all“延长副本同步时间”,启用retry“延长执行时间”。Consumer使用多线程提高整体吞吐量fetch.min.bytes:表示只要Broker端积累了多少数据,就可以返回给Consumer端。默认为1字节,可以适当增加到1kb或更大。fetch.max.bytes:消费者可以从服务器的一批消息中获取的最大字节数。batch的大小受message.max.bytes【broker配置】或max.message.bytes【topic配置】影响,默认为50M。max.poll.records:表示一次轮询拉取数据返回的最大消息数,默认为500。Partitions添加分区以增加吞吐量02减少延迟减少延迟的目的是最小化端到端延迟。相对于上述提高吞吐量的参数,我们只能调整Producer端和Consumer端的参数配置。对于Producer端,这时候我们希望把消息快速发送出去,必须设置linger.ms=0,同时关闭压缩,同时设置acks=1,减少副本同步时间。对于Consumer端,我们只保留fetch.min.bytes=1,即只要Broker端有数据可以返回,就会立即返回给Consumer,减少延迟。03合理设置分区数分区数不多也不少。需要搭建集群,进行压测,然后灵活调整分区数量。这里可以使用Kafka官方脚本对Kafka进行压测。1)生产者压测:kafka-producer-perf-test.sh2)消费者压测:kafka-consumer-perf-test.sh
