
I need to read a large CSV file (328 MB) and process it. Processing each row also includes calling a web service.

I am using ThreadPoolExecutor for the first time. My logic is: I split the CSV into chunks of 100 rows and create a thread for each chunk that processes each row and writes the results to a temp file. Once all the threads have finished, I read the temp files and create a combined output file.
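The final merge step described above could look roughly like this. It is a minimal sketch that assumes all temp files sit in one folder and share a `.tmp` extension (the real value of `FilePropConstants.FILE_NAME_EXT_TMP` isn't shown); `TempFileMerger` and `mergeTempFiles` are hypothetical names. Because the temp files are named with random UUIDs, merging them in name order does not reproduce the original row order.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TempFileMerger {

    // Concatenates every file in tempDir whose name ends with ext into output
    // and returns how many parts were merged. Parts are merged in name order;
    // with random UUID names that is NOT the original CSV row order.
    public static int mergeTempFiles(Path tempDir, String ext, Path output) throws IOException {
        List<Path> parts = new ArrayList<>();
        // Glob filter, e.g. "*.tmp" (assumed extension)
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(tempDir, "*" + ext)) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts);
        // Creates or truncates the output file, then appends each part's bytes
        try (OutputStream out = Files.newOutputStream(output)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
        return parts.size();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("csvparts");
        Files.write(dir.resolve("a.tmp"), "row1\n".getBytes());
        Files.write(dir.resolve("b.tmp"), "row2\n".getBytes());
        Path out = Files.createTempFile("combined", ".csv");
        int merged = mergeTempFiles(dir, ".tmp", out);
        System.out.println("Merged " + merged + " parts into " + out);
    }
}
```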

My method that splits the file and creates the threads:

private List<Thread> invokeWS(String csvFilename, String tempFolder) {

    List<Thread> processCsvThreadList = new ArrayList<Thread>();

    // Thread pool executor
    int corePoolSize = 3;
    int maximumPoolSize = 6;
    long keepAliveTime = 10;
    ThreadFactory threadFactory = Executors.defaultThreadFactory();

    ThreadPoolExecutor thrdPoolEx = new ThreadPoolExecutor(corePoolSize,
            maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2));

    try {
        BufferedReader bfr = new BufferedReader(new FileReader(csvFilename));
        String line = "";
        int i = 0;
        line = bfr.readLine();
        Thread csvThread;
        List<String> rowList = new ArrayList<String>();


        do {
            line = bfr.readLine();
            if (line != null) {

                rowList.add(line);
                i++;

                if (i % 100 == 0) {

                    csvThread = new Thread(new ProcessCsvRow(rowList,
                            tempFolder));
                    csvThread.start();
                    thrdPoolEx.execute(csvThread);

                    rowList = new ArrayList<String>();
                    processCsvThreadList.add(csvThread);
                }

            } else {
                if (null != rowList && !rowList.isEmpty()) {

                    csvThread = new Thread(new ProcessCsvRow(rowList,
                            tempFolder));
                    csvThread.start();
                    thrdPoolEx.execute(csvThread);

                    processCsvThreadList.add(csvThread);
                }
                break;
            }
        } while (true);

    } catch (FileNotFoundException fnf) {
        fnf.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        thrdPoolEx.shutdown();
    }
    return processCsvThreadList;
}
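Separately from the double-execution problem, the pool configuration above is worth checking. With a core size of 3, a maximum of 6 and an `ArrayBlockingQueue` of capacity 2, at most 8 batches can be running or waiting at once, and the default `AbortPolicy` makes `execute()` throw `RejectedExecutionException` for any batch beyond that. Since the reader loop submits batches much faster than web-service calls complete, a 328 MB file would likely hit this. The sketch below reproduces it with a `Thread.sleep` standing in for the web-service work; `BoundedPoolDemo` and `runTasks` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    // Submits `tasks` slow tasks to a pool configured like the question's
    // (core 3, max 6, queue capacity 2) and returns how many actually ran.
    static int runTasks(int tasks, RejectedExecutionHandler handler) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 6, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), handler);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        Thread.sleep(200); // stand-in for the web-service calls
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            } catch (RejectedExecutionException e) {
                // With AbortPolicy (the default handler) this batch never runs.
            }
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("AbortPolicy ran:      "
                + runTasks(20, new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy ran: "
                + runTasks(20, new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

`CallerRunsPolicy` makes the submitting (reading) thread process a batch itself when the pool is saturated, which throttles reading instead of dropping batches.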

My ProcessCsvRow class

public class ProcessCsvRow implements Runnable {

    private List<String> csvRowsList;
    private String tempDir;

    public ProcessCsvRow(List<String> csvRowsList, String tempDir) {
        this.csvRowsList = csvRowsList;
        this.tempDir = tempDir;
    }

    @Override
    public void run() {
        UUID idOne = UUID.randomUUID();
        FileWriter fw = null;
        BufferedWriter bufferedWriter = null;
        try {
            String res = "";
            fw = new FileWriter(new File(tempDir + "\\" + idOne.toString() + FilePropConstants.FILE_NAME_EXT_TMP));
            bufferedWriter = new BufferedWriter(fw);
            SentimentAnalyzer sentimentAnalyzer = new SentimentAnalyzer();

            for (String csvRow : csvRowsList) {
                // calling web service for each row
                res = sentimentAnalyzer.invokeSentWS(csvRow);
                bufferedWriter.write(res);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bufferedWriter != null) {
                    bufferedWriter.flush();
                    bufferedWriter.close();
                }
                if (fw != null) {
                    fw.close();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}
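For comparison, here is a sketch of the same worker using try-with-resources and `Path.resolve` instead of the hard-coded `"\\"` separator. `SentimentAnalyzer` isn't shown, so the web-service call is replaced by a hypothetical `Function<String, String>`; the class name `ProcessCsvRowSketch`, the `.tmp` extension and writing one result per line are also assumptions.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

public class ProcessCsvRowSketch implements Runnable {

    private final List<String> csvRows;
    private final Path tempDir;
    // Hypothetical stand-in for SentimentAnalyzer.invokeSentWS(String)
    private final Function<String, String> service;

    public ProcessCsvRowSketch(List<String> csvRows, Path tempDir, Function<String, String> service) {
        this.csvRows = csvRows;
        this.tempDir = tempDir;
        this.service = service;
    }

    @Override
    public void run() {
        // Path.resolve uses the platform's separator instead of a hard-coded "\\"
        Path out = tempDir.resolve(UUID.randomUUID() + ".tmp");
        // try-with-resources flushes and closes the writer even if a call fails
        try (BufferedWriter writer = Files.newBufferedWriter(out)) {
            for (String row : csvRows) {
                writer.write(service.apply(row));
                writer.newLine(); // assumption: one result per line
            }
        } catch (IOException e) {
            // Visible through Future.get() if the task was passed to submit()
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("csvparts");
        new ProcessCsvRowSketch(Arrays.asList("good", "bad"), dir, String::toUpperCase).run();
        System.out.println("Temp file written to " + dir);
    }
}
```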

The issue is that for a 5-row CSV only one temp file should be created, but when I run this program I get two temp files, which is wrong. I strongly believe it's not a logic issue but the way I have implemented ThreadPoolExecutor.

Any help is greatly appreciated .

2 Answers

You shouldn't be creating Thread objects, and you don't need to construct a ThreadPoolExecutor directly.

Try

ExecutorService es = Executors.newFixedThreadPool(8);

es.submit(runnable); // not threads
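A minimal sketch of that suggestion, assuming batches of 100 rows: each batch becomes a plain `Runnable` passed to `submit()`, so only the pool decides which thread runs it and every row is processed exactly once. `SubmitRunnables` and `processAll` are hypothetical names, and the counter stands in for `new ProcessCsvRow(batch, tempFolder).run()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitRunnables {

    // Splits rows into batches and submits one plain Runnable per batch.
    // Returns the total number of rows processed across all batches.
    static int processAll(List<String> rows, int batchSize) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(8);
        AtomicInteger processed = new AtomicInteger();
        for (int start = 0; start < rows.size(); start += batchSize) {
            // Copy the slice so each task owns its own batch
            List<String> batch = new ArrayList<>(
                    rows.subList(start, Math.min(start + batchSize, rows.size())));
            Runnable task = () -> {
                // Stand-in for new ProcessCsvRow(batch, tempFolder).run()
                processed.addAndGet(batch.size());
            };
            es.submit(task); // submit the Runnable itself, never a Thread
        }
        es.shutdown(); // no new tasks; already-submitted ones still run
        es.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> rows = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            rows.add("row" + i);
        }
        System.out.println("Rows processed: " + processAll(rows, 100));
    }
}
```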

BTW, each thread has to create its own output file, or you need to lock a shared file; alternatively, you can submit a Callable and have it return what you want to log to the submitting thread.
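The `Callable` option could look like this sketch: each batch returns its results, and the submitting thread collects them from the `Future`s in submission order, which removes the need for temp files and keeps the CSV row order. `CallableBatches`, `processAll` and `callService` are hypothetical names; `callService` stands in for the web-service call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableBatches {

    // Each batch is a Callable that returns its results instead of writing a
    // temp file; the submitting thread gathers them in submission order.
    static List<String> processAll(List<String> rows, int batchSize)
            throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(8);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int start = 0; start < rows.size(); start += batchSize) {
                List<String> batch = new ArrayList<>(
                        rows.subList(start, Math.min(start + batchSize, rows.size())));
                Callable<List<String>> task = () -> {
                    List<String> results = new ArrayList<>();
                    for (String row : batch) {
                        results.add(callService(row));
                    }
                    return results;
                };
                futures.add(es.submit(task));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                all.addAll(f.get()); // blocks until that batch is done
            }
            return all;
        } finally {
            es.shutdown();
        }
    }

    // Hypothetical stand-in for the web-service call
    static String callService(String row) {
        return row.toUpperCase();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(Arrays.asList("a", "b", "c"), 2));
    }
}
```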


2 Comments

Thanks Peter, it worked. I have modified my code and added finally { taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { } }
Unless you really need to wait for the thread pool to finish, I would just call shutdown();
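If you do need to wait, a common pattern (similar to the example in the `ExecutorService` Javadoc) is to bound the wait and restore the interrupt flag rather than swallowing `InterruptedException` in an empty catch block. A sketch; `ShutdownHelper` and `shutdownAndWait` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownHelper {

    // Stops accepting new tasks, then waits up to `timeout` for the rest.
    // Returns true if the pool finished in time.
    static boolean shutdownAndWait(ExecutorService es, long timeout, TimeUnit unit) {
        es.shutdown(); // already-submitted tasks still run
        try {
            if (!es.awaitTermination(timeout, unit)) {
                es.shutdownNow(); // interrupt tasks that are still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt(); // don't swallow the interrupt
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.submit(() -> System.out.println("task ran"));
        System.out.println("finished: " + shutdownAndWait(es, 10, TimeUnit.SECONDS));
    }
}
```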

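It is because you are both starting the threads yourself and asking the executor to execute them. A Thread is itself a Runnable, so the pool calls csvThread.run(), which runs your ProcessCsvRow a second time on a pool thread. Each batch is therefore processed twice and writes two temp files.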

Change:

csvThread = new Thread(new ProcessCsvRow(rowList, tempFolder));
csvThread.start();
thrdPoolEx.execute(csvThread);

rowList = new ArrayList<String>();
processCsvThreadList.add(csvThread);

to:

csvThread = new Thread(new ProcessCsvRow(rowList, tempFolder));
thrdPoolEx.execute(csvThread);

rowList = new ArrayList<String>();
processCsvThreadList.add(csvThread);
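The double execution can be reproduced in isolation. This sketch, with a `Thread.sleep` standing in for the web-service calls, counts how often a task body runs when its `Thread` is both started and handed to `execute()`, versus when only the `Runnable` is handed to the pool. It relies on `Thread.run()` delegating to the `Runnable` the thread was constructed with; `DoubleRunDemo` and `countRuns` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleRunDemo {

    // Returns how many times the task body executed.
    static int countRuns(boolean startAndExecute) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            try {
                Thread.sleep(200); // stand-in for the web-service calls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Thread t = null;
        if (startAndExecute) {
            // The question's pattern
            t = new Thread(task);
            t.start();       // run #1, on the new thread
            pool.execute(t); // run #2: the pool calls t.run(), which calls task.run()
        } else {
            pool.execute(task); // only the pool runs the task
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        if (t != null) {
            t.join();
        }
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start() + execute(): " + countRuns(true));
        System.out.println("execute() only:      " + countRuns(false));
    }
}
```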

1 Comment

@PeterLawrey - I know, but he wanted to know what he was doing wrong, so I explained. In general, I think he should be using a completely different approach, the producer/consumer model, rather than batching 100 calls per thread.
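A producer/consumer version might look like the sketch below: one thread reads lines into a bounded `BlockingQueue`, which blocks the reader when consumers fall behind, and a fixed number of consumers take rows and call the service. It is a sketch under assumptions: `CsvProducerConsumer`, `process` and `callService` are hypothetical names (`callService` stands in for the web service), results are gathered in memory and not in CSV order, and no header row is skipped.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CsvProducerConsumer {

    // Unique sentinel object, compared by identity, so a CSV line "EOF" is safe
    private static final String POISON = new String("EOF");

    static List<String> process(Reader csv, int consumers) throws IOException, InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000); // bounded
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String row = queue.take();
                        if (row == POISON) {
                            return; // this consumer is done
                        }
                        results.add(callService(row));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try (BufferedReader reader = new BufferedReader(csv)) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // blocks instead of rejecting when consumers lag
            }
        } finally {
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON); // one stop signal per consumer
            }
            pool.shutdown();
        }
        pool.awaitTermination(1, TimeUnit.HOURS);
        return results;
    }

    // Hypothetical stand-in for the web-service call
    static String callService(String row) {
        return row.toUpperCase();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(new StringReader("a\nb\nc\n"), 3));
    }
}
```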
