27

I notice a strange behavior when trying to prematurely cancel from a Flow. Take a look at the following example.

This is a simple flow that emits integer values

  private fun createFlow() = flow {
        repeat(10000) {
            emit(it)
        }
    }

Then I call the createFlow function using this code

  CoroutineScope(Dispatchers.Main).launch {
            createFlow().collect {

                Log.i("Main", "$it isActive $isActive")
                if (it == 2) {
                    cancel()
                }
            }
        }

This is what is printed out

0 isActive true
1 isActive true
2 isActive true
3 isActive false
4 isActive false
etc...etc

Now I would expect that the flow should stop emitting integers once it reaches the value of 2 but instead it actually switches the isActive flag to false and keeps emitting without otherwise stopping.

When I add a delay between emissions the flow behaves as I would expect.

private fun createFlow() = flow {
    repeat(10000) {
        delay(500) //add a delay
        emit(it)
    }
}

This is what is printed out after calling the flow again (which is the expected behaviour).

0 isActive true
1 isActive true
2 isActive true

What can I do to cancel the flow emission exactly at the specified value without adding delay?

1
  • This open issue seems to describe the same thing, and I think this issue tracks the progress of this fix. Commented Jan 10, 2020 at 11:20

3 Answers 3

18

I came across a workaround in this related issue

I have replaced every single collect with a safeCollect function in my project:

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after coroutine was cancelled
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive()
    action(it)
  }
}
Sign up to request clarification or add additional context in comments.

1 Comment

I believe you can add .cancellable() to your Flow: > This operator provides a shortcut for .onEach { currentCoroutineContext().ensureActive() }.
14

I want to add that in 1.3.7 version emissions from flow builder now check cancellation status and are properly cancellable. So the code in question will work as expected

Comments

1

I came up with this recently

it seems that it will only actually cancel if it reaches a suspending point and in your code that emits there is no such point

to solve this either add yield() between emissions or some other suspending function like delay(100)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.