4

I have a BlockingQueue which is being populated by a thread with put(). But I am confused on how to do take() for the BlockingQueue. Currently I implemented it this way:

String link;
try {
    while(!(link = links.take()).isEmpty()) {
        System.out.println(link);
    }
} catch(InterruptedException ex) {
    ex.printStackTrace();
}

Is that right? How is it possible to loop the queue and assign the string variable if not within the conditional statement?

2 Answers 2

2

If I have understood correctly, you have asked for a way to take outside of the condition? Well, it's not that hard:

while (!links.isEmpty()) {
    try {
        String link = links.take();
        // Do stuff.
    } catch (InterruptedException e) {
        // Exception handling.
    }
}

Your current condition !(link = links.take()).isEmpty() checks if the return value—a string—is empty (length equals 0), not the queue.

Anyhow, please bear in mind that the code above is not atomic, so there's no guarantee that nothing else happens between links.isEmpty() and links.take().

EDIT: You can handle race conditions during the startup with a flag:

BlockingQueue<Integer> numbers = new ArrayBlockingQueue<>(10);
AtomicBoolean flag = new AtomicBoolean(true);

// Producer.
new Thread(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            numbers.put(i);
        } catch (InterruptedException e) { /* NOP */ }
    }
    flag.set(false);
}).start();

// Consumer.
while (flag.get() || !numbers.isEmpty()) {
    try {
        System.out.println(numbers.take());
    } catch (InterruptedException e) { /* NOP */ }
}

The AtomicBoolean is not necessary here, but it might become handy if you have multiple producers and/or consumers. It's also part of java.util.concurrent which you definitely should checkout.

Sign up to request clarification or add additional context in comments.

6 Comments

a seperate question though, is it possible within the producer code to know when to stop producing after a certain number of elements have been processed by the consumer?
You can do this asynchronously. Let's say you want n items to be consumed, then you can create a ArrayBlockingQueue with a capacity of n. put only blocks if there's not enough space in the queue, so your producer is able to fire and forget.
But take from the consumer will make space, and the producer would continue putting into the queue, so it does only pauses the producer but I want it to stop completely once the consumer processes up to a number of items.
Simply wrap the invocation of put in a for loop that terminates after n iterations. This way your producer will exactly submit n items. The consumers can take independently.
Ok, back to the original question, I just tried your solution, and found out that when the while loop is executed, the producer has not put into links, thus skipping the while loop altogether. What should I do to make the while loop waits for something to be inserted into the queue?
|
0

My understanding is that you are asking on how to terminate a BlockingQueue in a good way. There are two scenarios I can think of. In any case you have a Message producer A and message consumer B.

  1. Message producer sends some form of a terminal value like "Stop". You check against it and the while cycle terminates.
  2. You can interrupt the message producer which will throw InterruptedException on the consumer side. This is the way to avoid the terminal input case. The problem here is that you dont't have control if the consumer has actually consumed everything that is on the queue. So interruption is usually used if there is a condition that requires the consuming to terminate imediately.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.