Here is an implementation of a general-purpose tool to create and run a DAG using RxJava. I don't claim to be an expert in Reactive theory or RxJava specifically, and I acknowledge that I might not be doing things optimally. This code uses Lombok and jOOL to eliminate boilerplate; if there's interest, I can supply a version in plain old Java.
package reactivedag;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import lombok.val;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import static io.reactivex.Observable.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.jooq.lambda.Seq.seq;
/**
 * Builds a directed acyclic graph of jobs as a composition of RxJava observables and runs it.
 *
 * <p>Each node becomes an observable that runs the job logic on a shared executor and then emits
 * the node. Nodes with a single predecessor are concatenated onto their upstream; nodes with
 * several predecessors are attached only once all of their upstream branches have been built,
 * at which point the branches are merged.
 */
public class ReactiveDag {

    /**
     * Builds the DAG reachable from {@code ts}, starts executing it, and returns an observable of
     * the run.
     *
     * <p>The run is started by this method. The returned observable is cached, so subscribing to
     * it observes (or replays) that single run, including a failure, instead of executing the
     * jobs again. The executor is shut down when the run terminates, whether normally or with an
     * error.
     *
     * @param ts       the nodes to start from; each one is treated as a root of the graph. The
     *                 thread pool is sized to {@code ts.size()}, so an empty list is rejected by
     *                 {@code newFixedThreadPool} with an {@link IllegalArgumentException}.
     * @param tPredsFn returns the direct predecessors of a node
     * @param tSuccsFn returns the direct successors of a node
     * @param jobLogic the work to perform for each node
     * @param <T>      node type; used as a {@link HashMap} key while building
     * @return a cached observable emitting each node after its job has run
     * @throws RuntimeException if building does not yield a DAG (for example, a multi-predecessor
     *                          node never sees all of its upstream branches)
     */
    public static <T> Observable<T> runAsReactiveDag(
            List<T> ts,
            Function<T, List<T>> tPredsFn,
            Function<T, List<T>> tSuccsFn,
            Consumer<T> jobLogic) {
        val ctx = new Context<T>(newFixedThreadPool(ts.size()), tPredsFn, tSuccsFn, jobLogic, new HashMap<>());
        // Only the root whose traversal completes the last deferred merge yields a DAG; the others
        // yield Optional.empty(). NOTE(review): findSingle() is expected to reject more than one
        // result, i.e. roots that belong to disconnected graphs - confirm against jOOL.
        val dag = seq(ts)
                .map(t -> {
                    Observable<T> pred = createObservable(t, ctx);
                    List<T> succTs = ctx.tSuccsFn.apply(t);
                    return createAndMergeDag(succTs, pred, ctx);
                })
                .flatMap(Optional::stream)
                .findSingle()
                .orElseThrow(() -> new RuntimeException("No DAG was created"));
        // doFinally releases the pool on completion AND on error; cache() makes the run happen
        // once, so a caller subscribing to the returned observable does not re-execute the jobs.
        val sharedRun = dag
                .doFinally(ctx.executor::shutdown)
                .cache();
        // Kick off the run. The error consumer is intentionally empty: the failure is replayed to
        // anyone who subscribes to the returned observable, and omitting the consumer would route
        // the error to RxJava's global error handler instead.
        sharedRun.subscribe(x -> {}, error -> {});
        return sharedRun;
    }

    /**
     * Immutable bundle of everything the recursive build needs.
     *
     * <p>NOTE(review): Lombok's {@code @NonNull} on a field only produces null checks in
     * Lombok-generated constructors/setters; this hand-written constructor performs none - confirm
     * whether checks are wanted.
     */
    private static class Context<T> {
        @NonNull final ExecutorService executor;
        @NonNull final Function<T, List<T>> tPredsFn;
        @NonNull final Function<T, List<T>> tSuccsFn;
        @NonNull final Consumer<T> jobLogic;
        /** Upstream branches collected so far for each multi-predecessor node awaiting its merge. */
        @NonNull final Map<T, List<Observable<T>>> deferredMerges;

        Context(ExecutorService executor,
                Function<T, List<T>> tPredsFn,
                Function<T, List<T>> tSuccsFn,
                Consumer<T> jobLogic,
                Map<T, List<Observable<T>>> deferredMerges) {
            this.executor = executor;
            this.tPredsFn = tPredsFn;
            this.tSuccsFn = tSuccsFn;
            this.jobLogic = jobLogic;
            this.deferredMerges = deferredMerges;
        }
    }

    /**
     * Creates the observable for a single node: on subscription it runs the job logic for
     * {@code t} on the context's executor, then emits {@code t} and completes.
     */
    private static <T> Observable<T> createObservable(T t, Context<T> ctx) {
        return just(t)
                .flatMap(tee -> Observable.<T> create(emitter -> {
                    ctx.jobLogic.accept(t);
                    emitter.onNext(t);
                    emitter.onComplete();
                }).subscribeOn(Schedulers.from(ctx.executor)));
    }

