当前位置: 首页 > 科技观察

Redis5的新特性,Streams作为消息队列

时间:2023-03-14 21:20:40 科技观察

前言Redis5的新特性中,Streams数据结构的引入可以说是本次迭代最大的特点。使得Redis在本次5.x版本迭代中作为消息队列,尤其是持久化消息队列得到了更完善、更强大的原生支持。同时,stream借鉴了kafka的消费者组模型的理念和设计,让消费者消息的处理更加高效快速。本文分析Streams数据结构中常用的API。准备本文使用的Redis版本是5.0.5。如果您使用的是5.x之前的版本,部分API使用效果与本文描述的略有不同。AddmessageStreams使用XADD指令添加数据,消息中的数据以K-V键值对的形式进行操作。一条消息可以有多个键值对,添加命令格式:XADDkeyIDfieldstring[fieldstring...]其中key是Streams的名字,ID是消息的唯一标识,不能重复,fieldstring是key-值对。接下来我们添加一个名为person的stream来操作。XADDperson*nameytaodeshttps://ytao.top在上面添加的案例中,ID是使用*号复制的,表示服务器自动生成Id,添加后返回数据“1578238486193-0”。这里自动生成的Id的格式为-Id由两部分组成:millisecondsTime是当前服务器时间的毫秒时间戳。sequenceNumber当前序列号,取值取自当前毫秒内消息产生的先后顺序,默认从0开始递增1。例如:1578238486193-3表示在1578238486193毫秒的时间戳添加的第4条消息。除了服务端自动生成Id的方式,也支持生成指定Id,但是指定Id有以下限制:Id的前后两部分必须是数字。最小Id是0-1,不是0-0,但是2-0、3-0....是允许的。对于添加的消息,Id的前半部分不能小于已有Id的最大值,Id的后半部分不能小于同一个前半部分的最大后半部分。否则,当不满足上述条件时,添加后会抛出异常:(error)ERRTheIDspecifiedinXADDisequalorsmallerthanthetargetstreamtopitem实际上,在添加消息时,会进行两个操作。第一步是判断Streams是否不存在,创建Streams的名称,然后将消息添加到Streams中。即使在添加消息时,由于Id异常,它也可以以当前Streams的名称存在于Redis中。Streams中的Id也可以用作指针,因为它是一个有序的标签。在生产中,如果使用这种方式添加消息,会出现一个问题,即当消息数量过多时,服务会宕机。这里在Streams的早期设计中也考虑到了这个问题,即可以指定Streams的容量。如果容量操纵这个设定值,旧消息将被逆转。添加消息时,设置MAXLEN参数。XADDpersonMAXLEN5*nameytaodeshttps://ytao.top指定Streams的容量为5条消息。也可以使用XTRIM拦截消息,将多余的消息从小到大剔除:XTRIMpersonMAXLEN8MessageQuantity查看消息数量,使用XLEN命令进行操作。XLENkey示例:查看人员流中的消息数:>XLENperson(integer)5查询消息使用XRANGE和XREVRANGE指令查询Streams中的消息。XRANGE查询数据时,可以根据指定的Id范围进行查询。XRANGE查询命令格式:XRANGEkeystartend[COUNTcount]参数说明:key为Streams名称start为范围查询起始Id,包括本Id。start是范围查询的结束Id,包括这个Id。Count返回查询的最大消息数,可选。这里start和end有-和+两个不指定的值,分别代表无穷小和无穷大,所以使用这两个值时,会查询所有的消息。>XRANGEperson-+1)1)"0-1"2)1)"姓名"2)"ytao"3)"des"4)"https://ytao.top"2)1)"0-2"2)1)"name"2)"luffy"3)"des"4)"英勇!"3)1)"2-0"2)1)"name"2)"gaga"3)"des"4)“钓鱼!”上面查询的消息数据可以看出是按照先进先出的顺序查询的。使用COUNT指定查询返回的个数:#查询所有消息并返回一条数据>XRANGEperson-+COUNT11)1)"0-1"2)1)"name"2)"ytao"3)"des"4)"https://ytao.top"在范围查询中,Id的后半部分可以省略,将查询后半部分的所有数据。XREVRANGEEXREVRANGE的查询与XRANGE命令中使用的类似,只是查询的起始参数和结束参数的顺序相反:XREVRANGEkeyendstart[COUNTcount]Usecase:>XREVRANGEperson+-1)1)"2-0"2)1)"name"2)"gaga"3)"des"4)"fishion!"2)1)"0-2"2)1)"name"2)"luffy"3)"des"4)"valiant!"3)1)"0-1"2)1)"name"2)"ytao"3)"des"4)"https://ytao.top"查询结果与XRANGE结果,其他一切都一样。这两条指令可以按升序和降序返回消息。删除消息使用XDEL命令删除消息。您只需要指定要删除的流的名称和Id。它支持一次删除多条消息。XDELkeyID[ID...]deletecase:#queryallmessages>XRANGEperson-+1)1)"0-1"2)1)"name"2)"ytao"3)"des"4)"https:///ytao.top"2)1)"0-2"2)1)"name"2)"luffy"3)"des"4)"勇者!"3)1)"2-0"2)1)"name"2)"gaga"3)"des"4)"fishion!"#删除消息>XDELperson2-0(integer)1#再次查询所有删除的消息>XRANGEperson-+1)1)"0-1"2)1)"名字"2)"ytao"3)"des"4)"https://ytao.top"2)1)"0-2"2)1)"名字"2)"路飞"3)"des"4)"valiant!"#查询删除后的长度>XLENperson(integer)2由上可知,消息删除后,长度也会减少相应的量。消费消息在Redis的PUB/SUB中,我们通过订阅的方式来消费消息。在Streams数据结构中,也可以实现同样的功能。当没有新消息时,我们可以阻塞等待。不仅支持个人消费,还支持团体消费。个人消费个人消费使用XREAD指令。如您所见,在以下命令中,STREAMS、密钥和ID是必需的。ID表示将读取大于该ID的消息。当ID值赋值$时,表示现有消息的最大Id值。XREAD[COUNTcount][BLOCKmilliseconds]STREAMSkey[key...]ID[ID...]上面的COUNT参数用来指定最大读取次数,与XRANGE相同。>XREADCOUNT1STREAMSperson01)1)"person"2)1)1)"0-1"2)1)"name"2)"ytao"3)"des"4)"https://ytao.top">XREADCOUNT2STREAMSperson01)1)"人"2)1)1)"0-1"2)1)"姓名"2)"ytao"3)"des"4)"https://ytao.top"2)1)"0-2"2)1)"名字"2)"路飞"3)"des"4)"英勇!XREAD中还有一个BLOCK参数,用于阻塞订阅消息,BLOCK携带的参数是阻塞的时间,以毫秒为单位,如果在这段时间内没有新的消息消费,则释放该块。当这里的时间指定为0时,会一直阻塞,直到有新的消息消费。#窗口1被阻塞,等待新消息的到来>XREADBLOCK0STREAMSperson$#打开另一个连接窗口2,添加新消息>XADDperson2-2nametaodescoder"2-2"#窗口1,获取新消息消费,并计时block>XREADBLOCK0STREAMSperson$1)1)"person"2)1)1)"2-2"2)1)"name"2)"tao"3)"des"4)"coder"(60.81s)使用XREAD时对于顺序消费,需要额外记录读取位置的Id,方便下次消费。群组消费群组消费的主要目的是将消息分发到不同的客户端进行处理,并以更高效的速度处理消息。为了满足这个肝功能需求,我们需要做三件事:创建一个群,从群里读取消息,确认消息给服务器处理。组操作组使用XGROUP命令:XGROUP[CREATEkeygroupnameid-or-$][SETIDkeyid-or-$][DESTROYkeygroupname][DELCONSUMERkeygroupnameconsumername]在上面的命令中,包括的操作有:CREATE创建一个消费者组。SETID修改下一条处理消息的Id。DESTROY销毁消费者组。DELCONSUMER删除消费者组中的指定消费者。我们目前需要使用的是创建一个消费组:#使用当前存在的最大Id作为消费起点>XGROUPCREATEpersongroup1$OKGroup读取消息组读取使用XREADGROUP命令,COUNT和BLOCK的使用类似XREAD的操作,但是多了指定一个group和consumer:XREADGROUPGROUPgroupconsumer[COUNTcount][BLOCKmilliseconds]STREAMSkey[key...]ID[ID...]由于groupconsumption和individualconsumer类似,这里只做一个block分析,并且这里的Id也包含了一个特殊的值>,表示消息还没有被消费:#Window1,消费组中,淘淘消费者建立阻塞监听XREADGROUPGROUPgroup1taotaoBLOCK0STREAMSperson>#Window2,消费组中,yangyang消费者建立阻塞监听XREADGROUPGROUPgroup1yangyangBLOCK0STREAMSperson>#window3,添加消费消息>XADDperson3-1nametonydes666"3-1"#window1,读取新消息,此时窗口2无响应>XREADGROUPGROUPgroup1taotaoBLOCK0STREAMSperson>1)1)"person"2)1)1)"3-1"2)1)"name"2)"tony"3)"des"4)"666"(77.54s)#window3,再次添加消费消息>XADDperson3-2namejamesdesabc!"3-2"#window2,读取Fetchingnewmessages,此时window1无响应>XREADGROUPGROUPgroup1yangyangBLOCK0STREAMSperson>1)1)"person"2)1)1)"3-2"2)1)"姓名"2)"詹姆斯"3)"des"4)"abc!"(76.36s)在上面的执行过程中,group1组中有两个消费者。添加两条消息后,两个消费者轮流消费这条消息。ACK消息被消费后,为了避免重复消费,需要向服务端发送ACK,保证消息被消费。标记。比如下面这种情况,我们已经消费了上面最新的两条消息,但是当我们再次读取消息时,还是读取到:>XREADGROUPGROUPgroup1yangyangSTREAMSperson01)1)"person"2)1)1)"3-2"2)1)“名字”2)“詹姆斯”3)“德”4)“abc!”这时候我们用XACK命令告诉服务器我们已经处理了消息:XACKkeygroupID[ID...]0让服务器标记3-2Processed:>XACKpersongroup13-2(integer)1再次获取群读消息:>XREADGROUPGROUPgroup1yangyangSTREAMSperson01)1)"person"2)(emptylistorset)队列中没有可读消息。除了上面讲解的API,还可以使用XINFO命令查看消费组信息,本文不做分析。总结以上对Streams常用API的分析,可以感受到Redis在消息队列支持的道路上越来越强大。如果你用过它的PUB/SUB功能,你会觉得5.x的迭代只是优化了你的一些痛点。