1

I am confused about concurrency. I am trying to stop the consumer thread when the producer is shut down, but I have problems when the consumer is blocked on take(). I have tried adding a poison pill, interrupting the current thread, and using a boolean flag, all to no avail.

Can someone please advise where I am going wrong? Thanks.

public class TestPoisonPill implements Runnable {
    private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
    private volatile boolean stopped = false;

    public void addToQueue(String event) throws InterruptedException{
        System.out.println("in add to queue");
        if(event != null){
            try {
                queue.put(event);
            } catch (InterruptedException e) {
                stopped = true;
                queue.put("Poison");
                System.out.println("Unable to add the event to the queue, order routing processing is stopped");
                throw e;
            }
        }
    }

    @Override
    public void run() {
        while(!stopped){
            try {
                if(queue.size() > 0){
                    String string = queue.take();
                    System.out.println("taken " + string + "from the queue");
                }else{
                    continue;
                }
            }
            catch (InterruptedException e) {
                stopped = true;
            }
        }
    }

    public boolean isStopped(){
        return stopped;
    }

    protected BlockingQueue<String> getQueue() {
        return queue;
    }

    protected void setBoolean(boolean b){
        this.stopped = b;
    }

    public static void main(String[] args) throws InterruptedException{
        ExecutorService exec = Executors.newSingleThreadExecutor();
        final TestPoisonPill t = new TestPoisonPill();
        exec.execute(t);
        ExecutorService exec2 = Executors.newSingleThreadExecutor();
        Runnable addTask = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        t.addToQueue("hi");
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        System.out.println("add task interrupted ");
                        t.setBoolean(true);
                        break;
                    }
                }
            }
        };
        exec2.execute(addTask);
        Thread.sleep(1000);
        exec2.shutdownNow();
    }
}

2 Answers

3

am confused with concurrency - i am trying to stop the consumer thread from running if the producer is shutdown but am having issues if the consumer is blocked on take()

If your problem is that your program is not stopping, I think you are missing an exec.shutdownNow() on your first ExecutorService. That call interrupts your first thread, but the interrupt only has an effect if you change your loop to something like:

while (!stopped && !Thread.currentThread().isInterrupted()) {

Without the interrupt flag check, the thread never sees the interrupt. An interrupt is just a flag that is set on the thread. Certain methods (like Thread.sleep(...) and BlockingQueue.take()) throw InterruptedException when the thread is interrupted, but your consumer is spinning and only calls take() when the queue already holds an element, so it never reaches take() once the producer has stopped.
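To make the "just a flag" point concrete, here is a minimal sketch. The class and method names are invented for illustration and are not part of the question's code. It sets the flag on the current thread and shows that plain code keeps running while take() throws as soon as it is called.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InterruptFlagDemo {

    // Plain code is not stopped by an interrupt: the flag simply stays set
    // until something checks it.
    public static boolean flagSurvivesPlainLoop() {
        Thread.currentThread().interrupt();   // only sets the flag
        int spins = 0;
        while (spins < 1000) {
            spins++;                          // runs to completion regardless
        }
        return Thread.interrupted();          // reads the flag and clears it
    }

    // A blocking method such as take() checks the flag and throws.
    public static boolean takeThrowsWhenInterrupted() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread.currentThread().interrupt();
        try {
            queue.take();                     // empty queue, flag set: throws
            return false;
        } catch (InterruptedException e) {
            // throwing InterruptedException clears the flag again
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag survived plain loop: " + flagSurvivesPlainLoop());
        System.out.println("take() threw and cleared flag: " + takeThrowsWhenInterrupted());
    }
}
```

This is why a spin loop has to test isInterrupted() itself, while a loop that blocks in take() gets the exception for free.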

Really, the spin loop in the consumer is an extremely bad pattern. It should just call queue.take() and then either use the interrupt or have your producer actually submit a poisoned pill. Something like:

while (!Thread.currentThread().isInterrupted()) {
    String string;
    try {
         string = queue.take();
    } catch (InterruptedException e) {
         break;
    }
    // here is where you could check for a poison pill
    // something like: if (string == STOP_PILL) break;
    System.out.println("taken " + string + "from the queue");
}

You don't really need the stopped flag if you are using interrupt appropriately.

You mention having tried a "poisoned pill". For others, a poisoned pill is a specific "special" object that you put on the queue and that the consumer uses to know when to shut down. Something like the following should work:

private static final String STOP_PILL = "__STOP_PLEASE!!__";
...
// the consumer removes from the queue
String string = queue.take();
// it tests to see if it is the pill; == is better than .equals here
if (string == STOP_PILL) {
   // the consumer should stop
   break;
}
...
// to stop the consumer, the producer puts the pill into the queue
queue.put(STOP_PILL);
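Putting those fragments together, a self-contained sketch might look like the following. The class name, the runOnce helper, and the "hi N" messages are made up for the example. It also uses new String(...) for the pill, which is an assumption on my part: it guarantees that the == test can never match ordinary data that happens to have the same characters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillDemo {
    // A distinct instance, so the identity check below cannot match real data.
    private static final String STOP_PILL = new String("__STOP_PLEASE!!__");

    // Produces `messages` items, then the pill; returns what the consumer saw.
    public static List<String> runOnce(int messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String s = queue.take();   // blocks until something arrives
                    if (s == STOP_PILL) {
                        break;                 // the pill is not a real message
                    }
                    consumed.add(s);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < messages; i++) {
                queue.put("hi " + i);
            }
            queue.put(STOP_PILL);              // tell the consumer to stop
            consumer.join();                   // safe to read `consumed` after this
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3));
    }
}
```

The consumer blocks in take() the whole time, so there is no spin loop and no shared stopped flag to get wrong.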

Lastly, you are using 2 ExecutorService instances when you could easily use one. I guess the point here is to interrupt only one of them, but FYI: a single Executors.newCachedThreadPool() will create as many threads as you need.
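As a rough illustration of the single-pool variant (the class name, method name, and timings are invented for the example), both tasks run in one cached pool and one shutdownNow() interrupts both of them:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SinglePoolDemo {

    // Returns true if both tasks exited after a single shutdownNow().
    public static boolean runAndStop() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        ExecutorService pool = Executors.newCachedThreadPool();

        // Consumer: blocks in take(), leaves when interrupted.
        pool.execute(() -> {
            try {
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): fall through and return
            }
        });

        // Producer: blocks in put() or sleep(), leaves when interrupted.
        pool.execute(() -> {
            try {
                while (true) {
                    queue.put("hi");
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): fall through and return
            }
        });

        try {
            Thread.sleep(100);                 // let the tasks run for a while
            pool.shutdownNow();                // interrupts both worker threads
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pool terminated: " + runAndStop());
    }
}
```

If you really do want to stop only the producer, keeping two executors (or keeping the Future from submit() and cancelling it) is still a reasonable choice.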


2 Comments

Hi, I was using a poisoned pill, which I removed as it didn't work! What I don't understand is: why should both executors need shutting down? Surely the run method should exit?
Sorry @Biscuit128, I didn't see your question. You have to shut down the executor service after you've submitted your last task to it. Otherwise its threads wait forever to see if you want to submit more tasks. shutdown() only stops new tasks from being accepted; it allows already-submitted tasks to run until they finish. shutdownNow() removes any waiting tasks and interrupts the running ones.
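A small sketch of that difference, with invented names: the first task blocks forever on an empty queue, and a second task waits behind it in a single-thread executor. shutdown() alone does not end the blocked task, while shutdownNow() interrupts it and hands back the task that never started.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownVsShutdownNow {

    // Returns "terminatedAfterShutdown,terminatedAfterShutdownNow,neverStartedCount".
    public static String demo() {
        BlockingQueue<String> empty = new ArrayBlockingQueue<>(1);
        ExecutorService exec = Executors.newSingleThreadExecutor();

        // Task 1: blocks in take() until it is interrupted.
        exec.execute(() -> {
            try {
                empty.take();
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): just return
            }
        });
        // Task 2: queued behind task 1, so it cannot start yet.
        exec.execute(() -> { });

        try {
            exec.shutdown();                   // no interrupt: task 1 stays blocked
            boolean afterShutdown = exec.awaitTermination(200, TimeUnit.MILLISECONDS);

            List<Runnable> neverStarted = exec.shutdownNow();   // interrupts task 1
            boolean afterShutdownNow = exec.awaitTermination(5, TimeUnit.SECONDS);

            return afterShutdown + "," + afterShutdownNow + "," + neverStarted.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());            // prints "false,true,1"
    }
}
```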
0

You never shut down your exec executor, only exec2, so the thread running your TestPoisonPill never gets interrupted.
