当前位置: 首页 > 科技观察

SpringBoot分布式事务的Besteffort通知

时间:2023-03-17 12:47:04 科技观察

环境:springboot.2.4.9+RabbitMQ3.7.4什么是besteffortnotification这是一个充值案例交互流程:1.账户系统调用充值系统接口。2、充值系统完成支付,向账户系统发送充值结果通知。如果通知失败,充值系统会根据策略重复通知。3、账户系统收到充值结果通知修改充值状态。4.如果账户系统没有收到通知,会主动调用充值系统接口查询充值结果。通过上面的例子,我们总结了best-effort通知方案的目标:发起通知者通过一定的机制尽量将业务处理结果通知给接收者。具体包括:1.有一定的消息重复通知机制。因为通知的接收者可能还没有收到通知,这时候肯定有一定的机制来重复通知消息。2.消息校对机制。如果接收者尽最大努力仍未收到通知,或者接收者消费消息后想再次消费消息,则接收者可以主动向通知者查询消息信息以满足需求。尽力而为通知和可靠的消息一致性之间有什么区别?1.解决方案不同。可靠的消息一致性。发起方需要确保消息发送出去并发送给接收方。消息的可靠性由发起通知决定。方来保证。尽力通知。发起方尽量将业务处理结果通知通知方,但可能收不到消息。在这种情况下,通知方需要主动调用通知方接口查询业务处理结果和通知的可靠性。关键在于通知的接收者。2、两者的业务应用场景不同。消息可靠一致性关注交易过程的交易一致性,以异步方式完成交易。尽力而为通知专注于交易后通知交易,这意味着交易结果的可靠通知。3、技术方案的方向不同。可靠的消息一致性需要解决消息从发送到接收的一致性,即消息的发送和接收。Best-effortnotification不能保证消息从发送到接收的一致性,只是提供消息接收的可靠性机制。可靠的机制是尽量将消息通知接收方,当消息无法被接收方接收时,接收方会主动查询并消费。BestEffortNotificationwithRabbitMQ关于RabbitMQ的相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》、《RabbitMQ消息确认机制confirm 》。项目结构两个子模块users-mananger(账户模块),pay-manager(支付模块)依赖org.springframework.bootspring-boot-starter-data-jpaorg.springframework.bootspring-boot-starter-weborg.springframework.boot复制代码spring-boot-starter-amqpmysqlmysql-connector-java运行时子模型pay-manager配置文件server:port:8080---spring:rabbitmq:host:localhostport:5672username:guestpassword:guestvirtual-host:/publisherConfirmType:correlatedpublisherReturns:truelistener:simple:concurrency:5maxConcurrency:10prefetch:5acknowledgeMode:MANUALretry:enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false实体类记录充值金额及账户信息@Entity@Table(name="t_pay_info")publicclassPayInfoimplementsSerializable{@IdprivateLongid;privateBigDecimalmoney;privateLongaccountId;}DAO及ServicepublicinterfacePayInfoRepositoryextendsJpaRepository{PayInfofindByOrderId(StringorderId);}@ServicepublicclassPayInfoService{@ResourceprivatePayInfoRepositorypayInfoRepository;@ResourceprivateRabbitTemplaterabbitTemplate;//数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式)@TransactionalpublicPayInfosavePayInfo(PayInfopayInfo){payInfo.setId(System.currentTimeMillis());PayInforesult=payInfoRepository.save(payInfo);CorrelationDatacorrelationData=newCorrelationData(UUID.randomUUID().toString().replaceAll("-",""));try{rabbitTemplate.convertAndSend("pay-exchange","pay.#",newObjectMapper().writeValueAsString(payInfo),correlationData);}catch(AmqpException|JsonProcessingExceptione){e.printStackTrace();}returnresult;}publicPayInfoqueryByOrderId(StringorderId){returnpayInfoRepository.findByOrderId(orderId);}}支付完成后发送消息控制器接口@RestController@RequestMapping("/payInfos")publicclassPayInfoController{@ResourceprivatePayInfoServicepayInfoService;//支付接口@PostMapping("/pay")publicObjectpay(@RequestBodyPayInfopayInfo){payInfoService.savePayInfo(payInfo);return》支付已提交,返回"";}@GetMapping("/queryPay")publicObjectqueryPay(StringorderId){returnpayInfoService.queryByOrderId(orderId);}}子模块users-manager应用配置server:port:8081---spring:rabbitmq:host:localhostport:5672用户名:guestpassword:guestvirtual-host:/publisherConfirmType:correlatedpublisherReturns:truelistener:simple:concurrency:5maxConcurrency:10prefetch:5acknowledgeMode:MANUALretry:enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false@Userclass_Table@Entityclass@EntityIdprivateBStringname;privateLongid}账户信息表@Entity@Table(name="t_users_log")publicclassUsersLog{@IdprivateLongid;privateStringorderId;//0:支持室温付付,1:已,2:@@@column(columndefinition=“intdefault0”)UsersLog,Long>{UsersLogfindByOrderId(StringorderId);}服务类@ServicepublicclassUsersService{@ResourceprivateUsersRepositoryusersRepository;@ResourceprivateUsersLogRepositoryusersLogRepository;@TransactionalpublicbooleanupdateMoneyAndLogStatus(Longid,StringorderId){UsersLogusersLog=usersLogRepository.find&Logatll&Status==userId(orderId);@TransactionalpublicbooleanupdateMoneyAndLogStatus(Longid,StringorderId)()){thrownewRuntimeException("已支付");}Usersusers=usersRepository.findById(id).orElse(null);if(users==null){thrownewRuntimeException("账户不存在");}users.setMoney(users.getMoney().add(usersLog.getMoney()));usersRepository.save(users);usersLog.setStatus(1);usersLogRepository.save(usersLog);returntrue;}@TransactionalpublicbooleansaveLog(UsersLogusersLog){usersLog.setId(System.currentTimeMillis());usersLogRepository.save(usersLog);returntrue;}}消息监听@ComponentpublicclassPayMessageListener{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(PayMessageListener.class);@ResourceprivateUsersServiceusersService;@SuppressWarnings("unchecked")@RabbitListener(queues={"pay-queue"})@RabbitHandlerpublicvoidreceive(Messagemessage,Channelchannel){longdeliveryTag=message.getMessageProperties().getDeliveryTag();byte[]buf=null;try{buf=message.getBody();logger.info("接收到消息:{}",newString(buf,"UTF-8"));Mapresult=newJsonMapper().readValue(buf,Map.class);Longid=((Integer)result.get("accountId"))+0L;StringorderId=(String)result.get("orderId");usersService.updateMoneyAndLogStatus(id,orderId);channel.basicAck(deliveryTag,true);}catch(Exceptione){logger.error("消息接收出现异常:{},异常Message:{}",e.getMessage(),newString(buf,Charset.forName("UTF-8")));e.printStackTrace();try{//应该把这种异常消息放到死信中人工检查队列channel.basicReject(deliveryTag,false);}catch(IOExceptione1){logger.error("拒绝消息重入队列异常:{}",e1.getMessage());e1.printStackTrace();}}}}Controller接口@RestController@RequestMapping("/users")publicclassUsersController{@ResourceprivateRestTemplaterestTemplate;@ResourceprivateUsersServiceusersService;@PostMapping("/pay")publicObjectpay(Longid,BigDecimalmoney)throwsException{HttpHeadersheaders=newHttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);StringorderId=UUID.randomUUID().toString().replaceAll("-","");Mapparams=newHashMap<>();params.put("accountId",String.valueOf(id));params.put("orderId",orderId);params.put("money",money.toString());UsersLogusersLog=newUsersLog();usersLog.setCreateTime(newDate());usersLog.setOrderId(orderId);usersLog.setMoney(money);usersLog.setStatus(0);usersService.saveLog(usersLog);HttpEntity请求stEntity=newHttpEntity(newObjectMapper().writeValueAsString(params),headers);returnrestTemplate.postForObject("http://localhost:8080/payInfos/pay",requestEntity,String.class);}}以上是两个子模块的所有代码,针对初始数据账户子模块控制台进行了测试支付子模块控制台数据表数据完成!!!