当前位置: 首页 > 后端技术 > Java

RabbitMQ高可用是如何保证消息成功消费的

时间:2023-04-01 14:56:22 Java

@[toc]上一篇松哥和大家聊了MQ高可用是如何保证消息成功消费的。各种配置投入战斗,确保消息的成功传递。甚至在某些极端情况下,可能会重复发送同一条消息。不管怎样,消息总算是发出去了。如果你还没有看过上一篇文章,建议你先看一下,然后学习这篇文章:保证RabbitMQ消息传递可靠性的四种策略!你用哪个?今天我们就来说说消息消费的问题,看看如何保证消息消费成功,保证幂等性。1、两种消费方式RabbitMQ消息消费一般有两种不同的思维方式:推送(push):MQ主动推送消息给消费者。该方法要求消费者设置缓冲区来缓存消息。对于消费者而言,内存中总有一堆消息需要处理,所以这种方式效率更高,也是目前大部分应用采用的消费方式。Pull:消费者主动从MQ中拉取消息。这种方式效率不是很高,但是有时候如果服务端需要批量拉取消息,可以使用这种方式。让我举个例子说明这两种方式。先来看push:这种方式比较常见,就是通过@RabbitListener注解来标记消费者,如下:out.println("味精="+味精);}}当监听队列中有消息时会触发该方法。让我们再看看pull:@Testpublicvoidtest01()throwsUnsupportedEncodingException{Objecto=rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);System.out.println("o="+newString(((byte[])o),"UTF-8"));}调用receiveAndConvert方法,方法参数为队列名,方法执行后,会从MQ中拉取一条消息,如果该方法返回null,则表示队列中没有消息。receiveAndConvert方法有一个重载方法,可以在重载方法中传入等待超时时间,比如3秒。此时假设队列中没有消息,receiveAndConvert方法会阻塞3秒。如果3秒内队列中有新消息,则返回。3秒后,如果队列中仍然没有新消息,则返回null。这个等待超时时间如果不设置,默认为0。这是两种不同的消息消费模式。如果需要不断从消息队列中获取消息,可以使用push方式;如果只是简单的消费一条消息,可以使用pull模式。不要让拉取模式陷入死循环,变相订阅消息,这样会严重影响RabbitMQ的性能。2.两种保证消费成功的方法在上一篇文章中,我们尽量保证消息能够发送成功。对于消息消费成功,其实官方已经提供了相关的机制。让我们来看看。为了保证消息能够可靠地到达消息消费者,RabbitMQ提供了消息消费确认机制。消费者在消费消息时,可以指定autoAck参数来表示消息消费的确认方式。当autoAck为false时,即使消费者已经收到消息,RabbitMQ也不会立即删除消息,而是等待消费者明确回复确认信号,然后再将消息标记为已删除,然后再删除。当autoAck为真时,消息消费者会自动将发送的消息设置为确认,然后删除消息(从内存或磁盘),即使消息尚未到达消费者。我们看一张图:如上图所示,在RabbitMQ的web管理页面:Ready表示要消费的消息数。Unacked表示已经发送给消费者但还没有收到消费者确认的消息数。这是我们可以从UI层面观察到的消息消费确认。当我们设置autoAck为false时,对于RabbitMQ来说,消费分为两部分:要消费的消息已经投递给消费者,但是消息还没有被消费者确认。也就是说,当将autoAck设置为false时,消息被正常处理的时候,消费者会变得非常冷静,它会有足够的时间来处理消息。当消息处理正常,再手动ack,那么RabbitMQ就会认为消息消费成功。如果RabbitMQ还没有收到客户端的反馈,此时客户端已经断开连接,那么RabbitMQ会将消息放回队列中,等待下一次消费。总结一下,保证消息被消费成功无非就是手动Ack或者自动Ack,没有别的。当然,无论是这两种,最终都可能导致消息的重复消费,所以一般来说,我们在处理消息的时候也需要解决幂等性的问题。3.消息拒绝当客户端收到消息时,可以选择消费消息或者拒绝消息。下面看看拒绝的方式:@ComponentpublicclassConsumerDemo{@RabbitListener(queues=RabbitConfig.JAVABOY_QUEUE_NAME)publicvoidhandle(Channelchannel,Messagemessage){//获取消息号longdeliveryTag=message.getMessageProperties().getDeliveryTag();try{//拒绝消息channel.basicReject(deliveryTag,true);}catch(IOExceptione){e.printStackTrace();}}}消费者收到消息后可以选择拒绝消费消息,拒绝的步骤分两步:获取消息号deliveryTag。调用basicReject方法拒绝消息。在调用basicReject方法时,第二个参数为requeue,即是否重新入队。如果第二个参数为true,被拒绝的消息会重新进入消息队列,等待下一次消费;如果第二个参数为false,被拒绝的消息将被丢弃。会有新的消费者来消费它。需要注意的是,basicReject方法一次只能拒绝一条消息。4、消息确认消息确认分为自动确认和手动确认,我们分开来看。4.1自动确认先来看一下自动确认。在SpringBoot中,默认情况下,消息消费是自动确认的。我们看下面的消息消费方法:诠释我=1/0;}}通过@Componet注解将当前类注入到Spring容器中,然后通过@RabbitListener注解标记一个消息消费方法。默认情况下,消息消费方法有自己的事务,即如果方法抛出异常,消费的消息会返回队列等待下一次消费。如果方法正常执行,没有抛出异常,则认为消息被消费。4.2人工确认人工确认分为推式人工确认和拉式人工确认两种。4.2.1推送方式下的手动确认要开启手动确认,我们需要先关闭自动确认,关闭方法如下:spring.rabbitmq.listener.simple.acknowledge-mode=manual这个配置表示确认方式消息的更改为手动确认。接下来我们看一下consumer中的代码:try{//message的消费代码写在这里Strings=newString(message.getBody());System.out.println("s="+s);//消费完成后,手动ackchannel.basicAck(deliveryTag,false);}catch(Exceptione){//手动nacktry{channel.basicNack(deliveryTag,false,true);}catch(IOExceptionex){ex.printStackTrace();}}}将消费者必须执行的操作放入try..catch代码块中。如果消息正常消费成功,则执行basicAck完成确认。如果消息消费失败,执行basicNack方法告诉RabbitMQ消息消费失败。这里涉及到两个方法:basicAck:这个是手动确认消息消费成功。该方法有两个参数:第一个参数表示消息的id;如果第二个参数multiple为false,则表示只消费当前消息成功,如果为true,则表示在当前消息之前所有未被当前消费者确认的消息都已经消费成功。basicNack:这个是告诉RabbitMQ当前消息还没有消费成功。该方法有三个参数:第一个参数表示消息的id;如果第二个参数multiple为false,则表示只拒绝消费当前消息;如果为真,则表示在拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数requeue的含义同上,被拒绝的消息是否重新入队。当basicNack中的最后一个参数设置为false时,还涉及到一个死信队列的问题。稍后宋兄会专门写一篇文章和大家一起探讨。4.2.2pull方式的手动确认pull方式的手动ack比较麻烦。Spring封装的RabbitTemplate中没有对应的方法,只好使用原来的方法,如下:publicvoidreceive2(){Channelchannel=rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);长deliveryTag=0L;试试{GetResponsegetResponse=channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME,false);deliveryTag=getResponse.getEnvelope().getDeliveryTag();系统输出。println("o="+newString((getResponse.getBody()),"UTF-8"));channel.basicAck(deliveryTag,false);}catch(IOExceptione){try{channel.basicNack(deliveryTag,false,true);}catch(IOExceptionex){ex.printStackTrace();}}}这里涉及到的basicAck和basicNack方法和前面的一样,这里不再赘述。5、幂等性问题最后说一下消息的幂等性问题。我们想象一下这样的场景:消费者消费一条消息后,向RabbitMQ发送一个ack确认。此时由于网络断开或者其他原因,RabbitMQ没有收到ack,那么此时RabbitMQ不会向RabbitMQ发送消息。消息被删除。当重新建立连接后,消费者还是会再次收到消息,造成消息的重复消费。同时,由于类似的原因,在发送一条消息时,同一条消息可能会被发送两次(详见保证RabbitMQ消息传递可靠性的四种策略!你用的是哪一种?)。各种原因导致我们在消费消息时处理幂等性。处理幂等性问题并不难。基本上都是从业务的角度来处理的。让我谈谈这个想法。使用Redis,在消费者消费消息之前,消息的id现在放在Redis中,存储方式如下:id-0(执行业务)id-1(执行业务成功)。如果ack失败,消息会交给RabbitMQ给其他消费者,先执行setnx,如果key已经存在(说明之前有人消费过这条消息),获取它的值,如果为0,则当前消费者会什么都不做,如果是1,直接ack。极端情况:第一个消费者执行业务时,出现死锁。在setnx的基础上,为key设置一个lifetime。生产者发送消息时,指定messageId。当然,这只是一个简单的想法,供大家参考。宋大哥在vhr项目中也处理了消息幂等的问题。有兴趣的小伙伴可以查看vhr源码(https://github.com/lenve/vhr),代码在mailserver中。6.总结,今天和小伙伴聊了几个关于RabbitMQ消息消费相关的话题。有兴趣的朋友可以实践一下~复制文章标题在公众号后台回复,可以下载这篇文章案例~