消息可靠传输是面试必问的问题之一。为了保证可靠的消息传输,生产端开启confirm模式,持久化开启RabbitMQ,消费者端关闭自动ack模式。环境配置SpringBoot集成了RabbitMQ来发送消息。添加maven依赖org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-amqp复制代码添加application.yml配置文件spring:rabbitmq:host:192.168.3.19port:5672username:adminpassword:xxxx配置交换机、队列以及绑定@BeanpublicDirectExchangemyExchange(){DirectExchangedirectExchange=newDirectExchange("myExchange");返回直接交换;}@BeanpublicQueuemyQueue(){Queuequeue=newQueue("myQueue&q不;);返回队列;}@BeanpublicBindingbinding(){returnBindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");}生成发送消息@AutowiredprivateRabbitTemplaterabbitTemplate;@GetMapping("/send")publicStringsend(Stringmessage){rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);System.out.println("【发送消息】"+message)return"【发送消息】"+message;}消费者接收消息@RabbitListener(queuesToDeclare=@Queue("myQueue"))publicvoidprocess(Stringmsg,Channelchannel,Messagemessage){SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");日期date=newDate();字符串时间=sdf.format(date);System.out.println("【收到信息】"+msg+"当前时间"+time);call生产端发送消息hello,控制台输出:[发送消息]hello[接收消息]hello当前时间2022-05-1210:21:14表示消息已成功接收。消息丢失分析。一条消息从生产到消费,消息丢失可能发生在以下几个阶段:Producerloss:Producer无法传输到RabbitMQ存储端Lost:RabbitMQ存储本身挂掉了Consumer端丢失:由于网络问题,存储无法发送给消费者,或者消费者挂了,不能送去正常消费。RabbitMQ支持生产端、存储端、消费端的可靠传输。生产阶段生产阶段通过请求确认机制确保消息的可靠传输。向RabbitMQ服务器发送消息后,RabbitMQ收到消息后,向发送方返回请求确认,表示RabbitMQ服务器已成功接收到消息。配置application.ymlspring:rabbitmq:#消息确认机制producer->switchpublisher-confirms:true#消息返回机制switch->queuepublisher-returns:true配置@Configuration@Slf4jpublicclassRabbitConfig{@AutowiredprivateConnectionFactoryconnectionFactory;@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,String.cause){“log:”+correlationData);log.info("【确认】"+ack);log.info("【原因】"+原因);if(ack){log.info("【发送成功】");}else{log.info("【发送失败】correlationData:"+correlationData+"cause:"+原因);}}});rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){@OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){log.warn("【消息发送失败】");log.info("【message】"+message);log.info("【replyCode】"+replyCode);}});返回兔子模板;}}messagefromproduction或者到switch,有一个confirmCallback确认方式。消息发送成功后,消息会调用方法confirm(CorrelationDatacorrelationData,booleanack,Stringcause),根据ack判断消息是否发送成功。消息从Exchange发送到Queue,返回方式为returnCallback。发送消息产品消息控制台输出如下:[发送消息]产品消息[接收消息]产品消息当前时间2022-05-1211:27:56[correlationData]:null[ack]true[cause]null[发送成功]生产端模拟??消息丢失有两种解决方案:发送消息后立即关闭broker,后者关闭网络,但是broker关闭后,控制台会一直报错,消息也会报错500错误。发送一个不存在的开关://myExchange修改为myExchangexxxxxrabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);结果:[correlationData]:null【ack】false【cause】通道错误;协议方法:#method(reply-code=404,reply-text=NOT_FOUND-noexchange'myExchangexxxxx'invhost'/',class-id=60,method-id=40)【发送失败】发送失败时,可以查看消息重试交换机是否正确,发送不存在的队列:交换机收到消息,返回成功通知,控制台输出:[correlationData]:CorrelationData[id=7d468b47-b422-4523-b2a2-06b14aef073c]【ack】true【cause】null【发送成功】交换机没有找到队列,返回失败消息:【消息发送失败】【消息】产品消息【replyCode】312RabbitMQ启用队列持久化,创建的队列和交换机的默认配置是持久化的。首先正确设置队列和开关,修改consumer监听的队列,让消息存放在队列中。修改队列的持久化为非持久化:@BeanpublicQueuemyQueue(){Queuequeue=newQueue("myQueue",false);返回队列;}发送消息后,消息存入队列,然后重启RabbitMQ,消息不存在了。设置队列持久化:@BeanpublicQueuemyQueue(){Queuequeue=newQueue("myQueue",true);返回队列;}重启后,队列中的消息依然存在。消费端消费端默认开启ack自动确认模式。当队列消息被消费者接收到后,无论是否有来自消费者端的消息,都会自动删除队列中的消息。因此,为了保证消费者能够成功消费消息,将自动模式改为手动确认模式:修改application.yml文件spring:rabbitmq:#手动消息确认listener:simple:acknowledge-mode:manual收到消息后,需要手动确认:channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);@RabbitListener(queuesToDeclare=@Queue("myQueue"))publicvoidprocess(Stringmsg,Channelchannel,Messagemessage){SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");日期date=newDate();字符串时间=sdf.format(date);System.out.println("【收到信息】"+msg+"当前时间"+time);System.out.println(message.getMessageProperties().getDeliveryTag());尝试{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOExceptione){e.printStackTrace();如果没有添加:channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);发送两条消息收到消息后,没有确认,放回队列:重启项目,之后,队列中的消息会发送给消费者,但没有ack确认,会继续放回队列并添加channel.basicAck,然后重启项目:队列消息会被删除。basicAck方法的最后一个参数multiple,表示在queue之前被删除。multiple设置为true,后续队列全部清理:源码https://github.com/jeremylai7/springboot-learning/tree/master/spring-rabbitmq如果觉得文章对你有帮助,请点赞它!