1

I have a little difficulty with the RxJS Observable.

Following setup:

There is an interface (just to make things clearer) for my objects:

interface MyObject
{
    value: number;
    new_value?: number;
}

Then there is a little function, which takes an object and returns an observable, which updates the object:

let updateObject = (object: MyObject) => {
    return Observable.create(observer => {
        setTimeout(() => {
            object.new_value = object.value * 5;
            observer.next(object);
            observer.complete();
        }, 1500);
    });
};

Now I have a source observable, which returns an array of objects (for example as the response of a server):

let observable = Observable.of<MyObject[]>([
    {value: 1},
    {value: 2},
    {value: 3}
]);

So, now I want to update each object in the array, with my "updateObject()" method, so when I subscribe to the observable, I get this result:

// here the asyncOperation should be applied to the observable
observable.subscribe((objects: MyObject[]) => {
    // objects should be:
    // [
    //     {value: 1, new_value: 5},
    //     {value: 2, new_value: 10},
    //     {value: 3, new_value: 15}
    // ]
});

I think it is possible, when I nest many observables, concat and subscribe on them, but I think this is ugly. I didn't find a observable function, provided by the RxJS library, so I'm asking you guys, if you could help me!

Do you have any ideas or a solution for me?

Thanks in regard!

2 Answers 2

1

Most easily you can use just Observable.forkJoin to wait until all the inner Observables complete:

observable
  .concatMap((objects: MyObject[]) => {
    const observables = [];
    objects.forEach(obj => observables.push(updateObject(obj)));

    return Observable.forkJoin(observables);
  })
  .subscribe(...);
Sign up to request clarification or add additional context in comments.

1 Comment

Yeah, this works! Thanks so much, you are my hero! :)
0
Observable.merge(
  Observable.of({value: 1}),
  Observable.of({value: 2}),
  Observable.of({value: 3})
)
.timer(1500).map(obj => Object.assign(obj, {newValue: obj.value * 5}))

1 Comment

While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.