当前位置: 首页 > 后端技术 > Java

RabbitMQ中的消息会过期吗?

时间:2023-04-01 17:18:42 Java

@[toc]RabbitMQ中的消息长时间不消费会不会过期?用过RabbitMQ的朋友可能会有这样的疑问。今天宋弟兄就来和大家讨论一下这个问题。1.违约情况首先,我们来看一下违约情况。默认情况下,消息是不会过期的,也就是我们在工作日发送消息的时候,如果不设置任何消息过期相关的参数,那么消息是不会过期的,即使消息没有被消费,也会一直被存储在队列中。在这种情况下,我不需要演示具体的代码。宋哥之前的文章说到RabbitMQ基本都是这样的。2、TTLTTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息有生存时间,那么我们可以通过设置TTL来实现这个需求。如果消息的存活时间超过了TTL还没有被消息发送出去,此时消息就会变成死信。关于死信和死信队列,后面宋哥给大家介绍一下。TTL的设置有两种不同的方式:在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入队列的消息都会有相同的有效期。发送消息时设置消息的有效期,使不同的消息有不同的有效期。如果两个都设置了呢?以最短时间为准。当我们设置消息的有效期时,消息过期就会从队列中删除(进入死信队列,下同,不再标注),但是相应的删除时机有些区别对两种方法:对于第一种方式,当消息队列设置过期时间时,消息过期会被删除,因为消息进入RabbitMQ后,存在于一个消息队列中,而队列的头部是最早过期的消息,所以RabbitMQ只需要一个定时器任务从头部开始扫描过期消息,有则删除。对于第二种方法,当消息过期时,不会立即删除,而是在消息传递给消费者时删除,因为在第二种方法中,每条消息的过期时间是不同的。要知道哪条消息过期了,必须遍历队列中的所有消息。当有很多消息时,这会消耗更多的性能。因此,对于第二种方法,在消息要投递给消费者时删除消息。介绍完TTL,我们再来看看具体的用法。接下来所有的代码松哥都会以SpringBoot封装的AMPQ为例进行讲解。2.1单条消息的过期我们先来看看单条消息的过期时间。首先创建一个SpringBoot工程,引入Web和RabbitMQ依赖,如下:然后在application.properties中配置RabbitMQ的连接信息,如下:spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq。username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/接下来稍微配置消息队列:@ConfigurationpublicclassQueueConfig{publicstaticfinalStringJAVABOY_QUEUE_DEMO="javaboy_queue_demo";publicstaticfinalStringJAVABOY_EXCHANGE_DEMO="ange_pubic_ex"staticfinalStringHELLO_ROUTING_KEY="hello_routing_key";@BeanQueuequeue(){returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false);}@BeanDirectExchangedirectExchange(){returnnewDirectExchange(JAVABOY_EXCHANGE_DEMO,true,false);}Bean绑定binding(){returnBindingBuilder.bind(queue()).to(directExchange()).with(HELLO_ROUTING_KEY);}}这个配置类主要做了三件事:配置消息队列,配置交换机,将两者绑定在一起。首先配置一个消息队列,新建一个Queue:第一个参数是消息队列的名称;第二个参数表示消息是否持久化;第三个参数表示消息队列是否独占,一般我们设置为false,即不独占;第四个参数表示如果队列没有任何订阅的消费者,则自动删除该队列,一般适用于临时队列。配置DirectExchange交换机。将交换器和队列绑定在一起。这个配置应该很简单,没什么好解释的,有个独占性,松哥这里多说两句:关于独占性,如果设置为true,消息队列只能被创建的Connection访问它,其他的Connection都是无法访问消息队列的。如果尝试在不同的连接中重新声明或访问独占队列,系统会报资源锁定错误。另一方面,对于独占队列,当连接断开时,消息队列也会被自动删除(不管队列是否声明为持久队列)。接下来提供消息发送接口,如下:@RestControllerpublicclassHelloController{@AutowiredRabbitTemplaterabbitTemplate;@GetMapping("/hello")publicvoidhello(){Messagemessage=MessageBuilder.withBody("hellojavaboy".getBytes()).setExpiration("10000").build();rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO,消息);}}在创建Message对象的时候,我们可以设置消息的过期时间,这里我们设置消息的过期时间为10秒。就是这样!接下来我们启动项目,进行消息发送测试。消息发送成功后,由于没有消费者,消息不会被消费。打开RabbitMQ管理页面,点击Queues选项卡,10s后,我们会发现消息消失了:很简单!设置单条消息的过期时间就是在发送消息时设置消息的有效期。2.2队列消息过期设置队列的消息过期时间如下:@BeanQueuequeue(){Mapargs=newHashMap<>();args.put("x-message-ttl",10000);returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false,args);}设置完成后,我们修改消息发送逻辑如下:@RestControllerpublicclassHelloController{@AutowiredRabbitTemplaterabbitTemplate;@GetMapping("/hello")publicvoidhello(){Messagemessage=MessageBuilder.withBody("hellojavaboy".getBytes()).build();rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO,消息);}}可以看到消息可以正常发送,不需要设置消息过期时间。OK,启动项目,发消息测试。查看RabbitMQ管理页面,如下:可以看到消息队列的Features属性为D和TTL,D表示消息队列中的消息是持久化的,TTL表示消息会过期。10s后刷新页面,发现消息数已经归0。这是给消息队列设置消息过期时间。一旦设置,所有进入队列的消息都会有一个过期时间。2.3特殊情况还有一种特殊情况,就是将消息的过期时间TTL设置为0,即如果消息不能立即被消费,则立即丢弃。该特性可以部分替代RabbitMQ3.0之前支持的immediate参数。之所以部分替换是因为立即数参数会有一个basic.return方法在传递失败时返回消息体(这个功能可以使用死信队列来实现)。具体代码松哥就不演示了,这个应该更简单。3、死信队列中有些朋友不禁要问,删除的消息去哪了?真的被删了吗?不,不!这就涉及到死信队列了。接下来我们来看一下死信队列。3.1死信交换死信交换,Dead-Letter-Exchange就是DLX。死信开关是用来接收死信消息(DeadMessage)的,那么什么是死信消息呢?一般在以下几种情况下消息会变成死信消息:消息被拒绝(Basic.Reject/Basic.Nack),requeue参数设置为false。消息过期队列达到最大长度。当一个消息在一个队列中变成死信后,消息被发送出去,此时会被发送到DLX,绑定到DLX的消息队列称为死信队列。DLX本质上是一个普通的交换机。我们可以为任何队列指定DLX。当队列中出现死信时,RabbitMQ会自动将死信发布到DLX,然后路由到另一个绑定的DLX队列(即死信队列)。3.2死信队列很容易理解。死信交换机绑定的队列就是死信队列。3.3练习让我们看一个简单的例子。首先我们创建一个死信交换器,然后创建一个死信队列,然后将死信交换器和死信队列绑定在一起:publicstaticfinalStringDLX_EXCHANGE_NAME="dlx_exchange_name";publicstaticfinalStringDLX_QUEUE_NAME="dlx_queue_name";publicstaticfinalStringDLX_ROUTING_KEY="dlx_routing_key";/***配置死信交换**@return*/@BeanDirectExchangedlxDirectExchange(){returnnewDirectExchange(DLX_EXCHANGE_NAME,true,false);}/***配置死信队列*@return*/@BeanQueuedlxQueue(){returnnewQueue(DLX_QUEUE_NAME);}/***绑定死信队列和死信交换*@return*/@BeanBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}这个其实和普通交换机、普通消息队列是一样的。接下来为消息队列配置死信开关,如下:@BeanQueuequeue(){Mapargs=newHashMap<>();//设置消息过期时间args.put("x-message-ttl",0);//设置死信交换args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);//设置死信routing_keyargs.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false,args);}就两个参数:x-dead-letter-exchange:配置死信交换。x-dead-letter-routing-key:配置死信routing_key。就是这样。以后发往这个消息队列的消息,如果出现nack、reject、expiration等问题,都会发往DLX,然后进入DLX绑定的消息队列。死信消息队列的消费和普通消息队列一样:@RabbitListener(queues=QueueConfig.DLX_QUEUE_NAME)publicvoiddlxHandle(Stringmsg){System.out.println("dlxmsg="+msg);}很简单~4.总结,今天就和小伙伴们聊一聊RabbitMQ中消息过期的问题。有兴趣的小伙伴可以试试哦~公众号江南一点鱼后台回复本文标题,即可获得本文案例下载链接。参考资料:blog.csdn.net/u012988901/article/details/88958654