0

I went through the link - https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemProcessorTests.java, but did not strike much out of it.

I am trying to replace ETL Informatica mapping logic into the Batch. I am looking to separate out Status=I and Status=U into separate (Individual) processor and then further perform lookup and massage the data and then write those records directly into the table for Status=I and for status=U, perform another complex logic (like lookups, massaging and match and merge logic) and then upsert those records again into the same table.

I've tried to do POC, where I am looking to segregate the records in the processor

CustomerClassifier.java

public class CustomerClassifier implements Classifier<Customer, ItemProcessor<Customer, Customer>> {

    private ItemProcessor<Customer, Customer> insertCustomerProcessor;
    private ItemProcessor<Customer, Customer> updateCustomerProcessor;
    
    public CustomerClassifier(ItemProcessor<Customer, Customer> evenCustomerProcessor, ItemProcessor<Customer, Customer> oddCustomerProcessor) {
        this.insertCustomerProcessor= insertCustomerProcessor;
        this.updateCustomerProcessor= updateCustomerProcessor;
    }
    
    @Override
    public ItemProcessor<Customer, Customer> classify(Customer customer) {
        return customer.getStatus().equals("I") ? insertCustomerProcessor : updateCustomerProcessor;
    }
}

OddCustomerProcessor.java

public class OddCustomerProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer item) throws Exception {
        Customer customer = new Customer();
        // Perform some msaaging and lookups here
        customer.setId(item.getId());
        customer.setFirstName(item.getFirstName());
        customer.setLastName(item.getLastName());
        customer.setBirthdate(item.getBirthdate());
        customer.setStatus(item.getStatus());
        return customer;
    }
}

EvenCustomerProcessor.java

public class EvenCustomerProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer item) throws Exception {
        Customer customer = new Customer();
        // Perform some msaaging and lookups here
        customer.setId(item.getId());
        customer.setFirstName(item.getFirstName());
        customer.setLastName(item.getLastName());
        customer.setBirthdate(item.getBirthdate());
        customer.setStatus(item.getStatus());
        return customer;
    }
}

CustomLineAggregator.java

public class CustomLineAggregator implements LineAggregator<Customer> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public String aggregate(Customer item) {
        try {
            return objectMapper.writeValueAsString(item);
        } catch (Exception e) {
            throw new RuntimeException("Unable to serialize Customer", e);
        }
    }
}

Customer.java

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
    private String status;
}

Error-

The method setClassifier(Classifier<? super Customer,ItemProcessor<?,? extends Customer>>) in the type ClassifierCompositeItemProcessor<Customer,Customer> is not applicable for the arguments (CustomerClassifier)

Configuration

@Configuration
public class JobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Autowired
    private DataSource dataSource;
    
    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader(){
        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());
        
        // Sort Keys
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        
        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        
        reader.setQueryProvider(queryProvider);
        
        return reader;
    }
    
    @Bean
    public EvenCustomerProcessor evenCustomerProcessor() {
        return new EvenCustomerProcessor();
    }
    
    @Bean
    public OddCustomerProcessor oddCustomerProcessor() {
        return new OddCustomerProcessor();
    }
    
    @Bean 
    public JdbcBatchItemWriter<Customer> customerItemWriter(){
    JdbcBatchItemWriter<Customer> batchItemWriter = new JdbcBatchItemWriter<>();
    batchItemWriter.setDataSource(dataSource);
    batchItemWriter.setSql(""); // Query Goes here
    return batchItemWriter;
}
    
    @Bean
    public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor() throws Exception{
        ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
        itemProcessor.setClassifier(new CustomerClassifier(evenCustomerProcessor(), oddCustomerProcessor()));
    }
    
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerPagingItemReader())
                .processor(classifierCustomerCompositeItemProcessor())
                .writer(customerItemWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
5
  • Is it intended that step1 does not use any item processor? I guess you are planning to use classifierCustomerCompositeItemProcessor as item processor. Commented Oct 28, 2020 at 14:58
  • @MahmoudBenHassine - I've added, please help me to solve the issue by reading my problem statement and thanks in advance for direction and consulting Commented Oct 28, 2020 at 15:06
  • Your code does not compile, the method classifierCustomerCompositeItemProcessor() should return the item processor: return itemProcessor;. Do you agree? Commented Oct 28, 2020 at 15:16
  • Yes, Indeed agree, how can I segregate the records in process with Status=I and Status=U or how can I achieve this using ItemProcessor ? Commented Oct 28, 2020 at 15:19
  • I added an answer, let me know if it helps. Commented Oct 28, 2020 at 15:49

1 Answer 1

2

You can remove the CustomerClassifier and define the composite item processor as follows:

@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor(
        EvenCustomerProcessor evenCustomerProcessor,
        OddCustomerProcessor oddCustomerProcessor
) {
    ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
    itemProcessor.setClassifier(new Classifier<Customer, ItemProcessor<?, ? extends Customer>>() {
        @Override
        public ItemProcessor<?, ? extends Customer> classify(Customer customer) {
            return customer.getStatus().equals("I") ? evenCustomerProcessor : oddCustomerProcessor;
        }
    });
    return itemProcessor;
}

Then update your step definition as follows:

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer> chunk(10)
            .reader(customerPagingItemReader())
            .processor(classifierCustomerCompositeItemProcessor(evenCustomerProcessor(), oddCustomerProcessor()))
            .writer(customerItemWriter())
            .build();
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.