当前位置: 首页 > 后端技术 > PHP

RabbitMQ使用

时间:2023-03-29 20:34:51 PHP

RabbitMQ的定义RabbitMQ是一个使用erlang语言开发的开源消息队列系统,完全实现了AMPQ(AdvancedAbstractionLayerMessageCommunicationProtocol)。Mac下使用Homebrew安装RabbitMQ$brewinstallrabbitmq修改~/.bash_profile配置环境变量:#RabbitMQConfigexportPATH=$PATH:/usr/local/sbin重启配置$source~/.bash_profile启动mq服务(后台)startsasrabbitmq-server-detached)$rabbitmq-server登录管理界面http://127.0.0.1:15672账号密码:guestclientRabbitMQ官方提供了三个PHP扩展:php-amqp,php-rabbit,php-amqplibphp-amqplib的安装php的客户端现在常用的是php-amqplib直接拉github上面的代码$gitclonehttps://github.com/php-amqplib/php-amqplib.gitcomposer安装(官网提供)将composer.json将文件添加到您的项目{"require":{"php-amqplib/php-amqplib":">=2.6.1"}}下载依赖项$composerinstall概念VirtualvhostsVirtualvhosts是一个命名空间,其中有多个交换和队列。实现环境(用户、用户组、exchange、queue)隔离,是权限控制的最小粒度。默认的虚拟主机是/。Exchange(交换机)接受生产者发送的消息,并根据绑定规则转发到相应的队列中。对于未命名的交换,默认是使用空字符串标识符。交换器类型(switchtype)包括四种类型:direct、topic、headers、fanoutdirect将消息转发到routigKey指定的队列中。topic和direct类型类似,只是routigKey是一个字符串,用句点“.”隔开。*可以替换一个词。#可以替换零个或多个单词。根据发送的消息内容中的headers属性来匹配headers。fanout将所有接收到的消息广播到所有已知队列。Queue(消息队列)queue是mq的一个内部对象,用来存放没有被客户消费的消息。相同属性的队列可以重复定义,每条消息都会放入一个或多个队列中。绑定(binding)绑定就是根据路由规则绑定exchange和queue。可以这样理解,绑定就是exchange和queue的关系连接(connection)消息tcp连接通道(channel)在每一个连接中,可以建立多个通道,每个通道代表一个会话任务。尝试共享connectionRabbitMQExample1.send.php:require_once__DIR__。'/vendor/autoload.php';使用PhpAmqpLib\Connection\AMQPStreamConnection;使用PhpAmqpLib\Message\AMQPMessage;//创建连接$connection=newAMQPStreamConnection('localhost',5672,'guest','guest');//创建一个通道,多个通道可以共享连接$channel=$connection->channel();//创建交换机和队列(如果已经存在,无需重新创建再绑定)//创建直接交换机$channel->exchange_declare('direct_logs','direct',false,false,false);//创建队列$channel->queue_declare('hello',false,false,false,false);//交换机和队列的绑定,$channel->queue_bind('hello','direct_logs','routigKey');//设置消息bady传输字符串日志(消息只能是字符String,建议消息为json格式)$msg=newAMQPMessage('logs');//发送数据到对应的交换机direct_logs,并设置对应的routigKey$channel->basic_publish($msg,'direct_logs','routigKey');2.receive.php:require_once__DIR__.'/vendor/autoload.php';使用PhpAmqpLib\Connection\AMQPStreamConnection;//创建连接$connection=newAMQPStreamConnection('localhost',5672,'guest','guest');//创建一个通道,多个通道可以共享连接$channel=$connection->channel();//可能在数据释放之前启动消费者,所以我们需要确保队列存在,然后再尝试从中消费消息//创建直接交换$channel->exchange_declare('direct_logs','direct',false,假的,假的);//创建队列$channel->queue_declare('hello',false,false,false,false);//交换机与队列的绑定,$channel->queue_bind('hello','direct_logs','routigKey');//回调函数$callback=function($msg){echo$msg->body;};//启动队列消费者$channel->basic_consume('hello3','',false,true,false,false,$callback);//判断是否有回调函数while(count($channel->callbacks)){//这里是执行回调函数$channel->wait();}RabbitMQ注意非持久化会导致队列重启,数据丢失和exchange持久化声明durable参数时指定truequeue持久化,声明durable参数时指定true消息持久化,实例化AMQPMessage类时指定delivery_mode为2exchange和queue持久化需要一致绑定消费者设置手动ack,在声明no_ack参数的时候,指定queue消息异常为false,需要删除消息最后再发送相同的消息,手动记录日志