
I am trying to get faster output through threads; this is just a small POC of sorts.
Suppose I have a problem statement: find all the numbers in an array that occur an odd number of times. Following is my attempt at doing this both sequentially and in parallel.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class Test1 {

    final static Map<Integer, Integer> mymap  = new HashMap<>();

    static Map<Integer, AtomicInteger> mymap1 = new ConcurrentHashMap<>();

    public static void generateData(final int[] arr) {
        final Random aRandom = new Random();
        for (int i = 0; i < arr.length; i++) {
            arr[i] = aRandom.nextInt(10);
        }
    }

    public static void calculateAllOddOccurrence(final int[] arr) {

        for (int i = 0; i < arr.length; i++) {
            if (mymap.containsKey(arr[i])) {
                mymap.put(arr[i], mymap.get(arr[i]) + 1);
            } else {
                mymap.put(arr[i], 1);
            }
        }

        for (final Map.Entry<Integer, Integer> entry : mymap.entrySet()) {
            if (entry.getValue() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceThread(final int[] arr) {

        final ExecutorService executor = Executors.newFixedThreadPool(10);
        final List<Future<?>> results = new ArrayList<>();
        final int range = arr.length / 10;
        for (int count = 0; count < 10; ++count) {
            final int startAt = count * range;
            final int endAt = startAt + range;
            executor.submit(() -> {
                for (int i = startAt; i < endAt; i++) {
                    if (mymap1.containsKey(arr[i])) {
                        final AtomicInteger accumulator = mymap1.get(arr[i]);
                        accumulator.incrementAndGet();
                        mymap1.put(arr[i], accumulator);
                    } else {
                        mymap1.put(arr[i], new AtomicInteger(1));
                    }
                }
            });
        }

        awaitTerminationAfterShutdown(executor);

        for (final Entry<Integer, AtomicInteger> entry : mymap1.entrySet()) {
            if (entry.getValue().get() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }

    }

    public static void calculateAllOddOccurrenceStream(final int[] arr) {

        final ConcurrentMap<Integer, List<Integer>> map2 = Arrays.stream(arr).parallel().boxed()
                .collect(Collectors.groupingByConcurrent(i -> i));
        map2.entrySet().stream().parallel()
                .filter(e -> e.getValue().size() % 2 != 0)
                .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

    }

    public static void awaitTerminationAfterShutdown(final ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (final InterruptedException ex) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(final String... doYourBest) {

        final int[] arr = new int[200000000];

        generateData(arr);
        long starttime = System.currentTimeMillis();
        calculateAllOddOccurrence(arr);

        System.out.println("Total time=" + (System.currentTimeMillis() - starttime));

        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(arr);

        System.out.println("Total time Thread=" + (System.currentTimeMillis() - starttime));

    }

}

Output:

1=20003685
2=20000961
3=19991311
5=20006433
7=19995737
8=19999463
Total time=3418
5=20006433
7=19995737
1=20003685
8=19999463
2=20000961
3=19991311
Total time Thread=19640

Parallel execution (calculateAllOddOccurrenceStream) is taking more time. What is the best way to process an array in parallel and then merge the results?

My goal is not to find the fastest algorithm, but to take any algorithm and run it on different threads so that they process different parts of the array simultaneously.

3 Comments
  • Have you considered the Stream API in Java 8+? Commented May 25, 2018 at 9:56
  • Yes, sure, but I'm not able to solve it that way. Can you help? Commented May 25, 2018 at 9:58
  • If you show me your existing Streams API based code, I'll help you out. But as a hint to where you're going wrong: you're overwriting the values in map1 in parallel within your loop. Using ConcurrentHashMap doesn't change the fact that "last put wins". Commented May 25, 2018 at 10:07

3 Answers


It seems that those threads are working on the same parts of the array simultaneously, hence the answer is not coming out correctly.

Rather, divide the array into parts with proper start and end indexes. Allocate a separate thread to process each part and count the occurrences of each number within it.

At the end, you would have multiple maps holding the counts calculated from those separate parts. Merge those maps to get the final answer.
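
As a rough illustration, a minimal sketch of that divide-and-merge approach might look like the following, assuming a fixed thread count and a plain HashMap per worker (the method and variable names are illustrative, not taken from the question):

// Sketch: each worker counts its own slice into a private map, the partial maps are merged at the end
// (requires the usual java.util and java.util.concurrent imports)
static Map<Integer, Integer> countInParallel(final int[] arr, final int threads) throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final List<Future<Map<Integer, Integer>>> parts = new ArrayList<>();
    final int chunk = arr.length / threads;
    for (int t = 0; t < threads; t++) {
        final int from = t * chunk;
        final int to = (t == threads - 1) ? arr.length : from + chunk; // last chunk takes the remainder
        parts.add(pool.submit(() -> {
            final Map<Integer, Integer> local = new HashMap<>(); // thread-local, so no locking needed
            for (int i = from; i < to; i++) {
                local.merge(arr[i], 1, Integer::sum);
            }
            return local;
        }));
    }
    final Map<Integer, Integer> merged = new HashMap<>();
    for (final Future<Map<Integer, Integer>> part : parts) {
        part.get().forEach((k, v) -> merged.merge(k, v, Integer::sum)); // combine the partial counts
    }
    pool.shutdown();
    return merged;
}

The odd-occurrence filter can then run over the merged map exactly as in the sequential version.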

OR you could have a single ConcurrentHashMap for storing the counts coming from all those threads, but a bug could creep in there, because a compound read-modify-write ("get, then put") is not atomic even on a ConcurrentHashMap. For a guaranteed write behaviour, the correct way is to use the atomicity of the ConcurrentHashMap.putIfAbsent(K key, V value) method and pay attention to its return value, which tells you whether the put succeeded. A simple put might not be correct. See https://stackoverflow.com/a/14947844/945214
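
For illustration, that putIfAbsent idiom combined with an AtomicInteger counter could look roughly like this (a sketch; the variable names are mine, not from the question):

// Sketch: insert-or-increment without a separate containsKey check
// (requires java.util.concurrent.ConcurrentHashMap and java.util.concurrent.atomic.AtomicInteger)
final ConcurrentMap<Integer, AtomicInteger> counts = new ConcurrentHashMap<>();
for (final int value : arr) {
    final AtomicInteger previous = counts.putIfAbsent(value, new AtomicInteger(1));
    if (previous != null) {
        previous.incrementAndGet(); // the key was already present, bump the existing counter
    }
}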

You could use the Java 8 Streams API (https://www.journaldev.com/2774/java-8-stream) to write the code, or simple threading code using Java 5 constructs would also do.

I have added Java 8 stream code below; notice the timing differences. An ArrayList (instead of an array) makes a difference:

package com.test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;

public class Test {

    public static void generateData(final int[] arr) {
        final Random aRandom = new Random();
        for (int i = 0; i < arr.length; i++) {
            arr[i] = aRandom.nextInt(10);
        }
    }

    public static void calculateAllOddOccurrence(final int[] arr) {
        final Map<Integer, Integer> mymap  = new HashMap<>();
        for (int i = 0; i < arr.length; i++) {
            if (mymap.containsKey(arr[i])) {
                mymap.put(arr[i], mymap.get(arr[i]) + 1);
            } else {
                mymap.put(arr[i], 1);
            }
        }
        for (final Map.Entry<Integer, Integer> entry : mymap.entrySet()) {
            if (entry.getValue() % 2 != 0) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }

        }
    }

    public static void calculateAllOddOccurrenceStream(final int[] arr) {
        Arrays.stream(arr).boxed()
              .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
              .entrySet().parallelStream()
              .filter(e -> e.getValue() % 2 != 0)
              .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue()));
    }

    public static void calculateAllOddOccurrenceStream(final List<Integer> list) {
        list.parallelStream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
            .entrySet().parallelStream()
            .filter(e -> e.getValue() % 2 != 0)
            .forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue()));
    }

    public static void main(final String... doYourBest) {

        final int[] arr = new int[200000000];

        generateData(arr);
        long starttime = System.currentTimeMillis();
        calculateAllOddOccurrence(arr);
        System.out.println("Total time with simple map=" + (System.currentTimeMillis() - starttime));

        List<Integer> list = Arrays.stream(arr).boxed().collect(Collectors.toList());
        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(list);
        System.out.println("Total time stream - with a readymade list, which might be the case for most apps as arraylist is more easier to work with =" + (System.currentTimeMillis() - starttime));

        starttime = System.currentTimeMillis();
        calculateAllOddOccurrenceStream(arr);
        System.out.println("Total time Stream with array=" + (System.currentTimeMillis() - starttime));

    }
}

OUTPUT


0=19999427
2=20001707
4=20002331
5=20001585
7=20001859
8=19993989
Total time with simple map=2813
4=20002331
0=19999427
2=20001707
7=20001859
8=19993989
5=20001585
Total time stream - with a ready-made list, which might be the case for most apps as an ArrayList is easier to work with = 3328
8=19993989
7=20001859
0=19999427
4=20002331
2=20001707
5=20001585
Total time Stream with array=6115

15 Comments

"but a bug could creep in there I guess as there would be still concurrent write conflicts." - what? The point of ConcurrentHashMap is that it is thread-safe and can be written to by multiple threads at the same time.
In a highly multi-threaded environment, writes on a ConcurrentHashMap might not be 100% safe. For a guaranteed write behaviour, the correct way is to use the atomicity of the ConcurrentHashMap.putIfAbsent(K key, V value) method and pay attention to the return value, which tells if the put operation was successful or not. A simple put might not be correct. See stackoverflow.com/a/14947844/945214
The 'bug' is that a sequence of "get then put" is never atomic. It always requires external synchronization.
Ok, good point. Would be nice to update the answer to be clearer on that.
I have updated my answer, please upvote and accept if it helped.

You are looking for the Streams API introduced in Java 8: http://www.baeldung.com/java-8-streams

Example:

// sequential processes
myArray.stream().filter( ... ).map( ... ).collect(Collectors.toList());

// parallel processes
myArray.parallelStream().filter( ... ).map( ... ).collect(Collectors.toList());

1 Comment

How do I write a good answer?: Provide context for links - Links to external resources are encouraged, but please add context around the link so your fellow users will have some idea what it is and why it’s there. Always quote the most relevant part of an important link, in case the target site is unreachable or goes permanently offline.

Looking at your code, you're going wrong with this line:

mymap1.put(arr[i], mymap1.get(arr[i]) + 1);

You are overwriting the values in parallel, for example:

Thread 1 'get' = 0
Thread 2 'get' = 0
Thread 1 'put 1' 
Thread 2 'put 1'

Change your map to:

static Map<Integer, AtomicInteger>       mymap1 = new ConcurrentHashMap<>();
static {
    //initialize to avoid null values and non-synchronized puts from different Threads
    for(int i=0;i<10;i++) {
        mymap1.put(i, new AtomicInteger());
    }
}
....
    //in your loop
    for (int i = 0; i < arr.length; i++) {
        AtomicInteger accumulator = mymap1.get(arr[i]);
        accumulator.incrementAndGet();
    }

Edit: The problem with the above approach is of course the initialization of mymap1. To avoid falling into the same trap (creating AtomicIntegers within the loop and having them overwrite each other yet again), it needs to be prefilled with values.
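
If Java 8 map methods are an option, one way to avoid the prefilling step is computeIfAbsent, which creates the counter atomically on first use; a sketch (my suggestion, not part of the original edit):

    //in your loop
    for (int i = 0; i < arr.length; i++) {
        // atomically creates the AtomicInteger the first time a value is seen, then increments it
        mymap1.computeIfAbsent(arr[i], k -> new AtomicInteger()).incrementAndGet();
    }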

Since I'm feeling generous, here's what might work with the Streams API:

long totalEvenCount = Arrays.stream(arr).parallel().filter(i -> i % 2 == 0).count();
long totalOddCount = Arrays.stream(arr).parallel().filter(i -> i % 2 != 0).count();

//or this to count by individual numbers:
ConcurrentMap<Integer, List<Integer>> map1 = Arrays.stream(arr).parallel().boxed().collect(Collectors.groupingByConcurrent(i -> i));
map1.entrySet().stream().filter(e -> e.getValue().size() % 2 != 0).forEach(entry -> System.out.println(entry.getKey() + "=" + entry.getValue().size()));

As an exercise for the reader, perhaps you can look into how the various Collectors work, in order to write your own countingBy(i -> i % 2 != 0) that outputs a map containing only the counts instead of a list of values.
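
As a starting point, Collectors.counting() used as a downstream collector already produces a map of counts rather than lists, which is roughly what such a collector would boil down to (a sketch, assuming the same int[] arr as above):

// Sketch: counts per number without materialising the value lists
final ConcurrentMap<Integer, Long> counts = Arrays.stream(arr).parallel().boxed()
        .collect(Collectors.groupingByConcurrent(i -> i, Collectors.counting()));
counts.entrySet().stream()
      .filter(e -> e.getValue() % 2 != 0)
      .forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));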

4 Comments

Thanks for spotting the bug. I have edited the program and now the sequential and parallel outputs are the same. However, parallel execution is taking 3 times more time. :(
Have added some Streams API code for you to try and get inspiration from.
Thanks for the answer. I understand your approach now. It's cleaner and functional. However, it's slower than the imperative way. Do you know why?
@Dhananjay As a guess, most likely due to the overhead associated with lambdas. Lambdas are objects in their own right. The functional loop can easily be optimised by the compiler at compile time, but the lambda object creation/invocation cycle can't as readily. It depends on how you do the computation as well - note the use of boxed() in my code, which converts all the ints into Integers - that is a lot of overhead! You could create the array as Integer[] and see how that performs in your tests, as comparing int to Integer is simply not fair.
