当前位置: 首页 > 科技观察

Springboot + Rabbitmq 用了消息确认机制,感觉掉坑里了

时间:2023-03-13 14:03:43 科技观察

springboot+Rabbitmq使用了消息确认机制,感觉自己掉坑里了。转载本文请联系程序员内电史公众号。最近部门号召大家多组织一些技术分享会,说是活跃公司的技术氛围,但是我早就看透了,知道这个TM只是为了刷KPI而已。不过,话说回来,这的确是一件好事。与其开那些无聊的扯皮会议,多一些技术交流对个人成长还是很有帮助的。这次分享下springboot+rabbitmq是如何实现消息确认机制的,以及实际开发中的一点心得。其实整体内容还是比较简单的。有时候事情就是这么神奇,越简单的东西越容易出错。可以看到,使用RabbitMQ之后,我们的业务环节明显变长了。虽然实现了系统间的解耦,但是可能导致消息丢失的场景也变多了。例如:Messageproducer->rabbitmqserver(消息发送失败)Rabbitmqserver自身故障导致消息丢失Messageconsumer->rabbitmqservice(消费消息失败)所以能用中间件的尽量不要用,如果用使用只会增加麻烦。启用消息确认机制后,虽然很大程度上保证了消息的准确投递,但是由于频繁的确认交互,导致rabbitmq整体效率变低,吞吐量下降严重。对于不是很重要的消息,不建议使用消息确认机制。.下面先来实现一下springboot+rabbitmq的消息确认机制,再具体分析遇到的问题。一、准备环境1、引入rabbitmq依赖包org.springframework.bootspring-boot-starter-amqp2.修改application.properties配置在配置中,需要开启发送方和消费者的消息确认。spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest#发送方开启confirm确认机制spring.rabbitmq.publisher-confirms=true#发送方开启返回确认机制spring.rabbitmq.publisher-returns=true#######################################################设置消费者手动ackspring.rabbitmq.listener.simple.acknowledge-mode=manual#是否支持retryspring.rabbitmq.listener.simple.retry.enabled=true3.定义Exchange和Queue定义exchangeconfirmTestExchange和queueconfirm_test_queue,将queue绑定到exchange。@ConfigurationpublicclassQueueConfig{@Bean(name="confirmTestQueue")publicQueueconfirmTestQueue(){returnnewQueue("confirm_test_queue",true,false,false);}@Bean(name="confirmTestExchange")publicFanoutExchangeconfirmTestExchange(){returnnew"FanoutExchange("确认);Test}@BeanpublicBindingconfirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange")FanoutExchangeconfirmTestExchange,@Qualifier("confirmTestQueue")QueueconfirmTestQueue){returnBindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);}}rabbitmq的消息确认分为两部分Confirmation的消息接收。此处插入图片描述2.消息发送确认发送消息确认:用于producer生产者向broker发送消息,broker上的switchexchange向queue发送消息的过程中,确认消息是否发送成功队列。producer到rabbitmqbroker的消息有一个confirmCallback确认方式。有一个returnCallback回退模式,用于从exchange到队列的消息传递失败。我们可以使用这两个Callbacks来保证100%的消息传递。1、ConfirmCallback确认方式消息只要被rabbitmqbroker接收到就会触发confirmCallback回调。@Slf4j@ComponentpublicclassConfirmCallbackServiceimplementsRabbitTemplate.ConfirmCallback{@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){if(!ack){log.error("消息发送异常!");}else{log.info("发送者父亲已收到确认,correlationData={},ack={},cause={}",correlationData.getId(),ack,cause);}}}实现接口ConfirmCallback,重写其confirm()方法,其中有三个参数方法相关数据,确认,原因。correlationData:对象内部只有一个id属性,用来表示当前消息的唯一性。ack:消息传递给broker的状态,true表示成功。cause:表示下发失败的原因。但是broker收到的消息只能说明已经到达了MQ服务器,并不能保证消息一定会投递到目标队列。所以接下来需要使用returnCallback。2.ReturnCallback返回方式如果消息未能投递到目标队列,会触发回调returnCallback。一旦消息投递到队列不成功,这里会记录当前消息的详细投递数据,方便后续重发或补偿等操作。@Slf4j@ComponentpublicclassReturnCallbackServiceimplementsRabbitTemplate.ReturnCallback{@OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){log.info("returnedMessage===>replyCode={},replyText={}}exchangroute{",replyCode,replyText,exchange,routingKey);}}实现接口ReturnCallback,重写returnedMessage()方法,该方法有五个参数message(消息体)、replyCode(响应码)、replyText(响应内容)、exchange(开关)、routingKey(队列).下面是具体的消息发送。在rabbitTemplate中设置Confirm和Return回调。我们通过setDeliveryMode()持久化消息。为后续测试创建一个CorrelationData对象,并添加一个id为10000000000。@AutowiredprivateRabbitTemplaterabbitTemplate;@AutowiredprivateConfirmCallbackServiceconfirmCallbackService;@AutowiredprivateReturnCallbackServicereturnCallbackService;publicvoidsendMessage(Stringexchange,StringroutingKey,Objectmsg){/***确保消息发送失败后可以返回队列true);/***消费者后确认收到消息,手动ackreceipt回调处理*/rabbitTemplate.setConfirmCallback(confirmCallbackService);/***消息投递队列失败回调处理*/rabbitTemplate.setReturnCallback(returnCallbackService);/***发送消息*/rabbitTemplate.convertAndSend(exchange,routingKey,msg,message->{message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);returnmessage;},newCorrelationData(UUID.randomUUID().toString()));}3.消息接收确认消息接收确认比消息发送确认简单一点,因为只有一个消息接收t(确认)过程。@RabbitHandler注解标注的方法需要添加两个参数,channel(通道)和message。@Slf4j@Component@RabbitListener(queues="confirm_test_queue")publicclassReceiverMessage1{@RabbitHandlerpublicvoidprocessHandler(Stringmsg,Channelchannel,Messagemessage)throwsIOException{try{log.info("小付收到消息:{}",msg);//TODO具体业务channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){if(message.getMessageProperties().getRedelivered()){log.error("消息重复处理失败并再次拒绝Receive...");channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);//拒绝消息}else{log.error("消息即将返回队列等待再次处理...”);通道。basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}}}消费消息有3种接收方式。下面分析一下每个方法的含义。1.basicAckbasicAck:表示确认成功。使用这种回执方式后,消息会被rabbitmqbroker删除。voidbasicAck(longdeliveryTag,booleanmultiple)deliveryTag:表示消息的投递序号。每次消费或重新传递消息时,deliveryTag都会增加。在手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。multiple:是否批量确认,如果值为true,所有小于当前消息deliveryTag的消息都会一次性ack。举个栗子:假设我先发送了3条deliveryTags分别为5、6、7的消息,但是都没有被确认。当我发送第四条消息时,deliveryTag为8,multiple设置为true。5、6、7、8的所有消息都会被确认。2.basicNackbasicNack:表示失败确认。一般在消费消息业务异常时使用该方法,可以将消息重新投入队列。voidbasicNack(longdeliveryTag,booleanmultiple,booleanrequeue)deliveryTag:表示消息传递序号。multiple:是否批量确认。requeue:如果值为true,消息将被重新排队。3.basicRejectbasicReject:拒绝消息,与basicNack的区别在于不能进行批量操作,其他用法非常相似。voidbasicReject(longdeliveryTag,booleanrequeue)deliveryTag:表示消息传递序号。requeue:如果值为true,消息将被重新排队。4.测试发送消息测试消息确认机制是否有效。从执行结果可以看出,sender发送消息成功回调,consumer成功消费消息。使用抓包工具Wireshark观察rabbitmqamqp协议交互的变化,也有一个ack过程。五、踩坑日志1.不要忘记确认消息。这是一个非常没有技术含量的坑,但是非常容易出错。开启消息确认机制,消费消息时不要忘记channel.basicAck,否则消息会一直存在,导致重复消费。2.消息无限投递刚开始接触消息确认机制时,消费端代码是这样写的。思路很简单:业务逻辑处理完后确认消息,inta=1/0异常后重置消息入队。@RabbitHandlerpublicvoidprocessHandler(Stringmsg,Channelchannel,Messagemessage)throwsIOException{try{log.info("Consumer2received:{}",msg);inta=1/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}但是有个问题就是一旦业务代码有bug,99.9%的情况不会自动修复,一条消息会无限投进队列,消费者会无限执行,造成死循环。这里插一张图描述下本地CPU瞬间爆满。你可以想象当服务在生产环境中崩溃时我有多慌张。而rabbitmq管理只有一条未确认消息。此处插入图片描述经过测试分析发现,当消息重新投递到消息队列时,消息不会返回到队尾,而是仍然在队头。消费者会立即消费消息,业务流程会抛出异常,消息会重新入队,等等。消息队列的处理被阻塞,无法运行正常的消息。而我们当时的解决方案是先应答消息,然后消息队列会删除消息,我们再次将消息发送到消息队列,异常消息会放在消息队列的尾部,这样保证了消息不会丢失,也保证了业务的正常进行。channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//重新发送消息到队尾),MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(msg));但是这种方法并没有解决根本问题,还是会时不时的报错信息。后期优化了消息重试次数。达到重试次数上限后,手动确认并队列删除该消息,并将该消息持久化到MySQL并推送告警,并进行人工处理和定时任务作为补偿。3、如何保证MQ消费的重复消费是幂等的?这个需要根据具体的业务来确定。可以使用MySQL或者redis来持久化消息,检查消息中的唯一属性。