当前位置: 首页 > 后端技术 > PHP

PHP实现redis消息发布订阅

时间:2023-03-30 01:54:24 PHP

Pub/Sub功能基本介绍(意思是Publish,Subscribe),即发布和订阅功能在基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它以事件作为基本的通信机制,提供大型系统所需的松耦合交互方式:一个订阅者(如客户端)以事件订阅的形式表达一个事件或它有兴趣接收的一类事件;发布者(例如服务器)可以订阅订阅者随时收到他们感兴趣的事件的通知。消息发布者,即发布客户端,不需要独占链接。您可以在发布消息的同时使用同一个redis-client链接执行其他操作(例如:INCR等)。消息订阅者,即订阅客户端,需要独享链接。即在订阅期间,redis-client不能穿插其他操作。此时客户端以阻塞的方式等待“发布端”的消息;这个很好理解,所以订阅端需要使用单独的链接,甚至在一个额外的线程中使用。使用银行卡消费时,银行往往会通过微信、短信或邮件等方式将交易信息告知用户。这是一种发布-订阅模式,发布是指交易信息的发布,订阅是指各种渠道。这在实际工作中很常见,Redis支持这种模式。发布-订阅模型首先需要一个消息源,即必须要有一个消息要发布,比如例子中的银行通知。首先是银行的簿记系统。收到交易订单并记账成功后,发送消息。这时候订阅者就可以接收到消息并进行处理了。观察者模式就是这种模式的典型模型。应用。终端实现订阅,频道为‘chat’发布消息。代码实现subscribe.phpconnect("127.0.0.1",6379);//$redis->setOption(Redis::OPT_READ_TIMEOUT,-1);//redis模式不设置超时,推荐$redis->subscribe(['chan'],'callback');//callback是回调函数的名称//$redis->subscribe(['chan'],array(newTestCall(),'callback'));//如果回调函数是类中的方法名,就这样写//回调函数,这里写处理逻辑functioncallback($instance,$channelName,$message){echo$channelName,"==>",$消息,PHP_EOL;//$instance,就是上面创建的redis实例对象,在回调函数中,默认参数为,所以不需要具体传参。除了SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE,PUNSUBSCRIBE这4个命令,这里不能使用其他命令$newredis->connect("127.0.0.1",6379);回显$newredis->get('test')。PHP_EOL;$newredis->close();//可以根据$channelName处理不同的业务逻辑,$messageswitch($chan){case'chan-1':...break;case'chan-2':...中断;}switch($message){case'msg1':...break;case'msg2':...中断;}}publish.phpconnect("127.0.0.1",6379);$redis->publish('chan','这是一条消息');代码在subscribe.php中引入不设置超时的方法一:ini_set('default_socket_timeout',-1);方法二:$redis->setOption(Redis::OPT_READ_TIMEOUT,-1);如果不设置超时,60s后会报错:PHPFatalerror:UncaughtRedisException:readerroronconnectionto127.0.0.1:6379insubscribe.php:6方法一的实现,是通过临时修改配置值ini的default_socket_timeout默认为60s,default_socket_timeout是socket流的超时参数,即socket流从建立到传输到关闭的整个过程必须在这个参数设置的时间内完成,如果不能完成,然后PHP将自动结束套接字并返回警告。第二种方法是修改redis配置项,所以只对redis连接生效。与方法一相比,不会对其他方法产生意想不到的影响。批量订阅redis的psubscribe支持通过模式匹配进行批量订阅,订阅方式$redis->psubscribe(['my*'],'psubscribe');//回调函数写入函数名或$redis->psubscribe(['my*'],array(newTestCall(),'psubscribe'));//回调函数是类中的一个方法,类名写自己定义的classsubscribe.phpconnect("127.0.0.1",6379);$redis->setOption(Redis::OPT_READ_TIMEOUT,-1);//匹配方法一:可用于发布$redis->publish('mymest','这是一条消息');//$redis->psubscribe(['my*'],'psubscribe');//匹配方式二:发布可用$redis->publish('mydest','thisisamessage');//$redis->psubscribe(['my?est'],'psubscribe');//匹配方法三:发布可用$redis->publish('myaest','thisisamessage');或$redis->publish('myeest','这是一条消息');$redis->psubscribe(['my[ae]est'],'psubscribe');functionpsubscribe($redis,$pattern,$chan,$msg){echo"模式:$pattern\n";echo"频道:$chan\n";echo"Payload:$msg\n";}模式匹配规则支持以下类型,以hello为例:h?llo订阅hello、hallo和hxlloh*llo订阅hllo和heeeelloh[ae]llo订阅hello和hallo,但不是hillo特殊字符使用\转义pubsub方法介绍publicfunctionpubsub($keyword,$argument)pubsub获取pub/sub系统的信息,$keyword可用"channels","numsub",or"numpat"三种类型,不同关键字返回的数据*$redis->pubsub('channels');//Allchannels获取所有频道并返回一个数组*$redis->pubsub('channels','*pattern*');//仅匹配你的模式的通道,返回匹配匹配模式的通道*$redis->pubsub('numsub',array('chan1','chan2'));//获取'chan1'和'chan2'的订阅者数量//返回每个订阅频道的数量,返回一个数组*$redis->pubsub('numpat');//获取模式订阅者数量获取模式匹配模式下的订阅数量,即$redis->psubscribe(['my[ae]est'],'psubscribe');返回编号为1,$redis->subscribe(['chan'],'callback');该方法获取不到,所以返回数量为0。参考:Redis发布订阅模式