当前位置: 首页 > 科技观察

苏宁如何解决交易与非交易的数据一致性问题_0

时间:2023-03-19 16:17:43 科技观察

【.com原稿】1.业务场景苏宁的系统作为一个线上线下大数据的智慧零售平台,对并发和效率的要求非常高。针对各种需求场景,苏宁都有相应的解决方案。苏宁售后订单系统每天处理大量订单的创建、修改和数据分发。为了保证高效率,我们的数据通过分库分表的方式存储在数据库集群中。同时,部分活跃的订单按照一定的算法缓存在Redis中,保证订单处理的效率。在分发数据时,我们通过苏宁自研的MQ消息平台(WindQ)向下游系统分发消息,可以达到准实时的处理效率,即消息可以被下游及时接收,并立即通过反馈接口进行反馈,避免实时接口在调用时可能因为网络和下游处理效率问题发生阻塞。基于以上背景,我们遇到过这样一个场景:1、在创建订单的时候,我们需要将订单保存到数据库和缓存中。2、同时将下游消息保存到数据库中,通过WindQ发送给下游系统。3、下游系统返回接收数据的结果后,需要根据返回结果删除保存到数据库中的数据。图1:虽然业务模型逻辑简单,但是由于数据库操作是在一个事务中进行的,Redis和消息队列的操作不能被事务控制。如果缓存成功了,但是交易失败了,可能会导致我们在系统中有异常的订单缓存。事实上,这个命令并不存在。另外,消息发送到下游后,如果下游处理速度很快,会立即返回处理结果,处理返回结果的程序会删除一条发送成功的数据。这时候可能本地事务还没有提交,所以删除操作做了无用功。***事务提交时,该删除的数据会一直在pending表中,成为异常数据。图2:异常场景描述:数据通过消息队列发送给下游系统,同时在数据库中保存一份,保证发送队列异常或下游接收和接收异常处理可以通过存储在数据库中的数据来补偿。该机制在下游系统反馈数据正常接收时删除数据。最后确保上下游数据一致。业务场景模拟系统由Business提供,在这个过程中,通过OrdeSaver和MessageSender进行具体的数据处理功能。业务是业务的入口,所有的业务逻辑都从这里开始。OrderSaver执行的业务是保存订单和缓存订单到Redis。MessageSender执行的业务是将发送的数据保存到下表中,并将数据发送到WindQ消息队列中。业务/***完整业务,有多个数据库操作,需要延迟的数据库以外的业务逻辑*/publicclassBusiness{publicvoidsaveInfo(Mapmap){System.out.println("业务开始事务开启保存数据操作start");newOrderSaver().saveOrderInfo(map);newMessageSender().saveMessageForSend(map);System.out.println("业务结束事务提交保存数??据操作end");}}OrderSaver/***保存服务订单的业务逻辑*/publicclassOrderSaver{publicvoidsaveOrderInfo(Mapmap,ExecutorHandlerexecutorHandler){System.out.println("Saveorderinfotodatebase");System.out.println("Cacheorderinfointoredis");}}MessageSender/***发送数据的业务逻辑*/publicclassMessageSender{publicvoidsaveMessageForSend(Mapmap,ExecutorHandlerexecutorHandler){//保存数据到数据库System.out.println("Savecreatemessagetodatebase.");System.out.println("Sendmessagetowindq");}}业务接口调用publicclassSample{//这里模拟调用业务接口的情况publicstaticvoidmain(String[]args){Businessbusiness=newBusiness();Mapparam=newHashMap<>();business.saveInfo(param);}}输出结果业务开始事务开启保存数据操作开始保存订单信息到数据库缓存订单信息到redisSave创建消息到datebase.发送消息到windq业务结束事务提交保存数??据操作End图3:常规输出上面的场景模拟了我们正常情况下的业务处理流程,此时会对应的出现我们上面描述的异常,我们期望的处理结果应该是:业务启动,交易开启,并且数据已保存操作开始将订单信息保存到数据库将创建消息保存到数据库。生意结束。事务提交保存数??据操作结束。缓存订单信息到redis发送消息到windq图4:预期结果2.解决方案如何处理这个问题,这时候我们应该缓存到Redis和发出WindQ的操作被延迟到事务提交。这样在事务提交之前,Resdis中就没有数据了,WindQ也不会发送数据。如果交易失败,也可以根据异常进行后续处理。即使事务成功缓存Redis或向WindQ发送错误,也可以根据数据库中存储的数据进行后续的补偿处理。2.1方案一:代码迁移我们首先想到的是通过代码迁移的方式,将逻辑代码从事务中移出。但是问题来了……2.1.1问题一:逻辑分离为了描述业务模型,我们尽可能简化真实场景。从模型的角度来说,我们可以把这两个非事务性的方法移出事务操作。但是由于保存订单和缓存Redis是一组操作,所以使用的数据是一致的,保存下发消息和发送WindQ也是一对对应的操作。代码是一起写的,当有符合逻辑和业务需求的变化时,可以一起处理。如果拆开来,统一数据的处理就不会感觉是一次性的了。在某些业务中,不仅有一个或两个配对操作。强行拆解多个pairedoperation的transaction-non-transaction-related逻辑显得规模很大,这种方式变得不可取。另外,在原来的处理方案中,WindQ处理后的数据保存数据库、缓存、发送是一致的,但是如果拆开写入,数据会从前传到后。保证数据可以从内部传递到外部也会成为一个问题。2.2方案二:延迟执行模型为了解决***方案的问题,我们做了如下设计。当数据要存入数据库时??,我们可以先定义要处理的数据和要做的动作,放在一个容器中。事务提交后,我们会再次获取容器,将之前定义好的操作和数据统一取出来,按要求执行。怎么做?经过一番思考,我们建立了一个模型ExecutorHandler——ExecutorExecutor可执行对象,用来定义一个需要执行的逻辑。例如,通过WindQ发送数据,或将订单刷新到缓存中。ExecutorHandler容器类在内部存储了一个Executors列表。代码逻辑在业务代码中,我们将需要执行的业务操作封装到Executor中。定义好后,通过ExecutorHandler的add方法添加到容器中。在业务逻辑执行过程中,先进行数据库操作,非数据库操作只在相应位置定义。整个事务完成后,通过ExecutorHandler的handle方法遍历所有Executor对象,执行需要延迟的非事务操作。.图5:业务模型ExecutorpublicinterfaceExecutor{voidexecute();}ExecutorHandlerpublicclassExecutorHandler{//需要执行的业务处理对象列表privateListexecutors;publicvoidhandle(){if(!(null==executors||executors.isEmpty())){for(Executorexecutor:executors){executor.execute();}}}publicvoidadd(Executorexecutor){if(null==executors){executors=newArrayList<>();}executors.add(executor);}}业务接口调用publicclassSample{//这里模拟调用业务接口的情况publicstaticvoidmain(String[]args){Businessbusiness=newBusiness();ExecutorHandlerhandler=newExecutorHandler();Mapparam=newHashMap();//执行业务方法,启动事务,保存数据business.saveInfo(param,handler);//执行延迟执行方法handler.handle();}}输出结果业务开始事务开始保存数据操作开始保存订单信息到数据库保存创建消息到数据库。业务端事务提交保存数??据操作结束缓存订单信息到redis发送消息给windqBusiness/***完成业务,有多个数据库操作,数据库外的业务逻辑需要延迟*/publicclassBusiness{publicvoidsaveInfo(Mapmap,ExecutorHandlerexecutorHandler){System.out.println("业务开始事务开始保存数据操作开始");newOrderSaver().saveOrderInfo(map,executorHandler);newMessageSender().saveMessageForSend(map,executorHandler);System.out.println("业务端事务提交保存数??据操作结束");}}MessageSender/***发送数据的业务逻辑*/publicclassMessageSender{publicvoidsaveMessageForSend(Mapmap,ExecutorHandlerexecutorHandler){//保存数据到数据库System.out.println("Savecreatemessagetodatebase.");//定义要延迟的业务逻辑,在容器中注册到executorHandler.add(newExecutor(){@Overridepublicvoidexecute(){System.out.println("Sendmessagetowindq");}});}}OrderSaver/***保存服务订单的业务逻辑*/publicclassOrderSaver{publicvoidsaveOrderInfo(Mapmap,ExecutorHandlerexecutorHandler){System.out.println("Saveorderinfotodatebase");//这个就是所谓的回调函数executorHandler.add(newExecutor(){@Overridepublicvoidexecute(){System.out.println("Cacheorderinfointoredis");}});}}2.2.1问题2:上述传参方案解决了延迟执行的问题,但是此时我们发现由于使用ExecutorHandler,这时候就需要随时随地传递对象,考虑如何降低对象的攻击性。静态变量:使用时需要考虑同步和清理问题,在多线程的情况下容易造成逻辑混乱。使用。成员变量:还有数据清洗的问题,不推荐也不使用。2.2.2问题2的解决方法:使用ThreadLocal传参是否存在整个线程内的生命周期?这时候,我们就需要用到ThreadLocal。通过ThreadLocal获取ExecutorHandler不失为一个有效的解决方案。由于ThreadLocal对象用完需要移除,所以需要集中调用这个方法。在实现的时候,我们定义了HandlerThreadLocal类。HandlerThreadLocal对象负责通过ThreadLocal的get方法获取线程局部的ExecutorHandler对象,并执行其handle方法(具体实现请参考以下代码)。执行业务操作后,调用remove方法销毁。2.2.3异常捕获处理DelayedCallHandler由于必须执行ThreadLocal的remove方法,所以该方法应该放在try-catch-finally块的finally段中,以保证不会遗漏。DelayedCallHandler通过handle()方法调用业务逻辑。调用完业务逻辑后,调用ExecutorHandler的handle()方法执行已经注册到延迟调用容器中的业务方法。***最后删除ThreadLocal对象。整个DelayedCallHandler的handle方法就是一个完整的try-catch-finally块。2.2.4标准化定义:DelayableService需要延迟业务类的调用。由于DelayedCallHandler已经模块化,业务方法***也定义为具体的方法名(doBusiness)。所有的业务处理类都实现了DelayedCallHandler接口。在doBusiness方法中使用事务调用业务逻辑。3、最终实现方案根据方案二的分析,***我们使用ThreadLocal来传递业务数据。我们通过ThreadLocalexecutorHandler传递数据。在业务逻辑MessageSender和OrderSaver中,需要延时的业务是通过executorHandler来定义的。在HandlerThreadLocal中,使用executorHandler来处理之前定义的逻辑。这样就实现了事务和非事务的分离,数据不再以方法参数的形式向下游传递,从而可以更优雅的结构化和处理数据传输。示例代码如下。业务接口调用publicstaticvoidmain(String[]args){Businessb=newBusiness();Mapmap=newHashMap();DelayedCallHandler>bu=newDelayedCallHandler>();bu.handle(b,map);}输出结果:业务开始,交易开启,保存数据操作开始,保存订单信息到数据库保存创建消息到数据库;业务结束,事务提交,保存数据操作结束,Cacheorderinfo到redis中发送消息给windqHandlerThreadLocalpublicstaticfinalThreadLocalexecutorHandler=newThreadLocal(){protectedExecutorHandlerinitialValue(){returnnewExecutorHandler();}};publicstaticfinalvoidhandler().executorHandler()get().handle();}publicstaticfinalvoidremove(){executorHandler。remove();}}DelayedCallHandlerpublicclassDelayedCallHandler{publicvoidhandle(DelayableServicebuisnes,Tparam){try{//先执行业务操作buisnes.doBusiness(param);//执行延迟执行业务HandlerThreadLocal.handle();}赶上(异常){//异常处理}finally{HandlerThreadLocal.remove();}}}DelayablelService1.publicinterfaceDelayablelService{publicvoiddoBusiness(Tparam);}业务/***完成业务,有多个数据库操作,业务逻辑在数据库外需要延迟*/publicclassBusinessimplementsDelayablelService>{@OverridepublicvoiddoBusiness(Mapmap){saveInfo(map);}publicvoidsaveInfo(Mapmap){System.out.println("业务开始交易打开保存数据操作start");newOrderSaver().saveOrderInfo();newMessageSender().saveMessageForSend();System.out.println("业务结束交易提交保存数??据操作结束");}}MessageSender/***业务发送数据的逻辑*/publicclassMessageSender{publicvoidsaveMessageForSend(){ExecutorHandlerexecutorHandler=HandlerThreadLocal.executorHandler.get();System.out.println("Savecreatemessagetodatebase.");executorHandler.add(newExecutor(){@Overridepublicvoidexecute(){System.out.println("硒ndmessagetowindq");}});}}OrderSaver/***保存服务订单的业务逻辑*/publicclassOrderSaver{publicvoidsaveOrderInfo(){System.out.println("Saveorderinfotodatebase");ExecutorHandlerexecutorHandler=HandlerThreadLocal.executorHandler.get();//这就是所谓的回调函数executorHandler.add(newExecutor(){@Overridepublicvoidexecute(){System.out.println("Cacheorderinfointoredis");}});}}4.总结使用延迟执行模型解决当一个业务逻辑中同时存在数据库事务操作和相关非事务操作时,事务失败或不存在的问题committed而不是事务操作success导致的数据不一致问题文中提到的逻辑拆分和传参问题,只有在比较复杂的场景下才会出现。这种逻辑在苏宁的售后订单业务中经常出现,所以我们对这些问题进行了分析和讨论,得出了这样的解决方案。并非所有系统和企业都需要这样做。任何解决方案都应该根据具体情况而定,避免多余。使用该模型时,使用匿名内部类和线程局部变量(ThreadLocal)。使用时,有一定的注意事项。ThreadLocal在使用后要通过其remove()方法移除,所以使用时需要注意。作者:王海勇,苏宁科技集团苏宁云软件公司售后研发中心技术经理。从事Java开发多年,擅长业务抽象和业务架构设计。2016年9月加入苏宁,参与了售后服务域订单平台、时效平台等系统平台的研发。在苏宁庞大的业务量场景下,保障系统稳定、安全、高效地提供服务。【原创稿件,合作网站转载请注明原作者和出处为.com】