
I'm learning RxJava and converting some of my codebase to see how it works. Currently I'm trying to wrap my head around using Observables or Completables to execute tasks that have dependencies (the tasks have no meaningful return value), such as:

Execution

Tasks: A, B, C, D, E

  • B depends on A
  • C depends on B, D
  • E depends on D

So the task execution could look like:

execute A, D  
D completes -> execute E  
A completes -> execute B  
B completes -> execute C (B, D both completed)  
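As a plain-Java sketch (no RxJava, and not part of the question), the dependency list above can be grouped into "waves" of tasks that may run in parallel using Kahn's topological sort processed level by level. The class and method names (ExecutionWaves, waves) are hypothetical. The sketch assumes every task appears as a key of the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExecutionWaves {
    /**
     * Groups tasks into waves (Kahn's topological sort, level by level): every task in a wave
     * depends only on tasks in earlier waves. Assumes every task is a key of {@code deps}.
     */
    static <T> List<List<T>> waves(Map<T, List<T>> deps) {
        Map<T, Integer> remaining = new LinkedHashMap<>(); // unfinished dependencies per task
        Map<T, List<T>> dependents = new HashMap<>();      // dependency -> tasks waiting on it
        for (Map.Entry<T, List<T>> entry : deps.entrySet()) {
            remaining.put(entry.getKey(), entry.getValue().size());
            for (T dep : entry.getValue()) {
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(entry.getKey());
            }
        }

        // The first wave contains the tasks without dependencies
        List<T> ready = new ArrayList<>();
        for (Map.Entry<T, Integer> entry : remaining.entrySet()) {
            if (entry.getValue() == 0) ready.add(entry.getKey());
        }

        List<List<T>> result = new ArrayList<>();
        int scheduled = 0;
        while (!ready.isEmpty()) {
            result.add(ready);
            scheduled += ready.size();
            List<T> next = new ArrayList<>();
            for (T done : ready) {
                for (T waiting : dependents.getOrDefault(done, List.of())) {
                    // The last unfinished dependency has just completed: the task becomes ready
                    if (remaining.merge(waiting, -1, Integer::sum) == 0) next.add(waiting);
                }
            }
            ready = next;
        }
        // Tasks that never became ready are part of a cycle
        if (scheduled != remaining.size()) {
            throw new IllegalArgumentException("dependency graph contains a cycle");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Character, List<Character>> deps = new LinkedHashMap<>();
        deps.put('A', List.of());
        deps.put('B', List.of('A'));
        deps.put('C', List.of('B', 'D'));
        deps.put('D', List.of());
        deps.put('E', List.of('D'));
        System.out.println(waves(deps)); // [[A, D], [B, E], [C]]
    }
}
```

The waves only show which tasks *may* run together. Running them wave by wave adds a barrier: E would also wait for A, although it only needs D. The event-driven schedule in the question avoids that barrier by starting each task as soon as its own dependencies complete.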

Questions

  • Given an arbitrary dependency graph like this one, is this a good use case for RxJava?
  • Is this even a use case for Observables/Completables?
  • If so, what is a technique for implementing such behavior?

3 Answers


You can use concat() and merge() operators to achieve this.

Here is how it can be done:

package rxtest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;


public class RxJavaDagTest {
    private static final Logger logger = LoggerFactory.getLogger(RxJavaDagTest.class);
    // An ExecutorService (not just an Executor) so the pool can be shut down when the test is done
    private final ExecutorService customExecutor = Executors.newFixedThreadPool(20);

    @Test
    public void stackOverflowTest() {
        Observable<Character> a = createObservable('A', 100);
        Observable<Character> b = createObservable('B', 200);
        Observable<Character> c = createObservable('C', 500);
        Observable<Character> d = createObservable('D', 150);
        Observable<Character> e = createObservable('E', 200);

        logger.info("BEGIN");

        // D is referenced in two places in the graph, so it is cached to keep it from executing twice
        Observable<Character> dCached = d.cache();

        // merge subscribes to its sources at once (parallel); concat subscribes one after another
        Observable.merge(
                Observable.concat(
                        Observable.merge(
                                Observable.concat(a, b),  // A, then B
                                dCached),                 // in parallel with D
                        c),                               // C once both A -> B and D are done
                Observable.concat(dCached, e))            // E once D is done
                .toBlocking()
                .subscribe(i -> logger.info("Executed : " + i));

        logger.info("END");
        // newFixedThreadPool creates non-daemon threads; without shutdown() they keep the JVM alive
        customExecutor.shutdown();
    }

    private Observable<Character> createObservable(char c, int sleepMs) {
        Observable<Character> single = Observable.just(c)
                .flatMap(i -> Observable.<Character> create(s -> {
                    logger.info("onSubscribe Start Executing : {}", i);
                    sleep(sleepMs);
                    s.onNext(Character.valueOf(i));
                    s.onCompleted();
                }).subscribeOn(Schedulers.from(customExecutor)));
        return single;
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
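        catch (InterruptedException e) {
            // Restore the interrupt flag rather than silently swallowing the interruption
            Thread.currentThread().interrupt();
        }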
    }
}

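Output (note that "Executed : D" is logged twice because dCached emits its item into both branches, while the D task itself starts only once):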

20:47:05.633 [main] INFO rxtest.RxJavaDagTest BEGIN
20:47:05.745 [pool-1-thread-1] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : A
20:47:05.748 [pool-1-thread-2] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : D
20:47:05.849 [main] INFO rxtest.RxJavaDagTest Executed : A
20:47:05.850 [pool-1-thread-3] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : B
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [main] INFO rxtest.RxJavaDagTest Executed : D
20:47:05.899 [pool-1-thread-4] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : E
20:47:06.051 [main] INFO rxtest.RxJavaDagTest Executed : B
20:47:06.051 [pool-1-thread-5] INFO rxtest.RxJavaDagTest onSubscribe Start Executing : C
20:47:06.100 [main] INFO rxtest.RxJavaDagTest Executed : E
20:47:06.552 [main] INFO rxtest.RxJavaDagTest Executed : C
20:47:06.552 [main] INFO rxtest.RxJavaDagTest END
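For comparison only, here is the same graph written with the JDK's CompletableFuture instead of RxJava. This is not part of the answer above. thenRunAsync starts a task after another one completes, which is roughly the role concat plays here. allOf waits for several futures, which is roughly the role merge plays. The comparison also shows why dCached is needed: an Observable built with Observable.create is cold and runs its body again for every subscriber, whereas a CompletableFuture runs its task once and can be shared by any number of dependents. The task helper and the LOG list are hypothetical, added only to make the ordering observable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FutureDag {
    // Records "start X" / "end X" events so the ordering can be inspected afterwards
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for a real job: log, sleep, log
    static Runnable task(String name, long ms) {
        return () -> {
            LOG.add("start " + name);
            try {
                Thread.sleep(ms);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            LOG.add("end " + name);
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // A and D have no dependencies, so both start immediately
            CompletableFuture<Void> a = CompletableFuture.runAsync(task("A", 100), pool);
            CompletableFuture<Void> d = CompletableFuture.runAsync(task("D", 150), pool);
            // Sequencing (concat-like): B after A, E after D
            CompletableFuture<Void> b = a.thenRunAsync(task("B", 200), pool);
            CompletableFuture<Void> e = d.thenRunAsync(task("E", 200), pool);
            // Joining (merge-like): C after both B and D; d is shared without re-running D
            CompletableFuture<Void> c = CompletableFuture.allOf(b, d).thenRunAsync(task("C", 500), pool);
            // Wait for both ends of the graph
            CompletableFuture.allOf(c, e).join();
        } finally {
            pool.shutdown(); // lets the JVM exit once the work is done
        }
        LOG.forEach(System.out::println);
    }
}
```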

Comment

This example executes in the proper order, then hangs. I think something needs to be added to terminate the executor threads, such as ExecutorService.shutdown(), but I'm not sure where it should go.

Here is an implementation of a general purpose tool to create and run a DAG using RxJava. I don't claim to be an expert in Reactive theory or RxJava specifically, and acknowledge I might not be doing things at all optimally. This code uses Lombok and JOOL to eliminate boilerplate; if there's interest I can supply a version in plain old Java.

package reactivedag;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import lombok.val;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.reactivex.Observable.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.jooq.lambda.Seq.seq;

public class ReactiveDag {

    public static <T> Observable<T> runAsReactiveDag(
            List<T> ts,
            Function<T, List<T>> tPredsFn,
            Function<T, List<T>> tSuccsFn,
            Consumer<T> jobLogic) {
        val ctx = new Context<T>(newFixedThreadPool(ts.size()), tPredsFn, tSuccsFn, jobLogic, new HashMap<>());

        val dag = seq(ts)
                .map(t -> {
                    Observable<T> pred = createObservable(t, ctx);
                    List<T> succTs = ctx.tSuccsFn.apply(t);
                    return createAndMergeDag(succTs, pred, ctx);
                })
                .flatMap(Optional::stream)
                .findSingle()
                .orElseThrow(() -> new RuntimeException("No DAG was created"));
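        // Subscribing here starts the jobs. Shut the pool down on completion and on error; otherwise
        // a failing job leaves the non-daemon pool threads running. The returned Observable is cold,
        // so subscribing to it again would re-run the jobs on a pool that has already been shut down.
        dag.subscribe(x -> {}, err -> ctx.executor.shutdown(), ctx.executor::shutdown);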
        return dag;
    }

    private static class Context<T> {
        @NonNull final ExecutorService executor;
        @NonNull final Function<T,List<T>> tPredsFn;
        @NonNull final Function<T, List<T>> tSuccsFn;
        @NonNull final Consumer<T> jobLogic;
        @NonNull final Map<T, List<Observable<T>>> deferredMerges;

        Context(ExecutorService executor, Function<T, List<T>> tPredsFn, Function<T, List<T>> tSuccsFn, Consumer<T> jobLogic, Map<T, List<Observable<T>>> deferredMerges) {
            this.executor = executor;
            this.tPredsFn = tPredsFn;
            this.tSuccsFn = tSuccsFn;
            this.jobLogic = jobLogic;
            this.deferredMerges = deferredMerges;
        }
    }

    private static <T> Observable<T> createObservable(T t, Context<T> ctx) {
        return just(t)
                .flatMap(tee -> Observable.<T> create(emitter -> {
                    ctx.jobLogic.accept(t);
                    emitter.onNext(t);
                    emitter.onComplete();

                }).subscribeOn(Schedulers.from(ctx.executor)));
    }

    private static <T> Optional<Observable<T>> handleSuccessor(
        Observable<T> upstreamDag,
        T succT,
        int numUpstreamNodes,
        Context<T> ctx) {

        if (numUpstreamNodes == 1) {
            val newUpstreamDag = concat(upstreamDag, createObservable(succT, ctx));
            val newSuccTs = ctx.tSuccsFn.apply(succT);
            return createAndMergeDag(newSuccTs, newUpstreamDag, ctx);

        } else if (numUpstreamNodes > 1) {
            //this successor will have to be merged: either add to deferrals or merge now...
            List<Observable<T>> deferred = ctx.deferredMerges.getOrDefault(succT, new ArrayList<>());
            if (deferred.size() < numUpstreamNodes - 1) {
                //not all merge partners are constructed yet: add to deferrals
                deferred.add(upstreamDag);
                ctx.deferredMerges.put(succT, deferred); //often redundant, benignly
                return Optional.empty();
            } else {
                //ready to merge: merge the current and all deferred upstreams, remove the deferral entry, and
                //continue building the downstream recursively
                deferred.add(upstreamDag);
                val mergedUpstream = merge(deferred);
                ctx.deferredMerges.remove(succT);
                return handleSuccessor(mergedUpstream, succT, 1, ctx);
            }
        } else {
            throw new RuntimeException("successor " + succT + " is not expected to have zero predecessors");
        }

    }

    private static <T> Optional<Observable<T>> createAndMergeDag(
            List<T> ts,
            Observable<T> upstreamDag,
            Context<T> ctx) {

        if (ts.isEmpty())
            return Optional.of(upstreamDag);

        // With several successors the upstream is shared, so cache it to avoid re-running it per branch
        val maybeWrappedUpstreamDag = ts.size() > 1
                ? upstreamDag.cache()
                : upstreamDag;

        val observables = seq(ts)
                .map(succT ->
                        handleSuccessor(maybeWrappedUpstreamDag, succT, ctx.tPredsFn.apply(succT).size(), ctx))
                .flatMap(Optional::stream)
                .toList();

        if (observables.isEmpty())
            return Optional.empty();
        return Optional.of(observables.size() > 1 ? merge(observables) : observables.get(0));
    }
}

I'm using this tool to process all the objects in a somewhat elaborate model, and it simplifies the task substantially. It's overkill for the OP's problem, but for illustration, here's a solution based on the above ReactiveDag class:

package reactivedag;

import lombok.val;
import org.apache.logging.log4j.Logger;

import static reactivedag.ReactiveDag.runAsReactiveDag;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.logging.log4j.LogManager.getLogger;

public class Sample {
    private static final Logger log = getLogger();

    static public void main(String[] args) {
        val dag = runAsReactiveDag(asList('A', 'D'),
            c -> {
                //predecessor getter function
                switch (c) {
                    case 'B': return asList('A');
                    case 'C': return asList('B', 'D');
                    case 'E': return asList('D');
                    default: return emptyList();
                }
            }, c -> {
                //successor getter function
                switch (c) {
                    case 'A': return asList('B');
                    case 'B': return asList('C');
                    case 'D': return asList('C', 'E');
                    default: return emptyList();
                }
            }, c -> {
                //job logic
                log.info("Starting " + c);
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.info("Ending " + c);
            }
        );
    }
}
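The sample supplies both a predecessor function and a successor function by hand, and the two must stay consistent: ReactiveDag counts predecessors to decide when to merge and follows successors to build each chain. A possible way to keep them in sync is to derive one from the other. Below is a minimal sketch of a hypothetical invert helper, which is not part of ReactiveDag. It needs the full set of nodes, because runAsReactiveDag itself only receives the roots.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class Successors {
    // Predecessor function from the sample above (B <- A, C <- B and D, E <- D)
    static final Function<Character, List<Character>> SAMPLE_PREDS = c -> {
        switch (c) {
            case 'B': return List.of('A');
            case 'C': return List.of('B', 'D');
            case 'E': return List.of('D');
            default: return List.of();
        }
    };

    /** Hypothetical helper: builds a successor function by inverting every predecessor edge. */
    static <T> Function<T, List<T>> invert(Collection<T> allNodes, Function<T, List<T>> predsFn) {
        Map<T, List<T>> succs = new LinkedHashMap<>();
        for (T node : allNodes) {
            for (T pred : predsFn.apply(node)) {
                // Edge pred -> node: node is a successor of pred
                succs.computeIfAbsent(pred, k -> new ArrayList<>()).add(node);
            }
        }
        return node -> succs.getOrDefault(node, Collections.emptyList());
    }

    public static void main(String[] args) {
        Function<Character, List<Character>> succs = invert(List.of('A', 'B', 'C', 'D', 'E'), SAMPLE_PREDS);
        System.out.println(succs.apply('D')); // [C, E]
    }
}
```

In principle, the derived function could be passed as the tSuccsFn argument of runAsReactiveDag in place of the second hand-written switch.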

  1. Taking in an arbitrary dependency graph is something like this a good use case for rxjava?

Based on the RxJava documentation, this definitely seems to be a use case for RxJava:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

  2. Is this even a use case for observables/completables?

If you don't need the result of the operation and only need to know when it completes, then Completable is a good fit.

  3. If so: what is a technique for implementing such behavior?

I don't know of a specific technique for translating your problem into RxJava code.

I would do something like this:

Single.zip(
            executeA()
                    .subscribeOn(Schedulers.newThread())
                    .andThen(executeB())
                    .toSingleDefault(""),
            executeD()
                    .subscribeOn(Schedulers.newThread())
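                    .andThen(new CompletableSource() {
                        @Override
                        public void subscribe(@NonNull CompletableObserver cs) {
                            // Fire off E on its own thread without waiting for it...
                            executeE().subscribeOn(Schedulers.newThread())
                                    .subscribe(() -> Log.d("test", "complete E"));
                            // ...and complete this branch right away, so C only waits for D.
                            // The Rx protocol requires onSubscribe before onComplete
                            // (Disposables is io.reactivex.disposables.Disposables).
                            cs.onSubscribe(Disposables.empty());
                            cs.onComplete();
                        }
                    })
                    .toSingleDefault(""),
            (BiFunction<String, String, Object>) (s, s2) -> s)
            // Both branches are done: continue as a Completable
            .flatMapCompletable(o -> Completable.complete())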
            .andThen(executeC())
            .doOnSubscribe(disposable -> Log.d("test", "start"))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(() -> Log.d("test", "complete C"));

where the tasks are defined as Completable in the following way:

private Completable executeA() {
    return Completable.create(e -> {
        Thread.sleep(4000);
        Log.d("test", "completing A");
        e.onComplete();
    });
}

private Completable executeB() {
    return Completable.create(e -> {
        Thread.sleep(12000);
        Log.d("test", "completing B");
        e.onComplete();
    });
}

private Completable executeC() {
    return Completable.create(e -> {
        Thread.sleep(3000);
        Log.d("test", "completing C");
        e.onComplete();
    });
}

private Completable executeD() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing D");
        e.onComplete();
    });
}

private Completable executeE() {
    return Completable.create(e -> {
        Thread.sleep(10000);
        Log.d("test", "completing E");
        e.onComplete();
    });
}

NOTE: I'm not sure about the approach I used to handle the andThen part attached to executeD(). We cannot simply concatenate D and E like this:

executeD().andThen(executeE())

because then task C would start only after E, but we want it to start after D. This is why I created the CompletableSource that starts task E and, at the same time, lets the chain continue by calling onComplete().

Note also that this implementation has two subscriptions: one for the end of each path (the end of C and the end of E).
