当前位置: 首页 > 后端技术 > Java

分布式事务解决方案

时间:2023-04-01 21:32:48 Java

数据不会无故丢失,也不会莫名其妙的增加。性问题直接通过当地交易处理。2、随着时间的推移,用户越来越多,发现有一个Java服务支持不了,于是技术老大决定升级系统。根据系统的业务,拆分单个服务,然后也拆分开发人员。开发人员只开发和维护一个或几个服务中的问题。大家各司其职,相互配合。3.当然,服务拆分不是一蹴而就的。这是一项耗时耗力的巨大工程。大多数系统都经过多轮分裂,然后慢慢形成一个稳定的系统。遵循一个核心思路:先按照整体业务进行一轮拆分,再根据拆分后的业务模块进行细化拆分。4、服务拆分后,用户数有抗拒,但发现数据是在不同的服务访问的,这就引出了一个新的问题:如何保证跨服务器数据的一致性?当然,在一个跨服务的分布式系统中,不仅仅存在这个问题,还有其他一系列的问题,比如:服务可用性、服务容错、服务间调用的网络问题等等。只有数据的一致性问题在这里讨论。5.说到数据一致性,大致可以分为三种:强一致性,弱一致性,最终一致性。强一致性:数据一旦写入,随时可以读取到最新的值。弱一致性:写入一条数据,其他地方读取数据,可能查到的数据不是最新的最终一致性:是弱一致性的变种,根本不追求数据的一致性在系统中的时间。但经过一定时间后,数据最终会达到一致性。从这三种一致性模型可以看出,弱一致性和最终一致性一般是异步冗余的,而强一致性是同步冗余的,异步处理带来更好的性能,但也需要对处理数据进行补偿。同步意味着简单,但也不可避免地降低了系统的性能。2.理论上面提到的数据一致性问题其实就是分布式事务的问题。现在有一些解决办法。相信大家或多或少都见过。下面我就带大家回顾一下。2.1.两阶段提交2PC是一种强一致性设计方案,通过引入事务协调器来协调每个本地事务(也称为事务参与者)的提交和回滚。2PC主要分为两个阶段:1.第一阶段:事务协调器会向每个事务参与者发起一个开始事务的命令,每个事务参与者都会执行一个准备操作,然后回复事务协调器是否准备做完了。但是本地事务不会被提交,但是这个阶段需要锁定资源。2、第二阶段:事务协调器收到各个事务参与者的回复后,对各个参与者的回复进行统计。如果每个参与者都回复“可以提交”,那么事务协调器会发送commit命令,让参与操作者正式提交本地事务,释放所有资源,结束全局事务。但是如果一个参与者回复“拒绝提交”,那么事务协调器发送回滚命令,所有参与者回滚本地事务。全部回滚完成后,释放资源,取消全局事务。事务提交流程事务回滚流程当然这里也提到了2PC存在的问题。一种是同步阻塞,比较消耗性能。另一个是协调器失效的问题。一旦协调器失效,所有参与者将被阻塞处理资源锁定状态。2.2.3PC的三阶段提交主要是在2PC基础上的改进,主要解决2PC的阻塞问题。主要是把2PC的第一阶段分为两步,先准备,然后锁定资源,引入超时机制(也就是会造成数据不一致)。3PC的三个阶段包括:CanCommit、PreCommit、DoCommit。具体细节不再赘述,只说一个核心点:CanCommit期间不锁定资源,除非所有参与者都同意开始锁定资源。2.3.TCC灵活事务与之前的2PC和3PC相比,TCC与两兄弟的本质区别在于它是业务层面的分布式事务,而2PC和3PC是数据库层面的。TCC是三个词的缩写:Try、Confirm、Cancel,也分为这三个进程。Try:尝试,即尝试预留资源并锁定资源Confirm:确认,即执行预留资源,执行失败则重试Cancel:取消,取消预留资源,执行失败则重试从上面如图,TCC对业务的侵入性很强,紧耦合在一起。与2PC、3PC相比,TCC的试用范围更广,可以实现跨数据库、跨系统的分布式事务。缺点是实现这三个步骤需要在业务代码中开发大量的逻辑,并且需要和代码耦合,增加了开发成本。交易日志:在TCC模式下,交易发起者和交易参与者都会记录交易日志(交易状态、信息等)。这个事务日志是整个分布式事务在出现意外情况(宕机、重启、网络中断等)时提交和回滚的关键。幂等性:在TCC的第二阶段,无论是确认还是取消,都需要保证幂等性。一旦由于网络等原因执行失败,将发起连续重试。防挂:由于网络的不可靠性,当出现异常情况时,try请求可能比cancel请求晚到达。cancel可能执行空回滚,但是执行try请求时不会保留资源。2.4、Seata这里就不多说seata了。最常用的一种是AT模式。上次已经一步步分析过了。配置完成后,只需要在事务发起方法中添加@GlobalTransactional注解即可启动全局事务。无业务侵入,低耦合。有兴趣的可以参考之前关于Seata的讨论。3.应用场景知乎之前在一家公司遇到过这样的业务场景;用户通过页面投保并提交订单。订单通过上游服务处理与保单相关的业务逻辑,最终流入下游服务处理绩效、人员晋升、利润分配处理等。对于这种场景,双方处理的业务逻辑不是在同一个服务中,但访问不同的数据库。当涉及到数据一致性问题时,就需要使用分布式事务。对于上面介绍的几种方案,只讨论了理论和思路。我总结一下这个业务场景中使用的一个实现方案。采用本地消息表+MQ异步消息的方案,实现了事务的最终一致性,也符合当时的业务场景。一致性比较强,实现了高性能。下面是解决方案的思路图。真实业务处理可能有多种状态,需要明确哪些状态需要定时任务补偿。需要加入轮次的概念,重试多次后报警,人工干预处理。因为MQ和定时任务的存在,必然会出现重复请求,所以下游一定要做好幂等反重,否则会出现重复数据,导致数据不一致的情况对于落地实现,不要说不多说,直接上代码。首先定义两个表tb_order和tb_notice_message分别存储订单信息和本地交易信息CREATETABLE`tb_order`(`id`int(11)NOTNULLAUTO_INCREMENTCOMMENT'主键id',`user_id`int(11)NOTNULLCOMMENT'Ordererid',`order_no`varchar(255)CHARACTERSETlatin1NOTNULLCOMMENT'ordernumber',`insurance_amount`decimal(16,2)NOTNULLCOMMENT'insuranceamount',`order_amount`decimal(16,2)DEFAULTNULLCOMMENT'Premium',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`update_time`datetimeDEFAULTNULLONUPDATECURRENT_TIMESTAMPCOMMENT'更新时间',`is_delete`tinyint(4)DEFAULT'0'COMMENT'删除标志:0-不要删除;1-delete',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=0DEFAULTCHARSET=utf8mb4;CREATETABLE`tb_notice_message`(`id`int(11)NOTNULLAUTO_INCREMENTCOMMENT'primarykeyid',`type`tinyint(4)NOTNULLCOMMENT'业务类型:1-order',`status`tinyint(4)NOTNULLDEFAULT'1'COMMENT'status:1-pending,2-processed,3-Alert',`data`varchar(255)NOTNULLCOMMENT'Information',`retry_count`tinyint(4)DEFAULT'0'COMMENT'Numberofretries',`create_time`datetimeNOTNULLCOMMENT'Creationtime',`update_time`datetimeDEFAULTNULLONUPDATECURRENT_TIMESTAMPCOMMENT'Updatetime',`is_delete`tinyint(4)NOTNULLDEFAULT'0'COMMENT'删除标志:0-不删除;1-delete',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=0DEFAULTCHARSET=utf8mb4;处理订单服务,我们可以使用之前提到的装饰器模式,装饰这个服务保存本地交易和发送mq消息,交给装饰器类去做,服务只需要关心业务逻辑,也符合开闭原则。/***@author往事如风*@version1.0*@date2022/12/1310:58*@description*/@Service@Slf4j@AllArgsConstructorpublicclassOrderServiceimplementsBaseHandler{privatefinalOrderMapper订单映射器;/***订单处理方式:只处理订单关联逻辑*@paramo*@return*/@OverridepublicOrderhandle(Objecto){//订单信息Orderorder=Order.builder().orderNo("2345678").createTime(LocalDateTime.now()).userId(1).insuranceAmount(newBigDecimal(2000000)).orderAmount(newBigDecimal(5000)).build();orderMapper.insert(订单);退货单;}}新增OrderService装饰类OrderServiceDecorate,负责订单逻辑的扩展。这里是添加本地事务消息,发送MQ信息。扩展方法增加Transactional注解,保证订单逻辑和本地交易消息数据在同一个交易中进行。保证原子性。其中,事务消息标记过程在下游服务处理完业务逻辑后完成,然后更新过程完成。/***@author往事如风*@version1.0*@date2022/12/1418:48*@description*/@Slf4j@AllArgsConstructor@Decorate(scene=SceneConstants.ORDER,type=DecorateConstants.CREATE_ORDER)publicclassOrderServiceDecorateextendsAbstractHandler{privatefinalNoticeMessageMappernoticeMessageMapper;私有最终RabbitTemplaterabbitTemplate;/***装饰方法:扩展订单处理逻辑*@paramo*@return*/@Override@TransactionalpublicObjecthandle(Objecto){//调用服务方法实现策略逻辑Orderorder=(Order)service.handle(o);//扩展:1.保存交易消息,2.发送MQ消息//本地交易消息Stringdata="{\"orderNo\":\"2345678\",\"userId\":1,\"insuranceAmount\":2000000,\"订单金额\":5000}";NoticeMessagenoticeMessage=NoticeMessage.builder().retryCount(0).data(数据).status(1).type(1).createTime(LocalDateTime.now()).build();noticeMessageMapper.insert(noticeMessage);//发送mq消息log.info("发送mq消息....");rabbitTemplate.convertAndSend("trans","trans.queue.key",JSONUtil.toJsonStr(noticeMessage));返回空值;}}关于这个装饰器模式,我之前有提过。您可以阅读之前发布的内容。下游服务监听消息并处理自己的业务逻辑。之后(如:绩效、分红、促销等),需要发送MQ,上游服务监听消息,处理后更新本地交易状态。需要注意的是,下游服务需要做幂等处理,防止异常情况下对上游服务数据的重试。/***@author往事如风*@version1.0*@date2022/12/1318:07*@description*/@Component@Slf4j@RabbitListener(queues="trans.queue")publicclassFenRunListener{@AutowiredprivateRabbitTemplaterabbitTemplate;@RabbitHandlerpublicvoidorderHandler(Stringmsg){log.info("收听订单消息:{}",msg);//需要注意幂等性,幂等逻辑log.info("下游服务业务逻辑...");JSONObjectjson=JSONUtil.parseObj(msg);rabbitTemplate.convertAndSend("trans","trans.update.order.queue.key",json.getInt("id"));}}插一句题外话关于幂等性的处理,我这里大致有两种思路,1、比如根据订单号检查记录是否存在,存在则直接返回成功。2、redis存储一个唯一的请求号,之后删除处理,没有请求数直接返回成功,可以写个AOP处理,和业务隔离,言归正传,监听上游服务消息,下游发送MQ消息,本地事务消息更新为已处理,分布式事务过程结束。/***@author往事如风*@version1.0*@date2022/12/1318:29*@description*/@Component@Slf4j@RabbitListener(queues="trans.update.order.queue")公共类OrderListener{@AutowiredprivateNoticeMessageMappernoticeMessageMapper;@RabbitHandlerpublicvoidupdateOrder(IntegermsgId){log.info("监听消息,更新本地事务消息,消息id:{}",msgId);NoticeMessagemsg=NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();noticeMessageMapper.updateById(msg);}}当出现异常情况时,会通过定时任务轮询发送消息给MQ,尽最大努力让下游服务达到数据一致性,当然也要设置重试上限;如果达到上限后还是失败,就得考虑下游服务本身有问题(可能是代码逻辑的问题)。/***@author往事如风*@version1.0*@date2022/12/1410:25*@description*/@Configuration@EnableScheduling@AllArgsConstructor@Slf4jpublicclassRetryOrderJob{privatefinalRabbitTemplaterabbitTemplate;privatefinalNoticeMapperMappernoticeMessage/***最大自动重试次数*/privatefinalIntegerMAX_RETRY_COUNT=5;@Scheduled(cron="0/20****?")publicvoidretry(){log.info("定时任务,重试异常顺序");LambdaQueryWrapperwrapper=Wrappers.lambdaQuery(NoticeMessage.class);wrapper.eq(NoticeMessage::getStatus,1);ListnoticeMessages=noticeMessageMapper.selectList(wrapper);for(NoticeMessage:noticeMessagenoticeMessages){//重新发送mq消息rabbitTemplate.convertAndSend("trans","trans.queue.key",JSONUtil.toJsonStr(noticeMessage));//重试次数+1noticeMessage.setRetry计数(noticeMessage.getRetryCount()+1);noticeMessageMapper.updateById(noticeMessage);//判断重试次数,等于最大限制,直接更新为告警状态if(MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())){noticeMessage.setStatus(3);noticeMessageMapper.updateById(noticeMessage);//发送告警,通知相应人员//告警逻辑(短信、邮件、七微群等)....}}}}其实有一个问题是,当一个上游服务对应多个下游服务,此时往往无法保存一条本地消息记录。这里可以在消息表中增加一个字段next_server_count,表示一个订单发起者需要调用的下游服务的个数。上游服务在监听的时候,会在每个下游回调中减1,直到值为0,然后更新待处理的状态。但是为了控制并发,这个字段被多个下游服务共享。另一种解决方案是为每个下游服务记录一个事务消息,并使用类型字段来区分和标记类型。为事务消息实现上下游一对一的关系。最后,在达到最大重试次数后,可以将消息添加到报警列表中。这个告警列表可以在管理后台或者其他监控系统中显示一些必要的信息,以便公司内部人员进行人工干预处理。异常数据使数据达到最终一致性。4.总结其实分布式事务并没有完美的解决方案。只能说是尽量满足业务需求和数据一致性。如果程序无法处理,就要靠人类来为数据制定补偿计划。5.参考源码编程文档:https://gitee.com/cicadasmile/butte-java-note应用仓库:https://gitee.com/cicadasmile/butte-flyer-parent