作者:海翔\来源:www.cnblogs.com/haixiang/p/10905189.html1、我们为什么假设一个限流场景?首先,我们的Rabbitmq服务器积压了数万条未处理的消息,我们随便开一个消费者客户端,会出现这样的情况:瞬间推过来大量的消息,而我们的单客户端无法处理同时处理这么多数据!当数据量特别大的时候,我们限制生产端的流量肯定是不科学的,因为有的时候并发量特别大,有的时候并发量特别小。我们不能限制生产端,这是用户的行为。因此,我们应该限制消费的流量,以保持消费的稳定。当消息数量激增时,很可能造成资源耗尽,影响服务性能,导致系统卡顿甚至直接崩溃。2.限流API说明RabbitMQ提供了一个qos(服务质量保证)功能,即在消息非自动确认的前提下,如果消息达到一定数量(通过根据consume设置qos的值orchannel)之前没有确认过,不消费新消息。/***请求特定的“服务质量”设置。*这些设置对服务器*在需要确认之前交付给消费者的数据量施加了限制。*因此它们提供了一种由消费者发起的流量控制方式。*@paramprefetchSize服务器将传送的最大内容量(以*八位字节为单位),如果无限制则为0*@paramprefetchCount服务器将传送的最大消息数,如果无限制则为0*@paramglobal如果设置应用于*整个通道而不是每个消费者*@throwsjava.io.IOException如果遇到错误*/voidbasicQos(intprefetchSize,intprefetchCount,booleanglobal)throwsIOException;prefetchSize:0,单条消息大小限制,0表示不限制prefetchCount:一次消费的消息数。它会告诉RabbitMQ不要同时向一个消费者推送超过N条消息,即一旦有N条消息没有ack,消费者就会阻塞,直到有消息ack。global:true,false是否将上面的设置应用到channel上,简单来说就是上面的限制是在channel层面还是在consumer层面。当我们设置为false时才生效,设置为true时没有限流功能,因为通道电平还没有实现。注意:prefetchSize和global这两项在rabbitmq中没有实现,所以我们暂时不去研究。特别注意prefetchCount只有在no_ask=false时才生效,即这两个值在自动响应的情况下是无效的。3.如何对consumer进行限流第一步,既然要使用consumer的限流,就需要关闭automaticack,设置autoAck为falsechannel.basicConsume(queueName,false,consume);第二步我们设置具体的限流大小和数量。channel.basicQos(0,15,false);第三步,在消费者的handleDelivery消费方法中手动ack,设置批处理ack响应为truechannel.basicAck(envelope.getDeliveryTag(),true);这是生产端的代码,前面章节的生产端代码没有改动,主要操作都集中在消费者端。importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassQosProducer{publicstaticvoidmain(String[]args)throwsException{//1.创建一个ConnectionFactory并设置ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("客人");//2.通过连接工厂创建连接Connectionconnection=factory.newConnection();//3。通过ConnectionChannel创建Channelchannel=connection.createChannel();//4。声明StringexchangeName="test_qos_exchange";StringroutingKey="item.add";//5。SendStringmsg="thisisqosmsg";for(inti=0;i<10;i++){Stringtem=msg+":"+i;channel.basicPublish(exchangeName,routingKey,null,tem.getBytes());系统tem.out.println("发送消息:"+tem);}//6。关闭连接channel.close();connection.close();}}这里我们创建一个consumer,通过下面的代码验证限流效果,并且全局参数设置为true时不起作用。我们使用Thread.sleep(5000);减慢ack处理消息的过程,这样我们可以从后台管理工具清楚的观察到限流情况。importcom.rabbitmq.client.*;importjava.io.IOException;publicclassQosConsumer{publicstaticvoidmain(String[]args)throwsException{//1.创建一个ConnectionFactory并设置它ConnectionFactoryfactory=newConnectionFactory()factory.setHost("localhost");factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("客人");factory.setAutomaticRecoveryEnabled(真);factory.setNetworkRecoveryInterval(3000);//2。通过连接工厂创建连接Connectionconnection=factory.newConnection();//3。通过Connection创建ChannelfinalChannelchannel=connection.createChannel();//4。声明StringexchangeName="test_qos_exchange";StringqueueName="test_qos_queue";StringroutingKey="item.#";渠道。exchangeDeclare(exchangeName,"topic",true,false,null);渠道。queueDeclare(queueName,true,false,false,null);channel.basicQos(0,3,false);//一般不需要代码绑定,channel.queueBind(queueName,exchangeName,routingKey);//5。创建消费者并接收消息Consumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{try{Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}Stringmessage=newString(body,"UTF-8");System.out.println("[x]收到'"+message+"'");channel.basicAck(envelope.getDeliveryTag(),true);}};//6。设置通道消费者绑定队列channel.basicConsume(queueName,false,consumer);channel.basicConsume(queueName,false,consumer1);}}我们从下图发现,Unacked值一直是3,每5秒消费一条消息,即Ready和Total都减3,这里的Unacked值代表消费者正在处理的消息.通过我们的实验,我们发现消费算子一次最多可以处理3条消息,达到了预期的消费者限流功能。当我们将voidbasicQos(intprefetchSize,intprefetchCount,booleanglobal)中的global设置为true时,发现没有限流作用。近期热点文章推荐:1.1000+Java面试题及答案(2022最新版)2.厉害了!Java协程来了。..3.SpringBoot2.x教程,太全面了!4、SpringBoot2.6正式发布,一大波新特性。.5.《Java开发手册(嵩山版)》最新发布,赶快下载吧!感觉不错,别忘了点赞+转发!
