1

I'm working on converting a blocking sequential orchestration framework to reactive. Right now, these tasks are dynamic and are fed into the engine by a JSON input. The engine pulls classes and executes the run() method and saves the state with the responses from each task.
How do I achieve the same chaining in reactor? If this was a static DAG, I would have chained it with flatMap or then operators but since it is dynamic, How do I proceed with executing a reactive task and collecting the output from each task?

Examples:
Non reactive interface:

public interface OrchestrationTask {
 OrchestrationContext run(IngestionContext ctx);
}

Core Engine

public Status executeDAG(String id) {
  IngestionContext ctx = ContextBuilder.getCtx(id);
  List<OrchestrationTask> tasks = app.getEligibleTasks(id);

  for(OrchestrationTask task : tasks) {
    // Eligible tasks are executed sequentially and results are collected.
    OrchestrationContext stepContext = task.run(ctx);
    if(!evaluateResult(stepContext)) break;
  }
  return Status.SUCCESS;
}

Following the above example, if I convert tasks to return Mono<?> then, how do I wait or chain other tasks to operate on the result on previous tasks? Any help is appreciated. Thanks.

Update::

Reactive Task example.

public class SampleTask implements OrchestrationTask {  
  @Override
  public Mono<OrchestrationContext> run(OrchestrationContext context) {  
  // Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
  return Mono.just(context).delayElements(Duration.ofSeconds(2));
 }

So i will have a series of tasks that accomplish various things but the response from each task is dependent on the previous and is stored in the Orchestration Context. Anytime an error is occurred, the orchestration context flag will be set to false and the flux should stop.

6
  • Could you give an example of your blocking code at the mo? I can't quite picture how it works from your description. Commented Mar 5, 2021 at 8:59
  • @MichaelBerry updated the post. Let me know if you still need more information. Thanks. Commented Mar 6, 2021 at 4:56
  • Just to be clear - what here needs to become reactive? You mention task.run() returns a mono, but is that it - or will app.getEligibleTasks(), ContextBuilder.getCtx() and evaluateResult() also need to avoid blocking operations by returning a reactive publisher? Also, the method currently seems to always return SUCCESS, but will only execute evaluateResult() on each StepContext until one returns false. Is that the behaviour you'd need to emulate in a reactive implementation? Commented Mar 6, 2021 at 23:37
  • @michaelberry other methods do not have blocking operations. Only run returns Mono and it returns the status based on the tasks written in run. The status success is for this method but it will stop executing next tasks if evaluateResult says false. Commented Mar 8, 2021 at 4:52
  • @MichaelBerry any idea? Commented Mar 9, 2021 at 6:26

1 Answer 1

4

Sure, we can:

  • Create the flux from the task list (if it's appropriate to generate the task list reactively then you can replace that arraylist with the flux directly, if not then keep as-is);
  • flatMap() each task to your task.run() method (which as per the question now returns a Mono;
  • Ensure we only consume elements while evaluateResult() is true;
  • ...then finally just return the SUCCESS status as before.

So putting all that together, just replace your loop & return statement with:

Flux.fromIterable(tasks)
        .flatMap(task -> task.run(ctx))
        .takeWhile(stepContext -> evaluateResult(stepContext))
        .then(Mono.just(Status.SUCCESS));

(Since we've made it reactive, your method will obviously need to return a Mono<Status> rather than just Status too.)

Update as per the comment - if you just want this to execute "one at a time" rather than with multiple concurrently, you can use concatMap() instead of flatMap().

Sign up to request clarification or add additional context in comments.

7 Comments

the generation of task list is merely collecting beans of a class type. I believe that should be fine correct?
@PavanKumar Yup, should be unless it's getting them through some weird blocking operation.
Also, the evaluateResult method when returned false, should terminate the flux and return the orchestration context until that point. Is there an elegant way that I can do it?
Tasks in the iterable have dependency on each other. Right now, these tasks are triggered in parallel and if there is an IO call ( Webclient for ex), the next task is executed before the response for previous task is returned. Any Idea on how we can make sure that each task gets executed one after the other? After getting the response that is.
@PavanKumar I'm having a slow day - you're correct, concatMap() is exactly the right operator to use for this usecase.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.