当前位置: 首页 > 科技观察

为什么卡夫卡这么快?

时间:2023-03-12 09:13:35 科技观察

Kafka是LinkedIn推出的高吞吐量分布式消息系统。通俗地说,就是一个基于发布和订阅的消息队列。温故知新,反复学习优秀的框架,总会有所收获。图片来自Pexels应用场景Kafka的应用场景如下:异步解构:上下游之间没有很强的业务关系或者不需要对单个请求立即处理的业务。系统缓冲:有利于解决业务系统吞吐量不一致的问题,尤其是对于处理速度较慢的业务。消峰功能:针对短时间内偶尔出现的极端流量,可以为后台服务开启保护功能。数据流处理:集成spark进行实时数据流处理。Kafka拓扑图(多副本机制)从上图我们可以发现Kafka是分布式的,每个分区都有多个副本,整个集群的管理是通过Zookeeper来管理的。Kafka核心组件①brokerKafka服务器负责消息的存储和转发;经纪人代表卡夫卡节点。一个代理可以包含多个主题。②topic消息分类,Kafka根据topic对消息进行分类。③partitiontopic分区,一个topic可以包含多个partition,每个partition中存储topicmessages。由于一个主题可以划分为多个分区,它为Kafka提供了并行处理的能力,这也是Kafka吞吐量高的原因之一。一个partition在物理上由多个segment文件组成,每个segment大小相等,顺序读写(这也是Kafka速度较快,不需要随机写入的原因之一)。每个Segment数据文件都以段中最小的偏移量开头,文件扩展名为.log。在查找有偏移量的Message时,使用二分查找可以快速找到Message所在的Segment。④offset消息在日志中的位置可以理解为消息在partition上的偏移量,也是代表消息的唯一序号。同时也是master和slave之间需要同步的信息。⑤Producer,负责向KafkaBroker发送消息的客户端。⑥Consumer消息消费者,负责消费KafkaBroker中的消息。⑦ConsumerGroup消费组,每个Consumer必须属于一个组;(注:一个partition只能被组内一个consumer消费,consumergroups之间互不影响。)⑧Zookeeper管理kafka集群,负责存储集群broker、topic、partition等元数据存储,以及还负责broker故障发现、partitionleader选举、负载均衡等功能。服务治理由于Kafka是一个分布式的发布/订阅系统,如果集群之间做好了数据的同步和一致性,Kafka会不会丢消息呢?如果在机器宕机时进行Leader选举怎么办?①数据同步Kafka中的一个Partition有一个leader和多个follower。producer向Partition写入数据时,只向leader写入数据,然后将数据复制到其他Replicas中。而每个follower可以理解为一个消费者,定期去leader那里拉消息。并且只有在数据同步完成后,Kafka才会向生产者返回一个ACK,告知消息已经存储并落地。②ISR在Kafka中,为了保证性能,Kafka不会采用强一致性的方式来同步主从数据。相反,它维护一个同步副本列表。Leader不需要等待所有Follower完成同步。只要ISR中的Follower完成数据同步,就可以向producer发送ACK,就认为消息同步完成。同时,如果发现ISR中的某个follower落后太多,则会将其移除。具体过程如下:上述做法并不能保证Kafka不会丢消息。虽然Kafka通过多副本机制最大程度保证消息不丢失,但是如果数据已经写入系统pagecache但还没有来得及刷入磁盘,机器突然死机或者丢失此时断电,短信自然会丢失。③Kafka故障恢复Kafka通过Zookeeper来管理集群,所以这里的选举机制是Zab(zookeeper使用的):producer向leader发送消息。此时leader完成了数据入库,但是突然失效,并没有返回ack给producer。通过ZK选举,其中一个追随者成为领导者。这时,生产者请求新的领导者并存储数据。为什么卡夫卡这么快?1、顺序写入磁盘Kafka采用顺序写入磁盘,由于顺序写入磁盘相对随机,减少了寻找地址的时间。(消息在Kafka的每个partition中是有序的)②PageCacheKafka使用PageCache代替我们通常在OS系统中使用的Buffer。PageCache并不陌生,也不是新鲜事物。我们在linux上查看内存的时候,经常可以看到buff/cache,这两个都是用来加速IO读写的,而cache是??用来读取的。也就是说,可以将磁盘的内容读入缓存中,这样应用程序就可以非常快速地读取磁盘。buff是用来写字的。我们开发和写入磁盘。一般写到一个buff,然后flush,会很快的。而Kafka则将两者发挥到了极致:Kafka虽然是用scala写的,但是仍然运行在Java虚拟机上。尽管如此,Kafka还是试图避开JVM的限制。它使用Pagecache进行存储,从而避免了数据因GC而在JVM中的STW。另一方面,是PageCache使其实现了零拷贝,下面会详细讨论。③零拷贝无论是优秀的Netty还是其他优秀的Java框架,零拷贝基本上都减少了CPU上下文切换和磁盘IO。卡夫卡当然也不例外。零拷贝的概念这里不再赘述,我简单介绍一下这个概念。一个应用程序请求数据的传统流程:在这里,可以大致发现,传统的方式有4个副本,2个DMA和2个CPU,CPU有4个switch。DMA简单理解就是在I/O设备和内存之间进行数据传输时,数据处理的工作全部交给DMA控制器,CPU不再参与任何与数据处理相关的事情。④零拷贝方式通过优化我们可以发现CPU只有2次上下文切换和3次数据拷贝。Linux系统提供了系统意外调用函数“sendfile()”,使得系统调用可以直接将内核缓冲区中的数据复制到socket缓冲区中,而不用复制到用户态。⑤分区与分段我们在上面也介绍过,Kafka采用分区的方式,每个分区对应一个物理段。查找时,可以根据二分查找快速定位。这不仅为数据读取提供了查询效率,也提供了一种并行操作的方式。⑥数据压缩Kafka为数据提供了Gzip、Snappy等压缩协议,对消息结构进行压缩。一方面,它减少了带宽和数据传输的消耗。Kafka安装①安装JDK使用压缩包需要自己配置环境变量,所以推荐使用yum直接安装,熟悉查看当前Java版本:yum-ylistJava*安装你要的版本,我这边1.8:yuminstalljava-1.8.0-openjdk-devel.x86_64查看是否安装成功:Java-version②安装Zookeeper,首先需要到官网下载安装包,然后解压:tar-zxvfzookeeper-3.4.9.tar.gz你需要做的就是复制这个文件,并命名为:zoo.cfg,然后在zoo.cfg中修改你自己的配置。cpzoo_sample.cfgzoo.cfgvimzoo.cfg的主要配置解释如下:#zookeeper内部的基本单位,单位是毫秒,也就是说一个tickTime是2000毫秒,在zookeeper的其他配置中,tickTime=2000#是根据转换的ontickTime#集群中follower服务器(F)和leader服务器(L)初始连接时所能容忍的最大心跳次数(numberoftickTimes)。initLimit=10#syncLimit:集群中follower服务器(F)和leader服务器(L)之间的请求和响应之间可以容忍的最大心跳数(numberoftickTimes)syncLimit=5#数据存放文件夹,zookeeper运行过程中有两个数据需要保存,一个是快照数据(持久化数据)一个是事务日志dataDir=/tmp/zookeeper##client访问端口clientPort=2181配置环境变量:vim~/.bash_profileexportZK=/usr/local/src/apache-zookeeper-3.7.0-binexportPATH=$PATH:$ZK/binexportPATH//启动zkServer.shstart下面可以看到启动成功:③安装Kafka下载Kafka:https:///www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka-2.8.0-src.tgz安装Kafka:tar-xzvfkafka_2.12-2.0.0.tgzbr配置环境变量:exportZK=/usr/local/src/apache-zookeeper-3.7.0-binexportPATH=$PATH:$ZK/binexportKAFKA=/usr/local/src/kafkaexportPATH=$PATH:$KAFKA/bin启动Kafka:nohupkafka-server-start.sh自己的配置文件路径/server.properties&br大功告成!作者:何永康,腾讯CDG后台研发工程师。编辑:陶家龙来源:转载自公众号云家社区(ID:QcloudCommunity),参考资料:《深入理解 Kafka:核心设计实践原理》