0

I have a read-only list of endpoints on the internet that I want to process. Is there a built-in class or pattern I should follow to process them?

Right now I'm just dividing the initial list into N blocks and creating N threads, each of which processes the requests in its block.

6 Answers

4

Use an ExecutorService to handle your concurrent processing.

public void processAll(List<Endpoint> endpoints, int numThreads) {
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);

    for(final Endpoint endpoint : endpoints) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                doProcessing(endpoint);
            }
        });
    }
    // instead of a loop, you could also use ExecutorService#invokeAll()
    // with the passed-in list if Endpoint implements
    // java.util.concurrent.Callable

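    // shutdown() stops accepting new tasks; already-submitted tasks keep
    // running, but this call does not wait for them to finish
    executor.shutdown();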
}

private void doProcessing(Endpoint endpoint) {
    // do whatever you do with each one
}

This is just a bare-bones example. Have a look at the java.util.concurrent API documentation for examples of how to use more specific types of ExecutorService, handle Futures, and do all kinds of nifty stuff.
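As a sketch of the Futures route (and the invokeAll() alternative mentioned in the code comment), assuming Java 8+ and treating each endpoint as a plain String: the class name `InvokeAllSketch` and the `doProcessing` body are hypothetical placeholders, not the asker's real types. invokeAll() blocks until every task is done, which also answers "when is everything finished?".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Hypothetical stand-in for the real per-endpoint work:
    // here it just returns the length of the endpoint string.
    static int doProcessing(String endpoint) {
        return endpoint.length();
    }

    public static List<Integer> processAll(List<String> endpoints, int numThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            // Wrap each endpoint in a Callable so its result comes back via a Future.
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String endpoint : endpoints) {
                tasks.add(() -> doProcessing(endpoint));
            }
            // invokeAll blocks until every task has completed and returns
            // the Futures in the same order as the submitted tasks.
            List<Future<Integer>> futures = executor.invokeAll(tasks);

            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                // get() rethrows a task's exception wrapped in ExecutionException.
                results.add(f.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> endpoints = Arrays.asList("http://a.example", "http://bb.example");
        System.out.println(processAll(endpoints, 2));
    }
}
```

Because the results come back in submission order, you can zip them with the original list without any extra bookkeeping.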


3

Is there any reason you can't use an appropriate concurrent container, such as ConcurrentLinkedQueue? http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

1 Comment

+1 ConcurrentLinkedQueue is my "first choice" for passing off data between threads (at least within a primitive threading model).

2

Sounds like a Queue (use one of the implementations in java.util.concurrent) is what you need. That way each thread can pick up a link when it's ready, which makes more sense than partitioning in advance.
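A minimal sketch of that idea with plain threads, assuming the links are Strings and `fetch` is a hypothetical placeholder for the real request: instead of cutting the list into N blocks up front, all threads poll one shared ConcurrentLinkedQueue, so a thread that finishes early simply takes more work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SharedQueueSketch {

    // Hypothetical per-link work; replace with the real request.
    static String fetch(String link) {
        return "done: " + link;
    }

    public static Queue<String> processAll(List<String> links, int numThreads)
            throws InterruptedException {
        // One shared, thread-safe queue instead of N pre-cut blocks.
        Queue<String> work = new ConcurrentLinkedQueue<>(links);
        Queue<String> results = new ConcurrentLinkedQueue<>();

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                String link;
                // Each thread grabs the next link as soon as it is free;
                // poll() returns null once the queue is empty, ending the loop.
                while ((link = work.poll()) != null) {
                    results.add(fetch(link));
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            t.join(); // wait for every worker to finish draining the queue
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> results = processAll(Arrays.asList("a", "b", "c", "d"), 2);
        System.out.println(results.size() + " results");
    }
}
```

With pre-partitioning, one slow endpoint holds up its whole block; with the shared queue, the other threads keep draining the remaining links.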


1

You will need three things:

  • two blocking queues: the first with the data to process, the second for the results
  • an ExecutorService
  • some kind of latch (for example, a CountDownLatch)

My example application:

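import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class App {

    private static final int NUMBER_OF_THREADS = 3;

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> data = prepareData();

        BlockingQueue<String> results = new LinkedBlockingQueue<String>();

        // Size the pool and the latch from the same constant so the latch
        // count always matches the number of Processors that count down.
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_THREADS);

        for (int i = 0; i < NUMBER_OF_THREADS; i++)
            executor.execute(new Processor<String>(data, results,
                    countDownLatch, String.valueOf(i)));

        // Wait until every Processor has finished draining the queue.
        countDownLatch.await();

        // Without this, the pool's non-daemon threads keep the JVM running.
        executor.shutdown();
    }

    private static BlockingQueue<String> prepareData() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        for (int i = 0; i < 1000; i++) {
            queue.add(String.valueOf(i));
        }
        return queue;
    }

}

class Processor<T> implements Runnable {

    private final BlockingQueue<T> dataSource;

    private final CountDownLatch latch;

    private final String name;

    private final BlockingQueue<String> results;

    public Processor(BlockingQueue<T> dataSource,
            BlockingQueue<String> results, CountDownLatch countDownLatch,
            String processName) {
        this.dataSource = dataSource;
        this.results = results;
        this.latch = countDownLatch;
        this.name = processName;
    }

    @Override
    public void run() {
        try {
            T t;
            // poll() returns null as soon as the queue is empty, ending the loop
            while ((t = dataSource.poll()) != null) {
                String result = "Process " + name + " processing: " + t;
                System.out.println(result);
                results.put(result);
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        } finally {
            // count down even if processing fails, so main() cannot hang
            latch.countDown();
        }
    }
}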

After preparing the data, create some Processors to process it. Each Processor has a reference to the thread-safe data source: it takes an object, processes it, and finally puts the result into another thread-safe collection that holds the results.

When the data source becomes empty, each Processor calls latch.countDown(). Once all of them have done so, the latch reaches zero and the main thread (or whichever thread is waiting for the results) knows that everything is done.


0

Take a look at the java.util.concurrent package and the ExecutorService.
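One class in that package worth knowing for this case is ExecutorCompletionService, which hands back results in the order the tasks finish rather than the order they were submitted. A sketch, assuming Java 8+ and a hypothetical `process` method standing in for the per-endpoint work:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {

    // Hypothetical stand-in for the real per-endpoint work.
    static String process(String endpoint) {
        return endpoint.toUpperCase();
    }

    public static List<String> processAll(List<String> endpoints, int numThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        // Wraps the executor and queues each Future as its task completes.
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        try {
            for (String endpoint : endpoints) {
                completion.submit(() -> process(endpoint));
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < endpoints.size(); i++) {
                // take() blocks until the next task finishes, whichever one that is.
                results.add(completion.take().get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Output order may vary between runs: it follows completion order.
        System.out.println(processAll(Arrays.asList("a", "b", "c"), 2));
    }
}
```

This is handy when endpoints respond at very different speeds and you want to act on each result as soon as it arrives.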

Brian Goetz's book Java Concurrency in Practice is a must-read for understanding this stuff.


0

Blocking queues are probably the best way to go for you. Google it and you'll find plenty of info; this, for one, is a good tutorial: http://www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm
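A common BlockingQueue pattern is producer/consumer with a "poison pill" sentinel that tells each consumer to stop. This is a sketch, not code from the linked tutorial; the `POISON` sentinel, the `run` method, and the bounded capacity of 10 are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PoisonPillSketch {

    // Hypothetical sentinel, compared by reference so it can't clash with real data.
    private static final String POISON = new String("POISON");

    public static Queue<String> run(List<String> links, int numConsumers)
            throws InterruptedException {
        // Bounded queue: put() blocks the producer if consumers fall behind.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        Queue<String> results = new ConcurrentLinkedQueue<>();

        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String link = queue.take(); // blocks until an item is available
                        if (link == POISON) {
                            return; // sentinel: no more work for this consumer
                        }
                        results.add("processed " + link);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers.add(t);
            t.start();
        }

        // Producer: the calling thread feeds the queue, then one pill per consumer.
        for (String link : links) {
            queue.put(link);
        }
        for (int i = 0; i < numConsumers; i++) {
            queue.put(POISON);
        }
        for (Thread t : consumers) {
            t.join();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("x", "y", "z"), 2).size() + " processed");
    }
}
```

Unlike a poll()-based loop, which stops at the first empty read, take() waits for more items, so consumers can start before the producer has finished filling the queue.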
