kafka是LinkedIn推出的一款高吞吐量分布式消息系统。通俗地说,就是一个基于发布和订阅的消息队列。官网地址:https://kafka.apache.org/intro应用场景的异步解构:上下游没有强依赖的业务关系或者不需要对单个请求立即处理的业务;系统缓冲:有助于解决业务系统吞吐不一致的流量,尤其是处理速度较慢的业务;消峰作用:针对短期偶发的极端流量,可以启动对后端服务的保护;数据流处理:集成spark进行实时数据流处理。Kafka拓扑图(多副本机制)从上图我们可以发现Kafka是分布式的,每个分区都有多个副本,整个集群的管理是通过zookeeper来管理的。Kafka的核心组件,brokerKafka服务器,负责消息的存储和转发;经纪人代表卡夫卡节点。一个broker可以包含多个topic主题消息类别,Kafka根据topic对消息分区topic分区。一个主题可以包含多个分区,主题消息存储在每个分区中;由于一个topic可以分为多个partition,鉴于Kafka提供了并行处理的能力,这也是Kafka吞吐量高的原因之一。一个partition在物理上由多个segment文件组成,每个segment大小相等,顺序读写(这也是Kafka速度较快的原因之一,不需要随机写入)。每个Segment数据文件都以段中最小的偏移量开头,文件扩展名为.log。在查找有偏移量的Message时,使用二分查找可以快速找到Message所在的Segment。offset消息在日志中的位置,可以理解为消息在分区上的偏移量,也代表消息的唯一序号。同时也是master和slave之间需要同步的信息。Producer生产者,负责向KafkaBroker客户端发送消息Consumer消息消费者,负责消费KafkaBrokerConsumerGroup消费组中的消息,每个Consumer必须属于一个组;(注意一个partition只能被groupOneconsumer消费,consumergroup之间互不影响。)Zookeeper管理Kafka集群,负责集群broker、topic、partitions等元数据存储。它还负责broker故障发现、分区领导者选举和负载平衡。服务治理既然Kafka是一个分布式的发布/订阅系统,那么如果集群之间做好了数据的同步和一致性,Kafka就一定不会丢消息吗?而如果在系统宕机的情况下进行leader选举怎么办?数据同步Kafka中的Partition有一个leader和多个follower。producer向Partition写入数据时,只会向leader写入数据,然后再将数据复制到其他Replicas中。而每个follower可以理解为一个消费者,定期去leader那里拉取消息。并且只有在数据同步完成后,Kafka才会向生产者返回一个ACK,告知消息已经存储并落地。在Kafka的ISR中,为了保证性能,Kafka不会使用强一致性的方式来同步主从数据。相反,它维护一个同步副本列表。Leader不需要等待所有Follower完成同步。只要ISR中的Follower完成了数据同步,就可以向producer发送ack,就认为消息同步完成。同时,如果发现ISR中的某个follower落后太多,则会将其移除。具体过程如下:上述做法并不能保证Kafka不会丢消息。Kafka虽然通过多副本机制最大程度保证消息不丢失,但是如果数据已经写入系统pagecache但还没有来得及刷入磁盘,突然死机或者掉电关闭,消息自然会丢失。Kafka故障恢复Kafka是通过Zookeeper连接到集群的管理上的,所以这里的选举机制使用的是Zab(zookeeper使用的)。生产者向领导者发送了消息。此时leader完成了数据入库,但是突然失败了,并没有给producer返回一个ack;通过ZK选举,其中一个followers成为leader。这时,生产者重新请求了一个新的领导者并存储了数据。为什么Kafka顺序写入磁盘如此之快?Kafka采用顺序写磁盘,因为顺序写磁盘比较随机,减少了找地址的时间。(在Kafka的各个partition中,消息是有序的。PageCacheKafka使用的是OS系统中的PageCache,而不是我们平时使用的Buffer。PageCache其实并不陌生,也不是什么新鲜事,我们在linux上查看内存有时候,经常可以看到buff/cache,这两个都是用来加速IO读写的,cache是??用来读的,也就是说可以把磁盘的内容读到cache里面。这样,应用程序读磁盘非常快;而buff是用来写的,我们开发写磁盘,一般写到一个buff里,然后flush,会很快的。而kafka就是把两者做到了极致:Kafka虽然是用Scala写的,但是仍然运行在Java虚拟机上,但是Kafka还是尽量避开了JVM的限制,它使用Pagecache来存储,从而避免了数据在JVM中由于STWGC.另一方面,也是PageCache使之能够实现零拷贝,后面会讲到w。零拷贝无论是优秀的Netty还是其他优秀的Java框架,基本上都是在零拷贝的时候减少了CPU上下文切换和磁盘IO。当然,卡夫卡也不例外。零复制的概念这里不再赘述,只是给大家大致介绍一下这个概念。传统的一个应用程序请求数据的流程,大致可以发到这里。传统方式有4份,2倍DMA,2倍CPU。而且CPU切换了4次。_(对DMA的简单理解就是在I/O设备和内存之间进行数据传输时,数据处理的工作全部交给DMA控制器,CPU不再参与任何数据处理相关的事情)。通过零拷贝的方式优化,我们可以发现CPU只有2次上下文切换和3次数据拷贝。(linux系统提供了系统意外调用函数“sendfile()”,这样系统调用就可以直接将内核缓冲区中的数据复制到socket缓冲区中,而不是复制到用户态)。Partition和segmentation我们上面也有介绍。Kafka采用分区的方式,每个分区对应一个物理段,查找的时候可以根据二分查找快速定位。这样既提高了数据读取的查询效率,又提供了一种并行操作的方式。数据压缩Kafka为数据提供了Gzip、Snappy等压缩协议,对消息结构进行压缩。一方面,它减少了带宽和数据传输消耗。Kafka安装安装JDK由于使用压缩包需要自己配置环境变量,所以建议直接用yum安装,熟悉当前Java版本:$yum-ylistJava*安装自己想要的版本,我这里是1.8$yuminstalljava-1.8.0-openjdk-devel.x86_64查看是否安装成功$Java-version安装Zookeeper首先需要到官网下载安装包,然后解压$tar-zxvfzookeeper-3.4.9.tar.gz你需要做的就是复制这个文件,并命名为:zoo.cfg,然后在zoo.cfg中修改自己的配置$cpzoo_sample.cfgzoo.cfg$vimzoo.cfg主要配置说明如下#zookeeper里面的基本单位,单位是毫秒,这个代表一个tickTime是2000毫秒。在其他zookeeper配置中,转换基于tickTime。tickTime=2000#集群中follower服务器(F)和leader服务器(L)初始连接时可以容忍的最大心跳次数(tickTime数量)。initLimit=10#syncLimit:集群中follower服务器(F)和leader服务器(L)之间的请求和响应之间可以容忍的最大心跳数(numberoftickTimes)syncLimit=5#数据存放文件夹,zookeeper运行过程中有两个数据需要保存,一个是快照数据(持久化数据)一个是事务日志dataDir=/tmp/zookeeper##client访问端口clientPort=2181配置环境变量$vim~/.bash_profile$exportZK=/usr/local/src/apache-zookeeper-3.7.0-bin$exportPATH=$PATH:$ZK/bin$exportPATH#Start$zkServer.shstart下面可以看到启动成功Kafka安装完毕下载kafka$wgethttps://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka-2.8.0-src.tgz安装kafka$tar-xzvfkafka_2.12-2.0。0.tgz配置环境变量$exportZK=/usr/local/src/apache-zookeeper-3.7.0-bin$exportPATH=$PATH:$ZK/bin$exportKAFKA=/usr/local/src/kafka$exportPATH=$路径:$KAFKA/binstartKafka$nohupkafka-server-start.sh自己的配置文件路径/server.properties&大功告成!
