当前位置: 首页 > 后端技术 > Node.js

[翻译]RabbitMQ教程(三)----'Pub-Sub'(Javascript)

时间:2023-04-03 19:23:26 Node.js

发布与订阅(Publish/Subscribe)在前面的章节中,我们创建了工作队列,假设前面的工作队列是每个任务只分配给一个工人。在本节中,我们将做一些完全不同的事情——向多个消费者发送一条消息,这种模式称为“发布/订阅”(publish/subscribe)。例如,我们要构建一个简单的日志系统。由两个程序组成——一个用于发出日志消息,另一个用于接收和显示它们。在我们的日志系统中,每个正在运行的接收器都会收到一条消息。这样,我们可以运行一个接收器并将日志定向到磁盘,然后运行另一个接收器,看看它是否会在屏幕上显示日志。事实上,发布的消息是广播给所有接收者的。交换在之前的指南中,我们从队列发送和接收操作。是时候介绍Rabbit中的整体消息传递模型了。让我们快速回顾一下我们之前学到的东西。生产者是发送消息的应用程序。队列是存储消息的缓冲区。消费者是接收消息的应用程序。在RabbitMQ中,消息模型的核心思想是生产者永远不会直接向队列发送消息。事实上,生产者通常不知道消息是否已经发送到任何队列。生产者只能向交易所发送消息。开关是一件简单的事情。它一方面接收来自生产者的消息,另一方面将它们推入队列。交易所必须知道在收到消息时如何处理消息。你想追加到一个特殊的队列吗?你想追加到很多队列吗?还是放弃这条消息?这些规则被定义为交换类型。可以使用以下交换类型:直接、主题、标头、扇出。介绍一下最后的--fanout。我们先创建一个扇出型交换器“logs”:ch.assertExchange('logs','fanout',{durable:false})扇出型交换器很简单,我们光看名字就可以猜到,就是广播它接收到所有已知队列的消息。这就是我们的记录器所需要的。要列出所有交易所(上市交易所),您可以使用rabbitmqctl$sudorabbitmqctllist_exchangesListingexchanges...directamq.directdirectamq.fanoutfanoutamq.headersheadersamq.matchheadersamq.rabbitmq.logtopicamq.rabbitmq.tracetopicamq.topicanfanouttopiclog..done.列表中默认创建了一些amq.*exchange和一些default(unnamed),但是你可能用不到未命名exchange(Namelessexchange)在前面的章节中我们没有提到exchange,但是我们还是可以通过到队列的消息,这就是我们使用默认交换的原因,因为我们使用空字符串("")。在我们发布这样的消息之前ch.sendToQueue('hello',newBuffer('HelloWorld!'));这里我们使用默认的或者未命名的exchange,如果第一个参数存在,消息会被Route到这个参数名的队列中。现在,我们可以使用我们定义的交换ch.publish('logs','',newBuffer('HelloWorld!'));如果第二个参数为空,则表示我们不想将消息推送到指定的队列,只想发布到日志交换。临时队列还记得我们之前使用的声明队列(hello和task_queue)吗?.能够指定队列的名称对我们来说很重要——我们需要将工作人员指向同一个队列。当您想要与消费者和生产者队列共享队列时,为队列命名很重要。但这不是我们的记录器程序所需要的。我们要监控所有日志消息,而不是某些日志消息。此外,我们对正在流动的消息(不是旧消息)感兴趣。我们需要完成两件事:首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。我们可以创建一个随机的队列名称,或者让服务器为我们选择一个随机的队列名称。其次,每当我们与消费者断开连接时,队列都需要自动销毁。在amqp.node客户端中,当我们传入一个字符串时,我们可以创建一个名称为ch.assertQueue('',{exclusive:true});的非持久化队列。该方法返回一个带有随机名称的队列实例的队列,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。当连接断开时,队列会被销毁,因为当我们声明{exclusive:true}绑定(Bindings)时我们已经创建了一个fanout类型的exchange和一个队列,现在我们需要告诉exchange将消息发送到队列中,队列和交换器之间的关系称为绑定。ch.bindQueue(queue_name,'logs','');现在,日志交换用于将消息附加到我们的队列列出绑定:您猜对了,您可以列出现有的绑定。rabbitmqctl列表绑定。整合(Puttingitalltogether)生产者发送日志消息的程序与前面几章没有太大区别。最重要的变化是,现在我们将消息发布到我们的日志交换,而不是之前不声明使用。在发送的时候,我们需要提供一个routingkey,但是在fanout类型中,这个是可以忽略的。下面是emit_log.js#!/usr/bin/envnodevaramqp=require('amqplib/callback_api');amqp.connect('amqp://localhost',function(err,conn){conn.createChannel(function(err,ch){varex='logs';varmsg=process.argv.slice(2).join('')||'HelloWorld!';ch.assertExchange(ex,'fanout',{durable:false});ch.publish(ex,'',newBuffer(msg));console.log("[x]Sent%s",msg);});setTimeout(function(){conn.关闭();process.exit(0)},500);});(emit_log.js源码)可以看到,在与交易所建立连接后。至关重要的是,禁止发布到不存在的交易所。如果仍然没有队列绑定到交换器,则消息将丢失。但这对我们来说很好,如果仍然没有消费者在听,我们可以安全地丢弃这些消息。receive_logs.js的代码#!/usr/bin/envnodevaramqp=require('amqplib/callback_api');amqp.connect('amqp://localhost',function(err,conn){conn.createChannel(function(err,ch){varex='logs';ch.assertExchange(ex,'fanout',{durable:false});ch.assertQueue('',{exclusive:true},function(err,q){控制台.log("[*]正在等待%s中的消息。要退出请按CTRL+C",q.queue);ch.bindQueue(q.queue,ex,'');ch.consume(q.queue,function(msg){console.log("[x]%s",msg.content.toString());},{noAck:true});});});});(receive_logs,js源码)如果你想保存日志,你可以打开一个控制台,然后输入$./receive_logs.js>logs_from_rabbit.log如果你想在屏幕上看到日志,打开另一个控制台$./receive_logs.js当然,你需要发出日志$。/emit_log.js使用rabbitmqctllist_bindings,可以确定刚才的代码创建了exchange和queue,并且有两个receive_logs.js程序在运行。$sudorabbitmqctllist_bindingsListing绑定...日志交换amq.gen-JzTY20BRgKO-HjmUJj0wLg队列[]日志交换amq.gen-vso0PVvyiRIL2WoV3i48Yg队列[]...完成。这个结果的简要解释:数据从日志交换到两个服务器分配的队列。这也是我们想要的结果。如何收听一些消息?让我们继续下一章。