4

I am trying to implement a simple sliding window function in RxJava2, but struggling to do what I want.

My goal is to take a stream of objects, i.e.

["a", "b", "c", "d", "e"]

and apply a sliding window which will return the elements adjacent to each element.

I.e resulting in:

["a", "b"]
["a", "b", "c"]
["b", "c", "d"]
["c", "d", "e"]
["d", "e"].

I.E.

a----------------b----------------c----------------d----------------e
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
["a", "b"]       ["a", "b", "c"]  ["b", "c", "d"]  ["c", "d", "e"]  ["d", "e"]

I can't seem to figure out how to make this happen. A Google Groups post seems like it is on the right track, but doesn't quite get the result I need: https://groups.google.com/forum/#!topic/rxjava/k-U5BijXinU

Any ideas?

4
  • I don't think the problem title is suitable for the problem statement. A sliding window is something like this while it seems you want the adjacent elements of each element. Commented Apr 10, 2017 at 3:26
  • There is a problem with your diagram. You cannot possibly emit an item as output before it has actially arrived as input. Eg. after receiving items "a" and "b" you can't say that the next adjacent would be "c". Otherwise buffer() method will do. Commented Apr 10, 2017 at 12:20
  • @jrook I think it is the same as a sliding window with a different boundary condition. Commented Apr 10, 2017 at 18:21
  • @YaroslavStavnichiy it is okay to be phase-delayed for my purposes, i.e. buffering is OK. Commented Apr 10, 2017 at 18:22

2 Answers 2

6

Depending whether you want your observable to emit List<Item> or Observable<Item> you might use either buffer() or window() operators. The solution isn't that clean but it's pretty straightforward:

Observable.fromArray("a", "b", "c", "d", "e")
        .startWith("")
        .buffer(3, 1)
        .map(strings -> {
            strings.remove("");
            return strings;
        })
        .filter(strings -> strings.size() > 1)

returns

["a", "b"]
["a", "b", "c"]
["b", "c", "d"]
["c", "d", "e"]
["d", "e"]
Sign up to request clarification or add additional context in comments.

2 Comments

This would've been perfect, but I couldn't figure out how to make a generic sentinel element (as "" is in this case), when using a richer object, i.e. a custom type. In my case, the strings are actually e.g. CustomerEvents.
Create an instance of CustomerEvents with some values in CustomerEvents class and make it static. Then you can test against the "sentinal" objekt by accessing the static member.
0

Perhaps as @jrook said, this isn't the best fit for a standard windowing. The array size itself is not enough information to know which side of the element you are on anyways, so I wrapped in in a simple value class.

Here's the solution I went with. It definitely is not a good solution for larger streams, since it blocks to read the whole observable first, which obviously may not be okay for certain use cases (in mine, it is OK).

 public static <R> ObservableTransformer<R, AdjacentPairing<R>> pairWithAdjacents() {
    return upstream -> upstream
        .toList()
        .flatMapObservable(list -> {
          ArrayList<AdjacentPairing<R>> pairings = new ArrayList<>(list.size());
          for (int i = 0; i < list.size(); i++) {
            pairings.add(AdjacentPairing.from(
                i == 0 ? null : list.get(i - 1),
                list.get(i),
                i == list.size() -1 ? null : list.get(i + 1)));
          }
          return Observable.fromIterable(pairings);
        });
  }

  @AutoValue
  public static abstract class AdjacentPairing<T> {
    @Nullable
    public abstract T getPrevious();
    public abstract T getElement();
    @Nullable
    public abstract T getNext();

    public static <T> AdjacentPairing<T> from(@Nullable T previous, T element, @Nullable T next){
      return new AutoValue_RxUtils_AdjacentPairing<>(previous, element, next);
    }
  }

1 Comment

Looks like reactive programming makes your life harder. Why do you need RxJava for such a task?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.