当前位置: 首页 > 后端技术 > Java

停止使用RedisList来实现消息队列,Stream是为队列设计的

时间:2023-04-02 00:48:55 Java

上次提到使用RedisList实现消息队列有很多局限性,比如:没有很好的ACK机制;没有ConsumerGroup消费组概念;积累。List是线性结构,查询指定数据需要遍历整个List;Stream是Redis5.0引入的专门为消息队列设计的数据类型,Stream是一个包含0个或多个元素的有序队列。这些元素是根据ID的大小排序的。它实现了消息队列的大部分功能:序列化生成消息ID;消息遍历;消息阻塞和非阻塞读取;ConsumerGroups消费群体;ACK确认机制。支持多播。提供了很多消息队列操作命令,并借鉴了Kafka的ConsumerGroups的概念,提供了消费组功能。同时提供消息持久化和主从复制机制。客户端可以随时访问数据,并且可以记住每个客户端的访问位置,保证消息不丢失。废话不多说,先看看怎么用吧。详情见官网:https://redis.io/topics/strea...XADD:插入消息“云澜宗弟子奉命杀萧炎!”当云杉最后一个字落下,弥漫的紧张气氛顿时宣告破碎,悬浮在半空中的众多云岚宗长老双翼背后拍打,飞掠长空,向萧炎追杀而去。云山用下面的命令将“追杀萧炎”的命令插入到队列中,让长老带着自己的子弟去执行。XADD云澜宗*taskkillnameXiaoYan"1645936602161-0"Stream中的每个元素都是由键值对组成的,不同的元素可以包含不同数量的键值对。该命令的语法如下:XADDstreamNameidfieldvalue[fieldvalue...]消息队列名称后面的“*”表示让Redis自动为插入的消息生成一个唯一的ID,当然你也可以自己定义。消息ID由两部分组成:当前毫秒的时间戳;和序号。从0开始,用于区分同时产生的多个命令。通过将元素ID与时间相关联并强制新元素ID必须大于旧元素ID,Redis在逻辑上将流转换为仅追加数据结构。这个特性对于使用流的消息队列和事件系统的用户来说非常重要:用户可以确定新的消息和事件只会出现在现有的消息和事件之后,就像现实世界中的新事件一样总是一切都在有序地发生就好像它发生在现有事件之后一样。XREAD:读取消息云岭老狗使用如下命令接收云山的命令:XREADCOUNT1BLOCK0STREAMS云澜宗0-01)1)"\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"2)1)1)"1645936602161-0"2)1)"task"2)"kill"3)"name"4)"XiaoYan"#XiaoYanXREAD[COUNTcount][BLOCKmilliseconds]STREAMSkey[key...]ID[ID...]该指令可以同时读取多个流,各方法对应的含义如下:COUNT:表示每个流中最多读取的元素个数;BLOCK:阻塞读取,当消息队列中没有消息时,阻塞等待,0表示无限等待,单位为毫秒。ID:消息ID,读取消息时可以指定ID,从该ID的下一条消息开始读取,0-0表示从第一个元素开始读取。如果要使用XREAD进行顺序消费,则必须记住每次读取后返回的消息ID。下次调用XREAD时,会将上次返回的消息ID作为参数传递给下一次调用,就可以继续消费后续的消息了。云云宗主,我今日刚到云澜宗,不想接受历史消息。我只想在我使用XREAD阻塞等待的那一刻接收到我通过XADD发布的消息。我应该怎么办?只需运行“$”心理方法。心法最后的“$”符号表示读取最新的阻塞消息。读不下去,就等死吧。在等待过程中,如果有其他长辈向队列中添加消息,则会立即读取。XREADCOUNT1BLOCK0STREAMS云岚宗$消息队列实现起来这么容易?约定好的ACK机制呢?这只是开胃菜。通过XREAD读取的数据并没有被删除。重新执行XREADCOUNT2BLOCK0STREAMS0-0命令时会再次读取。所以我们还需要ACK机制。接下来,我们来到一个真正的消息队列。ConsumerGroupRedisStream的ConsumerGroup(消费者组)允许用户在逻辑上把一个流分成多个不同的流,让ConsumerGroup的消费者去处理。它是一个强大的多播持久化消息队列。RedisStream借鉴了Kafka的设计。Stream的高可用是基于主从复制的,这与其他数据结构的复制机制没有区别,也就是说Stream在Sentinel和Cluster集群环境下都可以支持高可用。RedisStream的结构如上图所示。有消息列表,每条消息都有唯一的ID和对应的内容;消息是持久的;每个消费者组的状态是独立的,互不影响,同一条Stream消息会被所有消费者组消费;一个消费群体可以由多个消费者组成,消费者之间存在竞争关系。任何阅读消息的消费者都会将last_deliverd_id向前移动;每个consumer都有一个pending_ids变量,用来记录当前consumerread已经fetch但是还没有acked的消息。它用于确保消息至少被客户端消费一次。消费组实现的消息队列主要涉及以下三个指令:XGROUP用于创建、销毁和管理消费组。XREADGROUP用于通过消费者组从流中读取。XACK是一个允许消费者将未决消息标记为已正确处理的命令。创建消费组Stream通过XGROUPCREATE命令创建消费组(ConsumerGroup),需要传递初始消息ID参数来初始化last_delivered_id变量。我们使用XADD向bossStream队列中插入一些消息:XADDbossStream*namezhangsanage26XADDbossStream*namelisiage2XADDbossStream*namebigoldage40以下命令为名为bossStream的消息队列创建了两个“青龙门”和“六扇门”.消费群体。#语法如下#XGROUPCREATEstreamgroupstart_idXGROUPCREATEbossStream青龙门0-0MKSTREAMXGROUPCREATEbossStream六门0-0MKSTREAMstream:指定队列名称;group:指定消费组名称;start_id:在StreamID中指定消费组的起始,决定了消费组从哪个ID开始读取消息,0-0从第一个开始读取,$表示从最后一个向后开始读取,以及只接收新消息。MKSTREAM:默认情况下,如果目标流不存在,XGROUPCREATE命令将返回错误。可以使用可选的MKSTREAM子命令作为后的最后一个参数自动创建流。读取消息让“青龙门”消费组的consumer1从bossStream块中读取一条消息:XREADGROUPGROUP青龙门consumer1COUNT1BLOCK0STREAMSbossStream>1)1)"bossStream"2)1)1)"1645957821396-0"2)1)"name"2)"zhangsan"3)"age"4)"26"语法如下:XREADGROUPGROUPgroupNameconsumerName[COUNTn][BLOCKms]STREAMSstreamName[stream...]id[id...][]中的表示可选参数。该命令与XREAD类似,不同之处在于增加了GROUPgroupNameconsumerName选项。该选项的两个参数用于指定要读取的消费者组和负责处理消息的消费者。其中:>:命令的最后一个参数>,表示从还没有被消费的消息开始读取;BLOCK:分块阅读;敲黑板。如果消息队列中的消息被消费组的某个消费者消费,则这条消息将不再被该消费组的其他消费者读取。例如consumer2执行读操作:XREADGROUPGROUP青龙门consumer2COUNT1BLOCK0STREAMSbossStream>1)1)"bossStream"2)1)1)"1645957838700-0"2)1)"name"2)"lisi"3)"age"4)"2"consumer2不能再读zhangsan,而是读下一个lisi,因为这条消息已经被consumer1读过了。使用消费者的另一个目的是让组内的多个消费者共享和读取消息,即每个消费者读取部分消息,从而达到负载均衡。例如,一个消费者组有三个消费者C1、C2、C3和一个包含消息1、2、3、4、5、6、7的流:XPENDING检查已读和未确认的消息,以确保消费者在消费时失败或者关机重新启动后仍然可以读取消息。Stream内部有一个队列(pendingList),用来保存每个消费者读到但还没有执行ACK的消息。如果消费者使用XREADGROUPGROUPgroupNameconsumerName读取消息,但没有向Stream发送XACK命令,则消息仍然保留。例如查看bossStream中消费组“青龙门”中每个消费者读过的未确认消息信息:XPENDINGbossStreamQinglongmen1)(integer)22)"1645957821396-0"3)"1645957838700-0"4)1)1)"consumer1"2)"1"2)1)"consumer2"2)"1"1)未确认消息的数量;2)~3)青龙门所有消费者读取消息的最小和最大ID;查看consumer1读取了哪些数据,使用如下命令:XPENDINGbossStream青龙门-+10consumer11)1)"1645957821396-0"2)"consumer1"3)(integer)37583844)(integer)1ACK确认所以当收到消息并成功消费后,我们需要手动ACK通知Streams,消息会被删除。命令如下:XACKbossStreamQinglongmen1645957821396-01645957838700-0(integer)2语法如下:XACKkeygroup-keyID[ID...]消费确认增加消息的可靠性,一般在业务之后处理完成,需要执行ack确认消息被消费,整个流程的执行如下图所示:使用Redisson实际使用maven添加依赖org.redissonredisson-spring-boot-starter3.16.7添加Redis配置,码哥的Redis没有密码,可以根据以实际情况为准。弹簧:应用程序:名称:redissionredis:主机:127.0.0.1端口:6379ssl:false@Slf4j@ServicepublicclassQueueService{@AutowiredprivateRedissonClientredissonClient;/***发送消息到队列**@parammessage*/publicvoidsendMessage(Stringmessage){RStreamstream=redissonClient.getStream("sensor#4921");stream.add("速度","19");stream.add("速度","39%");stream.add("温度","10C");}/***消费者消费消息**@parammessage*/publicvoidconsumerMessage(Stringmessage){RStreamstream=redissonClient.getStream("sensor#4921");stream.createGroup("sensors_data",StreamMessageId.ALL);Map>messages=stream.readGroup("sensors_data","消费者_1");for(Map.Entry>entry:messages.entrySet()){Mapmsg=entry.getValue();System.out.println(味精);stream.ack("sensors_data",entry.getKey());}}}各位读者朋友,如果阅读后有所收获,请点赞、收藏和分享,感谢您对他人、自私和黎明人的支持。参考链接:https://blog.51cto.com/u_1523...https://redis.io/topics/strea...https://redisson.org/articles...