简介Redis发布发布功能(Pub/Sub)是一种基于事件席位的基本通信机制。就是解除消息的发布者和订阅者之间的耦合关系。Redis作为消息发布和订阅之间的服务器,起到了桥梁的作用。在Redis中,有一个channel的概念,即通道。发布者通过指定发布到某个频道,然后只要有订阅者订阅了该频道,就会将消息发送给订阅者。原理图如下:Redis也可以使用list类型来实现消息队列(消息队列的实现和应用场景会在下一篇文章中讲解)。Redis的发布订阅功能应用广泛,应用场景非常多。比如:最常见的是实现实时聊天功能,或者是博客的粉丝文章推送。当博主推送一篇原创文章时,该文章会实时推送给博主的粉丝。介绍完Redis的发布订阅功能,我们再来看看实际操作,包括linux命令的实际操作和java代码的实现。这里的命令练习假设所有读者都安装了自己的虚拟机环境和Redis。如果没有,可以参考这篇博文:https://www.cnblogs.com/zuidongfeng/p/8032505。html这里我已经安装好了Redis,直接启动我们的Redis。我已经设置了启动。上面的博文解释了如何设置启动。发布消息在Redis中发布消息的命令是publish,具体使用如下:PUBLISHtest"haha":test表示频道名称,haha表示发布的内容,这样每一个的发布消息完成,后续返回(integer)0表示0人订阅。订阅频道并同时启动另一个窗口。此窗口用作订阅者。订阅者命令subscribe,使用SUBSCRIBEtest表示你订阅了测试频道。订阅后返回的结果中有3条信息,第1条表示类型,第2条表示订阅的频道,第3条表示订阅数。然后在第一个窗口发布消息:可以看到发布者发布的消息,订阅者会实时收到。并发订阅者收到的消息中也会有3条消息,分别表示:返回值类型、通道名称、消息内容。Unsubscribe如果想取消之前的订阅,可以使用unsubscribe命令,格式为:unsubscribechannelname//取消之前订阅的测试频道unsubscribetest输入命令后返回如下结果:[root@pinyoyougou-dockersrc]#./redis-cli127.0.0.1:6379>UNSUBSCRIBEtest1)"unsubscribe"2)"test"3)(integer)0分别表示:返回值类型、频道名称、频道订阅数。模式订阅除了直接订阅特定的名城外,您还可以通过模式订阅。按模式订阅可以一次订阅多个频道。按模式订阅的命令为psubscribe,具体格式如下:psubscribe模式//表示订阅名称以ldc开头的频道psubscribeldc*输入上述命令后返回结果如下:127.0.0.1:6379>PSUBSCRIBEldc*Readingmessages...(pressCtrl-Ctoquit)1)"psubscribe"2)"ldc*"3)(integer)1这个也很简单,分别表示:返回的类型(表示按模式订阅的类型),订阅方式和订阅数量。按模式取消订阅如果想按模式取消之前的订阅,可以使用punsubscribe取消,具体格式:punsubscribemode//根据ldcpunsubscribeldc*其返回值开头的频道取消频道名,如下:127.0.0.1:6379>PUNSUBSCRIBEldc*1)"punsubscribe"2)"ldc*"3)(integer)0这个就不多说了,意思和上面一样,可以看出上面的命令都是正则的订阅SUBSCRIBE,取消为UNSUBSCRIBE,以UN为前缀,按模式订阅也是一样的。查看订阅消息(1)如果想查看某个模式下订阅大于零的频道,可以使用如下格式的命令进行操作:subscriptions大于零pubsubchannelsldc*(2)如果要查看某个频道的订阅号,可以使用以下命令:pubsubnumsub频道名(3)根据模式查看订阅号,可以使用下面命令进行操作:pubsubnumpat上面的命令操作到这里基本就结束了,下面我们来看实际的代码。代码实践(一)第一步是操作Redis,然后在SpringBoot项目中引入jedis的依赖。毕竟jedis是官方推荐的操作Redis的工具。redis.clientsjedis2.9.0(2)然后创建发布者Publisher用于发布消息,具体代码如下:/***Publisher*@authorliduchang**/publicclassPublisherextendsThread{//连接池privatefinalJedisPooljedisPool;//发布通道名privateStringname;publicPublisher(JedisPooljedisPool,Stringname){super();this.jedisPool=jedisPool;this.name=name;}@Overridepublicvoidrun(){//获取要发布的消息BufferedReaderreader=newBufferedReader(newInputStreamReader(System.in));//获取连接Jedisresource=jedisPool.getResource();while(true){Stringmessage=null;try{message=reader.readLine();if(!"exit".equals(message)){//发布消息resource.publish(name,"Publisher:"+Thread.currentThread().getName()+"发布消息:"+消息e);}else{break;}}catch(IOExceptione){e.printStackTrace();}}}}(3)然后创建订阅类Subscriber,并继承JedisPubSub类,重写onMessage,onSubscribe,而onUnsubscribe,这三个方法的调用时机在注释中有说明。具体实现代码如下:packagecom.ldc.org.myproject.demo.redis;importcom.fasterxml.jackson.core.sym.Name;importredis.clients.jedis。JedisPubSub;/***Subscriber*@authorliduchang*/publicclassSubscriberextendsJedisPubSub{//订阅频道名privateStringname;publicSubscriber(Stringname){this.name=name;}/***订阅者收到消息时会调用*/@OverridepublicvoidonMessage(Stringchannel,Stringmessage){//TODOAuto-generatedmethodstubsuper.onMessage(channel,message);System.out.println("Channel:"+channel+"Acceptedmessageis:"+message);}/***订阅的频道会被调用*/@OverridepublicvoidonSubscribe(Stringchannel,intsubscribedChannels){System.out.println("订阅的频道:"+channel+"订阅数为:"+subscribedChannels);}/***取消订阅的频道会被调用*/@OverridepublicvoidonUnsubscribe(Stringchannel,intsubscribedChannels){System.out.println("取消订阅的频道:"+channel+"的订阅频道的数量是:"+subscribedChannels);}}(4)本次创建的SubThread才是真正的订阅者SubThread。上面的Subscriber是指测试真实订阅或者发布消息时输出的信息:packagecom.ldc.org.myproject.demo.redis;importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisPool;/***subscriberthread*@authorliduchang**/publicclassSubThreadextendsThread{privatefinalJedisPooljedisPool;privatefinalSubscribersubscriber;privatesubscriberStringname;publicSubThread(JedisPoolSubscridisPool,SubscriberStringname){super();this.jedisPool=jedisPool;this.subscriber=subscriber;this.name=name;}@Overridepublicvoidrun(){JedisPublicvoidrun()=null;try{jedis=jedisPool.getResource();//订阅频道为namejedis。subscribe(subscriber,name);}catch(Exceptione){System.err.println("订阅失败");e.printStackTrace();}finally{if(jedis!=null){//jedis.close();//返回连接到redispooljedisPool.returnResource(jedis);}}}}(5)之后是测试,分别测试发布和订阅测试,发布者为TestPublisher,订阅者为TestSubscriber:packagecom.ldc.org.myproject.demo.redis;导入java.util.concurrent.ExecutorService;导入java.util.concurrent.Executors;导入tjava.util.concurrent.TimeUnit;importredis.clients.jedis.JedisPool;publicclassTestPublisher{publicstaticvoidmain(String[]args)throwsInterruptedException{JedisPooljedisPool=newJedisPool("192.168.163.155");//发布消息到ldc通道Publisherpublisher(=新的,“ldc”);publisher.start();}}订阅者包com.ldc.org.myproject.demo.redis;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent。TimeUnit;importredis.clients.jedis.JedisPool;publicclassTestSubscriber1{publicstaticvoidmain(String[]args)throwsInterruptedException{JedisPooljedisPool=newJedisPool("192.168.163.155",6379);Subscribersubscriber=newSubscriber("/Thread");=newSubThread(jedisPool,subscriber,"ldc");thread.start();Thread.sleep(600000);//取消订阅subscriber.unsubscribe("ldc");}}这里直接创建线程的方式为了测试方便,更好的是,可以使用线程池通过线程池的submit方法执行线程,如果不需要可以使用shutdown方法关闭