在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式复杂,并不是所有的操作都可以通过交互界面来处理。并且有一些操作需要周期性的读取大量的数据,然后进行一系列的后续处理。这样的过程就是“批处理”。批量应用通常具有以下特点:数据量大,从几万到几百万甚至上亿不等;整个过程完全自动化,并预留一定接口用于自定义配置;此类应用程序通常定期运行,例如每天、每周和每月运行一次;对数据处理的准确性要求高,需要容错机制、回滚机制、完善的日志监控。什么是Spring批处理?SpringBatch是一个轻量级的综合批处理框架,专为大型企业设计,帮助开发健壮的批处理应用程序。SpringBatch为处理大量数据提供了许多必要的可重用功能,例如日志跟踪、事务管理、作业执行统计、作业重启和资源管理。同时,它还为高性能批处理任务提供了优化和分片技术。其核心功能包括:事务管理Block-basedprocessing声明式输入/输出操作Start,terminate,restartTaskretry/skiptasks基于Web的管理员界面在日常工作中开发过一些批处理应用,对SpringBatch的使用有丰富的经验。最近,笔者特意总结了这些经验。使用SpringBatch3.0和SpringBoot时,建议使用SpringBatch时使用最新的SpringBatch3.0版本。与SpringBatch2.2相比,做了以下改进:支持JSR-352标准支持Spring4和Java8增强SpringBatchIntegration功能支持JobScope支持SQLite支持Spring4和Java8是一个重大改进。这样就可以使用Spring4引入的Springboot组件,在开发效率上有了质的飞跃。引入Spring-batch框架,只需要在build.gradle中加入一行代码:compile("org.springframework.boot:spring-boot-starter-batch")在增强了SpringBatchIntegration的功能之后,我们可以方便地与Spring家族的其他组件集成,也可以通过多种方式调用作业,还支持远程分区操作和远程块处理。支持JobScope后,我们可以随时将当前Job实例的上下文信息注入到对象中。只要我们指定Bean的作用域为job的作用域,jobParameters、jobExecutionContext等信息就可以随时使用。@Component@JobScopepublicclassCustomClass{@Value("#{jobParameters[jobDate]}")privateStringjobDate;@Value("#{jobExecutionContext['input.name']}.")privateStringfileName;}使用JavaConfig而不是xml配置方式,我们以前在配置job和step时都是使用xml配置方式,但是作为时间过去了,我们发现了很多问题。xml文件数量迅速膨胀,配置块长而复杂,可读性差;xml文件缺乏语法检查,一些低级错误只有在运行集成测试时才能发现;IDE对xml文件中代码跳转的支持不够;我们逐渐发现使用纯Java类的配置方式更灵活,类型安全,IDE支持更好。构建作业或步骤时使用的流式语法比XML更简洁易懂。@BeanpublicStepstep(){returnstepBuilders.get("step").chunk(1).reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10).skip(UnknownGenderException.class).listener(logSkipListener()).build();}在这个例子中可以清楚的看到step的配置,比如reader/processor/writer组件,以及配置了哪些监听器等。本地集成测试使用的内存数据库Springbatch在运行时需要数据库支持,因为需要在数据库中建立一套schema来存储job和step运行的统计信息.在本地集成测试中,我们可以使用Springbatch提供的内存Repository来存储Springbatch的任务执行信息,这样既避免了在本地配置数据库,又加快了job的执行速度。首先在Job配置类中添加一个扩展类:DefaultBatchConfigurer。publicclassCustomJobConfigurationextendsDefaultBatchConfigurer{...}我们在build.gradle中添加对hsqldb的依赖:runtime('org.hsqldb:hsqldb:2.3.2')然后在测试类中添加DataSource的配置。@EnableAutoConfiguration@EnableBatchProcessing@DataJpaTest@Import({DataSourceAutoConfiguration.class,BatchAutoConfiguration.class})publicclassTestConfiguration{}并在applicationton.properties配置中添加初始化Database的配置:spring.batch.initializer.enable=true合理使用Chunk机制的实现Springbatch在配置Step的时候使用了Chunk-based的机制。即每次读取一条数据,然后处理一条数据,积累到一定数量后,交给writer进行写操作。这样可以最大限度地提高写入效率,而且整个事务也是基于Chunk的。当我们需要对文件、数据库等进行数据写入操作时,可以适当设置Chunk的值,以最大限度地提高写入效率。但是,在某些场景下,我们的写操作实际上是调用一个web服务或者发送一个消息到一个消息队列中。在这些场景下,我们需要将Chunk的值设置为1,这样可以及时处理write,并且整个Chunk发生异常后,不会重复调用服务或者消息重试期间重复发送。使用Listener监听job的执行情况,并及时做相应的处理。Springbatch提供了大量的Listener来全面监控job的每一个执行环节。在job层面,Springbatch提供了JobExecutionListener接口,支持在Job开始或结束时进行一些额外的处理。在step层面,Springbatch提供了StepExecutionListener、ChunkListener、ItemReadListener、ItemProcessListener、ItemWriteListener、SkipListener等接口,也提供了RetryListener和SkipListener用于Retry和Skip操作。通常,我们会为每个作业实现一个JobExecutionListener。在afterJob操作中,我们输出了job的执行信息,包括执行时间、job参数、exitcode、执行的step,以及每个step的详细信息。这样无论是开发、测试还是运维人员,都对整个工作的执行情况了如指掌。如果某个步骤发生了跳过操作,我们也会为其实现一个SkipListener,将跳过的数据记录在其中,以供下一步处理。Listener的实现方式有两种,一种是继承相应的接口,比如继承JobExecutionListener接口,另一种是使用注解。经过实践,我们认为还是使用注解比较好,因为需要实现接口的所有方法才能使用接口,使用注解时只需要在相应的方法上加上注解即可。下面的类采用继承接口的方式。我们看到我们只用了第一种方法,没有用到第二种和第三种方法。但是我们必须提供一个空的实现。publicclassCustomSkipListenerimplementsSkipListener{@OverridepublicvoidonSkipInRead(Throwablet){//业务逻辑}@OverridepublicvoidonSkipInWrite(Stringitem,Throwablet){//不需要}@OverridepublicvoidonSkipInProcess(Stringitem,Throwablet))){//不需要}}使用annoation的方式可以简写为:publicclassCustomSkipListener{@OnSkipInReadpublicvoidonSkipInRead(Throwablet){//业务逻辑}}使用Retry和Skip增强批处理的健壮性在处理百万级数据的过程中难免会出现异常。如果出现异常,整个批处理作业终止,则无法处理后续数据。SpringBatch内置了Retry(重试)和Skip(跳过)机制,帮助我们轻松处理各种异常。我们需要将异常分为三类。第一类是需要重试的异常。它们的特点是异常可能会随着时间的推移而消失,比如数据库当前有锁不能写入,web服务当前不可用,web服务满载等。所以适合为他们配置Retry机制。第二种是需要Skip的异常,比如解析文件中某条数据异常,因为每次对这些异常执行Retry的结果都是一样的,但是又不想停止处理后续数据由于某个数据的错误。处理。第三种异常是要求整个Job立即失败的异常。比如发生OutOfMemory异常,需要立即终止整个Job。一般来说,需要Retry的异常也应该配置Skip选项,以保证后续的数据可以继续处理。我们还可以配置SkipLimit选项,保证当Skip数据项达到一定数量时,整个Job及时终止。有时候我们需要在每次Retry之间做一些操作,比如延长Retry时间,恢复操作现场等,SpringBatch提供了BackOffPolicy来达到目的。下面是配置Retry机制、Skip机制和BackOffPolicy的步骤示例。@BeanpublicStepstep(){returnstepBuilders.get("step").chunk(1).reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10).skip(UnknownGenderException.class).skip(ServiceUnavailableException.class).retryLimit(5).retry(ServiceUnavailableException.class).backOffPolicy(backoffPolicy).listener(logSkipListener()).build();}使用自定义决策程序来实现作业流程。作业执行不一定是顺序的。我们经常需要根据作业的输出数据或执行结果来确定下一步。以前我们会在下游的步骤中放置一些判断,这可能会导致一些步骤实际运行起来,但实际上并没有做任何事情。例如,在一个步骤执行过程中,失败的数据条目将记录在报告中,下一步将判断是否已生成报告。如果生成报告,报告将发送给指定的联系人。如果没有,什么也不会做。.在这种情况下,Job的执行过程可以通过Decider机制来实现。在Springbatch3.0中,Decider已经从Step中分离出来,与Step处于同一级别。公共类ReportDecider实现JobExecutionDecider{@OverridepublicFlowExecutionStatusdecide(JobExecutionjobExecution,StepExecutionstepExecution){if(report.isExist()){returnnewFlowExecutionStatus("SEND");}returnnewFlowExecutionStatus("SEND"}SKwhile)Decider可以在作业配置中以这种方式使用。这样整个Job的执行过程会更加清晰易懂。publicJobjob(){returnnewJobBuilder("petstore").start(orderProcess()).next(reportDecider).on("SEND").to(sendReportStep).on("SKIP").end().build().build();}多种机制被用来加速Jobs的执行。批处理作业处理的数据量很大,一般要求执行窗口比较小。因此,需要通过各种方式来加快Job的执行速度。一般我们有四种实现方式:任务单步多线程执行不同Step并行执行同一个Step并行执行远程执行Chunk任务单步多线程执行任务可以借助help来实现任务执行器。这种情况适用于读写器线程安全无状态的场景。我们还可以设置线程数。publicStepstep(){returnstepBuilders.get("step").tasklet(tasklet).throttleLimit(20).build();}上面例子中的tasklet需要实现TaskExecutor,SpringBatch提供了一个简单的multi-threadedTaskExecutor供我们使用:SimpleAsyncTaskExecutor。不同Step的并行执行在Springbatch中很容易实现。这是一个例子:publicJobjob(){returnstepBuilders.get("parallelSteps").start(step1).split(asyncTaskExecutor).add(flow1,flow2).next(step3).build();}在这个example我们先执行step1,然后并行执行flow1和flow2,最后执行step3。Springbatch提供了PartitionStep来实现多个进程中同一个step的并行处理。通过PartitonStep和PartitionHandler,可以将一个step扩展到多个Slave,实现并行运行。Chunk任务的远程执行是将一个Step的处理器操作分成多个进程,多个进程之间通过一些中间件进行通信(比如通过消息的方式)。这种方式适用于Processer是瓶颈,Reader和Writer不是瓶颈的场景。结论SpringBatch合理抽象了批处理场景,封装了大量实用功能。用它来开发批处理应用程序可以达到事半功倍的效果。在使用过程中,我们仍然需要坚持总结一些最佳实践,才能交付出高质量、可维护的批处理应用,满足企业级应用的严苛要求。
