I am having a really hard time understanding RxJS and how to handle errors. I am using Angular (4+) and with their switch to RxJS for simple HTTP requests, I am finding myself having to wrestle really hard with something that seems trivial.

Here is my very contrived code:

import { Component, OnInit } from '@angular/core';

import 'rxjs/add/observable/from';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/throw';

import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/switchMap';

import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

@Component({
    selector: 'app-root',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
    title = 'app';
    index = 0;
    sub1: Subscription;
    fail = false;

    // the event generated by the user (click)
    eventA = new BehaviorSubject<number>(0);

    // another event, downstream from eventA
    eventB = this.eventA.mergeMap(x => this.fetchData(x));

    constructor() { }

    ngOnInit(): void {
        this.sub1 = this.eventB.catch(err => {
            console.log(`error was caught!`);
            return this.eventB;
        }).subscribe(x => {
            this.title = x.toString();
        });
    }

    doIt() {
        this.eventA.next(Date.now());
    }

    fetchData(input: number) {
        if (this.fail) {
            return Observable.throw(new Error(`I failed!`));
        }
        return Observable.of(input);
    }
}

And the HTML:

<input type="checkbox" [checked]="fail" (change)="fail = !fail" >Fail<br>
<button (click)="doIt()">doit()</button><br>
{{title}}

And here is the demo

demo GIF

As you can see, once it fails, the pipeline is not executed anymore. I can confirm that eventA is still good and so is eventB. However, it appears that sub1 is unsubscribed.

That last part is what I don't understand. I am explicitly returning eventB so that it can continue... Or do I have this all wrong?

My use case is this:

  • I have a business event (eventA) that forces data to be refreshed in the application.
  • Instead of forcing all the components in the application to listen to that event and then request their new data, I want to leverage the power of Rx to simply have the data flow through the application.
  • Therefore, my components subscribe to various downstream events.
  • The problem I have is that, like any data call, it can have errors. And I want each component to handle the error on its own.
  • The downstream event can produce errors for those bad data calls, so how do I let subscribers know of errors without killing the stream?

Comments

  • The scenario needs more thought, but one thing I can say for sure is that an error notification ends the subscription. If you want to catch and continue subscribing, a switchMap() might work. Commented Jan 17, 2018 at 23:28
  • @RichardMatsen I did try switchMap() instead of mergeMap() but without any difference in the outcome. I understand that an error ends the subscription, but this seems to be a fairly trivial use case for RxJS. Commented Jan 17, 2018 at 23:30
  • I was thinking of switchMap where the catch is. Commented Jan 17, 2018 at 23:32
  • I'm setting up a testbed to see if it's viable - sorry I'm a bit vague on the pattern at the moment - just looking for a substitute for catch. Commented Jan 17, 2018 at 23:37
  • I don't fully grasp it yet but the solution here seems to work for your example: stackoverflow.com/questions/38649652/… I put it in a plnkr here plnkr.co/edit/u9vpCR8ltCwtsaTH0Cxl?p=preview Commented Jan 18, 2018 at 0:26

1 Answer

Use .switchMap() on the trigger observable to get a new fetch on each click, and catch inside the switchMap instead of after eventB.

switchMap effectively creates a new inner subscription each time, so if the old one closes because of an error, the next click starts a fresh one while the outer subscription to eventB stays alive.

The only potential downside is that the result of a fetch from the first click is thrown away if another click happens before that fetch finishes.

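eventB = this.eventA.switchMap(x => {
  // Catch on the inner observable so a failed fetch never reaches eventB.
  return this.fetchData(x)
    .catch(err => {
      console.log(`error was caught!`, err);
      // Complete the inner observable without emitting anything.
      return Observable.empty();
    });
});

...

ngOnInit(): void {
  this.sub1 = this.eventB
    .subscribe(x => {
      this.title = x.toString();
    });
}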

Here's the StackBlitz I tested on.
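
The question also asks how subscribers can be told about errors without the stream being killed. One common approach is to deliver the failure as an ordinary value instead of as an error notification. The sketch below is a dependency-free model of that idea, not RxJS itself: MiniSubject, FetchResult and fetchAsResult are hypothetical names made up for illustration, and fetchData here is a simplified synchronous stand-in for the one in the question. In the RxJS 5 code above, the analogous change would presumably be to return Observable.of(...) with a wrapped error from the inner catch rather than Observable.empty().

```typescript
// Dependency-free model of the observable contract. MiniSubject, FetchResult
// and fetchAsResult are hypothetical names for illustration, not RxJS APIs.
type FetchResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: Error };

interface Observer<T> {
  next: (value: T) => void;
  error?: (err: Error) => void;
}

class MiniSubject<T> {
  private observers: Observer<T>[] = [];

  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    // Iterate over a copy so the list can change during delivery.
    for (const o of this.observers.slice()) o.next(value);
  }

  // An error is terminal: observers are notified once and then dropped.
  error(err: Error): void {
    const current = this.observers;
    this.observers = [];
    for (const o of current) if (o.error) o.error(err);
  }
}

// Simplified stand-in for the question's fetchData (throws on failure).
function fetchData(input: number, fail: boolean): number {
  if (fail) throw new Error('I failed!');
  return input;
}

// Turn a failure into an ordinary value instead of an error notification.
function fetchAsResult(input: number, fail: boolean): FetchResult<number> {
  try {
    return { ok: true, value: fetchData(input, fail) };
  } catch (e) {
    return { ok: false, error: e as Error };
  }
}

// Case 1: an error notification ends the subscription.
const raw = new MiniSubject<number>();
const rawLog: number[] = [];
raw.subscribe({ next: v => rawLog.push(v), error: () => {} });
raw.next(1);
raw.error(new Error('boom'));
raw.next(2); // never delivered, the observer was dropped

// Case 2: failures travel as values, so the subscriber stays attached.
const results = new MiniSubject<FetchResult<number>>();
const log: string[] = [];
results.subscribe({
  next: r => log.push(r.ok ? `value ${r.value}` : `handled: ${r.error.message}`),
});
results.next(fetchAsResult(1, false));
results.next(fetchAsResult(2, true));
results.next(fetchAsResult(3, false));

console.log(rawLog); // [ 1 ]
console.log(log);    // [ 'value 1', 'handled: I failed!', 'value 3' ]
```

With this shape each component decides for itself what to do with an { ok: false } result, which matches the requirement that every component handles its own errors. The trade-off is that every subscriber has to check the ok flag before using the value.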


3 Comments

Thanks for the info. I am torn a bit because technically the events are in different services. But I think I can leverage this to generate 2 different events as I mentioned previously. This might actually make things cleaner. Thanks for your help. I am warming up to RxJS, but it's quite a steep learning curve...
If by separate streams you mean that you want to subscribe to errors separately, you might need onErrorResumeNext() after the catch and return Observable.of(err) inside it (just some ideas to try).
So, I keep hearing and reading about this onErrorResumeNext(), but I can't find any documentation about it on the RxJS website for Observable. Am I missing something?
