当前位置: 首页 > 科技观察

你们都知道那些Kafka复制机制吗?

时间:2023-03-21 21:36:16 科技观察

日常开发过程中使用kafka来实现限流和削峰的效果,但是往往kafka会存储多份以防止数据丢失。你知道它的机制是怎样的吗?这篇文章就给大家解释下。1、Kafka集群Kafka使用Zookeeper来维护集群成员(broker)的信息。每个broker都有一个唯一的标识符broker.id,用于在集群中标识自己。可以在配置文件server.properties中配置,也可以由程序自动生成。下面是自动创建Kafkabrokers集群的过程:每个broker在启动时都会在Zookeeper的/brokers/ids路径下创建一个临时节点,并写入自己的broker.id将自己注册到集群中;当有多个broker时,所有broker会竞争在Zookeeper上创建/controller节点。由于Zookeeper上的节点不会重复,所以只会成功创建一个broker。此时,代理称为控制器代理。除了其他代理的功能外,它还负责管理主题分区及其副本的状态。当broker宕机或主动退出,导致其持有的Zookeeper会话超时时,会触发注册在Zookeeper上的watcher事件。这时Kafka会进行相应的容错处理;如果宕机的是controllerbroker,也会触发新的controller选举。2.副本机制为了保证高可用,Kafka分区有多个副本。如果一个副本丢失,还可以从其他副本中获取分区数据。但是这就要求replica对应的数据必须是完整的,这是Kafka数据一致性的基础,所以需要使用controllerbroker进行专门的管理。Kafka的复制机制将在下面详细说明。2.1分区和副本Kafka的topic分为多个分区,分区是Kafka最基本的存储单元。每个分区可以有多个副本(可以在创建主题时使用replication-factor参数指定)。其中一个副本是领导者副本(Leaderreplica),所有的事件都直接发送给领导者副本;其他副本为跟随者副本(Followerreplica),需要进行复制以保持与领导者副本的数据一致性。当leader副本不可用时,follower的A副本将成为新的boss。2.2ISR机制每个partition都有一个ISR(in-syncReplica)列表,用于维护所有同步可用的副本。leader副本必须是同步副本,而对于follower副本,需要满足以下条件才算同步副本:与Zookeeper有一个活跃的会话,即必须定期向Zookeeper发送心跳;在指定时间内以低延迟从领导者副本接收消息。如果replica不满足上述条件,则将其从ISR列表中移除,直到满足条件后才会再次加入。下面是一个创建topic的例子:使用--replication-factor指定replicafactor为3,创建成功后使用--describe命令。可以看到partition0,0,1,2有3个replicas,这3个replicas都在ISR列表中,1是leadercopy。2.3Incompleteleaderelection对于replica机制,在broker级别有一个可选的配置参数unclean.leader.election.enable。默认值为fasle,表示禁止不完整的leader选举。这是为了当leader副本挂掉,ISR中没有其他副本可用时,是否允许一个未完全同步的副本成为leader副本,可能导致数据丢失或数据不一致。在某些场景下(比如金融领域),这可能是不能容忍的,所以它的默认值为false。如果可以允许一些数据不一致,可以配置为true。2.4Min.insync.replicasISR机制的另一个相关参数是min.insync.replicas,可以在broker或topic级别进行配置,也就是说ISR列表中至少要有几个可用的副本。这里假设设置为2,那么当可用副本数小于这个值时,就认为整个分区不可用。此时,客户端向分区写入数据时,会抛出异常org.apache.kafka.common.errors.NotEnoughReplicasException:Messagesarerejectedastherelessinsyncreplicasthanrequired.2.5发送确认Kafka在生产者上有一个可选参数ack,它指定生产者认为消息写入成功之前必须有多少分区副本收到消息:acks=0:当消息发送出去时,就认为已经成功并且不会等待服务器的任何响应;acks=1:只要集群的leader节点收到消息,producer就会收到server的成功响应;acks=all:只有当所有参与复制的节点都是all生产者收到消息时,才会收到服务器的成功响应。3.数据请求3.1元数据请求机制在所有副本中,只有领导者副本可以读写消息。由于不同分区的leader副本可能在不同的broker上,如果一个broker收到一个分区请求,但是该分区的leader副本不在broker上,它会返回一个NotaLeaderforPartition消息给客户端。错误响应。为了解决这个问题,Kafka提供了元数据请求机制。首先,集群中的每个broker都会缓存所有topic的partitionreplica信息,client会周期性的发送metadata请求,然后缓存获取到的metadata。定时刷新元数据的时间间隔可以通过为客户端配置metadata.max.age.ms来指定。有了元数据信息,客户端就知道了leader副本所在的broker,然后直接向对应的broker发送读写请求。如果分区副本的选举发生在定时请求的时间间隔内,则意味着原来缓存的信息可能已经过时,此时可能会收到NotaLeaderforPartition的错误响应。这种情况下,客户端会再次请求元数据,然后刷新本地缓存,然后在正确的broker上进行相应的操作。流程如下图所示:3.2数据可见性需要注意的是,不是所有存储在partitionleader上的数据都可以被客户端读取,为了保证数据的一致性,只有所有同步副本保存的数据(所有ISR中的副本)可以被客户端读取。3.3零拷贝Kafka所有数据的写入和读取都是通过零拷贝实现的。传统拷贝和零拷贝的区别如下:传统模式下的四拷贝和四次上下文切换以通过网络发送磁盘文件为例。传统模式下,一般采用如下伪代码所示的方法,先将文件数据读入内存,然后通过Socket发送内存中的数据。buffer=File.readSocket.send(buffer)这个过程实际上涉及到四份数据。首先通过系统调用将文件数据读入内核态Buffer(DMA拷贝),然后应用程序将内存态Buffer数据读入用户态Buffer(CPU拷贝),然后用户程序发送数据通过Socket将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后通过DMA拷贝将数据拷贝到网卡Buffer。同时,它还伴随着四个上下文切换,如下图所示:sendfile和transfer实现零拷贝Linux2.4+内核通过sendfile系统调用提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到网卡Buffer,不需要CPU拷贝。这也是零拷贝一词的来源。除了减少数据拷贝之外,因为整个文件的读取和发送到网络都是通过一次sendfile调用完成的,所以整个过程只有两次上下文切换,从而大大提高了性能。零拷贝过程如下图所示:从具体实现来看,Kafka的数据传输是通过TransportLayer完成的,其子类PlaintextTransportLayer的transferFrom方法通过调用FileChannel的transferTo方法实现零拷贝JavaNIO,如下图:@OverridepubliclongtransferFrom(FileChannelfileChannel,longposition,longcount)throwsIOException{returnfileChannel.transferTo(position,count,socketChannel);}注意:transferTo和transferFrom不保证可以使用零拷贝。其实能否使用零拷贝与操作系统有关。如果操作系统提供了sendfile这样的零拷贝系统调用,这两个方法就会通过这样的系统调用充分利用零拷贝的优势。否则,这两种方法都无法使用。实现零拷贝本身。4.物理存储4.1分区分配在创建主题时,Kafka会首先决定如何在broker之间分配分区副本。它遵循以下原则:在所有broker上均匀分布分区副本;确保分区的每个副本分布在上面的不同代理上;如果使用broker.rack参数指定broker的rack信息,那么每个partition的副本会尽可能的分配给不同rack的broker,避免一个rack不可用,一个rack不可用整个分区。基于以上原因,如果在单节点上创建3个副本的topic,通常会抛出以下异常:定期保留数据是Kafka的一个基本特性,但是Kafka不会永远保留数据,也不会删除消息直到所有的消费者都读过。取而代之的是,Kafka为每个主题配置一个数据保留期,指定数据在删除之前可以保留多长时间,或者在清理之前可以保留多少数据。对应以下四个参数:log.retention.bytes:删除数据前允许的最大数据量;默认值为-1,表示没有限制;log.retention.ms:保存数据文件的毫秒数,如果不设置,使用log.retention.minutes中的值,默认为null;log.retention.minutes:保留数据文件的分钟数,如果不设置,则使用log.retention.hours中的值,默认为null;log.retention.hours:数据文件保留的小时数,默认值为168,即一周。由于在大文件中查找和删除消息既费时又容易出错,因此Kafka将分区划分为多个分片,当前正在写入数据的分片称为活动分片。永远不会删除活动片段。如果你默认保留数据一周,每天使用一个新的段,你会看到最旧的段将被删除,同时每天使用一个新的段,所以大多数时候分区中会有7个段。4.3文件格式存储在磁盘上的数据格式通常与生产者发送的消息格式相同。如果生产者发送压缩消息,则同一批次的消息被压缩在一起,作为“包装消息”发送(格式如下所示),然后保存到磁盘。消费者阅读后自行解压打包的消息,获取每条消息的具体信息。总结本文阐述了Kafka的存储副本机制的原理以及数据是如何存储的。Kafka加入ack来防止数据丢失。这个ack可能会影响一些效率。这个ack的值可以根据场景设置。比如丢失一些数据没有问题,那么设置为0,我发消息我就忽略。我是来给大家提供大数据资讯的。需要的朋友可以到下面的GitHub上下载。相信自己,努力和汗水总会得到回报。我是大数据哥,下期见~~~本文转载自微信公众号“大数据哥”,可通过以下二维码关注。转载本文请联系大数据小哥公众号。