5

Can I analyse previous and/or next elements with Java 8 streams somehow?

For example, can I count identical adjacent numbers?

public class Merge {
   public static void main(String[] args) {

      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

      // How to get 3, 2, 2, 4 from above

   }
}
4
  • 4
    In vanilla Java, single-pass and without intermediate data structures: no. StreamEx offers StreamEx::runLengths which is what you're looking for. Commented Aug 30, 2016 at 14:08
  • Can I write my custom aggregating objects in StreamEx? Commented Aug 30, 2016 at 14:16
  • 1
    You can see StreamEx as an extension to the common Stream API. So there is no limit and you can write your own aggregating objects. Commented Aug 30, 2016 at 15:08
  • Does a solution have to work with parallel streams or would a sequential-only approach be sufficient? Commented Aug 30, 2016 at 21:52

5 Answers

5

If you want it to be lazy, you have to escape the Stream API through Stream.iterator() or Stream.spliterator().

Otherwise the way to do it is to call the terminal operation Stream.collect(Collector) with a custom collector, which will consume the whole stream.


@Test
public void test() {
    Stream<Integer> input = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

    UniqCountSpliterator uniqCountSpliterator = new UniqCountSpliterator(input.spliterator());

    long[] output = uniqCountSpliterator.stream()
            .toArray();

    long[] expected = {3, 2, 2, 4};

    assertArrayEquals(expected, output);
}

import java.util.Spliterator;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

public class UniqCountSpliterator implements Spliterator.OfLong {
    private final Spliterator<?> wrapped;
    private long count;
    private Object previous;
    private Object current;

    public UniqCountSpliterator(Spliterator<?> wrapped) {
        this.wrapped = wrapped;
    }

    public LongStream stream() {
        return StreamSupport.longStream(this, false);
    }

    @Override
    public OfLong trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return NONNULL | IMMUTABLE;
    }

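    @Override
    public boolean tryAdvance(LongConsumer action) {
        // previous holds the first element of the pending run; null means none is pending
        if (previous == null && !wrapped.tryAdvance(next -> previous = next)) {
            return false; // the wrapped spliterator is exhausted
        }
        count = 1;
        current = null;
        // consume elements for as long as they equal the head of the run
        while (wrapped.tryAdvance(next -> current = next) && current.equals(previous)) {
            count++;
            current = null;
        }
        action.accept(count);
        // current is the first element of the next run, or null at the end of the input
        previous = current;
        return true;
    }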
}
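
For the eager route mentioned at the top of this answer (Stream.collect(Collector) with a custom collector), a minimal sketch could look like the following. The names RunLengthCollector, Run and runLengths() are made up for illustration and are not part of the answer above; elements are compared with Objects.equals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RunLengthCollector {

    // One run: the repeated value and how many times it occurred in a row.
    static final class Run<T> {
        final T value;
        long count;

        Run(T value, long count) {
            this.value = value;
            this.count = count;
        }
    }

    // Hypothetical helper: collects the lengths of runs of equal adjacent elements.
    static <T> Collector<T, List<Run<T>>, List<Long>> runLengths() {
        return Collector.of(
            ArrayList::new,
            (runs, item) -> {
                Run<T> last = runs.isEmpty() ? null : runs.get(runs.size() - 1);
                if (last != null && Objects.equals(last.value, item)) {
                    last.count++;                  // same value: extend the current run
                } else {
                    runs.add(new Run<>(item, 1));  // different value: start a new run
                }
            },
            (left, right) -> {
                // If the left part ends with the value the right part starts with,
                // the two boundary runs are really one run and must be merged.
                int skip = 0;
                if (!left.isEmpty() && !right.isEmpty()) {
                    Run<T> tail = left.get(left.size() - 1);
                    Run<T> head = right.get(0);
                    if (Objects.equals(tail.value, head.value)) {
                        tail.count += head.count;
                        skip = 1;
                    }
                }
                left.addAll(right.subList(skip, right.size()));
                return left;
            },
            runs -> runs.stream().map(run -> run.count).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Long> counts = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1)
                .collect(runLengths());
        System.out.println(counts); // [3, 2, 2, 4]
    }
}
```

Unlike the spliterator, this consumes the whole stream before producing anything, so it is not suitable for infinite streams.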

1

You can almost do it with flatMap. It would work for infinite streams; with a finite stream I don't see a way to detect the end of the stream from within it, so the count of the last run is never emitted.

    Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

    Stream<Integer> flatMap = stream.flatMap(new Function<Integer, Stream<Integer>>() {
        Integer prev = null;
        int count;
        public java.util.stream.Stream<Integer> apply(Integer i) {
            if ( i.equals(prev)) {
                count++;
                return Stream.empty();
            } else {
                int c = count;
                count = 1;
                prev = i;
                if ( c > 0 ) {
                    return Stream.of(c);
                } else {
                    return Stream.empty();
                }
            }
        };
    });

    flatMap.forEach(i -> {
        System.out.println(i);
    });

That said, you could probably get a lot better mileage out of RxJava for this kind of thing (where you could use a Subject to emit values as you wish and be able to detect the end of the stream).

Of course, if you want to escape Stream boundaries, there are many options, as indicated by Christoffer's answer.
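
One possible workaround for the missing last count on finite streams, sketched here as an assumption rather than as part of the answer above, is to append a sentinel element with Stream.concat so that the mapper gets one extra call in which it can flush the final run. The class name FlatMapRunLengths and the END marker are hypothetical. The mapper is stateful, which the Stream API documentation discourages, so this is only meant for sequential streams with non-null elements.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapRunLengths {

    // Marker appended to the input; it is compared by identity, never by value.
    private static final Object END = new Object();

    static List<Integer> runLengths(Stream<?> input) {
        Stream<Object> withEnd = Stream.concat(input, Stream.of(END));
        return withEnd.sequential()
                .flatMap(new Function<Object, Stream<Integer>>() {
                    Object prev;   // value of the run in progress
                    int count;     // length of the run in progress, 0 if none

                    @Override
                    public Stream<Integer> apply(Object item) {
                        if (count > 0 && item != END && item.equals(prev)) {
                            count++;               // still inside the same run
                            return Stream.empty();
                        }
                        int finished = count;      // the previous run is complete
                        prev = item;
                        count = (item == END) ? 0 : 1;
                        return finished > 0 ? Stream.of(finished) : Stream.empty();
                    }
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runLengths(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1))); // [3, 2, 2, 4]
    }
}
```

This still cannot help with parallel streams, and it changes the element type of the stream to Object in order to fit the sentinel in.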

3 Comments

Almost is no solution. And what about parallel computation?
It depends on what the OP's real problem is. The trick I wanted to show here is how to emit stream elements only in some cases using flatMap and an empty/single-element stream, which might be non-obvious (his original question of looking up elements, even if possible, would still require a similar thing to change the number of elements in the stream). As for parallel... good luck with that, especially for infinite streams.
I agree, but flatMap is not the right tool for this job. I must admit that it is a neat trick, but you can't process finite Streams. The only way to do this kind of task is using the Spliterator as Christoffer Hammarström proposed. Then you can suppress parallel computation and handle the end of the Stream.
1

Stream::collect() can do that for you. Here a hack is applied for brevity: as both the inputs and the outputs are numbers, and int in particular, the intermediate storage can be int[2], where the first element is the thing we are counting (so 0 and 1 in the example), and the second element is the counter. Later in the post there will be "real" something-counter pairs.

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

List<Integer> result = stream.collect(
    ArrayList<int[]>::new,
    (list, i) -> {
        if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
            list.add(new int[] { i, 1 });
        else
            list.get(list.size() - 1)[1]++;
    },
    (l1, l2) -> {
        if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
            l1.addAll(l2);
        else {
            l1.get(l1.size() - 1)[1] += l2.get(0)[1];
            l1.addAll(l2.subList(1, l2.size()));
        }
    }
).stream().map(pair -> pair[1]).collect(Collectors.toList());

System.out.println(result);

ArrayList<int[]>::new is the supplier; it creates the intermediate storage(s) when needed. It plays a role similar to "identity" in reduce(), except that it is a factory that can be called again whenever a fresh container is needed.

The accumulator function (the (list, i) thing) checks if the list is empty or its last element is counting something else than i, and in that case adds a new pair, initialized with i as element, and 1 as count. Otherwise it just increases the existing counter as it counts the same kind of element as i. collect() does mutable accumulation, and thus nothing is returned (unlike with reduce()).

Then there is a "combiner" (the (l1, l2) thing), which has to be able to combine two partial results into one (into the first one of the two). Here we have to be prepared that a partial result may end with the beginning of the next partial result, that's what the if is checking: the two lists can be "blindly" appended if either of them is empty (unlikely, but who knows), or the last element of the first list is counting something else than the first element of the second list (here it's also handy that we already know that the lists are not empty, so those last/first elements exist). Otherwise we have to update the last element of the first list (with the counter of the first element in the second list), and only append the remaining elements.

And as we have a list of int[2]s at this point, a separate map-collect pair strips them into the counter part which we want.

The printed output is [3, 2, 2, 4] by the way.
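
To look at the boundary case of the combiner in isolation, it can be called by hand on two partial results. The sketch below copies the combiner logic from the code above into a standalone BiConsumer; the class name CombinerDemo and the particular split point are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

class CombinerDemo {

    // Same logic as the combiner above, pulled out so it can be called directly.
    static final BiConsumer<List<int[]>, List<int[]>> COMBINER = (l1, l2) -> {
        if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0]) {
            l1.addAll(l2);
        } else {
            l1.get(l1.size() - 1)[1] += l2.get(0)[1];
            l1.addAll(l2.subList(1, l2.size()));
        }
    };

    static List<Integer> demo() {
        // Hypothetical split of 0,0,0,1,1,0,0,1,1,1,1 after the fourth element:
        // the left part ends with a run of 1s and the right part starts with one.
        List<int[]> left = new ArrayList<>(Arrays.asList(
                new int[] {0, 3}, new int[] {1, 1}));
        List<int[]> right = new ArrayList<>(Arrays.asList(
                new int[] {1, 1}, new int[] {0, 2}, new int[] {1, 4}));
        COMBINER.accept(left, right);
        return left.stream().map(pair -> pair[1]).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [3, 2, 2, 4]
    }
}
```

The two boundary pairs {1, 1} and {1, 1} are merged into a single {1, 2}, which is exactly the case the else branch exists for.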

How could such partial results, and the need to combine them, arise? One possible case is that this thing can work in parallel. Here is a variant with some logging, and also with a "proper" pair object instead of int[2]. It's not that proper, but Map.Entry<key,value> can be used as a pair. It's a bit clumsier (like requiring getValue()-setValue() instead of ++), but now the input could be anything, not just numbers. The input has been changed too, but only for logging purposes; it works with the original too.

Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 2, 2, 3, 3, 3, 3);

System.out.println(
    stream.parallel().collect(
        ArrayList<Map.Entry<Integer, Integer>>::new,
        (list, i) -> {
            System.out.println("acc " + list + " " + i + " " + Thread.currentThread());
            if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(i))
                list.add(new AbstractMap.SimpleEntry<Integer, Integer>(i, 1));
            else {
                var p = list.get(list.size() - 1);
                p.setValue(p.getValue() + 1);
            }
        }, (l1, l2) -> {
            System.out.println("comb " + l1 + " " + l2 + " " + Thread.currentThread());
            if (l1.isEmpty() || l2.isEmpty() || !l1.get(l1.size() - 1).getKey().equals(l2.get(0).getKey()))
                l1.addAll(l2);
            else {
                var p = l1.get(l1.size() - 1);
                p.setValue(p.getValue() + l2.get(0).getValue());
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
    )
);

It may need a couple of runs, but there are times when it actually runs multi-threaded, producing an output like this:

acc [] 2 Thread[main,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[main,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [2=1] [3=1] Thread[main,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [1=1] [1=1] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 2 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=1] [2=1, 3=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [1=2] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [3=1] [3=1] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [3=1] [3=2] Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [0=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=2] [0=1, 1=2] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=2, 3=1] [3=3] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [0=3, 1=2] [2=2, 3=4] Thread[ForkJoinPool.commonPool-worker-5,5,main]
[0=3, 1=2, 2=2, 3=4]

What is visible in this particular run is that all 11 input values were accumulated separately (the acc [] x lines, where [] shows that a brand new empty list was passed) on one of 4 threads (the main thread and worker threads 3, 5 and 7). Those initial steps happen in a quite arbitrary order. The results are then combined, this time with the order maintained, and the special combining step (where lists aren't just appended, but a counter needs to be updated) is indeed used quite often.
The final pair-to-number transformation is skipped here, which is why both the elements and their counts are printed.

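For comparison, the same code without the parallel() call simply uses the accumulator function, stepping through the input stream sequentially. I'm not sure whether combining could ever happen in this sequential case; as far as I can tell, the OpenJDK implementation feeds a sequential stream into a single container and never calls the combiner, whatever the input size, though the API documentation does not seem to promise this.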

acc [] 0 Thread[main,5,main]
acc [0=1] 0 Thread[main,5,main]
acc [0=2] 0 Thread[main,5,main]
acc [0=3] 1 Thread[main,5,main]
acc [0=3, 1=1] 1 Thread[main,5,main]
acc [0=3, 1=2] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=1] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=1] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=3] 3 Thread[main,5,main]
[0=3, 1=2, 2=2, 3=4]


0

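If you don't mind two statements, you can set up a list to fill with the counts, and then use reduce. Note that this relies on side effects inside the reduction function, so it only makes sense for a sequential stream: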

List<Integer> counts = new ArrayList<>();
Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).reduce((i, j) -> {
    if (counts.isEmpty()) {
        counts.add(1);
    }

    if (j.equals(i)) {
        int index = counts.size() - 1;
        counts.set(index, counts.get(index) + 1);
    } else {
        counts.add(1);
    }
    return j;
});
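
One edge case of the snippet above: reduce never calls the function for a stream with a single element, so the list stays empty instead of containing 1. A small sketch that wraps the same idea in a helper and patches that case is shown below; the class name ReduceRunLengths and the method runLengths are hypothetical, and non-null elements on a sequential stream are assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class ReduceRunLengths {

    static <T> List<Integer> runLengths(Stream<T> stream) {
        List<Integer> counts = new ArrayList<>();
        // The reduction function is used only for its side effect on counts;
        // it always returns the right-hand element so that it becomes the next "i".
        Optional<T> last = stream.sequential().reduce((i, j) -> {
            if (counts.isEmpty()) {
                counts.add(1);               // account for the very first element
            }
            if (j.equals(i)) {
                int index = counts.size() - 1;
                counts.set(index, counts.get(index) + 1);
            } else {
                counts.add(1);
            }
            return j;
        });
        // A single-element stream never invokes the function above.
        if (last.isPresent() && counts.isEmpty()) {
            counts.add(1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(runLengths(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1))); // [3, 2, 2, 4]
        System.out.println(runLengths(Stream.of(7)));                               // [1]
    }
}
```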


0

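You can use the reduce function to merge the items into a TreeMap. Keying the map by the number itself does not work, because a later run of the same number overwrites the earlier one; keying it by the index of the run keeps the runs apart. Each value is a {number, count} pair, and if you want only the counts, you can extract them from the values of the map. The identity map is mutated and the combiner ignores its second argument, so this only works on a sequential stream.

import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Merge {
   public static void main(String[] args) {

      Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);

      // Key: index of the run; value: { number, count }
      TreeMap<Integer, int[]> runs = stream.reduce(new TreeMap<Integer, int[]>(), (map, n) -> {
          if (map.isEmpty() || map.lastEntry().getValue()[0] != n)
              map.put(map.size(), new int[] { n, 1 });
          else
              map.lastEntry().getValue()[1]++;
          return map;
      }, (map1, map2) -> map1);

      List<Integer> numbers = runs.values().stream()
              .map(run -> run[1])
              .collect(Collectors.toList());

      System.out.println(numbers); // [3, 2, 2, 4]
   }
}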
