I'm kinda new to RxJS and can't get my head around this problem:

I have two streams:

  • one with incoming objects
    • ---a----b----c----d----->
  • one with the selected object from a list
    • -------------------c---->

From the incoming objects stream, I build a stream of the accumulated list of objects (with the scan operator):

  incoming: ----a--------b--------c----------d----------------->
  list:     ---[a]-----[a,b]---[a,b,c]---[a,b,c,d]------------->

When an object in the list is selected (at index n), start a new stream:

  • the first value of the new stream is the last value of the list, sliced: list.slice(n)

  incoming:            ----a--------b--------c----------d------------------e------->
  list:                ---[a]-----[a,b]---[a,b,c]---[a,b,c,d]---------------------->
  selected object:     --------------------------------------c--------------------->
  new stream of list:  ------------------------------------[c,d]-------[c,d,e]----->

I can't get the last value of the list stream at the moment the object is selected. I made a marble diagram (above) for better understanding.

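import { BehaviorSubject, Subject } from 'rxjs';
import { scan, switchMap, withLatestFrom } from 'rxjs/operators';

const selectedObject$ = new BehaviorSubject(0);
const incomingObjects$ = new Subject();

// running list of every object received so far
const list$ = incomingObjects$.pipe(
  scan((acc, val) => {
    acc.push(val);
    return acc;
  }, [])
);

// on each selection, pair the index with the most recent list,
// then restart accumulation from the sliced list
const newList$ = selectedObject$.pipe(
  withLatestFrom(list$),
  switchMap(([index, list]) => incomingObjects$.pipe(
    scan((acc, val) => {
      acc.push(val);
      return acc;
    }, list.slice(index))
  ))
);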

2 Answers

A common pattern I use with the scan operator is to pass reducer functions, instead of values, into scan, so that the current state can be used in the update operation. In this case you can combine the two observables with the merge operator and map their values to the appropriate functions: one that appends to the list, and one that slices the list after a selection.

import { merge, timer } from 'rxjs';
import { map, scan, take } from 'rxjs/operators';

// these are just timers for demonstration, any observable should be fine.
const incoming$ = timer(1000, 1000).pipe(map(x => String.fromCharCode(x + 65)), take(10));
const selected$ = timer(3000, 3000).pipe(map(x => String.fromCharCode(x * 2 + 66)), take(2));

merge(
  incoming$.pipe(map(x => (s) => [...s, x])), // append to list
  selected$.pipe(map(x => (s) => { // slice list starting from selection
    const index = s.indexOf(x);
    return (index !== -1) ? s.slice(index) : s; // unknown selection: leave list unchanged
  }))
).pipe(
  scan((list, reducer) => reducer(list), []) // apply each reducer to the current list
).subscribe(x => console.log(x)); // display list state as demonstration.
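
To see the reducer-function idea in isolation, here is a minimal sketch in plain TypeScript, without RxJS. It replays the events from the question's marble diagram in order and records each intermediate list, which is roughly what scan would emit. The helper names `append` and `sliceFrom` and the `Reducer` type are made up for this illustration and are not part of RxJS.

```typescript
// A reducer takes the current list and returns the next one.
type Reducer<T> = (list: T[]) => T[];

// Hypothetical helpers (illustration only): build a reducer from an event value.
const append = <T>(item: T): Reducer<T> => (list) => [...list, item];

const sliceFrom = <T>(selected: T): Reducer<T> => (list) => {
  const index = list.indexOf(selected);
  return index !== -1 ? list.slice(index) : list; // unknown selection: keep list
};

// Events in the order the merged stream would deliver them.
const events: Reducer<string>[] = [
  append("a"), append("b"), append("c"), append("d"),
  sliceFrom("c"),
  append("e"),
];

// Stand-in for scan((list, reducer) => reducer(list), []):
// apply each reducer to the current state and keep every intermediate result.
const states: string[][] = [];
events.reduce<string[]>((list, reducer) => {
  const next = reducer(list);
  states.push(next);
  return next;
}, []);

console.log(states[states.length - 1]); // final state: ["c", "d", "e"]
```

Because every source is mapped to a function of the current list, adding a third kind of event (say, clearing the list) would only need one more reducer in the merge.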

2 Comments

Thank you for your time and for helping me. Works as requested.
This is a very interesting idea. I've never thought of mapping emissions to functions. It seems especially useful the way you are mapping different sources to different functions, which allows them to modify the state in different ways.

If I understand the problem right, you could take the following approach.

The key point is to recognize that the list Observable (i.e. the Observable obtained with scan) should be a hot Observable, i.e. an Observable that notifies independently of whether or not it is subscribed. The reason is that each new stream you create should always have the same source Observable as its upstream.

Then, as you already hinted, the act of selecting a value should be modeled with a BehaviorSubject.

As soon as the selection BehaviorSubject notifies a selected value, the previous stream has to complete and a new one has to be subscribed. This is the job of switchMap.

What remains is to slice the array of numbers starting at the selected value.
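
The slicing step can be sketched on its own as a small pure function. This is only an illustration: `sliceFromSelected` is a hypothetical name, and the sketch assumes that a match at index 0 counts as found and that a value not yet in the list yields `undefined` (so the caller can filter it out).

```typescript
// Hypothetical helper: return the list from the selected item onward,
// or undefined when the item is not in the list (yet).
function sliceFromSelected<T>(list: readonly T[], selected: T): T[] | undefined {
  const index = list.indexOf(selected);
  // index 0 is a valid match, so compare with >= 0 rather than > 0
  return index >= 0 ? list.slice(index) : undefined;
}

console.log(sliceFromSelected([0, 1, 2, 3], 1)); // [1, 2, 3]
console.log(sliceFromSelected([0, 1, 2, 3], 0)); // [0, 1, 2, 3]
console.log(sliceFromSelected([0, 1], 5));       // undefined
```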

This is the complete code for this approach:

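import { BehaviorSubject, ReplaySubject, interval } from 'rxjs';
import { filter, map, scan, switchMap, take } from 'rxjs/operators';

const selectedObject$ = new BehaviorSubject(1);
const incomingObjects$ = interval(1000).pipe(take(10));
// replays the latest list to every new subscriber
const incomingObjectsHot$ = new ReplaySubject<number[]>(1);

incomingObjects$
  .pipe(
    scan((acc, val) => {
      acc.push(val);
      return acc;
    }, [] as number[])
  )
  .subscribe(incomingObjectsHot$);

selectedObject$
  .pipe(
    // each selection drops the previous inner stream and starts a new one
    switchMap((selected) =>
      incomingObjectsHot$.pipe(
        map((nums) => {
          const selIndex = nums.indexOf(selected);
          // index 0 is a valid match, so compare with >= 0
          return selIndex >= 0 ? nums.slice(selIndex) : undefined;
        })
      )
    ),
    // skip emissions where the selected value has not arrived yet
    filter((v) => !!v)
  )
  .subscribe(console.log);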

An example can be seen in this stackblitz.

1 Comment

Thank you for your time. IMHO, with this solution one can modify incomingObjectsHot$ from anywhere by calling next. (There should be only two sources of truth.)
