Trying to wrap my head around Observables and chaining / nesting HTTP requests.

Let's say I have the following REST endpoints from my dog-walking API back-end, which cannot be changed:

  • GET /dogs (returns all dogs):

    [
        { id: 1, name: 'Fido' },
        { id: 2, name: 'Barky' },
        { id: 3, name: 'Chip' },
        { id: 4, name: 'Bracken' }
    ]
    
  • GET /walker/:id (returns a single dog walker):

    { id: 1, name: 'John Doe' }
    
  • GET /pairings (returns all pairings between dogs and walkers):

    [
        { id: 1, dogIds: [2], walkerId: 1 },
        { id: 2, dogIds: [1, 3], walkerId: 2 }
    ]
    

Business rules

  • A pairing has exactly 1 walker
  • A pairing has a list of 1 or more dogs
  • A dog can be part of 0 or 1 pairings
  • A walker can be part of 0 or 1 pairings

Objective

I want to present a list of all pairings between walkers and dogs, sorted by walker name. I want to sub-sort each walker's dogs by name. I don't want to show any walkers or dogs without an active pairing, e.g:

Walker       | Dogs
-------------+-----------
John Doe     | Barky
Jan Kowalski | Chip, Fido

My thought process

  1. Request all /pairings and /dogs in parallel
  2. Wait for both of those requests to complete
  3. Loop over each pairing and populate the dogs field
  4. Pluck the walkerId from each pairing and request each /walker/:id in parallel
  5. Wait for all of those requests to complete
  6. Loop over each pairing and populate the walker field

I feel like I could do this quite easily using Promises but I'm struggling to adapt my brain to thinking in Observables. Here's what I've got so far (using Angular's HttpClient):

function getDogWalkerPairings() {
    return Observable.forkJoin([
        this.http.get('/pairings'),
        this.http.get('/dogs')
    ])
        .map(
            (res) => {
                const pairings = res[0];
                const dogs = res[1];

                return pairings.map(p => {
                    const pDogs = p.dogIds.map(dogId =>
                        dogs.find(d => d.id === dogId)
                    );
                    return Object.assign({ dogs: pDogs }, p);
                });
            }
        )
        .map((pairingsWithDogs) => {
            return Observable.forkJoin(
                pairingsWithDogs.map(p => this.http.get('/walkers/' + p.walkerId))
            );
        })
        .map((walkers) => {
            // uhhh... where to now?
            // I don't have a reference to pairings in this scope :/
        });
}

2 Answers

Okay, I'll give it a try :-)

My approach was to extract as much as possible into functions, which helps me get a clearer picture. I also changed ".map()" to "pipe(map())", the pipeable-operator style available since RxJS 5.5.

function getDogWalkerPairings() {
    return Observable.forkJoin([
        this.http.get('/pairings'),
        this.http.get('/dogs')
    ]).pipe(
        // Destructured arrow-function parameters must be wrapped in parentheses
        map(([pairings, dogs]) => createPairingsWithDogs(pairings, dogs)),
        switchMap(pairingsWithDogs => getWalkersForPairs(pairingsWithDogs))
    )
}

function createPairingsWithDogs(pairings, dogs) {
    return pairings.map(pairing => {
        // Resolve each dog id of this pairing to the full dog object
        const dogPairings = pairing.dogIds.map(
            dogId => dogs.find(dog => dog.id === dogId)
        );
        return Object.assign({ dogs: dogPairings }, pairing);
    });
}

function getWalkersForPairs(pairingsWithDogs):Observable<any>{
    return Observable.forkJoin(
        pairingsWithDogs.map(p => this.http.get('/walkers/' + p.walkerId))
    ).pipe(
        map( walkerArray => createWalkerDogPairs(walkerArray, pairingsWithDogs) )
    );
}

function createWalkerDogPairs(walkerArray, pairingsWithDogs){
    ...
    return finalResultTable;
}

How does it work? First, I create the pairs, like you did.

Then I switch to a new stream with switchMap, and that is where I use your forkJoin trick. BUT, because I extracted this into its own function, I get a new scope, and in that scope I have everything I need: both the walkers and pairingsWithDogs. (Okay, no cookies there, so not everything... :-( )
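
As a rough sketch only, here is one way the elided createWalkerDogPairs could be filled in so that it also covers the sorting from the question. It is not the original body. It assumes walkerArray[i] is the walker for pairingsWithDogs[i], which holds because forkJoin emits its results in the same order as the Observables it was given. The interfaces, the row shape, and the use of localeCompare are assumptions made for illustration.

```typescript
// Hypothetical shapes, inferred from the sample responses in the question.
interface IdName { id: number; name: string; }
interface PairingWithDogs { id: number; dogIds: number[]; walkerId: number; dogs: IdName[]; }
interface WalkerRow { walker: IdName; dogs: IdName[]; }

// Sketch: assumes walkerArray[i] belongs to pairingsWithDogs[i].
function createWalkerDogPairs(walkerArray: IdName[], pairingsWithDogs: PairingWithDogs[]): WalkerRow[] {
    const byName = (a: IdName, b: IdName) => a.name.localeCompare(b.name);
    return pairingsWithDogs
        .map((pairing, i) => ({
            walker: walkerArray[i],
            // Copy before sorting so the incoming array is not mutated.
            dogs: pairing.dogs.slice().sort(byName)
        }))
        // Sort the rows by walker name.
        .sort((a, b) => byName(a.walker, b.walker));
}

// Example input: walkers in request order, pairings with dogs already attached.
const rows = createWalkerDogPairs(
    [{ id: 1, name: 'John Doe' }, { id: 2, name: 'Jane Doe' }],
    [
        { id: 1, dogIds: [2], walkerId: 1, dogs: [{ id: 2, name: 'Barky' }] },
        { id: 2, dogIds: [1, 3], walkerId: 2, dogs: [{ id: 1, name: 'Fido' }, { id: 3, name: 'Chip' }] }
    ]
);
const table = rows.map(r => r.walker.name + ' | ' + r.dogs.map(d => d.name).join(', '));
console.log(table.join('\n'));
```

Because only pairings are iterated, walkers and dogs without an active pairing never appear in the result.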

If this were my code, I would also add a lot of typings. Especially when the type changes (as it does with "map"), this helps me stay on top of it:

Observable.of([1, 2, 3, 4]).pipe(
    map((numbers: number[]): boolean[] => checkOddNumbers(numbers)),
    tap((data: boolean[]) => console.log(data))
)

I hope that helps a bit.

Warm regards

PS: I know, my method names are awful... :-(

3 Comments

This is awesome, thanks so much. The refactoring into functions really helps to clean things up. Am I correct in thinking you've used switchMap so that if the first forkJoin (/pairings and /dogs) emits again, any in-flight /walkers/:id requests will be cancelled and started again from the new pairings?
In Angular, an http.get will not emit a second time; it completes after its first emission. But yes, if we were listening to something other than an HTTP observable, something that could emit multiple times, switchMap would be my weapon of choice. For HTTP, "concatMap" is also fine: it subscribes to the next inner observable only after the previous one has completed, whereas "switchMap" switches to a new inner observable on every emission and cancels the previous one.
By the way, thank you for this nice problem. I am currently tackling it in multiple ways to get a feeling for which solution style suits me most. One thing I learned from that is that I will wrap my HTTP calls in their own methods. That way I can catch errors directly at the source and do things like "retry" or "emit a default value" without cluttering the rest of my code with special cases. Warm regards

Use concatMap to chain the final HTTP call. The tricky part is that you need to pass along the pairings you got from the first forkJoin. Here is my answer, with a working example on StackBlitz. I mocked your HTTP calls with a 500 ms delay; just substitute your real HTTP calls where I used the mocks.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Observable, of, forkJoin, Subject } from 'rxjs';
import { map, delay, concatMap, takeUntil } from 'rxjs/operators';

interface IdNamePair {
  id: number;
  name: string;
}

interface Pairing {
  id: number;
  dogIds: Array<number>;
  walkerId: number;
  dogs?: Array<IdNamePair>;
  walker?: IdNamePair;
}

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {

  pairings: Array<Pairing>;
  private delay = 500;
  private ngUnsubscribe: Subject<any> = new Subject();

  constructor() { }

  ngOnInit() {
    this.getDogWalkerPairings();
  }

  ngOnDestroy() {
    this.ngUnsubscribe.next();
    this.ngUnsubscribe.complete();
  }

  private getDogWalkerPairings() {
    forkJoin(this.getPairings(), this.getDogs())
      .pipe(
        map(this.mapToPairingsWithDogs),
        concatMap((pairingsWithDogs: Array<Pairing>) => {
          return forkJoin(pairingsWithDogs.map(pair => {
            return forkJoin(this.getWalker(pair.walkerId), of(pair));
          }));
        }),
        map(this.mapToPairingsWithDogsAndWalker),
        takeUntil(this.ngUnsubscribe)
      )
      .subscribe((pairings: Array<Pairing>) => {
        console.log(pairings);
        this.pairings = pairings;
      });
  }

  private mapToPairingsWithDogs(data: [Array<Pairing>, Array<IdNamePair>]): Array<Pairing> {
    const pairings = data[0];
    const dogs = data[1];
    return pairings.map(pairing => {
      const pDogs = pairing.dogIds.map(dogId => dogs.find(d => (d.id === dogId)));
      pairing.dogs = pDogs;
      return pairing;
    });
  }

  private mapToPairingsWithDogsAndWalker(data: Array<[IdNamePair, Pairing]>): Array<Pairing> {
    return data.map(d => {
      const pairing: Pairing = d[1];
      pairing.walker = d[0];
      return pairing;
    });
  }

  private getDogs(): Observable<Array<IdNamePair>> {
    return of([
      { id: 1, name: 'Fido' },
      { id: 2, name: 'Barky' },
      { id: 3, name: 'Chip' },
      { id: 4, name: 'Bracken' }
    ]).pipe(delay(this.delay));
  }

  private getWalker(id: number): Observable<IdNamePair> {
    return of({ id: id, name: id === 1 ? 'John Doe' : 'Jane Doe'}).pipe(delay(this.delay));
  }

  private getPairings(): Observable<Array<Pairing>> {
    return of([
      { id: 1, dogIds: [2], walkerId: 1 },
      { id: 2, dogIds: [1, 3], walkerId: 2 }
    ]).pipe(delay(this.delay));
  }

}

EDIT

Explanation:

  1. forkJoin - emits the pairings and the dogs together, once both requests have completed
  2. map - attaches to each pairing the dogs whose ids it lists
  3. concatMap - executes the next call; for this we need to do a few things:
    • we need to call getWalker for each pairing and get all the results at the same time, so we map each pairing to a getWalker call, which returns an Observable, and then forkJoin the resulting array of Observables
    • the tricky part is that the pairings are only available inside the scope of concatMap, yet we need them in the next map or subscribe; to pass them along, we forkJoin each Observable returned by getWalker with an Observable created from the individual pairing (of(pair))
  4. map - attaches each walker to its pairing, using the [walker, pairing] tuples produced in step 3
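
Steps 2 and 4 are plain array joins with no RxJS involved. As a minimal sketch of step 2 (not the code from the answer above), the variant below indexes the dogs by id first, so each lookup avoids a repeated find, and it returns new objects instead of mutating the emitted pairings. The helper name attachDogs and the choice to drop unknown dog ids are assumptions made for illustration.

```typescript
interface IdNamePair { id: number; name: string; }
interface Pairing { id: number; dogIds: number[]; walkerId: number; dogs?: IdNamePair[]; }

// Hypothetical helper: a non-mutating take on step 2.
function attachDogs(pairings: Pairing[], dogs: IdNamePair[]): Pairing[] {
  // Build an id -> dog index once.
  const dogsById: { [id: number]: IdNamePair } = {};
  dogs.forEach(dog => { dogsById[dog.id] = dog; });

  return pairings.map(pairing => ({
    ...pairing,
    dogs: pairing.dogIds
      .map(id => dogsById[id])
      // Assumption: ids without a matching dog are dropped rather than kept as undefined.
      .filter((dog): dog is IdNamePair => dog !== undefined)
  }));
}

// Sample data from the question.
const samplePairings: Pairing[] = [
  { id: 1, dogIds: [2], walkerId: 1 },
  { id: 2, dogIds: [1, 3], walkerId: 2 }
];
const sampleDogs: IdNamePair[] = [
  { id: 1, name: 'Fido' },
  { id: 2, name: 'Barky' },
  { id: 3, name: 'Chip' },
  { id: 4, name: 'Bracken' }
];
const withDogs = attachDogs(samplePairings, sampleDogs);
console.log(withDogs);
```

A function like this could be passed to map in place of mapToPairingsWithDogs after destructuring the [pairings, dogs] tuple; whether avoiding mutation matters depends on whether anything else holds a reference to the original response objects.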

4 Comments

"forkJoin" waits until all observables have completed, so it only ever emits one event, and it completes right after that event. So I think there is no need to unsubscribe. ---> EDIT: Sorry, I did not see that you are handling the case of an early ngOnDestroy. In that case it's a nice safety measure.
Really nice code, I'm enjoying reading through this :) I'm slightly confused by this line: return forkJoin(this.getWalker(pair.walkerId), of(pair));. It looks like it might be a neat trick for taking the result of the getWalker() function (an Observable of an IdNamePair) and turning into a tuple along with the original pair object. Is that right? If so that's very clever!
Yes, that is the way I came up with to pass the result of the first forkJoin through to the subscribe at the end.
@chrisf I edited my answer with an explanation, let me know if I can improve it in any way
