
In the book Java 8 in Action, section 7.1.1, the authors state that a stream can benefit from parallel processing by adding a call to .parallel(). They provide a simple method, parallelSum(long), to illustrate this. I was curious to see how well it worked, so I executed this code:

package lambdasinaction.chap7;

import java.util.stream.Stream;

public class ParallelPlay {

    public static void main(String[] args) {
        System.out.println(parallelSum(100_000_000));
    }

    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                .limit(n)
                .parallel()
                .reduce(0L, Long::sum);
    }
}

To my surprise, I received this error:

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
    at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(Unknown Source)
    at java.util.stream.AbstractPipeline.sourceSpliterator(Unknown Source)
    at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.util.stream.ReferencePipeline.reduce(Unknown Source)
    at lambdasinaction.chap7.ParallelPlay.parallelSum(ParallelPlay.java:15)
    at lambdasinaction.chap7.ParallelPlay.main(ParallelPlay.java:8)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.stream.SpinedBuffer.ensureCapacity(Unknown Source)
    at java.util.stream.Nodes$SpinedNodeBuilder.begin(Unknown Source)
    at java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source)
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source)
    at java.util.stream.AbstractShortCircuitTask.compute(Unknown Source)
    at java.util.concurrent.CountedCompleter.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

I am running Java 1.8.0_45 on Windows 7, SP1 with a four-core processor. What's going on?

  • On a MacBook Pro (2.2 GHz Intel Core i7 with 16 GB of RAM) it took 26 seconds and returned: 5000000050000000 Commented Jun 14, 2015 at 3:56
  • Looks like your heap size is too small, run: java -XX:+PrintFlagsFinal -version | findstr /i "HeapSize PermSize ThreadStackSize" to check it, and consider increasing it (by changing the values of -Xms and -Xmx) and try running again. Commented Jun 14, 2015 at 3:59
  • Also, using iterate() as a stream source essentially guarantees that you will not get any parallelization, since this is a fundamentally sequential generation (can't generate element n+1 until you've generated element n.) Use IntStream.range() instead. Commented Jun 15, 2015 at 12:09
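The last comment's suggestion can be sketched as follows. This is an illustrative example (not code from the book or the comment): a range source reports its exact size, so the parallel pipeline can split it into balanced chunks.

```java
import java.util.stream.IntStream;

public class RangeSum {
    public static void main(String[] args) {
        // IntStream.range knows its exact element count up front, so
        // .parallel() can split the work evenly; there is no boxing
        // and no buffering of intermediate elements.
        long sum = IntStream.range(1, 101).asLongStream().parallel().sum();
        System.out.println(sum); // 5050
    }
}
```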

1 Answer


Here you create an infinite stream and limit it afterwards. There are known problems with processing infinite streams in parallel. In particular, there is no way to split the task into equal parts effectively; internally, heuristics are used that are not well suited to every workload. In your case it's much better to create a finite stream using LongStream.rangeClosed:

import java.util.stream.LongStream;

public class ParallelPlay {

    public static void main(String[] args) {
        System.out.println(parallelSum(100_000_000));
    }

    public static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }
}

In this case the stream engine knows from the very beginning how many elements you have, so it can split the task effectively. Also note that using LongStream is more efficient, as you avoid unnecessary boxing.
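The difference between the two sources can be observed directly on their spliterators. This small sketch (illustrative only, not part of the answer) prints the size each source reports to the splitting machinery:

```java
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class SourceSizes {
    public static void main(String[] args) {
        // The iterate-based source cannot report a size: estimateSize()
        // returns Long.MAX_VALUE and the SIZED characteristic is absent,
        // so the fork/join splitting heuristics have little to work with.
        Spliterator<Long> iterated = Stream.iterate(1L, i -> i + 1).spliterator();
        System.out.println(iterated.estimateSize());                        // 9223372036854775807
        System.out.println(iterated.hasCharacteristics(Spliterator.SIZED)); // false

        // The range-based source knows its exact element count up front,
        // so it can be split into equal halves all the way down.
        Spliterator.OfLong ranged = LongStream.rangeClosed(1, 100).spliterator();
        System.out.println(ranged.estimateSize());                          // 100
        System.out.println(ranged.hasCharacteristics(Spliterator.SIZED));   // true
    }
}
```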

In general, avoid infinite streams if you can solve your task with finite ones.


5 Comments

Though, regardless of how many threads may run in parallel, they can’t beat a simple single-threaded (n+1)*n/2. Unfortunately, the standard implementation isn’t clever enough to understand that.
@Holger: that's an interesting question why there are no specialized streams (like EmptyStream, SingletonStream, RangeStream, etc) which could optimize some operations. Probably this would produce too much code (given the fact that you should support primitives as well). Another possible reason is that it may hurt performance in normal case as stream calls will become polymorphic, so it will be harder for JIT to devirtualize them (currently JDK has only one Stream implementation).
I don’t think that polymorphic stream implementations could hurt performance more than the polymorphic Spliterator interfaces. After all, Hotspot does a great job dealing with them and the terminal operation consists of a single method invocation anyway. But you still wouldn’t need them, the only thing you need, is having these (or some of the) high-level operations defined on the internal pipeline stages. Often it’s just that either, nobody thought about it or optimizations have been deferred. See Stream.count() which will short-cut in Java 9, or flatmap streams which aren’t lazy yet.
@Holger, yes, I already tried optimized JDK9 Stream.count(). Optimizations may be significant when non-trivial flatMap for big collection is used: in this case stream operations are executed many times. But I agree: my StreamEx lib adds couple more Stream implementations and benchmarks rarely show visible performance overhead. Probably it's not done, because RangeStream can optimize only very trivial cases which rarely appear in real code. For example, it's hardly possible to optimize IntStream.range(0,100).map(x -> x*2).sum().
Actually, that is not true regarding infinite streams: you can rewrite the example above as LongStream.generate(() -> 1L).limit(n).parallel().reduce(0L, Long::sum) and it will not fail with an OOM.
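Both claims in the comments above can be checked with a short sketch (illustrative only; the generate-based variant is scaled down so it finishes quickly):

```java
import java.util.stream.LongStream;

public class SumChecks {
    public static void main(String[] args) {
        long n = 100_000_000L;
        // Gauss closed form from the first comment:
        // no stream, no threads, no allocation.
        long gauss = n * (n + 1) / 2;
        System.out.println(gauss); // 5000000050000000

        // The generate-based variant from the last comment: generate()
        // produces an unordered stream, so limit() does not have to
        // buffer elements the way it does on an ordered iterate() source.
        long ones = LongStream.generate(() -> 1L).limit(1_000).parallel()
                .reduce(0L, Long::sum);
        System.out.println(ones); // 1000
    }
}
```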
