1

A simple example:

import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RxTest {

    private static Logger log = LoggerFactory.getLogger(RxTest.class);

    public static void main(String[] args) throws InterruptedException     {
        log.debug("start");
        RxTest rx = new RxTest();
        Flowable.zip(rx.m1(), rx.m2(), rx::result).subscribe( (r) -> log.debug(r));
    }

    private Publisher<String> m1() throws InterruptedException {
        log.debug("m2");
        String result = new TestRxObj().getObj(4000L, "Hello");
        return Flowable.just(result);
    }

    private Publisher<Long> m2() throws InterruptedException {
        log.debug("m1");
        Long result = new TestRxObj().getObj(100L, 777L);
        return Flowable.just(result);
    }

    private String result(String t1, Long t2) {
        log.debug("result");
        return t1 + "-" + t2;
    }

}

public class TestRxObj {

    public <T> T getObj(Long time, T t) throws InterruptedException {
        Thread.sleep(time);
        return t;
    }

}

So the problem is, that I want to call methods m1() and m2() asynchronously

Right now i have this output:

2018-03-09 22:34:20 [main] DEBUG RxTest: 18 - start
2018-03-09 22:34:20 [main] DEBUG RxTest: 24 - m2
2018-03-09 22:34:24 [main] DEBUG RxTest: 30 - m1
2018-03-09 22:34:24 [main] DEBUG RxTest: 36 - result
2018-03-09 22:34:24 [main] DEBUG RxTest: 20 - Hello-777

On git, they have some example, how to run in new thread:

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

But problem with this example, I don't know how to join to(Main/current) threat, to not write Thread.sleep(2000);. I'm running this not on Android, just simple java project, so I cannot .observeOn(AndroidSchedulers.mainThread()), if I correct understand what this doing

My main goal, run 1..N(at least 2 :) ) methods asynchronously with RxJava2, if it can be done without zip, I ok with it, any approach with RxJava without Thread.sleep(...) :)

1
  • 1
    I hope you realize that m1() is invoked and executes before RxJava even gets involved. The only way to return to the Java main thread is to block the subscription or use a blocking scheduler. The former is easier with blockingSubscribe. Commented Mar 9, 2018 at 21:37

1 Answer 1

1

Maybe this could help:

public static void main(String[] args) throws InterruptedException     {
    log.debug("start");
    RxTest rx = new RxTest();
    Flowable.zip(
        Observable.defer(() -> rx.m1()).subscribeOn(Schedulers.io()),
        Observable.defer(() -> rx.m2()).subscribeOn(Schedulers.io()),
        rx::result)
    .subscribe( (r) -> log.debug(r));
}

For testing purposes, sometimes having the Schedulers.io() as a parameter or field is usefule, so you can replace it with a TestScheduler.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.