1前言SpringBatch是一个轻量级的完善的批处理框架。作为Spring体系的一员,具有灵活、方便、生产可用等特点。非常方便应对高效处理大量信息、定时处理大量数据等场景。结合调度框架可以更好的发挥SpringBatch的作用。2SpringBatch的概念知识2.1分层架构SpringBatch的分层架构图如下:通过实例讲解SpringBatch的介绍,可以看出这个优秀的批处理框架分为三层,分别是:Applicationlayer:包含所有Task批处理作业和开发者自定义代码,主要是根据项目需要开发的业务流程。BatchCore核心层:包含用于启动和管理任务的运行时环境类,如JobLauncher等BatchInfrastructure基础层:上面两层建立在基础层之上,包括基本的读入读取器和写出写入器,retryframework等2.2关键概念理解下图涉及的概念非常重要,否则后续开发和问题分析会很困难。通过实例讲解SpringBatch的介绍。优秀的批处理框架2.2.1JobRepository专门负责和数据库打交道,记录整个批处理的新增、更新、执行。所以SpringBatch需要依赖数据库来管理。2.2.2任务启动器JobLauncher负责启动任务Job。2.2.3TaskJobJob是封装整个批处理过程的单元。运行一个批处理任务就是运行一个Job定义的内容。通过实例讲解SpringBatch的介绍。上面这个优秀的批处理框架介绍了Job的一些相关概念:Job:封装处理实体,定义流程逻辑。JobInstance:Job的运行实例,不同的实例有不同的参数,所以定义一个Job后,可以通过不同的参数运行多次。JobParameters:与JobInstance关联的参数。JobExecution:表示Job的一次实际执行,可能成功也可能失败。因此,开发者要做的就是定义Job。2.2.4StepStepStep是对Job某个进程的封装。一个作业可以包含一个或多个步骤。stepbystep按照特定的逻辑执行,代表Job执行完成。通过实例讲解SpringBatch的介绍。一个优秀的批处理框架可以通过定义Steps组装Jobs来更灵活的实现复杂的业务逻辑。2.2.5输入-处理-输出因此,定义一个Job的关键是定义一个或多个Step,然后将它们组装起来。Step的定义有多种方式,但常用的一种模型是输入-处理-输出,即ItemReader、ItemProcessor和ItemWriter。例如,通过ItemReader从文件中输入数据,然后通过ItemProcessor进行业务处理和数据转换,最后通过ItemWriter写入数据库。SpringBatch为我们提供了许多开箱即用的Reader和Writers,非常方便。3代码示例了解了基本概念后,我们直接通过代码来体验一下。整个项目的功能是从多个csv文件中读取数据,处理后输出到一个csv文件中。3.1基础框架添加依赖:org.springframework.bootspring-boot-starter-batchcom.h2databaseh2runtime需要添加SpringBatch依赖,使用H2作为内存数据库更方便。实际生产中必须使用外部数据库,如Oracle、PostgreSQL。主要入口类:@SpringBootApplication@EnableBatchProcessingpublicclassPkslowBatchJobMain{publicstaticvoidmain(String[]args){SpringApplication.run(PkslowBatchJobMain.class,args);}}也很简单,在Springboot的基础上加上注解@EnableBatchProcessing即可。领域实体类Employee:packagecom.pkslow.batch.entity;publicclassEmployee{Stringid;StringfirstName;StringlastName;}对应的csv文件内容如下:id,firstName,lastName1,Lokesh,Gupta2,Amit,Mishra3,Pankaj,Kumar4,David,Miller3.2输入-处理-输出3.2.1读取ItemReader因为有多个输入文件,所以定义如下:@Value("input/inputData*.csv")privateResource[]inputResources;@BeanpublicMultiResourceItemReadermultiResourceItemReader(){MultiResourceItemReaderresourceItemReader=newMultiResourceItemReader();resourceItemReader.setResources(inputResources);resourceItemReader.setDelegate(reader());returnresourceItemReader;}@BeanpublicFlatFileItemReaderreader(){FlatFileItemReaderreader=newFlatFileItemReader();//跳过第一行csv文件的标题reader.setLinesToSkip(1);reader.setLineMapper(newDefaultLineMapper(){{setLineTokenizer(newDelimitedLineTokenizer(){{//fieldnamesetNames(newString[]{"id","firstN美&曲ot;,"lastName"});}});setFieldSetMapper(newBeanWrapperFieldSetMapper(){{//转换后的目标类setTargetType(Employee.class);}});}});returnreader;}这里使用FlatFileItemReader,方便我们从文件中读取数据3.2.2处理ItemProcessor简单演示,处理很简单,就是将最后一列转为大写:());returnemployee;};}3.2.3OutputItremWriter比较简单,代码和注释如下:newFlatFileItemWriter<>();writer.setResource(outputResource);//是否为append模式writer.setAppendAllowed(true);writer.setLineAggregator(newDelimitedLineAggregator(){{//设置分隔符setDelimiter(",");setFieldExtractor(newBeanWrapperFieldExtractor(){{//设置字段setNames(newString[]{"id","firstName","lastName"});}});}});returnwriter;}3.3StepwithReader-Processor-Writer之后就可以定义Step了:@BeanpublicStepcsvStep(){returnstepBuilderFactory.get("csvStep").chunk(5).rreader(multiResourceItemReader()).processor(itemProcessor()).writer(writer()).build();}这里有个chunk的设置,值为5,意思是5条记录后才会提交输出。需求定义3.4Job已经完成了Step的编码,很容易定义Job:@BeanpublicJobpkslowCsvJob(){returnjobBuilderFactory.get("pkslowCsvJob").incrementer(newRunIdIncrementer()).start(csvStep()).build();}3.5运行以上代码后,执行程序,结果如下:通过实例讲解SpringBatch的引入,优秀的批处理框架成功读取数据,将最后一个字段转换为大写,输出到输出数据.csv文件。4监听Listener可以通过Listener接口监听特定的事件,实现更多的业务功能。例如处理失败,会记录失败日志;如果处理完成,会通知下游获取数据等。我们分别监听Read、Process、Write事件,对应分别实现ItemReadListener接口、ItemProcessListener接口、ItemWriteListener接口。因为代码比较简单,打印log就可以了,这里只贴出ItemWriteListener的实现代码:.“beforeWrite:”+list);}@OverridepublicvoidafterWrite(Listlist){logger.info(“afterWrite:”+list);}@OverridepublicvoidonWriteError(Exceptione,Listlist){logger.info("onWriteError:"+list);}}将实现的监听器listener集成到Step中:@BeanpublicStepcsvStep(){returnsstepBuilderFactory.get("csvStep").chunk(5).reader(multiResourceItemReader())。listener(newPkslowReadListener()).processor(itemProcessor()).listener(newPkslowProcessListener()).writer(writer()).listener(newPkslowWriteListener()).build();}执行后看日志:通过了一个例子解释一下SpringBatch的引入,这里优秀的批处理框架可以清楚的看到之前chunk集合的作用。Writer一次处理5条记录。如果一次输出一个,就会对IO造成压力。5小结SpringBatch还有很多优秀的特性,比如面对大量数据时的并行处理。本文主要介绍介绍,不再一一介绍,后续会一一说明。