业务后台跑批通常指的是我们的应用对某一批数据的具体处理。在金融业务中,一般会有不同的场景跑分批账户日结算、账户拨备、欠款抵扣、不良资产处理等,我举个具体的例子。客户向我司借款,同意每月10号还款。客户自主授权银行卡签约后每月10号(一般是凌晨),我们会从客户签字的银行卡上扣款,然后可能会有一个客户,两个客户,三个客户,四个客户,很多客户需要扣钱,所以这个“batch”里的所有数据都要统一扣掉,也就是说我们“跑批”跑批任务来处理这些数据有规律,不能因为其中一条数据异常导致整批。数据不能继续操作,所以一定要健壮;并且后期我们可以对异常数据进行补偿处理,所以一定是可靠的;而且通常运行批任务需要处理的数据量很大,我们不能让它处理的时间太长,所以我们要考虑它的性能处理;综上所述,我们的批处理应用程序的要求如下鲁棒性针对异常数据,不可能导致程序崩溃可以跟踪大量数据。对于大数据量,可以在规定的时间内处理。在性能方面,必须在规定时间内处理,以免干扰其他任何应用程序的正常运行。运行批处理的风险。一些没有接触过批量跑业务的同学可能会犯一些错误。《不分片处理批量数据查询运行》这种情况有两种具体情况。一是同学们不自觉的进行分片处理,直接根据查询条件查出全量数据;第二种情况不仅是运行批处理时可能出现的情况,在平时的业务开发过程中也可能会发现,查询条件没有判断为空。例如,selectidfromt_user_accountweheraccount_id="12";但是在业务处理的过程中,account_id为空,直接查询。一旦数据量上来,很容易导致“数据没有被批量处理”的OOM悲剧。这也是学生容易犯的一个错误。通常我们跑batch的时候,可能会涉及到准备数据的过程,所以有的同学会直接stud,把batch数据循环跑一遍,找到需要的数据。一方面,对于嵌套循环的处理,时间复杂度通常会随着你的for次数的增加而增加。项目中,一位同学在保费扣缴的批量任务中进行了5次for循环。时间复杂度是O(n^5)的,而且如果你的方法不进行事务管理,数据库连接释放也是一件很耗资源的事情。一段伪代码可能更好理解//要调用数据库查询,需要运行批处理数据ListbizApplyDoList=this.listGetBizApply(业务日期);//for循环处理数据for(BizApplyDoba:bizApplyDoList){//业务处理逻辑..省略//查询账户数据ListbizAccountDoList=this.listGetBizAccount(ba.getbizApplyId());for(BizAccountDobic:bizAccountDoList){//账户处理逻辑..省略}...//for循环后面会嵌套}"交易的强度不合适。"我们知道Spring中的事务可以分为程序化事务和声明式事务,两者的具体区别我们就不多说了。在开发过程中,同学们有可能只是@Transcational就覆盖了我们整个方法。一旦方法的处理时间过长,这个大事务就给我们的代码埋下了伏笔。“没有考虑下游接口的容量。”除了自身系统中的处理外,我们在运行批处理任务时,可能还需要调用预扣等外部接口。我们需要调用payment公司端的接口,有没有考虑下游接口的承载能力和响应时间(这里有个坑,下篇再说)》不同的跑批任务时间设置不合理。”在我们的项目中,有一种业务方式,我们只能在扣除保费后扣留本金和利息。小张理所当然的认为我的预扣保费定时任务是从Let'sstart开始的,一小时的定时任务应该结束了,所以我的预扣本息定时任务会在凌晨1:00开始,但是这是不是设定真的合适吗?优化思路时序框架常用选择Spring时序框架、Quartz、elastic-job、xxl-job等,框架没有好坏之分,适合自己业务的才是最好的。你可以为你的业务选择技术,我们经常使用的xxl-job的技术就是xxl-job。针对我们上面提到的不同批量运行任务的时间设置不合理,我们可以利用xxljob的子任务特性进行嵌套任务处理。保费扣缴任务完成后,再进行保费扣缴任务,防止OOM。请记住,在分片过程中没有什么可扩展的。在开发批量运行任务时,一定要记住分片过程是一次性将所有数据加载到内存中。无疑是自掘坟墓那么,如何优雅地碎片化呢?这时,小张同学举起了手:我来做sharding。比如这种推导是基于时间维度的。我直接select*fromt_repay_planwhererepay_time<="2022-04-10"limit0,1000好了,现在我们找一些数据看看这个deepsharding的性能如何。我已经向数据库中插入了大约两百万条数据,我将制作数据的过程与您分享。//1.创建表CREATETABLE`t_repay_plan`(`id`int(11)NOTNULLAUTO_INCREMENT,`repay_time`datetimeDEFAULTNULLCOMMENT'还款时间',`str1`int(11)DEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=3099998DEFAULTCHARSET=utf8mb4//2.创建存储过程分隔符$$createprocedureinsert_repayPlan()begindeclarenintdefault1;whilen<3000000doinsertintot_repay_plan(repay_time,str1)valuees(concat(CONCAT(FLOOR(2015+(RAND()*1)),'-',LPAD(FLOOR(10+(RAND()*2)),2,0),'-',LPAD(FLOOR(1+(RAND()*25)),2,0))),n);setn=n+1;endwhile;end//3.执行存储过程调用insert_repayPlan();随着数据Offset的逐渐增加,数据耗时逐渐增加因为这种深度分页是查询所有的数据然后丢弃,效果自然不是那么理想。其实我们分片还有另外一种方式,就是利用我们的id进行分页处理(当然你的id是需要保证业务增长的,结合具体的业务场景来分析)我们也来试试如何使用id,分片的耗时情况,我们可以看到效果很明显,使用id分片的效果比我们的要好当然,还款时间字段是关于“使用覆盖索引,批量运行过程中尽量不要选择*等,批量插入”。我会针对需要的数据一一说明map的结构。写一个简单的反例//调用数据库查询,需要运行批处理数据查询账户数据BizAccountDobizAccountDo=this.getBizAccount(ba.getbizApplyId());//账户处理逻辑..省略//查询借方数据CustDocustDo=this.getCust(ba.getUserId);//debiter处理逻辑..省略}我们可以这样改造(伪代码,忽略空判断处理)//调用数据库查询需要运行批量数据ListbizApplyDoList=this.listGetBizApply(businessDate);//构建业务申请号集合ListbizApplyIdList=bizApplyDoList.parallelStream().map(BizApplyDo::getbizApplyId()).collect(Collectors.toList());//批量账户查询ListbizAccountDoList=this.listGetBizAccount(bizApplyIdList);//建立账户MapMapaccountMap=bizAccountDoList.parallelStream().collect(Collectors.toMap(BizAccountDo::getBizApplyId(),Function.identity()))//扣除数据也被处理for(BizApplyDoba:bizApplyDoList){account=accountMap.get(ba.getbizApplyId())//账户处理逻辑..省略}尽量减少for循环的嵌套,减少频繁的数据库连接,破坏长期的事务控制点心。一旦我们使用@Transcation来管理事务,那么就需要组内开发人员在开发过程中需要擦亮眼睛注意事务控制的范围,因为@Trancation在第一个sql方法执行时就启动了事务,并且不会在方法结束之前提交它。有的同学在接管改造的时候使用这个方法的时候,没有注意到这个方法被@Trancation覆盖了,于是在这个方法中添加了一些RPC远程调用、消息发送、文件写入、缓存更新等操作。1、这些操作本身是不能回滚的。这样会导致数据不一致。可能RPC调用成功,但是本地事务回滚了,但是PRC调用无法回滚。2、交易中如果有远程调用,会拉长整个交易。这么长的时间会导致这个事务的数据库连接一直被占用。如果类似操作过多,会导致数据库连接池耗尽或单个连接超时。以前看过一个方法。事务导致数据库连接被强行破坏的悲剧,所以“我们可以有选择地使用程序化事务来处理”我们的业务逻辑,让接手的同学可以清楚的看到事务是什么时候开启的,什么时候提交的。也尽量把我们的事务粒度范围缩小到下游接口hold。在分享这次优化之前,先简单介绍一下我们的业务背景。在保费扣缴的运行批处理任务中,我们会使用流程编排的框架来异步发起我们的扣缴,可以理解为一个扣缴申请是一个异步线程,所有的扣缴数据都是在流程编排中传递的。当我们完成优化,准备在UAT环境下进行优化测试时,发现只有20万条优质数据,处理时间非常不理想。监控系统环境,发现系统频繁进行GC。我的第一反应是没有内存泄漏。准备转储文件时,我感到很惊讶。发现大部分应用都卡在了扣费的节点。观察日志发现下游接口给的响应时间过长,甚至出现一些超时,所以这次GC是合理的。由于我们的预扣申请生成速度很快,而且是异步线程调度。线程还没死,一直在尝试向外界扣费,导致所有数据都在内存中堆积,导致与下游接口验证后频繁GC,确实,优化界面没有限流处理(太糟糕了)的想法也很简单。在业务可接受的情况下,我们采用的是发送mq请求后暂停进程安排(线程会Death),然后让消费者处理调用,调用成功后唤醒进程进行后续处理。当然也可以使用固定的线程池直接对外调用。目的是为了防止过多的线程处于RUNNING,导致内存堆积。另外还有一个处理外部调用的方法,就是在业务可以接受的情况下,采用快速成功的方式。比如扣保费的时候,我们在调用支付公司的接口之前,直接通过我们把扣款状态改成扣款,然后直接暂停我们的业务,然后用定时任务来验证我们的扣款结果。收到扣款结果后,在我们扣款后继续使用操作机方面给我打电话。对于生产机器,我们通常不会部署在单机上,那么如何才能尽可能的压榨我们服务器的资源呢?即使用xxl-job的“分片广播”和“动态分片”功能【外链图片转账失败,源站可能有防盗链机制,推荐保存图片直接上传(img-rhWVcYx9-1652096017012)(https://upload-images.jianshu...)]执行器集群部署时,如果任务路由策略选择“分片广播”,任务调度will会广播触发对应集群中的所有executor执行一个task,同时系统会自动传递sharding参数;可根据分片参数制定分片任务;“shardingbroadcast”以executor为维度进行分片,支持动态扩展executor集群,动态增加分片数量,协调业务处理;在执行大规模业务操作时,可以显着提高任务处理能力和速度。“分片广播”与普通任务的开发流程一致,不同的是可以获取分片参数,获取分片参数进行分片业务处理。//可以参考Sample示例执行器中的示例任务“ShardingJobHandler”了解试用。intshardIndex=XxlJobHelper.getShardIndex();intshardTotal=XxlJobHelper.getShardTotal();机器可以根据商户的数量进行建模,然后每台机器只执行一些特定商户的数据。那么这里就问大家一个问题:如果出现数据倾斜,你会怎么处理,就是某个商户的数据量非常大,这台机器执行的任务非常繁重。如果你是你,你会如何处理这种情况?总结一下今天大量数据的跑批,项目中的实践思考到此结束。文章介绍了我们常见的批量运行任务中可能出现的风险以及一些常见的优化思路分享给大家。我在文章中没有提到线程池和缓存的使用。这两点对于我们高效的批量运行也很重要。帮助很大,朋友可以用一下。当然,文章只是引起大家对运行批任务的思考。更多的优化需要结合任务的具体情况和项目本身的环境进行处理。