3

How can I add objects from one stream to two different lists simultaneously

Currently I am doing

body.getSurroundings().parallelStream()
                .filter(o -> o.getClass().equals(ResourcePoint.class))
                .map(o -> (ResourcePoint)o)
                .filter(o -> !resourceMemory.contains(o))
                .forEach(resourceMemory::add);

to add objects from my stream into a linkedlist "resourceMemory", but I also want to add the same objects to another list simultaneously, but I can't find the syntax for it. Is it possible or do I need to have two copies of this code for each list?

3
  • 2
    You can use peek() or a custom collector. Commented Apr 21, 2016 at 9:03
  • 2
    Do you mean o instanceof ResourcePoint or o.getClass() == ResourcePoint.class? Decide for one, but don’t use o.getClass().equals(ResourcePoint.class) that’s obfuscating the actual intention. Further your code is broken in several ways when using a parallel stream. Please read docs.oracle.com/javase/8/docs/api/?java/util/stream/… carefully. Commented Apr 21, 2016 at 9:51
  • 1
    The strict answer to the question is .forEach(o -> { resourceMemory.add(o); myOtherList.add(o); }). But please note Holger's comment. You need to tell more about what you really want to achieve in order to fix this code. Commented Apr 21, 2016 at 10:33

2 Answers 2

3

There are several fundamental errors you should understand first, before trying to expand your code.

First of all, forEach does not guaranty a particular order of element processing, so it’s likely the wrong tool for adding to a List, even for sequential streams, however, it is completely wrong to use with a parallel stream to add to a collection like LinkedList which is not thread safe, as the action will be performed concurrently.

But even if resourceMemory was a thread safe collection, your code still was broken as there is an interference between your filter condition and the terminal action. .filter(o -> !resourceMemory.contains(o)) queries the same list which you are modifying in the terminal action and it shouldn’t be hard to understand how this can brake even with thread-safe collections:

Two or more threads may process the filter and find that the element is not contained in the list, then all of them will add the element, contradicting your obvious intention of not having duplicates.

You could resort to forEachOrdered which will perform the action in order and non-concurrently:

body.getSurroundings().parallelStream()
    .filter(o -> o instanceof ResourcePoint)
    .map(o -> (ResourcePoint)o)
    .forEachOrdered(o -> {// not recommended, just for explanation
        if(!resourceMemory.contains(o))
            resourceMemory.add(o);
    });

This will work and it’s obvious how you could add to another list within that action, but it’s far away from recommended coding style. Also, the fact that this terminal action synchronizes with all processing threads will destroy any potential benefit of parallel processing, especially as the most expensive operation of this stream pipeline is invoking contains on a LinkedList which will (must) happen single-threaded.

The correct way to collect stream elements into a list is via, as the name suggests, collect:

List<ResourcePoint> resourceMemory
    =body.getSurroundings().parallelStream()
        .filter(o -> o instanceof ResourcePoint)
        .map(o -> (ResourcePoint)o)
        .distinct()                    // no duplicates
        .collect(Collectors.toList()); // collect into a list

This doesn’t return a LinkedList, but you should rethink carefully whether you really need a LinkedList. In 99% of all cases, you don’t. If you really need a LinkedList, you can replace Collectors.toList() with Collectors.toCollection(LinkedList::new).

Now if you really must add to an existing list created outside of your control, which might already contain elements, you should consider the fact mentioned above, that you have to ensure single-threaded access to a non-thread-safe list anyway, so there’s no benefit from doing it from within the parallel stream at all. In most cases, it’s more efficient to let the stream work independently from that list and add the result in a single threaded step afterwards:

Set<ResourcePoint> newElements=
    body.getSurroundings().parallelStream()
        .filter(o -> o instanceof ResourcePoint)
        .map(o -> (ResourcePoint)o)
        .collect(Collectors.toCollection(LinkedHashSet::new));
newElements.removeAll(resourceMemory);
resourceMemory.addAll(newElements);

Here, we collect into a LinkedHashSet which implies maintenance of the encounter order and sorting out duplicates within the new elements, then use removeAll on the new elements to remove existing elements of the target list (here we benefit from the hash set nature of the temporary collection), finally, the new elements are added to the target list, which, as explained, must happen single-threaded anyway for a target collection which isn’t thread safe.

It’s easy to add the newElements to another target collection with this solution, much easier than writing a custom collector for producing two lists during the stream processing. But note that the stream operations as written above are way too cheep to assume any benefit from parallel processing. You would need a very large number of elements to compensate the initial multi-threading overhead. It’s even possible that there is no number for which it ever pays off.

Sign up to request clarification or add additional context in comments.

Comments

1

Instead of

.forEach(resourceMemory::add)

You could invoke

.forEach(o -> {
   resourceMemory.add(o);
   otherResource.add(o);
 })

or put the add operations in a separate method so you could provide a method reference

.forEach(this::add)

void add(ResourcePoint p) {
   resourceMemory.add(o);
   otherResource.add(o);
}

But bear in mind, that the order of insertion maybe different with each run as you use a parallel stream.

3 Comments

@Holger Your comment possibly ended up in the wrong place. I don't think it refers to the answer above.
@Markus, no, the Holger's comment was right, I edited my answer, the part that was broken got removed, if code is broken fix/remove it :)
It should be emphasized that the action passed to forEach is not only invoked in arbitrary order but concurrently, thus, this will only work with thread safe collections.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.