Stream::collect() can do that for you. A small hack is applied here for brevity: as both the inputs and the outputs are numbers (ints in particular), the intermediate storage can be an int[2], where the first element is the thing we are counting (so 0 and 1 in the example) and the second element is the counter. Later in the post there will be "real" something-counter pairs.
Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1);
List<Integer> result = stream.collect(
        ArrayList<int[]>::new,      // supplier
        (list, i) -> {              // accumulator
            if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
                list.add(new int[] { i, 1 });
            else
                list.get(list.size() - 1)[1]++;
        },
        (l1, l2) -> {               // combiner
            if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
                l1.addAll(l2);
            else {
                l1.get(l1.size() - 1)[1] += l2.get(0)[1];
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
).stream().map(pair -> pair[1]).collect(Collectors.toList());
System.out.println(result);
ArrayList<int[]>::new is the supplier: it creates the intermediate storage(s) when needed. It is similar to the "identity" in reduce(), but instead of being a single value it can be invoked repeatedly, creating as many fresh containers as needed.
The accumulator function (the (list, i) thing) checks whether the list is empty or its last element is counting something other than i; in that case it adds a new pair, initialized with i as the element and 1 as the count. Otherwise it just increments the existing counter, as the last pair already counts the same kind of element as i. collect() does mutable accumulation, so nothing is returned (unlike with reduce()).
Then there is a "combiner" (the (l1, l2) thing), which has to be able to merge two partial results into one (into the first of the two). Here we have to be prepared for a run being split across the two partial results: the first list may end with the same element the second list starts with. That's what the if checks: the two lists can be "blindly" appended if either of them is empty (unlikely, but who knows), or if the last element of the first list counts something other than the first element of the second list (it's also handy here that we already know the lists are not empty, so those last/first elements exist). Otherwise we have to update the last element of the first list (adding the counter of the first element of the second list to it), and append only the remaining elements.
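As a tiny, made-up illustration of that merge case (same int[]{element, count} representation as above, the concrete lists are invented for the example): suppose one partial result ends with a run of three 0s and the next one starts with two more 0s, followed by four 1s.
List<int[]> l1 = new ArrayList<>(List.of(new int[] { 0, 3 }));                      // ends with 0=3
List<int[]> l2 = new ArrayList<>(List.of(new int[] { 0, 2 }, new int[] { 1, 4 }));  // starts with 0=2
l1.get(l1.size() - 1)[1] += l2.get(0)[1];                 // merge the boundary run: 0=3 and 0=2 become 0=5
l1.addAll(l2.subList(1, l2.size()));                      // append the rest
l1.forEach(p -> System.out.println(p[0] + "=" + p[1]));   // prints 0=5 and 1=4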
And as the collect() above leaves us with a list of int[2]s, a separate map-collect pair strips them down to the counter part, which is what we actually want.
The printed output is [3, 2, 2, 4] by the way.
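For what it's worth, the same three pieces can also be packaged into a reusable java.util.stream.Collector via Collector.of(); a sketch, where the name runLengths is just made up here, and note that Collector.of() expects the combiner as a BinaryOperator that returns the merged container (plus a finisher), unlike the three-argument collect() above:
Collector<Integer, ArrayList<int[]>, List<Integer>> runLengths = Collector.of(
        ArrayList::new,
        (list, i) -> {
            if (list.isEmpty() || list.get(list.size() - 1)[0] != i)
                list.add(new int[] { i, 1 });
            else
                list.get(list.size() - 1)[1]++;
        },
        (l1, l2) -> {
            if (l1.isEmpty() || l2.isEmpty() || l1.get(l1.size() - 1)[0] != l2.get(0)[0])
                l1.addAll(l2);
            else {
                l1.get(l1.size() - 1)[1] += l2.get(0)[1];
                l1.addAll(l2.subList(1, l2.size()));
            }
            return l1;  // BinaryOperator: the merged container is returned
        },
        list -> list.stream().map(pair -> pair[1]).collect(Collectors.toList())  // finisher
);
System.out.println(Stream.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1).collect(runLengths));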
How could such partial results, and the need to combine them, arise? One possible case is that this thing can work in parallel. Here is a variant with some logging, and also with a "proper" pair object instead of int[2]. It's not that proper, but Map.Entry<key, value> can serve as a pair. It's a bit clumsier (requiring getValue()/setValue() instead of ++), but now the input could be anything, not just numbers. The input has been changed too, but only to make the logs easier to follow; it works with the original input as well.
Stream<Integer> stream = Stream.of(0, 0, 0, 1, 1, 2, 2, 3, 3, 3, 3);
System.out.println(
        stream.parallel().collect(
                ArrayList<Map.Entry<Integer, Integer>>::new,
                (list, i) -> {
                    System.out.println("acc " + list + " " + i + " " + Thread.currentThread());
                    // getKey() returns an Integer now, so compare with equals() instead of !=
                    if (list.isEmpty() || !list.get(list.size() - 1).getKey().equals(i))
                        list.add(new AbstractMap.SimpleEntry<Integer, Integer>(i, 1));
                    else {
                        var p = list.get(list.size() - 1);
                        p.setValue(p.getValue() + 1);
                    }
                },
                (l1, l2) -> {
                    System.out.println("comb " + l1 + " " + l2 + " " + Thread.currentThread());
                    if (l1.isEmpty() || l2.isEmpty() || !l1.get(l1.size() - 1).getKey().equals(l2.get(0).getKey()))
                        l1.addAll(l2);
                    else {
                        var p = l1.get(l1.size() - 1);
                        p.setValue(p.getValue() + l2.get(0).getValue());
                        l1.addAll(l2.subList(1, l2.size()));
                    }
                }
        )
);
It may take a couple of runs, but sometimes it actually runs multi-threaded, producing output like this:
acc [] 2 Thread[main,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[main,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 1 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [2=1] [3=1] Thread[main,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [1=1] [1=1] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 2 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=1] [2=1, 3=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [1=2] Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 3 Thread[ForkJoinPool.commonPool-worker-3,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-3,5,main]
comb [3=1] [3=1] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [3=1] [3=2] Thread[ForkJoinPool.commonPool-worker-5,5,main]
acc [] 0 Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=1] [0=1] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [0=2] [0=1, 1=2] Thread[ForkJoinPool.commonPool-worker-7,5,main]
comb [2=2, 3=1] [3=3] Thread[ForkJoinPool.commonPool-worker-5,5,main]
comb [0=3, 1=2] [2=2, 3=4] Thread[ForkJoinPool.commonPool-worker-5,5,main]
[0=3, 1=2, 2=2, 3=4]
What is visible in this particular run is that all 11 input values were accumulated separately (the acc [] x lines, where [] shows that a brand-new empty list was passed), on one of 4 threads (the main thread and worker threads 3, 5 and 7). Those initial steps happen in a fairly arbitrary order; then the results are combined (the order is maintained there, though), so the special combining step (when the lists aren't just appended but a counter needs to be updated) is indeed exercised quite often.
The final pair-to-count transformation is skipped here; that's why the elements and their counts are both printed.
For comparison, the same code without the parallel() call simply uses the accumulator function, stepping through the input stream sequentially. I'm not sure whether combining would ever happen in this sequential case; perhaps for large inputs.
acc [] 0 Thread[main,5,main]
acc [0=1] 0 Thread[main,5,main]
acc [0=2] 0 Thread[main,5,main]
acc [0=3] 1 Thread[main,5,main]
acc [0=3, 1=1] 1 Thread[main,5,main]
acc [0=3, 1=2] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=1] 2 Thread[main,5,main]
acc [0=3, 1=2, 2=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=1] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=2] 3 Thread[main,5,main]
acc [0=3, 1=2, 2=2, 3=3] 3 Thread[main,5,main]
[0=3, 1=2, 2=2, 3=4]
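One (unscientific) way to probe that open question is to count how many times the combiner gets called on a larger sequential stream. This only observes the behaviour of the current JDK implementation, it doesn't prove anything about the specification; the probe uses a do-nothing accumulator, since only the combiner count matters here.
AtomicInteger combinerCalls = new AtomicInteger();
IntStream.range(0, 1_000_000).boxed().collect(
        ArrayList<Integer>::new,
        (list, i) -> { },                              // accumulation doesn't matter for this probe
        (l1, l2) -> combinerCalls.incrementAndGet());  // just count how often combining happens
System.out.println("combiner calls: " + combinerCalls.get());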
By the way, there is StreamEx::runLengths, which is what you're looking for if a library is acceptable. StreamEx? It's a third-party extension of the standard Stream API. So there is no limit, and you can write your own aggregating objects.
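A sketch of how that could look, assuming the StreamEx library (one.util.streamex) is on the classpath; the method names are from memory, so check them against the library's documentation:
List<Long> counts = StreamEx.of(0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1)
        .runLengths()   // collapses adjacent equal elements into element-count entries
        .values()       // keep only the counts
        .toList();
System.out.println(counts);  // should print [3, 2, 2, 4]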