当前位置: 首页 > 科技观察

这几天在家办公整理的Kafka知识点

时间:2023-03-21 11:16:01 科技观察

Kakfa广泛应用于国内外公司,如BAT、字节跳动、美团、Netflix、Airbnb、Twitter等,今天我们通过这篇文章来了解一下如何卡夫卡作品。图片来自PexelsKafka概述Kakfa是一个分布式的基于发布/订阅的消息队列(MessageQueue),主要应用于大数据的实时处理领域。MessageQueue传统消息队列和新消息队列模式如下:上面是传统的消息队列,比如用户要注册信息,用户信息写入数据库后,还有一些其他的过程后面,比如发短信,需要等这些流程处理完,返回给用户。而新的队列,比如一个用户注册信息,直接把数据丢进数据库,成功了就直接返回给用户。使用消息队列的好处如下:解耦可恢复性缓冲灵活性和峰值处理能力异步通信消息队列的模式如下:①点对点模式:消息生产者向消息队列发送消息,消息消费者再向消息队列发送消息将它们从队列中取出并使用消息。消息被消费后,不会存入队列。因此,一个消息消费者不可能再消费一条已经被消费过的消息;队列支持多个消费者,但是对于一条消息,只有一个消费者可以消费;如果要将其发送给多个消费者,则需要更多才能发送消息。②发布/订阅模式(一对多,消费者消费数据后不会清除消息):消息生产者将消息发布到Topic,多个消息消费者(订阅者)同时消费消息。与点对点方式不同,发布到Topic的消息会被所有订阅者消费;但是数据保留是有限的,默认是7天,因为它不是一个存储系统。Kafka就是这种模式。有两种方式,一种是消费者主动消费(pull)消息,而不是生产者向消费者推送消息;另一种是生产者主动向消费者推送消息,类似公众号。Kafka基础架构Kafka的架构如下:Kafka的基础架构主要由Broker、producer、consumergroup组成,目前包括ZooKeeper。Producer负责发送消息,Brokers负责缓存消息,在Brokers中可以创建Topic,每个Topic都有Partition和Replication的概念。ConsumerGroup负责处理消息,同一个ConsumerGroup的消费者不能消费同一个Partition中的数据。消费群体主要是提高消费能力。比如之前一个消费者消费100条数据,现在两个消费者消费100条数据,可以提高消费能力。因此,消费组中的消费者数量应该小于分区数量,否则会有没有分区的消费者消费,造成资源浪费。注意:不同消费者组中的消费者可以消费相同的Partition数据。Kakfa如果要组成一个集群,只需要注册到一个ZooKeeper中即可。ZooKeeper也保留了消息消费的进度或者offset或者消费位置:ZooKeeper中保存了0.9之前版本的offset。0.9之后的版本偏移量存储在Kafka中。Kafka定义了一个系统主题,专门用于存储偏移量数据。为什么要改变它?主要原因是频繁的offset变化给ZooKeeper带来了很大的压力,Kafka自身的处理也比较复杂。安装Kafka①Kafka的安装只需要解压安装包即可完成安装。tar-zxvfkafka_2.11-2.1.1.tgz-C/usr/local/2-volume-value:[root@es1config]#pwd/usr/local/kafka/config[root@es1config]#lltotal84-rw-.r--r--.1rootroot906Feb82019Connect-Console-Sink.properties-rw-r--r--.1rootroot909Feb82019connect-console-source.properties-rw-r--r--。-r--.1rootroot883Feb82019connect-file-sink.properties-rw-r--r--。r--.1rootroot2262Feb82019connect-standalone.properties-rw-r--r--.1rootroot1221Feb82019consumer.properties-rw-r--r--.1rootroot4727Feb82019log4j.properties-rw-r--r---r--r--.1rootroot6865Jan1622:00server-1.properties-rw-r--r--.1rootroot6865Jan1622:00server-2.properties-rw-r--r--。r--r--.1rootroot1032Feb82019tools-log4j.properties-rw-r--r--.1rootroot1169Feb82019trogdor.conf-rw-r--r--.1rootroot1023Feb82019zookeeper.properties③修改配置文件server.properties,设置broker.id这是Kafka集群区分各个节点的唯一标识。④设置Kafka的数据存放路径:注意:该目录下不能有其他非Kafka目录,否则Kafka集群启动失败。⑤设置话题是否可以删除。Kafka的topic默认是不允许删除的。⑥Kafka数据保留时间,默认7天。⑦Log文件的最大大小,如果Log文件超过1G,将新建一个文件。⑧Kafka连接的ZooKeeper的地址和连接Kafka的超时时间。⑨默认的分区数。启动Kafka①启动方式1.Kafka只能在单节点上启动,所以需要手动启动每个Kakfa节点。下面的方法是采用阻塞方式启动。②启动方式二,以守护方式启动,推荐使用。Kafka运行①查看当前Kafka集群中已经存在的topic。注意:这里连接的是ZooKeeper,不是Kafka。②创建一个Topic,指定分片数和副本数。说明:replication-factor的副本数,replication-factor的分区数,topic的主题名称。如果当前Kafka集群只有3个Broker节点,最大replication-factor为3,下面的例子创建replicas为4,会报错。③删除主题。④查看主题信息。启动生产者生产消息Kafka带有生产者和消费者客户端。①启动一个生产者,注意此时连接的9092端口,以及连接的Kafka集群。②启动一个consumer,注意此时9092端口还是连接的,0.9之前的版本连接的是2181端口。这里我们启动2个消费者来测试一下。注意:如果不指定consumergroup的配置文件,默认每个consumer属于不同的consumergroup。③发送消息,可以看到每个消费者都能收到消息。④Kakfa的实际数据。Kafka架构深入人心。Kafka不能保证消息的全局顺序,只能保证Partition中消息的顺序,因为消费者在不同的Partition中随机消费消息。Kafka的工作流程Kafka中的消息是按Topic分类的。生产者生成消息,消费者为主题消费消息。Topic是逻辑概念,Partition是物理概念。每个Partition都有一个副本的概念。每个Partition对应一个Log文件,里面存放的是producer产生的数据,producer产生的数据会不断的追加到Log文件的末尾。并且每条数据都有自己的Offset,消费者会实时记录自己消费了哪个Offset,以便出错时可以从上次的位置继续消费,这个Offset会保存在Index文件中。Kafka的Offset在partition内是有序的,但是在不同partition内是乱序的。Kafka不保证数据的全局顺序。Kafka的原理由于生产者生产的消息会被不断的追加到Log文件的末尾,为了防止Log文件过大导致数据定位效率低下,Kafka采用了分片和索引的机制,并将每个Partition划分为多个Segment。Segment对应两个文件Indexfile和Logfile。两个文件位于同一个文件夹下,文件夹命名规则为:主题名+分区号。Index和Log文件的文件名是当前索引最小数据的Offset。Kafka如何快速消费数据?Index文件中存储的数据的索引信息,第一列为Offset,第二列为该数据对应的Log文件中的偏移量,就像我们读取文件一样,使用seek()设置当前鼠标位置,可以更快地找到数据。如果要消费Offset为3的数据,先用binary的方式查找数据在哪个Index文件中,然后用Index中的Offset查找Log文件中数据的Offset;这样可以快速定位到数据并消费。因此,Kakfa虽然将数据存储在磁盘上,但其读取速度还是非常快的。Kafkaproducer和consumerKafkaproducerKafka的Partition分区功能:Kafka分区主要是为了提供并发,提高性能,因为读写都是以Partition为单位进行读写的。生产者将消息发送到哪个分区?在客户端指定分区。轮询(推荐)消息1到p1,消息2到p2,消息3到p3,消息4到p1,消息5到p2,消息6到p3……Kafka是如何保证数据可靠性的?Kafka是如何保证数据可靠性的?Ack保证!为了保证producer发送的数据能够可靠地发送到指定的Topic,Topic的每个Partition在收到producer发送的数据后都需要向producer发送Ack(确认接收)。如果生产者收到Ack,就会发送下一轮,否则会重新发送数据。那么Kafka什么时候向生产者发送Ack呢?保证follower和leader是同步的,leader向producer发送ack,保证leader挂掉后,可以从follower中选举出新的leader,数据不会丢失。同步完成后有多少follower发送Ack?方案一:一半已经完成同步,再发送Ack。方案二:所有同步完成后才发送Ack(Kafka采用这种方式)。采用第二种方案后,想象一下这样的场景:Leader收到数据,所有Follower开始同步数据,但是有一个Follower因为某种故障一直无法完成同步,那么Leader只好等他完成同步,发送Ack。这将极大地影响效率。如何解决这个问题呢?Leader维护着一个动态的ISR列表(同步副本的作用),只需要同步这个列表中的Follower和Leader。ISR中的Follower完成数据同步后,Leader会向producer发送Ack。如果Follower长时间没有与Leader同步数据,则Follower将被从ISR中移除。这个时间阈值也是定制的。同一个Leader失效后,会从ISR中选举出新的Leader。如何选择ISR节点?首先,沟通的时间一定要快,和Leader的沟通要快的完成。默认时间为10秒。然后就看leader的数据差距了,默认的消息数是10000条(后面的版本去掉了)。为什么要删除它?因为Kafka是批量发送消息,瞬间就会被Leader接受,而Follower还没有被拉取,所以会频繁的被踢出添加到ISR中。这些数据会保存在ZooKeeper和内存中,所以会经常更新ZooKeeper和内存。但是对于一些不重要的数据,对数据的可靠性要求不是很高,可以容忍少量的数据丢失,所以不需要等待ISR中的所有Follower都接受成功。因此,Kafka为用户提供了三个可靠性级别。用户可以权衡可靠性和延迟。这个设置是在Kafka的生成中设置的:Ack参数设置。①Acks为0:producer不等待Ack,直接向topic抛出数据,这种数据丢失的概率很高。②Ack为1:leader放置磁盘后会返回一个Ack,会有数据丢失。如果同步完成后leader失效,就会出现数据丢失。③Ack为-1(全部):Leader和Follower(ISR)只有放好盘后才会返回Ack,会出现数据重复。如果Leader已经写好了,Follower同步了,但是返回Ack的时候失败了,就会出现Dataduplication。在极端情况下,还会出现数据丢失。比如Follower和Leader之间的通信速度很慢,所以ISR中只有一个Leader节点。此时Leader会在完成放置后返回一个Ack。如果此时Leader失效,数据就会丢失。Kafka如何保证消费数据的一致性Kafka如何保证消费数据的一致性?GuaranteedbyHW:LEO:指每个Follower的最大Offset。HW(highwatermark):指消费者能看到的最大Offset,LSR队列中最小的LEO,也就是说消费者只能看到1~6的数据,不能看到和消费后面的数据.防止leader挂掉,比如当前consumer消费完8的数据后,leader挂掉了。此时比如f2成为leader,f2没有9的数据,那么消费者就会报错,所以设计了HW参数,只暴露给消费者最少的数据,避免了上面的问题。HW保证数据存储的一致性:①Follower失效:一个Follower失效后,会被暂时踢出LSR。Follower恢复后,Follower会读取本地磁盘上记录的最后一个HW,并更新日志文件到比HW更高级别的partition部分,HW开始与Leader同步。当Follower的LEO大于等于Partition的HW,即Follower追上Leader后,可以重新加入LSR。②Leader失效:Leader失效后,会从ISR中选出新的Leader。之后,为了保证多副本之间的数据一致性,其余的follower会先把自己日志文件中高于HW的部分剪掉(新的Leader不会截断),然后从新的同步数据领导者。注意:这是为了保证多副本间数据存储的一致性,不保证数据不会丢失或重复。Exactlyonce(幂等),确保不重复数据:如果Ack设置为-1,可以保证不丢失数据,但会发生数据重复(至少一次)。如果Ack设置为0,可以保证数据不重复,但不能保证数据不丢失(最多一次)。但是,如果你可以两者兼得呢?这时候引入了ExactOnce。0.11版本后引入幂等性,解决Kakfa集群内部数据重复问题。在0.11版本之前,消费者自己处理。如果开启幂等性,Ack默认为-1,Kafka会为每个生产者分配一个Pid,并为每条消息分配一个Seqnumber。如果Pid、Partition、Seqnumber相同,Kafka认为是重复数据,不会存盘。但是,如果生产者挂了,同样会出现数据重复;因此,幂等性解决了单个session的单个partition内的数据重复问题,但无法解决partitions之间或者跨session的数据重复问题。Kafkaconsumer①消费方式消费消息队列中的消息有两种方式,Push(微信公众号)和Pull(kafka)。Push模式很难适应不同消费速率的消费者,因为消费发送速率是由Broker决定的,他的目标是以最快的速度传递消息。但这很容易导致消费者来不及处理消息,典型的表现是拒绝服务和网络拥塞。Pull方法可以根据消费者的消费能力,以合适的速率消费消息。Pull模式的缺点是如果Kafka没有数据,消费者可能会陷入死循环,一直返回空数据。为此,Kafka消费者在消费数据时会传回一个Timeout参数。如果当时没有可供消费的数据,消费者会等待一段时间再返回。②Partition分配策略一个consumergroup有多个consumer,一个Topic有多个Partition。所以,必然会涉及到Partition的分配,即决定哪个Partition被哪个consumer消费。Kafka提供了两种方法,一种是轮询(RountRobin)对Topic组生效,另一种是(Range)对单个Topic生效。轮询:前提是一个consumer中的consumer必须订阅同一个topic。否则会有问题;非默认方式。同一个消费组中的消费者不能同时消费同一个分区,比如3个消费者消费一个topic的9个分区。如果一个消费组中有2个消费者,那么这个消费组中同时消费了2个topic,每个topic有3个partition。首先将2个Topic作为一个topic,然后根据Topic和Partition做Hash,再根据Hash排序。然后将轮询分配给一个消费者组中的2个消费者。如果是通过下面的方式订阅呢?比如有3个Topic,每个Topic有3个Partition,一个消费者组有2个消费者。Consumer1订阅了Topic1和Topic2,Consumer2订阅了Topic2和Topic3。在这样的场景下,通过轮询的方式订阅Topic就会出现问题。如果您通过以下方式订阅呢?比如有2个Topic,每个Topic有3个Partition,一个consumergroup有2个consumer。消费者1订阅了Topic1,消费者2订阅了Topic2。这样就使用了轮询。订阅Topic也会有问题。所以我们一直强调,使用轮询订阅Topic的前提是一个消费组中的所有消费者都订阅了同一个topic;所以轮询方式不是Kafka的默认方式;Range按照单个Topic划分,默认分配方式。Range的问题将是消费者数据不平衡的问题。比如下面的例子,如果一个消费者组订阅了2个topic,那么消费者1会消费4个partition,而另一个消费者只会消费2个partition。什么时候触发分区策略?当消费者组中的消费者数量发生变化时,就会触发分区策略调整,比如增加消费者到消费者或者减少消费者。③Offset的维护由于消费者在消费过程中可能会遇到断电、宕机等故障,消费者恢复后需要从故障前的位置继续消费,所以消费者需要记录自己消费的是哪个Offset,这样才能继续消费故障后恢复消耗。Offset保存的位置有2个,一个是ZooKeeper,一个是Kafka。首先看将Offset保存到ZooKeeper中。唯一的Offset是由消费组、Topic、Partition这三个要素决定的。所以在消费组中的一个消费者挂掉之后,或者这个消费者仍然可以拿到Offset。Controller节点与ZooKeeper通信,同步数据。这个节点就是谁先起来的,谁先注册到Controller,谁就是Controller。其他节点和控制器信息保持同步。④消费组案例修改消费组id:启动一个消费者发送3条数据:指定消费组启动消费者,启动三个消费者,可以看到每个消费者都消费了一条数据。在演示中,不同的组可以消费同一个主题,我们看到两个消费者的消费者消费的是同一条数据。再启动一个consumer,这个consumer属于另一个consumergroup。Kafka高效的读写机制分布式部署多节点并行操作。顺序写入磁盘Kafka的producer生产数据,写入日志文件,写入过程中追加到文件末尾,顺序写入,如官网所示。同一张盘,顺序写入可以达到600M/S,而随机写入只有100K/S。这与磁盘的机械结构有关。顺序写之所以快,是因为它节省了大量的磁头寻址时间。一般情况下,零拷贝技术是先将数据读入内核空间,再从内核空间读取数据到用户空间,再将操作系统的IO接口写入内核空间,最后写入到硬盘。Kafka是通过在内核空间直接传输IO流来实现的,所以Kafka的性能是非常高的。ZooKeeper在Kafka中的作用。在Kafka集群中,会选举一个Broker作为Controller,负责管理集群Broker的上下线,所有Topic分区副本的分发,以及Leader的选举。