本文指的是源代码版本是
REDIS支持了5.0版的流数据类型,可用于保存消息数据,这可以帮助我们实现带有消息阅读和写入基本功能的消息队列,以及每日分布式程序通信。
在其中,为了节省内存空间,在流数据类型的基础数据结构中,使用两个数据结构来保存消息。ListPack是保存数据时保存内存的listpack。radix树是该数据结构的最大功能是它适合保存数据,从而实现了内存空间和支持范围查询的目标。
REDIS的每个数据结构的出现是有效解决某种类型的问题。Redis流好奇心的问题有什么问题?
例如,流之前存在哪些方案,例如和其他数据结构无法有效地解决实际问题?
下一个考虑是如何设计结构。
以上问题是设计更多数据结构以解决问题的想法,如下所示:
让我们看一下流如何实现上述功能。
此外,在这里消费者组的概念,作者还说它是基于Kafka消费者组的概念。详细信息,您可以查看作者的想法:streams:redis中的新的一般底料数据结构。
1. XADD插入数据
1)当您想简单地插入数据流时,您可以写下以下内容:
这是一个多/执行句子块,用于快速插入,该插入是通过指示ID自动生成的。当然,您还可以指定ID,该ID需要符合指定ID的最大ID,而不是当前流队列。
自动ID组件是一部分,即毫秒 +当前毫秒中的添加数
2)当您想避免由太多流消息引起的高内存职时,您可以写下:
您可能已经注意到,此处使用这些参数来限制流的消息数。如果超过限制,它将删除最早的数据。此用法类似于列表的操作。
3)如果您认为Maxlen更有效,则可以编写以下内容:
符号的使用是准确的限制,也就是说,没有比限制数量的大问题。因此,您无需检测到是否每次删除该消息,从而导致一些CPU费用。
2. Xrange用于范围查询数据
1)如果您知道ID范围可能是这样查询的
2)如果要查询流队列中的所有数据并控制输出号,则可以这样查询
其中,最小ID和最大ID
3)如果您只知道一个特定的ID,也可以进行类似的范围查询
3. Xread查询数据
1)如果您想阅读即将到来的新数据,则可以编写以下内容:
2)当然,您可能只想阅读现有数据,您可以写下以下内容:
1. XGroup
它用于创建,销毁和管理消费者群体;这里的消费者组类似于消费者群体的概念。组中消费者之间的消费者数据具有间组数据。不同之处在于,卡夫卡(Kafka)具有分区的概念,而redis流则没有。
当指定流不存在时,您可以指定自动创建流的子选择
2. XreadGroup
一旦消费者组的消费者阅读了消息队列中的消息队列,消费者组中的其他消费者再也无法阅读它。换句话说,在同一消费者组中,新闻是。
值得注意的是,使用消费者组的目的是允许该组中的多个消费者共享消息以收集消息。因此,每个消费者通常都会读取消息的一部分,因此读取负载在多个消费者中的多个消费者中。平衡分布之间。这与Kafka不同。卡夫卡(Kafka)具有分区的概念。消费者群体中的消费者负责消耗不同的分区数据,并且没有消费者消耗同一分区的情况。
3. Xpending是流的重要实现,它是REDIS实现消息队列的重要表现,以满足高可用性的高可用性。如果您使用列表或排序作为队列,那么您应该知道它们都需要处理故障恢复的场景。条目的条目列表等同于
此命令查询已绘制的消费者的信息,但可用于恢复数据,例如崩溃,而无需正常执行ACK。用途如下:
如果要返回PEL队列中的前10个数据,则可以编写以下内容:
为了确保消费者在失败或停机时间失败后仍然可以阅读未准备好的新闻,流将自动使用待处理的列表队列来备份消费者组中每个消费者的新闻,直到消费者使用Xack来使用Xack来使用XackThe流的订单“已处理消息”。
如果消费者未成功处理该消息,它将不会将XACK命令发送到流,并且该消息仍将保留。这次,消费者可以使用Xpending命令查看已读取的新闻,但未被确认以完成该新闻重新启动后处理。
1)REDIS流的结构主要由消息,生产者,消费者和消费者组组成。
流式消费者群体特征
底层的实现主要使用,树是一一引入的
前缀树是一个数据结构,在搜索字符串时通常会使用,可以在字符串集合中快速找到字符串。
1)前缀树展示示例图
因为树中的每个节点仅在字符串中存储一个字符,所以有时会导致空间浪费。RAX的出现是解决此问题。
对于仅有一个子节点的节点,它可以是一个节点,因此它可以适当地将树的水平和节点空间的数量降低多次;换句话说,它可以提高检索效率并节省更多内存。
2)RAX树显示示例图:
关于RAX结构,在上一篇文章中 - 深度分析中,您可以单击以查看
那么,流如何使用RAX提高检索效率并节省空间?
现在,新闻ID已保存,相应的消息在哪里存在?不用担心,请继续看?
REDIS源代码由ListPack解释为字符串服务格式的列表,即字符串列表的序列化格式,即序列化字符系列的存储。
ListPack也称为,它的特征是连续存储空间,可以紧凑地存储数据。同时,为了节省内存空间,ListPack列表使用多种编码方法来表示不同的长度数据。这些数据包括整数和字符串弦乐
1)Listpack结构图
ListPack由4个部分组成:总字节,数字,输入和结束。
简单理解ListPack是一种专门的数据结构,专门研究通过特定的编码方法编码和解码数据。这种结构天生存在以节省空间。
我相信您已经想到了,我们通过ListPack结构存储;我们上面提到新闻ID已存储在Rax树中。如果您知道RAX结构,则应该知道树中密钥节点的数据域,您可以在这里存储数据。同样,相应消息的指针在此处编写,指向listPack结构,这是流的特定消息内容。
关于ListPack,在上一篇文章中进行了-Depth分析,您可以单击以查看
REDIS流的实现取决于RAX结构和ListPack结构。每个消息流都包含RAX结构,该结构存储在RAX结构中作为键和结构。每个消息的特定信息存储在此listPack中。1)示例图:
2)流结构:
3)消费者组。消费者群是流中的一个重要概念。每个流将有多个消费者组。每个消费者群体都是通过组名称唯一标识的。同时,溪流结构相关联。结构如下:
特别是,PEL结构:PEL是消费者组的消息,消息ID是密钥,并且StreamNack值得。
消费者组的概念受到作者的Kafka消费者组的启发;在流的消费者组中:
可以有多个生产商可以继续发布新闻,同时可以听到多个消费者群体。此外,还支持非消费者组的状况。如下所示:4)消费者。通过流媒体来唯一确定每个消费者,该结构如下:
每个消费者都有一个PEL结构,可以保存元数据,并提供无薪信息;值得注意的是,这里不会有完整的消息,只有消息ID和处理情况的元数据;
因此,当您想从PEL恢复数据时,需要从PEL获取消息ID列表,然后根据原始流列表根据ID列表检查特定消息信息。
5)没有确认消息。不令人满意的消息保留了消费者或消费者尚未确认的消息。
该结构用于存储在PEL队列中的元数据信息,例如上次处理时间,处理时间以及最后一次处理的消费者。
相信您很细心,您发现StreamCG结构和StreamConsumer结构具有PEL场。那么他们有什么关系?
您可能想问,两个包含关系吗?数据重复记录吗?它确实包含关系,但是为什么您必须录制很多次呢?
6)迭代设备。为了在流中穿越新闻,Redis提供了流层结构。
使用迭代器的优点是为遍历流提供一系列抽象,并且不需要关心实际的radix树 + listpack实现。流都在内部使用,例如streamplyplywithrane(),AOF操作等。
流可以视为消息流。对于消息,只能添加或删除它,并且消息的内容无法更改。因此,这里主要引入与流相关结构的主要操作以及添加和删除的操作。
可以看出,主要是分配空间并初始化相应的字段。
1)添加消息
命令如下:
源代码实现如下:
可以看出,实际元素插入操作已完成,并且
①获得RAX的最后一个键的节点。由于RAX树以消息ID的顺序存储,因此最后一个键节点存储了最后插入的消息。
②查看节点是否可以插入此新消息中。
③如果节点不能再插入新消息(ListPack为空或已设置的存储最大值),请在RaxlistPack中插入一个新节点;如果您仍然可以插入消息,则插入消息的比较与与ListPack中主消息相对应的相应字段完全一致,并且完全表明该消息可以由Master's字段重复使用。
④将其插入与原始RAX的最后一个键节点相对应的新构建的ListPack或原始列表包。此步骤主要取决于前两个步骤的结果。该功能主要使用ListPack和RAX的相关接口。
2)添加了消费者组
在流下创建一个消费者组;如果消费者组已经存在,则返回,否则,指针将返回。
3)新消费者
流允许消费者增加消费者群体的消费者,但没有直接提供在某个消费者群体中创建消费者的界面。相反,当他们在消费者群体中查询消费者时,他们发现消费者群体没有选择消费者何时没有消费者。
在阅读消费者组的消息时,如果不存在消费者,它将自动创建
1)删除消息
订单,例如:
源代码实现:
您可以看到删除操作已移交给处理:
成功删除回返回1,否则返回0。最终删除操作完成了:
此方法通常仅设置要删除要删除的符号,并且不会从其位置的ListPack删除消息。当已删除该消息的整个列表包的所有消息时,该节点将从rax。
2)裁缝信息流
消息流的大小(未删除的消息的数量,不包括已删除的消息)被剪切到给定的大小,当删除消息时,请根据消息将其从小到大的删除id.接口是
3)释放消费者组
消费者组发行的接口是流弗雷克。界面主要完成2个部分。首先,发布了消费者组的PEL链接列表,然后发布消费者组中的每个消费者。
4)释放消费者
在发布消费者时,重要的是要注意,无需释放消费者的PEL,因为消费者的不令人满意的消息结构StreamNack与消费者组的PEL共享并直接释放相关的内存。
REDIS提供了流的迭代流式流材,用于在流中浏览消息。主要有以下四种方法:
为了更好地帮助您了解基本原则,您可以查看ListPack和Rax的原理。