5

I am not well-versed in Multi-Threading. I am trying to take screenshot repeatedly by one producer thread, which adds the BufferedImage object to ConcurrentLinkedQueue and a Consumer Thread will poll queue for BufferedImage object to saving them in file. I could consume them by repeated polling(while loop), but I don't know how to consume them using notify() and wait(). I have tried using wait() and notify in smaller programs, but couldn't implement it here.

I have the following code:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

Previously I had a repeated polling to check for existence of BufferedImage Object. Now I have changed run method as synchronised and tried to implement wait() and notify(). Am I doing correct? Please help. Thanks.

3 Answers 3

6

You are using the wrong Queue for the job. The ConcurrentLinkedQueue is a non-blocking Queue which means that there is no producer consumer semantics. If you are just doing one reader and one writer take a look at SynchronousQueue

Simply put your code can be re-written as such

BlockingQueue<?> queue = new SynchrnousQueue<?>();
class StartPeriodicTask implements Runnable {
    public void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        queue.offer(image); //1
}
public class ImageConsumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                BufferedImage bufferedImage = queue.poll(); //2

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
            }

That's really it.

Let me explain. At line //1 the producing thread will 'place' the image on the queue. I quotes place because a SynchrnousQueue has no depth. What actually happens is the thread tells the queue "If there are any threads asking for an element from this queue then give it the that thread and let me continue. If not I'll wait until another thread is ready"

Line //2 is similar to 1 where the consuming thread just waits until a thread is offering. This works great with a single-reader single-writer

Sign up to request clarification or add additional context in comments.

5 Comments

What do you mean by single-reader and single-writer? What if we have multiple threads offering and multiple threads polling? That should be fine right?
@Ahamed Yes, though I meant it was a 1-1 relationship. For each thread putting onto a queue you need a corresponding thread polling from the queue. Threads cannot place on a SynchrnousQueue and continue on, there has to be a thread asking for an element - at that point the putting thread can continue.
But that also means you can have 100 writing threads and 1 reading thread, that is fine and it will work. You will end up having many idle writing threads on the queue waiting for that 1 reading thread to poll.
@JohnVint "100 writing threads and 1 reading thread" so performance wise it will be like running 1 thread at a time...since there is no asynchronous processing in your solution...they all blocking each other...one might ask why bother with the Q at all in this case, why not to combine ImageConsumer with StartPeriodicTask? I think Ahamed wanted asynchronous processing.
When I said it will work I meant from a logical/functional perspective. If you have 100 writing threads and 100 reading threads it too will work asynchronously. The Queue here is useful because it allows each thread to not continue until another thread has processed the item.
5

The first problem is the unnecessary wait that you have in your producer:

    if(null!=queue.peek()){ // You are the producer, you don't care if the queue is empty
        try {
            System.out.println("Empty queue, so waiting....");
            wait(); // This puts you to bed, your waiting and so is your consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }else{
        queue.add(image);
        notify();
    }

This is all you should need:

        queue.add(image);
        notify();

The next issue is the unnecessary notify in your consumer. It yields control of its processing at that point, which I believe you intended as a way to get your producer going, but of course your code never gets to that point. So this:

            }else{
                bufferedImage = queue.poll();
                notify();
            }
            File imageFile = getFile();
            if (!imageFile.getParentFile().exists()) {
                imageFile.getParentFile().mkdirs();
            }
                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

Should look more like this:

            }else{
                bufferedImage = queue.poll();

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                   imageFile.getParentFile().mkdirs();
                }

                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

Comments

4

Once the java.util.concurrent library came into the JDK1.5, the need to write your own wait/notify logic went right out the door. In 2012, if you are doing your own wait/notify, you are working too hard and should strongly consider the tried and true java.util.concurrent equivalents.

That being said, I believe polling is the idea behind the built in java.util.concurrent.ConcurrentLinkedQueue. In other words, the consumers sit in their own Thread and .poll() items from the ConcurrentLinkedQue as long as it is !isEmpty(). Most implementations that I've seen throw some sort of a one second sleep between tests of the !isEmpty(), but I don't think that is actually necessary. Also, pay note to the Vint guy's comment on my answer, .poll() may return null. Consider alternative implementations of java.util.AbstractQueue that may have blocking behavior closer to what you are looking for.

This guy's got a simple example: http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4

Finally, get the Goetz book "Java Concurrency In Practice" and read it. I'm almost sure it has a recipe for what to use to replace your own home-grown wait/notifys.

2 Comments

@Bob Kuhar 'In other words, the consumers sit in their own Thread and .remove items from the ConcurrentLinkedQue as long as it is !isEmpty()' That isn't true. It is true for ArrayBlockingQueue and LinkedBlockingQueue, a ConcurrentLinkedQueue.poll() will return null if isEmpty()
@JohnVint thanks for pointing that out. I changed my wording as it wasn't the "poll" method I was talking about it was the concept of polling the queue with repeated tests of isEmpty().

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.