3

I have a Spring Batch application which reads data from a CSV file, processes all the lines, and writes the processed lines to a database. Very classic. Now my problem is that the CSV file is too large and I get a java heap space error, so I thought I could optimize that by processing the file per x lines, let's say per 10000 lines (to release memory every 10000 lines instead of loading all the lines into memory).

Is there any way to tell Spring Batch to process a step in a recursive way? Or is there any other way to solve my problem?

Any advice will be much appreciated. Thanks.

1
  • 2
    if you do it with a batch chunk(reader-processor-writer) it doesn't load everything in memory. Commented May 24, 2013 at 6:45

1 Answer 1

6

here's an example of processing the following csv file into a bean

headerA,headerB,headerC
col1,col2,col3

the first row (the header) is ignored, and the columns of each remaining row are mapped directly into a 'matching' object. (this is only done this way for brevity).

here's the job configuration using Spring Batch Out Of The Box components;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Job that streams the csv into the database in chunks: at most one
         commit-interval's worth of items is held in memory at a time. -->
    <batch:job id="fileJob">
        <!-- FIX: the validator must be declared BEFORE the step elements. The
             spring-batch XSD defines the job content model as
             (description?, validator?, (step|flow|split)*, listeners?), so the
             original ordering (validator after step1) fails schema validation. -->
        <batch:validator>
            <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                <!-- fail fast at launch time if the 'fileName' job parameter is missing -->
                <property name="requiredKeys" value="fileName"/>
            </bean>
        </batch:validator>
        <batch:step id="fileJob.step1">
            <batch:tasklet>
                <!-- read 10000 items, write them, commit, release - repeat -->
                <batch:chunk reader="fileReader" writer="databaseWriter" commit-interval="10000"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- step scope so #{jobParameters['fileName']} is resolved per execution -->
    <bean id="fileReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="file:#{jobParameters['fileName']}"/>
        <!-- skip the header row -->
        <property name="linesToSkip" value="1"/>
    </bean>

    <!-- tokenize each line, then map the resulting FieldSet onto a bean -->
    <bean id="lineMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
        <property name="lineTokenizer" ref="lineTokenizer"/>
    </bean>

    <!-- comma-delimited columns, exposed under the names col1..col3 -->
    <bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="col1,col2,col3"/>
    </bean>

    <!-- maps tokenized field names onto same-named properties of the target bean -->
    <bean id="fieldSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="de.incompleteco.spring.batch.domain.SimpleEntity"/>
    </bean>

    <!-- batched JDBC insert; :col1 etc. are bound from the item's bean properties -->
    <bean id="databaseWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
        <property name="sql" value="insert into simple_entity (col1,col2,col3) values (:col1,:col2,:col3)"/>
    </bean>
</beans>

there are a couple of notes:

  1. this job needs a parameter 'fileName' to tell the fileReader where to find the file.
  2. there's a jobParametersValidator set to make sure the parameter is there

here's the batch resource configuration;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- Persistent job repository. NOTE(review): by convention this element
         resolves beans named "dataSource" and "transactionManager"; both are
         only defined in the 'junit' profile below, so other profiles need to
         supply their own - confirm for non-test deployments. -->
    <batch:job-repository id="jobRepository"/>

    <!-- read-only access to execution metadata; used by the test to poll for completion -->
    <bean id="jobExplorer"
        class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
    </bean>
    <!-- launcher wired to a taskExecutor, so job launches are asynchronous -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <!-- test-only infrastructure, active when the 'junit' profile is enabled -->
    <beans profile="junit">
        <!-- in-memory H2 seeded with the Spring Batch metadata schema plus the
             application's simple_entity schema -->
        <jdbc:embedded-database id="dataSource" type="H2">
            <jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
            <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
        </jdbc:embedded-database>

        <!-- default thread-pool executor backing the async jobLauncher -->
        <task:executor id="taskExecutor"/>

        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    </beans>
</beans>

here's a unit test for it too

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
@ActiveProfiles("junit")
public class FileJobIntegrationTest {

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private DataSource dataSource;

    // number of DATA rows written to the fixture csv (header excluded)
    private int recordCount = 1000000;

    // fixture csv location in the system temp directory
    private String fileName = System.getProperty("java.io.tmpdir") + File.separator + "test.csv";

    /**
     * Removes any csv left over from a previous run so each test starts clean.
     * Fails loudly if a stale file cannot be deleted (the original ignored the
     * boolean result of delete(), which could silently reuse old data).
     */
    @Before
    public void before() throws Exception {
        File file = new File(fileName);
        if (file.exists() && !file.delete()) {
            throw new IllegalStateException("could not delete stale fixture: " + fileName);
        }//end if
    }

    /**
     * Writes a large csv (header + recordCount data rows), launches the job with
     * the required 'fileName' parameter, polls until the asynchronous execution
     * finishes, then verifies every data row landed in simple_entity.
     *
     * Fixes over the original:
     * - the header row is terminated with '\n' (it previously ran straight into
     *   the first data row, which was then silently swallowed by linesToSkip=1)
     * - the loop writes exactly recordCount rows (was i<=recordCount, i.e. one
     *   row too many, which coincidentally cancelled the header bug)
     * - the stream is closed in a finally block so the file handle cannot leak
     * - the per-line flush() is removed (close() flushes; flushing a million
     *   times is pointless I/O)
     */
    @Test
    public void test() throws Exception {
        //create the fixture file: one header line plus recordCount data rows
        FileOutputStream fos = new FileOutputStream(fileName);
        try {
            fos.write("col1,col2,col3\n".getBytes());
            for (int i = 0; i < recordCount; i++) {
                fos.write((i + "," + (i + 1) + "," + (i + 2) + "\n").getBytes());
            }//end for
        } finally {
            fos.close();
        }
        //sanity check: report the fixture size in MB to show it is genuinely large
        long length = new File(fileName).length();
        System.out.println("file size: " + ((length / 1024) / 1024));
        //execute the job; the launcher uses an async taskExecutor, so run() returns immediately
        JobParameters jobParameters = new JobParametersBuilder().addString("fileName", fileName).toJobParameters();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        //poll the job repository until the asynchronous execution completes
        while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
            Thread.sleep(1000);
        }//end while
        //reload the final execution state
        execution = jobExplorer.getJobExecution(execution.getId());
        assertEquals(ExitStatus.COMPLETED.getExitCode(), execution.getExitStatus().getExitCode());
        //every data row (and only the data rows) must have been inserted
        int count = new JdbcTemplate(dataSource).queryForObject("select count(*) from simple_entity", Integer.class);
        assertEquals(recordCount, count);
    }

}
Sign up to request clarification or add additional context in comments.

2 Comments

Excellent — this is not the answer I was looking for (it was about my own code), but I really like your sample.
Hi storm_buster: Could you please guide on stackoverflow.com/questions/36391219/…?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.