当前位置: 首页 > 科技观察

解决消息延迟和堆积问题

时间:2023-03-21 01:03:14 科技观察

文章结束本文转载自微信公众号《小菜两集》,作者蔡不才。转载本文请联系小菜良记公众号。消息可靠性问题:如何保证发送的消息至少被消费一次?延迟消息问题:如何实现消息的延迟传递?消息堆积问题:如何解决百万消息堆积无法及时消费的问题?我们在上一篇文章中已经介绍了如何解决消息丢失的问题,即保证消息的可靠性,所以其他两个问题同样重要。在本文中,我们将描述其他两个问题的解决方案~!1.延迟消息延迟消息的字面意思就是让消息延迟接收,那么如何延迟消息的到达呢?这是我们不得不思考和解决的问题。在了解延迟队列之前,我们需要先了解一下RabbitMQ中的两个概念死信开关TTL1)死信开关Deadletter(死信),也就是丢弃死消息,那么在什么情况下普通消息可以变成死信?需要满足以下三个条件:消费者使用basic.reject或basic.nack声明消费失败,并设置消息的requeue参数false消息为过期消息。超时后,无人消费待投递的队列消息已满,最早的消息会变成死信,死信开关就是死信的归属地。如果一个队列配置了dead-letter-exchange属性并指定了一个exchange,则队列中的死信将被投递到这个exchange,这个exchange称为死信exchange——DLX(DeadLetterExchange)步骤:当生产者正常发布到队列(simple.queue)。如果消费者从队列(simple.queue)中消费了消息,但是声明了reject,并且这个队列绑定了死信开关(dl.queue),那么这个时候它就变成死的了。信件的消息将传递到这个死信队列(dl.queue)。死信投递过程是从普通队列-->死信队列的过程。我们必须声明两个关键信息:死信交换机的名称和死信队列绑定的路由键。基本配置。下面我们简单模拟一下条件1产生的场景。1.首先声明一个死信开关和一个死信队列。这里我们使用简单的注解,直接生成一个死信开关和一个死信队列。您可以通过RabbitMQ控制台界面查看它们。已经生成成功2.声明交换机和队列正常使用。那么此时我们就可以创建一个正常使用的交换机和队列,并指定死信交换机。您也可以通过控制台查看创建状态。如果有声明的死信开关,我们可以通过队列的DLX和DLK标志位的判断3.模拟拒绝现在我们通过代码模拟客户端拒绝消息的场景1)消息发送2)消息接收Check控制台,结果如下:2021-11-0623:56:52.095INFO2112---[ntContainer#0-1]c.l.m.c.listener.SpringRabbitListener:normalbusinessswitch|Receivedmessage:[hello]2021-11-0623:56:52.118INFO2112---[ntContainer#1-1]c.l.m.c.listener.SpringRabbitListener:死信开关|Receivedmessage:hello这说明我们的死信开关已经成功运行了2)我们已经成功识别出使用TTL以上的死信开关了,不过这个好像和我们一开始说的延迟队列差别不大关系,不用担心~接下来说的TTL(Time-To-Live)就是用来处理延迟消息的~!在TTL的概念中,如果一个队列中的一条消息在TTL结束后还没有被消费,那么这条消息就会自动成为死信,TTL超时的情况分为两种:消息所在的队列有一个生存时间集。消息本身有一个生存时间。我们也进行上面条件2的模拟场景。1.声明死信开关和死信队列(上面已经完成)2.声明延迟队列并指定死信开关,查看创建结果同控制台,我们发现不仅有DLX和DLK的标志,还有一个TTL,说明这个队列是延迟队列。3.模拟消费超时的情况我们发送消息到延迟队列,并没有消费者消费,等待1分钟看是否可以进入死信队列我们已经发送消息到延迟队列,成功找到一分钟后控制台中的这条信息进入了死信交换2021-11-0700:01:30.854INFO32752---[ntContainer#1-1]c.l.m.c.listener.SpringRabbitListener:死信开关|收到的消息:testttl-message上面配置了队列超时时间,消息本身也可以配置超时时间。当消息和队列都有超时时间时,则使用最短的TTL消息的超时配置如下:如上图所示,我们可以使用Message类来传递消息信息,并设置超时时间时期。我们将其设置为5000毫秒。等待发送成功后,5000ms后控制台也会成功打印死信开关消耗的消息:2021-11-0700:03:09.048INFO39996---[ntContainer#1-1]c.l.m.c.listener.SpringRabbitListener:deadletterswitch|Receivedmessage:thisisattlmessage3)上面我们说的延迟队列是使用死信开关间接达到延迟队列的效果,但其实RabbitMQ不用那么麻烦。RabbitMQ已经为我们打包好了插件。我们只需要下载安装即可~RabbitMQ插件下载地址我们输入地址可以发现里面有很多插件。搜索delay关键字找到我们需要下载的插件,直接上传到RabbitMQ的插件目录-plugins。这里的小菜是临时安装测试使用docker,所以挂载了插件目录:dockerrun-itd--namerabbitmq-vplugins:/plugins-p15672:15672-p5672:5672rabbitmq:management所以我上传了插件直接进入容器中的plugins目录~然后进入容器执行如下命令启用插件rabbitmq-pluginsenablerabbitmq_delayed_message_exchange和我们在控制台创建开关的时候可以看到type类型的选项多了。如果这一步执行成功,则说明RabbitMQ的延迟队列功能已经开启。然后我们可以使用DelayExchange。首先,我们需要了解造成延迟的代码。开关:模式1模式2当一切准备就绪后,我们就可以发送消息了。发送消息时,必须在消息头中携带x-delay参数指定延迟时间。这样配置之后,我们可以在控制台上查看到,经过1delay.queue会在0秒后收到对应的消息,然后被对应的消费者消费。3)总结一下,从死信开关到TTL再到延迟队列,我们??一步步学会了如何实现延迟消息的功能,接下来我们进行一个小总结:问题1:什么样的消息会变成一纸空文?消息被消费者拒绝或nack消息超时,消息队列已满。问题2:消息超时的方式是给队列设置TTL属性,给消息设置TTL属性。3:如何使用延迟队列下载并启用RabbitMQ延迟队列插件声明一个开关,并设置delayed属性为true。发送消息时,加上x-delay头,值为超时时间。问题四:延迟队列的使用场景延迟发送短信通知订单自动取消,库存自动回滚2.惰性队列说完延迟队列,我们??继续了解惰性队列。在说惰性队列之前,我们先问一个问题~RabbitMQ是如何解决消息堆积问题的?什么情况下会出现消息堆积问题?生产者的生产速度远远赶不上消费者的消费速度。当消费者宕机,没有及时重启,这个问题怎么解决?大致思路如下:消费者机器重启后,增加更多的消费者进行处理,在消费者处理逻辑内部开辟线程池,利用多线程提高处理速度,扩大队列容量,增加累加上限是理论上解决消息堆积问题没问题,但是处理方式不够优雅,甚至不够灵活~所以除了上面几种解决这个问题,我们可以使用RabbitMQ自带的一种队列类型——惰性队列。什么是惰性队列?让我们来认识一下惰性队列的几个特点:收到消息后,直接存储在磁盘上,而不是内存中。消费者需要从磁盘中读取消息并加载到内存中。它支持存储数百万条消息。毕竟是利用了磁盘的缓冲机制。这种机制的缺点是消息的时效性会降低,性能会受到限制。磁盘IO,了解了特点和不足之后,我们来看看如何创建惰性队列方法一方法二方法三这种方法是直接根据命令行修改rabbitmqctlset_policyLazy"^lazy将一个运行队列修改为惰性队列-queue$"'{"queue-mode":"lazy"}'--apply-toqueues几个命令参数含义如下:rabbitmqctl:命令行工具set_policy:添加策略Lazy:策略名称,可以自定义^lazy-queue$:使用正则表达式匹配队列的名称'{"queue-mode":"lazy"}':设置队列为惰性模式--apply-toqueues:该策略的对象是所有队列。惰性队列方式虽然缺点是会降低消息的时效性,但在某些场景下也不是不能接受,其优势也很明显:基于磁盘存储,消息上限高无间歇性page-out,性能稳定至此,我们已经描述了RabbitMQ的常见问题。对于我们来说,普通的开发场景可能遇到这些问题的频率比较低,但是没有遇到不代表不存在,所以我们还是要多了解,防患于未然!