I have an ArrayList of Objects that I stream: a function is applied to each object to modify it, and the result is added to a completely new list of Objects.

Using .parallel() roughly halves the execution time, but I get NullPointerExceptions when looping over the new list.

I tried to debug it several times without success. The problem seems to occur only at runtime. I also tried adding a synchronized method for adding the new objects to the list, but that didn't help either.

Can anyone give me a suggestion how to get this working? Thanks in advance!

Here is the code snippet:

private static final Object sync = new Object();
private ArrayList<Object> newList = new ArrayList<Object>();

private void addNewObject(Object newObject) {
    synchronized (sync) {
        newList.add(newObject);
    }
}

private Object mutateObject(Object oldObject) {
    // Do something with the object here
    return mutatedObject;
}

public ArrayList<Object> createNewList(ArrayList<Object> oldList) {
    oldList.stream().parallel().forEach(object -> addNewObject(mutateObject(object)));
    return newList;
}
3 Comments
  • Why do you add a parameter to the createNewList function without using it? Commented Apr 7, 2018 at 14:35
  • Fixed the example. Commented Apr 7, 2018 at 14:40
  • Consider using the built-in concurrent collections so that you don't have to write your own concurrency code. Also, use interfaces like List<Object> in your public interface instead of implementations like ArrayList. Commented Apr 7, 2018 at 16:53

1 Answer

Consider refactoring the createNewList method to something like this:

public List<Object> createNewList(List<Object> oldList) {
    return oldList.parallelStream()
            .map(this::mutateObject)
            .collect(Collectors.toList());
}

If you look at the java.util.stream.Stream JavaDoc, you can see that there are several terminal operations, such as forEach, collect, and toArray. For your case, collect is the better choice because it is designed for parallel execution: each worker thread accumulates into its own intermediate container and the partial results are merged afterwards, so no shared list is modified concurrently and you don't need to implement any synchronization yourself.

Mapping the objects with .map() (an intermediate operation of Stream) and collecting the results into a final list in parallel solves this problem.

The addNewObject function and the sync object should be removed, because they are no longer used.
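
To make the refactoring concrete, here is a minimal runnable sketch of the same idea. The class name ParallelMapExample, the Integer and String element types, and the body of mutateObject are assumptions chosen purely for illustration; the question does not show what the real mutation does.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ParallelMapExample {

    // Hypothetical stand-in for mutateObject: a pure function that touches no shared state.
    static String mutateObject(Integer oldObject) {
        return "item-" + oldObject;
    }

    // No shared list and no lock: collect builds partial lists per worker and merges them.
    static List<String> createNewList(List<Integer> oldList) {
        return oldList.parallelStream()
                .map(ParallelMapExample::mutateObject)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> oldList = IntStream.range(0, 100_000)
                .boxed()
                .collect(Collectors.toList());

        List<String> newList = createNewList(oldList);

        System.out.println(newList.size());         // 100000
        System.out.println(newList.contains(null)); // false
        System.out.println(newList.get(42));        // item-42 (encounter order is preserved)
    }
}
```

Because the source list is ordered, the collected list keeps the original order even though the mapping runs in parallel, which a shared list filled from forEach would not guarantee.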

4 Comments

You can replace object -> mutateObject(object) with this::mutateObject.
Thank you for the fast response! I'll try it. What can I do if I would like to add an object multiple times to the new list?
@Robert You can always return a list from the .map() operation (e.g. .map(object -> Arrays.asList(mutateObject(object), mutateObject(object), ....))) and then call .flatMap(Collection::stream) on the stream to flatten the list of lists into a single list.
@SzymonStepniak there’s no sense in mapping to a List, just to do .flatMap(Collection::stream) afterwards. Just do .flatMap(object -> Stream.of(mutateObject(object), mutateObject(object), ....)) in the first place…
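
The flatMap suggestion from the last comment could look roughly like the sketch below. The class name FlatMapExample, the String element type, and the body of mutateObject are illustrative assumptions, not code from the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapExample {

    // Hypothetical stand-in for mutateObject; the real mutation is not shown in the question.
    static String mutateObject(String oldObject) {
        return oldObject + "-copy";
    }

    // Each input element contributes two mutated elements to the resulting list.
    static List<String> createNewList(List<String> oldList) {
        return oldList.parallelStream()
                .flatMap(object -> Stream.of(mutateObject(object), mutateObject(object)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(createNewList(Arrays.asList("a", "b")));
        // prints [a-copy, a-copy, b-copy, b-copy]
    }
}
```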
