当前位置: 首页 > 后端技术 > PHP

PHP和RabbitMQ消息队列

时间:2023-03-30 05:37:23 PHP

RabbitMQ提供了跨语言接口,我们可以使用主流编程语言Java、C、C++、Python、PHP等与RabbitMQ连接。RabbitMQ具有消息确认机制、灵活的路由控制、消息集群的高可用性,使得很多大型系统都使用RabbitMQ作为消息处理系统。消息队列(MessageQueue)是应用程序之间的一种通信方式。消息发送后立即返回,消息系统保证消息的可靠传递。消息发布者不管谁得到消息只把消息发布到MQ,消息使用者不管谁发布消息只从MQ拿到消息。这样,发布者和用户都不需要知道对方的存在。我们可以使用Redis来实现简单的消息队列,但是相对复杂的需求,比如消息确认、消息持久化、高可用等,需要借助RabbitMQ这样的大型工具来实现。在本文中,我们将通过实例来讲解使用PHP处理RabbitMQ消息队列的应用。安装php-amqplibphp-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端可以方便地连接和操作RabbitMQ。我们使用composer来安装。composerrequirephp-amqplib/php-amqplib这个例子说明了生产者(Producer)和消费者(Consumer)是消息队列的基本概念。生产者是指生产消息的一方,也是消息的发送者。消费者是消费消息的一方,也是消息的接收者,队列是存放消息的缓冲区。在这个例子中,生产者会向消息队列发送很多消息,多个消费者会消费队列中的消息。我们可以想象这样一个场景:在皮鞋生产包装车间,成品鞋源源不断地进入传送带(消息队列),等待操作员(消费者)对皮鞋进行包装。由于等待打包的鞋子太多,我们需要在传送带两侧安排多名打包工人,及时从传送带上取出成品鞋,然后打包入箱。我们的要求是保证工人最终打包的皮鞋数量是相当多的,打包工人不能因为操作慢或者个人原因临时离开生产线,导致最终打包数量不一致。消息发送生产者将消息发送到队列中,生产者不关心谁来消费(处理)这些消息。消息队列(MQ),用于存储消息直到发送给消费者。它是消息的容器,也是消息的目的地。一条消息可以放在一个或多个队列中。消息一直在队列中,等待消费者连接队列取走。消息到达队列后,如果没有消费者来处理消息,我们希望队列中的消息不会被丢弃,即消息会被持久化。生产者和消费者都必须将queue_declare的第三个参数设置为true,也就是让消息队列持久化。$channel->queue_declare($queue,false,true,false,false);另外,我们可以保证即使重启RabbitMQ,消息队列也不会丢失。在生产者端设置:'delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT。现在我们创建生产者文件sender.php,我们假设服务端已经安装了RabbitMQ并开启了相应的端口。如何安装?请参阅:在CentOS7系统上安装和配置RabbitMQ。channel();$channel->queue_declare($queue,false,true,false,false);//第三个参数设置为true,表示使消息队列持久化for($i=0;$i<100;$i++){$arr=['id'=>'message_'.$i,'order_id'=>str_replace('.','',microtime(true)).mt_rand(10,99).$i,'content'=>'helloweba-'.time()];$data=json_encode($arr);$msg=newAMQPMessage($data,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);////设置rabbitmq重启后不丢队列,或者设置为'delivery_mode'=>2$channel->basic_publish($msg,'',$queue);echo'发送消息:'.$数据。PHP_EOL;}$channel->close();$connection->close();在上面的代码中,我们模拟了生产者向队列发送了100条订单消息。消息接收消费者是指完成消息接收和处理的客户端程序。消费者就像生产线上的操作员。有序完成后续工作任务。在实际项目中,如果消费者处理消息的能力不足,则需要启用多个消费者来消费队列中的消息。默认情况下,RabbitMQ会将队列中的消息平均分配给每个消费者。如果消费者必须长时间处理分配的消息任务(耗时任务),那么在处理消息任务时可能会出现意外。例如,如果消费者断电或发生故障,它正在处理的消息会发生什么情况?这就是RabbitMQ的消息确认机制。为了保证数据不丢失,RabbitMQ会将未处理的消息分配给下一个消费者处理。另外,RabbitMQ还可以设置消息任务的公平分配,不会将多个消息处理任务同时分配给一个消费者,因为消费者不能同时处理多个消息任务。也就是说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。$channel->basic_qos(null,1,null);//处理并确认消息后消费新消息我们现在创建一个消费者文件receiver.php,代码如下:channel();$channel->queue_declare($queue,false,true,false,false);echo'[*]等待消息。要退出,请按CTRL+C'。PHP_EOL;$callback=function($msg){echo"Receivedmessage:",$msg->body,PHP_EOL;睡觉(1);//模拟耗时执行$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};$channel->basic_qos(null,1,null);//处理和确认消息后消费新消息$channel->basic_consume($queue,'',false,false,false,false,$callback);//第四个参数的值为false,开启消息确认while(count($channel->callbacks)){$channel->wait();}$channel->close();$connection->close();模拟测试现在我们运行多个消费者终端,可以打开多个ssh客户端,client1和client2运行:phpreceive.php然后再打开一个终端,运行生产者:phpsender.php由于消费者被阻塞,所以他们会一直等待队列中的消息,当有消息的时候,他们会去取出来处理,我们可以模拟中断其中一个客户端,也就是断开一个消费者。然后查看该消息是否已经被其他消费者接收并处理。同样,我们可以模拟重启所有客户端,看看队列中的消息是否没有丢失。client1中断与RabbitMQ的连接后,再次运行与RabbitMQ的连接,在client2中看到消息处理状态,注意图中的消息id。client1:client2:下面我们来了解一下RabbitMQ消息发布订阅的相关知识,敬请期待。