You will need three things:
- two blocking queues: the first holds the data to process, the second collects the results
- an executor service
- some kind of latch
My example application:
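import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class App {
    private static final int NUMBER_OF_THREADS = 3;

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> data = prepareData();
        BlockingQueue<String> results = new LinkedBlockingQueue<String>();
        // Pool size and latch count must both match the number of workers.
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executor.execute(new Processor<String>(data, results,
                    countDownLatch, i + ""));
        }
        // Block until every Processor has counted down.
        countDownLatch.await();
        // Without shutdown() the pool's threads keep the JVM alive.
        executor.shutdown();
    }

    // Fills the source queue with 1000 sample items before any worker starts.
    private static BlockingQueue<String> prepareData() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        for (int i = 0; i < 1000; i++) {
            queue.add(i + "");
        }
        return queue;
    }
}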
// Worker that drains the shared source queue and publishes one result per item.
class Processor<T> implements Runnable {
    private final BlockingQueue<T> dataSource;
    private final BlockingQueue<String> results;
    private final CountDownLatch latch;
    private final String name;

    public Processor(BlockingQueue<T> dataSource,
            BlockingQueue<String> results, CountDownLatch countDownLatch,
            String processName) {
        this.dataSource = dataSource;
        this.results = results;
        this.latch = countDownLatch;
        this.name = processName;
    }

    @Override
    public void run() {
        try {
            T t;
            // poll() returns null once the queue is empty, which ends the loop.
            while ((t = dataSource.poll()) != null) {
                String result = "Process " + name + " processing: " + t;
                System.out.println(result);
                results.put(result);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop processing.
            Thread.currentThread().interrupt();
        } finally {
            // Always count down, so the waiting thread is never left blocked.
            latch.countDown();
        }
    }
}
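After preparing the data, create some Processors to process it. Each processor holds a reference to a thread-safe data source. It takes an object, processes it, and puts the result into another thread-safe collection that holds the results.
When the data source is empty, each processor calls latch.countDown() to signal "everything done" to the main thread, or to whichever thread is waiting for the results. Treating an empty queue as the end of the work is safe here only because the queue is completely filled before the processors start.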
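The example application waits on the latch but never reads the results queue. Below is a minimal sketch of that last step, assuming Java 8 or later (for the lambda). The class name ResultsDemo, the method processAll, the "item-" prefix and the 30-second timeout are all made up for illustration and are not part of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ResultsDemo {

    // Hypothetical helper: runs `threads` workers over `items` inputs and
    // returns everything the workers produced.
    static List<String> processAll(int threads, int items) throws InterruptedException {
        // Fill the source queue completely before any worker starts.
        BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            data.add(i);
        }
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        CountDownLatch latch = new CountDownLatch(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int w = 0; w < threads; w++) {
            executor.execute(() -> {
                try {
                    Integer item;
                    // An empty queue means there is no work left.
                    while ((item = data.poll()) != null) {
                        results.add("item-" + item);
                    }
                } finally {
                    // Count down even if the worker fails part-way.
                    latch.countDown();
                }
            });
        }

        try {
            // Bounded wait; the 30 s limit is an arbitrary assumption.
            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } finally {
            executor.shutdown();
        }

        // All workers are done, so the results queue is complete.
        List<String> collected = new ArrayList<>();
        results.drainTo(collected);
        return collected;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> collected = processAll(3, 1000);
        System.out.println("Collected " + collected.size() + " results");
    }
}
```

The results arrive in whatever order the workers finished, so sort the list or carry an index in each result if the order matters. The timed await is optional; it only guards against a worker that never finishes.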