多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# Spring Batch 已处理记录的计数示例 > 原文: [https://howtodoinjava.com/spring-batch/records-count-example/](https://howtodoinjava.com/spring-batch/records-count-example/) 了解如何使用`ItemStream`和`ChunkListener`计数 Spring Batch 作业处理的记录数,并将记录数记录在日志文件或控制台中。 ## 使用`ItemStream`计数记录 在给定的`ItemStream`实现下方,计算定期处理的记录数。 `ItemCountItemStream.java` ```java import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStream; import org.springframework.batch.item.ItemStreamException; public class ItemCountItemStream implements ItemStream { public void open(ExecutionContext executionContext) throws ItemStreamException { } public void update(ExecutionContext executionContext) throws ItemStreamException { System.out.println("ItemCount: "+executionContext.get("FlatFileItemReader.read.count")); } public void close() throws ItemStreamException { } } ``` #### 如何使用`ItemCountItemStream` 在`Tasklet`中使用`SimpleStepBuilder.stream()`方法注册上面创建的`ItemCountItemStream`。 `BatchConfig.java` ```java @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job readCSVFilesJob() { return jobBuilderFactory .get("readCSVFilesJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() { return stepBuilderFactory .get("step1") .<Employee, Employee>chunk(1) .reader(reader()) .writer(writer()) .stream(stream()) .build(); } @Bean public ItemCountItemStream stream() { return new ItemCountItemStream(); } ``` ## 使用`ChunkListener`计数记录 在给定的`ChunkListener`实现下方,计算定期处理的记录数。 `ItemCountListener.java` ```java import org.springframework.batch.core.ChunkListener; import org.springframework.batch.core.scope.context.ChunkContext; public class ItemCountListener implements ChunkListener { @Override public void beforeChunk(ChunkContext context) { } @Override public void afterChunk(ChunkContext context) { int count = context.getStepContext().getStepExecution().getReadCount(); System.out.println("ItemCount: " + count); } @Override public void afterChunkError(ChunkContext context) { } } ``` #### 如何使用`ItemCountListener` 在`Tasklet`中使用`SimpleStepBuilder.listener()`方法注册上面创建的`ItemCountListener`。 `BatchConfig.java` ```java @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job readCSVFilesJob() { return jobBuilderFactory .get("readCSVFilesJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() { return stepBuilderFactory .get("step1") .<Employee, Employee>chunk(1) .reader(reader()) .writer(writer()) .listener(listener()) .build(); } @Bean public ItemCountListener listener() { return new ItemCountListener(); } ``` ## 计数记录演示 我正在使用上述`ItemCountListener`配置来处理此 CSV。 `inputData.csv` ```java id,firstName,lastName 1,Lokesh,Gupta 2,Amit,Mishra 3,Pankaj,Kumar 4,David,Miller 5,David,Walsh ``` 完整的批处理配置如下所示: `BatchConfig.java` ```java package com.howtodoinjava.demo.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.Resource; import com.howtodoinjava.demo.model.Employee; @Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Value("/input/inputData.csv") private Resource inputResource; @Bean public Job readCSVFilesJob() { return jobBuilderFactory .get("readCSVFilesJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() { return stepBuilderFactory .get("step1") .<Employee, Employee>chunk(1) .reader(reader()) .writer(writer()) .listener(listner()) .build(); } @Bean public ItemCountListener listner() { return new ItemCountListener(); } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>(); itemReader.setLineMapper(lineMapper()); itemReader.setLinesToSkip(1); itemReader.setResource(inputResource); return itemReader; } @Bean public LineMapper<Employee> lineMapper() { DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" }); lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 }); BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>(); fieldSetMapper.setTargetType(Employee.class); lineMapper.setLineTokenizer(lineTokenizer); lineMapper.setFieldSetMapper(fieldSetMapper); return lineMapper; } @Bean public ConsoleItemWriter<Employee> writer() { return new ConsoleItemWriter<Employee>(); } } ``` 作为 Spring 启动应用程序启动该应用程序。 Spring 任务调度器将开始工作。 `App.java` ```java import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; @SpringBootApplication @EnableScheduling public class App { @Autowired JobLauncher jobLauncher; @Autowired Job job; public static void main(String[] args) { SpringApplication.run(App.class, args); } @Scheduled(cron = "0 */1 * * * ?") public void perform() throws Exception { JobParameters params = new JobParametersBuilder() .addString("JobID", String.valueOf(System.currentTimeMillis())) .toJobParameters(); jobLauncher.run(job, params); } } ``` 现在观看控制台。 `Console` ```java 2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}] 2018-07-11 16:38:00 INFO - Executing step: [step1] Employee [id=1, firstName=Lokesh, lastName=Gupta] ItemCount: 1 Employee [id=2, firstName=Amit, lastName=Mishra] ItemCount: 2 Employee [id=3, firstName=Pankaj, lastName=Kumar] ItemCount: 3 Employee [id=4, firstName=David, lastName=Miller] ItemCount: 4 Employee [id=5, firstName=David, lastName=Walsh] ItemCount: 5 ItemCount: 5 2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED] ``` 将我的问题放在评论部分。 学习愉快! 参考: [`ItemStream` Java 文档](https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/ItemStream.html) [`ChunkListener` Java 文档](https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/ChunkListener.html)