I'm looking for a way to create an Observable after processing the result in subscribe.

I have this Observable from productRepo.list(), a Retrofit call that returns Observable<Response<ProductResponse>>:

productRepo
    .list()
    .retry(3)
    .subscribe { response ->
        if (response.isSuccessful) {
            response.body().apply {
                cache.saveProducts(data)
            }
        }
    }

The purpose of this is to save the result into a local DB cache. This call, plus another very similar one, populates the local DB with remote data from the API.

After the two calls have completed, I want to load data from the cache.

I don't want to combine the two observables in any way. I just want to run a task afterward.

I want this handled as a unit in the Rx call graph, so that Call1 and Call2 run at the same time and Task3 runs once both have completed. What's the best way to do this? I would really prefer the subscriber for each call to be separate.

Is flatMap the best option here?

3 Answers

.doOnNext()

is your answer, because it is called with your final response, or with each response if there are several. Give it a try.
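A minimal sketch of the doOnNext idea, assuming RxJava 1.x on the classpath. InMemoryCache and listProducts() are hypothetical stand-ins for the asker's cache and productRepo.list(); they are not part of the original code.

```kotlin
import rx.Observable

// Hypothetical stand-in for the local DB cache from the question.
class InMemoryCache {
    val saved = mutableListOf<String>()
    fun saveProducts(products: List<String>) { saved += products }
}

// Hypothetical stand-in for productRepo.list(); emits a single page of products.
fun listProducts(): Observable<List<String>> =
    Observable.just(listOf("apple", "pear"))

fun main() {
    val cache = InMemoryCache()

    // doOnNext runs the caching side effect and passes each item through unchanged,
    // so the result is still an Observable that can be composed further.
    val cached: Observable<List<String>> = listProducts()
        .retry(3)
        .doOnNext { products -> cache.saveProducts(products) }

    // Nothing runs until something subscribes.
    cached.subscribe { products -> println("received ${products.size} products") }
    println("cache now holds ${cache.saved}")
}
```

Because the caching moves out of subscribe, the stream can still be handed to another operator afterward.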

Have a look at zip. Do something like Observable.zip(firstObservable, secondObservable, .....) { Task 3 }
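As a rough illustration of the zip suggestion, assuming RxJava 1.x: call1() and call2() below are hypothetical stand-ins for the two remote calls, each emitting exactly one value on the io scheduler so that they can run concurrently.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers

// Hypothetical stand-ins for the two remote calls; each emits one value.
fun call1(): Observable<String> =
    Observable.just("products").subscribeOn(Schedulers.io())

fun call2(): Observable<String> =
    Observable.just("categories").subscribeOn(Schedulers.io())

// zip subscribes to both sources and invokes the combiner only once
// each of them has emitted, which is the point where Task 3 can run.
fun task3(): Observable<String> =
    Observable.zip(call1(), call2()) { a, b -> "loaded after $a and $b" }

fun main() {
    // toBlocking() is used here only so the demo waits for the result.
    println(task3().toBlocking().single())
}
```

With single-emission sources such as Retrofit calls, the combiner fires exactly once, so it works as a "both are done" signal rather than as a merged stream.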

2 Comments

Zip would merge them into a single stream, which is not exactly what I'm looking for.
I was wrong about Zip. I ended up using it as well. I mixed it up with merge. My bad!

As you mentioned:

"I really prefer if the subscriber for each call is separated."

Assume we have two observables:

val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))

val call2 = Observable.from(arrayOf(2,4,6,8))

If we use only Observable.zip, as follows, there can be just a single subscriber for both Call1 and Call2.

Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)

If we use three separate subscribers, as follows, the Call1 and Call2 streams are each triggered twice: once by their own subscriber and once by zip.

call1.subscribe(call1Subscriber)

call2.subscribe(call2Subscriber)

Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)

Therefore, we need .share().cacheWithInitialCapacity(1) to do the trick: each source then runs only once, and its items are replayed to later subscribers.

val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
  .share()
  .cacheWithInitialCapacity(1)

val call2 = Observable.from(arrayOf(2,4,6,8))
  .share()
  .cacheWithInitialCapacity(1)

val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
  c1 + c2
}
call1.subscribe(call1Subscriber)
call2.subscribe(call2Subscriber)
task3Signal.subscribe(task3Subscriber)

You can also verify your understanding of the Rx graph with a simple test case.

import org.junit.Test
import rx.Observable
import rx.observers.TestSubscriber

class SimpleJUnitTest {

  @Test
  fun test(){

    val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
      .doOnNext { println("call1 doOnNext $it") }
      .share()
      .cacheWithInitialCapacity(1)

    val call2 = Observable.from(arrayOf(2,4,6,8))
      .doOnNext { println("call2 doOnNext $it") }
      .share()
      .cacheWithInitialCapacity(1)

    val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
      println("task3Signal c1:$c1, c2: $c2")
      c1 + c2
    }

    val testSubscriber1 = TestSubscriber<Int>()
    val testSubscriber2 = TestSubscriber<Int>()
    val testSubscriber3 = TestSubscriber<Int>()
    call1.subscribe(testSubscriber1)
    call2.subscribe(testSubscriber2)
    task3Signal.subscribe(testSubscriber3)

    testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8))
    testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8))
    testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12))
    testSubscriber1.assertValueCount(8)
    testSubscriber2.assertValueCount(4)
    testSubscriber3.assertValueCount(4)
  }
}

Output:

call1 doOnNext 1
call1 doOnNext 2
call1 doOnNext 3
call1 doOnNext 4
call1 doOnNext 5
call1 doOnNext 6
call1 doOnNext 7
call1 doOnNext 8
call2 doOnNext 2
call2 doOnNext 4
call2 doOnNext 6
call2 doOnNext 8
task3Signal c1:1, c2: 2
task3Signal c1:2, c2: 4
task3Signal c1:3, c2: 6
task3Signal c1:4, c2: 8

1 Comment

Thanks for the thorough example. I appreciate your help. I ended up using doOnNext for caching and then zipping both Observables into one. This makes the method signature a bit verbose, but it will do.
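The combination described in this comment might look roughly like the sketch below, assuming RxJava 1.x. LocalCache, syncThenLoad and the parameter names are hypothetical; the asker's real code was not posted.

```kotlin
import rx.Observable

// Hypothetical in-memory cache standing in for the local DB (not thread-safe).
class LocalCache {
    private val products = mutableListOf<String>()
    private val categories = mutableListOf<String>()
    fun saveProducts(items: List<String>) { products += items }
    fun saveCategories(items: List<String>) { categories += items }
    fun loadAll(): List<String> = products + categories
}

// Each remote stream caches its own result in doOnNext; zip then emits
// only after both have emitted, and at that point reads from the cache.
fun syncThenLoad(
    remoteProducts: Observable<List<String>>,
    remoteCategories: Observable<List<String>>,
    cache: LocalCache
): Observable<List<String>> {
    val saveProducts = remoteProducts.doOnNext { cache.saveProducts(it) }
    val saveCategories = remoteCategories.doOnNext { cache.saveCategories(it) }
    return Observable.zip(saveProducts, saveCategories) { _, _ -> cache.loadAll() }
}

fun main() {
    val cache = LocalCache()
    val result = syncThenLoad(
        Observable.just(listOf("apple")),
        Observable.just(listOf("fruit")),
        cache
    ).toBlocking().single()
    println(result)
}
```

The zipped values themselves are ignored here; zip only acts as the signal that both caching steps have finished. If the two sources run on different schedulers, a real cache would need to be safe for concurrent writes.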
