当前位置: 首页 > 科技观察

浅谈分布式消息系统Kafka的设计原理

时间:2023-03-19 14:35:31 科技观察

一、Kafka简介Kafka是一个高吞吐、分布式、基于发布/订阅的消息系统,最初由LinkedIn开发,用Scala语言编写,目前是Apache的开源项目.与目前流行的RabbitMQ、RocketMQ等开源消息中间件相比,Kakfa具有高吞吐、低延迟的特点,广泛应用于大数据、日志采集等应用场景。本文主要简单介绍一下Kafka的设计原理。2、Kafka架构基本概念:broker:Kafka服务器,负责消息的存储和转发topic:消息类别,Kafka根据topic分区对消息进行分类:topic分区,一个topic可以包含多个partition,topic消息以offsets存储在on上eachpartition:消息在日志中的位置,可以理解为消息在分区上的偏移量,也是代表消息的唯一序号。Producer:消息生产者Consumer:消息消费者ConsumerGroup:消费者组,每个Consumer必须属于一个组Zookeeper:保存集群broker、主题、分区等元数据;此外,它还负责broker故障发现、partitionleader选举、负载均衡等功能。命名规则:-,比如一个名为test的topic有3个分区,那么Kafka数据目录下有3个目录:test-0、test-1、test-2,分别存放对应的Partition数据.分区数据文件分区中的每条消息都包含以下三个属性:last值唯一确定分区中的一条消息。可以认为offset是分区中Message的id;MessageSize表示消息内容数据的大小;data是Message的具体内容。分区的数据文件由上述格式的Message组成,按照从小到大的偏移量排列在一起。如果一个分区只有一个数据文件:在文件末尾添加新数据,无论文件数据文件有多大,这个操作总是O(1)。按顺序搜索特定偏移量的消息。因此,如果数据文件很大,则搜索效率很低。Kafka通过分段和索引来提高查找效率。数据文件分段segmentpartition在物理上由多个段文件组成,每个段大小相等,顺序读写。每个段数据文件以段中最小的偏移量命名,文件扩展名为.log。这样,在查找指定偏移量的Message时,可以使用二分查找的方式定位到Message所在的segment数据文件。数据文件索引数据文件分段使得可以在更小的数据文件中找到偏移量对应的Message,但这仍然需要顺序扫描才能找到偏移量对应的Message。为了进一步提高查找效率,Kafka为每一个被切分的数据文件创建了一个索引文件。文件名与数据文件名相同,但文件扩展名为.index。索引文件包含若干个索引条目,每个条目代表一条消息在数据文件中的索引。索引由两部分组成,相对偏移量和位置。相对偏移量:数据文件被切分后,每个数据文件的起始偏移量都不为0,相对偏移量表示该Message相对于所属数据文件中最小偏移量的大小。例如,如果一个分段数据文件的偏移量从20开始,那么偏移量为25的Message在索引文件中的相对偏移量为25-20=5。存储相对偏移量可以减少索引文件占用的空间。position,表示消息在数据文件中的绝对位置。只要打开文件,将文件指针移动到这个位置,就可以读取相应的Message。索引文件并不是为数据文件中的每条消息建立索引,而是采用稀疏存储的方式,每隔一定的数据字节建立一个索引。这样可以避免索引文件占用过多的空间,从而可以将索引文件保存在内存中。但是缺点是没有索引的消息不能一次定位到数据文件中,所以需要顺序扫描,但是这种顺序扫描的范围很小。总结搜索某个偏移量的消息,首先通过二分法找到消息所在的段文件(因为每个段都是以文件中消息偏移量的最小值命名的);然后,加载对应的.index索引文件到内存中,同样的二分法找到小于等于给定偏移量的偏移量记录(相对偏移量,位置)***;***,根据到.log文件的位置,依次找出offset等于给定offset值的消息。由于消息在分区的segment数据文件中是顺序读写的,消息消费后不会被删除(删除策略是针对过期的segment文件),这种顺序磁盘IO存储设计是Kafka高的重要原因表现。3.2Producer设计负载均衡:由于消息topic由多个partition组成,partition会平均分配给不同的broker,为了有效利用broker集群的性能,提高消息的吞吐量,producer可以使用random或哈希方法,将消息平均发送到多个分区,实现负载均衡。批量发送:是提高消息吞吐量的重要方式。Producer端在内存中合并多条消息后,可以一次请求发送一批消息给broker,从而大大减少broker存储消息的IO操作次数。但也在一定程度上影响了消息的实时性,相当于以延迟为代价换取更好的吞吐量。3.3消费者设计任何消费者都必须属于一个消费者组。同一个ConsumerGroup中的多个Consumer实例不会同时消费同一个分区,相当于队列模式。如图所示,ConsumerGroup1的三个Consumer实例消费来自不同分区的消息,分别是TopicA-part0、TopicA-part1和TopicA-part2。不同ConsumerGroup的Consumer实例可以同时消费同一个partition,相当于发布-订阅模式。如图所示,ConsumerGroup1的Consumer1和ConsumerGroup2的Consumer4同时消费了TopicA-part0的消息。分区中的消息是有序的,Consumer通过pull来消费消息。Kafka不删除消费的消息队列模式。队列模式意味着每条消息只会被一个Consumer消费。Kafka保证同一个ConsumerGroup中只有一个Consumer会消费一条消息。在ConsumerGroup稳定状态下,每个Consumer实例只会消费一个或多个特定分区的数据,而某个分区的数据只会被特定的Consumer实例消费,也就是Kafka对消息的分发是分配在分区单元,而不是每条消息;在同一个ConsumerGroup中,如果Consumer实例的数量小于partition的数量,则至少有一个Consumer会消费多个partition的数据;如果Consumer个数和partition个数相同,那么恰好有一个Consumer消费一个partition的数据;如果Consumer的数量大于partition的数量,则部分Consumers不能消费该Topic下的任何消息;设计的好处是:每个Consumer不需要跟大量的broker通信,减少通信开销,降低分配难度,实现更简单;可以保证每个分区中的数据可以被消费者有序的消费。这种设计的缺点是不能保证同一个消费者组中的消费者均匀消费数据,当消费者实例数多于分区数时,部分消费者会饿死。如果partitions或者consumer有增减,为了保证消费均衡,需要进行ConsumerRebalance。分配算法如下:Broker-to-Consumer设计原则:对于每个ConsumerGroup,选举一个Broker作为Coordinator(0.9以上版本),它监视Zookeeper,从而监控判断是否有增加或增加。partition或者Consumer的减少,然后生成一个Rebalance命令,按照上面的算法重新分配。第一次初始化ConsumerGroup时,Consumer通常会读取每个partition最早或最新的offset(Zookeeper记录),然后依次读取每个partitionlog的消息。在Consumer读取过程中,它会提交成功处理的消息的Offsets(由Zookeeper记录)。当一个partition被重新分配给ConsumerGroup中的其他Consumer时,新Consumer消费的初始位置会被设置为最近提交的offset(原Consumer)。如图所示,LastCommitedOffset是指Consumer上次提交的消费记录偏移量,CurrentPosition是当前消费位置,HighWatermark是所有复制节点成功复制到日志的最新消息的偏移量(该分区的所有ISR节点,如下所述),LogEndOffset是写入日志的最后一条消息的offset+1。从消费者的角度来看,最多只能读取到Highwatermark的位置,后续的消息对消费者来说是不可见的,因为复制不完整的数据还没有可靠的存储,可能会丢失。发布订阅模式发布订阅模式,也称为广播模式,Kafka保证一个topic的每条消息都会被所有的ConsumerGroup消费,对于同一个ConsumerGroup,仍然保证只有一个Consumer实例消费这个信息。3.4复制设计作为消息中间件,数据的可靠性和系统的可用性必须依赖于数据副本的设计。Kafka的副本副本单元是主题的分区。一个partition中的replica数量不能超过broker的数量,因为一个broker最多只能存储一个partition的副本。所有的消息生产和消费请求都由分区的leader副本处理,其他follower副本负责从leader复制数据进行备份。副本均匀分布到整个集群。Replicas的算法如下:对所有Brokers(假设一共有n个Brokers)和要分配的Partitions进行排序。将第i个分区分配给第(imodn)个Broker。第j个Replica分配给((i+j)moden)Broker如图,TopicA有3个partition:part0,part1,part2,每个partition的replica数量等于2(一个是leader,一个是follower)),根据上面的算法,会平均落在三个broker身上。Broker管理副本:选举一个broker作为控制器,监视Zookeeper,负责分区副本的集群分配,以及leader的切换和选举过程。In-Sync-Replica(ISR)分布式系统在处理节点故障时,需要提前定义节点的“失效”和“存活”。对于Kafka节点来说,判断“存活”的条件有两个:节点必须与Zookeeper保持心跳连接。如果该节点是follower,则必须从leader节点复制数据进行备份,备份数据不能落后leader太远。许多。Kafka将满足上述条件的副本节点视为“同步”(insync),称为In-Sync-Replica(ISR)。Kafka的Zookeeper维护着每个分区的ISR信息。理想情况下,ISR包含分区的所有副本的代理节点信息。当某些节点不满足上述条件时,ISR可能只包含部分副本。比如上图中TopicA-part0的ISR列表可能是[broker1,broker2,broker3],也可能是[broker1,broker3]和[broker1]。数据可靠性Kafka是如何保证数据可靠性的?首先,Producer产生一条消息,被认为是“committed”(即broker认为消息已经可靠存储):当所有ISRreplicas返回ack告诉leader消息已经成功写入日志中,leader认为消息已提交,并告诉Producer生产成功。这与上面“活着”条件的第二点并不矛盾,因为leader有超时机制,leader和其他ISRfollower复制数据。如果在一定时间内没有返回ack(可能是数据拷贝进度太落后),leader会将follower副本从ISR中移除。消息提交后,Consumer就可以消费了。ISR机制下的数据复制既不是完全同步也不是纯异步,这是Kafka实现高吞吐量的重要机制。同步复制要求在该消息被视为已提交之前复制所有工作的跟随者。这种复制方法极大地影响了吞吐量。在异步复制模式下,follower异步从leader复制数据。只要数据被leader写入日志,就被认为是committed。丢失数据。但是Kafka使用ISR的方式很好的平衡了数据不丢失和吞吐量。followers可以批量从leader复制数据,复制到内存时返回ack,大大提高了复制性能。当然,数据仍然存在丢失的风险。Kafka本身定位为高性能的MQ,更注重消息的吞吐量。在此基础上结合ISR机制,尽可能保证消息的可靠性,但不是绝对可靠。服务可用性所有Kafka消息发送和接收请求都由领导节点处理。根据上述数据可靠性设计,当ISR的follower副本发生故障时,leader会及时将其从ISR列表中移除,不会影响服务可用性。领导者失败后会发生什么?如何选举新的领导人?Leader选举Kafka将分区的ISR信息存储在Zookeeper中,并可以动态调整ISR列表的成员。只有ISR中的memberreplica会被选为leader,所有ISRmemberreplica都有可能成为leader;leader节点宕机后,Zookeeper可以监控发现,broker的controller节点会从ISR中选举出新的leader,并通知ISR中的所有broker节点。由此可见,只要ISR中至少有一个replica,Kafka就可以保证服务的可用性(但不能保证网络分区下的可用性)。容灾和数据一致性分布式系统的容灾能力与它出于数据一致性考虑而选择的算法有关,例如Zookeeper的Zab算法、raft算法等。Kafka的ISR机制与这些MajorityVote算法的比较如下:如下:ISR机制可以容忍更多的节点故障。如果有2f+1个副本节点,每个partition最多可以容忍2f次故障而不丢失消息数据;然而,与多数投票选举算法相比,它最多只能容忍f次失败。在消息提交持久化方面,ISR需要等待2f个节点返回ack,而MajorityVote只需要等待f+1个节点返回ack,不依赖最慢的follower节点,所以MajorityVote更有优势ISR机制,可以保存更多的副本节点。例如,要保证f个节点可用,ISR至少需要f个节点,MajorityVote至少需要2f+1个节点。如果所有副本都宕机了,有两种方法可以恢复服务:等待ISR中的任何一个节点恢复并选举它为领导者;选择第一个恢复的节点(不一定是ISR中的节点)作为leader***第一种方式,消息不会丢失(只能说这种方式极有可能不丢失),而在第二种方法,消息可能会丢失,但可以尽快恢复服务。这是可用性和一致性场景的两个考虑因素。Kafka默认选择第二种,用户也可以自行配置。大多数考虑CP的分布式系统(假设2f+1个节点),为了保证数据的一致性,最多只能容忍f个节点的故障,而Kafka为了兼顾可用性,最多允许2f??个节点故障,所以有不保证数据是强一致的。如图,一开始ISR的个数等于3,数据正常同步。红色部分开始。leader发现另外两个follower复制速度太慢或其他原因(网络分区、节点故障等),将其从ISR中移除后,由leader单节点存储数据;然后leader宕机,触发第二个节点重选为leader,重新同步数据,但是红色部分的数据在新的leader上不可用;***原leader节点恢复服务后,从新的leader重启leader上复制数据,红色部分的数据不再可供消费。因此,为了降低数据丢失的概率,可以设置Kafka的ISR的最小副本数。如果低于这个值,则直接返回unavailable。当然,这是牺牲一定可用性和吞吐量的前提。重复消息消息传输的三种方式:最多一次:消息可能丢失,但不会重复传输至少一次:消息不会丢失,但可能重复传输恰好一次:消息是保证只传输一次Kafka实现采用第二种方式,即可能存在重复消息,业务需要保证消息的幂等处理。3.5高吞吐设计针对分区,顺序读写磁盘数据,提供时间复杂度O(1)的消息持久化能力。Producer批量向broker写入数据。Consumer批量从broker拉取数据。日志被压缩并分成多个分区以提高并发代理零复制(ZeroCopy)。使用sendfile系统调用,数据直接从pagecache发送到socket。Producer可以配置为等待消息提交。如果Producer产生了一条消息,它必须等待ISR存储它,然后每次返回它。延迟会很高,这会影响整体的消息吞吐量。为了解决这个问题,一方面可以配置Producer减少partition的副本数,比如ISRsize为1;另一方面,在不太注重消息可靠存储的场景下,Producer可以通过配置来选择是否等待消息提交,如下:消息吞吐量和持久性之间的用户。持久化级别越高,生产消息吞吐量越小。相反,持久化级别越低,吞吐量越高。3.6HA基本原理brokerHAbroker集群信息由Zookeeper维护,并选举出一个controller。所有partition的leader选举由controller决定,controller通过RPC直接通知需要响应leader变化的broker;控制器还负责添加和删除主题以及分区副本的重新分配。控制器在Zookeeper上注册一个watch。一旦一个broker宕机,它在Zookeeper中对应的临时节点会被自动删除,controller会为宕机broker上的所有partition重新分配一个新的leader;如果controller宕机,其他broker通过Zookeeper选出新的leader。控制器,然后将新的领导者重新分配给停机代理上的所有分区。分区HApartitionleader所在broker宕机。如前所述,brokercontroller会根据动态维护的ISR,从剩余的broker机器中选出一个ISR成员成为新的leader。如果ISR中至少有一个follower,可以保证提交的数据不会丢失;否则,如果选择任何副本作为领导者,则在这种情况下可能存在潜在的数据丢失;如果分区的所有副本都宕机了,就不能保证数据不丢失,有两种恢复方案,上面已经介绍过了。4、推广腾讯云将推出高性能消息队列服务Ckafka,全面兼容开源的KafkaAPI(0.9版本)。Ckafka服务器完全托管在腾讯云上。用户无需自行维护和构建。他们可以使用开源的KafkaAPI客户端访问实例,大大降低了用户使用Kafka的门槛。欢迎体验:)原文链接:https://cloud.tencent.com/community/article/369570【本文为专栏作者《腾讯云技术社区》原创稿件,转载请联系原作者获得授权】点此查看该作者更多好文