I want to run a Java stream in which the first element is filtered in parallel with the second and third elements. The second and third elements cannot be filtered in parallel with each other, because they use a shared resource.

int numberOfFilteredElements = Stream.of(firstElement, secondElement, thirdElement)
  .filter(element -> element.someFilter())
  .collect(Collectors.toSet())
  .size();

A Java stream pipeline runs either entirely in parallel or entirely sequentially. Stream.concat does not help either: the concatenated stream is parallel as soon as one of its inputs is parallel, so I cannot create one parallel and one sequential stream and concatenate them.

Is there another way to solve this with streams? Or do I need to use locks such as ReentrantLock in someFilter() and run the stream in parallel for all elements?
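For the lock-based option, a minimal sketch could look like the following. The `Element` class, its `usesSharedResource` flag, and the placeholder check inside `someFilter()` are assumptions made for illustration; the original element type is not shown in the question. Elements that touch the shared resource take a common `ReentrantLock`, while the others run unguarded.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class LockedFilterSketch {

    // Hypothetical stand-in for the element type from the question.
    static final class Element {
        // One lock shared by every element that uses the shared resource.
        private static final ReentrantLock SHARED_RESOURCE_LOCK = new ReentrantLock();

        private final String name;
        private final boolean usesSharedResource;

        Element(String name, boolean usesSharedResource) {
            this.name = name;
            this.usesSharedResource = usesSharedResource;
        }

        boolean someFilter() {
            if (!usesSharedResource) {
                return check(); // independent element: no locking needed
            }
            SHARED_RESOURCE_LOCK.lock();
            try {
                return check(); // only one thread at a time reaches this call
            } finally {
                SHARED_RESOURCE_LOCK.unlock();
            }
        }

        // Placeholder for the real filter logic (assumption).
        private boolean check() {
            return !name.isEmpty();
        }
    }

    static int countFiltered(List<Element> elements) {
        return elements.parallelStream()
                .filter(Element::someFilter)
                .collect(Collectors.toSet())
                .size();
    }

    public static void main(String[] args) {
        List<Element> elements = List.of(
                new Element("first", false),
                new Element("second", true),
                new Element("third", true));
        System.out.println(countFiltered(elements));
    }
}
```

Note that a thread waiting for the lock occupies a worker of the common fork/join pool while it blocks, so this trades simplicity for some wasted parallelism.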

  • This looks like a problem that would be much easier to solve without streams. Commented Mar 31, 2021 at 18:34
  • I thought the same, but I was interested in whether there is a nice solution with streams that I could learn from. Commented Mar 31, 2021 at 18:44
  • Not with streams. The method someFilter could, of course, synchronize or lock, but I am not sure how the stream API would react to this. Commented Mar 31, 2021 at 18:52
  • If it is only about the filter, there is one way to achieve it: check the hash code of the element and decide whether or not to enter a synchronized block. Commented Mar 31, 2021 at 18:55
  • Thanks. I will solve this without streams. Commented Apr 1, 2021 at 9:30
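For reference, a stream-free version along the lines the comments suggest could look like this sketch. It assumes the elements are already split into one independent element and a list of elements that share a resource; the class name, method name, and parameters are hypothetical. The independent element is filtered asynchronously with `CompletableFuture.supplyAsync`, while the sharing elements are filtered one after another on the calling thread.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class TwoGroupsSketch {

    static <T> Set<T> filterToSet(T independent, List<T> sharing, Predicate<T> filter) {
        // Start filtering the independent element on another thread
        // (the common fork/join pool by default).
        CompletableFuture<Boolean> independentResult =
                CompletableFuture.supplyAsync(() -> filter.test(independent));

        // Meanwhile, filter the elements that share a resource strictly
        // one after another on the calling thread.
        Set<T> accepted = new HashSet<>();
        for (T element : sharing) {
            if (filter.test(element)) {
                accepted.add(element);
            }
        }

        // Wait for the asynchronous result and merge it in.
        if (independentResult.join()) {
            accepted.add(independent);
        }
        return accepted;
    }

    public static void main(String[] args) {
        Set<String> accepted = filterToSet(
                "first", List.of("second", "third"), s -> !s.equals("third"));
        System.out.println(accepted.size());
    }
}
```

Because the sequential part is an ordinary loop, the ordering guarantee does not depend on any stream implementation detail.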

1 Answer

We can say:

  • First element is Stream #1.
  • Second and third elements are Stream #2.

Then we can create a Parent Parallel Stream that consists of Stream #1, Stream #2:

    // Simulates a slow filter and logs which thread handled each element.
    Predicate<Integer> someFilter = i -> {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        logger.info(i + " filtered on thread id: " + Thread.currentThread().getId());
        return i != 7;
    };
    // Stream #1 and Stream #2 are each created as sequential streams.
    Stream<Integer> stream1 = IntStream.rangeClosed(0, 3).boxed().filter(someFilter);
    Stream<Integer> stream2 = IntStream.rangeClosed(-3, -1).boxed().filter(someFilter);

    // Only the parent stream, whose elements are the two sub-streams, is marked parallel.
    int numberOfFilteredElements = Stream.of(stream1, stream2)
            .parallel()
            .flatMap(Function.identity())
            .peek(integer -> logger.info("Peeked: " + integer))
            .collect(Collectors.toSet())
            .size();
    System.out.println("Set size is: " + numberOfFilteredElements);

Output:

2021-04-01 12:56:34.084  INFO 26336 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 2.966 seconds (JVM running for 5.717)
2021-04-01 12:56:38.098  INFO 26336 --- [           main] com.example.demo.DemoApplication         : -3 filtered on thread id: 1
2021-04-01 12:56:38.098  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : 0 filtered on thread id: 22
2021-04-01 12:56:38.100  INFO 26336 --- [           main] com.example.demo.DemoApplication         : Peeked: -3
2021-04-01 12:56:38.100  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : Peeked: 0
2021-04-01 12:56:42.100  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : 1 filtered on thread id: 22
2021-04-01 12:56:42.100  INFO 26336 --- [           main] com.example.demo.DemoApplication         : -2 filtered on thread id: 1
2021-04-01 12:56:42.100  INFO 26336 --- [           main] com.example.demo.DemoApplication         : Peeked: -2
2021-04-01 12:56:42.100  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : Peeked: 1
2021-04-01 12:56:46.101  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : 2 filtered on thread id: 22
2021-04-01 12:56:46.101  INFO 26336 --- [           main] com.example.demo.DemoApplication         : -1 filtered on thread id: 1
2021-04-01 12:56:46.101  INFO 26336 --- [           main] com.example.demo.DemoApplication         : Peeked: -1
2021-04-01 12:56:46.101  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : Peeked: 2
2021-04-01 12:56:50.102  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : 3 filtered on thread id: 22
2021-04-01 12:56:50.102  INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication         : Peeked: 3
Set size is: 7

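Notice that Stream #1 (which can be your first element) runs in parallel to Stream #2 (which can be the second and third elements). Within each stream, the elements are filtered one at a time, four seconds apart, on a single thread.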

3 Comments

It is very dangerous to assume that a particular limitation (the current implementation’s inability to parallelize sub-streams in flatMap) will persist all the time.
Thanks for the comment. I am confused, though. I don't see why we would expect flatMap to parallelize the sub-streams. I thought it specifies parallelizing just the parent stream, not its elements, no?
No, it does not specify anywhere that sub-streams are processed sequentially. That’s just a detail of the current implementation.
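One way to avoid relying on how flatMap treats sub-streams is to run each group's filter inside a `map` step of the outer parallel stream, using an explicitly sequential inner pipeline. The sketch below assumes the elements are pre-grouped into lists, where each list holds elements that must not be filtered concurrently; the class and method names are hypothetical, and `List.of` assumes Java 9 or later. The terminal operation of a sequential stream runs in the thread that calls it, so each group is filtered one element at a time, while different groups may be handled by different threads.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class GroupedParallelSketch {

    static <T> Set<T> filterGroups(List<List<T>> groups, Predicate<T> filter) {
        return groups.parallelStream()
                // List.stream() is sequential, and collect(...) runs right here,
                // in whichever thread is processing this group.
                .map(group -> group.stream()
                        .filter(filter)
                        .collect(Collectors.toList()))
                // The filter has already run, so how flatMap traverses
                // the resulting lists no longer matters.
                .flatMap(List::stream)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<List<String>> groups = List.of(
                List.of("first"),            // independent element
                List.of("second", "third")); // elements sharing a resource
        Set<String> accepted = filterGroups(groups, s -> !s.isEmpty());
        System.out.println(accepted.size());
    }
}
```

A parallel stream is still free to process both groups on the same thread, so this guarantees the sequential part but only permits, rather than forces, the parallel part.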
