当前位置: 首页 > 科技观察

本文将带你了解Redis的发布订阅底层原理

时间:2023-03-20 10:25:08 科技观察

01。前言发布和订阅系统在我们的日常工作中经常会用到。在这个场景中的大多数情况下,我们使用消息队列。常用的消息队列有Kafka、RocketMQ、RabbitMQ,每个消息队列都有自己的特点。本文主要是介绍Redis的发布订阅系统。很多时候,我们可能不需要独立部署相应的消息队列,只是简单的使用,数据量不会太大。在这种情况下,我们可以使用Redis的Pub/Sub模型。02.使用方法2.1发布与订阅Redis的发布与订阅功能主要由PUBLISH、SUBSCRIBE、PSUBSCRIBE命令组成。一个或多个客户端订阅一个或多个频道。当其他客户端向该频道发送消息时,订阅已加入该频道的客户端将收到相应的消息。上图中,有四个客户端,Client02、Client03、Client04订阅了同一个Sport频道(Channel)。此时,当Client01向SportChannel发送消息“篮球”时,三个Client02-04两端同时收到消息。整个过程的执行命令如下:首先打开四个Redis客户端,然后在Client02、Client03、Client04分别输入subscribesport命令,表示订阅体育频道,然后进入publishsportbasketball在Client01客户端中将消息“basketball”发送到体育频道。这个时候我们在看Client02-04的clients,可以看到消息已经收到了,每个订阅这个频道的client都是一样的。这里Client02-Client04订阅Sport频道,我们称为订阅者,Client01发布消息,我们称为发布者,发送的消息为消息。2.2.Patternsubscription我们在上面看到的是一个客户端订阅了一个Channel。事实上,单个客户端也可以同时订阅多个Channel。使用模式匹配,客户端可以同时订阅多个频道。如上图,Client05通过命令subscriberun订阅了run频道,Client06通过命令psubscriberun*订阅了频道匹配run*。当客户端07向运行通道发送消息666时,客户端05和06都收到消息;当Client07向run1和run_sport这两个通道发送消息时,Client06仍然可以收到消息,而Client05则收不到消息。客户端05订阅run通道并接收消息:客户端06订阅run*通道并接收消息:客户端07向多个通道发送消息:通过上面的案例,我们了解到一个客户端可以分别订阅单个或多个通道通过subscribe和psubscribe命令,客户端可以通过publish发送相应的消息。在命令行中,我们可以使用Ctrl+C来取消相关的订阅,对应的命令是unsubscribechannelName。03.Pub/Sub底层存储结构3.1.订阅Channel在Redis的底层结构中,客户端和Channel的订阅关系是通过字典加链表的结构来保存的。形式如下:在Redis的底层结构中,Redis服务器结构体定义了一个pubsub_channels字典structredisServer{//用来保存所有频道的订阅关系dict*pubsub_channels;}在这个字典中,key代表的是频道名,值为一个链表,里面存储了所有订阅该频道的客户端。因此,当客户端执行订阅频道的动作时,服务器会将客户端与pubsub_channels字典中订阅的频道相关联。这个时候有两种情况:频道是第一次订阅:频道是第一次订阅,说明字典里没有这个频道的信息,那么程序要先创建一个对应的key,并且分配一个空链表,然后将对应的Client添加到链表中。此时链表只有一个元素。该频道已经被其他客户端订阅:此时只需在链表末尾添加相应的客户端信息即可。比如有一个新的客户端Client08订阅了run频道,那么上图就变成了如果Client08要订阅一个新的频道new_sport,那么就变成了整个订阅流程。下面的伪代码可以用来实现Map>pubsub_channels=newHashMap<>();publicvoidsubscribe(String[]subscribeList,Objectclient){//遍历所有订阅的频道,检查是否在pubsub_channels中,如果不在,新建一个key和一个空链表for(inti=0;i());}pubsub_channels.get(subscribeList[i]).add(客户端);}}3.2取消订阅上面介绍了单个Channel的订阅。相反,如果一个client要取消订阅相关的Channel,无非就是找到对应Channel的链表,删除对应的client,如果client是最后一个,对应的Channel也会被删除。publicvoidunSubscribe(String[]subscribeList,Objectclient){//遍历所有订阅的频道,依次删除for(inti=0;ipubsub_patterns=newArrayList<>();publicvoidpsubscribe(String[]subscribeList,Objectclient){//遍历所有订阅的频道,为(inti=0;i