当前位置: 首页 > 后端技术 > Java

RabbitMQ延迟消息:死信队列-延迟插件-二合一使用+踩坑笔记+最佳体验

时间:2023-04-02 01:55:29 Java

前言前段时间写过一篇文章:【#RabbitMQ:消息丢失|消息重复|MessageBacklog的原因+解决方案+网上学不到的用户体验】很多人加了我为好友,说很喜欢这篇文章,也问了我一些问题。因为最近工作比较忙,所以有一段时间没有写了。忙完之后,我花时间通过案例梳理了RabbitMQ剩下的一个重要技术点,就是延迟消息的用法。delayedmessage的意思就不解释了,就是字面意思。有两种使用方式,死信队列和延时插件。两者各有优缺点。我将一一说明并给出最佳用法。
死信队列的方法死信队列不要理解成一个很玄乎的东西,就是一个普通的队列绑定一个死信开关,配置参数还是固定的,不用动脑子,如果行得通,你可以把它想象成一个回收站,被拒绝或者超时的消息就丢到这里,然后就可以继续消费了,就这么简单。1.示意图
2.引入MQ3,声明交换机和队列,声明普通的交换机、队列和路由。这里我们声明了两个队列做pre-delay,名字分别是5s和15min,用来区分延迟的消息是否达到预期的效果。我们后面所有的交换机和队列都是直接创建的,也就是点对点的。具体原因以后再说。另外注意,这里注释的延时开关和队列是做特殊说明的。其实还是普通的队列。请参考上面的示意图。创建交换机、队列和绑定关系。聪明的朋友应该能发现,上面的代码只有开关和绑定队列的关系,并没有创建队列。没错,接下来才是重头戏。创建队列时,需要绑定一个死信开关,成为死信队列。可以看出5s和15min队列绑定的是同一个死信交换,只是路由规则和消息过期时间TTL不同。这样,在项目启动后,RabbitMQ会创建两个不同过期时间的死信队列,后面会有截图给大家看。绑定后的效果,项目启动后,RabbitMQ会同时创建开关和队列,在控制台可以看到。普通队列绑定死信开关和对应的路由规则后,我们就可以创建死信开关、路由规则和队列了。其实和创建普通队列没什么区别。
4、yml配置为了方便演示,我们的生产者和消费者写在同一个项目中,所以配置文件没有区别。但是在在线环境中,生产者和消费者往往为了解耦而分开。在这里可以发现我们已经为RabbitMQ开启了消息确认机制。看过文章开头提到的朋友应该知道,我们一般不会开启确认机制,为了提高在线环境下的性能。之所以启用这里是为了演示。消息的传递,还要具体说说后面延迟插件会出现的一个问题。
5.创建生产者这里增加了消息的唯一ID、消息确认机制等写法,只是给大家看一下,其实不用加。
6.创建消费者。这里注意监听队列是我们前面声明的死信队列,因为过期消息是通过绑定的死信开关转发给它的。如果对流程有疑惑,可以回头看图开头的图。
7.测试界面分别创建了延时5s和延时15min的测试界面。
8.效果
延迟插件方式的插件方式比死信队列方式简单很多,只需要安装插件,启动插件功能,然后创建一个延迟队列即可。1、安装插件这里给出了源码安装方式和docker安装方式,大家可以根据自己的情况选择。1)、源码安装及下载插件:https://github.com/rabbitmq/r...本次演示下载的是3.8.0版本插件,兼容3.7.x和3.8.xRabbitMQ直通车:https://github.com/rabbitmq/r...提醒,一定要下载RabbitMQ对应的版本,否则延迟队列不会生效。其中部分版本兼容低版本的MQ,可以点击查看支持的版本。将下载好的插件上传到RabbitMQ插件:rabbitmq_server-3.7.24/plugins,然后启用延迟队列:rabbitmq-pluginsenablerabbitmq_delayed_message_exchange这样就可以愉快的开始使用了。具体操作和图示在下面的docker安装步骤中如图所示,两者的操作没有区别。2)、docker安装首先,还是一样的步骤,将下载好的插件上传到RabbitMQ服务器,因为是docker方式,所以要上传到docker容器中。将文件复制到docker容器:进入容器检查是否上传成功,开启延时插件。打开成功后会出现如下提示效果。重新启动RabbitMQ。不重启不会生效。如果重新启动不起作用,您可能需要终止该进程并重新启动它。打开控制台界面,如果在创建开关的选项中看到x-delayed-message,说明延迟插件安装启动成功。
2、yml配置这里我特地画了红框,因为这个配置对延时插件有作用,可以加也可以不加,后面的注意事项会单独说明。
3.声明交换机和队列。这里只需要注意一点。创建开关时,必须设置为延时开关,即setDelayed(true)。
4.消息处理器工具类我们使用MessagePostProcessorhook来设置持久化模式和延迟时间。项目中可能会在多个地方用到,所以单独提取出来,通过工具类获取。
5.创建producer的写法是固定的,可以直接复制到自己的项目中使用,延迟时间可以自己定义。
6.创建一个消费者直接监听这个延迟队列,没有什么特别的地方。
7.测试界面这里测试6s延时消息是否成功
8.效果可以看到,消息刚过6s就被消费了,延时效果没问题。
陷阱注意事项1.死信队列的坑其实死信队列比插件的坑少很多,但是死信队列不像插件那么简单直接。1)原理一定要明白,不然你连开关和队列的创建和绑定都不知道;2)绑定死信开关时,参数x-dead-letter对应的值一定要写对,尤其是路由,写错会导致过期消息进不去死信队列,找了半天找不到原因;
2.延迟插件的坑延迟插件的坑真的很多。我用死信队列的方法差点就成功了一次,但是插件折腾了我好几轮。1)延时插件的版本一定要下载正确,对应RabbitMQ本身的版本。最好点击进去阅读说明。通常,它会告诉您兼容哪个版本。如果下载错误,我只能为你哀悼;2)、不要下载太高版本的RabbitMQ和插件,你可能会抓狂;3)启用插件后,一定要重启RabbitMQ,否则不会生效,如果重启不生效,可以尝试杀掉MQ进程,然后重新启动;4)启用消息后确认机制,你会发现delay插件的一个特别之处,就是每发送一条消息,都会进入returncallback回调。重点讲解最后一个坑点,不算坑点,但大家在以后使用的过程中可能会有疑惑。还记得上一篇在yml文件中画红框的配置吗?我们把那个参数注释掉看看效果。你会发现一旦开启了消息确认机制,延迟的消息每次都会先进入returnCallback回调,然后才会发送成功。您可以自己尝试,每次都会发生。是什么原因?这里提到了延时插件的原理。从上一篇创建延迟开关可以看出,为开关设置了delayed:true,因为延迟消息实际上是挂载在开关上的,不会立即通过路由下发。出去。那我们再来看看。上图中returncallback回调打印的返回信息:replyText=NO_ROUTE很明显。说没有路由,所以消息确认失败,因为延迟插件没有立即通过路由传递。那么还有一个问题,为什么没有立即投递到returncallback回调呢?下图是一段源码,告诉你为什么。因为有一个必选参数,如果没有配置,就是null。为null时,传递的值为this.properties.isPublisherReturns()。mandatory官方解释如下:Enablemandatorymessages。如果服务器无法将强制消息路由到队列,它将使用Return方法返回不可路由的消息。翻译:启用强制消息。如果服务器无法将强制消息路由到队列,它将使用return方法返回不可路由的消息。因此,如果mandatory为true,则表示启用强制下发,如果为false,则表示不强制下发,该值可以为null。而我们在之前的yml文件中开启了消息确认机制:publisher-returns:true,所以每次都会使用returncallback回调。所以我们要么不开启消息确认机制,要么设置mandatory为false,这样延迟插件的消息就不会每次都经过回调。
最佳体验1)、延迟插件方式更方便,但我首先推荐死信队列方式,因为死信队列在RabbitMQ的使用中占比很大,它不仅可以用于延迟消息,它也可以用在许多其他地方。另外,死信队列是必须要学习的,开箱即用,避免安装插件的风险;2).死信队列其实有一个看不见的延迟消息的bug。存在多个延时场景同时存在时的执行顺序问题。可能会出现15min延时消息在前面,导致5s延时消息要等15min才执行。这是RabbitMQ本身的有序机制导致的,只对队尾的消息进行判断,所以我们在使用的时候一定要做好延迟消息隔离,对不同的延迟场景分别处理;3)、延时插件不存在上述死信队列问题,针对复杂场景做了专门处理,所以当一个项目在项目初期决定使用RabbitMQ时,不管会不会用以后安装的时候一定要顺便安装延时插件,以免在项目进行到一半的时候突然想用,还需要安装插件。重启RabbitMQ带来的未知风险;4)大多数情况下死信队列是完全够用的,但是切记不要在项目中频繁使用延迟队列,基本上会很少有用到的场景,比如定时关闭订单,定时释放锁资源等,都是仅用于对延迟时间精度要求较高的特定场景。其他的主要是基于分布式任务调度。过多的延迟队列会造成不必要的认知混乱;5)、个人经验,在消息延迟的场景下,切换最好使用Direct点对点方式。我们公司有同事用过Topic模式。他们认为匹配路由规则没有问题,但实际上规则冲突消息被其他消费者消费了。MQ转消息本身就是默默处理的,所以一直找不到他死的原因,直到被其他同事无意中发现才解决。所以这种单人场景最稳妥的方式就是Direct,一对一绝对不会有问题。
综上所述,总结了以下几点:1)、无论是否使用,在安装RabbitMQ时顺便安装延时插件;2)、推荐使用死信队列作为主要方法;3)、没有太多地方使用延迟队列;4)、交换方式采用Direct点对点。最后把这个案例的代码地址放在评论里。有两种实现方式,直接运行即可。想学的可以下载看看。
原创文章纯手写。如果觉得有一点点帮助,欢迎点赞收藏哦~继续分享真实的心得和工作中的心得。喜欢就关注吧~更多最新技术文章可以关注GZH:【Java分享客栈】

猜你喜欢