当前位置: 首页 > 科技观察

选择Redis做MQ的人,是不是脑子里缺一根弦?

时间:2023-03-20 21:02:52 科技观察

1。PreliminaryTips在上一篇文章:《RocketMQ消息中间件用起来真的可靠吗?》中,我们分析了ack机制(deliverytag机制)的底层实现原理,以及处理失败触发消息重发时如何消除nack机制。通过这个,大家对消费者端保证数据不丢失的解决方案有了进一步的了解。在本文中,我们将对ack底层的deliverytag机制进行更深入的分析,让大家更透彻的了解它。面试的时候,如果被问到消息中间件数据不丢失的问题,可以深入底层给面试官分析一下。2.unack消息的积压首先要介绍一下RabbitMQ的prefetchcount的概念。看了上一篇文章大家应该知道,对于每一个channel(其实对应的是一个消费者服务实例,大家可以大致这样想),RabbitMQ在投递一条消息的时候,都会给这条消息投递一个deliverytag唯一标识消息传递。那么我们ack的时候,也会带上这个deliverytag,基于同一个channel去ack,deliverytag会在ack消息中携带,让RabbitMQ知道哪个消息delivery被ack了,然后我们就可以查看那个消息了消息已删除。先来看一张图,帮助大家回忆一下这个发货标签的概念。所以你可以想一下,对于每一个channel(你认为是针对每一个消费者服务实例,比如存储服务实例),其实都有一些unack状态的消息。例如,RabbitMQ正在向通道传递消息。这个时候消息肯定是unack状态吧?然后,存储服务收到消息后,需要时间来处理消息。这个时候消息肯定是unack状态吧?同时,即使你执行了ack,你也需要知道ack默认是异步执行的,尤其是你启用了batchack,在ack被ack之前会有一个延迟。这个时候消息也是unacked吧?那么想一想,RabbitMQ是否可以无限制的向你的消费者服务实例推送消息呢?显然不是,如果RabbitMQ向你的消费服务实例推送的消息太多太快,比如一个消费服务实例的内存中积压了上千条消息。所以这个时候,这几千条消息都处于unack状态,已经积压了。会不会导致消费服务实例内存溢出?内存消耗太高?甚至出现内存泄漏等问题?因此,RabbitMQ必须考虑消费者服务的处理能力。看下图,感受下如果consumer服务实例的内存中积压消息过多,全部处于unack状态会怎样。3.如何解决unack消息的积压RabbitMQ基于预取计数来控制unack消息的数量。您可以使用“channel.basicQos(10)”方法设置当前通道的预取计数。例如设置为10,则表示当前通道的unack消息数不能超过10条,以免消费者服务实例积累过多的unack消息。在这种情况下,意味着RabbitMQ正在向通道传递的unack消息,消费者服务正在处理的unack消息,以及异步ack之后还没有完成ack的unack消息,所有这些消息加起来,而一个channel不能超过10个。如果想要简单粗暴的理解,可以大致理解为这个prefetchcount代表了一个consumerservice最多可以同时获取多少条消息进行处理。所以这里也指出prefetch这个词的意思。Prefetch是预取的意思,就是你的消费者服务实例预取多少条消息进行处理,但是同时最多只能处理这么多条消息。如果一个channel中的unack消息数量超过了prefetchcount指定的数量,此时RabbitMQ将停止向该channel投递消息,必须等待已经投递的消息被ack后再继续投递下一条消息。老规矩,我给大家看一张图,大家看看这个东西是什么意思。4.高并发场景下内存溢出的问题好!现在大家对ack机制底层的另一个核心机制也有了深刻的认识:prefetch机制。此时,我们应该考虑一个问题。如何设置这个预取计数?把这个东西设置太大或太小有什么影响?其实看懂上图就可以很好的理解这个问题了。如果我们把prefetchcount设置的很大,比如3000、5000,甚至100000,这样一个特别大的值,这时候会发生什么呢?这时候在高并发、高流量的场景下,消费服务的内存可能会很快被消耗掉。因为如果MQ接收的流量特别大,每秒几千条消息,这时候你的consumerservice的prefetchcount设置的非常大,可能会导致你的consumerservice接收到prefetch指定的消息数伯爵到了。举个例子,突然你的consumerservice内存积压了10万条消息,都是unack状态。无论如何,您的预取计数设置为100,000。那么对于一个channel来说,RabbitMQ在unack状态下最多可以容忍10万条消息,而在高并发下,消费者服务的内存中可能会积压最多10万条消息。那么这时候的结果就是消费者服务直接被销毁,内存溢出,OOM,服务宕机,然后大量的unack消息会重新投递给其他消费者服务。这个时候其他的消费服务也是一样的情况直接关闭,最后造成雪崩效应。所有的消费者服务都停止了,因为它们无法处理如此大量的数据。看看下面的图片,亲身体验一下气氛。5.低吞吐量问题那么反过来的话,如果我们把prefetchcount设置成一个很小的值呢?比如我们设置prefetchcount为1?这将不可避免地导致消费者服务的吞吐量极低。因为即使你处理完一条消息,ack的执行也是异步的。举个例子,如果你的prefetchcount=1,RabbitMQ将在unack状态下最多向你传递1条消息。此时,比如你刚刚处理完这条消息,然后执行ack的那行代码。不幸的是,ack需要异步执行,即RabbitMQ需要100ms才能感知到。然后RabbitMQ在100ms后感知到消息被ack了,此时会投递下一条消息给你!这很尴尬。在这100ms的时间里,你的消费者服务什么都没做吗?这不直接导致你的consumerservice处理消息的吞吐量下降10倍,甚至一百倍,一千倍吗,有这种可能!看看下面的图片,感受一下低吞吐量的场景。6、合理设置prefetchcount所以针对以上两种极端情况,RabbitMQ官方的建议是prefetchcount一般设置在100~300之间。即一个consumerservice最多可以接收100~300条消息进行处理,允许处于unack状态。这种状态既可以考虑吞吐量,也可以考虑高吞吐量,不容易造成内存溢出问题。但实际上,在我们的实践中,你完全可以自己测试prefetchcount。比如慢慢调整这个值,不断增加,观察高并发大流量下吞吐量是否在增加,观察消费服务的内存消耗是否会导致OOM、FullGC频繁等问题。7.阶段性总结其实通过最近的文章,对于消息中间件的消费者端如何保证数据不丢失的问题已经分析的很透彻了。如果是基于RabbitMQ做消息中间件,在消费者端的代码中必须考虑三个问题:手动ack,处理失败的nack,合理设置prefetchcount。这三个问题背后涉及各种机制:自动ack机制、deliverytag机制、ack批量和异步提交机制、消息重发机制、手动nack触发消息重发机制,prefetchcount过大导致内存溢出问题,prefetchcount过小导致excessivethroughput我们已经一步步分析了这些底层机制和问题。所以到此为止,只谈消费端数据不丢失的技术方案,相信大家在面试的时候都能有一整套的理解和解决方案来说明。