6

I have a CoroutineScope instance and a log() function that look like this:

private val scope = CoroutineScope(Dispatchers.IO)

fun log(message: String) = scope.launch { // launching a coroutine
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
}

And I use this test code to launch coroutines:

repeat(5) { item ->
    log("Log $item")
}

The log() function can be called from anywhere, on any thread, but not from inside a coroutine.

After a couple of test runs I see non-sequential output like the following:

Log 0
Log 2
Log 4
Log 1
Log 3

The logs can be printed in a different order on each run. If I understand correctly, coroutine execution is not guaranteed to be sequential, which means the coroutine for item 2 can start running before the coroutine for item 0.

I want the coroutines to be launched sequentially for each item, with "some blocking operation" also executing sequentially, so that the logs always come out as:

Log 0
Log 1
Log 2
Log 3
Log 4

Is there a way to make launching coroutines sequential? Or maybe there are other ways to achieve what I want?

Thanks in advance for any help!

  • Does this answer your question? Kotlin Coroutines sequential execution Commented May 7, 2022 at 12:35
  • Thanks @TylerV. I wonder if there are alternative ways to accomplish this without using Mutex. Commented May 7, 2022 at 12:47
  • Look at the other answer too, using join. Could call log("foo").join() Commented May 7, 2022 at 12:48
  • @TylerV The other answer assumes join is called in a coroutine, which means I would need to call the log() function from a coroutine, and that is not what I want. I want it to be callable from anywhere without launching an outer coroutine. Commented May 7, 2022 at 12:52
  • I don't think there is any other way. log(2) has to wait until log(1) is done, and the waiting must happen in a coroutine to keep from hanging, so the wait can be either inside log, with a lock, or outside log in a higher-level coroutine. Commented May 7, 2022 at 12:55

2 Answers

6

One possible strategy is to use a Channel to join the launched jobs in order. Launch each job lazily (CoroutineStart.LAZY) so it doesn't start until the consumer calls join on it. Use trySend rather than send so that log() can be called from outside a coroutine; trySend always succeeds on an open Channel with unlimited capacity.

private val lazyJobChannel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
    scope.launch {
        consumeEach { it.join() }
    }
}

fun log(message: String) {
    lazyJobChannel.trySend(
        scope.launch(start = CoroutineStart.LAZY) {
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
        }
    )
}
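
A variant of the same idea, as a minimal sketch rather than the answer's own method: if every log call does the same work, the message itself can go through the Channel and a single consumer coroutine can handle the messages one by one. The class name ChannelLogger and the handle parameter are hypothetical; handle stands in for println plus the blocking operation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Hypothetical class, not part of the answer above.
// `handle` is an assumed callback that does the actual (possibly blocking) work.
class ChannelLogger(private val handle: (String) -> Unit) {
    private val scope = CoroutineScope(Dispatchers.IO)
    private val messages = Channel<String>(Channel.UNLIMITED)

    init {
        // A single consumer drains the channel, so messages are handled
        // one at a time, in the order they were sent.
        scope.launch {
            for (message in messages) handle(message)
        }
    }

    // Callable from any thread, outside a coroutine: trySend does not suspend.
    fun log(message: String) {
        messages.trySend(message)
    }
}
```

Usage would look like `val logger = ChannelLogger { println(it); TimeUnit.MILLISECONDS.sleep(100) }` followed by `logger.log("Log 0")`. The trade-off compared with the lazy-job version is that all calls share one handler instead of each carrying its own block of code.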

1

Since Flows are sequential we can use MutableSharedFlow to collect and handle data sequentially:

class Info {
    // tryEmit never suspends. Messages emitted before the collector has subscribed are kept only in the
    // replay cache, and once it is collecting, messages that don't fit in the buffer are rejected.
    // Make sure replay and extraBufferCapacity are large enough to handle all the jobs;
    // if some jobs are lost, try increasing either of the values.
    private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
    private val scope = CoroutineScope(Dispatchers.IO)

    init {
        sharedFlow.onEach { message ->
            println("$message")
            TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
        }.launchIn(scope)
    }

    fun log(message: String) {
        sharedFlow.tryEmit(message) 
    }
}

fun test() {

    val info = Info()

    repeat(10) { item ->
        info.log("Log $item")
    }
}

It always prints the logs in the correct order:

Log 0
Log 1
Log 2
...
Log 9

This works as long as the replay and extraBufferCapacity parameters of the MutableSharedFlow are large enough to hold all pending items.
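
To illustrate why the buffer sizes matter, here is a small sketch of what happens to messages emitted while nothing is collecting yet. The helper name emitWithoutCollector is hypothetical and the sizes are chosen only for the demonstration. With no subscriber, tryEmit still returns true, but only the most recent replay values are retained, so older messages are dropped silently.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical helper: emits `count` messages into a flow that has no collector
// and returns (did every tryEmit report success?, what the replay cache still holds).
fun emitWithoutCollector(count: Int, replay: Int): Pair<Boolean, List<String>> {
    val flow = MutableSharedFlow<String>(replay = replay, extraBufferCapacity = 10)
    // map (not all) is used first so that every message is emitted.
    val allAccepted = (0 until count).map { flow.tryEmit("Log $it") }.all { it }
    return allAccepted to flow.replayCache
}
```

Calling `emitWithoutCollector(count = 15, replay = 10)` should report that all emissions were accepted while the replay cache holds only the last ten messages. Since the return value of tryEmit does not reveal this kind of loss, the replay size has to cover whatever can be logged before the collector in init has subscribed.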


Another approach is to use Dispatchers.IO.limitedParallelism(1) as the context for the CoroutineScope. It makes coroutines run sequentially if they don't call suspend functions and are launched from the same thread, e.g. the main thread. So this solution works only with blocking (not suspend) operations inside the launch coroutine builder:

private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))

fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
    println("$message")
    TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
}

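It turns out that a dispatcher limited to a parallelism of 1 behaves like a FIFO executor: it runs at most one coroutine at a time, in the order they were dispatched. So limiting the CoroutineScope to one coroutine at a time solves the problem.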
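
A quick way to check this behaviour is sketched below. The helper name runSerially is hypothetical, and runBlocking is used only so the check can wait for the jobs to finish; the log calls themselves would still be made from ordinary code. The opt-in annotation is an assumption for library versions where limitedParallelism is still marked experimental.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: launches `count` blocking tasks from the calling thread
// and returns the order in which they actually ran.
@OptIn(ExperimentalCoroutinesApi::class)
fun runSerially(count: Int): List<String> {
    val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))
    val seen = CopyOnWriteArrayList<String>()
    val jobs = (0 until count).map { item ->
        scope.launch {
            seen.add("Log $item")
            Thread.sleep(10) // blocking only, no suspend calls
        }
    }
    // Wait for all tasks so the recorded order is complete.
    runBlocking { jobs.joinAll() }
    return seen.toList()
}
```

Under the conditions stated above (same launching thread, no suspension points), `runSerially(5)` should return the messages in launch order.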
