3

I'm new to RxJS observables and I'm trying to resolve a rather simple use case.

In a service, I first make a http call that returns an item (as an observable). The item contains an array of ids, some of them repeated. For each distinct id, I need to call another http service (again returns an observable), and add its return value to the original item in place of the corresponding id. These calls should happen in parallel. Finally, once every call has completed, my service should return an observable of the original item, now with its sub-items in place.

To give a better idea, this is what it would look like with promises rather than observables:

MyService() {
    return HttpGetMainItem()
        .then(item => {
            var promises = _.uniq(item.subItems)
                 .map(sid => HttpGetSubItem(sid)
                            .then(subItem => {
                                // add to matching item.subItems
                             }));
            // wait for all promises to complete and return main item
            return Promise.all(promises).then(() => item);
        });
}

What would be the best way to accomplish this working with observables?

EDIT: from the answers it seems I wasn't very clear. The example with promises is just for clarity, in my case the http calls are actually Angular's HttpClient.get, so they return observables- I'm looking to do everything with observables.

3
  • have you tried something? you could start by defining the interfaces involved in your use case, eg the responses from the 2 requests that you mention. Commented Sep 14, 2018 at 16:13
  • In other words, you'd like to chain observables, right? Check this: learnrxjs.io/operators/transformation/mergemap.html You may also want to search for switchMap Commented Sep 14, 2018 at 16:50
  • Hey @udik since you're new to Rxjs, you want to start using the new syntax for rxjs6. It's feels a little clunkier at first but imports are easier and tree-shaking is much improved. So instead of call Observable.map, calling Observable.pipe(map). To make sure you're not using the old syntax remove 'rxjs-compat' from your dependencies in package.json. Commented Sep 14, 2018 at 17:14

3 Answers 3

4

Here's one way you can accomplish the above with rxjs.

  1. Convert the promise to an observable using from.
  2. In the outer pipe call switchMap so that you can call another observable and return that as the result of the outer observable. You're switching execution contexts.
  3. Inside the switchMap do the map of subItems as you did before and then use forkJoin to create an Observable of all the elements' promise. ForkJoin will emit an array of all the results once all promise complete. It's like promise.all.
  4. Add the items like you were planning to do before, and return the original item.

Code

from(HttpGetMainItem()).pipe(
    switchMap(item => 
        forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid)))
            .pipe(
                map(results => { 
                    /* add results to matching items.subItems and */
                    return item;
                })
            )
     )
);

I feel this looks a bit clunky, due to the need to retain the root item and the nesting it requires. You can use the selector parameter of switchMap to combine the outer and inner observable. You can use that parameter in place of the logic you had in map() and since both observables' results are passed you won't need any further nesting.

from(HttpGetMainItem()).pipe(
    switchMap(
        item => forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid))),
        (item, results) => { 
             /* add results to matching items.subItems and */
             return item;
        }
    )
);
Sign up to request clarification or add additional context in comments.

4 Comments

thank you for code and explanation. I was also trying to use from(item.subItems).pipe(distinct(), ..)- do you think it's worth it? How would the code look in that case?
It depends on the nature of subItems. I don't know how lodash's uniq works, but uniless you pass a selector into distinct it'll be a simple equality check. So different references with the same property values will be kept. If it's just primitives or you provide a key selector then yes, distinct will work.
actually I have no idea either about _.uniq, just used it in the example to clarify :). subItems is a simple array of strings, I just need to remove the duplicates.
The more I think about it, the less I think distinct would work. Rxjs's distinct is a distinct values from a stream of results. The only way to use it would be to wrap each subItem with of, merging them, then calling distinct, unless you want to call distinct after HttpGetSubItem which would be inefficient.
1

I believe you need something like this:

import { from, forkJoin } from 'rxjs';
import { switchMap } from 'rxjs/operators';

itemWithSubItems$ = from(HttpGetMainItem()).pipe(
    switchMap(item => {
        const promises = _.uniq(item.subItems).map(item => HttpGetSubItem(item.sid));
        return forkJoin(promises)
            .map(resultArray => item.subItems = resultArray)
            .map(_ => item);
    })
);

First fetches the main item. Then use forkJoin to resolve all subqueries and enrich the main item. After that just return the main item.

Comments

0

Maybe you could use a lib like async.each -> https://caolan.github.io/async/docs.html#each (eachSeries maybe).

would be something like :

async.each(array, (obj, cb) => {
  observable with obj in parameter and with subscriber result : 
  cb(err, subscriberRes);
}, (err, res) => {
  console.log(res);
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.