4

I was reading a tutorial:

http://code.tutsplus.com/tutorials/getting-started-with-reactivex-on-android--cms-24387

which concers RxAndroid in particular but it's pretty much the same as in RxJava. I am not sure that I understood the concept completely.

Below I have written a method and then a sample usage.

My question is: is this the right way to implement my functions so that I can run them on other threads asynchronously? They will in fact only return a created Observable running the real code, and handling errors and all that stuff.

Or is this wrong, then I'd like to know the correct way.

Observable<String> googleSomething(String text){
    return Observable.create(new Observable(){
        @Override
        public void call(Subscriber<? super String> subscriber) {
             try {
                String data = fetchData(text); // some normal method
                subscriber.onNext(data); // Emit the contents of the URL
                subscriber.onCompleted(); // Nothing more to emit
            } catch(Exception e) {
                subscriber.onError(e); // In case there are network errors
            }
        }
    });
}

googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...)

Also is Schedulers.immediate() used in order to execute the subscriber code on the current thread? It says "Creates and returns a Scheduler that executes work immediately on the current thread." in javadoc, but I'm not sure.

4 Answers 4

9

Unless you are more experienced and need a custom operator or want to bridge a legacy addListener/removeListener based API you should not start with create. There are several questions on StackOverflow which used create and was the source of trouble.

I'd prefer fromCallable which let's you generate a single value or throw an Exception thus no need for those lengthy defer + just sources.

Schedulers.immediate() executes its task immediately on the caller's thread, which is the io() thread in your example, not the main thread. Currently, there is no support for moving back the computation to the Java main thread as it requires blocking trampolining and usually a bad idea anyway.

Sign up to request clarification or add additional context in comments.

2 Comments

What is the default behaviour if I do not specify observeOn thread? Which thread should I use then for subscribers?
Depends on what you do with the results of the sequence.
2

You should almost never use create(), especially not as a beginner. There are easier ways to create observables, and create() is difficult to implement correctly.

Most of the time, you can easily get around create() by using defer(). E.g., in this case you'd do:

Observable<String> googleSomething(String text) {
  return Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
      try {
        return Observable.just(fetchData(text));
      } catch (IOException e) {
        return Observable.error(e);
      }
    }
  });
}

If you're not using a checked exception, then you could even get rid of the try-catch. RxJava will automatically forward any RuntimeException to the onError() part of the subscriber.

2 Comments

What about Schedulers.immediate() ? What the difference between using this Scheduler or just not calling .observeOn completely?
immediate() does nothing in this case - it executes on the current thread, which is the one setup by io(). It'd be no different to leave it out. What you really need to examine is exactly which thread you're wanting the subscription on and setup a Scheduler for that.
1

You can create Observable via Observable.create(new OnSubscribe {}) method however:

  • Look at defer() operator, which allows you to return for example Observable.just() and Observable.error() so you don't need to touch subscriber directly
  • Prefer using SyncOnSubscribe/AsyncOnSubscribe to handle backpressure

Schedulers.immediate() will keep Observable processing on the thread it already is - so in your case it will be one of the Schedulers.io threads

Comments

1

Your code looks good to me. If you are unsure wether that is running on another thread or not. you could print something immediately after you call .subscribe() and see the order of the outputs.

googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...) 
System.out.println("This should be printed first");

Try to simulate a long running operation inside fetchData() and print something else immediately afterwards. As .subscribe() is non blocking "This should be printed first" is in fact, going to be printed first.

Alternatively, you can print the current thread using.

Thread.currentThread().getName()

Use this inside and outside your observable and the outputs should differ.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.