业务背景跑一个batch,通常是指我们的应用程序对某一批数据进行的具体处理。在金融业务中,一般跑批的场景有账日结算、账款拨备、欠款抵扣、不良资产处理等,我举个具体的例子。客户向我司借款,并约定每月10号还款。客户自主授权银行卡签合同号后(一般是凌晨)我们会从客户签的银行卡上扣钱,然后可能有1个客户,2个客户,3个客户,4个客户,或者很多需要扣钱的客户,所以这个“批”数据,我们要进行统一的扣款处理,也就是说我们“跑批”就是跑批任务就是每隔一定时间去处理这些数据,并且不能因为其中一条数据出现异常而导致整批数据丢失,继续运行,所以一定是健壮的;而且对于异常数据,我们可以进行后续的补偿处理,所以一定是可靠的;而且通常运行批任务需要处理的数据量很大,我们不能让它处理太长,所以我们必须考虑它的性能处理;综上所述,运行批应用需要满足的需求如下鲁棒性针对的是异常数据,不会导致程序崩溃数据量针对的是数据量大,可以在规定的时间内处理。在性能方面,必须在规定时间内处理,以免干扰其他任何应用程序的正常运行。跑批的风险一些没有接触过跑批业务的同学,可能会犯一些错误。查询运行批量数据,无需分片处理。具体情况有两种。一是同学们不自觉的进行分片处理,直接根据查询条件查出全量数据;第二种情况嘛,不仅跑批处理的时候可能会出现,在平时的业务开发过程中也可能会发现查询条件没有判断为空。例如,selectidfromt_user_accountweheraccount_id="12";但是在业务处理的过程中,account_id为空,直接查询。一旦数据量上来,很容易导致OOM悲剧。不批量处理数据的情况对于学生来说也是一样的。一个容易犯的错误,通常我们跑batch的时候可能会涉及到数据准备的过程,那么有些同学会直接stud,一边循环跑batch数据一边寻找需要的数据,一方面进行嵌套循环处理,时间复杂度通常随着for的数量而增加。项目中,一位同学在保费扣缴的批量任务中进行了5次for循环。时间复杂度为O(n^5)。而如果你的方法不进行事务管理,那么释放数据库连接也是一件很耗资源的事情。一段伪代码可能更好理解//要调用数据库查询,需要运行批处理数据ListbizApplyDoList=this.listGetBizApply(businessDate);//for循环处理数据for(BizApplyDoba:bizApplyDoList){//业务处理逻辑..省略//查询账户数据ListbizAccountDoList=this.listGetBizAccount(ba.getbizApplyId());for(BizAccountDobic:bizAccountDoList){//账户处理逻辑..省略}...//for循环后面会嵌套}事务使用的粒度不合适。我们知道Spring中的事务可以分为程序式事务和声明式事务。两者的具体区别我们就不展开了。开发过程中,有可能同学们根本不关心是怎么回事,自己觉得不错,直接用@Transcational覆盖了我们整个方法。一旦方法处理时间过长,这个大事件就会给我们的代码埋下了雷区,没有考虑下游接口的能力。除了我们自己系统中的处理,我们在运行批处理任务的时候可能还需要调用外部接口。比如预扣的时候,我们需要调用支付公司这边的接口,那么有没有考虑到下游接口的容量和响应时间(这里有坑,下篇再说)和不同的跑分批处理任务时间设置不合理。在我们的项目中,有一种业务方式,我们只能在扣除保费后扣留本金和利息。小张理所当然地认为,我的预扣保费预定任务是从凌晨12点开始的。每小时定时任务应该结束了,所以我的代扣本息的定时任务会在凌晨1点开始,但是这样设置真的合适吗?优化思路时序框架常用选择Spring时序框架、Quartz、elastic-job、xxl-job等,框架没有好坏之分,适合自己业务的才是最好的。你可以为你的业务选择技术,我们经常使用的是xxl-job。针对我们上面提到的不同批量运行任务的时间设置不合理,我们可以利用xxljob的子任务特性进行嵌套任务处理。保费代扣任务完成后,再进行代扣本息任务,防止OOM。请记住,实际上与分片处理无关。在开发批量运行任务时,一定要记住,分片处理是一次性将所有数据加载到内存中。那么,如何优雅地分片呢?这时,小张同学举起了手:我来做sharding。比如这种推导是基于时间维度的。我直接select*fromt_repay_planwhererepay_time<="2022-04-10"limit0,1000好了,现在我们来创建一个数据,看看这个deepsharding的性能如何。我已经向数据库中插入了大约两百万条数据,我将创建数据的过程与您分享。//1.创建表CREATETABLE`t_repay_plan`(`id`int(11)NOTNULLAUTO_INCREMENT,`repay_time`datetimeDEFAULTNULLCOMMENT'还款时间',`str1`int(11)DEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=3099998DEFAULTCHARSET=utf8mb4//2.创建存储过程分隔符$$createprocedureinsert_repayPlan()begindeclarenintdefault1;whilen<3000000doinsertintot_repay_plan(repay_time,str1)values(concat(CONCAT(FLOOR(2015+(RAND()*1)),'-',LPAD(FLOOR(10+(RAND()*2)),2,0),'-',LPAD(FLOOR(1+(RAND()*25)),2,0))),n);setn=n+1;endwhile;end//3.执行存储过程调用insert_repayPlan();shift,数据耗时逐渐增加因为这种深度分页是查询所有的数据然后丢弃,效果自然不是那么理想。其实我们还有一种分片方式,就是利用我们的id进行分页处理(当然你的id需要保证自增长,并结合具体的业务场景进行分析)我们也试试耗时的情况使用id进行碎片化。我们可以看到效果很明显。用id分片比我们的好。还款时间字段当然是在批量运行过程中使用覆盖索引,尽量不要选择*等,批量插入。sql的常用点就不一一讲解了,针对需要的数据说明map的结构。让我们写一个简单的。反例//调用数据库查询,需要运行批量数据.getBizAccount(ba.getbizApplyId());//账户处理逻辑..省略//查询扣款数据CustDocustDo=this.getCust(ba.getUserId);//推导处理逻辑..省略}我们可以这样改造(伪代码,忽略空判断处理)//调用数据库查询需要运行批量数据ListbizApplyDoList=this.listGetBizApply(businessDate);//构建业务申请号集合ListbizApplyIdList=bizApplyDoList.parallelStream()。map(BizApplyDo::getbizApplyId()).collect(Collectors.toList());//批量查询账号ListbizAccountDoList=this.listGetBizAccount(bizApplyIdList);//建账号MapMap;accountMap=bizAccountDoList.parallelStream().collect(Collectors.toMap(BizAccountDo::getBizApplyId(),Function.identity()))//扣款数据也被处理for(BizApplyDoba:bizApplyDoList){account=accountMap.get(ba.getbizApplyId())//账户处理逻辑..省略}尽量减少for循环的嵌套,减少频繁的数据库连接,破坏事务控制。一旦我们使用@Transcation来管理事务,我们需要开发人员在开发过程中需要擦亮眼睛注意事务的控制范围,因为@Trancation在第一个sql方法执行时就启动了事务,并且会在方法完成之前不要提交它。有的同学接手改造了这个Method,没注意到这个方法被@Trancation覆盖了,然后在这个方法上添加一些RPC远程调用,消息发送,文件写入,缓存更新等操作1.这些操作本身是不能roll的back,所以会导致数据不一致。可能RPC调用成功,但是本地事务回滚了,但是PRC调用无法回滚。2、交易中如果有远程调用,会拉长整个交易。这么长的时间会导致这个事务的数据库连接一直被占用。如果类似操作过多,会导致数据库连接池耗尽或单个连接超时。以前看过一个方法。事务导致数据库连接被强行破坏的悲剧,所以我们可以有选择地使用程序化事务来处理我们的业务逻辑,这样接手的同学就可以清楚的看到事务是什么时候打开的,什么时候提交的,并尝试我们最好能把我们的事务粒度范围缩小到下游接口hold。在分享这次优化之前,先简单介绍一下我们的业务背景。在保费扣缴运行的批量任务中,我们会使用进程编排的框架来发起我们的异步扣缴,可以理解为一个扣缴申请是一个异步线程,扣缴的数据全部在进程编排中传递。当我们做完优化,准备在UAT环境下优化测试的时候,发现只有20wpremiumdata的处理时间,非常不理想。监控系统环境,发现系统频繁进行GC。我的第一反应是没有内存泄漏。在准备dump文件的时候无意中发现,大部分的应用都卡在了扣费的节点上。观察日志发现下游接口给的响应时间过长,甚至有的已经超时了。那么这个GC是合理的,因为我们的预扣申请产生的速度非常快,而且是异步线程调度。线程还没死,一直在尝试向外界扣费,导致所有的数据都堆在内存中,导致跟下游接口查看后频繁GC。优化界面没有限流处理(太可惜了)的思路也很简单。在业务可接受的情况下,我们采取发送mq请求后暂停进程安排的方式(线程会死掉),然后让消费者处理调用成功再唤醒进程进行后续处理。当然也可以使用固定的线程池直接对外调用。目的是防止过多的线程处于RUNNING,导致内存堆积。处理外部调用的一种方法是在业务可以接受的情况下采用快速成功的方法。比如扣保费的时候,我们直接扣掉我们的扣费,然后再调用支付公司的接口。状态变成正在扣款中,然后我们的业务就直接挂掉了,然后我们使用定时任务来验证我们的扣款结果。收到扣款结果后,全力支持我扣款后继续使用作业机。对于生产机器,我们通常不会部署在单机上,那么如何才能尽可能的压榨我们服务器的资源呢?即利用xxl-job的分片广播和动态分片功能。部署执行器集群时,任务路由策略选择在“分片广播”的情况下,一次任务调度会广播触发对应集群中的所有executor执行一个任务,系统会自动传递分片参数;可根据分片参数制定分片任务;“shardbroadcasting”在executor维度上是分片的,支持动态扩展executorcluster,动态增加分片数量,协调业务处理;在执行大规模业务操作时,可以显着提高任务处理能力和速度。“Shardbroadcast”与普通任务的开发流程一致,除了Fragmentation参数,获取分片参数进行分片业务处理。//可以参考Sample示例执行器中的示例任务“ShardingJobHandler”了解试用。intshardIndex=XxlJobHelper.getShardIndex();intshardTotal=XxlJobHelper.getShardTotal();机器可以根据商户的数量进行建模,然后每台机器只执行一些特定商户的数据这里,对xxl-job不熟悉的同学可以去官网逛一逛:https://www.xuxueli.com/xxl-job/那么这里给大家提个问题:如果出现数据倾斜,你会怎么处理,就是某个商户的数据量特别大,导致执行的任务这台机器很重。如果是你,你会怎么做?如何处理这种情况?总结一下今天大量数据的跑批,项目中的实践思考到此结束。文章介绍了我们常见的批量运行任务中可能出现的风险以及一些常见的优化思路分享给大家。我在文章中没有提到线程池和缓存的使用。这两点对于我们高效的批量运行也很重要。帮助很大,朋友可以用一下。当然,文章只是引起大家对批量运行任务的思考。更多的优化需要结合任务的具体情况和项目本身的环境进行处理。深圳,48小时核酸常规,在杭州,48小时核酸常规,但在深圳,核酸排队的队伍中,只能看到孜孜不倦的劳动者,而在杭州,也能看到恩爱夫妻。刚来杭州,想问问大家,是的,不是没有拥抱就不能做核酸