As you have mentioned,
I really prefer if the subscriber for each call is separated.
Assume we have two observables
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
val call2 = Observable.from(arrayOf(2,4,6,8))
If we purely use Observable.zip as following, it only can has single subscriber for both two Call1 & Call2.
Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
If we use three separate subscribers as following, the Call1 & Call2 stream will be triggered twice.
call1.subscribe(call1Subscriber)
call2.subscribe(call2Subscriber)
Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
Therefore, we need to use .share().cacheWithInitialCapacity(1) to do the tricks
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
.share()
.cacheWithInitialCapacity(1)
val call2 = Observable.from(arrayOf(2,4,6,8))
.share()
.cacheWithInitialCapacity(1)
val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
c1 + c2
}
call1.subscribe(call1Subscriber)
call2.subscribe(call2Subscriber)
task3Signal.subscribe(task3Subscriber)
You can also prove/test your concept of the Rx graph from a simple Test case.
class SimpleJUnitTest {
@Test
fun test(){
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
.doOnNext { println("call1 doOnNext $it") }
.share()
.cacheWithInitialCapacity(1)
val call2 = Observable.from(arrayOf(2,4,6,8))
.doOnNext { println("call2 doOnNext $it") }
.share()
.cacheWithInitialCapacity(1)
val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
println("task3Signal c1:$c1, c2: $c2")
c1 + c2
}
val testSubscriber1 = TestSubscriber<Int>()
val testSubscriber2 = TestSubscriber<Int>()
val testSubscriber3 = TestSubscriber<Int>()
call1.subscribe(testSubscriber1)
call2.subscribe(testSubscriber2)
task3Signal.subscribe(testSubscriber3)
testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8))
testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8))
testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12))
testSubscriber1.assertValueCount(8)
testSubscriber2.assertValueCount(4)
testSubscriber3.assertValueCount(4)
}
}
Output:
call1 doOnNext 1
call1 doOnNext 2
call1 doOnNext 3
call1 doOnNext 4
call1 doOnNext 5
call1 doOnNext 6
call1 doOnNext 7
call1 doOnNext 8
call2 doOnNext 2
call2 doOnNext 4
call2 doOnNext 6
call2 doOnNext 8
task3Signal c1:1, c2: 2
task3Signal c1:2, c2: 4
task3Signal c1:3, c2: 6
task3Signal c1:4, c2: 8