Skip to main content
edited tags
Link
Amarnath
  • 147
  • 1
  • 1
  • 4
Post Migrated Here from stackoverflow.com (revisions)
Source Link
Che
  • 147
  • 1
  • 1
  • 4

Reading output from multiple threads and writing to a file

There are 10 threads and I have 15 tasks. Now I have submitted all these tasks to these threads. I need to write all these threads output to a file which I was not successful.

I am getting output by running all the threads.

ThreadPool.java (Creates a Thread pool and adds all the tasks to the Blocking queue and submits)

public class ThreadPool {
    
    ExecutorService execService = null;
    BlockingQueue<Callable<String[]>> tasks = null;
    BlockingQueue<String> queue = null;
    
    // Get a list of Employee ID's
    public ThreadPool(List<String> empIDList) {
        try {
            tasks = new ArrayBlockingQueue<Callable<String[]>>(empIDList.size());
            
            for(String empNum : empIDList) {
                tasks.add(new ThreadTask(empNum));
            }
            
            performExecution(tasks);
            
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
    
    private void performExecution(BlockingQueue<Callable<String[]>> tasks) {
        try {
            execService = Executors.newFixedThreadPool(10);
            
            queue = new LinkedBlockingQueue<String>();
            
            Runnable reader = new ReaderEmp(queue);
            Thread readerThread = new Thread(reader);
            readerThread.start();
            
            for(Callable<String[]> call : tasks) {
                queue.add(executeTask(call, 5));
            }
            
            execService.shutdown();
            
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
    
    private String executeTask(Callable<String[]> task, int seconds) {
        try {
            String[] future = execService.submit(task).get(seconds, TimeUnit.SECONDS);
            
            System.out.println("Successfully Executed for " + future[0]);
            
            return future[0];
        } catch(TimeoutException toe) {
            System.out.println("Time Out for " + ((ThreadTask) task).getEmpID());
        } catch (ExecutionException ee) {
            System.out.println("Execution Exception for " + ((ThreadTask) task).getEmpID());
        } catch (InterruptedException ie) {
            System.out.println("Interrupted Exception for " + ((ThreadTask) task).getEmpID());
        }
        return ((ThreadTask) task).getEmpID() + " - Failed";
    }
}

ReaderEmp.Java (Gets each Emp_ID from the queue and writes them to the files.)

public class ReaderEmp implements Runnable {
    
    private final String EMP_SUCCESS = "Success.txt";
    private final String EMP_FAILURE = "Failure.txt";
    
    private BlockingQueue<String> queue;
    
    private FileWriter writerSuccess;
    private FileWriter writerFailure;
    
    private BufferedWriter bufWriterSuccess;
    private BufferedWriter bufWriterFailure;
    
    public ReaderEmp(BlockingQueue<String> queue) {
        try {
            writerSuccess = new FileWriter(EMP_SUCCESS);
            writerFailure = new FileWriter(EMP_FAILURE);
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        bufWriterSuccess = new BufferedWriter(writerSuccess);
        bufWriterFailure = new BufferedWriter(writerFailure);

        this.queue = queue;
    }
    
    @Override
    public void run() {
    // This is the place which I am not able to reach while running. 
    // But when I keep a debugging point and running in debug mode then I am reaching.
        try {
            while(!queue.isEmpty()) {
                synchronized (this) {
                    String empID = queue.take();
                    if(empID != null) {
                        if(empID.contains("Failed")) {
                            bufWriterFailure.append(empID.split("-")[0].trim() + ";");
                        } else {
                            bufWriterSuccess.append(empID + ";");
                        }
                    }
                }
            }
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            try {
                synchronized (bufWriterFailure) {
                    bufWriterFailure.flush();
                    bufWriterFailure.close();
                }
                
                synchronized (bufWriterSuccess) {
                    bufWriterSuccess.flush();
                    bufWriterSuccess.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}