1. Requirement
Every day, the system reads CSV files from a fixed directory and prints their contents to the console.
2. Solutions
There are many ways to meet this requirement; here we choose to implement it with Spring Batch.
3. Points to note
1. Obtaining the file path
The processing here is kept simple: read the date from the JobParameters, build a file path from it, and put that path into the ExecutionContext. For simplicity the file path is hard-coded in the program, but it is still stored in the ExecutionContext, and each Step retrieves the path from there.
Note:
Although data stored in the ExecutionContext can be read from every Step, it is not advisable to store large objects there, because the contents of the ExecutionContext are persisted to the database.
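If the path really were derived from the date instead of hard-coded, the beforeJob callback might look like the following sketch (the per-day naming scheme person-&lt;date&gt;.csv is an assumption, not something this article prescribes):

```java
@Override
public void beforeJob(JobExecution jobExecution) {
    JobParameters jobParameters = jobExecution.getJobParameters();
    // Hypothetical: derive a per-day file name such as data/person-20220831.csv
    // from the importDate parameter; the naming scheme is an assumption
    String importDate = jobParameters.getString("importDate");
    String readCsvPath = String.format("data/person-%s.csv", importDate);
    jobExecution.getExecutionContext().putString("importPath", readCsvPath);
}
```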
2. How each Step obtains values from the ExecutionContext
- Add the @StepScope annotation to the bean definition
- Retrieve the value with @Value("#{jobExecutionContext['importPath']}")
For example:
```java
@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(
        @Value("#{jobExecutionContext['importPath']}") String importPath) {
    // read data
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
            .build();
}
```
Explanation: when Spring instantiates FlatFileItemReader at startup, no jobExecutionContext exists yet, so an error would be reported. With @StepScope added there is no such problem: @StepScope means the bean is not instantiated until the Step actually starts executing.
3. Note on the use of FlatFileItemReader
When we use a FlatFileItemReader to read the CSV file, the bean method must return the concrete FlatFileItemReader type rather than the ItemReader interface; otherwise the error "Reader must be open before it can be read" may occur. The reason is that a @StepScope bean is proxied against its declared return type: if only ItemReader is declared, the step cannot see that the reader also implements ItemStream, so it never calls open() on it.
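To make the failure mode concrete, here is a sketch of the problematic declaration; it assumes the same builder code used later in this article, and only the declared return type differs:

```java
// Problematic variant: the bean is declared as the ItemReader interface.
// A @StepScope bean is proxied against its declared type, so the proxy does not
// expose ItemStream, the step never calls open() on the reader, and reading
// fails with "Reader must be open before it can be read".
@Bean
@StepScope
public ItemReader<Person> readCsvItemReaderBroken(
        @Value("#{jobExecutionContext['importPath']}") String importPath) {
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
            .build();
}
```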
4. Implementation steps
1. Import dependencies and configure the application
1. Import dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- The code below also assumes the MySQL driver and Lombok are on the classpath -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
```
2. Initialize the Spring Batch database
```properties
spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Do not execute the job automatically when the program starts
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# Script used to initialize the Spring Batch schema
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql
```
2. Build the file read path
The idea here is to obtain the file path in a JobExecutionListener, put it into the ExecutionContext, and then read the path from there in each Step.
```java
/**
 * In this listener, build the concrete file path to read and save it to the ExecutionContext.
 *
 * @author huan.fu
 * @date 2022/8/30 - 22:22
 */
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        JobParameters jobParameters = jobExecution.getJobParameters();

        String importDate = jobParameters.getString("importDate");
        log.info("The value of the importDate job parameter is: [{}]", importDate);

        String readCsvPath = "data/person.csv";
        log.info("The csv path assembled for the date to read is: [{}]; the date is ignored here and the path is hard-coded", readCsvPath);

        executionContext.putString("importPath", readCsvPath);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
    }
}
```
3. Build a Tasklet that outputs the file path
```java
@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {

    @Value("#{jobExecutionContext['importPath']}")
    private String importFilePath;

    @Value("#{jobParameters['importDate']}")
    private String importDate;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        log.info("importDate from the job parameters: [{}], importPath from the jobExecutionContext: [{}]",
                importDate, importFilePath);
        return RepeatStatus.FINISHED;
    }
}
```
Note that this class is annotated with @StepScope.
4. Write the entity class
```java
@AllArgsConstructor
@Getter
@ToString
public class Person {
    /** username */
    private String username;
    /** age */
    private Integer age;
    /** gender */
    private String sex;
}
```
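For reference, a data/person.csv placed under src/main/resources could look like the following; the rows are illustrative sample data, and the third row shows a record that the processor below will filter out because age > 25:

```text
zhangsan,20,man
lisi,24,woman
wangwu,30,man
```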
5. Write Job configuration
```java
@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
    private final ItemReader<Person> readCsvItemReader;

    @Bean
    public Job importPersonJob() {
        // Get a job builder; the job name does not have to exist beforehand
        return jobBuilderFactory.get("import-person-job")
                // Add a job execution listener
                .listener(new AssemblyReadCsvPathListener())
                // Print the values in the job parameters and the ExecutionContext
                .start(printParametersAndContextVariables())
                // Read and process the csv data
                .next(handleCsvFileStep())
                .build();
    }

    /**
     * Read data.
     * Note: the concrete FlatFileItemReader type must be returned here, not ItemReader,
     * otherwise the exception "Reader must be open before it can be read" may be thrown.
     *
     * @param importPath file path
     * @return reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> readCsvItemReader(
            @Value("#{jobExecutionContext['importPath']}") String importPath) {
        return new FlatFileItemReaderBuilder<Person>()
                .name("read-csv-file")
                .resource(new ClassPathResource(importPath))
                .delimited().delimiter(",")
                .names("username", "age", "sex")
                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
                .build();
    }

    @Bean
    public Step handleCsvFileStep() {
        // Each item that is read is handed to this processor
        ItemProcessor<Person, Person> processor = item -> {
            if (item.getAge() > 25) {
                log.info("user:[{}] age:[{} > 25], skipped", item.getUsername(), item.getAge());
                return null;
            }
            return item;
        };

        // Once a chunk of items has been read, writing starts
        ItemWriter<Person> itemWriter = items -> {
            log.info("start writing data");
            for (Person item : items) {
                log.info("{}", item);
            }
        };

        return stepBuilderFactory.get("handle-csv-file")
                // Read 2 items per chunk: process runs after each item is read,
                // and a write is executed once the chunk is full
                .<Person, Person>chunk(2)
                .reader(readCsvItemReader)
                .processor(processor)
                .writer(itemWriter)
                .build();
    }

    /**
     * Print the values in the job parameters and the ExecutionContext.
     * <p>
     * Tasklet is a very simple interface with a single method: execute.
     * The TaskletStep calls this method repeatedly until it returns RepeatStatus.FINISHED
     * or throws an exception. Every Tasklet call is wrapped in a transaction.
     *
     * @return Step
     */
    private Step printParametersAndContextVariables() {
        return stepBuilderFactory.get("print-context-params")
                .tasklet(printImportFilePathTaskLet)
                // When the job restarts, the step is skipped once it has already been started 3 times
                .startLimit(3)
                // When the job restarts and this step is already COMPLETED,
                // false means the step does not run again
                .allowStartIfComplete(false)
                // Add a step execution listener
                .listener(new CustomStepExecutionListener())
                .build();
    }
}
```
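Note that the configuration above references a CustomStepExecutionListener whose source is not shown in this article (the real implementation is in the repository linked at the end). A minimal sketch, assuming it only logs the step lifecycle, could be:

```java
@Slf4j
public class CustomStepExecutionListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("step:[{}] is about to start", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("step:[{}] finished, exit status:[{}]",
                stepExecution.getStepName(), stepExecution.getExitStatus());
        return stepExecution.getExitStatus();
    }
}
```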
6. Write the Job startup class
```java
@Component
@Slf4j
public class StartImportPersonJob {

    @Autowired
    private Job importPersonJob;

    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
            JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                // Note: 8, not 08 - an integer literal with a leading zero is octal in Java,
                // and 08 does not compile
                .addString("importDate", LocalDate.of(2022, 8, 31)
                        .format(DateTimeFormatter.ofPattern("yyyyMMdd")))
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
        log.info("job invoked");
    }
}
```
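One caveat: Spring Batch identifies a JobInstance by its identifying JobParameters, so starting the application a second time with the same fixed importDate will fail with JobInstanceAlreadyCompleteException once the job has completed. A common workaround, sketched here with a hypothetical launchTime parameter, is to add a value that is unique per run:

```java
// Sketch: the extra "launchTime" parameter (a name chosen for illustration)
// makes every application start a new JobInstance despite the fixed importDate
JobParameters jobParameters = new JobParametersBuilder()
        .addString("importDate", LocalDate.of(2022, 8, 31)
                .format(DateTimeFormatter.ofPattern("yyyyMMdd")))
        .addLong("launchTime", System.currentTimeMillis())
        .toJobParameters();
```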
7. Auto-configure Spring Batch
```java
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchReadCsvApplication.class, args);
    }
}
```
The key point is the @EnableBatchProcessing annotation, which registers the core Spring Batch infrastructure beans (JobRepository, JobLauncher, JobBuilderFactory, StepBuilderFactory) used in the configuration above.
5. Execution results
6. Complete code
https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv