当前位置: 首页 > 科技观察

多线程与Spring事务详解_0

时间:2023-03-17 15:52:20 科技观察

译者|徐蕾评论|梁策孙淑娟作为开发者,我们习惯于通过在公共方法上加上@Transactional注解来实现事务管理。大多数情况下,将事务的启动、提交或回滚交给Spring框架是非常方便的,但如果认为这都是事务管理,那就有失偏颇了。Spring确实可以负责所有事务管理的底层实现细节,而且无论你使用什么持久层框架,比如Hibernate、MyBatis,甚至JDBC都提供了统一的事务模型来保证数据访问方式的改变不会影响代码执行层面。良好的事务管理封装,一方面提高了开发效率,但同时也需要注意的是,它降低了开发者了解底层原理的积极性和意愿。问问自己,我们真的了解事务在多线程环境下运行的机制吗?比如一个事务中能否支持多线程同时写入数据?网上很多论坛都对这个问题给出了肯定的回答,但是也有很多Feedback@Transactionfailure的声音。其背后的根本原因是Spring实现了事务,并通过ThreadLocal将事务绑定到当前线程。ThreadLocal作为本地线程变量载体,保存当前线程的变量,保证所有变量都是线程安全的。这些封闭隔离的变量包括数据库连接、Session管理的对象,以及其他当前事务运行的必要信息,而这些变量和对象是新开线程无法获取的。不明白这个,事务中开启了多线程,受限于业务场景。大多数情况下不会有问题,但作为严谨的开发,其潜在的风险不容忽视。问题主要集中在两个方面:一方面导致事务失败,看似提高了处理效率,但是一旦出现异常相关数据就不会回滚,破坏了完整性这生意。另一方面会增加死锁的概率,计划外的并发处理,增加资源争用的概率。后果就是死锁,由此产生的异常会进一步损害业务的完整性,得不偿失。有没有办法提高事务处理的性能?不!虽然不能通过事务来发起多线程处理。我们可以在合理分区后开启多线程处理,通过类似分布式事务的方式达到同样的目的,达到同样的效果。假设我们要并行处理大量对象,然后将它们存储到数据库中。我们首先对这些对象进行分组,将每个块传递给不同的线程去调用带有事务的处理方法,最后在每个线程中收集汇总处理结果。这样,事务传播机制既保证了业务的完整性,又通过并行处理提高了处理效率。下面通过具体的例子来逐步演示如何实现。第一步:定义一个负责对象处理逻辑的服务接口。/***定义对象标识符处理契约的服务接口*/publicinterfaceProcessingService{/***处理由id标识的对象列表,并返回成功处理对象的标识符*列表**@paramobjectIds列表objectidentifiers**@returnidentifierslistofthesuccessfulprocessedobjects*/ListprocessObjects(ListobjectIds);}第二步:简单实现上述对象处理的接口。/***数据库相关id处理的服务实现*/@Service("ProcessingDBService")@Transactional@OverridepublicListprocessObjects(ListobjectIds){//处理并保存到数据库logger.info("Runninginthread"+Thread.currentThread().getName()+"withobjectids"+objectIds.toString());返回objectIds.stream().collect(Collectors.toList());}}第三步:也是最核心的一步,分块然后并行处理。当然,为了保持代码的整洁和隔离,我们在后续的具体实现中会采用Decorator修饰方式。/***并行块处理的服务实现*/@Service@Primary@ConditionalOnProperty(prefix="service",name="parallel",havingValue="true")publicclassProcessingServiceParallelRunDecoratorimplementsProcessingService{privateProcessingServicedelegate;publicProcessingServiceParallelRunDecorator(ProcessingServicedelegate){this.delegate=delegate;/***在真实场景中应该是外部配置*/privateintbatchSize=10;@OverridepublicListprocessObjects(ListobjectIds){List>chuncks=getBatches(objectIds,batchSize);List>processedObjectIds=chuncks.parallelStream().map(delegate::processObjects).collect(Collectors.toList());返回flatList(processedObjectIds);}privateList>getBatches(Listcollection,intbatchSize){returnIntStream.iterate(0,i->ii+batchSize).mapToObj(i->collection.subList(i,Math.min(i+batchSize,collection.size())).collect(Collectors.toList());}privateListflatList(List>listOfLists){returnlistOfLists.stream().collect(ArrayList::new,List::addAll,List::addAll);}最后我们通过简单的单元测试验证执行结果privateList>getBatches(Listcollection,intbatchSize){returnIntStream.iterate(0,i->ii+batchSize).mapToObj(i->collection.subList(i,Math.min(i+batchSize,collection.size())).collect(Collectors.toList());}privateListflatList(List>listOfLists){returnlistOfLists.stream().collect(ArrayList::new,List::addAll,List::addAll);}}通过输出日志,我们可以看到如下执行结果:10]ProcessingDBService:runninginthreadmainwithobjectids[11,12]执行结果也符合expe目标。List对象分组后,除了主线程外,通过ForkJoin启动另一个线程进行并行处理。ProcessingServiceParallelRunDecorator的parallelStream().map的并行处理提高了处理性能,而ProcessingDBService中公共方法processObjects上的@Transactional注解保证了业务的完整性,完美解决了问题。译者介绍徐磊,社区编辑,某知名电商技术副总监,专注于Java后端开发、技术管理、架构优化、分布式开发等领域。原标题:Multi-ThreadingandSpringTransactions,作者:DanielaKolarova