9

I'm having trouble with some observables. I can't seem to get two observables to play nicely together. They work just fine on their own, but I need both values.

db.glass.subscribe( ( glass: GlassData[] ): void => {
  console.log( glass ); // This prints
});

db.cassette_designs.subscribe( ( cassettes: CassetteData[] ): void => {
  console.log( cassettes ); // This prints
});

Not being all that familiar with observables, the first thing I tried was nesting one inside the other, but the inner one doesn't seem to do anything.

db.glass.subscribe( ( glass: GlassData[] ): void => {
  console.log( glass ); // This prints

  db.cassette_designs.subscribe( ( cassettes: CassetteData[] ): void => {
    console.log( cassettes ); // This doesn't
  });
});

This seemed a bit silly to me, so I searched on Google to see if there was a better way to combine observables, and it turns out there are a few. I tried zip and forkJoin since they looked the most like what I wanted to do, but nothing worked with them either.

Observable.zip( db.cassette_designs, db.glass, ( cassettes: CassetteData[], glass: GlassData[] ): void => {
  console.log( cassettes ); // doesn't print
  console.log( glass     ); // doesn't print
});

Observable.forkJoin( [ db.cassette_designs, db.glass ] ).subscribe( ( data: any ): void => {
  console.log( data ); // doesn't print
});

It could be something as simple as I'm not calling the functions correctly, but I'd have thought I would get some sort of warning or error at some point. tsc doesn't have any problem with the code and I get no messages in the developer consoles on either Chrome or Firefox.

Update

I've tried combineLatest, but it still doesn't display anything in the console. I must be missing something, but I'm not sure what. They work individually.

Observable.combineLatest( db.cassette_designs, db.glass, ( cassettes: CassetteData[], glass: GlassData[] ): void => {
  console.log( cassettes ); // doesn't print
  console.log( glass     ); // doesn't print
});

The observables are created in the following way:

...

public Listen( event: string ): Observable<Response>
{
  return new Observable<Response>( ( subscriber: Subscriber<Response> ): Subscription => {
    const listen_func = ( res: Response ): void => subscriber.next( res );

    this._socket.on( event, listen_func );

    return new Subscription( (): void =>
      this._socket.removeListener( event, listen_func ) );
  });
}

...

Then, to actually get the observables, I listen for responses on the relevant event, e.g.

...

public cassette_designs: Observable<CassetteData[]>;

...

this.cassette_designs = _socket.Listen( "get_cassette_designs" )
    .map( ( res: Response ) => res.data.data );
  • SO curious about what your project is! Cassettes and glass and they're all playing together! Love it! Commented Jan 5, 2018 at 8:19
  • It was an online tool that allowed users to design doors. Commented Jan 6, 2018 at 20:49

6 Answers

10

I managed to get combineLatest to work, by actually subscribing to the resulting observable.

Originally I was doing this:

Observable.combineLatest( db.cassette_designs, db.glass, ( cassettes: CassetteData[], glass: GlassData[] ): void => {
  console.log( cassettes );
  console.log( glass     );
});

Now I'm doing this:

Observable.combineLatest( db.cassette_designs, db.glass ).subscribe( ( data: any[] ): void => {
  console.log( data );
  // cassettes - data[0]
  // glass     - data[1]
});

7

To follow up with your findings:

1) An Observable is a lazily executed data type, which means that nothing in the pipeline runs until it is subscribed to. This goes for the combinatorial operators as well: zip, forkJoin, combineLatest, and withLatestFrom subscribe to the Observables that you pass them only after they themselves have been subscribed to.

Hence:

var output = Observable.combineLatest(stream1, stream2, (x, y) => ({x, y}));

Won't actually do anything until you call output.subscribe(), at which point subscribe is also called on stream1 and stream2 and you get all the magic of Rx.
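
To make the laziness concrete, here is a minimal sketch that does not use RxJS at all. LazyStream and combineTwo are hypothetical, hand-rolled stand-ins that only mimic the idea behind Observable and combineLatest; the real library handles errors, completion and unsubscription, which this sketch ignores.

```typescript
// Illustration only: a hand-rolled stand-in, NOT the RxJS implementation.
type Observer<T> = (value: T) => void;

class LazyStream<T> {
  // The producer is stored here and runs only when subscribe() is called.
  constructor(private producer: (observer: Observer<T>) => void) {}

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }
}

// Hypothetical helper mirroring the idea of combineLatest for two sources.
function combineTwo<A, B>(a: LazyStream<A>, b: LazyStream<B>): LazyStream<[A, B]> {
  return new LazyStream<[A, B]>((observer) => {
    let latestA: A | undefined;
    let latestB: B | undefined;
    let hasA = false;
    let hasB = false;
    const emit = (): void => {
      // Emit only once both sources have produced at least one value.
      if (hasA && hasB) {
        observer([latestA as A, latestB as B]);
      }
    };
    // The sources are subscribed to here, i.e. only when the combined
    // stream itself gets a subscriber.
    a.subscribe((value) => { latestA = value; hasA = true; emit(); });
    b.subscribe((value) => { latestB = value; hasB = true; emit(); });
  });
}

const log: string[] = [];
const glass = new LazyStream<string>((observer) => {
  log.push("glass producer started");
  observer("glass data");
});
const cassettes = new LazyStream<string>((observer) => {
  log.push("cassettes producer started");
  observer("cassette data");
});

// Building the combined stream runs neither producer.
const combined = combineTwo(cassettes, glass);
const startedBeforeSubscribe = log.length;

// Subscribing is what starts both producers.
const received: Array<[string, string]> = [];
combined.subscribe((pair) => { received.push(pair); });
```

In this sketch startedBeforeSubscribe stays at zero because creating the combined stream never touches the producers; only the final subscribe call does.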

2) A more minor point: before writing your own creation methods, check the documentation to see whether one already exists. There are static creation methods for arrays, Promises, Node-style callbacks and, yes, even standard event patterns.

Thus your Listen method can become:

public Listen<R>(event: string): Observable<R> {
  return Observable.fromEvent(this._socket, event);
}
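
Whichever way the observable is created, the listener should be detached again when the subscription ends. The following is a minimal sketch of that attach-on-subscribe, detach-on-unsubscribe pattern, written without any library. TinyEmitter and listen are hypothetical names for illustration, not socket.io or RxJS APIs, and the event name is borrowed from the question.

```typescript
// Illustration only: TinyEmitter and listen are hypothetical helpers.
type Handler<T> = (payload: T) => void;

class TinyEmitter<T> {
  private handlers: { [event: string]: Handler<T>[] } = {};

  on(event: string, handler: Handler<T>): void {
    (this.handlers[event] = this.handlers[event] || []).push(handler);
  }

  removeListener(event: string, handler: Handler<T>): void {
    this.handlers[event] = (this.handlers[event] || []).filter((h) => h !== handler);
  }

  emit(event: string, payload: T): void {
    // Copy the list so handlers may remove themselves while being called.
    (this.handlers[event] || []).slice().forEach((h) => h(payload));
  }

  listenerCount(event: string): number {
    return (this.handlers[event] || []).length;
  }
}

// Returns a subscribe function. Calling it attaches the listener and hands
// back an unsubscribe function that detaches the same listener again.
function listen<T>(emitter: TinyEmitter<T>, event: string) {
  return (next: Handler<T>): (() => void) => {
    const handler: Handler<T> = (payload) => next(payload);
    emitter.on(event, handler);
    return () => emitter.removeListener(event, handler);
  };
}

const socket = new TinyEmitter<string>();
const seen: string[] = [];

const subscribeToDesigns = listen(socket, "get_cassette_designs");
// Nothing is attached yet: the listener is added lazily on subscribe.
const countBefore = socket.listenerCount("get_cassette_designs");

const unsubscribe = subscribeToDesigns((data) => { seen.push(data); });
socket.emit("get_cassette_designs", "first");

unsubscribe();
// This emission is ignored because the listener has been removed.
socket.emit("get_cassette_designs", "second");
const countAfter = socket.listenerCount("get_cassette_designs");
```

RxJS's fromEvent is designed to detach its handler on unsubscribe in the same spirit, provided it recognises the listener API of the object it is given; whether a particular socket object qualifies is worth checking against the documentation.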

1 Comment

That makes a lot of sense. In regards to the way I'm getting observables, the main reason I'm doing it the way I am is to make sure that unsubscribing from the observable also removes the listener. I'll take a look through the documentation and see what I can find.
1

Hey, if you want a single stream (observable) that emits the combined data of the two sources, check out the combineLatest method.


1

Both Observable.zip and Observable.forkJoin should work. (Personally, I prefer forkJoin: it returns an array in the same order as the observables you passed in, and you don't need a long argument list.)

Maybe, if you are creating the observables manually (Observable.create...), you just forgot to call complete on the observers passed to the create callback. forkJoin emits nothing until every source has completed.
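
The completion requirement can be sketched without RxJS. joinWhenAllComplete below is a hypothetical, hand-rolled helper that only mimics the forkJoin idea (collect the last value of each source, emit once all have completed); it is not the library's implementation. The "endless" source models a listener-style stream such as the question's Listen method, which calls next but never complete.

```typescript
// Illustration only: a hand-rolled stand-in, NOT the RxJS forkJoin implementation.
interface Sink<T> {
  next(value: T): void;
  complete(): void;
}

type Source<T> = (sink: Sink<T>) => void;

// Hypothetical helper: reports the last value of every source, in order,
// but only after all of the sources have completed.
function joinWhenAllComplete<T>(sources: Source<T>[], onResult: (values: T[]) => void): void {
  const last: T[] = [];
  const hasValue: boolean[] = sources.map(() => false);
  let remaining = sources.length;

  sources.forEach((source, index) => {
    source({
      next: (value) => { last[index] = value; hasValue[index] = true; },
      complete: () => {
        remaining -= 1;
        if (remaining === 0 && hasValue.every((seen) => seen)) {
          onResult(last);
        }
      },
    });
  });
}

// Completes after one value, like a single HTTP request.
const finite: Source<string> = (sink) => { sink.next("glass"); sink.complete(); };
// Emits but never completes, like a socket event listener.
const endless: Source<string> = (sink) => { sink.next("cassettes"); };

const results: string[][] = [];

// Both sources complete, so a result is reported.
joinWhenAllComplete([finite, finite], (values) => { results.push(values); });
const afterFinite = results.length;

// One source never completes, so nothing further is reported.
joinWhenAllComplete([finite, endless], (values) => { results.push(values); });
const afterEndless = results.length;
```

For streams that stay open, an operator that emits on every new value, such as combineLatest, is usually the better fit than one that waits for completion.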


0

I've used combineLatest. I needed three API calls, but I wanted only one Object combining the three responses.

I followed the formula mentioned in this same post:

var output = Observable.combineLatest(stream1, stream2, (x, y) => ({x, y}));

The final code:

getGobalStats(): Observable<any> {

    // Each request maps the response to its JSON content. On failure the error is
    // logged and re-thrown as an observable, because catch must return an observable.
    let obs1 = this._http.get(this._config.getGlobalStatsUrl(), this._config.getOptions())
      .map((res: Response) => {
        return res.json().content;
      })
      .catch((error: any) => { console.error(error); return Observable.throw(error); });
    let obs2 = this._http.get(this._config.getGlobalStatsUrl() + '?type=1', this._config.getOptions())
      .map((res: Response) => {
        return res.json().content;
      })
      .catch((error: any) => { console.error(error); return Observable.throw(error); });
    let obs3 = this._http.get(this._config.getGlobalStatsUrl() + '?type=3', this._config.getOptions())
      .map((res: Response) => {
        return res.json().content;
      })
      .catch((error: any) => { console.error(error); return Observable.throw(error); });

    // Emits a single object once all three requests have produced a value.
    return Observable.combineLatest(obs1, obs2, obs3, (res1, res2, res3) => {
      return { all: res1, running: res2, cycling: res3 };
    });
  }


0
        Observable.merge(
            Observable.fromEvent(canvasEl, 'mousedown'), Observable.fromEvent(canvasEl, 'touchstart'))
            .switchMap((e) => {
                return Observable.merge(
                    Observable.fromEvent(canvasEl, 'mousemove').takeUntil(Observable.fromEvent(canvasEl, 'mouseup')),
                    Observable.fromEvent(canvasEl, 'touchmove').takeUntil(Observable.fromEvent(canvasEl, 'touchend')),
                )
                    .pairwise()
            })
            .subscribe(...
