I'm working on converting a blocking, sequential orchestration framework to a reactive one. Right now, the tasks are dynamic and are fed into the engine through a JSON input. The engine loads the task classes, executes each one's run() method, and saves the state together with the response from each task.
How do I achieve the same chaining in Reactor? If this were a static DAG, I would have chained it with the flatMap or then operators, but since it is dynamic, how do I execute each reactive task and collect its output?
Examples:
Non-reactive interface:
public interface OrchestrationTask {
    OrchestrationContext run(IngestionContext ctx);
}
Core engine:
public Status executeDAG(String id) {
    IngestionContext ctx = ContextBuilder.getCtx(id);
    List<OrchestrationTask> tasks = app.getEligibleTasks(id);
    for (OrchestrationTask task : tasks) {
        // Eligible tasks are executed sequentially and results are collected.
        OrchestrationContext stepContext = task.run(ctx);
        if (!evaluateResult(stepContext)) break;
    }
    return Status.SUCCESS;
}
Following the above example, if I convert the tasks to return Mono<?>, how do I wait for a task, or chain later tasks so that they operate on the result of the previous ones? Any help is appreciated. Thanks.
Update:
Reactive task example.
public class SampleTask implements OrchestrationTask {
    @Override
    public Mono<OrchestrationContext> run(OrchestrationContext context) {
        // I'm simulating a delay here. Treat this as a long-running task (a web call).
        // The next task needs the response from the call below.
        return Mono.just(context).delayElement(Duration.ofSeconds(2));
    }
}
So I will have a series of tasks that accomplish various things, but each task depends on the response of the previous one, which is stored in the OrchestrationContext. Whenever an error occurs, the flag in the orchestration context will be set to false and the flux should stop.
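The behaviour described above (run a dynamic list of asynchronous tasks in order, pass each result to the next task, and stop once the context flag is false) can be sketched as a fold over the task list. This is only a minimal sketch under assumptions, not the engine's actual code: it uses the JDK's CompletableFuture as a stand-in so that it runs without dependencies, and Ctx, AsyncTask, step and executeDag are hypothetical names. In Reactor, thenCompose corresponds roughly to Mono.flatMap, so the same loop shape should carry over to tasks that return Mono<OrchestrationContext>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DynamicChainSketch {

    // Hypothetical, simplified context: a success flag plus the names of the tasks that ran.
    static final class Ctx {
        final boolean ok;
        final List<String> trace;

        Ctx(boolean ok, List<String> trace) {
            this.ok = ok;
            this.trace = List.copyOf(trace);
        }

        // Returns a new context with one more step recorded and the flag updated.
        Ctx append(String step, boolean stillOk) {
            List<String> next = new ArrayList<>(trace);
            next.add(step);
            return new Ctx(stillOk, next);
        }
    }

    // Hypothetical asynchronous task interface, standing in for a Mono-returning run().
    interface AsyncTask {
        CompletableFuture<Ctx> run(Ctx ctx);
    }

    // Folds the dynamic task list into one chain; each task receives the previous result.
    // Once the flag is false, the remaining tasks are skipped and the context is passed through.
    static CompletableFuture<Ctx> executeDag(List<AsyncTask> tasks, Ctx initial) {
        CompletableFuture<Ctx> chain = CompletableFuture.completedFuture(initial);
        for (AsyncTask task : tasks) {
            chain = chain.thenCompose(ctx ->
                    ctx.ok ? task.run(ctx) : CompletableFuture.completedFuture(ctx));
        }
        return chain;
    }

    // Builds a demo task that records its name and reports success or failure.
    static AsyncTask step(String name, boolean succeeds) {
        return ctx -> CompletableFuture.supplyAsync(() -> ctx.append(name, succeeds));
    }

    public static void main(String[] args) {
        List<AsyncTask> tasks = List.of(step("a", true), step("b", false), step("c", true));
        Ctx result = executeDag(tasks, new Ctx(true, List.of())).join();
        System.out.println(result.ok + " " + result.trace); // prints "false [a, b]"
    }
}
```

The loop only assembles the chain; nothing blocks until join() is called in main. Task "c" never runs because "b" sets the flag to false, which mirrors the break in the blocking executeDAG loop.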
SUCCESS, but will only execute evaluateResult() on each StepContext until one returns false. Is that the behaviour you'd need to emulate in a reactive implementation?