当前位置: 首页 > 后端技术 > PHP

RabbitMQ+PHP教程三(发布-订阅)

时间:2023-03-30 00:44:26 PHP

使用php-amqplib简介在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都交付给工作人员进行处理。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者发送消息。这种模式称为“发布/订阅”。为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成,第一个将发出日志消息,第二个将接收并打印它们。在我们的日志系统中,接收程序的每个运行副本都会收到消息。这样我们就可以运行一个接收器并将日志定向到磁盘;同时我们可以运行另一个接收器并在屏幕上查看日志。本质上,发布的日志消息将广播给所有接收者。交换在本教程的前面部分,我们从队列发送和接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。让我们快速浏览一下之前教程中介绍的内容:生产者是发送消息的用户应用程序。队列是存储消息的缓冲区。消费者是接收消息的用户应用程序。RabbitMQ消息模型的核心思想是生产者不直接向队列发送任何信息。事实上,生产者甚至不知道消息是否会被发送到任何队列。相反,生产者只能将消息发送到交换器(Exchange)。开关的作用很简单。它一方面接收来自生产者的消息,另一方面将它们推送到队列中。Exchange必须知道如何处理收到的消息。它应该附加到特定队列吗?它应该被添加到多个队列吗?还是应该丢弃?.此规则由交换类型定义。有几种可用的交换类型:直接、主题、标题和扇出。我们将专注于最后一个-扇出。让我们创建这种类型的交换并将其称为日志:$channel->exchange_declare('logs','fanout',false,false,false);扇出交换非常简单。正如您可能从名称中猜到的那样,它只是将收到的所有消息广播到它知道的所有队列。这正是我们需要的记录器。Listingexchanges列出了服务器上的交易所,你可以运行rabbitmqctl:sudorabbitmqctllist_exchanges在这个列表中会有一些amq.*交易所和默认(未命名)的交易所。这些>是默认创建的,但目前不太可能使用它们。默认交换器在本教程的前几部分中,我们对交换器一无所知,但仍然能够向队列发送消息。这是可能的,因为我们使用的是默认交易所,我们通过空字符串("")来识别它们。回想一下我们之前是如何发布消息的:$channel->basic_publish($msg,'','hello');我们在这里使用默认或无名交换:消息被路由到具有指定routing_key名称的队列(如果存在)。路由键是第三个参数:basic_publish现在,我们可以将它发布到我们命名的Exchange:$channel->exchange_declare('logs','fanout',false,false,false);$channel->basic_publish($msg,'日志');临时队列也许您还记得我们之前用来指定队列的名称(还记得hello和task_queue吗?)。能够命名队列对我们来说至关重要——我们的Workers需要指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名很重要。但是我们的记录器不是这样的。我们想了解所有日志消息,而不仅仅是其中的一部分。我们也只对当前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要做两件事。首先,我们每次连接Rabbit时都需要一个新的空队列。为此,我们可以创建一个随机名称的队列,或者更好——让服务器为我们选择一个随机的队列名称。其次,一旦消费者断开连接,队列应该自动删除。在php客户端中,当我们将队列名称作为空字符串提供时,我们创建了一个具有生成名称的非持久队列:list($queue_name,,)=$channel->queue_declare("");方法返回,queue_name变量包含随机生成的RabbitMQ队列名称。例如,它可能看起来像amq.gen-jzty20brgko-hjmujj0wlg当声明连接关闭时,队列被删除,因为它被声明为独占。绑定我们已经创建了扇出交换和队列。现在我们需要告诉Exchange将消息发送到我们的队列。交换器和队列之间的关系称为绑定。$channel->queue_bind($queue_name,'logs');从现在开始,日志交换将向队列中添加消息。列出绑定您可以使用现有的绑定列表,使用以下命令:rabbitmqctllist_bindings让我们把它放在一起(Puttingitalltogether)生成日志消息的生成过程与前面的教程没有太大区别。最重要的变化是我们现在想要将消息发布到我们的日志交换,而不是匿名的。这是emit_log.php代码:channel();$channel->exchange_declare('logs','fanout',false,false,false);$data=implode('',array_slice($argv,1));if(empty($data))$data="info:HelloWorld!";$msg=newAMQPMessage($data);$channel->basic_publish($msg,'logs');echo"[x]Sent",$data,"\n";$channel->close();$connection->close();?>emit_log.phpsource如你可以看到,连接建立后,我们声明了exchange。此步骤是必要的,因为禁止发布到不存在的交易所。如果没有队列绑定到交换器,消息将会丢失,但这对我们来说很好;如果没有用户在收听,我们可以安全地丢弃该消息。receive_logs.php代码:channel();$channel->exchange_declare('logs','fanout',false,false,false);列表($queue_name,,)=$channel->queue_declare("",false,false,true,false);$channel->queue_bind($queue_name,'logs');echo'[*]等待日志。要退出,请按CTRL+C',"\n";$callback=function($msg){echo'[x]',$msg->body,"\n";};$channel->basic_consume($queue_name,'',false,true,false,false,$打回来);while(count($channel->callbacks)){$channel->wait();}$channel->close();$connection->close();?>receive_logs.php如果你想保存日志到一个文件,只需打开控制台并键入:phpreceive_logs.php>logs_from_rabbit.log如果您想在屏幕上查看日志,请生成一个新终端并运行:当然是phpreceive_logs.php,然后触发日志类型:phpemit_log.php使用rabbitmqctllist_bindings你可以验证代码实际上创建绑定和队列是我们想要的。运行两个receive_logs.php程序,您应该会看到:sudorabbitmqctllist_bindings#=>Listingbindings...#=>logsexchangeamq.gen-JzTY20BRgKO-HjmUJj0wLgqueue[]#=>logsexchangeamq.gen-vso0PVvyiRIL2WoV3i48Ygqueue[]#=>...完成。结果的解释很简单:来自Exchange日志的数据进入两个队列,这些队列的名称由服务器分配。这正是我们想要的。要查看如何收听消息的子集,让我们转到RabbitMQ+PHP教程四(路由)。从RabbitMQ翻译-RabbitMQ教程-发布/订阅