当前位置: 首页 > 科技观察

Kafka中改进的二分查找算法

时间:2023-03-12 21:27:00 科技观察

最近学习了一些Kafak源码,想和大家分享一下Kafak中改进的二分查找算法。二分查找是每个程序员都应该掌握的基本算法。Kafka如何改进二分查找并将其应用到自己的场景中,值得学习。由于Kafak将二分查找应用于索引查找场景,本文先简单介绍一下Kafka的日志结构和索引。在Kafak中,消息以日志的形式保存。每个日志实际上是一个包含多个日志段的文件夹。一个日志段是指具有相同文件名(起始偏移量)和4个日志段的消息日志文件。索引文件,如下图所示。消息附加在消息日志文件中,每条消息都有一个唯一的偏移量。查找消息时,索引文件用于查找。如果查询是基于偏移量的,那么会使用偏移量索引文件来定位消息的位置。为了便于讨论索引查询,下面将以位移索引为背景。位移索引的本质是一个字节数组,里面存放的是偏移量和对应的磁盘物理位置。这里的偏移量和磁盘的物理位置固定为4个字节,可以看作是每8个字节一个key。-值对,如下图所示:索引的结构清晰了,现在我们可以正式进入本文的主题,“二分查找”。给定索引项的数组和目标偏移量,可以写出如下代码:privatedefindexSlotRangeFor(idx:ByteBuffer,target:Long,searchEntity:IndexSearchEntity):(Int,Int)={//_entries表示索引项的个数//1、如果当前索引为空,直接返回(-1,-1)表示没有找到if(_entries==0)return(-1,-1)//2。确保搜索偏移量不小于当前最小偏移量Shiftif(compareIndexEntry(parseEntry(idx,0),target,searchEntity)>0)return(-1,0)//3。执行二进制搜索算法找到targetvarlo=0varhi=_entries-1while(lo0)hi=mid-1elseif(compareResult<0)lo=midelsereturn(mid,mid)}(lo,if(lo==_entries-1)-1elselo+1)}上面的代码使用普通的二分查找,让我们看看这种方式会存在什么问题。虽然每个索引项的大小为4B,但是操作系统访问内存时的最小单位是页,一般为4KB,即4096B,其中包含512个索引项。而要在索引中找出指定的偏移量,当操作系统访问内存时,就变成了找出指定偏移量所在的页。假设索引的大小有13页,如下图所示:由于Kafka读取消息,通常是读取最新的offset,所以要查询的页集中在最后,也就是第12页。我们结合上面的代码在查询最新的偏移量时,查看将访问哪些页面。根据二分查找,将依次访问第6、9、11、12页。随着Kafka收到的消息越来越多,索引文件也会增加到第13页,此时根据二分查找,会依次访问到第7、10、12、13页。可以看出,访问的页面与之前的页面完全不同。之前只有第12页时,Kafak在读取索引时会频繁访问第6、9、11、12页。由于Kafka使用了mmap来提高速度,即所有的读写操作都会经过操作系统的pagecache。因此第6、9、11和12页将缓存在页缓存中以避免磁盘加载。但是当增加到第13页时,需要访问第7、10、12、13页,而由于很长时间没有访问到第7页和第10页(现代操作系统使用LRU或其变体来管理页缓存),很可能已经不在pagecache中了,那么就会造成pagefault中断(线程阻塞等待从磁盘加载数据还没有缓存到pagecache中)。在Kafka官方的测试中,这种情况会造成几毫秒到1秒的延迟。针对以上情况,Kafka对二分查找进行了改进。由于一般读取的数据都集中在索引的末尾。然后将索引中最后的8192B(8KB)划分为“热区”,其余划分为“冷区”,分别进行二分查找。代码实现如下:privatedefindexSlotRangeFor(idx:ByteBuffer,target:Long,searchEntity:IndexSearchType):(Int,Int)={//1。如果当前索引为空,直接返回(-1,-1)表示没有找到if(_entries==0)return(-1,-1)//二分查找封装到一个方法defbinarySearch(begin:Int,end:Int):(Int,Int)={varlo=beginvarhi=endwhile(lo>>1valfound=parseEntry(idx,mid)valcompareResult=compareIndexEntry(found,target,searchEntity)if(compareResult>0)hi=mid-1elseif(compareResult<0)lo=midelsereturn(mid,mid)}(lo,if(lo==_entries-1)-1elselo+1)}/***2.确认热区中的第一个索引条目。_warmEntries就是所谓的分界线,目前固定为8192字节*对于OffsetIndex,_warmEntries=8192/8=1024,即第1024个索引条目*大部分查询都集中在索引条目的末尾,所以tail8192bytes设置为热区*如果查询目标在热区索引??项范围内,则直接查看热区,避免页面中断*/valfirstHotEntry=Math.max(0,_entries-1-_warmEntries)//3。判断目标偏移值是在热区Zone还是冷区if(compareIndexEntry(parseEntry(idx,firstHotEntry),target,searchEntity)<0){//如果在热区,搜索热区returnbinarySearch(firstHotEntry,_entries-1)}//4.确保要查找的位移值不能小于当前最小位移值if(compareIndexEntry(parseEntry(idx,0),target,searchEntity)>0)return(-1,0)//5.如果在冷区,搜索冷区binarySearch(0,firstHotEntry)}这样做的好处是在频繁查询tail的时候,tail的页面基本可以在pagecahce中,从而避免pagefault中断。让我们看一下前面的例子。由于每个页面最多包含512个索引条目,因此最后1024个索引条目所在的页面将被视为热区。那么当12页未满时,10、11、12会被判断为热点;当第12页刚满时,11和12会被判断为热点;未满时,11、12、13判断为热区。假设我们看最新的新闻,热区二分查找的情况是这样的:当第12页未满时,依次访问第11页和第12页,当第12页满时,访问页面的情况是相同的。当出现第13页时,依次访问第12页和第13页,长时间未访问的页面将不会出现,可以有效避免缺页中断。关于热区大小为什么设置为8192字节,官方的解释是这是一个合适的值:小到可以保证热区中的页面数小于等于3,那么在二分查找很可能在页面缓存中。也就是说,如果设置太大,热区中的页面可能不在页面缓存中。它足够大,8192字节。位移索引为1024个索引项,可以覆盖4MB的消息数据,足够大部分in-sync节点在热区查询。最后一句话总结:在Kafka索引中使用普通的二分查找会导致分页,造成延迟,结合大部分查询都集中在末尾的情况,通过将索引区域分为热区和冷区,进行搜索分别地,它会尽量保证热区中的页面在页面缓存中,避免缺页中断。