当前位置: 首页 > 科技观察

Broker的实现逻辑-kafka知识体系(三)

时间:2023-03-16 11:35:37 科技观察

上一篇分享了kafka生产端的逻辑,消息发送到缓存后sender线程将消息发送给broker,那么broker如何接收并持久化数据?让我们从Broker的网络设计开始。Broker网络设计Kafka的网络设计与Kafka的调优有关,这也是它能够支持高并发的原因。Kafka的网络三层架构首先将客户端的所有请求发送给一个Acceptor。代理中将有3个线程(默认为3)。这3个线程称为处理器,Acceptor不会对客户端的请求做任何事情。任何处理都直接封装到socketChannels中发送给这些处理器,形成一个队列。发送方式为轮询,即先发送给第一个处理器,再发送给第二个、第三个,再返回给第一个处理器。消费者线程在消费这些socketChannels的时候,会一个一个的得到一个request,而这些request会伴随着数据。线程池默认有8个线程。这些线程用于处理请求,解析请求,如果请求是写请求,则写入磁盘。如果读取,则返回结果。处理器会从响应中读取响应数据返回给客户端。这就是Kafka的网络三层架构。调优优势1所以如果我们需要对Kafka进行增强调优,增加处理器,增加线程池中的处理线程,是可以达到效果的。request和response这部分实际上起到了缓存的作用,考虑到处理器产生请求的速度太快,线程数不够及时处理。所以这是reactor网络线程模型的增强版。Broker数据存储设计【分区数据文件】我们知道topic是一个逻辑概念,partition是topic的物理分组。一个主题可以分为多个分区,每个分区都是一个有序队列。例如创建两个主题名称分别为report_push和launch_info,分区数为partitions=4。存储路径和目录规则为:xxx/message-folder|--report_push-0|--report_push-1|--report_push-2|--report_push-3|--launch_info-0|--launch_info-1|--launch_info-2|--launch_info-3分区物理上由多个段组成。[segment]日志各段大小相等,顺序读写。每个段数据文件以段中的最小偏移量命名,文件扩展名为.log日志回滚由log.segment.bytes控制,默认1G;在查找指定偏移量的Message时,使用二分查找(跳表)定位Message在哪个段数据文件中。在磁盘上,一个分区就是一个目录,每个段由一个索引文件和一个日志文件组成.如下:$treekafka|head-n6kafka-├ist-1│├---i-00000000003064504069.INDEX│├I-index│├ITEX│├ITEXI-----0000000000003064504069.LOG│├I-0000000000000033065011416.Index│├将包含消息体、偏移量、时间戳、密钥、大小、压缩编解码器、校验和、消息版本号等。磁盘上的数据格式与生产者发送给代理的数据格式完全相同,并且完全相同作为消费者接收的数据格式。由于磁盘格式与消费者和生产者的数据格式完全一致,这使得Kafka可以通过零拷贝技术提高传输效率。[segment]index索引文件是内存映射的(memorymapped)。索引文件,一种稀疏格式的索引,由参数log.index.interval.bytes控制,默认4KB。即不是每条数据都会写入索引。默认情况下,每写入4KB数据只会写入一个索引。Kafka为每个分段数据文件创建一个索引文件。文件名与数据文件名相同,但文件扩展名为.index。索引文件不会为数据文件中的每条消息创建索引。相反,使用稀疏存储方法为每个特定字节的数据创建一个索引。这样可以避免索引文件占用过多的空间,这样索引文件就可以一直保存在内存中。关于内存映射:即使顺序写入硬盘,硬盘的访问速度也赶不上内存。所以Kafka的数据并不是实时写入硬盘的。它充分利用现代操作系统的分页存储来使用内存来提高I/O效率。MemoryMappedFiles(以下简称mmap)也译为内存映射文件。它的工作原理是直接使用操作系统的Page将文件直接映射到物理内存。映射完成后,你对物理内存的操作会同步到硬盘(操作系统在适当的时候)。通过mmap,进程读写内存就像读写硬盘一样,不需要关心内存的大小,虚拟内存会帮我们搞定。mmap其实是Linux中用来实现内存映射的一个函数。在JavaNIO中,可以使用MappedByteBuffer来实现内存映射。【Kafka中通过offset查询消息内容的全过程】Kafka中有一个ConcurrentSkipListMap保存在每个log段中。offset-->concurrentSkipListMap-->找到baseOffset对应的log段-->读取索引文件.index-->找到不大于offset-baseoffset的最大索引项-->读取分段文件(.log)-->从日志段文件(.log)开始依次查找当前索引文件的文件名,即baseOffset的值。【日志保留策略】Kafka会定时检查是否删除旧消息,查看参数log.retention.check.interval.ms,默认5分钟。目前有3种日志保留策略:基于空间的:log.retention.bytes,默认禁用;基于时间:log.retention.hours(mintues/ms),默认7天;基于起始位移:由Kafka0.11.0.0版本引入,解决流处理场景下已处理的中间消息删除问题。目前最常用的是基于时间的日志保留策略。调整优势2是尽量保持client版本与Broker版本一致,即尽量保持client版本与Broker版本一致。不要小看版本之间的不一致性,它会导致Kafka失去很多性能优势,比如零拷贝。图中蓝色Producer、Consumer和Broker的版本是一样的,他们之间的通信可以享受ZeroCopy的快速通道;反之,如果一个低版本的Consumer程序想要与Producer和Broker进行交互,只能依靠JVM堆转移,失去了fastchannel,只能使用slowchannel。因此,在优化Broker层时,只要保持服务端和客户端的版本一致,就可以获得很大的性能提升。Broker复制机制Partition复制默认为1,见参数default.replication.factor。【复制功能(不提供读写分离)】1.实现冗余,提高消息可靠性2.实现高可用,参与leader选举,leader不可用时提高可用性。3、leader处理分区的所有读写请求;follower会被动的定期复制leader上的数据[leaderreplicaelection]1.controller负责2.选举机制或策略。所有副本(replicas)统称为AssignedReplicas。也就是说,AR副本同步队列(ISR)SR是AR的一个子集。leader维护ISR列表,follower从leader同步数据有一定延迟。凡是超过阈值的,都会将follower从ISR中移除,存入OSR(Outof-SyncReplicas)列表中,新加入的follower也会先存入OSR中。AR=ISR+OSR的基本策略是从AR中寻找第一个存活的副本,该副本在ISR中。3.Leader维护:leader有独立线程定期检测ISR中的follower是否脱离ISR,如果发现ISR发生变化,则返回新ISR的信息给相关节点动物园管理员。【副本机制的好处】总的来说,副本机制的好处:1.提供数据冗余。即使某些系统组件发生故障,系统仍可继续运行,从而提高整体可用性和数据持久性。2.提供高可扩展性。支持水平扩展,可以通过增加机器来提高读性能,从而提高读操作的吞吐量。3.改进数据局部性。允许将数据放置在地理位置上靠近用户的位置,从而减少系统延迟。对于ApacheKafka来说,目前只能享受副本机制带来的第一个好处,就是提供数据冗余,实现高可用和高持久化。对于client用户,Kafka的follower副本没有作用。既不能像MySQL那样帮助leader复制“抗读”,也不能实现将部分副本放在靠近client的地方,提高数据局部性。性别。Brokerhighwatermark机制【概念】HW即highwatermark,是Kafka副本对象的一个??重要属性。partition的highwatermark用leader副本的highwatermark表示,也就是被follower副本同步后的位置。对于leader新写的消息,consumer不能立即消费。leader会等待ISR中的所有replicas同步完消息,然后更新HW。这个时候消息就可以被消费者消费了。[功能]定义消息可见性,只有低于高水位的分区消息才能被消费;帮助kafka完成副本同步,kafka是一种基于高水位的异步副本同步机制。【LEO的概念】是指日志结束偏移量(LogEndOffset),写入下一条消息的偏移量。综上所述,为什么MySQL的索引不用Kafka的索引机制呢?既然Kafka这么好这么快,为什么MySQL的索引不用Kafka的索引机制呢?我们不得不考虑另外一个问题:InnoDB维护索引的成本要比Kafka高。在Kafka中,ConcurrentSkipListMap只会在创建新的索引文件时更新,而不是每次写入数据时都更新。这一块的保养量基本可以忽略不计。当B+树中的数据插入、更新、删除时都需要更新索引,同时也会导致“分页”等相对耗时的操作。Kafka中的索引文件也是顺序追加文件的操作,工作量比B+树小很多。其实归根结底还是由不同的应用场景决定的。MySQL需要经常执行CRUD操作。CRUD是MySQL的主要工作内容,而为了支持这种操作,需要用到维护量很大的B+树来支持。Kafka中的消息一般都是顺序写入磁盘,然后顺序从磁盘读取(不深入讨论pagecache)。换句话说,检索查询只是Kafka的一个辅助功能,不需要花很多钱为这个功能维护一个高层索引。前面说过,Kafka中的这种方式是磁盘空间、内存空间和搜索时间的折衷。