简介在之前的HelloWorld教程中,我们编写了一个程序,用于从指定队列发送和接收消息。在这篇文章中,我们将创建一个工作队列,用于在多个工作人员(消费者)之间分配耗时任务。工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息发送到队列中。在后台运行的工作进程将弹出任务并最终执行它。当您运行许多工人(消费者)时,任务将在他们之间共享。这个概念在web应用程序中特别有用,因为不可能在短的HTTP请求中处理复杂的任务。先决条件在本教程的前一部分,我们发送了一条包含“HelloWorld”的消息。现在,我们将发送支持复杂任务的字符串。我们没有真正的环境任务,比如图像调整或PDF文件渲染,让我们使用sleep()来模拟真实环境的业务功能。我们将字符串中的点数作为其复杂度;每个点将占一秒钟的“工作”。例如,Hello...描述的伪任务需要三秒钟。new_task.php我们将稍微修改前面示例中的send.php代码,以允许从命令行发送任意消息。此计划将任务分配给我们的工作队列,因此我们将其命名为new_task.php:$data=implode('',array_slice($argv,1));if(empty($data))$data="HelloWorld!";$msg=newAMQPMessage($data);$channel->basic_publish($msg,'','hello');echo"[x]Sent",$data,"\n";我们上面的一个版本的receive.php脚本也需要一些改变:它需要在消息正文中每隔一个作业伪造一次。它将从队列中弹出消息并执行任务,所以我们将其命名为worker.php:$callback=function($msg){echo"[x]Received",$msg->body,"\n";//根据”。”获取延迟时间的个数,单位秒sleep(substr_count($msg->body,'.'));//模拟业务执行延时echo"[x]Done","\n";};$channel->basic_consume('hello','',false,true,false,false,$callback);单个工作人员简单地运行测试消费者phpworker.php消息生产者phpnew_task.php“一项非常艰巨的任务需要两秒钟......”循环调度使用任务队列的一个优点是能够轻松地并行工作。如果我们有大量积压的工作,我们可以增加更多的工人,这很容易扩展。首先,让我们尝试同时运行两个worker.php脚本。他们都会从队列中获取消息,看看它是如何工作的?让我们来看看。您需要打开三个控制台命令。两者都将运行worker.php脚本。这些控制台将是我们的两个消费者C1和C2。consumer1phpworker.phpconsumer2phpworker.phpmessageproducerphpnew_task.phpmsg1...默认情况下,RabbitMQ会将每条消息按顺序发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种分发消息的方式称为轮询。尝试使用三名或更多工人。确认任务完成的消息可能需要几秒钟。你可能会遇到如果一个消费者启动了一个长时间运行的任务并且只完成了一部分会发生什么?.使用我们当前的代码,一旦RabbitMQ向客户端发送消息,它就会立即被标记为删除。在这种情况下,如果您中止消费者,我们将丢失它正在处理的消息。我们还将丢失发送给该消费者的任何未完成的消息。如果我们不想丢失任何任务。如果一个消费者意外中止,我们希望将任务交给另一个消费者。为了保证消息不丢失,RabbitMQ支持消息确认。ACK(nowledgement)消费者返回的结果告诉RabbitMQ一条消息已经收到,你可以自由可控地删除它。如果消费者中止(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ不会发送ACK。它会理解消息没有完全处理并重新排队。如果有其他用户同时在线,会很快传递给另一个消费者。这样,即使您意外中止,也可以确保不会丢失任何信息。没有消息超时;当消费者中止时,RabbitMQ将重新分发消息。即使处理消息需要很长时间,也没有关系。消息确认默认关闭。这可以通过将第四个参数basic_consume设置为false(true表示没有ACK)并在我们完成任务后从消费者发送适当的确认来完成。$callback=function($msg){echo"[x]Received",$msg->body,"\n";睡眠(substr_count($msg->body,'.'));echo"[x]完成","\n";$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};$channel->basic_consume('task_queue','',false,false,false,false,$打回来);使用这段代码,我们可以确定,即使我们在处理消息时使用Ctrl+C杀死消费者,也不会丢失任何内容。消费者中止后不久,将重新分配未确认的消息。忘记确认(Forgottenacknowledgment)丢失ACK确认是一个常见的错误。这是一个容易犯的错误,但会带来严重的后果。当您的客户端退出时,消息将被重新分配(这看起来像是随机分配),RabbitMQ将消耗更多内存,它不会释放任何延迟确认消息。要调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:rabbitmqctllist_queuesnamemessages_readymessages_unacknowledged消息持久化(Messagedurability)我们已经学习了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然可能会丢失。当RabbitMQ退出或崩溃时,队列和消息将丢失,除非您不希望它丢失。为了确保消息不丢失,需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保RabbitMQ永远不会丢失队列。为了做到这一点,我们需要将其声明为持久的。为此,我们将queue_declare作为第三个参数传递给true:$channel->queue_declare('hello',false,true,false,false);虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们定义了一个名为hello的非持久队列。RabbitMQ不允许您使用不同的参数重新定义现有队列,任何尝试这样做的程序都将返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue:$channel->queue_declare('task_queue',false,true,false,false);需要应用于生产者和消费者代码设置为真。至此,我们可以保证即使RabbitMQ重启,task_queue队列也不会丢失。现在我们想通过设置delivery_mode=2消息属性将消息标记为持久消息,将amqpmessage作为属性数组的一部分。$msg=newAMQPMessage($data,array('delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT));消息持久化注意事项(Noteonmessagepersistence)将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且不保存它时,会有一个很短的时间窗口。此外,RabbitMQ不会对每条消息都执行fsync(2)——它可能只是保存到缓存而不是实际写入磁盘。持久性保证不强,但对于我们的简单任务队列来说已经足够好了。如果您需要更强的保证,那么您可以使用消费者确认。公平调度您可能已经注意到,调度仍然不能完全按照我们的预期进行。比如有两个消费者的情况,当奇数消息都是重消息,偶消息都是轻消息时,一个消费者会一直很忙,而另一个几乎不干活。好吧,RabbitMQ不知道发生了什么,仍然会均匀地发送消息。这是因为RabbitMQ仅在消息进入队列时才分派消息。当有未确认的消息时。它只是盲目地将第n条消息分发给第n个消费者。为了改变这种分配方式,我们可以调用basic_qos方法并设置参数prefetch_count=1。这告诉RabbitMQ不要一次向一个消费者发送多条消息。或者,换句话说,在处理并确认之前的消息之前,不要向消费者发送新消息。相反,它会被发送给下一个仍然不忙的消费者。$channel->basic_qos(null,1,null);关于队列大小的注意事项如果所有消费者都很忙,您的队列就会填满。您需要密切注意这一点,可能会添加更多工人,或者采取其他策略。源码new_task.phpchannel();$channel->queue_declare('task_queue',false,true,false,false);$data=implode('',array_slice($argv,1));if(空($data))$data="HelloWorld!";$msg=newAMQPMessage($data,array('delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT));$channel->basic_publish($msg,'','task_queue');echo"[x]已发送",$data,"\n";$channel->close();$connection->close();?>worker.phpchannel();$channel->queue_declare('task_queue',false,真,假,假);echo'[*]等待消息。要退出,请按CTRL+C',"\n";$callback=function($msg){echo"[x]Received",$msg->body,"\n";睡眠(substr_count($msg->body,'.'));echo"[x]完成","\n";$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};$channel->basic_qos(null,1,null);$channel->basic_consume('task_queue','',假的,假的,假的,假的,$回调);while(count($channel->callbacks)){$channel->wait();}$channel->close();$connection->close();?>使用消息确认和预取,可以设置一个work队列持久性配置选项,即使RabbitMQ重新启??动也能使任务保持活动状态。要了解如何向多个消费者传递相同的信息,您可以阅读下一章:RabbitMQ+PHP教程三(发布/订阅)。翻译自RabbitMQ-RabbitMQ教程-工作队列
