What is the best way to handle a server reconnection using RxJS and Angular 2?

This is what I have so far:

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

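import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
// RxJS 5 patch-style imports: every operator used below must be added explicitly
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/repeat';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/operator/timeout';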

@Injectable()
export class ConnectionService {

  // Stores the current status of the connection, false = not connected
  serviceOnline: BehaviorSubject<boolean> = new BehaviorSubject(false)

  constructor(
    private http: Http
  ) {
    this.serviceOnline.asObservable().subscribe((online) => {
      console.log("Service online: " + JSON.stringify(online))
    })
    setTimeout( () => this.setServiceOffline(), 2000 );
  }

  setServiceOffline() {
    console.log("Setting service offline");
    this.serviceOnline.next(false);
    this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
  }

  testConnection(): Observable<boolean> {
    console.log("Testing connection");
    return new Observable(observer => {
      this.http.get(`http://endpoint/ping`)
        .timeout(5000)
        .catch((error: any) => Observable.throw(error))
        .subscribe(() => {
          console.log("Connection successful");
          if(this.serviceOnline.getValue() == false)
            this.serviceOnline.next(true);
          observer.next(true);
          observer.complete();
        }, (error) => {
          console.log("Connection failed");
          if(this.serviceOnline.getValue() == true)
            this.serviceOnline.next(false);
          observer.next(false);
          observer.complete();
        });
    })
  }
}

Result in the console:

Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed

If I replace this line:

this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();

with

this.testConnection().delay(2000).repeat().take(5).subscribe();

I do get the repeats:

Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
Connection failed
Connection failed
Connection failed
Connection failed

From the documentation:

public takeUntil(notifier: Observable): Observable

Emits the values emitted by the source Observable until a notifier Observable emits a value.

The serviceOnline BehaviorSubject is only updated if there is a change in the server status. After this line...

this.serviceOnline.next(false);

...serviceOnline only emits another value when the http service has returned a successful response and if the current online status is false (disconnected):

if(this.serviceOnline.getValue() == false)
   this.serviceOnline.next(true);

So takeUntil should work. What am I misunderstanding/missing here?

I am new to RxJS - any other pointers on improving the implementation in this code would also be greatly appreciated...

Here is a Plnkr: http://plnkr.co/edit/WUKxH6nFxc8LMKcxDPql?p=info

RxJS version 5.4. Angular version 4.


Returning serviceOnline as an observable doesn't fix the issue either:

this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();

2 Answers

I think the problem occurs because this.serviceOnline is a BehaviorSubject, which always emits its current value to every new subscriber as soon as it subscribes. When you pass it to takeUntil as the notifier, takeUntil subscribes to it, immediately receives that value, and completes the resulting observable straight away. Note that it doesn't matter whether the notifier emits true or false: takeUntil completes as soon as it receives any value.

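Try .takeUntil(this.serviceOnline.filter(online => online === true)) instead, so that only a transition to true stops the polling. With the patch-style imports used in the question, this also needs import 'rxjs/add/operator/filter'.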
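
To make the replay behaviour concrete, here is a minimal, self-contained TypeScript sketch. It deliberately does not use RxJS: ToyBehaviorSubject and pollUntil are hypothetical stand-ins written only for this illustration, and the polling loop is synchronous, so it models the order of events rather than the real timing of delay/repeat or the HTTP call. The accept argument plays the role of a filter placed in front of the notifier.

```typescript
// Toy model, NOT the RxJS implementation: just enough to show replay-on-subscribe.
type Listener<T> = (value: T) => void;

class ToyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];

  constructor(private current: T) {}

  // Like a BehaviorSubject, a new subscriber synchronously receives the current value.
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current);
  }

  // Stores the new value and pushes it to every subscriber.
  next(value: T): void {
    this.current = value;
    this.listeners.slice().forEach(listener => listener(value));
  }
}

// Runs up to maxPolls polls and stops as soon as the notifier delivers a value
// that `accept` lets through. Returns the number of polls that actually ran.
function pollUntil(
  notifier: ToyBehaviorSubject<boolean>,
  accept: (online: boolean) => boolean,
  maxPolls: number,
  onPoll: (pollNumber: number) => void
): number {
  let stopped = false;
  notifier.subscribe(online => {
    if (accept(online)) {
      stopped = true;
    }
  });

  let polls = 0;
  while (!stopped && polls < maxPolls) {
    polls++;
    onPoll(polls);
  }
  return polls;
}

// Unfiltered notifier: the replayed initial `false` stops polling at once.
const statusA = new ToyBehaviorSubject(false);
const unfiltered = pollUntil(statusA, () => true, 5, () => {});

// Filtered notifier: only `true` stops polling; the server "comes back" on poll 3.
const statusB = new ToyBehaviorSubject(false);
const filtered = pollUntil(statusB, online => online === true, 5, pollNumber => {
  if (pollNumber === 3) {
    statusB.next(true);
  }
});

console.log(unfiltered, filtered);
```

With the unfiltered notifier the loop stops before the first poll, because the initial false is replayed on subscribe. With the filter it keeps polling until true arrives. The real RxJS 5 code can let a different number of requests through (the log in the question shows one), since this toy does not model RxJS's internal subscription order.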

Hmm, have you tried doing this?

this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();

6 Comments

Yeah, I also tried that. I'll add that to the question. Thanks
Hmm, so I recommend debugging your serviceOnline with something like this: constructor(private http: Http) { this.serviceOnline.asObservable().subscribe(notification => { console.log(notification) }) }
Yes, I also wondered whether serviceOnline was immediately emitting a value and causing takeUntil to fire before the testConnection observable repeats, but it doesn't. A false value is emitted after the 'this.serviceOnline.next(false)' call and never again unless a successful http request is made. In that case it should just repeat, as it does when using take (which gives limited repeats). Weird, huh?
Maybe subscribe first and then send the event, if I understand it right? setServiceOffline() { this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe(); this.serviceOnline.next(false); }
I've updated the answer with some console outputs. So the interesting thing is that 1 call to testConnection is getting through when using takeUntil...