作者|皮天Redis是目前最流行的kv数据库。当然,它的功能越来越多,不局限于kv场景。消息队列是Redis中的一个重要功能。Redis从2010年发布1.0版本开始就有了消息队列的雏形,经过10多年的迭代,其消息队列的功能已经越来越完善。作为全内存消息队列,适用于需要高吞吐量和低延迟的应用。时间场景。下面我们就来看看Redis消息队列功能的发展历程,历史版本有哪些不足,后续版本又是如何解决这些问题的。1.Redis1.0list从广义上讲,消息队列是一种队列的数据结构。生产者从队列的一端放入消息,消费者从队列的另一端读取消息。消息保证按照先进先出的顺序。一个本地链表数据结构是一个进程维度的消息队列,让模块A写消息,模块B消费消息,从而实现模块A/B的解耦和异步。但是为了实现应用层的解耦和异步,需要一个消息队列服务。1、列表的特点Redis1.0在发布的时候就有一个列表数据结构。应用A可以通过lpush写入消息,应用B可以通过rpop从队列中读取消息。每条消息只会被读取一次,并根据lpushWrite读取。同时Redis的接口是并发安全的。多个生产者可以同时向一个列表生产消息,多个消费者可以从列表中读取消息。这里还有一个问题,消费者如何知道列表中有一条消息,是否需要不停地轮询查询呢?轮询不能保证及时处理消息,会增加延迟,而且当列表为空时,轮询请求大部分是无效请求,浪费了大量的系统资源。还好Redis有brpop接口,有一个timeout这个参数。如果列表为空,Redis服务器不会立即返回结果。它会在返回之前等待列表中的新数据或等待超时最多返回空。长轮询是通过brpop接口实现的,相当于服务器端推送。消费者可以第一时间感知到新的消息,并且通过设置合理的超时时间,将系统资源的消耗降低到很低的水平。#基于list完成消息生产和消费#Producer生产消息msg1lpushlistAmsg1(integer)1#Consumer读取消息msg1rpoplistA"msg1"#Consumer以阻塞方式读取listA,如果有数据则立即返回,否则最多等待持续10秒钟。brpoplistA101)"listA"2)"msg1"当使用rpop或brpop消费消息时,接口会先从队列中删除消息,然后再被应用消费。如果应用程序在处理消息之前异常崩溃,则消息丢失。但是,如果使用lindex这样的只读命令先读取消息,处理完再删除,则需要额外的机制来保证一条消息不会被其他消费者重复读取。幸运的是,列表有一个接口,如rpoplpush或brpoplpush,它们可以原子地从列表中删除消息并将其添加到另一个列表。应用可以通过两个列表的组合来完成消息的消费和确认功能。使用rpoplpush消费列表A中的消息,并将其移动到列表B中,处理完消息后,将消息从列表B中删除。如果处理完消息,应用程序异常崩溃。恢复后,应用程序可以再次从列表B中读取和处理未处理的消息。该方法增加了消息消费的ack机制。#根据2个列表完成消息消费和确认#从listA读取消息写入listBrpoplpushlistAlistB"msg1"#业务逻辑处理完msg1后,从listB中删除msg1,完成消息确认lremlistB1msg1(integer)12.链表的不足通过Redis1.0引入的链表结构,我们可以实现一个分布式的消息队列来满足一些简单的业务需求。但是,列表结构作为消息队列服务有一个致命的问题。它没有广播功能,一条消息只能消费一次。在大型系统中,通常一条消息会被多个下游应用同时订阅和消费。例如,当用户完成订单的支付操作后,需要通知商家发货。为了更新物流状态,用户的积分和积分也可能会增加。level,这些是不同的下游子系统,它们都会订阅完成的支付操作,一个list的消息只能消费一次,面对如此复杂的大型系统,捉襟见肘。也许你会说有多个列表,生产者向每个列表发送消息,每个消费者处理自己的列表。首先是性能不会很好,因为同一条消息需要重复传递。二是这种设计违背了生产者和消费者的解耦原则。在这种设计下,生产者需要知道下游有哪些消费者,如果业务发生变化,需要增加额外的消费者,生产者的代码也需要修改。3、优点总结模型简单,与使用本地列表基本相同,适应性强。通过brpop可以实现消息的实时处理。可以通过rpoplpush链接两个列表,这样可以先消费后确认消息,避免消费应用出现异常情况消息丢失不足消息只能消费一次,缺少广播机制2.Redis2.0pubsublist作为a的应用场景消息队列是有限的。重要的原因是没有广播,所以Redis2.0引入了一个新的数据结构pubsub。虽然pubsub不能算是list的替代品,但是确实可以解决一些list无法解决的问题。1.pubsub特性pubsub引入了一个概念,叫做channel。生产者通过发布接口传递消息时会指定频道,消费者通过订阅接口订阅自己关心的频道。调用订阅后,这个连接会进入一个特殊的状态,这个状态通常不能发送其他请求,当有消息投递到这个通道时,Redis服务器会立即通过这个连接将消息推送给消费者。这里,一个频道可以被多个应用订阅,消息会同时投递给每个订阅者,实现消息的广播。另一方面,消费者可能会订阅一批频道。比如某用户订阅了浙江新闻推送,但是浙江新闻也会被细分,比如“浙江杭州xx”,“浙江温州xx”,这里的订阅者不是,你需要获取浙江的所有子类,订阅一个一个。您只需调用psubscribe“浙江*”即可订阅所有以浙江开头的动态消息。这里psubscribe传入通配符表示的频道,Redis服务器根据规则推送所有匹配的频道。消息发送给相应的客户端。#基于pubsub完成频道匹配和消息广播#消费者1订阅channel1subscribechannel11)"subscribe"2)"channel1"3)(integer)1#收到消息push1)"message"2)"channel1"3)"msg1"#消费者2订阅频道*psubscribechannel*1)"psubscribe"2)"channel*"3)(integer)1#收到消息push1)"pmessage"2)"channel*"3)"channel1"4)"msg1"1)"pmessage"2)"channel*"3)"channel2"4)"msg2"#Producerpublishmessagesmsg1andmsg2publishchannel1msg1(integer)2publishchannel2msg2(integer)1inRedfis2.8keyspacenotifications函数时添加后,pubsub除了可以通知用户自定义的消息外,还可以通知系统内部的消息。keyspacenotifications引入了两个特殊的通道,分别是__keyevent@__:和__keyspace@__:,通过订阅__keyevent客户端,可以收到特定命令调用的回调通知,通过订阅__keyspace客户端,可以收到添??加目标键的删除、修改和过期事件。要使用该功能,还需要开启notify-keyspace-events的配置。#通过keyspacenotifications函数获取系统事件#WriterequestsettestkeyvEX1#订阅key级事件psubscribe__keyspace@0__:testkey1)"psubscribe"2)"__keyspace@0__:testkey"3)(integer)1#Receive通知1)"pmessage"2)"__keyspace@0__:testkey"3)"__keyspace@0__:testkey"4)"set"1)"pmessage"2)"__keyspace@0__:testkey"3)"__keyspace@0__:testkey"4)"expire"1)"pmessage"2)"__keyspace@0__:testkey"3)"__keyspace@0__:testkey"4)"expired"#订阅所有命令事件psubscribe__keyevent@0__:*1)"psubscribe"2)"__keyevent@0__:*"3)(integer)1#收到通知1)"pmessage"2)"__keyevent@0__:*"3)"__keyevent@0__:set"4)"testkey"1)"pmessage"2)"__keyevent@0__:*"3)"__keyevent@0__:expire"4)"testkey"1)"pmessage"2)"__keyevent@0__:*"3)"__keyevent@0__:expire"4)"testkey"2.pubsub的缺点pubsub既可以单播也可以广播,还支持频道的简单正则匹配,已经可以满足大部分业务功能的需求ns,而且这个接口很早就发布了。它在2011年Redis2.0发布的时候就已经面世,拥有广泛的用户群,所以现在很多企业都在使用这个功能。但是当你深入了解了pubsub的原理之后,对于一个一致性要求高、数据量大的系统,你肯定不敢将其作为消息服务使用。首先,pubsub的消息数据是瞬时的,并没有保存在Redis服务器上。通过publish发送给Redis的消息会被推送给当时所有连接订阅的客户端。如果客户端因为网络问题断开连接,那么就会丢失这条消息,当客户端重新连接时,就无法找回之前的消息,甚至无法判断是否丢失了任何一条消息。其次,pubsub中的消费者采用推送模型获取消息,也就是说Redis会根据消息产生的快慢向所有消费者推送消息,而不管消费者的处理能力如何。如果消费者应用的处理能力不足,消息会在Redis的客户端buf中累积,当累积数据超过一个阈值时,连接就会断开,这意味着这些消息全部丢失,无法找回。如果多个consumer的clientbuf同时累积数据但还没有达到断开阈值,Redisserver的内存就会膨胀,进程可能会因为oom被kill掉,从而导致整个服务中断。3.优势总结消息具有广播能力。psubscribe可以匹配字符串通配符,这给了业务逻辑灵活性。可以订阅特定键或特定命令的系统消息是不够的。Redis异常和客户端断开会导致消息丢失。消息缺乏积累消息的能力。削峰填谷。push方式缺乏反压机??制,没有考虑消费者的处理能力。如果推送的消息超过消费者的处理能力,可能导致消息丢失或服务异常。3、Redis5.0流消息丢失和消息服务不稳定的问题严重限制了pubsub的应用场景,因此Redis需要重新设计一种机制来解决这些问题,这就导致了后来的流结构。1.流特性一个稳定的消息服务需要有几个关键点。它必须保证消息不会丢失并且至少被消费一次。它必须具有削峰填谷的能力,以匹配生产者和消费者之间的吞吐量差异。2018年,Redis5.0加入了流结构。这次考虑到list和pubsub在应用场景中的缺陷,基于Kafka模型重新设计了全内存消息队列结构。从此,Redis的消息队列功能就可以算是媲美主流消息了。排队积pk一。Stream的改进分为几个方面。成本:listpack结构用于存储消息数据,是一种紧凑的数据结构。不同于list的双向链表,每个节点额外占用2个指针的存储空间,这使得small在msg的情况下,stream的空间利用率更高。作用:stream引入消费组的概念。一个消费组中可以有多个消费者,同一个消费组中的消费者共享一个消息位置(last_delivered_id),可以让消费者横向扩展。在一个组中添加多个消费者以线性提高吞吐量。对于一个消费者组,每条消息只会被其中一个消费者获取和处理,这是pubsub广播模型所不具备的。之前不同的consumergroup是相互隔离的,各自维护自己的location,使得一个msg可以被多个不同的consumergroup重复消费,实现消息广播的能力。流中的消费者使用pull方式,可以设置超时时间,在没有消息的时候阻塞。这种长轮询机制保证了消息的实时性,消费速率与消费者自身的吞吐量相匹配。消息不丢失:流的数据会保存在aof和rdb文件中,这样可以在Redis重启后恢复流的数据。但是pubsub的数据是瞬时的,重启Redis就意味着所有的消息都丢失了。流中的每个消费者组都会存储一个last_delivered_id来标识已读取的位置。客户端断开重连后,可以从这个位置继续读取,消息不会丢失。Stream引入了ack机制来保证消息至少被处理一次。考虑一个场景,如果消费者应用已经读取了消息,但是应用还没来得及处理就宕机了,对于这样一个已经读取但是没有ack的消息,stream会将消息的状态标记为pending等。客户端重连后,使用xpending命令重新读取pengind状态的消息,继续处理。如果应用程序永久宕机,消费者组中的其他消费者应用程序也可以读取消息,并通过xclaim命令分配给自己继续处理。#根据流完成消息的生产和消费,保证消息在异常状态下至少被消费一次自动生成消息id,消息内容是一个kv数组,其中包含field1value1field2value2XADDmystream*field1value1field2value2"1645517760385-0"#消费组mygroup中的消费者consumer1从mystream中读取一条消息,>表示消费者组从未读消息中读取消息XREADGROUPGROUPmygroupconsumer1COUNT1STREAMSmystream>1)1)"mystream"2)1)1)"1645517760385-0"2)1)"field1"2)"value1"3)"field2"4)"value2"#消费完成后的Ack确认消息xackmystreammygroup1645517760385-0(integer)1#如果消费者应用在ack之前异常崩溃,恢复后会取回未处理的消息id。XPENDINGmystreammygroup-+101)1)"1645517760385-0"2)"consumer1"3)(integer)3053564)(integer)1#如果consumer1永远宕机,其他消费者可以将pending状态的消息移动到Continueto以自己的名义消费#将消息id1645517760385-0移动到consumer2下的XCLAIMmystreammygroupconsumer201645517760385-01)1)"1645517760385-0"2)1)"field1"2)"value1"3)"field2"4)“value2”Redis流保证消息至少被处理一次,但是如果想让每条消息只被处理一次,就需要应用逻辑的介入。消息通过生产者的重复交付或消费者的重复消费而被重复处理。针对生产者重复投递的问题,RedisStream为每条消息设置了唯一的增量id。通过参数,Redis可以自动生成一个id,也可以由应用指定一个id。应用程序可以根据业务逻辑为每个消息生成一个id。当xadd超时后,应用无法判断消息是否投递成功。可以通过xread查询该id的消息是否存在。如果存在,则表示发送成功。重复提交将被拒绝。该机制确保每条消息只能传递一次。对于消费者重复消费的问题,考虑一个场景。消费者读取消息后,业务流程完成,但在ack之前发生异常。应用恢复后,没有ack的消息被重复消费。这个问题是因为ack和消费消息的业务逻辑发生在两个系统中,不能是事务性的,需要业务改造来保证消息处理的幂等性。2.流的不足流模型实现了消息的高效分发,保证了消息至少被处理一次。通过应用逻辑的改造,消息可以只被处理一次。其能力与Kafka相当,但吞吐量高于Kafka,在高吞吐场景下成本比Kafka低,那么它的缺点是什么。首先,消息队列的一个很重要的功能就是削峰填谷,以匹配生产者和消费者吞吐量的差异。生产者和消费者的吞吐量差异越大,持续时间越长,意味着需要在Steam中积累更多的消息。news,而且Redis是全内存产品,数据积累成本高于磁盘。其次stream使用ack机制保证消息至少被消费一次,但是有一个前提是Redis中保存的消息不会丢失。Redis数据的持久化依赖于aof和rdb文件。有几种方法可以将aof放入磁盘。通过配置appendfsync来决定。通常,我们不会一如既往地配置为在每个命令执行后进行一次fsync。在线配置一般是everysec,fsync是每秒做一次,而rdb是在全量备份的时候产生的,也就是说在宕机恢复的时候可能会丢失最后一秒的数据。另一方面,在线生产环境中的Redis是一个高可用的架构。当主节点宕机时,通常不遵循恢复逻辑,而是直接切换到备节点继续提供服务。Redis的同步方式是异步同步,也就是说主节点上新写入的数据可能还没有同步到备节点上,这部分数据在切换后会丢失。因此,在故障恢复过程中,Redis中的部分数据可能会丢失。在这种情况下,无论流接口设计得多么好,都不能保证消息至少被消费一次。3、优点总结。在成本和功能方面进行了许多改进。支持小消息紧凑存储、广播能力、消费者水平扩展、背压机制。ack机制保证Redis服务器上的消息在正常情况下至少能收到。基于内存的消息队列一次处理能力不足,数据堆积成本高,Redis本身rpo>0,故障恢复可能丢数据,所以stream不能保证Redis故障恢复后至少消费一次消息。4、Tair持久内存版streamRedis流的缺点也是基于内存数据库的特性带来的。它具有高吞吐量和低延迟,但是在大容量下成本会比较高,而且应用场景并不完全是绝对的大容量低吞吐量或者小容量高吞吐量,有时应用场景介于两者之间,以及需要平衡容量和吞吐量之间的关系,因此需要一种存储成本低于RedisStream,但性能高于磁盘式消息队列的产品。另一方面,RedisStream无法保证在Redis故障场景下消息不会丢失,这就需要业务实现一些复杂的机制来补充这些数据,也限制了它在一些需要高一致性的场景中的应用。.为了让业务逻辑更简单,stream的应用范围更广,需要在故障场景下保证消息的持久化。兼顾成本、性能、持久化,有Tair持久内存版本。1.Tair持久内存版本空间更大,成本更低。Tair持久内存版引入了IntelOptane持久内存(以下简称AEP)。其性能略低于内存,但成本低于同容量内存。Tair持久内存版本将主要数据存储在AEP上,使得同等容量下成本更低,从而使得流能够以同样的单价积累更多的消息。兼容社区版Tair持久内存版兼容原生Redis的大部分数据结构和接口,100%兼容流相关接口。如果你之前使用过stream的社区版,不需要修改任何代码,只需要改变一个连接地址就可以切换到持久内存版。并通过工具完成社区版和持久内存版数据的双向迁移。实时数据持久化Tair持久化内存版并不是简单的将Redis中的数据换成另一种介质进行存储,因为这只能通过AEP来降低成本,但是并没有利用AEP的数据不会丢失的特性。电源关闭,对持久性能力没有影响。任何促销。开源Redis通过在磁盘上记录AppendOnlyLog来持久化数据。AppendOnlyLog记录了所有的写操作,相当于redolog,在宕机恢复时通过回放这些日志来恢复数据。但是由于磁盘介质的高延迟和Redis内存数据库使用场景的低延迟要求,fsync无法在每次写操作后持久化日志,可能不会将最新写入的数据持久化到磁盘。可能的数据丢失的根本原因。Tair持久内存版本的数据恢复没有使用AppendOnlyLog,而是将redis数据结构存储在AEP上,这样这些数据结构在宕机后不会丢失,并且在这些数据结构中增加了一些额外的描述信息,之后宕机时,可以在恢复时读取这些额外的描述信息,从而重新识别这些redis数据结构并建立索引,将状态恢复到宕机前的状态。Tair通过将redis数据结构和描述信息实时写入AEP来保证写入数据的实时持久化。HA数据不会丢失。Tair持久内存版本保证数据持久化,但生产环境是高可用架构。大多数情况下,当主节点异常宕机时,不会等待主节点重启恢复,而是切换到备节点继续。提供服务,然后在新的主节点上增加一个新的备节点。因此,如果发生故障时,如果有数据还没有从主节点同步到备节点,这部分数据就会丢失。Redis采用异步同步。当客户端写入数据返回成功时,对Redis的修改可能没有同步到备节点。如果此时主节点宕机,数据就会丢失。为了避免HA过程中数据丢失,Tair持久内存版本引入了半同步机制,保证在写请求成功返回前,相关修改已经同步到备节点。可以发现,开启半同步功能后,写请求的RT会变高,会比主备同步耗时更多,大约需要几十微秒。但是通过一些异步技术,虽然写请求的RT会变高,但是对实例的最大写吞吐量影响不大。启用半同步时,生成器通过xadd发送消息。如果返回成功,消息必须同步到备节点。这时候就发生了HA,消费者在备节点上也可以读取这条消息。如果xadd请求超时,此时消息可能同步也可能不同步到备节点,生产者无法确定。这时候通过再次投递消息,可以保证消息至少被消费一次。如果严格要求保证消息只被消费一次,那么生产者可以通过xread接口查询消息是否存在,如果不存在则重新投递。2.优势总结AEP作为一种存储介质被引入。目前Tair持久内存版价格是社区版的70%。保证数据的实时持久化,通过半同步技术HA不丢数据。大多数情况下,消息不会丢失(当备库故障或主备网络异常时,会降级为异步同步,可用性优先)。消息至少被消费一次或仅被消费一次。5、未来消息队列主要解决三类问题,应用模块的解耦,消息的异步化,削峰填谷。目前主流的消息队列都能满足这些需求,所以在实际选型时,还要考虑一些特殊功能是否满足,产品的性能,具体业务场景下的成本,开发的复杂度等。Redis的消息队列功能还不是最全面的。它不想做一个大而全的产品,而是做一个小而美的产品,服务于某些用户在某些场景下的需求。目前用户之所以选择Redis作为消息队列服务,主要是因为Redis在同样的成本下具有更高的吞吐量,Redis的延迟更低,应用需要消息服务又不想引入一堆额外的依赖。未来,Tair持久内存版本将解决这些需求,并继续放大这些优势。吞吐量对持久化内存版本的持久化过程进行了优化,使其吞吐量接近内存版本甚至超过内存版本的吞吐量。Delay通过rdma在多副本之间同步数据,减少半同步下写入数据的延迟。
