当前位置: 首页 > 科技观察

揭秘Kafka的高性能吞吐

时间:2023-03-16 00:08:28 科技观察

Kafka作为一个开源的消息系统,被广泛应用于数据缓冲、异步通信、收集日志、系统解耦等方面。与其他常见的消息系统如RocketMQ相比,Kafka不仅保证了大部分的功能特性,还提供了读写性能。本文将简要分析Kafka的性能。首先简单介绍一下Kafka的架构和涉及到的名词:Topic:用来划分Messages的逻辑概念。一个Topic可以分布在多个Broker上。Partition:是Kafka横向扩展和所有并行化的基础。每个Topic被划分为至少一个Partition。Offset:消息在Partition中的编号,编号顺序不跨越Partition。消费者:用于从Broker检索/消费消息。Producer:用于向Broker发送/生产Message。Replication:Kafka支持以Partition为单位对Message进行冗余备份。每个Partition可以配置至少一个Replication(当只有一个Replication时,只有Partition本身)。Leader:为Replication集中的每个Partition选择一个唯一的Leader,所有的读写请求都由Leader处理。其他Replica从Leader同步数据更新到本地,过程类似于我们熟悉的MySQL中的Binlog同步。Broker:Kafka中使用Broker接受Producer和Consumer的请求,并将Message持久化到本地磁盘。每个Cluster会选出一个Broker作为Controller,负责处理PartitionLeader选举,协调Partition迁移等工作。ISR(In-SyncReplica):它是Replicas的一个子集,表示当前处于Alive状态并且可以“赶上”Leader的Replicas集合。由于读写都先落在Leader身上,一般来说,通过同步机制从Leader上拉取数据的Replica会与Leader产生一定的延迟(包括延迟时间和延迟记录数两个维度),以及任何一个超过阈值都会将Replica踢出ISR。每个Partition都有自己独立的ISR。以上几乎就是我们在使用Kafka过程中可能遇到的所有名词,都不是核心概念或组件。感觉Kafka从设计本身就足够简单了。本次,本文围绕Kafka出色的吞吐性能,一一介绍其设计和实现中使用的各种“黑科技”。Broker不同于Redis和MemcacheQ等内存消息队列。Kafka旨在将所有消息写入低速大容量的硬盘,以换取更强的存储能力。其实Kafka使用硬盘并没有带来太大的性能损失,它复制了一条“乖巧”的“捷径”。首先,我说“乖”是因为Kafka只对磁盘进行SequenceI/O。由于消息系统读写的特殊性,这个没有问题。关于磁盘I/O的性能,参考Kafka官方给出的一组测试数据(Raid-5,7200rpm):SequenceI/O:600MB/sRandomI/O:100KB/s所以只有SequenceI/O是used限制避免了磁盘访问速度慢可能对性能造成的影响。接下来说说Kafka是如何“偷工减料”的。首先,Kafka严重依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,并将Page属性标记为脏。当发生读操作时,首先从PageCache中查找,如果发生pagefault,则进行磁盘调度,最终返回需要的数据。事实上,PageCache使用尽可能多的可用内存作为磁盘缓存。同时,如果其他进程申请内存,回收PageCache的代价很小,所以现代OS都支持PageCache。使用PageCache功能还可以避免在JVM内部缓存数据。JVM为我们提供了强大的GC能力,但也引入了一些不适用于Kafka设计的问题。如果缓存在Heap中管理,JVM的GC线程会频繁扫描Heap空间,造成不必要的开销。如果Heap过大,进行FullGC对系统的可用性是一个很大的挑战。JVM中的所有对象都不可避免地存在ObjectOverhead(这一点不可小觑),内存的有效空间利用率也会相应降低。所有In-ProcessCaches在操作系统中都有相同的PageCache。因此,通过将缓存仅放在PageCache中,您至少可以将可用缓存空间增加一倍。如果重启Kafka,所有In-ProcessCache都会失效,而OS管理的PageCache仍然可以使用。PageCache只是第一步。Kafka还使用了Sendfile技术来进一步优化性能。在讲解Sendfile之前,先介绍一下传统的网络I/O操作流程,大致分为以下4个步骤。OS从硬盘读取数据到内核区的PageCache。用户进程将数据从内核区复制到用户区。然后用户进程将数据写入Socket,数据流入内核区的SocketBuffer。OS再将Buffer中的数据复制到网卡的Buffer中,这样就完成了一次传输。整个过程经历了两次上下文切换和四次系统调用。相同的数据在内核缓冲区和用户缓冲区之间重复复制,效率低下。其中2、3这两个步骤不是必须的,直接在内核区完成数据拷贝即可。这正是Sendfile解决的问题。Sendfile优化后,整个I/O过程变成如下。从上面的介绍不难看出,Kafka的设计初衷是力求在内存中完成数据交换,无论是对外作为整个消息系统,还是对内与底层操作系统进行交互。如果Producer和Consumer之间的生产和消费调度协调得当,就可以实现数据交换的零I/O。这就是为什么我说Kafka使用“硬盘”并没有带来太大的性能损失。以下是我在生产环境中收集的一些指标。(20Brokers,75PartitionsperBroker,110kmsg/s)此时集群只有写操作,没有读操作。10M/s左右的Send流量是由ReplicatebetweenPartitions产生的。从recv和writrate的对比可以看出,写磁盘采用的是Asynchronous+Batch的方式,底层OS也可能会优化写磁盘的顺序。当一个ReadRequest进来的时候,有两种情况。首先是在内存中完成数据交换。Send流量从平均10M/s增加到平均60M/s,而diskRead不超过50KB/s。PageCache减少磁盘I/O的效果非常明显。下一步是读取一些已经接收了一段时间并且已经换出内存并写入磁盘的旧数据。其他指标依旧,磁盘Read飙升到40+MB/s。这个时候,所有的数据都已经到硬盘了(为了硬盘的顺序读取,OS层会优化PrefillPageCache)。仍然没有性能问题。TipsKafka官方不建议在Broker端通过log.flush.interval.messages和log.flush.interval.ms强制写盘。他们认为应该通过Replica来保证数据的可靠性,强制Flush数据到磁盘会对整体性能产生负面影响。影响。可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调整性能。如果脏页率超过第一个指标,pdflush将启动FlushDirtyPageCache。如果脏页率超过第二个指标,所有的写操作都会被阻塞进行Flush。根据不同的业务需要,可以适当降低dirty_background_ratio,提高dirty_ratio。Partition分区是Kafka很好地横向扩展,提供高并发处理,实现Replication的基础。可扩展性。首先,Kafka允许Partition在集群中的Brokers之间任意移动,以平衡可能出现的数据倾斜问题。其次,Partition支持自定义分区算法,比如可以将同一个Key的所有消息路由到同一个Partition。同时,Leader也可以在In-SyncReplica中迁移。由于一个Partition的所有读写请求都只由Leader处理,因此Kafka会尽量将Leader平均分配给集群的各个节点,避免网络流量过度集中。并发方面。任何一个Partition在某一时刻只能被一个ConsumerGroup中的一个Consumer消费(反之,一个Consumer可以同时消费多个Partition)。Kafka非常简单的Offset机制将Broker和Consumer之间的交互降到最低,这使得Kafka不会像其他同类型的消息队列一样,随着下游消费者数量的增加而按比例降低性能。另外,如果多个消费者恰好在非常相似的时间顺序消费数据,那么PageCache的命中率就可以达到很高的水平。因此,Kafka可以非常高效地支持高并发读操作。在实践中基本可以达到单机网卡的上限。但是Partition的数量并不是越多越好,Partition的数量越多,每个Broker的平均数量就越多。考虑到Broker宕机(NetworkFailure,FullGC),Controller需要为所有宕机的Broker上的所有Partition重新选举Leader。假设每个Partition的选举耗时10ms。如果Broker上有500个Partition,那么在选举的5s期间,对上面Partition的读写操作都会触发LeaderNotAvailableException。再者,如果挂掉的Broker是整个集群的Controller,那么首先要做的就是重新指定一个Broker作为Controller。新上任的Controller需要从Zookeeper获取所有Partition的Meta信息,获取每条信息大约需要3-5ms。如果有10,000个Partition,时间会达到30s-50s。并且不要忘记这只是重新启动控制器所需的时间。在此基础上,前面提到的选举Leader的时间-_-!!!!!!另外,在Broker端,Producer和Consumer都采用了Buffer机制实现。其中,Buffer的大小统一配置,数量与Partition数量相同。如果Partition过多,Producer和Consumer的Buffer内存就会过大。TipsPartitions的数量应该尽可能提前预分配。虽然后面可以动态增加Partitions,但是存在破坏MessageKeys和Partitions对应关系的风险。Replicas的数量不宜过多。如果条件允许,尽量将Replicaset中的Partition调整到不同的Racks。尽量保证每次停止Broker都能cleanShutdown,否则问题不仅是恢复服务需要很长时间,还会出现数据损坏或者其他很奇怪的问题。ProducerKafka的研发团队表示,在0.8版本中,整个Producer都是用Java重写的,据说性能有了很大的提升。我自己没有尝试过,这里就不做数据对比了。在本文最后的扩展阅读中,我提到了一组我认为更好的控制组。有兴趣的同学可以试试。实际上,大多数生产者端的消息系统都是单一的优化方式,无非就是把部分变成整体,把同步改成异步。Kafka系统默认支持MessageSet,多条消息自动分组发送出去,均衡后降低每次通信的RTT。而且,在组织MessageSet的同时,还可以对数据进行重新排序,将突发流的随机写入优化为相对稳定的线性写入。另外需要重点介绍一下,Producer支持End-to-End压缩。数据在本地压缩,然后通过网络传输。Broker一般不解压(除非指定Deep-Iteration),直到消息被消费后在客户端解压。当然,用户也可以选择在应用层做压缩和解压的工作(毕竟Kafka目前只支持有限的压缩算法,只有GZIP和Snappy),但是这样做会出乎意料的降低效率!!!!Kafka的End-to-End压缩配合MessageSet效果最好,上面的做法直接切断了两者之间的联系。至于原因,其实很简单。压缩算法中的一个基本原则是“重复的数据越多,压缩率越高”。无论消息体的内容如何,??无论消息体的数量多少,在大多数情况下,输入数据量越大,压缩率就越高。但是Kafka对MessageSet的使用也导致了一定程度的可用性妥协。每次发送数据的时候,Producer都认为send()之后已经发送完了,但实际上大部分消息还在内存中的MessageSet中,并没有发送到网络上。如果此时Producer挂了,就会出现数据丢失的情况。为了解决这个问题,Kafka在0.8版本的设计中借鉴了网络中的ack机制。如果你对性能要求很高,可以在一定程度上允许Message丢失,可以设置request.required.acks=0来禁用ack,全速发送。如果需要确认发送的消息,需要设置request.required.acks为1或者-1,那么1和-1有什么区别呢?这里我们还需要提到前面讨论的Replicas的数量。如果配置为1,表示消息只需要Leader接收并确认,其他Replica无需立即确认即可进行异步拉取,在保证可靠性的同时不会降低效率。如果设置为-1,则表示消息只有提交给Partition的ISR集中的所有Replica后才能返回。消息的发送会更安全,整个过程的延迟会随着Replicas的数量按比例增加。这里是需要根据不同的需求进行优化。TipsProducer线程不要配置太多,尤其是用在Mirror或者Migration中,会加剧目标集群Partition消息的乱序(如果你的应用场景对消息的顺序敏感)。0.8版本的request.required.acks默认为0(与0.7相同)。ConsumerConsumer端的设计一般都比较常规。通过ConsumerGroup,可以支持生产者消费者和队列访问两种模式。ConsumerAPI分为Highlevel和Lowlevel。前者严重依赖Zookeeper,所以性能较差,也不免费,但超级省心。第二种不依赖Zookeeper服务,在自由度和性能上有更好的表现,但是所有异常(Leader迁移、Offset越界、Broker宕机等)和Offset维护都需要自己处理。大家可以关注这几天发布的0.9Release。开发者用Java重写了一个Consumer。将两套API合并在一起,去掉对Zookeeper的依赖。据说性能有很大提升~~Tips强烈推荐使用LowlevelAPI。虽然比较繁琐,但是目前只有这个API可以自定义对Error数据的处理,特别是在处理Broker异常或者UncleanShutdown导致的CorruptedData时。否则,Skip只能等待Broker上轮换的“坏消息”,在此期间Replica将保持不可用状态。