I have a flow (MutableSharedFlow, if it's relevant) and I have potentially expensive operation that I would like to execute asynchronously, while still maintaining the order. I achieved what I wanted using CompletableFuture:
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
fun process(flow: Flow<String>) = flow
.map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
.buffer(threadPoolSize)
.map { it.get() } // block current thread
.flowOn(threadPool.asCoroutineDispatcher())
Thanks to combination of offloading to thread pool, fixed size buffer and thread blocking CompletableFuture#get, this code works to my expectations - up to threadPoolSize events are processed in parallel, and emitted to the flow in the order they were received.
When I replace CompletableFuture#get with extension function CompletableFuture#await from kotlinx.coroutines.future and use flow or async instead of CompletableFuture#supplyAsync, the messages are no longer processed in parallel:
fun process(flow: Flow<String>) = flow
.map {
runBlocking {
future { expensiveHandle(it) } // same behaviour with async {...}
}
}
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(threadPool.asCoroutineDispatcher())
Can I do equivalent code using coroutines/suspending functions?
bufferis doing what you described. It creates a separate coroutine for the code below it, but it is not running the firstmapcall in parallel. It's running the below.mapfunction in parallel with the first.mapcall on later items in the flow. Since the first.mapcall is non-blocking, there's not much gained by buffering.async.buffercall. There only needs to be a buffer of 2, which I think is typically the default.