RabbitMQ官方教程https://www.rabbitmq.com/tuto...是基于回调的。下面给出基于Promise的写法。并实现动态队列绑定初始化配置constamqp=require('amqplib')//rabbitMQ地址const{amqpAddrHost}=require('../config/index.js')//switchnameconstex='amq.topic'constamqpAddr=`amqp://${amqpAddrHost}`//读取HOSTNAME,运行多实例时,比如k8s中,HOSTNAME可以获取当前pod的名字//多实例时,写日志,或者创建连接的时候最好带上pod名称。如果有问题,最好定位到是哪个pod出了问题。consthostName=process.env.HOSTNAME//队列属性设置//一般来说,最好设置队列自动删除autoDelete。当链接断开时,队列也会被删除,这样不会有很多无用的队列//durable是用来持久化的,最好设置为非持久化constqueueAttr={autoDelete:true,durable:false}//定义通道的引用。链接建立后,所有方法都可以参考CH获取通道方法letCH=nullFunctiontosendmessagetoqueue//FunctiontosendmessagetoqueuefunctionpublishMessage(msg){if(!CH){return''}msg=JSON.stringify(msg)//指定exchangeex,routingkey,消息内容CH.publish(ex,eventBusTopic,Buffer.from(msg))}当rabbitMQ的链接断开时,需要activelyreconnectfunctionreconnectRabbitMq(){log.info('reconnect_rabbit_mq')connectRabbitMq()}连接rabbitMQ的主要函数functionconnectRabbitMq(){amqp.connect(amqpAddr,{//设置connection_name的属性,可以看到是哪个instancetheconnectionisfromontheUIofrabbitMQconsoleclientProperties:{connection_name:hostName}}).then((conn)=>{log.info('rabbitmq_connect_successd')//确保添加链接错误事件处理,否则一次报错,不处理错误,程序会崩溃//error是一个特殊的事件,必须处理//如果报错,直接去重连conn.on('error',(err)=>{log.error('connect_error'+err.message,err)reconnectRabbitMq()})//创建通道returnconn.createChannel()}).then((ch)=>{CH=ch//初始化交换ch.assertExchange(ex,'topic',{durable:true})//初始化一个队列,使用hostName作为队列名,从列名更容易知道是哪个实例创建了队列returnch.assertQueue(hostName,queueAttr)}).then((q)=>{//可以在队列初始化后立即绑定路由键,也可以暂时不绑定,再动态绑定//CH.bindQueue(q.queue,ex,'some.topic.aaa')//消费或者,获取消息CH.consume(q.queue,(msg)=>{var_msg=msg.content.toString()varMSG=JSON.parse(_msg)log.info(_msg,MSG)},{noAck:true})}).catch((err)=>{console.log(err)})}动态绑定或解绑路由keyfunctiontoggleBindQueue(routingKey,bind){returnnewPromise((resolve,reject)=>{if(!CH){log.error('channelnotestablished')reject(newError('channelnotestablished'))return''}//初始化队列。如果队列已经存在,则直接使用CH。assertQueue(`${hostName}`,queueAttr).then((q)=>{//如果绑定为真,绑定,否则解除绑定if(bind){log.info(`bindQueue${hostName}${topic}`)returnCH.bindQueue(q.queue,ex,topic)}else{returnCH.unbindQueue(q.queue,ex,topic)}}).then((res)=>{resolve()}).catch((err)=>{reject(err)log.error(err)})})}module.exports={connectRabbitMq,to??ggleBindQueue,publishMessage}使用该方法使用Express添加您的服务器,然后在app.js中您可以...const{connectRabbitMq}=require('./connect-mq.js')connectRabbitMq()...完整代码//onnect-mq.jsconstamqp=require('amqplib')//rabbitMQ地址const{amqpAddrHost}=require('../config/index.js')//switchnameconstex='amq.topic'constamqpAddr=`amqp://${amqpAddrHost}`//读取HOSTNAME,当运行多个实例时,比如在k8s中,HOSTNAME可以获取当前的podName//写入时logs或者与多个instance建立连接,最好带上pod名称。如果有问题,也最好定位到是哪个pod出了问题。consthostName=process.env.HOSTNAME//队列属性设置//一般来说,最好设置队列自动删除autoDelete。当链接断开时,队列也会被删除,这样不会有很多无用的队列//durable是用来持久化的,最好设置为非持久化constqueueAttr={autoDelete:true,durable:false}//定义通道的引用。链接建立后,所有的方法都可以参考CH获取通道方法letCH=null//函数发送消息到队列functionpublishMessage(msg){if(!CH){return''}msg=JSON.stringify(msg)//指定exchangeex,routingkey,messageCH.publish(ex,eventBusTopic,Buffer.from(msg))}//当与rabbitMQ的链接断开时,需要主动重连函数reconnectRabbitMq(){log.info('reconnect_rabbit_mq')connectRabbitMq()}//链接rabbitMQ的main函数functionconnectRabbitMq(){amqp.connect(amqpAddr,{//设置connection_name的属性,可以看到链接来的是哪个实例fromonrabbitMQconsole的UIclientProperties:{connection_name:hostName}}).then((conn)=>{log.info('rabbitmq_connect_successd')//一定要添加链接错误事件处理,否则一旦报错,如果错误不处理,程序会崩溃//错误是一个必须处理的特殊事件//如果报错,直接去重连conn.on('error',(err)=>{log.error('connect_error'+err.message,err)reconnectRabbitMq()})//创建通道returnconn.createChannel()}).then((ch)=>{CH=ch//初始化交换ch.assertExchange(ex,'topic',{durable:true})//初始化一个队列,队列名为hostName,通过列名更容易知道是哪个实例创建了队列returnch.assertQueue(hostName,queueAttr)}).then((q)=>{//可以在队列初始化后立即绑定路由键,也可以暂时不绑定,再动态绑定//CH.bindQueue(q.queue,ex,'some.topic.aaa')//消费者,获取消息CH.consume(q.queue,(msg)=>{var_msg=msg.content.toString()varMSG=JSON.parse(_msg)log.info(_msg,MSG)},{noAck:true})}).catch((err)=>{console.log(err)})}//动态绑定或解绑路由键functiontoggleBindQueue(routingKey,bind){returnnewPromise((resolve,reject)=>{if(!CH){log.error('channelnotestablished')reject(newError('channelnotestablished'))return''}//初始化队列。如果队列已经存在,则直接使用CH.assertQueue(`${hostName}`,queueAttr).then((q)=>{//如果绑定为真,绑定,否则解除绑定if(bind){log.info(`bindQueue${hostName}${topic}`)returnCH.bindQueue(q.queue,ex,topic)}else{returnCH.unbindQueue(q.queue,ex,topic)}}).then((res)=>{resolve()}).catch((err)=>{reject(err)log.error(err)})})}module.exports={connectRabbitMq,to??ggleBindQueue,publishMessage}
