的应用范围是服务访问量的激增。可能有各种外部调用或内部问题导致消息积压,访问服务超过服务所能处理的最大值。尖峰,导致系统超时并因此崩溃。业务场景包括我们日常生活中的一些消费场景,比如火车票、机票、车票等。一般来说,这些服务下单后,会异步通知后续的售票结果。如果服务本身只支持每秒1000的访问量,由于外部服务的原因突然增加到每秒2000并发。此时,由于流量激增,服务接收者超过了自身系统所能处理的最大峰值。如果没有对消息进行限流措施,那么这段时间系统将不可用。这在生产环境中是一个非常严重的问题。实际的应用场景不仅限于这些。本文使用RabbitMQ来讲解如何限制消费者的流量。RabbitMQ,消费端限流机制,提供服务质量(QOS)功能,为通道(channel)预先设置一定数量的消息,每次发送的消息数量以预先设置为准数字。此时服务端不会发送新的消费消息,直到消费者对消息进行完全确认。注意:此时消费者不能设置自动签到,否则无效。RabbitMQv3.3.0之后,限制放宽了。除了通道设置外,还可以对每个消费者进行设置。下面是Node.js开发语言amqplib库提供的限流实现的接口方法prefetchexportinterfaceChannelextendsevents.EventEmitter{prefetch(count:number,global?:boolean):Promise;...}prefetchparameters说明:number:每次推送给消费者的N条消息数。如果这N条消息都没有被确认,生产者将不会再次推送,直到这N条消息被消费。global:在哪个级别限制,true表示在channel上限制,false表示在consumer端限制,默认为false。建立生产端生产端没有变化。这与正常声明相同。源码见rabbitmq-prefetch(Node.js客户端版Demo)constamqp=require('amqplib');asyncfunctionproducer(){//1.创建链接对象constconnection=awaitamqp.connect('amqp://localhost:5672');//2.获取通道constchannel=awaitconnection.createChannel();//3.声明参数constexchangeName='qosEx';constroutingKey='qos.test001';constmsg='生产者:';//4.声明交换awaitchannel.assertExchange(exchangeName,'topic',{durable:true});for(leti=0;i<5;i++){//5.发送消息awaitchannel.publish(exchangeName,routingKey,Buffer.from(`${msg}${i}message`));}awaitchannel.close();}producer();建立消费者constamqp=require('amqplib');asyncfunctionconsumer(){//1.创建链接对象constconnection=awaitamqp.connect('amqp://localhost:5672');//2。获取通道constchannel=awaitconnection.createChannel();//3.声明参数constexchangeName='qosEx';constqueueName='qosQueue';缺点troutingKey='qos.#';//4.声明交换并绑定列awaitchannel.assertExchange(exchangeName,'topic',{durable:true});等待channel.assertQueue(queueName);等待频道。bindQueue(queueName,exchangeName,routingKey);//5.限流参数设置awaitchannel.prefetch(1,false);//6.限流,noAck参数必须设置为falseawaitchannel.consume(queueName,msg=>{console.log('Consumer:',msg.content.toString());//channel.ack(msg);},{noAck:false});}消费者();consumer中unacknowledgedmessage条件测试我们暂时把channel.ack(msg)注释掉,分别启动producer和consumer,看看会发生什么?如上图,按照预先设置的一条消息一共发送了5条消息,因为我注释掉了channel.ack(msg),服务端没有收到ack就不会发送剩余的Ready消息确认确认消息测试修改消费者代码,打开确认消息评论,重启消费者测试awaitchannel.consume(queueName,msg=>{console.log('Consumer:',msg.content.toString());ack(msg);//打开评论},{noAck:false});如上图所示,Unacked为0,所有消息消费成功。RabbitMQ限流使用总结限流在我们实际工作中还是很有意义的。生产方的使用没有变化。重点是在消费者方面。重点关注以下两点:限流ack不能设置为自动签名,修改{noAck:false}增加限流参数设置channel.prefetch(1,false)资料个人博客:Node.js技术栈RabbitMQ系列:RabbitMQ进阶消息队列系列文章持续更新项目源码:rabbitmq-prefetch(Node.js客户端版Demo)作者:JunMay链接:https://www.imooc.com/article...来源:MOOC