0

I am trying to chain few observables together and do some action depending what observable has been executed. But I faced with strange behavior.

class MainActivity : AppCompatActivity() {

    val TAG: String = MainActivity::class.java.name

    private lateinit var clicker: TextView

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        clicker = findViewById(R.id.clicker) as TextView
        clicker.setOnClickListener {
            val i = AtomicInteger()

            getFirstObservable()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getSecondObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getThirdObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .subscribe()
        }
    }

    fun getFirstObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "Hello"
        }
    }

    fun getSecondObservable(): Observable<Int> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            3
        }
    }

    fun getThirdObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "World!"
        }
    }

    fun showMessage(i: AtomicInteger, obj: Any) {
        val msg = "Message #${i.incrementAndGet()}: from ${Thread.currentThread().name}: $obj"
        Log.e(TAG, msg)
        clicker.text = msg
        Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
    }
}

In this example logs would be shown every 2 seconds but all changes with views would be done when the last observable will be finished.

12-04 01:11:30.465 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #1: from main: Hello
12-04 01:11:32.473 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #2: from main: 3
12-04 01:11:34.479 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #3: from main: World!

I think it is behavior of AndroidScheduler.mainThread(), because when I remove this line and wrap changes with views like this

Handler(Looper.getMainLooper()).post {
    clicker.text = msg
    Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
}

behavior becomes right. So can someone explain this behavior and suggest a correct way to solve this problem?

2
  • doOnNext/flatMap and others are called not on main thread. observeOn - sets scheduler for param passed to subscribe() method. Commented Dec 3, 2016 at 23:10
  • @Ufkoku i added logs to question. doOnNext called on main thread Commented Dec 3, 2016 at 23:15

1 Answer 1

2

Most of your code is being executed on the the main thread, including the sleeps. When an observable is created, it is subscribed on and observed on the current thread unless otherwise specified. When you create your second and third observables, they are on the main thread. Furthermore, since there is no backgrounding of the observable work, it is executed immediately on the current thread when you subscribe. Therefore, all the work and observations happen on the main thread without yielding back to the android OS. The UI is blocked waiting for time on the main thread. If you increase those sleep times, you can force an ANR. To fix it, you can specify observeOn and subscribeOn for each of your observables to push the work to a computation thread for each of them.

getFirstObservable().subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap {
                        getSecondObservable().subscribeOn(Schedulers.computation())
                                             .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap { 
                        getThirdObservable().subscribeOn(Schedulers.computation())
                                            .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .subscribe()
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.