1

This is the first time I have asked a question on StackOverflow. The problem I have is the following:

I have a Producer and Consumer class. In the Producer class I read a file line by line and put these lines of text in a List of Strings. When the list has an x amount of lines. This list gets added to an ArrayBlockingQueue. I have one Producer Thread that is started within the main thread. Besides this I start a couple of Consumer threads. The Consumer thread takes an item from the queue, which should be a list, and walks through this list of lines looking for a particular word. When the word is found it increments a count variable.

What happends is that when the Consumer takes an item from the queue, it says that it is empty. I cannot figure out why because my producer should certainly add it to the queue.

My code looks like this:

Consumer class:

public static class Consumer implements Callable<Integer> {

    int count = 0;

    @Override
    public Integer call() throws Exception {
        List<String> list = new ArrayList<>();
        list = arrayBlockingQueueInput.take();
        do {
            if (!list.isEmpty()){
                for (int i = 0; i < arrayBlockingQueueInput.take().size(); i++) {
                    for (String element : list.get(i).split(" ")) {
                        if (element.equalsIgnoreCase(findWord)) {
                            count++;
                        }
                    }
                }
            } else {
                arrayBlockingQueueInput.put(list);
            }
        } while (list.get(0) != "HALT");
        return count;
    }
}

Producer Class:

public static class Producer implements Runnable {

