SpringBatch是一个轻量级的完备的批处理框架。作为Spring体系的一员,具有灵活、方便、生产可用等特点。非常方便应对高效处理大量信息、定时处理大量数据等场景。结合调度框架可以更好的发挥SpringBatch的作用1.SpringBatch的概念知识1.1、分层架构SpringBatch的分层架构图如下:可以看到分为三层,分别是:应用层:包含所有任务批处理作业和开发者自定义代码,主要是根据项目需要开发的业务流程。BatchCore核心层:包含用于启动和管理任务的运行时环境类,如JobLauncher等BatchInfrastructure基础层:上面两层建立在基础层之上,包括基本的读入读取器和写出写入器,重试框架等1.2。关键概念理解下图涉及的概念非常重要,否则后续的开发和问题分析会很困难。1.2.1.JobRepository专门负责和数据库打交道,记录了整个批处理过程的添加、更新和执行。所以SpringBatch需要依赖数据库来管理。1.2.2.任务启动器JobLauncher负责启动任务Job。1.2.3.TaskJobJob是封装整个批处理过程的单元。运行一个批处理任务就是运行一个Job定义的内容。11上图介绍了Job的一些相关概念:Job:封装处理实体,定义流程逻辑。JobInstance:Job的运行实例,不同的实例有不同的参数,所以定义一个Job后,可以通过不同的参数运行多次。JobParameters:与JobInstance关联的参数。JobExecution:表示Job的一次实际执行,可能成功也可能失败。因此,开发者要做的就是定义Job。1.2.4.StepStepStep是对Job的某个进程的封装。一个作业可以包含一个或多个步骤。stepbystep按照特定的逻辑执行,代表Job执行完成。通过定义Step来组装Job,可以更灵活的实现复杂的业务逻辑。1.2.5.Input-Processing-Output所以,定义一个Job的关键是定义一个或多个Step,然后将它们组装起来。Step的定义有多种方式,但常用的一种模型是输入-处理-输出,即ItemReader、ItemProcessor和ItemWriter。例如,通过ItemReader从文件中输入数据,然后通过ItemProcessor进行业务处理和数据转换,最后通过ItemWriter写入数据库。SpringBatch为我们提供了许多开箱即用的Reader和Writers,非常方便。2.代码示例了解了基本概念后,我们直接通过代码来感受一下。整个项目的功能是从多个csv文件中读取数据,处理后输出到一个csv文件中。2.1.为基础框架添加依赖:/groupId>h2runtime需要添加SpringBatch依赖,使用H2作为内存数据库更方便。实际生产中必须使用外部数据库,如Oracle、PostgreSQL。主入口类:@SpringBootApplication@EnableBatchProcessingpublicclassPkslowBatchJobMain{publicstaticvoidmain(String[]args){SpringApplication.run(PkslowBatchJobMain.class,args);}}也很简单,在Springboot的基础上加上注解@EnableBatchProcessing即可。领域实体类Employee:包com.pkslow.batch.entity;公共类员工{字符串id;字符串名字;StringlastName;}对应的csv文件内容如下:id,firstName,lastName1,Lokesh,Gupta2,Amit,Mishra3,Pankaj,Kumar4,David,Miller2.2,输入-处理-输出2.2.1,读取ItemReader因为有是多个输入文件,所以定义如下:@Value("input/inputData*.csv")privateResource[]inputResources;@BeanpublicMultiResourceItemReadermultiResourceItemReader(){MultiResourceItemReaderresourceItemReader=newMultiResourceItemReader();resourceItemReader.setResources(inputResources);resourceItemReader.setDelegate(reader());returnresourceItemReader;}@BeanpublicFlatFileItemReaderreader(){FlatFileItemReaderreader=newFlatFileItemReader();//跳过csv文件的第一行,就是headerreader.setLinesToSkip(1);reader.setLineMapper(newDefaultLineMapper(){{setLineTokenizer(newDelimitedLineTokenizer(){{//字段名称setNames(newString[]{"id","firstName","lastName"});}});setFieldSetMapper(newBeanWrapperFieldSetMapper(){{//转换后的目标类setTargetType(Employee.class);}});}});returnreader;}这里使用了FlatFileItemReader,方便我们从文件中读取数据2.2.2.ProcessingItemProcessor为了简单演示,处理很简单,就是将最后一列转为大写:));返回员工;};}2.2.3、outputItremWriter比较简单,代码和注释如下:>writer=newFlatFileItemWriter<>();writer.setResource(outputResource);//是否为append模式writer.setAppendAllowed(true);writer.setLineAggregator(newDelimitedLineAggregator(){{//设置分隔符setDelimiter(",");setFieldExtractor(newBeanWrapperFieldExtractor(){{//设置字段setNames(newString[]{"id","名字","姓氏"});}});}});returnwriter;}2.3、Step有Reader-Processor-Writer,你可以定义步骤:@BeanpublicStepcsvStep(){returnstepBuilderFactory.get("csvStep").chunk(5).reader(multiResourceItemReader()).processor(itemProcessor())。writer(writer()).build();}这里有个chunk的设置,值为5,意思是5条记录后才提交输出,可以根据自己的需要定义2.4。Job已经完成Step的编码,定义Job很简单:@BeanpublicJobpkslowCsvJob(){returnjobBuilderFactory.get("pkslowCsvJob").incrementer(newRunIdIncrementer()).start(csvStep()).build();}2.5。上述代码完成后,执行程序,结果如下:成功读取数据,并将最后一个字段转换为大写,并输出到outputData.csv文件中。3、监听Listener可以通过Listener接口监听特定的事件,实现更多的业务功能。例如处理失败,会记录失败日志;如果处理完成,会通知下游获取数据等。我们分别监听Read、Process、Write事件,对应分别实现ItemReadListener接口、ItemProcessListener接口、ItemWriteListener接口。因为代码比较简单,只是打印日志,这里只贴出ItemWriteListener的实现代码:@OverridepublicvoidbeforeWrite(Listlist){logger.info("beforeWrite:"+list);}@OverridepublicvoidafterWrite(Listlist){logger.info("afterWrite:"+list);}@OverridepublicvoidonWriteError(Exceptione,Listlist){logger.info("onWriteError:"+list);}}将实现的listener监听器集成到Step中:@BeanpublicStepcsvStep(){returnstepBuilderFactory.get("csvStep").chunk(5).reader(multiResourceItemReader()).listener(newPkslowReadListener()).processor(itemProcessor()).listener(newPkslowProcessListener())。作家(作家()).听众(newPkslowWriteListener()).build();}执行后看日志:这里可以清楚的看到之前设置的chunk的效果。Writer每次处理5条记录。如果一次输出一个,就会对IO造成压力。