workqueue在第一篇文章中,我们编写了一个程序来从声明的队列中发送和接收消息。在本文中,我们将创建一个工作队列(WorkQueue)来分配工作中的耗时任务。主要思想是避免立即执行资源密集型任务并等待它完成。相反,我们希望这些任务稍后执行。我们把任务封装成一个消息放到队列中。一个工作进程将在后台执行,获取(Pop)任务并最终完成任务,当您运行多个工作进程时,这些任务将在他们之间共享。当无法在单个http请求窗口内完成复杂任务时,此概念在Web应用程序中也非常有用。准备工作在之前的引导程序中,我们发送了一条“HelloWorld”消息。现在我们想要发送一个表示复杂任务的字符串,我们没有像调整图像大小或渲染pdf文件这样的现实世界任务,所以我们使用setTimeout来模拟我们很忙。我们取'.'的数量。表示字符串的复杂性;每个'。'将消耗一秒钟,例如:模拟任务“你好...”将消耗三秒钟。从前面的例子中,我们稍微修改了send.js的代码以允许命令行发送任意消息。这个程序在工作队列中安排任务,所以我们称它为new_task.jsvarq='task_queue';varmsg=process.argv.slice(2).join('')||“你好世界!”;通道assertQueue(q,{durable:true});ch.sendToQueue(q,newBuffer(msg),{persistent:true});console.log("[x]Sent'%s'",msg);我们之前的receive.js也需要做一些改动。它需要模拟每个'.'在消息内容中作为将消耗一秒钟的任务。它从队列中获取一条消息并执行此任务,我们称之为worker.jsch.consume(q,function(msg){varsecs=msg.content.toString().split('.').length-1;console.log("[x]Received%s",msg.content.toString());setTimeout(function(){console.log("[x]Done");},secs*1000);},{noAck:真});注意我们模拟的执行时间执行我们的程序shell1$./worker.jsshell2$./new_task.js使用任务队列(TaskQueue)的好处之一是简化了并行工作的能力。如果我们有很多未完成的任务堆积起来,我们只需添加更多的工人来扩大规模。首先,我们尝试同时启动两个worker.js,它们都会从队列中接收消息,但实际上呢?让我们看看你需要打开第三个命令行,两个来运行worker.js脚本,我们称之为C1,C2shell1$./worker.js[*]等待消息。要退出,请按CTRL+Cshell2$./worker.js[*]等待消息。要退出按CTRL+C在第三个命令行工具中,我们将发布新任务,一旦你启动消费者,你可以发布一些消息:shell3$./new_task。js第一条消息.shell3$./new_task.js第二条消息..shell3$./new_task.js第三条消息...shell3$./new_task.js第四条消息....shell3$./new_task.js第五条消息。....让我们看看发送给我们的workershell1$./worker.js[*]Waitingformessages的内容。要退出,请按CTRL+C[x]收到“第一条消息”。[x]收到“第三条消息...”[x]收到“第五条消息......”shell2$./worker.js[*]等待消息。ToexitpressCTRL+C[x]Received'Secondmessage..'[x]Received'Fourthmessage....'默认情况下,RabbitMQ会依次推送消息给下一个消费者,每个消费者都会得到相同的号码平均消息数。这种消息分发机制称为轮询。您可以尝试3个或更多工人。##消息确认(Messageacknowledgment)完成一个任务需要一些事件,你可能想知道当一个消费者开始执行一个长任务但只执行了一部分然后就死掉了会发生什么。使用我们当前的代码,一旦RabbitMQ向消费者发送消息,它会立即从存储中删除该消息。这样,如果您终止进程,我们将丢失正在处理的消息。我们还会丢失发送给流程但尚未处理的消息。但是我们不想丢失任何任务,如果一个进程挂了,我们希望这个任务会被分发给其他进程。为了保证每条消息永不丢失,RabbitMQ支持消息确认。消费者会返回一个ack标志通知RabbitMQ当前消息已经收到并完成,所以RabbitMQ可以删除这个任务。如果一个消费者挂了(通道关闭,连接关闭,或者TCP连接丢失)没有发送ack标志,RabbitMQ会明白这个任务还没有完成,并且会把它放回队列中,如果有其他消费者在线,这条消息会很快发送给其他消费者。这样你就可以保证不会丢失任何消息,即使进程只是偶尔死掉。不管消息处理是否超时,RabbitMQ只会在消费者挂掉的时候重新分发消息。这对于那些需要长时间处理的消息也有好处(补充:不会判断为noack,而是重新分发)前面的例子中,消息确认是关闭的,是时候打开了,使用{noAck:false}(您也可以删除此操作选项)选项,当我们完成此任务时发送正确的消息确认。ch.consume(q,function(msg){varsecs=msg.content.toString().split('.').length-1;console.log("[x]收到%s",msg.content.toString());setTimeout(function(){console.log("[x]Done");ch.ack(msg);},secs*1000);},{noAck:false});使用这样的代码,您可以确保即使您在进程仍在处理消息时使用CTRL+C终止进程,也不会丢失任何数据。进程挂起后,所有未确认的消息将被重新分发。##Forgettoacknowledge这是一个常见的错误,缺少ack。只是一个简单的错误,后果非常严重。当客户端停止时,消息将被重新分配(就像随机一样),但是当RabbitMQ无法摆脱任何未确认的消息时,它会使用越来越多的内存。要调试此类错误,您可以使用`rabbitmqctl`输出未确认的消息字段:$sudorabbitmqctllist_queuesnamemessages_readymessages_unacknowledged列出队列...hello00...done。消息持久化我们学会了保证,仍然保证进程挂掉的时候任务不会丢失,但是当RabbitMQ服务停止的时候我们的任务还是会丢失。当RabbitMQ退出或崩溃时,队列和消息都会丢失,除非你告诉它不要丢失。为了消息不丢失,需要保证两点,我们需要持久化队列和消息。首先,我们需要让RabbitMQ不丢失队列。为此,我们必须首先声明ch.assertQueue('hello',{durable:true});虽然这个操作是正确的,但是在我们当前的配置中它并没有起作用,因为我们已经定义了一个名为hello的非持久化队列。RabbitMQ不允许您更改现有队列的参数。如果这样做,程序将返回一个错误。但是有一个快速的方法——让我们定义一个新的队列,叫做task_queuech.assertQueue('task_queue',{durable:true});消费者和生产者都需要使用这种持久的选择。至此,我们可以保证RabbitMQ重启时task_queue队列不会丢失。现在我们需要持久化消息——使用持久化Channel.sendToQueue选项,ch.sendToQueue(q,newBuffer(msg),{persistent:true});注意:消息持久化并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是总是会出现RabbitMQ收到消息但还没有处理的情况。另外,RabbitMQ并没有实现每条消息的帧同步,可能只是写入了缓存,还没有写入磁盘。不能完全保证消息的持久化,但是已经远远满足我们简单工作队列的需求了。如果需要更强的持久性保证,可以使用[publisherconfirms](https://www.rabbitmq.com/confirms.html)。公平调度你可能已经注意到,当前的调度并不是我们想要的,例如:在两个worker的情况下,当所有奇数消息都重,偶数消息都轻时,那么就会有一个worker会忙一直,而另一个工人几乎不会工作。RabbitMQ不知道这些情况,只知道连续均匀地分发消息。这样做的原因是RabbitMQ只在消息进入队列时才分发消息,而不管消费者的未确认消息有多少,它只是盲目地将第N条消息分发给第N个消费者。为了解决这个问题,我们使用prefetch的方法,将值设置为1,这意味着RabbitMQ不会同时向一个worker发送多条消息,也就是只有在worker完成并发送ack标志。否则,RabbitMQ会将消息发送给下一个不忙的worker.ch.prefetch(1);注意队列的大小如果所有工作人员都很忙,您的队列可能会填满,您可能需要一个监视器,或者添加更多工作人员,或者有其他解决方案。整合最后的new_task.js的代码:#!/usr/bin/envnodevaramqp=require('amqplib/callback_api');amqp.connect('amqp://localhost',function(err,conn){conn.createChannel(function(err,ch){varq='task_queue';varmsg=process.argv.slice(2).join('')||“HelloWorld!”;ch.assertQueue(q,{durable:true});ch.sendToQueue(q,newBuffer(msg),{persistent:true});console.log("[x]Sent'%s'",msg);});setTimeout(function(){conn.close();process.exit(0)},500);});new_task.jssourceworker.js的代码:#!/usr/bin/envnodevaramqp=require('amqplib/callback_api');amqp.connect('amqp://localhost',function(err,conn){conn.createChannel(function(err,ch){varq='task_queue';ch.assertQueue(q,{durable:true});ch.prefetch(1);console.log("[*]正在等待%s中的消息。要退出请按CTRL+C",q);ch.consume(q,function(msg){varsecs=msg.content.toString().split('.').length-1;console.log("[x]收到%s",msg.content.toString());setTimeout(function(){console.log("[x]Done");ch.ack(msg);},secs*1000);},{noAck:false});});});worker.js源码使用了消息确认和预处理,你可以设置一个worker队列持久化选项,这样在RabbitMQ重启reserve的时候可以接收到消息。要获得有关Channel方法和消息属性的更多信息,您可以浏览amqplib文档现在我们可以转到第3章并学习如何将相同的消息分发给多个消费者。译文:Joursion日期:2016/12/25欢迎交流学习。
