
I have a flow (MutableSharedFlow, if it's relevant) and a potentially expensive operation that I would like to execute asynchronously while still maintaining the order of events. I achieved what I wanted using CompletableFuture:

private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)

fun process(flow: Flow<String>) = flow
    .map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
    .buffer(threadPoolSize)
    .map { it.get() } // block current thread
    .flowOn(threadPool.asCoroutineDispatcher())

Thanks to the combination of offloading to a thread pool, a fixed-size buffer, and the thread-blocking CompletableFuture#get, this code works as I expect: up to threadPoolSize events are processed in parallel and emitted to the flow in the order they were received.
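To see the mechanism outside the Flow pipeline, here is a minimal sketch (plain java.util.concurrent, with made-up delays standing in for expensiveHandle): supplyAsync starts each task eagerly, and calling get() in submission order preserves the original sequence even when a later task finishes first.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Futures start running as soon as they are created; get() in
// submission order keeps the results in the original sequence.
fun orderedResults(delaysMs: List<Long>): List<Long> {
    val pool = Executors.newFixedThreadPool(3)
    val futures = delaysMs.map { d ->
        CompletableFuture.supplyAsync({
            Thread.sleep(d) // stand-in for expensiveHandle
            d
        }, pool)
    }
    val results = futures.map { it.get() } // blocks in submission order
    pool.shutdown()
    return results
}

fun main() {
    // The 100 ms task finishes first, yet output keeps submission order.
    println(orderedResults(listOf(300L, 100L, 200L))) // [300, 100, 200]
}
```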

When I replace CompletableFuture#get with the extension function CompletableFuture#await from kotlinx.coroutines.future and use future or async instead of CompletableFuture#supplyAsync, the messages are no longer processed in parallel:

fun process(flow: Flow<String>) = flow
    .map { 
        runBlocking {
            future { expensiveHandle(it) } // same behaviour with async {...}
        }
    }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(threadPool.asCoroutineDispatcher())

Can I do equivalent code using coroutines/suspending functions?

  • I don't think buffer is doing what you described. It creates a separate coroutine for the code below it, but it is not running the first map call in parallel. It's running the below .map function in parallel with the first .map call on later items in the flow. Since the first .map call is non-blocking, there's not much gained by buffering. Commented Mar 30, 2021 at 12:57
  • Aha, I see, thanks for pointing it out. In that case the parallelism comes solely from offloading the work to the thread pool, right @Tenfour04? Commented Mar 30, 2021 at 12:59
  • I've not ever mixed Java concurrency with Kotlin coroutines, so I don't want to guess without testing and tell you something wrong. But I think so. The CompletableFutures automatically start running when you create them, so they're all running in parallel. You can get similar behavior in a suspending way using async. Commented Mar 30, 2021 at 13:08
  • That looks right to me except for passing a parameter into the buffer call. There only needs to be a buffer of 2, which I think is typically the default. Commented Mar 30, 2021 at 13:13
  • So after removing the buffer and using 2 threads in the pool instead of 5, I didn't get expensiveHandle to run on both threads, only on one. I guess the other one was blocked (perhaps in the await?), since the pool is also used in the flowOn. Also, I think I found a way to do the desired thing - see my edit. Thank you for your help! Commented Mar 30, 2021 at 13:13

3 Answers


Both async and future are extension functions on CoroutineScope, so you need a CoroutineScope to call them.

runBlocking provides a CoroutineScope, but it is a blocking call, so it must not be used inside suspending code.

You could go with GlobalScope.async, but that is also not recommended, and execution would be dispatched by Dispatchers.Default rather than by threadPool.asCoroutineDispatcher() as in the original CompletableFuture example.

The coroutineScope and withContext functions provide a CoroutineScope that inherits its coroutineContext from the outer scope, so flow processing would suspend while each expensiveHandle(it) coroutine runs to completion, i.e. sequentially.
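To illustrate why an inherited scope serializes the work, here is a minimal sketch (assumes kotlinx-coroutines on the classpath; the delay is made up): coroutineScope does not return until all of its children have completed, so nothing after it runs in the meantime.

```kotlin
import kotlinx.coroutines.*

// coroutineScope suspends until all of its children complete.
suspend fun scopedOrder(): List<String> {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            events += "child done"
        }
    } // suspends here until the child finishes
    events += "after scope"
    return events
}

fun main() = runBlocking {
    println(scopedOrder()) // [child done, after scope]
}
```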

You need to create a CoroutineScope with the CoroutineScope() factory function, so that the coroutine contexts don't mix:

fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
    val dispatcher = threadPool.asCoroutineDispatcher()
    return flow
        .map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
        .buffer(threadPool.poolSize)
        .map { it.await() }
        .flowOn(dispatcher)
}
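A quick way to check the ordering claim (hypothetical expensiveHandle with uneven delays; assumes kotlinx-coroutines on the classpath): even when the first element is the slowest, results still come out in submission order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor

// Hypothetical handler: "a" takes the longest.
fun expensiveHandle(s: String): String {
    Thread.sleep(if (s == "a") 300 else 100)
    return s.uppercase()
}

fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
    val dispatcher = threadPool.asCoroutineDispatcher()
    return flow
        .map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
        .buffer(threadPool.poolSize)
        .map { it.await() }
        .flowOn(dispatcher)
}

fun main() = runBlocking {
    val pool = Executors.newFixedThreadPool(3) as ThreadPoolExecutor
    // "a" finishes last, yet the output order matches the input order.
    println(process(flowOf("a", "b", "c"), pool).toList()) // [A, B, C]
    pool.shutdown()
}
```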

3 Comments

If expensiveHandle() varies how long it takes to execute (e.g. making a RPC), then you could end up awaiting for the next value to complete even though there's a later value that has already finished. For example, if you have a buffer of size 2 and the expensiveHandle calls take this long: call 1: 4s; call 2: 1s; call 3: 1s. After 1s, call 2 will finish and you'd want call 3 to start, but it won't because you're still waiting for call 1 to finish. Is there a way to await any value from the contents of buffer?
@MageWind, Flows are sequential by their nature. You need to wrap it.await() in another flow and then flatten the result, so .map { it.await() } needs to be replaced with .flatMapMerge { flow { emit(it.await()) } } (see kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/…)
I arrived at a similar solution myself. It was something like: .flatMapMerge { flow { emit(expensiveHandle(it)) } }
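For completeness, a sketch of the flatMapMerge variant discussed in these comments (assumes kotlinx-coroutines on the classpath; the delays are made up): work still starts eagerly, but results are emitted as they complete, trading order for latency.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits results in completion order, not submission order.
suspend fun CoroutineScope.completionOrder(delaysMs: List<Long>): List<Long> =
    delaysMs.asFlow()
        .map { ms -> async { delay(ms); ms } }           // work starts eagerly
        .flatMapMerge { deferred -> flow { emit(deferred.await()) } }
        .toList()

fun main() = runBlocking {
    // The 300 ms item was submitted first but arrives last.
    println(completionOrder(listOf(300L, 100L, 200L))) // [100, 200, 300]
}
```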

So the problem wasn't the future itself, but the surrounding runBlocking. When using a custom CoroutineScope with the thread pool as the underlying dispatcher, the code works as expected (note the change from get to await; I also used async instead of future, as it's in the core coroutines library):

private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)

fun process(flow: Flow<String>) = flow
    .map { scope.async { expensiveHandle(it) } }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(dispatcher)



Instead of mapping the flow passed as an argument, try returning a new flow built with the channelFlow builder (callbackFlow requires a trailing awaitClose call and would throw here) and collecting the original flow inside it. That way you can launch several coroutines that call expensiveHandle(it) and send each result as soon as it is ready. Note that results are then emitted in completion order, not in the order they were received.

fun process(flow: Flow<String>) = channelFlow {
        flow.collect {
            launch {
                send(expensiveHandle(it))
            }
        }
    }.flowOn(threadPool.asCoroutineDispatcher())
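A sketch in the same spirit using channelFlow, which does not require awaitClose and keeps the channel open until all launched children finish (assumes kotlinx-coroutines on the classpath; delays are made up): the slow first element no longer holds back the others, but arrival order reflects completion time.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical handler: the delay stands in for real work.
suspend fun expensiveHandle(ms: Long): Long { delay(ms); return ms }

// channelFlow waits for its launched children before completing.
fun process(flow: Flow<Long>): Flow<Long> = channelFlow {
    flow.collect { launch { send(expensiveHandle(it)) } }
}

fun main() = runBlocking {
    // Submitted as 300, 100, 200; received in completion order.
    println(process(flowOf(300L, 100L, 200L)).toList()) // [100, 200, 300]
}
```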

