
I think I'm doing it wrong. I am creating threads that are supposed to crunch some data from a shared queue. My problem is that the program is slow and a memory hog, and I suspect the queue may not be as shared as I hoped. I suspect this because I added a line that displays the size of the queue, and if I launch 2 threads I get two outputs with completely different numbers that seem to increment on their own. (I thought it could be the same number jumping from 100 to 2 and so on, but after watching it, it shows 105 and 5 and they grow at different rates. If I have 4 threads then I see 4 different numbers.)

Here's a snippet of the relevant parts. I create a static class with the data I want in the queue at the top of the program:

static class queue_class {
    int number;
    int[] data;

    // The constructor must carry the class name (it was "Context" before the rename)
    queue_class(int number, int[] data) {
        this.number = number;
        this.data = data;
    }
}

Then I create the queue inside the Callable that the jobs are sent to:

static class process_threaded implements Callable<Void> {
    // queue with contexts to process
    private Queue<queue_class> queue;

    process_threaded(queue_class request) {
        queue = new ArrayDeque<queue_class>();
        queue.add(request);
    }

    public Void call() {
        while(!queue.isEmpty()) {
            System.out.println("in contexts queue with a size of " + queue.size());
            queue_class current = queue.poll();
            // get work and process it; if it works great then the solution goes elsewhere,
            // otherwise, depending on the data, it's either discarded or parts of it are added back to the queue
            queue.add(new queue_class(k, data_list)); // k and data_list come from the elided processing step
        }
        return null;
    }
}

As you can see, there are 3 options for the data: it gets sent off if the data is good, discarded if it's totally horrible, or sent back to the queue. I think the queues are growing when data gets sent back, and I suspect that's because each thread is working on its own queue and not a shared one.

Is this guess correct and am I doing this wrong?

  • This code won't compile. Did you try to compile and execute something and then ask a specific question rather than writing a pseudo-code which won't be accepted by java compiler? Commented Apr 14, 2012 at 4:24
  • my actual code compiles..I didn't think anyone would want to compile it so I took the code, cleaned up the info not related to the problem just to present an idea of what I'm doing. I don't need code just logic and think the above code should give an idea of what I'm doing. Commented Apr 14, 2012 at 4:33
  • You should read Java Concurrency in Practice to build your expertise in writing thread-safe code. Commented Apr 14, 2012 at 4:54

2 Answers


You are correct in your assessment that each thread is (probably) working with its own queue, since you are creating a queue in the constructor of your Callable. (It's actually very weird to have a Callable<Void> -- isn't that just a Runnable?)

There are other problems there, for example, the fact that you're working with a queue that isn't thread-safe, or the fact that your code won't compile as it is written.

The important question, though, is do you really need to explicitly create a queue in the first place? Why not have an ExecutorService to which you submit your Callables (or Runnables if you decide to make that switch): Pass a reference to the executor into your Callables, and they can add new Callables to the executor's queue of tasks to run. No need to reinvent the wheel.

For example:

// Needs java.util.concurrent.ExecutorService and java.util.concurrent.atomic.AtomicInteger
static class process_threaded implements Runnable {
    // Reference to an executor
    private final ExecutorService exec;
    // Reference to the job counter
    private final AtomicInteger jobCounter;
    // Request to process
    private queue_class request;

    process_threaded( ExecutorService exec, AtomicInteger counter, queue_class request) {
        this.exec = exec;
        this.jobCounter = counter;
        this.jobCounter.incrementAndGet(); // Assuming that you will always
                                           // submit the process_threaded to
                                           // the executor if you create it.
        this.request = request;
    }

    public void run() {
        // get work and process **request**; if it works great then the solution goes elsewhere,
        // otherwise, depending on the data, it's either discarded or parts of it are added back to the executor
        exec.submit( new process_threaded( exec, jobCounter, new queue_class(k, data_list) ) );

        // Can do some more work

        // Always run before returning: counter update and notify the launcher
        synchronized(jobCounter){
            jobCounter.decrementAndGet();
            jobCounter.notifyAll();
        }
    }
}

Edit:

To solve your problem of when to shut down the executor, I think the simplest solution is to have a job counter, and shut down when it reaches 0. For thread-safety an AtomicInteger is probably the best choice. I added some code above to incorporate the change. Then your launching code would look something like this:

void theLauncher() throws InterruptedException {

    AtomicInteger jobCounter = new AtomicInteger( 0 );

    ExecutorService exec = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );

    exec.submit( new process_threaded( exec, jobCounter, someProcessRequest ) );
    // Can submit some other things here of course...

    // Wait for jobs to complete:
    synchronized( jobCounter ){ // wait() must be called while holding the object's monitor
        while( jobCounter.get() > 0 ) // re-check after every wake-up
            jobCounter.wait();
    }

    // Now you can shutdown:
    exec.shutdown();
}
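
The counter-based shutdown described above can be exercised end to end with a small self-contained sketch. Everything in it is hypothetical and only stands in for the real work: the class names ResubmitDemo and Task, the extra completed counter, and the "split a job of size n into two jobs of size n - 1" logic are assumptions made for illustration, not part of the original program.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ResubmitDemo {
    // Hypothetical task: "processing" n means resubmitting two smaller jobs until n reaches 0.
    static class Task implements Runnable {
        private final ExecutorService exec;
        private final AtomicInteger pending;   // jobs created but not yet finished
        private final AtomicInteger completed; // jobs finished (for the demo only)
        private final int n;

        Task(ExecutorService exec, AtomicInteger pending, AtomicInteger completed, int n) {
            this.exec = exec;
            this.pending = pending;
            this.completed = completed;
            this.n = n;
            pending.incrementAndGet(); // counted at creation, so every Task created must be submitted
        }

        public void run() {
            try {
                if (n > 0) {
                    // Children are counted before this task decrements, so pending cannot hit 0 early.
                    exec.submit(new Task(exec, pending, completed, n - 1));
                    exec.submit(new Task(exec, pending, completed, n - 1));
                }
                completed.incrementAndGet();
            } finally {
                synchronized (pending) {
                    pending.decrementAndGet();
                    pending.notifyAll(); // wake the launcher so it can re-check the counter
                }
            }
        }
    }

    // Runs the whole job tree and returns how many tasks completed.
    static int runAll(int depth) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        AtomicInteger pending = new AtomicInteger(0);
        AtomicInteger completed = new AtomicInteger(0);
        exec.submit(new Task(exec, pending, completed, depth));
        synchronized (pending) {
            while (pending.get() > 0) {
                pending.wait();
            }
        }
        exec.shutdown(); // safe now: no task is running or queued
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed " + runAll(3) + " tasks"); // prints "completed 15 tasks"
    }
}
```

A depth of 3 produces 1 + 2 + 4 + 8 = 15 tasks. The decrement sits in a finally block so that a task which throws still releases the launcher.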

7 Comments

Interesting, I didn't think of that (just submitting it to the ExecutorService). A few basic questions about that. Would it be a problem if I do that from the thread itself, or does it not matter? Also, is the ExecutorService safe for multithreaded submission (of course I know it can distribute tasks, but can it accept several concurrently)? The code was given to me, sorry, so I was trying to improve what I got and didn't realize the obvious easy solution..
@learningJava Yes it's thread safe to submit tasks from different threads to the same executor. (It's actually how the new ForkJoinTask stuff in Java 7 works behind the scenes). Btw you might want to look at ForkJoinTask, it might be a good fit for this task.
I'm using java6 right now..java7 seems to have addressed all the problems I have (well, programming problems, I mean). I'll test it and report back..
@learningJava It's actually not that much work to replicate what ForkJoinTask does with Java 6. You probably don't need all of the new functionality anyway, it looks like what I showed should be good enough based on what you've said.
Thanks so much, it kind of works. It works in the section I asked about (my tests show no change in results), but my overall program is having a problem. Basically, I had another loop that sleeps and prints the results (the good data from the above process). To prevent it from going on forever, I checked executor.isTerminated(), which worked because the threads I used to create never died (they just kept looping in that local while(!queue.isEmpty()) loop). Now my program exits early, because I issue a shutdown command right after submitting to the executor.

Don't reinvent the wheel! How about using ConcurrentLinkedQueue? From the javadocs:

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection.
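
As a rough illustration of sharing one such queue, the sketch below creates a single ConcurrentLinkedQueue and hands the same instance to every worker instead of constructing a queue inside each task. The names SharedQueueDemo, Worker and drain are made up for this example, and the "work" is just counting items. It sticks to Java 6 syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SharedQueueDemo {
    // Hypothetical worker: drains a queue it is given, never one it creates itself.
    static class Worker implements Callable<Integer> {
        private final Queue<Integer> queue; // the same instance in every worker

        Worker(Queue<Integer> queue) {
            this.queue = queue;
        }

        public Integer call() {
            int processed = 0;
            // poll() returns null on an empty queue; checking isEmpty() first and
            // polling afterwards would race with the other threads.
            while (queue.poll() != null) {
                processed++;
            }
            return processed;
        }
    }

    // Fills one shared queue, drains it with several workers, returns the total processed.
    static int drain(int threads, int items) throws Exception {
        Queue<Integer> shared = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < items; i++) {
            shared.add(i);
        }
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            for (int t = 0; t < threads; t++) {
                results.add(exec.submit(new Worker(shared)));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // blocks until that worker has finished
            }
            return total;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("processed " + drain(4, 1000) + " items"); // prints "processed 1000 items"
    }
}
```

This sketch never adds items back to the queue. When workers can re-add work, as in the question, an empty poll() no longer proves that everything is done, because another thread may be about to add more. That case needs some extra completion signal, such as a count of outstanding jobs.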

3 Comments

I'll give it a shot..thanks. I thought it was only for java7, but I guess not..thanks
Make sure you create a single queue outside of your tasks that they all share. If each task creates its own queue--thread-safe or not--you'll have problems.
ConcurrentLinkedQueue is available since Java 1.5
