I would like to create a method using RxJS that queries an endpoint returning an array of objects, then uses data from that response to get additional details about each item in the array from another endpoint, and finally returns a single array of objects as an observable.

This code works but I would like to do it without having a subscribe inside of the tap operator.

testMethod() {
  this.testService.getItemList().pipe(
    tap((items) => {
      items.forEach((item, i) => {
        this.itemArr.push(item);
        this.testService.getItemDetails(item.id).subscribe(itemDetails => {
          this.itemArr[i]['details'] = itemDetails;
        });
      });
    })
  ).subscribe();
}

3 Answers

5

There are several ways to do this with RxJS. They differ in how they manage the parallelism of the calls to the second endpoint, i.e. the endpoint that returns the details for each item.

MAXIMUM PARALLELISM

Let's assume you receive 10 items in the array from the first endpoint and you want to run all 10 calls to the second endpoint in parallel. In this case you can use the forkJoin function like this:

testMethod() {
  this.testService.getItemList().pipe(
    // transform the array of items into an array of Observables
    // note that the outer map is the rxJs operator while the inner one is
    // the javascript array method
    // note also that we return the array of items since we will need it later
    map(items => [items.map(item => 
       this.testService.getItemDetails(item.id)), items]
    ),
    // then switch to a new Observable which will emit when all of the calls
    // to the second endpoint have returned
    switchMap(([arrayOfObs, items]) => forkJoin(arrayOfObs).pipe(
       // return both the results of the calls to the second endpoint and the
       // original array of items
       map(itemDetails => [itemDetails, items])
    )),
    // finally augment the original array of items with the detail info
    // as in your original code but with no subscription any more
    tap(([itemDetails, items]) => {
      items.forEach((item, i) => {
        this.itemArr.push(item);
        this.itemArr[i]['details'] = itemDetails[i];
      });
    })
  ).subscribe();
}
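
A more compact variant of the same idea, as a rough sketch: each detail call is mapped straight onto its item, so the item array does not have to be carried through the pipeline, and the method returns the Observable instead of subscribing itself. The Item, ItemDetails and ItemWithDetails shapes, the enrichWithDetails helper and the two stub functions standing in for testService are assumptions made up for this example. The guard for an empty list is there because forkJoin completes without emitting when it is given an empty array.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Hypothetical shapes, invented for this sketch.
interface Item { id: number; name: string; }
interface ItemDetails { description: string; }
type ItemWithDetails = Item & { details: ItemDetails };

// Stand-ins for testService.getItemList() and testService.getItemDetails().
const getItemList = (): Observable<Item[]> =>
  of([{ id: 1, name: 'a' }, { id: 2, name: 'b' }, { id: 3, name: 'c' }]);
const getItemDetails = (id: number): Observable<ItemDetails> =>
  of({ description: `details for ${id}` });

// Hypothetical helper: runs all detail calls in parallel for one list of items.
function enrichWithDetails(items: Item[]): Observable<ItemWithDetails[]> {
  if (items.length === 0) {
    // forkJoin([]) would complete without emitting, so emit an empty list instead
    return of([]);
  }
  // forkJoin emits once, when every call has completed, in the order of the input array
  return forkJoin(
    items.map(item =>
      getItemDetails(item.id).pipe(map(details => ({ ...item, details })))
    )
  );
}

function getItemsWithDetails(): Observable<ItemWithDetails[]> {
  return getItemList().pipe(switchMap(items => enrichWithDetails(items)));
}
```

The caller then decides what to do with the result, for example getItemsWithDetails().subscribe(items => this.itemArr = items).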

NO PARALLELISM

If you want to execute the calls to the second endpoint sequentially, you can use the concatMap operator like this:

testMethod() {
  this.testService.getItemList().pipe(
    // transform the array of items into a new stream that emits each item in turn;
    // the rxJs from function creates the new stream (i.e. the new Observable)
    switchMap(items => from(items)),
    // then concatenate the calls to the second endpoint with concatMap
    concatMap(item => this.testService.getItemDetails(item.id).pipe(
       // return the original item with its details
       map(itemDetail => {
         item['details'] = itemDetail;
         return item
       })
    )),
    // finally gather all items into an array
    toArray()
  ).subscribe();
}

CONTROLLED CONCURRENCY

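If you want a certain level of parallelism, e.g. at most 5 calls in parallel, you can replace concatMap with mergeMap and specify the level of concurrency using the second parameter of mergeMap. Unlike concatMap, mergeMap emits results in the order the calls complete, which may differ from the order of the original items.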
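
A minimal sketch of the controlled-concurrency variant, under some assumptions: the Item, ItemDetails and ItemWithDetails shapes are invented, the two stub functions stand in for testService, and the fetchDetails parameter exists only so the function can be exercised without a real service. The stubs here emit synchronously; with real HTTP calls the limit is what keeps the number of in-flight requests bounded.

```typescript
import { Observable, from, of } from 'rxjs';
import { map, mergeMap, switchMap, toArray } from 'rxjs/operators';

// Hypothetical shapes, invented for this sketch.
interface Item { id: number; name: string; }
interface ItemDetails { description: string; }
type ItemWithDetails = Item & { details: ItemDetails };
type FetchDetails = (id: number) => Observable<ItemDetails>;

// Stand-ins for testService.getItemList() and testService.getItemDetails().
const getItemList = (): Observable<Item[]> =>
  of([1, 2, 3, 4, 5, 6, 7].map(id => ({ id, name: `item ${id}` })));
const getItemDetails: FetchDetails = id =>
  of({ description: `details for ${id}` });

function getItemsWithDetails(
  maxConcurrent = 5,
  fetchDetails: FetchDetails = getItemDetails
): Observable<ItemWithDetails[]> {
  return getItemList().pipe(
    // emit the items one at a time
    switchMap(items => from(items)),
    // subscribe to at most maxConcurrent detail calls at once;
    // the remaining items wait until a running call completes
    mergeMap(
      item => fetchDetails(item.id).pipe(map(details => ({ ...item, details }))),
      maxConcurrent
    ),
    // gather the enriched items into a single array (in completion order)
    toArray()
  );
}
```

Calling getItemsWithDetails(1) behaves like the concatMap version, while leaving the limit out of mergeMap altogether would run every call at once.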


1

Your solution works fine. However, I find the code gets a bit simpler if you first emit the returned items one at a time, which you can achieve by starting with switchMap(items => items):

testMethod(): Observable<ItemWithDetails[]> {
  return this.testService.getItemList().pipe(
    switchMap(items => items),
    mergeMap(item => this.testService.getItemDetails(item.id).pipe(
        map(details => ({...item, details}))
    )),
    reduce((acc, curr) => [...acc, curr], [])
  );
}
  1. The switchMap looks a little strange, but it simply emits the array items one at a time. Think of it as switchMap(items => from(items)); since switchMap converts the returned value with from internally, we don't need to call it ourselves.

  2. mergeMap takes the item and makes the call to get details, then returns an object with the details appended.

  3. reduce accumulates the results and emits once the observable completes. (If you're interested in emitting as results are received use scan instead. This can be handy when there is lots of data and you want to update the UI with data as it is received)
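
To illustrate the scan alternative mentioned in step 3, here is a small sketch. The Item, ItemDetails and ItemWithDetails shapes, the stub functions standing in for testService and the function name itemsWithDetailsSoFar are assumptions for the example only. Each emission is the list of items enriched so far, so a subscriber sees the array grow as responses arrive.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap, scan, switchMap } from 'rxjs/operators';

// Hypothetical shapes, invented for this sketch.
interface Item { id: number; name: string; }
interface ItemDetails { description: string; }
type ItemWithDetails = Item & { details: ItemDetails };

// Stand-ins for testService.getItemList() and testService.getItemDetails().
const getItemList = (): Observable<Item[]> =>
  of([{ id: 1, name: 'a' }, { id: 2, name: 'b' }, { id: 3, name: 'c' }]);
const getItemDetails = (id: number): Observable<ItemDetails> =>
  of({ description: `details for ${id}` });

function itemsWithDetailsSoFar(): Observable<ItemWithDetails[]> {
  return getItemList().pipe(
    // emit the items one at a time
    switchMap(items => items),
    // fetch the details for each item and attach them
    mergeMap(item =>
      getItemDetails(item.id).pipe(map(details => ({ ...item, details })))
    ),
    // unlike reduce, scan emits the accumulated array after every item
    scan((acc, curr) => [...acc, curr], [] as ItemWithDetails[])
  );
}
```

With three items this emits three arrays of growing length rather than a single final array, which suits a view that should fill in progressively.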


To make it even cleaner, you could break out the part that appends the details and end up with this:

testMethod(): Observable<ItemWithDetails[]> {
  return this.testService.getItemList().pipe(
    switchMap(items => items),
    mergeMap(item => this.appendItemDetails(item)),
    reduce((acc, curr) => [...acc, curr], [])
  );
}
private appendItemDetails(item: Item): Observable<ItemWithDetails> {
  return this.testService.getItemDetails(item.id).pipe(
    map(details => ({...item, details}))
  );
}

0

I was able to get it to work by using a combination of the mergeMap, merge, and reduce operators.

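testMethod() {
  // return the Observable so the caller can subscribe; nothing runs until then
  return this.testService.getItemList().pipe(
    mergeMap(items => {
      // build one details request per item
      const detailCalls = items.reduce((acc, curr) => {
        const itemWithDetails$ = this.testService.getItemDetails(curr.id).pipe(
          map(details => ({...curr, details}))
        );
        return [...acc, itemWithDetails$];
      }, []);
      // run all of the requests in parallel
      return merge(...detailCalls);
    }),
    // collect the enriched items into a single array
    reduce((acc, curr) => ([...acc, curr]), [])
  );
}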
