当前位置: 首页 > 科技观察

你确定你不懂Redis的跳表?

时间:2023-03-14 16:33:53 科技观察

本文转载自微信公众号“学Java的小姐姐”,作者0618学Java的。转载本文请联系正在学习Java的小姐姐公众号。前言大家好,这几周我们看了Redis的底层数据结构,比如动态字符串SDS、双向链表Adlist、字典Dict。如果对Redis的常用类型或者底层数据结构不了解,请看上面的传送门。.今天我们就来看看ZSET的底层架构。不知道ZSET是什么的可以看上面传送门第一篇。简单来说,ZSET是Redis提供的一种数据结构,用于根据数据和分数来判断其排名。最常见的是微信体育排行榜。每个用户对应自己的步数,每晚都可以给出用户的排名。可能有朋友会想,如果是为了实现排序,各种排序方式都可以实现,Redis的ZSET结构就不用介绍了吧?当然,如果使用排序的方式,也可以实现同样的功能,但是需要对代码进行Hardcoding,会增加工作量,而且还会提供代码bug,哈哈哈。而且Redis底层是用C实现的,直接内存操作的速度相比Java方式会有提升。综上所述,使用Redis的ZSET结构有很多好处。话不多说,让我们开始吧。在正式开始之前,我们需要引入下跳表的概念,它是ZSET结构的底层实现。下面可能有点无聊,我尽量保持简单。(一定要看,这个写起来太累了)什么是跳表?对于数据量很大的链表结构,插入和删除速度比较快,但是查询速度很慢。那是因为不能直接获取某个节点,需要从头节点开始,使用某个节点的next指针获取下一个节点。即使数据是有序排列的,如果要查询某条数据,也只能从头到尾遍历变量,查询效率会很低,时间复杂度为O(n).如果我们需要快速查询链表怎么办?有同学说用数组存储,但是数据结构不改变怎么办?我们可以先想一下有序数组结构中的二分法,每次把范围缩小一半,这样查询速度就提高了很多,那么这个思路可以用在链表中吗?这就把我们带到了今天的主角——跳表。(引出概念有点生硬#128522;)Step1新建一个有序的单项链表先看下图中的有序单向链表,里面存储了7个元素1,2,3,4、5、6、7。Step2提取二级索引节点我们可以提取链表中的一些节点。下图抽取了1、3、5、7四个节点,即每两个节点向上抽取一个节点,抽取的称为索引。注意,不可能每次都画得那么完美。这实际上与抛硬币相同。每枚硬币正反面出现的概率相同,均为1/2。当数据量较小时,正反概率可能相差很大。但是随着数据量的增加,优劣的概率越来越接近1/2。以此类推,就是说每个节点都有相同的机会,要么留在原层,要么提取到上层,概率都是1/2。但是随着节点数量的增加,提取的节点越来越接近1/2。Step3提取三级索引节点我们可以提取链表中的一些节点。下图抽取了1和5两个节点,即向上层每两个节点抽取一个节点,抽取的称为索引。step四类二分查询我们假设要找一个值为6的节点,从三级索引开始,找一个值为1的节点,发现小于5,找到一个值为5的节点根据值为1的节点的next指针,5之后没有其他三级索引。于是往下找二级索引,根据值为7的节点取值为5的结点的next指针,发现小于6,说明要找的结点6在这个范围内。然后在一级索引位置,根据值为5的节点指向值为6的节点的next指针,发现是要查询的数据,于是查询过程结束.根据上面的查询过程(下图中的蓝线),我们发现它采用的核心思想是二分法,不断缩小查询范围。如果在上层索引中找到区间,则向下一层寻找真正的数据。总结从上面的整个过程可以看出,当数据量较小时,这种以空间换时间、消耗内存的方式并不是最优方案。所以Redis的zset结构在数据量小的时候使用压缩表(先放这里,下一部分设置一个flag),数据量大的时候使用跳表。像这种链表加上多级索引的结构就是跳表。这个名字的图片,过程是跳转查询的。旋转跳跃,闭上眼睛,bgm响起。Redis中的跳表示意图下图是对跳表的简单改进和重新封装。首先介绍header的概念。这与双向链表和字典结构是一样的。它封装了数据,因为它们都被使用了。指针,指针在计算长度和获取最后一个节点的数据时,必然会导致查询性能慢的问题。所以封装表头的目的就是为了提高解决这些问题的速度。浪费的是增删改查等操作的时间。相比之下,它可以忽略不计。二是引入层数组管理所有节点。我们可以看到有32层,也就是32个数组,跟后面的数据节点结构是一样的。引入是为了方便直接根据这个数组的层数定位各个元素。二是数据节点的每一层都有level和span(也就是下图中箭头指针上的数字,是为了方便统计两个节点之间距离的长度)。最后是数据节点的反向指针。介绍的目的是Level数组只有前指针,即只能指向下一个节点地址,后指针是找后面的节点。上图主要分为3大块:(这里可以粗略的看一下,下面会详细解释各个模块的代码)header主要包括四个属性,分别是headerpointer,tailpointer,nodelengthlength,all节点的最高级别。header:指向跳转表的表头节点。通过这个指针地址可以直接找到表头,时间复杂度为O(1)。tail:指向跳转表的尾节点。通过这个指针可以直接找到尾部,时间复杂度为O(1)。length:记录跳表的长度,即不包括表头节点,整个跳表有多少个元素。level:记录当前跳表中所有节点(不包括header节点)的层数最多的level。管理所有节点层级的数组对象值为空,层级数组为32层。目的是管理真实的数据节点。关于具体层级的哪些属性放在数据节点中。数据节点主要包括四个属性对象值obj、分数score、向后指针backward和level数组。每个数据的Level数组中的层数是随机生成的,和上面说的跳表一样。成员对象obj:真实的实际数据,每个节点的数据都是唯一的,但节点的分数可能相同。得分相同的两个节点根据字典中成员对象的大小排序,成员对象较小的节点排在前面,成员对象较大的节点排在后面。Scorescore:每个节点中的数字为该节点保存的分数。在跳表中,节点按照每个节点保存的分数升序排列。向后指针backward:用于从表尾遍历到表头。每个节点只有一个后退指针,即一次只能后退一步。层级:节点的每一层都标有1、2、3等字样,L1代表第一层,L2代表第二层,L3代表第三层,以此类推。skiptableHeader结构的定义zskiplisttypedefstructzskiplist{//头指针header和尾指针tailstructzskiplistNode*header,*tail;//总共有多少个节点lengthunsignedlonglength;//所有节点中最大的levellevelintlevel;}zskiplist;具体数据NodezskiplistNode//跳表的具体节点typedefstructzskiplistNode{sdsele;//具体数据,对应张三doublescore;//分数,对应70structzskiplistNode*backward;//后退指针backward//层次数组structzskiplistLevel{structzskiplistNode*forward;//前向指针forwardunsignedintspan;//spanspan}level[];}zskiplistNode;skiplist的实现(源码分析)redis关于skiplist的API定义在t_zset.c文件中。看到源码分析不要跑路,一定要看。创建跳转表创建一个空的跳转表,其实就是创建一个表头,管理所有节点的层级数组。首先,定义一些变量并尝试分配内存空间。二是初始化header的level和length,分别赋值1和0。然后通过调用zslCreateNode函数创建一个管理所有节点Levels的数组,入参为数组大小宏常量ZSKIPLIST_MAXLEVEL(32),得分为0,对象值为NULL。(这是实现跳表的关键点)。下一步是初始化这个数组的每个元素的前指针forword和span。最后初始化尾指针并返回值。可以参考下面的图示和源码://创建一个表头为空的跳转表zskiplist*zslCreate(void){intj;zskiplist*zsl;//尝试分配内存空间zsl=zmalloc(sizeof(*zsl));//初始化level和lengthzsl->level=1;zsl->length=0;//调用下面的方法zslCreateNode,传入的参数有数组长度ZSKIPLIST_MAXLEVEL32//得分为0,对象值为NuLL//这一步是为了createandmanageallnodesarray//并将header的头指针设置为这个对象的地址zsl->header=zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//为这32个数组分配前指针forward和span(j=0;jheader->level[j].forward=NULL;zsl->header->level[j].span=0;}//设置尾指针zsl->header->backward=NULL;zsl->tail=NULL;//返回对象returnzsl;}zskiplistNode*zslCreateNode(intlevel,doublescore,sdsele){zskiplistNode*zn=zmalloc(sizeof(*zn)+level*sizeof(structzskiplistLevel));zn->score=score;zn->ele=ele;returnzn;}插入一个node,比如下图中有6个元素,你需要插入一个值为赵六,得分为101的元素,大家想一想。大致步骤包括找到要插入的元素位置,新建一个数据节点,然后调整与之相关的头尾指针的level数组。然后看看redis是干什么的。是不是和我们想的不一样?答案揭晓。当然大框架是一样的。正文开始:(先上图)1.遍历管理所有节点的level数组,从最大的level开始,也就是3,逐个比较值,如果有得分更高的值或者同样的分数,但是数据的值比他大,记录到数组中,同时记录跨度。那太抽象了。以上图为例,从header中的level3开始,先到张三的L3,找到分数70,小于目标分数101跳过,根据前面的指针找到赵六的L3,找到比目标分数101高的分数102。目标分数为101,待更新数组update中记录赵六L3,记录跨度为4。然后进入下一层,张三的L2层,发现70的分数小于101的目标分数,略过,根据前面的指针找到王舞的L2,找到90的分数,小于101的目标分数,略过,根据之前的指针找到赵六,发现分数102大于目标分数101,将赵六的L2记录在待更新数组update中,记录跨度为2。最后,到下一层,张三的L1层,逻辑和之前一样,赵六的L1层和span也记为1。2.为新节点随机生成层数level(按位运算),如果生成的层数大于当前最大层数3,则更大的部分将被一一遍历,span等信息会记录在上面的更新表中。比如新节点生成的层级是5,当前最大层级是3,说明只会有一个节点,并且跨越了之前所有的节点,那么我们就从第四层和第五层开始遍历,记录到要更新的数组update中。3、所有的准备工作做好后,我们已经找到了要插入节点的位置,在哪一层,每一层对应的span是多少。接下来,我们将添加一个新的数据节点。将前两步的信息添加到新节点中,并调整前后指针。4.最后是一些收尾工作,比如修改表头的层级、节点大小长度和尾指针tail等属性。综上,整个过程结束。可能看起来有点复杂,可以参考下面的代码。//插入一个节点,入参为//zsl:表头//score:插入元素的得分//ele:插入元素的具体数据elezskiplistNode*zslInsert(zskiplist*zsl,doublescore,sdsele){//使用updatearray记录每层待插入的前一个元素zskiplistNode*update[ZSKIPLIST_MAXLEVEL],*x;//记录前一个节点与第一个节点的跨度,即元素在列表中的rank-1unsignedintrank[ZSKIPLIST_MAXLEVEL];inti,level;serverAssert(!isnan(score));x=zsl->header;//从最大层开始遍历,从上到下,找到每一层要插入的位置for(i=zsl->level-1;i>=0;i--){/*storerankthatiscrossedtoreachtheinsertposition*/rank[i]=i==(zsl->level-1)?0:rank[i+1];//找到先直接score大于元素的位置//或者score与元素相同但是对象的ASSICC码大于元素的位置while(x->level[i].forward&&(x->level[i].forward->scorelevel[i].forward->score==score&&sdscmp(x->level[i].forward->ele,ele)<0))){//跨越通过的元素Count元素得到的排名列表中的元素,或者搜索到的路径长度rank[i]+=x->level[i].span;x=x->level[i].forward;}//记录要插入的位置update[i]=x;}//随机生成一个层数,在1到32之间,层数越高,生成的概率越低level=zslRandomLevel();//如果生成的层数大于现有最高层数层数,超过层数需要初始化if(level>zsl->level){//开始循环for(i=zsl->level;i<level;i++){rank[i]=0;//这个元素是这些层的第一个节点,前一个节点是headerupdate[i]=zsl->header;//初始化后,每一层都有两个元素,一步是跨越所有元素update[i]->level[i].span=zsl->length;}zsl->level=level;}//创建节点x=zslCreateNode(level,score,ele);for(i=0;ilevel[i].forward=update[i]->level[i].forward;update[i]->level[i].forward=x;//rank[0]为第0层前节点P1(即最底层插入节点前面的节点)到第一个节点的跨度//rank[i]是第i层前面节点P2(本层插入节点前面的节点)到第一个节点之间的跨度//插入节点X之间的跨度f(X,Y)而后节点Y可以通过下面的公式计算//关键是f(P1,0)-f(P2,0)+1等于新节点到P2的跨度,因为跨度延伸向下到底层呈扇形。//记录节点span各层的交叉元素,减去层间元素的rank之和得到x->level[i].span=update[i]->level[i].span-(rank[0]-rank[i]);//插入位置之前节点的span加1(新节点在rank[0]之后的位置)update[i]->level[i].span=(rank[0]-rank[i])+1;}/*incrementspanforuntouchedlevels*/for(i=level;ilevel;i++){update[i]->level[i].span++;}//0级是双向链表,方便redis经常支持倒序查找x->backward=(update[0]==zsl->header)?NULL:update[0];if(x->level[0].forward)x->level[0].forward->backward=x;elsezsl->tail=x;zsl->length++;returnx;}intzslRandomLevel(void){intlevel=1;while((random()&0xFFFF)<(ZSKIPLIST_P*0xFFFF))level+=1;return(levelheader;//从header的level开始循环遍历,即最大值for(i=zsl->level-1;i>=0;i--){//如果分数小于目标分数,则排名加上其跨度//或分数相同,但具体数据小于目标数据,则排名还加上spanwhile(x->level[i].forward&&(x->level[i].forward->scorelevel[i].forward->score==score&&sdscmp(x->level[i].forward->ele,ele)<=0))){rank+=x->level[i].span;x=x->level[i].forward;}//保证rank为仅当第i层找到相同分数和相同对象值时才返回if(x->ele&&sdscmp(x->ele,ele)==0){returnrank;}}return0;}结论本文主要讲底稀土层dis的ZSET数据类型实现跳表。一、什么是跳表?,引出跳表的概念和数据结构,分析其主要组成部分,然后通过多张流程图说明Redis是如何设计跳表的,最后结合源码描述跳表,如创建过程,添加节点的过程,获取某个节点的排名过程,穿插实例和过程图。如果您觉得文笔还可以,请给我一个赞👍,您的认可是我写作的动力!如果大家觉得有什么不对的地方,欢迎评论指出。那就再见啦。原文链接:https://mp.weixin.qq.com/s/EDGrGabrdorgKUdaw6OK5A