当前位置: 首页 > 科技观察

消息消费失败如何处理?

时间:2023-03-17 17:16:15 科技观察

本文转载自微信公众号《Java极客技术》,作者鸭血范。转载本文请联系Java极客技术公众号。一、简介在介绍消息中间件MQ之前,先简单了解一下为什么要引用消息中间件。例如,在电商平台上,普通用户在下单时会经历以下流程。用户下单时,订单创建后,会调用第三方支付平台扣除用户账户金额。如果平台扣款成功,会将结果通知给相应的业务系统,然后业务系统会更新订单状态,同时调用仓库接口,减少库存,通知物流发货!试想一下,从订单状态更新,到库存清点,再到物流发货通知,都是在一个方法中同步完成的。如果用户支付成功,订单状态更新也是成功的,但是如果没有扣除库存或者通知物流发货步骤,就会出现问题。用户已经支付成功,但是没有扣掉仓库中的库存,导致整个交易失败!一个订单失败,老板你可以装作没看见,但万一上千个订单因此而失败,系统造成的业务损失将是巨大的,老板可能坐不住了!因此,针对这种业务场景,架构师引入了异步通信技术方案来保证服务的高可用。大致流程如下:当订单系统收到支付平台发送的扣款结果后,会向MQ消息中间件发送订单消息,同时更新订单状态。在另一端,仓库系统异步监听订单系统发送的消息。收到订单消息后,将运营扣减库存、通知物流公司发货等服务!优化后的流程下,即使扣存服务失败,也不会影响用户交易。正如《人月神话》所说,软件工程没有灵丹妙药!当引入MQ消息中间件的时候,也会带来另外一个问题。如果MQ消息中间件突然死机,导致消息发不出去,那么仓库系统就收不到订单消息,进而无法发货!为了解决这个问题,业界主流的方案是采用集群部署,一主多从的模式,从而实现服务的高可用,即使一台机器突然宕机,依然可以保证服务可用。服务器故障期间,将通过运维手段重启服务,之后服务仍将正常运行!但是还有一个问题。如果仓库系统已经收到订单消息,但是业务处理异常,或者服务器异常,导致当前商品库存没有减少,没有发货!这个时候怎么处理呢?我们今天要介绍的就是这种场景。如果消息消费失败,应该怎么处理呢?2.解决方案对于消息消费失败的场景,我们一般是这样处理的:当消息消费失败时,重新推送消息。如果重试次数超过最大值,异常信息将被存储到数据库中,然后手动介入故障排除并进行手动重试。当客户端消费消息失败时,我们会将异常消息添加到一个消息重试对象中,设置最大重试次数,当重试次数超过最大值,异常信息会保存在MongoDB数据库中,方便后续查询异常信息。基于上面的系统模型,我们可以写一个公共的重试组件,废话少说,动手吧!3、代码实践本补偿服务由rabbitmq消息中间件处理,其他消息中间件的处理思路类似!3.1.创建消息重试实体类@Data@EqualsAndHashCode(callSuper=false)@Accessors(chain=true)publicclassMessageRetryDTOimplementsSerializable{privatestaticfinallongserialVersionUID=1L;/***原始消息体*/privateStringbodyMsg;/***消息源ID*/privateStringsourceId;/***MessageSourceDescription*/privateStringsourceDesc;/***Exchange*/privateStringexchangeName;/***RoutingKey*/privateStringroutingKey;/***Queue*/privateStringqueueName;/***State,1:Initialization,2:成功,3:失败*/privateIntegerstatus=1;/***最大重试次数*/privateIntegermaxTryCount=3;/***当前重试次数*/privateIntegercurrentRetryCount=0;/***重试间隔(毫秒)*/privateLongretryIntervalTime=0L;/***任务失败消息*/privateStringerrorMsg;/***创建时间*/privateDatecreateTime;@OverridepublicStringtoString(){return"MessageRetryDTO{"+"bodyMsg='"+bodyMsg+'\''+',sourceId='"+sourceId+'\''+",sourceDesc='"+sourceDesc+'\''+",exchangeName='"+exchangeName+'\''+",routingKey='"+路由ngKey+'\''+",queueName='"+queueName+'\''+",status="+status+",maxTryCount="+maxTryCount+",currentRetryCount="+currentRetryCount+",retryIntervalTime="+retryIntervalTime+",errorMsg='"+errorMsg+'\''+",createTime="+createTime+'}';}/***检查重试次数是否超过最大值**@return*/publicbooleancheckRetryCount(){retryCountCalculate();//检查重试次数是否超过最大if(this.currentRetryCountmessageHeaders=message.getMessageProperties().getHeaders();if(messageHeaders.containsKey("message_retry_info")){ObjectretryMsg=messageHeaders.get("message_retry_info");if(Objects.nonNull(retryMsg)){returnJSONObject.parseObject(String.valueOf(retryMsg),MessageRetryDTO.class);}}//自动添加业务消息到补偿实体MessageRetryDTOmessageRetryDto=newMessageRetryDTO();messageRetryDto.setBodyMsg(newString(message.getBody(),StandardCharsets.UTF_8));messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedKey());messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());messageRetryDto.setCreateTime(newDate());returnmessageRetryDto;}/***异常消息重传entry*@paramretryDto*/privatevoidretrySend(MessageRetryDTOretryDto){//将补偿消息实体放入header中,保持原消息内容不变message_retry_info",JSONObject.toJSON(retryDto));Messagemessage=newMessage(retryDto.getBodyMsg().getBytes(),messageProperties);rabbitTemplate.convertAndSend(retryDto.getExchangeName(),retryDto.getRoutingKey(),message);}/***将异常信息存入mongodb*@paramretryDto*/privatevoidsaveMessageRetryInfo(MessageRetryDTOretryDto){try{mongoTemplate.save(retryDto,"message_retry_info");}catch(Exceptione){log.error("保存异常消息到mongodb失败,消息数据:"+retryDto.toString(),e);}}}3.3。监听服务类在消费者端应用的时候写起来也很简单。例如,对于库存扣减操作,我们可以按照以下方式进行处理!@ComponentpublicclassOrderServiceListenerextendsCommonMessageRetryService{privatestaticfinalLoggerlog=LoggerFactory.getLogger(OrderServiceListener.class);/***监听订单系统成功消息*@parammessage*/@RabbitListener(queues="mq.order.add")publicvoidconsume(Messagemessage){log.info("收到订单成功信息:{}",message.toString());super.initMessage(message);}@Overrideprotectedvoidexecute(MessageRetryDTOmessageRetryDto){//调用库存扣除服务并抛出一个业务异常}@OverrideprotectedvoidsuccessCallback(MessageRetryDTOmessageRetryDto)tryDto){//业务处理成功,回调}@OverrideprotectedvoidfailCallback(MessageRetryDTOmessageRetryDto){//业务处理失败,回调}}当消息消费失败,超过最大次数时,会将消息存入mongodb,然后像普通数据库一样操作,同样可以通过web界面查询异常消息,针对特定场景进行重试!4.小结可能有同学会问,为什么不把异常信息存储在数据库中呢?一开始确实是存储在MYSQL中,但是随着业务的快速发展,订单消息的数据结构越来越复杂,数据量也非常大,甚至大到文本类型在MYSQL无法存储。同时这个数据结构不适合存储在MYSQL中,所以迁移到mongodb!本文主要针对消息消费失败的场景,讲解基本方案和代码实践。可能有理解不到位的地方,欢迎批评指正!

最新推荐
猜你喜欢