本文转载自微信公众号《花在聊天技术》,作者王江华。转载本文请联系安迪闲聊科技公众号。1Kafka三层架构概述由于最近事情多,工作忙,这篇文章几乎难以产出。经过几个周末的构思和整理,我终于见到你了。在上一篇文章中,我们讲述了Kafka的基本介绍和工作流程、存储机制、副本等知识,本文将揭开Kafka高可用、高性能、高并发架构设计的秘密。Kafka一直号称高吞吐、低延迟、高并发、高扩展性,应用场景越来越多。这时候对其稳定性的要求就更高了。接下来,我将一一为您详细介绍。2Kafka高可用设计Leader选举机制Kafka中的选举大致分为三类:controller选举,leader选举,consumer选举。在讲解Leader选举之前,先说说Kafka的controller,即Broker。除了一般的Broker功能外,它还具有选举分区领导节点的功能。当Kafka系统启动时,会选举其中一个Brokers作为controller,负责管理topic分区和replicas的状态,同时也执行再分发任务。controller的启动顺序如下:1)第一个启动的节点会在Zookeeper系统中创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为controller。2)其他节点陆续启动时,也会尝试在Zookeeper系统中创建一个/controller节点,但是/controller节点已经存在,所以会抛出“创建/controller节点异常失败”的信息。创建失败的节点会根据返回的结果判断Kafka集群中已经成功创建了一个controller,从而放弃/controller节点的创建,从而保证了Kafka集群控制器的唯一性。3)其他节点也会在控制器上注册相应的监听器,每个监听器负责监听各自代理节点的状态变化。当检测到节点状态变化时,会触发相应的监控函数进行处理。说完controller的知识,我们来讲解一下Leader节点的选举过程。选举controller的核心思想是:各个节点公平竞争抢占Zookeeper系统中已创建/controller的临时节点,最先创建成功的节点将成为controller。并具有选举主题分区领导节点的功能。选举过程如下图所示:复制机制复制机制简单来说就是一种备份机制,即在分布式集群中保存相同的数据备份。那么副本机制的好处就是提供了数据冗余。副本机制是Kafka保证系统高可用和高持久性的重要基石。为了保证高可用,Kafka的partitions有多个副本。如果其中一个副本丢失,还可以从其他副本中获取分区数据(要求对应副本的数据必须完整)。这是Kafka数据一致性的基础。下面将详细介绍Kafka的复制机制。Kafka使用Zookeeper来维护集群Brokers的信息,每个Broker都有一个唯一的标识符broker.id,用于在集群中标识自己。Brokers会通过Zookeeper选举出一个名为ControllerBroker的节点。除了其他Broker的功能外,它还负责管理主题分区及其副本的状态。在Kafka中,Topic被划分为多个分区(Partition),分区是Kafka最基本的存储单元。创建主题时,可以使用replication-factor参数指定分区的副本数。一个partition副本总是有一个leader副本,所有的消息都直接发送给leader副本,其他副本需要复制leader中的数据来保证数据的一致性。当Leader副本不可用时,将选举其中一个Follower成为新的Leader。ISR机制理解ISR如上图所示,每个分区都有一个ISR(in-syncReplica)列表来维护所有已同步和可用的副本。Leader副本必须是同步副本,即ISR不仅仅是follower副本的集合,它包括Leader副本,例如。甚至在某些情况下,ISR只有一个Leader副本,而对于Follower副本,需要满足以下条件才算同步副本:1)必须定期向Zookeeper发送心跳;2)从Leader副本获得消息“lowlatency”。如果replica不满足上述条件,则将其从ISR列表中移除,直到满足条件后才会再次加入。因此可能存在Follower无法与Leader实时同步的风险。Kafka判断Follower是否与Leader同步的条件是Broker端参数replica.lag.time.max.ms的值。该参数的含义是Follower副本可以落后于Leader副本的最大时间间隔。当前默认值为10秒。也就是说,只要一个Follower副本不落后于Leader副本超过10秒,Kafka就认为两者是同步的,即使follower副本保留的消息少于leader副本。Kafka中对ISR的管理最终会反馈给Zookeeper节点。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前Zookeeper节点维护的地方有两个:1)Controller维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和replica状态管理,同时还会执行重新分配Partitions之类的管理任务。当满足一定条件时,Controller下的LeaderSelector会选举出新的Leader,ISR、新的leader_epoch和controller_epoch会写入Zookeeper的相关节点。同时发起一个leaderAndIsrRequest通知所有的Replicas。2)Leader维护:Leader有一个单独的线程,周期性地检测ISR中的Follower是否脱离ISR,如果发现ISR发生变化,就会将新的ISR信息返回给Zookeeper的相关节点。ACK机制的acks参数是Kafka使用中非常核心和关键的参数,决定了很多东西。这个ack和copy机制、同步机制、ISR机制密切相关。如果不能理解这些,就不能完全理解acks参数的含义。首先在KafkaProducer中设置acks参数,也就是生产者客户端。那么也就是说,当你向kafka写入数据时,你可以设置acks参数。这个参数其实可以设置三个常用的值:0、1、all。acks=0如果acks设置为0,那么Producer不会等待Broker的反馈。该消息将立即添加到SocketBuffer中并被视为已发送。在这种情况下,无法保证服务器是否收到请求,参数Retries不会生效(因为客户端无法获取失败信息)。此时每条记录返回的Offset总是设置为-1。这种模式下Kafka的吞吐量最大,并发度最高,但是数据非常容易丢失。通常适用于一些记录应用日志,对数据要求不高的业务场景。acks=1如果acks设置为1,此时Leader节点会先将记录写入本地日志,并在所有Follower节点反馈之前确认成功。在这种情况下,如果Leader节点在收到记录后发生错误,而Follower节点尚未完成数据复制,则记录将丢失。这种模式和Mysql的主从异步复制是一样的。主从之间会有数据差异。这个配置是Kafka的默认配置。它平衡了数据安全性和性能。acks=all&min.insync.replicas>=2如果acks设置为all,Leader节点会等待所有同步LSR副本确认后,再确认记录是否已经发送。只要存在至少一个同步副本,记录就不会丢失。如果此时Leader刚收到消息,而Follower还没有收到消息,此时Leader宕机了,那么client就会感知到消息没有发送成功,会重试发送再次留言。其中,Broker有一个配置项min.insync.replicas(默认值为1),表示正常写入producer数据所需的最少ISR数。当ISR中的副本数小于min.insync.replicas时,Leader停止写入Enter生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的Follower追上来重新进入ISR,因此可以容忍min.insync.replicas-1个副本同时宕机。这种方式是以牺牲性能为代价的,适用于对数据要求比较高的业务场景。3kafka高性能设计Reactor多路复用模型说到Reactor(多路复用)就不得不提到Java中的NIO。接下来,我们来看看Java中的NIO。JavaNIO由以下核心部分组成:1)Channels;2)缓冲器;3)选择器;Channel和Java中的Stream一样,用于传输数据流,数据可以从Channel读取到Buffer,也可以从Buffer写入Channel,如下图所示:Selector允许单个线程处理多个Channel。要使用Selector,首先要向Selector注册Channel,然后调用它的select()方法。此方法会阻塞,直到已注册的Channel准备好事件为止。一旦这个方法返回,线程就可以处理这些事件。事件的例子有新建连接,数据接收等,下图是单线程使用一个Selector处理3个Channel:KafkaSocketServer是基于JavaNIO开发的,使用Reactor模型(已经被证明非常高效)大量实践,在Netty和Mina中广泛使用)。KafkaReactor模型包括三个角色:1)Acceptor;2)处理器;3)处理程序;KafkaReacator包括1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据(即创建一个Processor用于单独处理,每个Processor指一个独立的Selector),M个Handlers用于处理业务逻辑。Acceptor和Processor、Processor和Handler之间有队列缓冲请求。下图是简化版的KafkaProduction消息流程Reactor模型架构图生产者发送到Kafka集群的详细流程如下图所示:1)首先,一个消息到来后,生产者源代码将消息封装到一个ProducerRecord对象中。2)封装成对象后,对象会被序列化【涉及网络传输】,会调用Serializer组件进行序列化,序列化后发送。3)发送前需要确定一件事,将这条消息发送到主题的哪个分区。这时候就需要通过Partitioner分区器从KafkaBroker集群中获取集群元数据。获取到元数据后,就可以发送了。4)0.8版本之前,此时有消息过来,会封装成request发送给Broker。在这种情况下,性能很差。0.8版本之后做了一个简单的改进,性能得到了指数级的提升,就是消息来了之后,不会马上发送出去,而是先写入一个缓存(RecordAccumulator)队列,封装成批次(记录批次)。5)这时候会有sender线程将多个batch封装成一个request(Request),然后发送,这样会减少很多request,提高吞吐量。这时候有个问题,消息来了不是马上发出去,而是打包成批,所以不会有延迟。batch.size默认为16K,满了会立即发送。如果没有满,也会在指定时间发送(linger.ms=500ms)6)发送时,每个Request对应多路复用器(Selector)中的每个kafka通道,然后将数据发送到Broker集群7)在packageBatch批处理和Request请求过程中,还涉及到一个重要的设计概念,那就是内存池方案,后面会在服务端内存池部分详细介绍。顺序写入磁盘+OSCache首先,Kafka为了保证磁盘写入性能,基于操作系统的pagecache来实现文件写入。操作系统本身有一层缓存,叫做pagecache,是内存中的缓存。我们也可以称之为oscache,意思是操作系统自己管理的缓存。那么在写磁盘文件的时候,可以直接先写入os缓存,也就是只写入内存,然后由操作系统决定什么时候真正把os缓存中的数据刷到磁盘,这样就大大提高写入效率和性能。如下图所示:另外,还有一个非常关键的操作,就是Kafka在写数据的时候,是顺序写入磁盘的,也就是数据追加到文件的末尾,而不是在一个文件中文件的随机位置。修改数据,对于普通的机械盘,如果是随机写入,会涉及到磁盘寻址的问题,导致性能极低,但是如果只是按顺序追加到文件末尾,这种磁盘顺序写入的性能基本上可以和写内存的性能差不多。零拷贝技术(zero-copy)完成了上面的写入过程。再来说说消费过程。从Kafka消费数据实际上就是在消费的时候从Kafka的磁盘文件中读取数据发送给下游。消费者。大体流程如下:1)首先检查要读取的数据是否在os缓存中,如果不在,则从磁盘文件中读取数据放入os缓存中。2)然后从os缓存中拷贝数据到应用进程的缓存中,在操作系统层面从应用进程的缓存中拷贝数据到socket缓存中,最后从socket缓存中读取数据并发送到网卡。最后由网卡发送给下游消费者。从上图可以看出,整个过程中有两个不必要的复制操作1)从操作系统的os缓存中复制数据到应用进程的缓存中。2)然后从应用程序缓存中复制到操作系统的socket缓存中。在这两次拷贝过程中,还发生了几次上下文切换,所以相对比较耗性能的Kafka在读取数据时引入了零拷贝技术来解决这个问题。即直接将操作系统os缓存中的数据发送给网卡再传输给下游消费者,跳过了中间复制数据的两个步骤,从而减少了复制的CPU开销,减少了context的数量切换用户态和内核态,优化数据传输性能,Socket缓存中只会复制一个描述符,不会有数据复制到Socket缓存中。如下图所示:常见的零拷贝思想主要有两种实现方式:1)直接I/O:数据直接跳过内核,在用户空间和I/O设备之间传输。在这种情况下,内核只是执行必要的辅助工作2)copy-on-write:写时复制,数据不需要提前复制,而是在需要修改的时候复制部分数据这里,Kafka主要使用mmap和sendfile实现零拷贝,对应java中的MappedByteBuffer和FileChannel.transferIO。javaNIO实现的零拷贝如下:transferTo()方法将数据从文件通道传输到给定的可写字节通道。在内部它依赖于底层操作系统对零拷贝的支持;在Linux系统上,这个调用被传递到sendfile()系统调用中,调用过程如下:复制。压缩有助于提高吞吐量、减少延迟并提高磁盘利用率。在Kafka中,压缩可能发生在两个地方:producer端和Broker端。一句话概括压缩和解压,就是Producer端压缩,Broker端维护,Consumer端解压。Kafka支持多种压缩算法:lz4、snappy、gzip,从Kafka2.1.0开始增加了ZStandard算法。该算法是Facebook开源的压缩算法,可以提供超高的压缩比。Producer、Broker和Consumer必须使用相同的压缩算法。Producer向Broker写入数据,Consumer从Broker读取数据时,不需要解压。只需要在最终的Consumer收到消息的时候解压即可,节省了大量的网络和磁盘开销。服务器端内存池设计在上一节中,我们讲解了一个消息生产的详细过程,其中涉及到批处理(Batch)和请求(Request)。在这个过程中,Kafka还有一个重要的设计理念,内存池方案。下面我详细描述一下内存池的实现过程。1)这里简化了流程。一条消息会先封装,然后序列化,最后计算分区号,将消息存入缓存。2)这个缓存也是设计的也就是batchqueue,那么这个batchqueue是用什么策略来存储的呢?一个partition对应一个queue,这里有一个重要的数据结构:Batches,这个数据结构是Key-value的形式,key是消息topic的partition,value是一个queue,里面存储thebatchessenttothecorrespondingpartitions3)那么假设此时我们有2个topic,每个topic有2个partition,那么是不是一共有4个partition,也就是4个queue,每个queue中都有batch。此时消息经过计算和分区后,会被写入队列的最新一批。4)Sender线程会检查批次(Batch)是否满了,或者时间是否到了。如果Sender线程满足了,就会取出包,作为Request发送出去。5)packagebatch会使用内存,Sender发送后会回收内存。如果你在Java中经常操作内存和回收,一定会遇到FullGC的头疼。工作线程的性能会降低,影响整个生产者的性能。Kafka的解决方案是内存池,与数据库的连接池使用同一个内存块。6)整个BufferPoll内存池的大小为32M,内存池分为两部分,一部分是内存队列,队列中有内存块(16K),另一部分是可用的内存,消息过来后,会从内存池申请内存块,申请完成后,分批打包写入数据,发送端线程发送响应,然后清空内存存回在内存池中重复使用,大大降低了GC的频率,保证了生产者的稳定性和效率,大大提高了4kafka的性能高并发设计高并发以上网络设计已经说明了高可用和高的方方面面Kafka生产者和服务器的性能通过大量空间。这里主要分析一下Kafka的超高并发网络架构设计。这种架构设计是Kafka中最为经典的。这里我们将Kafka的网络架构抽象为如上图所示的三层架构。整个请求流程的路径如下:1)当客户端发送请求时,Kafka服务器上会有一个Acceptor线程,这个线程绑定了OP_ACCEPT事件,用于监听传入的请求。下面有一个while死循环,会不断的监听Selector是否有发送请求。收到请求链接后,封装成socket通道,然后将socket通道发送到网络架构的第一层。2)第一层架构中有3个相同的Processor线程。这些线程中的每一个都有一个连接队列,用于存储套接字通道。存储规则是轮询存储。随着请求的不断增加,连接队列中就会出现一个连接队列。有很多套接字通道。这时,socket通道会在每个selector上注册OP_READ事件。参考上图中第一层的第三个Processor线程,即每个线程中有一个while循环,会遍历每个socket通道,监听事件。之后,就会收到客户端发送的请求。此时Processor线程会解析请求(发送的请求是二进制的,上面说了跨网传输需要序列化),解析封装成一个Request对象发送给网络二层架构如上图所示。3)第二层架构中会有两个队列,一个是RequestQueue(请求队列),一个是ResponseQueue(返回队列)。每个Request请求都会被存放在请求队列中,起到缓冲的作用。这个时候来到了网络的第三层架构中。4)第三层架构中有一个RequestHandler线程池,默认有8个RequestHandler线程。这8个线程启动后,会不断从第二层的RequestQueue队列中获取请求,解析请求体中的数据,并通过内置的工具类将数据写入磁盘5)写入成功后,它需要响应客户端。这时候会封装一个Response对象,返回的结果会存放到第二层的ResponseQueue队列中。此时默认有3个小队列。Response队列,其数量与第一层架构中的Processor线程一一对应。6)此时第一层Processor线程中的while循环会遍历Response请求。遍历完成后,会在selector上注册OP_WRITE事件。此时,响应请求将被发送回客户端。7)整个过程涉及到2个参数:num.network.threads=3和num.io.threads=8如果觉得默认参数的性能不够好,可以优化这2个参数,比如如num.network.threads=9,num.io.threads=32(与CPU数量一致),每个RequestHandler线程可处理2000QPS,2000*8=16000QPS,扩容后可支持64000QPS,扩容后,Kafka可以支持6万QPS。从上面的架构讲解可以看出,Kafka是可以支持高并发请求的。5小结至此,我已经将Kafka的三层架构的方方面面完整的揭开了。下一篇文章将讲解Kafka的生产级部署和容量。敬请期待规划知识...
