当前位置: 首页 > 后端技术 > Java

最通俗易懂的Redis发布订阅与代码实践

时间:2023-04-01 15:42:25 Java

pub-subscribe简介Redis除了使用List实现简单的消息队列功能外,还提供了发布订阅消息机制。在这种机制下,消息发布者向指定的通道(channel)发布消息,消息订阅者可以接收到指定通道的消息。同一个频道可以有多个消息订阅者,如下图所示:Redis也提供了一些命令支持这种机制,接下来我们详细介绍这些命令。发布订阅相关命令在Redis中,发布订阅相关命令有:发布消息订阅频道取消订阅根据模式订阅根据模式取消订阅查询订阅信息发布消息发布消息的命令为publish,语法为:发布频道名消息channel:one-more-study:demo频道发布消息“IamOneMoreStudy.”,命令如下:>publishchannel:one-more-study:demo“IamOneMoreStudy.”(integer)由0返回结果是订阅者的数量。上面的例子中没有订阅者,所以返回结果为0。订阅新闻订阅新闻的命令是subscribe。订阅者可以订阅一个或多个频道。语法为:订阅频道名称[频道名称...]例如订阅频道:one-more-study:demo,命令如下:>subscribechannel:one-more-study:demo阅读消息。..(按Ctrl-C退出)1)"subscribe"2)"channel:one-more-study:demo"3)(integer)1返回结果中有3项,分别表示:的类型返回值(订阅成功),订阅的频道名称,当前订阅的频道数。当订阅者收到消息时,会显示:1)"message"2)"channel:one-more-study:demo"3)"IamOneMoreStudy"。还有3个结果,分别表示:返回值的类型(消息)、消息源的通道名、消息内容。新开通的订阅者无法接收到该频道之前的历史消息,因为Redis不会持久化发布的消息。取消订阅取消订阅的命令是unsubscribe,可以取消订阅一个或多个频道。语法为:unsubscribe[频道名称[频道名称...]]例如取消订阅channel:one-more-study:demo频道,命令如下:>unsubscribechannel:one-more-study:demo1)"unsubscribe"2)"channel:one-more-study:demo"3)(integer)0返回结果中有3项,分别表示:返回值的类型(取消订阅成功),取消订阅的频道名称,以及当前订阅的频道数量。按模式订阅消息按模式订阅消息的命令是psubscribe,它订阅一个或多个匹配给定模式的频道。语法为:psubscribepattern[pattern...]每个pattern作为匹配,例如channel匹配所有以channel开头的频道对于开头的频道,命令如下:>psubscribechannel:*Readingmessages...(按Ctrl-C退出)1)"psubscribe"2)"channel*"3)(integer)1返回结果中有3条消息,分别表示:返回值的类型(订阅成功者模式)、订阅模式和当前订阅模式的数量。当订阅者收到消息时,会显示:1)"pmessage"2)"channel*"3)"channel:one-more-study:demo"4)"IamOneMoreStudy"。有4项,分别表示:返回值类型(信息)、消息匹配方式、消息源的通道名称、消息内容。按花样取消订阅按花样取消订阅的命令为punsubscribe,可以取消对一个或多个花样的订阅。语法为:punsubscribe[pattern[pattern...]]每个pattern作为匹配,如channel:匹配所有以channel开头的频道,命令如下:1>punsubscribechannel:*1)"punsubscribe"2)"channel:*"3)(integer)0返回结果中有3项,分别表示:返回值的类型(以SUBSCRIBESUCCESSFUL方式取消)、未订阅的schema、当前订阅的schema个数。查询订阅信息查看活跃频道活跃频道是指至少有一个订阅者的频道。语法为:pubsubchannels[mode]例如:>pubsubchannels1)"channel:one-more-study:test"2)"channel:one-more-study:demo"3)"channel:demo">pubsubchannels*demo1)"channel:one-more-study:demo"2)"channel:demo">pubsubchannels*one-more-study*1)"channel:one-more-study:test"2)"channel:one-more-study:demo"查看频道订阅数pubsubnumsub[频道名称...]例如:>pubsubnumsubchannel:one-more-study:demo1)"channel:one-more-study:demo"2)(integer)1查看模式订阅号>pubsubnumpat(integer)1代码实战轻谈不练假式,我们用Java语言写了一个简单的发布订阅例子。Jedis集群示例Jedis是Redis官方推荐的Java连接开发工具。我们使用Jedis编写了一个简单的集群示例。包onemore.study;导入redis.clients.jedis.HostAndPort;导入redis.clients.jedis.JedisCluster;导入redis.clients.jedis.JedisPoolConfig;导入java.util.HashSet;导入java.util.Set;/***Jedis集群**@author万猫学社*/publicenumCluster{INSTANCE;//为简单起见,这里直接写IP和端口,实际开发中最好写在配置文件中。privatefinalStringhostAndPorts="192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";私有JedisClusterjedisCluster;Cluster(){JedisPoolConfigpoolConfig=newJedisPoolConfigMaxpool();//最大连接数20);//最大空闲数poolConfig.setMaxIdle(10);//最小空闲数poolConfig.setMinIdle(2);//从jedis连接池获取连接时,检查并返回可用连接poolConfig.setTestOnBorrow(true);//将连接放回jedis连接池时,验证并返回可用连接poolConfig.setTestOnReturn(true);Setnodes=newHashSet<>();String[]hosts=hostAndPorts.split(";");for(Stringhostport:hosts){String[]ipport=hostport.split(":");字符串ip=ipport[0];intport=Integer.parseInt(ipport[1]);nodes.add(newHostAndPort(ip,port));}jedisCluster=newJedisCluster(nodes,1000,poolConfig);}民众JedisClustergetJedisCluster(){返回jedisCluster;}}发布者展示示例packageonemore.study;importredis.clients.jedis.JedisCluster;/***发布者**@author万猫学社*/publicclassPublisherimplementsRunnable{privatefinalStringCHANNEL_NAME="channel:one-更多学习:演示”;私人最终字符串QUIT_COMMAND=“退出”;@Overridepublicvoidrun(){JedisClusterjedisCluster=Cluster.INSTANCE.getJedisCluster();for(inti=1;i<=3;i++){Stringmessage="第"+i+"消息";System.out.println(Thread.currentThread().getName()+"发布:"+message);jedisCluster.publish(CHANNEL_NAME,消息);尝试{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("----------------");}jedisCluster.publish(频道_NAME,QUIT_COMMAND);}}订阅者展示示例packageonemore.study;importredis.clients.jedis.JedisCluster;importredis.clients.jedis.JedisPubSub;/***订阅者**@author万猫学社*/publicclassSubscriberimplementsRunnable{privatefinalStringCHANNEL_NAME="channel:one-more-study:demo";私人最终字符串QUIT_COMMAND=“退出”;privatefinalJedisPubSubjedisPubSub=newJedisPubSub(){@OverridepublicvoidonMessage(Stringchannel,Stringmessage){System.out.println(Thread.currentThread().getName()+"接收:"+message);if(QUIT_COMMAND.equals(message)){unsubscribe(CHANNEL_NAME);}}};@Overridepublicvoidrun(){JedisClusterjedisCluster=Cluster.INSTANCE.getJedisCluster();jedisCluster.subscribe(jedisPubSub,CHANNEL_NAME);}}综合示例packageonemore.study;publicclassApp{publicstaticvoidmain(String[]args)throwsInterruptedException{//创建3个订阅者newThread(newSubscriber()).start();新线程(新订阅者())。开始();新线程(新订阅者())。开始();线程.sleep(1000);//创建发布者newThread(newPublisher()).start();}}运行结果如下:Thread-6发布:第一条消息Thread-0收到:第一条消息Thread-1收到:第一条消息Thread-2收到:第一条消息----------------Thread-6发布:第二条消息Thread-0接收:第二条消息Thread-1接收:第二条消息Thread-2接收:第二条消息-----------------Thread-6发布:第三条消息Thread-0接收:第三条消息Thread-2接收:第三条消息Thread-1接收:第三条消息------------------线程-0接收:quitThread-1接收:quitThread-2接收:退出