目前市面上的消息队列有很多,ActiveMQ、RabbitMQ、ZeroMQ等,已经为大多数人所熟悉。图片来自Pexels,特别喜欢ActiveMQ早期在企业中对总线通信的应用,基本上是企业级IT设施解决方案的组成部分。目前Kafka非常稳定,应用也逐渐广泛。它已经不是新鲜事物,但不可否认的是,卡夫卡正如雨后春笋般蓬勃发展。它非常耀眼。今天我们就来仔细拆解Kafka,了解它的内幕。以下内容版本基于最新稳定版Kafka2.4.0。文章主要包括以下内容:Kafka为什么快为什么Kafka稳定如何使用KafkaKafka为什么快是一个相对的概念,没有比较就没有伤害。所以,我们通常说的Kafka是比喻我们常见的ActiveMQ,比如RabbitMQ,会产生IO,主要依靠IO进行信息传递。像ZeroMQ这种基本完全依赖内存进行信息流传输的消息队列当然会更快一些,但是这样的消息队列只在特殊场景下使用,不在比较之列。因此,当我们说Kakfa快的时候,通常是基于以下几个场景:吞吐量:当我们需要每秒处理几十万条或者几百万条Message的时候,Kafka的处理速度比其他MQ快。高并发:当有数百万或数千万消费者时,Kafka在相同配置的机器下会有更多的生产者和消费者。磁盘锁:相比其他MQ,Kafka在进行IO操作时,同步锁IO的场景更少,等待时间更短。那么基于以上几点,我们来详细了解一下Kafka为什么快。消息队列的推拉模型首先,如果单纯从消费者的角度来看“Kafka快”,这是一个伪命题,因为Kafka相对于其他MQ来说,生产消息是从生产者到消费者消费消息。该时间必须大于等于其他MQ。其背后的原因涉及到消息队列设计的两种模型:推模型和拉模型如下图所示:对于拉模型,Producer产生Message后,会主动发送给MQServer。为了提高性能和降低成本,一些客户端还会设计成批量发送。但是无论是单条还是批量,Producer都会主动向MQServer推送消息。当MQServer收到消息时,对于pull模型,MQServer不会主动将消息发送给Consumer,也不会维护和记录消息的Offset。Consumer会自动给服务器设置一个定时器,询问是否有新消息。.通常查询时间不超过100ms。一旦有新的消息产生,就会同步到本地,并修改并记录offset。服务器可以存储偏移量,但不会主动记录和验证偏移量的合理性。同时,Consumer可以完全独立维护offset,实现自定义信息读取。对于推送模型,服务端收到消息后,首先会记录消息的信息,并从自身的元数据库中查询对应消息的Consumer是谁。由于服务器和Consumer在连接时建立了长链接,所以消息可以直接发送给Consumer。Kafka是基于拉模型的消息队列,所以从消费者获取消息的角度来看,延迟会小于等于轮询周期,所以会比推模型的消息队列有更高的消息获取延迟,但推模型也是不同的问题。首先,由于服务端需要记录对应消费者的元信息,包括消息应该发送给谁,偏移量是多少,同时又需要将消息推送给消费者,必然会带来关于一系列问题。如果此时网络不好,Consumer收不到,消息发送不成功怎么办?假设消息发送了,我怎么知道是否收到了呢?所以server和Consumer需要先对密码进行多层确认,以达到至少Consumeonce,onlyonce等。在Kafka这样的pull模型中,这个功能是由消费者自动维护的,所以服务端减少了更多不必要的开销。所以从同样的资源来看,Kafka会有更多相连的Producers和Consumers,大大减少了消息的拥堵,所以看起来更快。在OSPageCache和BufferCache的阳光下并没有什么新鲜事。对于一个框架来说,想要跑得更快,通常只有那么几招可以使用。Kafka正是将这一招发挥到了极致。其中之一就是最大限度地利用OSCache,主要是PageCache和BufferCache。对于这两个Cache,通常使用Linux的同学都比较熟悉。比如我们在linux下执行free命令,会看到如下输出:图片来自网络,会有两列名为buffers和cached,一行名为“-/+buffers/cache”,这两个信息的具体解释如下:pagecache:文件系统级缓存,从磁盘读取的内容存放在这里,这样程序读取磁盘内容会非常快,比如使用grep、find等Linux命令时搜索内容和文件,第一次会慢很多,再次执行时会快很多倍,几乎是瞬间。另外,pagecache中的数据被修改后,也就是脏数据,到了要写入磁盘的时候,会被转移到buffercache中,而不是直接写入磁盘。我们看到的cached列的值表示当前页面缓存(pagecache)的占用情况,页面缓存文件的页面数据,页面是一个逻辑概念,所以页面缓存与文件系统处于同一级别。Buffercache:磁盘等块设备的缓冲区,这部分内存是要写入磁盘的。buffers列表示当前块缓存(buffercache)的占用情况,buffercache用于缓存块设备(如磁盘)的块数据。Block是一个物理概念,所以buffercache和blockdevicedriver是一个级别的。两者都是用来加速数据IO,将写入的page标记为dirty,然后flush到外部存储。读取数据时,先读取缓存,如果未命中,再去外存读取,读取到的数据也加入到缓存中。操作系统总是主动使用所有空闲内存作为PageCache和BufferCache,当OS内存不够时,也会使用LRU等算法淘汰缓存页面。带着上面的概念,我们来看看Kafka是如何使用这个特性的。首先,对于一次数据IO,通常会发生以下过??程:操作系统将数据从磁盘复制到内核区的PageCache中。用户程序将内核区的PageCache复制到用户区缓存中。用户程序将用户区的缓冲区复制到Socket缓冲区中。操作系统将Socket缓存中的数据复制到网卡的Buffer中并发送数据。可以发现,一次IO请求操作需要2次上下文切换和4次系统调用,同一份数据在缓存中被复制多次。其实拷贝可以直接在内核态进行。也就是省略了第二步和第三步,变成了这样:因为可以这样修改数据流,所以Kafka在设计之初就参考了这个流程,尽可能多的使用OS的PageCache尽可能复制数据,尽量减少磁盘操作。如果Kafka的生产和消费配合得好,那么数据会完全存储在内存中,这样会大大提高集群的吞吐量。早期操作系统中的PageCache和BufferCache是??两个独立的缓存。后来发现同一个数据可能会被缓存两次,所以大多数情况下两者合二为一。Kafka虽然是用JVM语言写的,但是它离不开JVM,离不开JVM的GC。但是Kafka本身并不管理缓存,而是直接使用OS的PageCache作为缓存。这样做有以下好处:JVM中的一切都是对象,因此无论对象大小,总会有一些额外的JVM对象元数据浪费空间。JVM自身的GC不是由程序手动控制的,所以如果将JVM作为缓存使用,在遇到大对象或者频繁的GC时会降低整个系统的吞吐量。如果程序异常退出或重启,所有缓存都会失效,影响容灾架构下的快速恢复。又因为PageCache是??OS的Cache,即使程序退出,缓存依然存在。因此Kafka优化了IO流程,充分利用了PageCache,耗时更少,吞吐量更高,比其他MQ更快。用一张图简单描述一下三者的关系如下:当Producer和Consumer的速率相差不大的时候,Kafka几乎可以在不放置磁盘的情况下完成信息的传递。除了以上的重要特性,Kafka还有一个appendsequentialwrite的设计,就是数据持久化存储的sequentialappending。当Kafka将消息放入每个主题的分区文件中时,它只会按顺序附加它们。它充分利用了磁盘的快速顺序访问。图片来自网络Kafka,文件存储按照Topic下的Partition存储。每个Partition都有自己的序列文件,每个Partition的序列是不共享的。主分区根据消息的key进行散列,确定落在哪个分区上。我们先详细解释一下Kafka的术语,以便全面了解它的特点:Broker:Kafka中用来处理消息的服务器,也是Kafka集群的一个节点,多个节点组成一个Kafka集群。主题:消息主题。每个业务系统或Consumer都需要订阅一个或多个主题来获取消息。Producer需要指定消息发生的Topic,相当于信息传输的密码名。Partition:一个Topic会被拆分成多个Partition发送到磁盘。文件会存放在Kafka配置的存储目录中根据对应的分区ID创建的文件夹中。它是磁盘可见的最大存储单元。Segment:一个Partition会有多个Segment文件来实际存储内容。Offset:每个Partition都有自己独立的序号,范围仅在当前Partition下,用于读取对应的文件内容。Leader:每个Topic都需要有一个Leader,负责编写Topic的信息,维护数据的一致性。Controller:每个Kafka集群会选择一个Broker作为Controller,负责决定谁是每个Topic的Leader,监控集群Broker信息的变化,维护集群状态的健康。可以看出,所有的Segment文件最终都落地到磁盘上,每个Partion(目录)相当于一个巨型文件,平均分布到多个大小相等的Segment(段)数据文件中。但是,每个段中的段文件消息数不一定相等。此功能有助于快速删除旧段文件。因为Kafka处理消息的功夫都交给了Partition,它只需要维护Partition对应的顺序处理,Segment可以独立维护自己的状态。段文件由索引文件和数据文件组成,登陆磁盘的后缀为.index和.log。文件是按照序号生成的,如下图:图片来源于网络,其中index维护的是数据的物理地址,data存放的是数据的偏移地址,相互关联。看来这和磁盘的顺序写入关系不大。想想HDFS的块存储。是不是和这里的Segment类似?另外,因为有索引文件本身是以Offset为文件名命名的。搜索时,可以根据需要查找的Offset快速定位到对应的文件,然后根据文件搜索内容。所以Kafka的查找过程是先根据要查找的Offset对文件名进行二分查找,找到对应的文件,然后根据元数据的物理地址依次读取区域到对应的Offset位置索引和日志文件的偏移位置。只是内容。段索引文件采用稀疏索引存储方式,减小了索引文件的体积,可以直接通过mmap操作内存。稀疏索引为数据文件的每个对应的Message设置了一个元数据指针。它比密集索引节省了更多的存储空间,但是需要更多的时间来查找,尤其是在随机读的场景下,Kafka是非常不合适的。所以因为Kafka特殊的存储设计,也让Kafka感觉更快。为什么卡夫卡稳定?前面说过,Kafka为什么快?除了速度快,Kafka还有一个特点,那就是:稳定性。Kafka的稳定性在于几个方面:数据安全,几乎不丢失数据。集群是安全的,发生故障时消费者几乎可以无感知地切换。可用性强,即使部分Partition不可用,剩余Partition的数据依然不影响读取。流量控制限制,防止大量Consumers拉低服务器带宽。Kafka限流机制的稳定性通常是由其整体架构设计决定的。许多优秀的特性结合起来就更加优秀了,比如Kafka的Qutota就是其中之一。既然是限流,就意味着需要控制Consumer或者Producer的流量带宽。通常需要在网卡上做限流处理,比如普通的N路交换机或者高端的路由器。那么对于Kafka来说,通过OS的网卡来控制流量显然是非常困难的,所以Kafka采用了另外一种特殊的思路。即:没有办法控制网卡通过的流量大小,所以控制返回数据的时间。对于JVM程序,这是Wait或Seelp的问题。所以对于Kafka来说,有一套特殊的延迟计算规则。Kafka按照一个窗口统计单位时间内传输的流量。当流量超过设定的阈值时,触发流控,将当前请求丢到Kafka的QutotaManager中,延迟时间到后再次返回数据。我们看一下Kafka的ClientQutotaManager类中的方法:这几行代码代表了Kafka的限流计算逻辑。大致思路是:假设我们设置当前流量限制不超过T,根据窗口计算当前速率为O。如果O超过T,那么就会进行限速。限速的公示为:X=(O-T)/T*WX为需要延迟的时间。我举个形象的例子,假设我们限制流量不超过10MB/s,过去5秒(公告中的W,窗口间隔)通过的流量为100MB,那么延迟时间为:(100-5*10)/10=5秒。这样就可以保证下一个窗口完成后整个流量的大小不会超过限制。通过KafkaApis中Producer和Consumer的回调代码,可以看到限流的延迟返回:对于Kafka的限流,默认是根据clientid或者user限流。从实际使用的角度来看,意义不是很大。目前基于Topic或Partition分区级别的限制相对于使用场景来说要大一些。Kafka竞选机制背后的元信息在很大程度上依赖于Zookeeper。同样,我们不会解释Zookeeper本身,而是重点介绍Kafka是如何使用ZK的。先用一张图说明Kafka对ZK的严重依赖:图片来自网络对ZK的使用。除了自身的信息存储,最重要的是Kafka使用ZK来实现选举机制。Controller是主要的介绍。首先,Controller作为Kafka的心脏,主要负责包括但不限于以下重要事项:也就是说Controller是Kafka的核心角色。对于Controller,通过公平竞争,任何Broker都可能成为Controller,保证了集群的健壮性。性别。对于Controller,选举过程如下:①先获取ZK/Cotroller节点信息,获取Controller的brokerid。如果该节点不存在(比如刚创建集群时),*那么获取到的controllerid为-1。②如果controllerid不为-1,即Controller已经存在,则直接结束进程。③如果controllerid为-1,则证明该Controller还不存在。此时,当前Broker开始在ZK中注册Controller。④如果注册成功,则当前Broker成为Controller。这时调用onBecomingLeader()方法,正式初始化Controller。注意:Controller节点是一个临时节点。如果当前Controller与ZK会话断开连接,Controller的临时节点就会消失,从而触发Controller的重新选举。⑤如果注册失败(恰好Controller被其他Broker创建,抛出异常等),则直接返回。通过KafkaController可以直接看到它的代码:一旦Controller被选举出来,其他Broker会监听ZK的变化来响应集群中Controller挂掉的情况:从而触发新的Controller选举动作。对于Kafka来说,整体设计非常紧凑,代码质量相当高,很多设计也很有意义。类似的功能在Kafka的很多特性中都有体现。这些特性组合在一起,构成了Kafka整个稳定的局面。如何使用Kafka虽然Kafka整体看起来很不错,但是Kafka并不是万能的灵丹妙药,肯定有其相应的缺点。那么如何使用Kafka或者如何更好的使用它需要实际的练习。感觉出来了。经过归纳和总结,可以发现以下不同的使用场景和特点:①Kafka不适合高频交易系统虽然Kafka拥有非常高的吞吐量和性能,但不可否认的是,Kafka在性能方面还是不如传统的MQ单条消息的低延迟,毕竟依赖推送模型的MQ在消息实时发送的场景下可以获得先天优势。②Kafka没有完整的事务机制。0.11之后,Kafka增加了事务机制,可以保证Producer的批量提交。为了保证脏数据不会被读取,Consumer可以通过过滤消息状态来过滤掉不合适的数据,但是仍然保留读取所有数据的操作。即便如此,Kafka的事务机制仍然不完善。其背后的主要原因是Kafka对Clients不感兴趣,所以不会统一所有通用协议。因此,在只消费一次等场景下,效果对客户端的依赖性很大。完成。③Kafka的异地容灾方案非常复杂。对于Kafka来说,如果要实现跨机房的无感知切换,就需要支持跨集群代理。由于Kafka特殊的appendlog设计机制,同一个Offset不能在不同的Broker和不同的内容上复用。也就是说,一旦文件被复制到另一台服务器上,它就不可读了。与类似基于数据库的MQ相比,难以实现数据的跨集群同步。同时,Offset的复现难度也很大。曾经帮一个客户实现了一套跨机房的Kafka集群代理,花了很多钱。④KafkaController架构不能充分利用集群资源。KafkaController类似于ES的去中心化思想,根据选举规则从集群中选出一台服务器作为Controller。意味着改变server同时承担了Controller和Broker的职责。这样一来,在海量消息的压力下,服务器的资源很容易成为集群的瓶颈,导致集群资源无法最大化利用。Controller虽然支持HA,但不支持分布式分布,这意味着如果要优化Kafka的性能,每台服务器至少需要达到最高配置。⑤Kafka没有非常智能的分区均衡能力。通常在设计落地存储时,对于热点或者对性能要求足够高的场景,会采用SSD+HD的组合。同时,如果集群的磁盘容量不相等,对于Kafka来说也会出现非常严重的问题。Kafka的partitiongeneration是按照Paratition的个数来统计的,新的partition是在编号最少的磁盘上创建的。见下图:我曾经帮一个企业修改分区创建规则,考虑容量,也就是根据磁盘容量选择分区。那么第二个问题来了:大容量的磁盘,分区比较多,会导致大量的IO压到磁盘上,最后问题又回落到IO上,影响其他topic的性能磁盘。因此,在考虑MQ系统时,需要手动合理设置Kafka的分区规则。最后,Kafka并不是唯一的解决方案。比如几年前诞生的Pulsar,势头强劲,打着替代Kafka的口号冲入市场。它可能会成为下一个解决Kafka的一些痛点的框架。我将在下面谈论Pulsar。作者:白发川编辑:陶家龙来源:转自微信公众号ThoughtWorksInsights(ID:TW-Insight)
