40

I'm using listeners as callbacks to observe asynchronous operations on Android, but I think it would be great to replace these listeners with RxJava. I'm new to this library, but I really like it and I use it in all my Android projects.

Here is my code to refactor:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}

A simple callback:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}

And the "observer":

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});

Thanks!

4 Answers

39

For example, you can use Observable.fromCallable to create an Observable that emits your data.

public Observable<Data> getData(){
    return Observable.fromCallable(() -> {
        Data result = null;
        //do something, get your Data object
        return result;
    });
}

Then use your data:

 getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

This uses RxJava 1.x and lambda expressions.

Edit:

If I understand you correctly, you wanted to replace that listener, not wrap it in an Observable. I have added another example in response to your comment. Also, you should use Single if you expect only one item.

public Single<Data> getData() {
    return Single.create(singleSubscriber -> {
        Data result = object.getData();
        if (result == null) {
            singleSubscriber.onError(new Exception("no data"));
        } else {
            singleSubscriber.onSuccess(result);
        }
    });
}

getData().subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {
            //do something with your data
        }, error -> {
            //do something on error
        });
3 Comments

Thank you. I'm going to try your example.
Is it possible to say "hey, I'm ready", like listener.onSuccess()? For example, in a method with no return value, just fire an event?
I know it is slightly off topic, but I'm having a hard time figuring out the following: how do I change from the standard Java Observable (deprecated since Java 9) docs.oracle.com/javase/7/docs/api/java/util/Observable.html to RxJava Observables? Any tips or pointers for wrapping or replacing it? Thanks.
15

You are looking for Completable.create:

Completable: Represents a deferred computation without any value but only indication for completion or exception. The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?

Completable.create(subscriber -> {
    object.getData(new OnResponseListener() {
        @Override
        public void onSuccess() {
            // RxJava 1.x name; in RxJava 2 the emitter method is onComplete()
            subscriber.onCompleted();
        }

        @Override
        public void onError() {
            // placeholder: put an appropriate Throwable here
            subscriber.onError(new Exception("getData failed"));
        }
    });
})
// ...apply Schedulers
.subscribe(() -> { /* success */ }, throwable -> { /* error */ });

6

Here is how I would refactor your code: alongside the getData method, I would add a getDataSingle method that wraps it as a Single:

public void getData( final OnResponseListener listener ){
    if(data!=null && !data.isEmpty()){
        listener.onSuccess();
    }
    else{
        listener.onError();
    }
}

public Single<Boolean> getDataSingle() {
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final SingleEmitter<Boolean> e) throws Exception {
            getData(new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            });
        }
    });
}

Or with Java 8:

public Single<Boolean> getDataSingle() {
    return Single.create(e -> getData(
            new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            })
    );
}

Now you have exposed an Rx API alongside the callback one. Supposing this is some kind of DataProvider of your own, you can now use it without dealing with callbacks, like this:

dataProvider.getDataSingle()
        .map(result -> result ? "User exist" : "User doesn't exist")
        .subscribe(message -> display(message));

I used RxJava 2, but with RxJava 1 the logic is the same.

I also used a Single instead of an Observable, since you expect only one value. The benefit is a more expressive contract for your function.

You can't emit a value on behalf of an Observable, i.e. call something like myObservable.send(value). One solution is to use a Subject. Another solution (the one above) is to create the observable with Observable.create() (or Single.create()). You call the callback-based method and create the listener inside Observable.create(), because only there do you have access to the emitter, whose onSuccess() method tells the observable to pass a value downstream.

This is what I use to wrap a callback in an observable. It is a bit complicated at first, but easy to adapt.
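To make the mechanism a bit more concrete, here is a minimal JDK-only sketch of what a create-style factory roughly does. MiniSingle and Emitter are hypothetical stand-ins made up for illustration, not RxJava classes, and getData takes the data as a parameter only so the example is self-contained. The point it tries to show is that the lambda passed to create() runs only on subscribe, and that the emitter is the only handle through which a value can be pushed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class MiniSingleSketch {

    /** Same shape as the listener from the question. */
    interface OnResponseListener {
        void onSuccess();
        void onError();
    }

    /** Hypothetical stand-in for an Rx emitter (not an RxJava type). */
    interface Emitter<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for Single: stores the producer, runs it on subscribe. */
    static final class MiniSingle<T> {
        private final Consumer<Emitter<T>> source;

        private MiniSingle(Consumer<Emitter<T>> source) {
            this.source = source;
        }

        static <T> MiniSingle<T> create(Consumer<Emitter<T>> source) {
            return new MiniSingle<>(source);
        }

        void subscribe(Consumer<T> successHandler, Consumer<Throwable> errorHandler) {
            // Nothing happens until here: the producer is invoked once per subscriber.
            source.accept(new Emitter<T>() {
                @Override
                public void onSuccess(T value) {
                    successHandler.accept(value);
                }

                @Override
                public void onError(Throwable error) {
                    errorHandler.accept(error);
                }
            });
        }
    }

    /** Callback-style API as in the question; data is a parameter for the demo only. */
    static void getData(List<String> data, OnResponseListener listener) {
        if (data != null && !data.isEmpty()) {
            listener.onSuccess();
        } else {
            listener.onError();
        }
    }

    /** The listener is created inside create(), where the emitter is in scope. */
    static MiniSingle<Boolean> getDataSingle(List<String> data) {
        return MiniSingle.create(emitter -> getData(data, new OnResponseListener() {
            @Override
            public void onSuccess() {
                emitter.onSuccess(true);
            }

            @Override
            public void onError() {
                emitter.onSuccess(false);
            }
        }));
    }

    public static void main(String[] args) {
        getDataSingle(Arrays.asList("item"))
                .subscribe(ok -> System.out.println("has data: " + ok),
                           err -> System.out.println("error: " + err));
        getDataSingle(new ArrayList<>())
                .subscribe(ok -> System.out.println("has data: " + ok),
                           err -> System.out.println("error: " + err));
    }
}
```

The real RxJava classes add scheduling, disposal and error routing on top of this, so treat it as a mental model rather than a replacement.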

Here is another example, as requested. Let's say you want to display the changes of an EditText in a Snackbar:

View rootView;
EditText editTextView;

//Wrap Android addTextChangedListener into an Observable
Observable<String> textObservable = Observable.create(consumer ->
        editTextView.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {

            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {

            }

            @Override
            public void afterTextChanged(Editable s) {
                consumer.onNext(s.toString());
            }
        })
);

//Use it
textObservable.subscribe(text -> Snackbar.make(rootView, text, Snackbar.LENGTH_SHORT).show());

4 Comments

Can you post a full example with lambdas? I am a bit lost trying this code.
Added two examples: one with your code, and one with the changes of the text property of a TextView.
The listener isn't unregistered when the subscription is disposed, resulting in a memory leak.
You are right, and it's even worse than that: if getData() produces a new value after the subscription is disposed, it will crash. I should have added a doOnDispose in getDataSingle() to remove the listener. The problem doesn't (usually) exist in the second example, because the emitter and the receiver both belong to the same Activity (or Fragment) and will be garbage-collected together.
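The unregister-on-dispose idea from the comments above can be sketched without any library. TextSource and Disposable here are hypothetical stand-ins (not Android or RxJava types): subscribing registers the listener, and the returned handle removes it again. In RxJava 2 the corresponding hook is, as far as I know, emitter.setCancellable(...) called inside create().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class UnregisterOnDisposeSketch {

    /** Hypothetical listener-based source, standing in for something like an EditText. */
    static final class TextSource {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void addListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        void removeListener(Consumer<String> listener) {
            listeners.remove(listener);
        }

        int listenerCount() {
            return listeners.size();
        }

        void fire(String text) {
            // Iterate over a copy so listeners may unregister while being notified.
            for (Consumer<String> listener : new ArrayList<>(listeners)) {
                listener.accept(text);
            }
        }
    }

    /** Hypothetical stand-in for a subscription handle. */
    interface Disposable {
        void dispose();
    }

    /** Registers the listener and returns a handle that unregisters it. */
    static Disposable subscribe(TextSource source, Consumer<String> onNext) {
        source.addListener(onNext);
        return () -> source.removeListener(onNext);
    }

    public static void main(String[] args) {
        TextSource source = new TextSource();
        List<String> received = new ArrayList<>();

        Disposable subscription = subscribe(source, received::add);
        source.fire("a");        // delivered
        subscription.dispose();  // listener removed, source no longer references it
        source.fire("b");        // not delivered

        System.out.println(received + " listeners=" + source.listenerCount());
    }
}
```

Without the removal step, the source keeps a reference to the listener (and whatever it captures) for as long as the source itself lives, which is the leak described in the comment.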
-3
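Maybe.<String>create(new MaybeOnSubscribe<String>() {
      @Override
      public void subscribe(MaybeEmitter<String> e) throws Exception {
        // "task" is a placeholder (assumption): the object exposing the
        // success/failure listener API was not shown in the original snippet
        task.addOnSuccessListener(uri -> {
          e.onSuccess(uri);
        })
        .addOnFailureListener(throwable -> {
          e.onError(throwable);
        });
      }
    });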