    @Override
    public void run() {
        try {
            FileReader file = new FileReader("src/testText.txt");
            BufferedReader br = new BufferedReader(file);

            while ((textLine = br.readLine()) != null) {

                if (textLine.isEmpty()) {
                    continue;
                }

                /* Remove punctuation from the text, except of punctuation that is useful for certain words.
                * Examples of these words are don't or re-enter */
                textLine = textLine.replaceAll("[[\\W]&&[^']&&[^-]]", " ");

                /* Replace all double whitespaces with single whitespaces.
                * We will split the text on these whitespaces later */
                textLine = textLine.replaceAll("\\s\\s+", " ");

                textLine = textLine.replaceAll("\\n", "").replaceAll("\\r", "");

                if (results.isEmpty()) {
                    results.add(textLine);
                    continue;
                }
                if (results.size() <= SIZE) {
                    results.add(textLine);
                    if (results.size() == SIZE) {
                        if (arrayBlockingQueueInput.size() == 14){
                            List<String> list = new ArrayList<String>();
                            list.add(HALT);
                            arrayBlockingQueueInput.put(list);
                        } else{
                            arrayBlockingQueueInput.put(results);
                            results.clear();
                        }
                    }
                }
            }
            /* Count the remaining words in the list
             *  (last lines of the file do perhaps not fill up until the given SIZE, therefore need to be counted here)
             *  Fill the list with empty items if the size of the list does not match with the given SIZE */
            while (results.size() != SIZE) {
                results.add("");
            }
            arrayBlockingQueueInput.put(results);
            List<String> list = new ArrayList<String>();
            list.add(HALT);
            arrayBlockingQueueInput.put(list);
            results.clear();
        } catch (InterruptedException e) {
            producerIsRunning = false;
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Main Class:

public void main() throws IOException, InterruptedException {
    System.out.println("Enter the word you want to find: ");
    Scanner scan = new Scanner(System.in);
    findWord = scan.nextLine();

    System.out.println("Starting...");
    long startTime = System.currentTimeMillis();
    Thread producer = new Thread(new Producer());
    producer.start();
    ExecutorService executorService = Executors.newFixedThreadPool(CORE);

    List<Future<Integer>> futureResults = new ArrayList<Future<Integer>>();

    for (int i = 0; i < CORE; i++) {
        futureResults.add(executorService.submit(new Consumer()));
    }

    executorService.shutdown();

    for (Future<Integer> result : futureResults) {
        try {
            wordsInText += result.get();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    producer.join();

    long stopTime = System.currentTimeMillis();

    System.out.println("The word " + findWord + " appears " + wordsInText + " times in the given text");

    System.out.println("Elapsed time was " + (stopTime - startTime) + " milliseconds.");
}

Can anybody explain why this happends? I would like to add that we try to use a poison pill to notify the consumers that the producer is on HALT.

To answer the question why we want to do this like this? For school we try to parallelize a certain programming problem. The problem we chose is string matching. We first made a serial solution and a parallel solution. For the next assignment we have to improve our parallel solution and our teacher told us that this is a way to do it.

Thanks in advance!

Nick

2 Answers 2

3

You add list to a queue and the clear it:

arrayBlockingQueueInput.put(results);
results.clear();

You need to do something like this to add copy of list to a queue so that clear() will not clean list which is in queue:

arrayBlockingQueueInput.put(new ArrayList<String>(results));
results.clear();
Sign up to request clarification or add additional context in comments.

7 Comments

Thanks for your response! I have tried this, and it still shows the queue is empty. When I print the arrayBlockingQueueInput.take() method it shows four empty brackets (for the four consumers I started I suppose). My program also keeps running.
@NickSchokker you need to do the same thing after your while loop in Producer in case your file has less then SIZE records. Have you debugged your Producer to be sure what data results has before being added to the queue?
I did add it after that while loop aswell. Also I have debugged results and it shows the data that should be in the list. I have also looked in the queue right after the put in my producer. Which show the same contents. I cannot get my head around this.
Why not just create a new 'results' list instance after queueing up the old one?
Did not help @MartinJames
|
0

After some help from my teacher he helped us find the problem. There were two mistakes. One was within the producer class. I had code to signal a HALT of the producer within the main while loop. This should not have been done.

Besides this, the .take() I do within the Consumer class before the Do-While should have been done within the do-while loop.

The correct code looks like this:

Consumer class:

public static class Consumer implements Callable<Integer> {

    int count = 0;

    @Override
    public Integer call() throws Exception {
        List<String> list = new ArrayList<>();
        do {
            list = arrayBlockingQueueInput.take();
            if (!list.get(0).equals(HALT)){
                for (int i = 0; i < list.size(); i++) {
                    for (String element : list.get(i).split(" ")) {
                        if (element.equalsIgnoreCase(findWord)) {
                            count++;
                        }
                    }
                }
            } else {
                arrayBlockingQueueInput.put(list);
            }
        } while (!list.get(0).equals(HALT));
        return count;
    }
}

Producer class:

public static class Producer implements Runnable {

    @Override
    public void run() {
        try {
            FileReader file = new FileReader("src/testText.txt");
            BufferedReader br = new BufferedReader(file);

            while ((textLine = br.readLine()) != null) {

                if (textLine.isEmpty()) {
                    continue;
                }

                /* Remove punctuation from the text, except of punctuation that is useful for certain words.
                * Examples of these words are don't or re-enter */
                textLine = textLine.replaceAll("[[\\W]&&[^']&&[^-]]", " ");

                /* Replace all double whitespaces with single whitespaces.
                * We will split the text on these whitespaces later */
                textLine = textLine.replaceAll("\\s\\s+", " ");

                textLine = textLine.replaceAll("\\n", "").replaceAll("\\r", "");

                if (results.isEmpty()) {
                    results.add(textLine);
                    continue;
                }
                if (results.size() <= SIZE) {
                    results.add(textLine);
                    if (results.size() == SIZE) {
                        arrayBlockingQueueInput.put(new ArrayList<String>(results));
                        results.clear();
                    }
                }
            }
            /* Count the remaining words in the list
             *  (last lines of the file do perhaps not fill up until the given SIZE, therefore need to be counted here)
             *  Fill the list with empty items if the size of the list does not match with the given SIZE */
            while (results.size() != SIZE) {
                results.add("");
            }
            arrayBlockingQueueInput.put(new ArrayList<String>(results));
            List<String> list = new ArrayList<String>();
            list.add(HALT);
            arrayBlockingQueueInput.put(list);
            results.clear();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Main class:

public void main() throws IOException, InterruptedException {
    System.out.println("Enter the word you want to find: ");
    Scanner scan = new Scanner(System.in);
    findWord = scan.nextLine();

    System.out.println("Starting...");
    long startTime = System.currentTimeMillis();
    Thread producer = new Thread(new Producer());
    producer.start();
    ExecutorService executorService = Executors.newFixedThreadPool(CORE);

    List<Future<Integer>> futureResults = new ArrayList<Future<Integer>>();

    for (int i = 0; i < CORE; i++) {
        futureResults.add(executorService.submit(new Consumer()));
    }

    executorService.shutdown();

    for (Future<Integer> result : futureResults) {
        try {
            wordsInText += result.get();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    producer.join();

    long stopTime = System.currentTimeMillis();

    System.out.println("The word " + findWord + " appears " + wordsInText + " times in the given text");

    System.out.println("Elapsed time was " + (stopTime - startTime) + " milliseconds.");
}

Thanks to @Ivan for helping me with the .clear method called on results. Without this the code solution did not work.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.