
I am setting up a Spring Boot application (DAO pattern with @Repositories) where I am attempting to write a @Service to asynchronously pull data from a database in multiple threads and merge-process the incoming payloads sequentially, preferably on arrival.

The goal is to utilize parallel database access for requests where multiple non-overlapping sets of filter conditions need to be queried individually, but post-processed (transformed, e.g. aggregated) into a combined result.

Being rather new to Java, and coming from Golang with its comparatively simple syntax for multi-threading and task communication, I struggle to identify a suitable API in Java and Spring Boot - or to determine whether this approach is even advisable to begin with.


Question:

Given

  • a Controller:

    @RestController
    @RequestMapping("/api")
    public class MyController {
    
      private final MyService myService;
    
      @Autowired
      public MyController(MyService myService) {
          this.myService = myService;
      }
    
      @PostMapping("/processing")
      public DeferredResult<MyResult> myHandler(@RequestBody MyRequest myRequest) {
          DeferredResult<MyResult> myDeferredResult = new DeferredResult<>();
          myService.myProcessing(myRequest, myDeferredResult);
    
          return myDeferredResult;
      }
    }
    
  • a Service:

    import com.acme.parallel.util.MyDataTransformer;
    
    @Service
    public class MyServiceImpl implements MyService {
    
      private final MyRepository myRepository;
    
      @Autowired
      public MyServiceImpl(MyRepository myRepository) {
          this.myRepository = myRepository;
      }
    
      public void myProcessing(MyRequest myRequest, DeferredResult<MyResult> myDeferredResult) {
        MyDataTransformer myDataTransformer = new MyDataTransformer();
    
        /* PLACEHOLDER CODE
        for (MyFilter myFilter : myRequest.getMyFilterList()) {
          // MyPartialResult myPartialResult = myRepository.myAsyncQuery(myFilter);
    
          // myDataTransformer.transformMyPartialResult(myPartialResult);
        }
        */
    
        myDeferredResult.setResult(myDataTransformer.getMyResult());
      }
    }
    
  • a Repository:

    @Repository
    public class MyRepository {
    
      public MyPartialResult myAsyncQuery(MyFilter myFilter) {
        // for the sake of an example
        return new MyPartialResult(myFilter, TakesSomeAmountOfTimeToQuery.TRUE);
      }
    }
    
  • as well as a MyDataTransformer helper class:

    public class MyDataTransformer {
    
      private final MyResult myResult = new MyResult();  // e.g. a Map
    
      public void transformMyPartialResult(MyPartialResult myPartialResult) {
        /* PLACEHOLDER CODE
        this.myResult.transformAndMergeIntoMe(myPartialResult);
        */
      }

      public MyResult getMyResult() {
        return this.myResult;
      }
    }
    

how can I implement

  • the MyService.myProcessing method asynchronously and multi-threaded, and

  • the MyDataTransformer.transformMyPartialResult method sequential/thread-safe

  • (or redesign the above)

most performantly, to merge incoming MyPartialResult into one single MyResult?


Attempts:

The easiest solution seems to be to skip the "on arrival" part, and a commonly preferred implementation might e.g. be:

public void myProcessing(MyRequest myRequest, DeferredResult<MyResult> myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<MyPartialResult>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {     // Stream is the way they say, but I like for
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter)));
  }

  myPartialResultFutures.stream()
    .map(CompletableFuture::join)
    .forEach(myDataTransformer::transformMyPartialResult);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}

However, if feasible I'd like to benefit from sequentially processing incoming payloads when they arrive, so I am currently experimenting with something like this:

public void myProcessing(MyRequest myRequest, DeferredResult<MyResult> myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<Void>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter)).thenAccept(myDataTransformer::transformMyPartialResult));
  }

  myPartialResultFutures.forEach(CompletableFuture::join);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}

but I don't understand whether I need to implement any thread-safety protocol when calling myDataTransformer.transformMyPartialResult from those callbacks - and if so, how - or whether this even makes sense performance-wise.
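
To spell out what I mean by a thread-safety protocol: since supplyAsync without an explicit executor runs the queries on ForkJoinPool.commonPool(), the thenAccept callbacks may call into the transformer from several threads at once. A minimal sketch of a guarded variant - assuming, purely for illustration, that myRows is a list of Integer[] key/value pairs - would be to keep a plain HashMap and synchronize the merge:

import java.util.HashMap;
import java.util.Map;

public class MyDataTransformer {

  // plain HashMap, not thread-safe on its own (the Integer types are illustrative only)
  private final Map<Integer, Integer> myResult = new HashMap<>();

  // synchronized: concurrent thenAccept callbacks take turns merging
  public synchronized void transformMyPartialResult(MyPartialResult myPartialResult) {
    myPartialResult.myRows.forEach(
        (row) -> this.myResult.merge(row[0], row[1], Integer::sum));
  }

  public synchronized Map<Integer, Integer> getMyResult() {
    return this.myResult;
  }
}

The Update below takes the other route and pushes the thread-safety into the data structure itself.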


Update:

Based on the assumption that

  • myRepository.myAsyncQuery takes slightly varying amounts of time, and
  • myDataTransformer.transformMyPartialResult takes an ever-increasing amount of time with each call,

plugging a thread-safe/atomic merge target, e.g. a ConcurrentHashMap:

public class MyDataTransformer {

  // Integer key/value types chosen just for this example
  private final ConcurrentMap<Integer, Integer> myResult = new ConcurrentHashMap<>();

  public void transformMyPartialResult(MyPartialResult myPartialResult) {
    myPartialResult.myRows.forEach(
        (row) -> this.myResult.merge(row[0], row[1], Integer::sum));
  }
}

into the latter Attempt (processing "on arrival"):

public void myProcessing(MyRequest myRequest, DeferredResult<MyResult> myDeferredResult) {
  MyDataTransformer myDataTransformer = new MyDataTransformer();
  
  List<CompletableFuture<Void>> myPartialResultFutures = new ArrayList<>();

  for (MyFilter myFilter : myRequest.getMyFilterList()) {
    myPartialResultFutures.add(CompletableFuture.supplyAsync(() -> myRepository.myAsyncQuery(myFilter)).thenAccept(myDataTransformer::transformMyPartialResult));
  }

  myPartialResultFutures.forEach(CompletableFuture::join);
      
  myDeferredResult.setResult(myDataTransformer.getMyResult());
}

is up to one order of magnitude faster than waiting for all futures to complete first, even with the overhead of the atomic merge operations.
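
For reference, here is a rough, self-contained toy along the lines of that comparison - the queries are simulated with Thread.sleep instead of the real repository, and the transform cost grows with each call, so the numbers are only indicative:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class OnArrivalVsJoinFirst {

  // simulated query: just sleeps for the given number of milliseconds
  static int slowQuery(int millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return millis;
  }

  static final AtomicInteger calls = new AtomicInteger();

  // simulated transform: ignores its input, but gets slower with every call,
  // and is synchronized so merges never overlap
  static synchronized void slowTransform(int ignored) {
    try {
      Thread.sleep(10L * calls.incrementAndGet());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    List<Integer> delays = List.of(400, 300, 200, 100);

    // variant 1: walk the futures in list order - nothing is transformed
    // before the first (here: slowest) query has returned
    long t0 = System.currentTimeMillis();
    List<CompletableFuture<Integer>> futures = delays.stream()
        .map(d -> CompletableFuture.supplyAsync(() -> slowQuery(d)))
        .collect(Collectors.toList());
    futures.stream()
        .map(CompletableFuture::join)
        .forEach(OnArrivalVsJoinFirst::slowTransform);
    System.out.println("join first: " + (System.currentTimeMillis() - t0) + " ms");

    calls.set(0);

    // variant 2: transform each result as soon as its own query completes
    long t1 = System.currentTimeMillis();
    List<CompletableFuture<Void>> onArrival = delays.stream()
        .map(d -> CompletableFuture.supplyAsync(() -> slowQuery(d))
            .thenAccept(OnArrivalVsJoinFirst::slowTransform))
        .collect(Collectors.toList());
    onArrival.forEach(CompletableFuture::join);
    System.out.println("on arrival: " + (System.currentTimeMillis() - t1) + " ms");
  }
}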


Now, this may seem obvious in hindsight (though not entirely, as async/multi-threaded processing is by no means always the better choice), and I am glad this approach is a valid one.

What remains, however, looks to me like a hacky, inflexible solution - or at least an ugly one. Is there a better approach?

1 Answer

We want to process each query result as soon as it is available, hence thenApply: here the thread that queried the database can immediately continue with the data transformation, unlike in the approaches mentioned above, and finally the partial results of the queries can be merged in one reduce operation.

    public static void service() {
        //myRequest.getMyFilterList()
        List<Integer> inputList = Arrays.asList(1, 2, 3);

        List<CompletableFuture<PartialResultTransformed>> partialProcessedResult = inputList.stream()
                .map(input -> CompletableFuture.supplyAsync(() -> repository.query(input)))
                .map(future -> future.thenApply(queryResult -> datatransformer.transform(queryResult)))
                .collect(Collectors.toList());

        List<PartialResultTransformed> allPartialResult = partialProcessedResult.stream()
                .map(CompletableFuture::join) //todo:handle exception scenario
                .collect(Collectors.toList());

        // final transformation of the partial results:
        // merge them in one reduce, e.g.
        // allPartialResult.stream()
        //         .reduce(...);
    }
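
A sketch of what that final reduce could look like - assuming, hypothetically, that PartialResultTransformed offers a no-arg constructor for an empty result and a mergeWith method that combines two partial results:

        // hypothetical merge step: fold all transformed partial results into one
        PartialResultTransformed merged = allPartialResult.stream()
                .reduce(new PartialResultTransformed(), PartialResultTransformed::mergeWith);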