Fixtures

BiConsumer<Exception, Consumer<? super Integer>> NOTHING = (ex, unused) -> { /* ignore */ };

I was trying to fix the bug that @Holger reported in this answer:

Stream<Integer> stream = Stream.of(1, 2, 3);

// v--- the bug I have already fixed; it now throws a RuntimeException
exceptionally(stream, NOTHING).collect(ArrayList::new, (l, x) -> {
    l.add(x);
    if (x < 4) throw new RuntimeException();
}, List::addAll);

Everything is OK, but when the source is Stream.of(T), the map(...) operation is invoked infinitely. For example:

List<Integer> result = exceptionally(
        //               v--- called infinitely
        Stream.of("bad").map(Integer::parseInt),
        NOTHING
).collect(toList());

But when I replace Stream.of(T) with Stream.of(T[]), it works fine again. For example:

//            v--- returns an empty list
List<Integer> result = exceptionally(
        Stream.of(new String[]{"bad"}).map(Integer::parseInt),
        NOTHING
).collect(toList());

I think java.util.stream.Streams.StreamBuilderImpl#tryAdvance should update count before it calls the action, as marked here:

public boolean tryAdvance(Consumer<? super T> action) {
    Objects.requireNonNull(action);

    if (count == -2) {
        action.accept(first);
        count = -1; // <--- this should be executed before `action.accept(first)`
        return true;
    }
    else {
        return false;
    }
}
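
To illustrate why the statement order matters, here is a minimal sketch. The Single spliterator and the countFailures helper are hypothetical stand-ins written for this example, not the JDK's StreamBuilderImpl; a boolean flag selects which of the two orders is used.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

class SingleElementDemo {

    /** Hypothetical one-element spliterator; the flag selects the statement order. */
    static final class Single<T> extends Spliterators.AbstractSpliterator<T> {
        private final T element;
        private final boolean markConsumedFirst;
        private boolean consumed;

        Single(T element, boolean markConsumedFirst) {
            super(1, Spliterator.SIZED | Spliterator.ORDERED);
            this.element = element;
            this.markConsumedFirst = markConsumedFirst;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (consumed) return false;
            if (markConsumedFirst) {
                consumed = true;        // state is updated before the callback can throw
                action.accept(element);
            } else {
                action.accept(element); // if this throws, consumed stays false
                consumed = true;
            }
            return true;
        }
    }

    /** Retries tryAdvance after every exception, at most maxAttempts times; returns the number of failed calls. */
    static int countFailures(Spliterator<String> source, int maxAttempts) {
        int failures = 0;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                if (!source.tryAdvance(s -> Integer.parseInt(s))) break;
            } catch (NumberFormatException ex) {
                failures++; // a caller that "recovers" and simply tries again
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(countFailures(new Single<>("bad", true), 5));  // prints 1
        System.out.println(countFailures(new Single<>("bad", false), 5)); // prints 5
    }
}
```

With the state updated first, the failing element is offered once and the next call returns false. With the other order, every retry hits the same element again, so only the attempt limit stops the loop.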

Q: Is this a bug in the JDK? I think it must be, since the semantics should be consistent between the Stream.of methods. Am I right?

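// Assumes: import static java.util.stream.StreamSupport.stream;
//          import java.util.Spliterators.AbstractSpliterator;
<T> Stream<T> exceptionally(Stream<T> source,
        BiConsumer<Exception, Consumer<? super T>> exceptionally) {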
    
    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;

        public ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.source = source;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Boolean state = attempt(action);
            if (state == null) return true;
            if (state) action.accept(value);
            return state;
        }

        private Boolean attempt(Consumer<? super T> action) {
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                exceptionally.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(
            new ExceptionallySpliterator(source.spliterator()), 
            source.isParallel()
    ).onClose(source::close);
}
  • I also found that Stream.of(T) fails the same way in JDK 9. Commented Jul 22, 2017 at 15:39
  • I just tested Stream.of(T) and Stream.of(T[]) on my machine, and got the same result. Are you sure the issue isn't elsewhere? Commented Jul 22, 2017 at 20:13
  • @JoeC Hi, have you tested the code above? Just testing Stream#of on its own won't reproduce the behavior. :) Commented Jul 22, 2017 at 20:38
  • I've been able to observe that the code path is not the same when the stream contains one element or several. When tested with bad values, in the first case the exception is thrown in java.util.stream.Streams$StreamBuilderImpl.tryAdvance, while in the second case it is thrown in java.util.Spliterators$ArraySpliterator.tryAdvance. It looks like a bug when an exception is thrown in a stream of only one element (bad state management). Commented Jul 22, 2017 at 20:42
  • @Jean-BaptisteYunès Thanks for your feedback, sir. I'm not sure it is a bug, so I want to know whether it really is one, since it is not fixed in JDK 9 as far as I know. Commented Jul 22, 2017 at 20:47

1 Answer

I wouldn’t call this a bug, nor even unexpected behavior, given that I warned about such scenarios in this comment on the linked question a month ago:

Keep in mind that you don’t know whether the source iterator actually advanced its internal state or not when an exception has been thrown, so assuming that there is a next element can bring you into an infinite loop, repeating the failed operation over and over again.

Of course, for the Stream.of(singleElement) case that scenario is easy to avoid: swapping the two statements, action.accept(first); and count = -1;, would make the code more robust. Still, being able to recover from an exception is not a guaranteed feature at all, and there are other stream sources for which such recovery couldn’t be implemented that easily.

E.g., the streams returned by BufferedReader.lines() or Files.lines() can’t force their underlying reader to advance one line if an IOException occurs.
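
As a rough illustration of a source-level failure, the sketch below uses a hypothetical FailingReader (made up for this example) that delivers one line and then fails on every further read. BufferedReader.lines() reports such a failure as an UncheckedIOException, and the example handles it around the whole terminal operation instead of trying to skip an element.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class FailingSourceDemo {

    /** Hypothetical reader: delivers "ok\n" once, then fails on every further read. */
    static final class FailingReader extends Reader {
        private boolean delivered;

        @Override
        public int read(char[] buf, int off, int len) throws IOException {
            if (delivered) throw new IOException("source unavailable");
            delivered = true;
            String data = "ok\n";
            // Assumes len >= 3, which holds for BufferedReader's default buffer size.
            data.getChars(0, data.length(), buf, off);
            return data.length();
        }

        @Override
        public void close() { }
    }

    /** Collects the lines read before the source fails; the failure ends the whole traversal. */
    static List<String> readUntilFailure() {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FailingReader())) {
            reader.lines().forEach(lines::add);
        } catch (UncheckedIOException ex) {
            // The failure belongs to the source, not to one element:
            // retrying the read would only hit the same IOException again.
        } catch (IOException ex) {
            // Only close() can raise this here; nothing to clean up in this sketch.
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(readUntilFailure()); // prints [ok]
    }
}
```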

Generally, this attempt to recover from an exception makes the premature assumption that an exception thrown by the source always indicates a problem with a particular element rather than a problem with the source in general. This works with the contrived example, where you know that the exception is associated with a particular element because you provoked it. But this is not how you can deal with unexpected exceptions, which is what exceptions usually are. When you expect a particular problem, like string elements not being in number format, you should handle it straightforwardly, for example by filtering out the invalid strings before parsing.
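
A minimal sketch of that approach follows. The parseValid helper and the regular expression are assumptions chosen for this example; the pattern accepts an optional minus sign followed by at most nine digits, so every accepted string fits into an int.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SafeParseDemo {

    /** Keeps only strings that are guaranteed to parse, then parses them. */
    static List<Integer> parseValid(Stream<String> source) {
        return source
                .filter(s -> s.matches("-?\\d{1,9}")) // illustrative validity check
                .map(Integer::parseInt)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseValid(Stream.of("1", "bad", "3"))); // prints [1, 3]
        System.out.println(parseValid(Stream.of("bad")));           // prints []
    }
}
```

Because the invalid strings never reach Integer.parseInt, no exception is thrown and the result does not depend on how the stream source manages its state.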


Thanks, but how can I solve the BufferedReader.lines() scenario, sir? I have already solved the Stream.of(T) case by analyzing the stack trace.
@holi-java: there is no way to recover from an exception from a data source, unless it has a documented recovery strategy. Generally, an exception may happen at an arbitrary place in the middle of an operation, leaving the data in an inconsistent state. You are only safe with custom exception types deliberately thrown at certain places. But RuntimeException is the supertype of a lot of exceptions that are thrown at unplanned places due to programming errors, etc.
Then you are still before the problematic position and about to repeat the failed operation, if the error is related to the particular element. And you can’t skip to the next element without doing the operation. You only know where the line ends, when you have read the line. And, as said, the error still can also be entirely independent of the element. If someone shut down the file server, there is no recovery…
@holi-java no, he means: give up, abandon this goal for it cannot be achieved. If you can recover from exceptions, this is because of the specific case of a particular 'bad' element. You can solve that much more straightforwardly by first invoking filter() and/or map() on the source stream instead.
@holi-java: a data source itself may offer recovery for certain kinds of failures, but usually, most I/O operations do not fall into the recoverable category. To name an example where it might work, JDBC database drivers have such a category.