Read csv file using SpringBatch

1. Demand

The system reads csv files from a fixed directory every day and prints them on the console.

2. Solutions

To solve the above requirements, there are many methods that can be used, here we choose to use Spring Batch to achieve.

3. Matters needing attention

1. Obtaining the file path

Simple processing here, read the date in JobParameters, then build a file path, and put the file path into the ExecutionContext. Here, for simplicity, the file path will be hard-coded in the program, but at the same time, the file path will also be stored in the ExecutionContext, and the path will be obtained from the ExecutionContext in a specific Step.

Notice:
Although the data stored in the ExecutionContext can be obtained in each Step, it is not recommended to store relatively large data in the ExecutionContext, because the data of this object needs to be stored in the database.

2. If each Step obtains the value in the ExecutionContext

  1. Add the @StepScope annotation to the class
  2. Get it by @Value("#{jobExecutionContext['importPath']}")

eg:

@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
    // read data
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
            .build();
}

Explanation: When the program instantiates FlatFileItemReader, there is no jobExecutionContext at this time, then an error will be reported. If @StepScope is added, there will be no problem at this time. @StepScope indicates that this Bean is not instantiated until the Step stage is reached

3. Note on the use of FlatFileItemReader

When we use FlatFileItemReader to read our csv file, we need to return the FlatFileItemReader type here, instead of returning ItemReader directly, otherwise the following error may occur Reader must be open before it can be read

4. Implementation steps

1. Import dependencies and configure

1. Import dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

2. Initialize the SpringBatch database

spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# When the program starts, the job is not executed by default
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# Initialize the spring-batch database script
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql

2. Build file read path

My idea here is to complete the acquisition of the file path in the JobExecutionListener, put it into the ExecutionContext, and then get the value of the file path in each Step.

/**
 * In this listener, get the specific file path to be read and save it to ExecutionContext
 *
 * @author huan.fu
 * @date 2022/8/30 - 22:22
 */
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        JobParameters jobParameters = jobExecution.getJobParameters();
        String importDate = jobParameters.getString("importDate");
        log.info("from job parameter obtained from importDate The value of the parameter is:[{}]", importDate);
        String readCsvPath = "data/person.csv";
        log.info("Assemble according to the date that needs to be read csv path is:[{}],Exclude the date here and write a dead path directly", readCsvPath);
        executionContext.putString("importPath", readCsvPath);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {

    }
}

3. Build Tasklet, output file path

@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {

    @Value("#{jobExecutionContext['importPath']}")
    private String importFilePath;

    @Value("#{jobParameters['importDate']}")
    private String importDate;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        log.info("from job parameter obtained from importDate:[{}],from jobExecutionContext obtained from importPath:[{}]",
                importDate, importFilePath);

        return RepeatStatus.FINISHED;
    }
}

It should be noted that the @StepScope annotation is added to this class

4. Write entity classes

@AllArgsConstructor
@Getter
@ToString
public class Person {
    /**
     * username
     */
    private String username;
    /**
     * age
     */
    private Integer age;
    /**
     * gender
     */
    private String sex;
}

5. Write Job configuration

@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
    private final ItemReader<Person> readCsvItemReader;

    @Bean
    public Job importPersonJob() {
        // Get a job builder, jobName may not exist
        return jobBuilderFactory.get("import-person-job")
                // Add job execution listener
                .listener(new AssemblyReadCsvPathListener())
                // print the values ​​in job parameters and ExecutionContext
                .start(printParametersAndContextVariables())
                // read csv data and process
                .next(handleCsvFileStep())
                .build();
    }

    /**
     * read data
     * Note: The FlatFileItemReader type needs to be returned here, not the ItemReader
     * Otherwise, the following exception may be reported Reader must be open before it can be read
     *
     * @param importPath file path
     * @return reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
        // read data
        return new FlatFileItemReaderBuilder<Person>()
                .name("read-csv-file")
                .resource(new ClassPathResource(importPath))
                .delimited().delimiter(",")
                .names("username", "age", "sex")
                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
                .build();
    }

    @Bean
    public Step handleCsvFileStep() {

        // Each time a piece of data is read, it is handed over to this processing
        ItemProcessor<Person, Person> processor = item -> {
            if (item.getAge() > 25) {
                log.info("user[{}]age:[{}>25]do not handle", item.getUsername(), item.getAge());
                return null;
            }
            return item;
        };

        // After reading the chunk-sized data, start writing
        ItemWriter<Person> itemWriter = items -> {
            log.info("start writing data");
            for (Person item : items) {
                log.info("{}", item);
            }
        };

        return stepBuilderFactory.get("handle-csv-file")
                // Every time 2 pieces of data are read, a write is executed, and after each piece of data is read, process is executed
                .<Person, Person>chunk(2)
                // read data
                .reader(readCsvItemReader)
                // Read a piece of data and start processing
                .processor(processor)
                // When the number of read data reaches the chunk, call this method for processing
                .writer(itemWriter)
                .build();
    }

    /**
     * print the values ​​in job parameters and ExecutionContext
     * <p>
     * TaskletStep It is a very simple interface with only one method - execute.
     * TaskletStep This method is called repeatedly until a RepeatStatus.FINISHED is returned or an exception is thrown.
     * All Tasklet calls are wrapped in a Thing.
     *
     * @return Step
     */
    private Step printParametersAndContextVariables() {
        return stepBuilderFactory.get("print-context-params")
                .tasklet(printImportFilePathTaskLet)
                // When the job restarts, if it reaches 3, the step is not executed
                .startLimit(3)
                // When the job restarts, if the step is in the COMPLETED state that has been processed, false is given below to indicate that the step is not restarting, that is, it is not executing
                .allowStartIfComplete(false)
                // Add step monitor
                .listener(new CustomStepExecutionListener())
                .build();
    }
}

6. Write the Job startup class

@Component
@Slf4j
public class StartImportPersonJob {

    @Autowired
    private Job importPersonJob;
    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
        log.info("job invoked");
    }
}

7. Automatically configure SpringBatch

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchReadCsvApplication.class, args);
    }
}

Mainly @EnableBatchProcessing annotation

5. Execution results

6. Complete code

https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv

Tags: Back-end csv Spring Boot springBatch

Posted by darkke on Wed, 31 Aug 2022 04:37:49 +0930