1.初步提示在上一篇《??MQ保证读写消息不丢失,这个你都不会就等着被开除吧...??》中,我们初步介绍了之前制定的消息中间件数据技术方案的遗留问题。最大的问题之一是生产者传递的消息可能会丢失。亏损的原因有很多。比如网络中途传输时网络故障导致消息丢失,或者消息投递到MQ内存时,MQ突然出现故障导致消息丢失。针对生产者投递数据丢失的问题,RabbitMQ其实提供了一些机制。比如有一个重量级的机制,就是事务消息机制。使用类事务的机制向MQ投递消息,可以保证消息不会丢失,但是性能极差,经测试性能下降数百倍。所以现在一般不使用这种过于重量级的机制,而是使用轻量级的confirm机制。但是这篇文章并不能直接说明生产者保证消息不丢失的confirm机制,因为这个confirm机制实际上是通过类似于消费者的ack机制来实现的。因此,要深入理解confirm机制,首先要从本文入手,分析消费者手动ack机制保证消息不丢失的底层原理。2.ack机制回顾其实手动ack机制很简单。消费者必须确保自己已经处理完一条消息,然后才能手动向MQ发送ack,MQ收到ack后会删除这条消息。如果消费者没有发送ack,它就挂了。这个时候MQ感知到他宕机了,会重新投递消息给其他的消费者实例。这种机制保证了当消费者实例宕机时,数据不会丢失。3、ack机制的实现原理:deliverytag如果你写了一个消费服务代码,让他开始从RabbitMQ消费数据,那么这个消费服务实例就会把自己注册到RabbitMQ上。因此,RabbitMQ实际上知道存在哪些消费者服务实例。我们看下图,直观感受一下:然后,RabbitMQ会通过其内部的“basic.delivery”方法将消息投递到存储服务,让他消费消息。在投递的时候,这个消息的投递会附带一个重要的东西,就是“投递标签”,你可以把它看成是本次消息投递的唯一标识。这个所谓的唯一标识有点类似于ID,比如本次投递到某个仓储服务实例的消息的唯一ID。通过这个唯一的ID,我们可以定位到一条消息的投递。所以这个deliverytag机制看起来不是很简单,实际上是后面要讲的很多机制的核心基础。而且这里还要给大家强调一个概念,就是每个consumer从RabbitMQ获取信息的时候,都是通过channel这个概念来完成的。让我们回顾一下下面的消费者代码片段。我们首先要和部署在指定机器上的RabbitMQ建立连接,然后通过这个连接获取通道。而大家还有一点印象的话,我们在存储服务中消费messages、ack等操作都是基于这个通道的。通道有点类似于我们与RabbitMQ通信的句柄。比如看看下面的代码:另外,这里有一句话:之前写文章解释手动ack保证数据不丢失的时候,很多人提出疑问:为什么直接上面的代码tryfinally,如果代码出现异常,还是会直接执行finally手动ackin?其实很简单,自己加catch就行了。好的,让我们继续。您大概可以将此通道视为数据传输的管道。对于每一个通道,一个“投递标签”可以唯一标识一次消息投递,这个投递标签大致是一个不断增长的数字。我们看下图,相信会很容易理解:如果使用手动ack机制,实际上存储服务每次消费一条消息,处理完成,派发完成后,它会向RabbitMQ服务器发送一条确认消息。这个ack消息会携带这个消息的deliverytag。下面我们来看一下ack代码。有送货标签吗?channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);那么,RabbitMQ根据哪个通道的哪个deliverytag可以唯一定位到一个消息的投递呢?然后您可以删除该消息并将其标记为已处理。这里大家要注意一点,deliverytag只是唯一标识一个频道内的消息投递。因此,当您确认一条消息时,它必须通过接收消息的同一渠道完成。看看下面的图片,会有一个直观的感受。其实这里还有一个很重要的点,就是我们可以设置一个参数,然后批量向RabbitMQ发送ack消息,这样可以提高整体的性能和吞吐量。例如,在下面的代码行中,将第二个参数设置为true。channel.basicAck(delivery.getEnvelope().getDeliveryTag(),true);看到这里大家应该对ack机制的底层原理有了更深的理解。至少你知道deliverytag是什么,它是实现ack的底层机制。那么,我们简单回顾一下自动ack和手动ack的区别。其实默认使用automaticack,很简单。只要RabbitMQ向仓储服务投递消息,它就会立即将消息标记为已删除,因为它不关心仓储服务是否收到或处理了它。所以在这种情况下,性能是好的,但是数据容易丢失。如果手动ack,则必须等待仓储服务完成货物的调度和发货后,再手动将ack发送给RabbitMQ。这时RabbitMQ会认为消息已经处理完毕,然后将消息标记为已删除。这样在发送ack之前,如果仓储服务宕机,RabbitMQ会重新发送消息给另一个仓储服务实例,保证数据不丢失。4、RabbitMQ是如何感知仓储服务实例宕机的?之前也有同学提出过这个问题,但其实要搞清楚这个问题,不需要深入探究底层。你只需要想一想,粗略地推测一下。如果你的仓储服务实例收到了消息,但是没来得及安排投递,也没有发送ack,这时候就宕机了。仔细想想我们就知道,既然RabbitMQ之前已经收到了存储服务实例的注册,那他们之间肯定是有某种联系的。一旦一个仓储服务实例宕机,RabbitMQ必然会感知到它的宕机,将所有发给他的未被确认的消息发送给其他仓储服务实例。所以我们以后可以就这个问题进行深入的交流。在这里,大家其实可以先建立这个认识。我们回头看下面的架构图:5.存储服务处理失败时重发消息首先我们看下面这段代码:如果一个存储服务实例处理消息失败,此时会进入catch代码block,那么这个时候怎么办呢?还是直接ack消息?当然不是,如果你还是ack,会导致消息被删除,但是实际的调度和投递还没有完成。这样的话,数据不还是丢失了吗?因此,合理的做法是使用nack操作。就是通知RabbitMQ自己没有处理成功的消息,然后让RabbitMQ将消息投递给其他的仓储服务实例,尝试完成调度投递的任务。我们只需要在catch代码块中添加以下代码:channel.basicNack(delivery.getEnvelope().getDeliveryTag(),true);注意上面第二个参数为true,意思是让RabbitMQ把消息重新投递给其他的仓储服务实例,因为我没处理成功。如果你设置为false,会导致RabbitMQ知道你处理失败了,但还是删除了这条消息,这是错误的。同样的,来一张图,大家一起来感受一下:6.阶段总结本文进一步分析了之前的ack机制,包括底层的deliverytag机制,以及消息处理失败时的消息重发。通过ack机制、消息重发等这套机制的实现,可以保证在消费者服务自身突然宕机、消息处理失败等场景下不会丢失数据。
