当前位置: 首页 > 后端技术 > Node.js

关于RabbitMQ的三四事

时间:2023-04-04 01:07:19 Node.js

数据持久化对于一个非常健壮和稳定的后台系统,我们必须考虑各种宕机情况:物理宕机,应用崩溃等,这时候我们的应用需要数据仍然不丢失重启后。这个问题就是数据持久化,也就是数据持久化到磁盘。在RabbitMQ中,如果我们要保证消息发送到broker,首先需要实现三个持久化交换器(exchanges):声明时开启持久化选项的队列(queue):声明时开启持久化选项的消息声明:delivery_mode设置为2(php、python等库,2可以换成更友好的常量),在node的amqp.node库中,persistent设置为true。需要注意的一点是,持久化会造成性能损失(写磁盘操作),但是为了保证生产环境中的数据一致性,我们必须这样做。发送消息的确认机制其实实现了以上三点,数据还是有可能丢失,因为客户端成功调用api存储消息后,RabbitMQ需要一段时间(很短,但不可忽略)放在磁盘上。RabbitMQ不会对每条消息进行fsync处理,它可能只保存在缓存中而不是物理磁盘,而这段时间RabbitMQbroker崩溃,消息保存在缓存中但还没有来得及放置在磁盘上,则这些消息将丢失。为了解决以上问题,我们需要使用RabbitMQ的生产者确认模式。为了启用确认模式,生产者需要将通道设置为确认模式。一旦频道进入确认模式,频道上发布的所有消息都会被分配一个唯一的ID(从1开始)。一旦消息被投递到所有匹配队列后,broker就会向生产者发送一个确认信息(包括消息的唯一ID),让生产者知道消息已经正确到达了目的队列。如果消息和队列是持久的,确认消息会在消息写入磁盘后发送,broker返回给生产者的确认消息中的delivery-tag字段包含确认消息的序号。confirm方式最大的优点就是它是异步的。消息发布后,生产者应用程序可以在等待通道返回确认的同时继续发送下一条消息。当消息最终被确认时,生产者应用程序可以通过回调方法来处理确认消息,如果RabbitMQ由于自身内部错误丢失了消息,它会发送一个nack消息,生产者应用程序也可以处理这个nack消息回调方法中(来自参考文献1)简单确认示例示例代码是使用NodeJS实现的,RabbitMQ服务可以使用前面RabbitMQ的docker-compose.yml两三个东西快速启动constQUEUE_NAME='test_queue'constconfig=require("./config")constamqp=require('amqplib')asyncfunctiongetMQConnection(){returnawaitamqp.connect({协议:'amqp',主机名:config.host,端口:config.port,用户名:config.user,password:config.pass,locale:'en_US',frameMax:0,heartbeat:5,//heartbeatvhost:config.vhost,})}asyncfunctionrun(rmqConn,msgArr){try{const通道=awaitrmqConn.createConfirmChannel()//openconfirmconstexchangeName=`${QUEUE_NAME}_exchange`awaitchannel.assertExchange(exchangeName,'direct',{durable:true,autoDelete:false})//创建一个新的exchangeaif没有交换waitchannel.assertQueue(QUEUE_NAME,{durable:true,autoDelete:false})//如果没有queue,新建一个awaitchannel.bindQueue(QUEUE_NAME,exchangeName,QUEUE_NAME)//绑定exchange//队列名作为路由关键信息forEach(str=>{channel.publish(exchangeName,QUEUE_NAME,Buffer.from(str),{persistent:true,mandatory:true})})awaitchannel.waitForConfirms()console.log('批量数据发送成功')awaitchannel.close()}catch(err){//处理错误console.log('Failedtosendbatchdata:'+err.message)}}asyncfunctiontestSendBatchMsg(){constconn=awaitgetMQConnection()awaitrun(conn,['cat','dog','pig','mouse','mouse','penguin'])awaitconn.close()}testSendBatchMsg()显示assertExchange和assertQueue是保证交换和queues必须存在。这里的交换是简单的直接交换。ConfirmChannel#publish方法不返回promise消费消息的ack机制。现在我们需要考虑我们的消费者。消费者还会遇到程序错误或物理停机问题。RabbitMQ官方也给出了一套解决方案,并且confirm机制类似,也就是ack机制(Messageacknowledgement)。在ack机制中,consumer处理完业务逻辑后,需要发送ack消息,然后broker认为消息已经被正确消费,然后从内存和磁盘中移除,只要consumer的未收到确认,经纪人将保留此消息。如果一个消费者崩溃(掉线)但没有发送ack,broker会明白消息没有处理完全,然后交给另一个消费者重新处理。在这样的机制下,即使一个消费者崩溃了,也不会丢失消息。简单确认示例constQUEUE_NAME='test_queue'constconfig=require("./config")constamqp=require('amqplib')asyncfunctiongetMQConnection(){returnawaitamqp.connect({protocol:'amqp',hostname:config.MQ.host,port:config.MQ.port,username:config.MQ.user,password:config.MQ.pass,locale:'en_US',frameMax:0,heartbeat:5,//心跳vhost:config.MQ.vhost,})}asyncfunctionsleep(ms){returnnewPromise(resolve=>setTimeout(resolve,ms))}asyncfunctionstart(){constmqConn=awaitgetMQConnection()console.log('connectingRabbitMQ成功!')constchannel=awaitmqConn.createChannel()constexchangeName=`${QUEUE_NAME}_exchange`awaitchannel.assertExchange(exchangeName,'direct',{durable:true,autoDelete:false})awaitchannel.assertQueue(QUEUE_NAME,{durable:true,autoDelete:false})awaitchannel.bindQueue(QUEUE_NAME,exchangeName,QUEUE_NAME)channel.consume(QUEUE_NAME,asyncfunction(msg){console.log("Receivedmsg:%sfrom%s",QUEUE_NAME,msg.content.toString())console.log('consumingmessage...')try{awaitsleep(500)//模拟消费消息console.log('consumingends')channel.ack(msg)//消费成功,发送ack}catch(e){console.log('consumingfailed:'+e.message)channel.nack(msg)//消费失败,发送nack}},{noAck:false})//ack}start()注意默认开启自动ack,也就是说,消息发送给Consumers是自动acked的,而很多情况下,我们希望手动ack,所以需要显式设置autoAsk=false来关闭这个机制(例子中noAck:false)ack没有任何timeout限制;只有当消费者断??开连接时,即使处理一条消息需要很长时间,代理也会重新投递。一些问题。amqp.node库提供了检测心跳的功能(heartbeatoption),但是没有做自动重连。对于心跳值,RabbitMQ官网表示,从用户和客户端库维护者多年的反馈来看,低于5秒的值很可能导致误报,1秒或更低的值是很有可能会这样做。5到20秒范围内的值对于大多数环境都是最佳的。因此,心跳不能设置得太低(因为瞬时网络拥塞或流控),太低容易导致误报,根据经验,5s-20s比较合理。参考文章:深入研究RabbitMQ(四):channel的confirm模式when-publishes-are-confirmedChannel-orientedAPI参考

最新推荐
猜你喜欢