直到几个月前,在消息传递的上下文中,流对我来说只是一个有趣且相对简单的概念。这个概念在Kafka流行之后,我主要研究了他们在Disque案例中的应用,Disque是一个消息队列,在Redis4.2中会转化为Redis的一个模块。然后我决定让Disque使用AP消息(LCTT译注:见CAP定理),即不需要client过多的参与就可以实现容错和可用性,所以我更确定flow的概念不适用在这种情况下。然而当时Redis在默认导出数据结构方面存在问题,这并不容易。它在Redis列表列表、有序集排序列表、发布/订阅Pub/Sub功能之间存在一定的缺陷。您可以使用这些工具来为一系列消息或事件建模。OrderedSets消耗大量内存,因此自然无法对相同的消息传递进行建模,客户端也无法阻止新消息。因为有序集不是序列化的数据结构,它是一个元素可以根据数量移动的集合:所以它不像顺序数据。列表还有另一个问题,它在某些特定用例中会产生类似的适用性问题:您无法浏览列表的中间部分,因为在那种情况下,访问时间是线性的。此外,由于没有任何指定输出的功能,列表上的阻塞操作仅向单个客户端提供单个元素。列表中没有固定的元素ID,也就是无法指定从哪个元素开始给我内容。对于一对多的工作任务,有发布/订阅机制,这在大多数情况下是非常好的,但是,对于一些不想“fire-and-forget”的事情:keepingahistory重要的不仅是因为消息会在断开连接后被检索,而且因为某些顺序消息列表,使用范围查询来浏览是非常重要的:例如,这个10秒范围内的温度读数是多少?在我尝试解决上述问题时,我想编写一个通用的排序集并包含一个独特的、更灵活的数据结构,但是,我的设计尝试最终产生了比当前数据结构更人为的结果。Redis的优势在于它的数据结构导出更像是一种自然的计算机科学数据结构,而不是“Salvatore发明的API”。所以我最终停止了我试图做的事情并说,“好的,这就是我们现在所拥有的”,也许我会向发布/订阅添加一些历史记录,或者为列表访问添加一些更灵活的东西。然而,每当用户在会议上对我说“你如何在Redis中模拟时间序列”之类的话,我的脸就绿了。在Redis4.0中引入模块后,用户开始思考如何自己解决这些问题。其中一位用户,TimothyDowns,通过IRC告诉我:\我计划向这个模块添加一个类似事务日志的数据类型——这意味着大量的订阅者可以做一些事情,比如发布/订阅,其中\订阅者在消息队列中保留他们的位置,而不是让Redis必须维护每个消费者的位置并为每个订阅者复制消息。他的想法启发了我。我想了几天,意识到这可能是我们一次解决上述所有问题的机会。我需要重新想象“日志记录”的概念是什么。日志记录是每个人都使用的基本编程元素,因为它只是以追加模式打开文件并以特定格式写入数据。但是Redis数据结构必须是抽象的。它们在内存中,我们使用内存不是因为我们懒惰,而是因为使用一些指针,我们可以概念化数据结构并将它们从显式约束中抽象出来。比如日志一般有几个问题:偏移量不是逻辑的,而是真正的字节偏移量,如果你想要相对于条目插入时间的逻辑偏移量怎么办?我们有可用的范围查询。此外,日志通常很难进行垃圾收集:如何删除只能追加的数据结构中的旧元素?好吧,在我们理想的日志中,我们只是说,我想要带有数字***的条目并且没有旧元素等。当我受到Timothy的想法启发并尝试编写规范时,我使用了基数Redis集群中的树来实现它并优化了它的某些部分。这为实现节省空间的日志提供了基础,并且仍然有可能以对数时间访问范围。同时开始阅读关于Kafka的流相关的内容来获得额外的灵感,也很适合我的设计,***借鉴了Kafka消费者群体的概念,再次优化以供Redis应用到Redis在内存中使用的内容。然而,该规范只是纸上谈兵,过了一段时间后,我几乎从头开始重写它以添加到Redis升级以及我从与其他人讨论中得到的许多建议。我希望Redis流能够成为时间序列的一个有用的特性,而不仅仅是一个常见的事件和消息类型的应用程序。让我们写一些代码从Redis会议回来后,我整个夏天都在实现一个名为listpack的库。该库是ziplist.c的继承者,ziplist.c是一种数据结构,表示单个分配中的字符串元素列表。它是一种非常特殊的序列化格式,其特点是能够以相反的顺序(从右到左)进行解析:以便在各种用例中替换ziplists。结合基数树和列表包的特性,很容易构建一个空间高效的日志,它也是可索引的,这意味着允许按ID和时间进行随机访问。既然这些都准备好了,我就开始写一些代码来实现流数据结构。我还在完成实现,无论如何,它现在在Github上的Redis的streams分支中工作。我并不是说API是100%最终的,但是,关于它有两个有趣的事实:一,当时只缺少消费者组,加上一些不太重要的操作流的命令,但是,所有大的方面都有得到满足。其次,一旦各方面都相对稳定,我决定在大约两个月内将所有流功能向后移植到4.0分支。这意味着想要使用流的Redis用户不必等待Redis4.2发布,它们将立即在生产中可用。这是可能的,因为作为一种新的数据结构,几乎所有的代码改动都发生在新代码中。除了阻塞列表操作:代码已经重构,我们将流和列表阻塞操作共享相同的代码,这大大简化了Redis内部实现。教程:欢迎使用Redis流在某些方面,您可以将流视为Redis列表的增强版。流元素不再是单个字符串,而是一个由字段和值组成的对象。范围查询更适用,速度更快。在流中,每个条目都有一个ID,它是一个逻辑偏移量。不同的客户端可以阻塞等待blocking-wait大于指定ID的元素。RedisStreams的一个基本命令是XADD。是的,所有Redis流命令都以X为前缀。>XADDmystream*sensor-id1234temperature10.51506871964177.0此XADD命令将附加指定条目作为指定流的新元素-“mystream”。上例中的条目有两个字段:sensor-id和temperature,每个条目在同一个流中可以有不同的字段。使用相同的字段名称可以提高内存利用率。有趣的是,字段的排序可以保证顺序。XADD只返回插入条目的ID,因为第三个参数中的星号(*)表示该ID是命令自动生成的。通常这就足够了,但也可以强制一个ID,在这种情况下用于将命令复制到从属服务器和AOFappend-only文件。这个ID由两部分组成:毫秒时间和序号。1506871964177是毫秒时间,正好是毫秒级的UNIX时间戳。点号(.)后的数字0为序号,用于区分毫秒数相同的表项。这两个数字都是64位无符号整数。这意味着,我们可以将所有需要的条目添加到流中,甚至在同一毫秒内。ID的毫秒部分使用Redis服务器当前本地时间生成的ID和流中最后一个条目的ID之间的最新一个。因此,例如,即使计算机时间向后跳,ID仍然递增。在某些情况下,您可以将流表项的ID视为一个完整的128位数字。然而,它们相对于它们所添加到的实例的本地时间这一事实意味着我们可以以毫秒精度随意查询它们。可以想象,在快速添加两个条目后,结果是只增加了一个序列号。我们可以简单地用一个MULTI/EXEC块模拟“快速插入”:>MULTIOK>XADDmystream*foo10QUEUED>XADDmystream*bar20QUEUED>EXEC1)1506872463535.02)1506872463535.1在上面的例子中,也表明不需要指定任何初始模式在模式的情况下,对不同的条目使用不同的字段。会发生什么如前所述,仅使用每个块的第一条消息(通常包含50-150条消息内容)。此外,具有相同字段的连续条目会使用一个标志进行压缩,该标志表示“它们与该块中的第一个条目具有相同的字段”。因此,使用相同字段的连续消息可以节省大量内存,即使字段集随时间缓慢变化也是如此。为了从流中检索数据,有两种方法:使用XRANGE命令实现的范围查询和使用XREAD命令实现的流式处理。XRANGE命令只获取从开始到停止范围内的所有条目。因此,例如,如果我知道它的ID,我可以使用以下命名获取单个条目:>XRANGEmystream1506871964177.01506871964177.01)1)1506871964177.02)1)"sensor-id"2)"1234"3)"temperature"4)“10.5”在任何情况下,都可以使用指定的开始符号-和结束符号+来表示最小值和***ID。要限制返回的条目数,还可以使用COUNT选项。这是一个更复杂的XRANGE示例:>XRANGEmystream-+COUNT21)1)1506871964177.02)1)"sensor-id"2)"1234"3)"temperature"4)"10.5"2)1)1506872463535.02)1)"foo"2)"10"这里我们说的是ID的范围,那么,要获取给定时间范围内的特定范围的元素,可以使用XRANGE,因为ID的“数字”部分可以省略。所以你可以只指定“毫秒”时间,以下命令的意思是:“从UNIX时间1506872463给我10个条目”:127.0.0.1:6379>XRANGEmystream1506872463000+COUNT101)1)1506872463535.02)1)"foo"2)"10"2)1)1506872463535.12)1)"bar"2)"20"XRANGE最需要注意的是,假设我们收到回复中的ID,后面连续的ID只是加上序号部分,因此XRANGE可用于遍历整个流,为每次调用接收指定数量的元素。Redis中的*SCAN命令系列允许迭代Redis数据结构,尽管它们不是为迭代而设计的,但这避免了再次犯同样的错误。使用XREAD进行流式处理:阻止新数据当我们想要通过ID或时间访问流中的范围,或者通过ID获取单个元素时,最好使用XRANGE。然而,在使用流的情况下,当数据到达时,它必须被不同的客户端消费,这不是一个好的解决方案,这需要某种形式的池化。(这对于某些应用程序可能是个好主意,因为它们只是偶尔被查询)XREAD命令旨在同时从多个流中读取,只指定我们获得的***条目的ID。此外,如果没有数据可用,我们可以要求阻塞,并在数据到达时解除阻塞。类似阻塞列表操作的效果,但是从流中获取的数据并没有在这里消费,多个客户端可以同时访问同一个数据。下面是一个典型的XREAD调用的例子:>XREADBLOCK5000STREAMSmystreamotherstream$$意思是:从mystream和otherstream获取数据。如果没有可用数据,则阻塞客户端5000毫秒。在STREAMS选项后面指定我们要监控的关键字,最重要的是指定我们要监控的ID,指定的ID是$表示:假设我现在需要流中的所有元素,那么只需要开始从下面一个到达的元素开始给我。如果我从另一个客户端发送这样的命令会发生什么:>XADDotherstream*XREAD端的消息“你好”?1)1)"otherstream"2)1)1)1506935385635.02)1)"message"2)"HiThere"随着接收到的数据,我们也得到了数据的密钥。在下一次调用中,我们将使用收到的***消息的ID:>XREADBLOCK5000STREAMSmystreamotherstream$1506935385635.0等等。但是请注意,使用它的方式,客户端可能会在非常大的延迟后重新连接(因为处理消息需要时间,或者出于其他原因)。这样的话,期间就会积累很多消息。为了确保客户端不会被消息淹没,并且服务器不会通过向单个客户端提供大量消息来浪费太多时间,使用XREAD的COUNT选项是非常明智的。到目前为止,流上限看起来不错……但是,有时流需要删除一些旧消息。幸运的是,这可以使用XADD命令的MAXLEN选项来完成:>XADDmystreamMAXLEN1000000*field1value1field2value2基本上意味着,如果向流中添加新元素后,发现消息数超过1000000,则删除旧消息,使元素总数回到1,000,000以内。它很像在列表中使用的RPUSH+LTRIM,但这次我们使用内置机制来完成它。但是请注意,上面的意思是每次我们添加新消息时,我们都需要额外的工作来从流中删除旧消息。这会消耗一些CPU资源,所以在计算MAXLEN之前尽可能使用~符号来表示我们不需要非常精确的1000000条消息,稍微多一点也没什么大不了的:>XADDmystreamMAXLEN~1000000*foobar这样XADD只有在可以删除整个节点的情况下才删除消息。与普通的XADD相比,这种方法几乎可以自由地限制流量。Consumergroups(indevelopment)这是Redis中第一个尚未实现且正在开发中的功能。灵感也来自Kafka,尽管在这里以不同的方式实现。重点是使用了XREAD,客户端也可以加一个GROUP选项。同一组的所有客户端将自动收到不同的消息。当然,同一个流可以被多个组读取。在这种情况下,所有组都将收到到达流中的消息的相同副本。但是,在每个组内,消息不会重复。指定组时,可以指定RETRY选项来扩展组:在这种情况下,如果消息未通过XACK确认,它将在指定的毫秒数后重新传送。这将为消息传递提供更好的可靠性,其中客户端没有专用方式将消息标记为已处理。这部分也在开发中。内存使用和加载时间节省由于用于建模Redis流的设计,内存使用非常低。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用100MB内存可以有几百万条消息。此外,该格式被认为需要最少的序列化:列表包块存储为基数树节点,在磁盘和内存中以相同的方式表示,因此可以轻松存储和读取。例如,Redis可以在0.3秒内从一个RDB文件中读取500万条条目。这使得流的复制和持久存储非常高效。我还计划允许从条目中间进行部分删除。现在只实现了部分实现。策略是在entrymark中将entry标记为deleted,当被删除的entry占所有entry的比例达到指定值时,这个block会被回收重写。如有必要,它将被连接。到另一个相邻的块上以避免碎片。关于最终发布时间的结论Redis流功能将在年底前包含在Redis4.0系列的稳定版本中(LCTT译注:本文原文发表于2017年10月)。我认为这种通用的数据结构将为Redis提供一个巨大的补丁来解决许多现在难以解决的情况:这意味着你(之前)需要创造性地“滥用”当前提供的数据结构来解决那些问题。一个很重要的使用场景是时间序列,但是,我认为对于其他场景,通过TREAD流式传输消息会很有趣,因为对于那些对可靠性要求更高的应用,可以使用发布/订阅模式来代替“使用和丢弃””,还有其他全新的使用场景。现在,如果你想在有问题的环境中评估这个新的数据结构,你可以更新GitHub上的streams分支来开始试用它。欢迎所有错误报告给我们。:-)如果您喜欢观看视频的方式,这里有一个现场演示:https://www.youtube.com/watch?v=ELDzy9lCFHQ