I have a list of employees and I want to call an API for each of them. Done synchronously, this takes a long time, so I want to improve performance with coroutines. This is what I have so far:

fun perform() = runBlocking {
    employeesSource.getEmployees()
            .map { launch { populateWithLeaveBalanceReports(it) } }
            .joinAll()
}

suspend fun populateWithLeaveBalanceReports(employee: EmployeeModel) {
    println("Start ${COUTNER}")
    val receivedReports = reportsSource.getReports(employee.employeeId) // call to a very slow API
    // attach every received report to the employee
    receivedReports.forEach { employee.addLeaveBalanceReport(it) }
    println("Finish ${COUTNER++}")
}

When I run this, the code still runs synchronously, and I see the following output in the console:

Start 0
Finish 0
Start 1
Finish 1
Start 2
Finish 2

which means that the calls are made sequentially. If I replace the whole body of the populateWithLeaveBalanceReports function with delay(1000), it runs asynchronously:

Start 0
Start 0
Start 0
Finish 0
Finish 1
Finish 2

What am I doing wrong? Any ideas??

2 Answers

Coroutines don't magically turn your blocking network API into non-blocking. Use launch(Dispatchers.IO) { ... } to run blocking tasks in an elastic thread pool. Just note that this doesn't do much more than the plain-old executorService.submit(blockingTask). It's a bit more convenient because it uses a pre-constructed global thread pool.
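
As a rough illustration of the launch(Dispatchers.IO) suggestion, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath; blockingGetReports and fetchAll are hypothetical names, with blockingGetReports standing in for the slow reportsSource.getReports call from the question.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the slow, blocking API call from the question.
fun blockingGetReports(employeeId: Int): List<String> {
    Thread.sleep(200) // blocks the calling thread, like a synchronous HTTP call
    return listOf("report-for-$employeeId")
}

// Starts one coroutine per id on Dispatchers.IO, so each blocking call
// occupies its own pool thread instead of the single runBlocking thread.
fun fetchAll(employeeIds: List<Int>): Map<Int, List<String>> = runBlocking {
    val results = ConcurrentHashMap<Int, List<String>>()
    employeeIds
        .map { id -> launch(Dispatchers.IO) { results[id] = blockingGetReports(id) } }
        .joinAll() // wait until every call has finished
    results
}

fun main() {
    val reports = fetchAll(listOf(1, 2, 3))
    println("fetched reports for ${reports.size} employees")
}
```

Without the Dispatchers.IO argument, every launch would inherit the single-threaded runBlocking dispatcher, and the blocking calls would again run one after another.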

2 Comments

Thank you for the explanation! When I use this approach, SecurityContextHolder.getContext().authentication is null inside the populateWithLeaveBalanceReports method. I assume that's because the SecurityContext is not shared between threads? I'm having trouble calling the API because my token is stored inside the authentication object.
Yes, it's a thread-local holder. It wasn't obvious from your question that you're running this inside a managed Spring app. I'm not sure what the policy is on launching arbitrary threads in that context. But I think you can work around the missing token by reading it into a local variable before you enter the launch block.
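
The workaround from the comment above can be sketched as follows. This is only an illustration under assumptions: currentToken is a hypothetical plain ThreadLocal standing in for a thread-local holder such as SecurityContextHolder, readTokens is a made-up helper name, and kotlinx.coroutines is assumed to be available.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a thread-local holder such as SecurityContextHolder.
val currentToken = ThreadLocal<String?>()

// Returns (value of the thread-local as seen inside the coroutine,
//          value captured into a local variable before launch).
fun readTokens(): Pair<String?, String?> = runBlocking {
    // Read the token on the calling thread, before switching to a pool thread.
    val captured = currentToken.get()
    var seenInside: String? = null
    launch(Dispatchers.IO) {
        // This runs on an IO pool thread, where the thread-local was never set.
        seenInside = currentToken.get()
        // `captured` is an ordinary local variable, so it is still usable here.
    }.join()
    Pair(seenInside, captured)
}

fun main() {
    currentToken.set("secret-token")
    val (seenInside, captured) = readTokens()
    println("thread-local inside coroutine: $seenInside, captured before launch: $captured")
}
```

The captured value travels with the lambda, so it does not matter which thread ends up running the block.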

These lines might be running blocking code, that is, code that blocks a thread while it waits for a task to complete:

val receivedReports = reportsSource.getReports(employee.employeeId)
receivedReports.forEach { employee.addLeaveBalanceReport(it) }

It is likely that you are using a non-asynchronous HTTP client or JDBC driver under the hood of the reportsSource.getReports call.

If so, you should either

  1. rewrite reportsSource.getReports so that it does not rely on any blocking code. This is the new / non-blocking / challenging way.
  2. use a thread pool executor to distribute the calls manually instead of using coroutines. This is the old / simple way.
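
For the second option, a minimal sketch using only the JDK's java.util.concurrent classes might look like this. The names blockingGetReports and fetchAllWithPool and the pool size of 8 are assumptions made for the example, not anything from the question.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the slow, blocking API call from the question.
fun blockingGetReports(employeeId: Int): List<String> {
    Thread.sleep(200) // blocks the calling thread, like a synchronous HTTP call
    return listOf("report-for-$employeeId")
}

// Runs the blocking calls on a fixed-size thread pool and collects the results.
fun fetchAllWithPool(employeeIds: List<Int>, poolSize: Int = 8): Map<Int, List<String>> {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
        // One task per employee; each task returns an (id, reports) pair.
        val tasks = employeeIds.map { id -> Callable { id to blockingGetReports(id) } }
        // invokeAll blocks until every task has completed.
        return pool.invokeAll(tasks).associate { it.get() }
    } finally {
        pool.shutdown() // release the worker threads
    }
}

fun main() {
    val reports = fetchAllWithPool(listOf(1, 2, 3))
    println("fetched reports for ${reports.size} employees")
}
```

The pool size caps how many calls run at once, which can be useful if the remote API should not be flooded with requests.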

2 Comments

Yes, indeed, I'm using WebFlux's WebClient to hit the API, but I'm using it in a non-reactive way, with the block() method. Which HTTP client can be used instead? The Fuel library, maybe?
Why don't you continue using WebClient but start using it in a non-blocking way? Spring has built-in support for Kotlin coroutines.
