0

I'm evaluating spring batch for a particular project and after a lot of searching around the web, I haven't been able to find a spring batch solution that meets my requirements.

I'm wondering if spring batch is capable of reading multiple CSV files made up of different formats in a single job? For example, lets say Person.csv and Address.csv, both made up of different formats, but depend on each other

I need to read, process with data corrections (ie toUpperCase, etc), and validate each record.

In the event of validation errors, I need to record the error(s) to some sort of object array where it will be made available later on after validation has completed to be emailed to the end user for corrections.

Once all the data from both files has been validated and no validation errors have occurred in either file, continue on to the batch writer. If any errors have occurred in either of the two files, I need to stop the entire job. If the writer has already began writing to the database when an error has occurred, the entire job would need to be rolled back regardless if the error exist in the opposite file.

I cannot insert any of the two CSV files if there is any kind of validation error in either one. The end user must be notified with the errors. The errors will be used to make any necessary corrections prior to reprocessing the files.

Is Spring batch in SpringBoot 2 capable of this behavior?

Example

Person.csv

BatchId, personId, firstName, lastName

Address.csv

BatchId, personId, address1

In the above example the relationship between the two files is the batchId and personId. If there is any kind of validation error in either of the two files, I must fail the entire batch. I'd like to complete validation on both files so I can respond with all the errors, but just not write to the database.

3
  • How do files depend on each other? Can you show few records as well as the domain model to see the relation between them? Commented Aug 29, 2018 at 7:29
  • @MahmoudBenHassine sure, I've provided a little CSV example above where the address.csv contains a personId and a batchId which must be contained in the person.csv. If there is so much as a required field such as a first name or address I must fail the entire batch and rollback any db commits. Commented Aug 29, 2018 at 11:56
  • Thanks for the clarification, I added an answer with an example. Hope it helps. Commented Aug 29, 2018 at 13:51

1 Answer 1

2

I'm wondering if spring batch is capable of reading multiple CSV files made up of different formats in a single job?

Yes, you can have a single job with multiple steps, each step processing a file of a given type. The point is how to design the job. One technique you can apply is using staging tables. A batch job can create temporary staging tables where it loads all data needed and then remove them when done.

In your case, you can have two steps loading each file in a specific staging table. Each step can apply validation logic specific to each file. If one of these steps fail, you fail the job. Staging tables can have a marker column for invalid records (this is useful for reporting).

Once these two preparatory steps are done, you can read data from the two staging tables in another step and apply cross-validation rules against joined data (for example select from both tables and join by BatchId and PersonId). If this step fails, you fail the job. Otherwise, you write data where appropriate.

The advantage of this technique is that data is available in staging tables during the entire job. So whenever a validation step fails, you can use a flow to redirect the failed step to a "reporting step" (that reads invalid data and sends the report) and then fail the job. Here is a self-contained example you can play with:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class FlowJobSample {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Step personLoadingStep() {
        return steps.get("personLoadingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("personLoadingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step addressLoadingStep() {
        return steps.get("addressLoadingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("addressLoadingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step crossValidationStep() {
        return steps.get("crossValidationStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("crossValidationStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step reportingStep() {
        return steps.get("reportingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("reportingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(personLoadingStep()).on("INVALID").to(reportingStep())
                    .from(personLoadingStep()).on("*").to(addressLoadingStep())
                    .from(addressLoadingStep()).on("INVALID").to(reportingStep())
                    .from(addressLoadingStep()).on("*").to(crossValidationStep())
                    .from(crossValidationStep()).on("INVALID").to(reportingStep())
                    .from(crossValidationStep()).on("*").end()
                    .from(reportingStep()).on("*").fail()
                    .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(FlowJobSample.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

To make one of the steps fail, set the exit status to INVALID, for example:

@Bean
public Step personLoadingStep() {
    return steps.get("personLoadingStep")
            .tasklet((contribution, chunkContext) -> {
                System.out.println("personLoadingStep");
                chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));
                return RepeatStatus.FINISHED;
            })
            .build();
}

I hope this helps.

Sign up to request clarification or add additional context in comments.

3 Comments

BTW, if it was up to me and the job would run on linux/unix os, I would join the files by BatchId and PersonId in a SystemCommandTasklet step with the join command. This would tremendously simplify the design of the job.
Thank you very much, I actually started thinking down this path but with a temp file, I like your staging table idea a lot better though. Thanks for the examples as well. Could you elaborate on the SystemCommandTasklet and how it would simplify the design of the job? This is an new project and I'm free to make whatever design decisions are most appropriate. Thanks
A first step would be a SystemCommandTasklet that executes the command: join person.csv address.csv for example (you need to see options of join). The idea is that the SystemCommandTasklet is a preparatory step that outputs a single file having the format: BatchId, personId, firstName, lastName, address. Then you can have a second chunk oriented step that reads this file and maps it to a domain object Person having everything needed to apply validation rules (implemented in a processor). This way you have only two steps and there is no need to use staging tables.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.