    /**
     * Attaches {@code succT} downstream of {@code upstreamDag}.
     *
     * @param upstreamDag      the already-built graph that must finish before {@code succT} runs
     * @param succT            the successor node to attach
     * @param numUpstreamNodes number of direct predecessors of {@code succT}
     * @param ctx              shared build context
     * @return the graph extended through {@code succT} and its descendants, or empty when
     *         {@code succT} is still waiting for other upstream branches (or a descendant is)
     * @throws RuntimeException if {@code numUpstreamNodes} is less than one
     */
    private static <T> Optional<Observable<T>> handleSuccessor(
            Observable<T> upstreamDag,
            T succT,
            int numUpstreamNodes,
            Context<T> ctx) {
        if (numUpstreamNodes == 1) {
            // Simple chain: run the successor after everything upstream, then keep building.
            val newUpstreamDag = concat(upstreamDag, createObservable(succT, ctx));
            val newSuccTs = ctx.tSuccsFn.apply(succT);
            return createAndMergeDag(newSuccTs, newUpstreamDag, ctx);
        } else if (numUpstreamNodes > 1) {
            // This successor has to be merged: either add to the deferrals or merge now.
            List<Observable<T>> deferred = ctx.deferredMerges.computeIfAbsent(succT, k -> new ArrayList<>());
            boolean lastBranch = deferred.size() >= numUpstreamNodes - 1;
            deferred.add(upstreamDag);
            if (!lastBranch) {
                // Not all merge partners are constructed yet.
                return Optional.empty();
            }
            // Ready to merge: combine the current and all deferred upstreams, drop the deferral
            // entry, and continue building downstream as if there were a single predecessor.
            val mergedUpstream = merge(deferred);
            ctx.deferredMerges.remove(succT);
            return handleSuccessor(mergedUpstream, succT, 1, ctx);
        } else {
            throw new RuntimeException("successor " + succT + " is not expected to have zero predecessors");
        }
    }

    /**
     * Attaches every node in {@code ts} downstream of {@code upstreamDag} and combines the
     * resulting branches.
     *
     * @param ts          successors of the node at the tail of {@code upstreamDag}
     * @param upstreamDag the graph built so far
     * @param ctx         shared build context
     * @return {@code upstreamDag} itself when there are no successors; the merged branches when at
     *         least one branch could be completed; empty when every branch is deferred
     */
    private static <T> Optional<Observable<T>> createAndMergeDag(
            List<T> ts,
            Observable<T> upstreamDag,
            Context<T> ctx) {
        if (ts.isEmpty()) {
            return Optional.of(upstreamDag);
        }
        // With several successors the upstream is subscribed once per branch; cache() keeps the
        // upstream jobs from being executed once per branch.
        val maybeWrappedUpstreamDag = ts.size() > 1
                ? upstreamDag.cache()
                : upstreamDag;
        val observables = seq(ts)
                .map(succT ->
                        handleSuccessor(maybeWrappedUpstreamDag, succT, ctx.tPredsFn.apply(succT).size(), ctx))
                .flatMap(Optional::stream)
                .toList();
        if (observables.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(observables.size() > 1
                ? merge(observables)
                : observables.get(0));
    }
}
I'm using this tool to process all the objects in a somewhat elaborate model, and it simplifies the task substantially. It's overkill for the OP's problem, but for illustration, here's a solution based on the above ReactiveDag class:
package reactivedag;
import lombok.val;
import org.apache.logging.log4j.Logger;
import static reactivedag.ReactiveDag.runAsReactiveDag;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.logging.log4j.LogManager.getLogger;
/**
 * Demonstrates {@code ReactiveDag} on a five-node graph:
 * {@code A -> B -> C}, {@code D -> C} and {@code D -> E}.
 */
public class Sample {
    private static final Logger log = getLogger();

    /**
     * Runs the sample graph starting from the roots {@code A} and {@code D}; each job logs its
     * start, sleeps for two seconds, and logs its end.
     *
     * @param args unused
     */
    static public void main(String[] args) {
        // runAsReactiveDag subscribes to the graph itself, so the returned observable is not needed.
        runAsReactiveDag(asList('A', 'D'),
                c -> {
                    //predecessor getter function
                    switch (c) {
                        case 'B': return asList('A');
                        case 'C': return asList('B', 'D');
                        case 'E': return asList('D');
                        default: return emptyList();
                    }
                }, c -> {
                    //successor getter function
                    switch (c) {
                        case 'A': return asList('B');
                        case 'B': return asList('C');
                        case 'D': return asList('C', 'E');
                        default: return emptyList();
                    }
                }, c -> {
                    //job logic
                    log.info("Starting " + c);
                    try {
                        sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the owning executor thread can still see it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    log.info("Ending " + c);
                }
        );
    }
}