75

I want to implement a timer using Kotlin coroutines, similar to this one implemented with RxJava:

       Flowable.interval(0, 5, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .map { LocalDateTime.now() }
                    .distinctUntilChanged { old, new ->
                        old.minute == new.minute
                    }
                    .subscribe {
                        setDateTime(it)
                    }

It will emit LocalDateTime every new minute.

2
  • 3
    I think you can use ticker channels: kotlinlang.org/docs/reference/coroutines/… Commented Feb 22, 2019 at 12:51
  • 3
    @marstran Not anymore; they are obsolete now. Commented Dec 23, 2021 at 17:36

13 Answers

109

Edit: note that the API suggested in the original answer is now marked @ObsoleteCoroutinesApi:

Ticker channels are not currently integrated with structured concurrency and their api will change in the future.

You can now use the Flow API to create your own ticker flow:

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
    delay(initialDelay)
    while (true) {
        emit(Unit)
        delay(period)
    }
}

And you can use it in a way very similar to your current code:

tickerFlow(5.seconds)
    .map { LocalDateTime.now() }
    .distinctUntilChanged { old, new ->
        old.minute == new.minute
    }
    .onEach {
        setDateTime(it)
    }
    .launchIn(viewModelScope) // or lifecycleScope or other

Important note: with the code as written here, the time taken to process elements is not taken into account by tickerFlow, so the delay might not be regular (it's a delay between element processing). If you want the ticker to tick independently of the processing of each element, you may want to use a buffer or a dedicated thread (e.g. via flowOn).
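One way to read the note above: placing conflate() (or buffer()) between the ticker and the collector makes the ticker loop run in its own coroutine, so delay(period) no longer waits for the collector to finish. The following is a minimal sketch, assuming that dropping ticks is acceptable when the collector falls behind; the 100 ms period and the 250 ms of simulated work are arbitrary values chosen for illustration.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Same ticker as above: emits Unit, then waits for `period`.
fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO): Flow<Unit> = flow {
    delay(initialDelay)
    while (true) {
        emit(Unit)
        delay(period)
    }
}

fun main() = runBlocking {
    tickerFlow(100.milliseconds)
        .conflate() // ticker runs in its own coroutine; a slow collector only sees the latest tick
        .take(3)    // stop after three processed ticks so the example terminates
        .collect {
            delay(250) // simulated slow processing, longer than the tick period
            println("tick processed")
        }
}
```

With buffer() instead of conflate(), missed ticks would be queued rather than dropped. In both cases the spacing is only approximately regular, since delay itself is not a precise clock.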


Original answer

I believe it is still experimental, but you may use a TickerChannel to produce values every X millis:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

repeat(10) {
    tickerChannel.receive()
    val currentTime = LocalDateTime.now()
    println(currentTime)
}

If you need to carry on doing your work while your "subscribe" does something for each "tick", you may launch a background coroutine that will read from this channel and do the thing you want:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

launch {
    for (event in tickerChannel) {
        // the 'event' variable is of type Unit, so we don't really care about it
        val currentTime = LocalDateTime.now()
        println(currentTime)
    }
}

delay(1000)

// when you're done with the ticker and don't want more events
tickerChannel.cancel()

If you want to stop from inside the loop, you can simply break out of it, and then cancel the channel:

val ticker = ticker(500, 0)

var count = 0

for (event in ticker) {
    count++
    if (count == 4) {
        break
    } else {
        println(count)
    }
}

ticker.cancel()

7 Comments

Is there a way to "uncancel" a ticker? How can I pause/unpause the ticker?
@Lifes you probably need to have some kind of "active" state variable to check when you receive a tick. You can set it to false when you want to "pause" and back to true when you want to "resume"
Thanks for the fast reply. Given my use case, I don't want it to keep ticking, so I'm going to cancel and recreate it as needed.
ticker is marked as "ObsoleteCoroutinesApi" in version "1.3.2", which means: "Marks declarations that are obsolete in coroutines API, which means that the design of the corresponding declarations has serious known flaws and they will be redesigned in the future. Roughly speaking, these declarations will be deprecated in the future but there is no replacement for them yet, so they cannot be deprecated right away."
Can someone please elaborate on the buffer and/or flowOn suggested solution for creating an overall fixed delay, including processing time?
39

A very pragmatic approach with Kotlin Flows could be:

// Create the timer flow
val timer = (0..Int.MAX_VALUE)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) } // specify delay

// Consume it
timer.collect { 
    println("bling: ${it}")
}

Be aware that this flow completes once it reaches Int.MAX_VALUE (2,147,483,647), so it might not be the right solution for very short intervals or extremely long-running timers. Consider Long.MAX_VALUE in that case (see comments).
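To illustrate the Long-based alternative mentioned above, here is a minimal sketch that builds the counter with generateSequence instead of an Int range. The name countingTimer is hypothetical and not part of the original answer.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Hypothetical helper: counts up from 0 as a Long, so the Int.MAX_VALUE bound no longer applies.
fun countingTimer(periodMillis: Long): Flow<Long> =
    generateSequence(0L) { it + 1 }     // lazy sequence; values are produced on demand
        .asFlow()
        .onEach { delay(periodMillis) } // wait before each value is delivered

fun main() = runBlocking {
    countingTimer(1_000)
        .take(3) // limit the example to three ticks
        .collect { println("bling: $it") }
}
```

As in the original, the first value arrives only after one full period, because the delay runs before each element is passed downstream.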

11 Comments

How to be notified when ends?
Make sure to import the flow using: import kotlinx.coroutines.flow.collect
Why are we using here asSequence() function?
@Hassa So that the sequence of Ints is created lazily. Otherwise, all Ints from 0 .. Int.MAX_VALUE would be loaded in memory immediately, which you probably would not want.
@SteffenFunke Could you please elaborate on how we can reset the timer flow? For example, resetting the timer on a button click so that it starts emitting values from 0 again.
21

Another possible solution, as a reusable Kotlin extension of CoroutineScope:

fun CoroutineScope.launchPeriodicAsync(
    repeatMillis: Long,
    action: () -> Unit
) = this.async {
    if (repeatMillis > 0) {
        while (isActive) {
            action()
            delay(repeatMillis)
        }
    } else {
        action()
    }
}

and then usage as:

val job = CoroutineScope(Dispatchers.IO).launchPeriodicAsync(100) {
  //...
}

and then to interrupt it:

job.cancel()

Another note: this assumes that action is non-blocking and takes negligible time to run.
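If action does take noticeable time, one option is to measure it and shorten the following delay accordingly. This is a minimal sketch of that idea, not the answer's own code; launchAtFixedRate is a hypothetical name, and it uses launch rather than async because no result is returned.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant: subtracts the time spent in action() from the delay,
// so that runs start roughly every repeatMillis.
fun CoroutineScope.launchAtFixedRate(
    repeatMillis: Long,
    action: suspend () -> Unit
): Job {
    require(repeatMillis > 0) { "repeatMillis must be positive" }
    return launch {
        while (isActive) {
            val elapsed = measureTimeMillis { action() }
            // If action overran the period, start the next run immediately.
            delay((repeatMillis - elapsed).coerceAtLeast(0))
        }
    }
}

fun main() = runBlocking {
    var runs = 0
    val job = launchAtFixedRate(100) {
        delay(30) // simulated work
        runs++
    }
    delay(350)
    job.cancelAndJoin()
    println("action ran $runs times")
}
```

The compensation is approximate: delay is not exact, and small errors can still accumulate over many iterations.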

8 Comments

It doesn't matter much here thanks to the delay() call, but in general we should avoid while (true) in coroutines, prefer while(isActive) to properly support cancellation.
@Joffrey this is just an example, feel free to modify it for the better.
What is the reason for using async() instead of launch() ?
@Phileo99 I think you could do it either way, but if you use Async it returns a Deferred<T> which gives you a few more options than a launch {}, such as await(). Not sure that'd be all that useful in this case, but I don't think it adds much overhead. Deferred extends Job, so anything that launch can do async can also do.
Keep in mind that the interval between subsequent action() calls is not the defined repeatMillis time, but repeatMillis + the time that action() takes to execute. So this solution is fine as long as action() doesn't take too long. By using flows with buffer(), conflate(), or flowOn, we can get intervals that are about constant.
11

You can create a countdown timer like this

GlobalScope.launch(Dispatchers.Main) {
            val totalSeconds = TimeUnit.MINUTES.toSeconds(2)
            val tickSeconds = 1
            for (second in totalSeconds downTo tickSeconds) {
                val time = String.format("%02d:%02d",
                    TimeUnit.SECONDS.toMinutes(second),
                    second - TimeUnit.MINUTES.toSeconds(TimeUnit.SECONDS.toMinutes(second))
                )
                timerTextView?.text = time
                delay(1000)
            }
            timerTextView?.text = "Done!"
        }
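The same countdown can also be expressed as a flow, which keeps the counting separate from the UI and lets the caller pick the scope that collects it. This is a minimal sketch under that assumption; countdownFlow and formatMmSs are hypothetical names, and runBlocking stands in for an Android scope so that the example runs on a plain JVM.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits the remaining seconds, from totalSeconds down to 0.
fun countdownFlow(totalSeconds: Long): Flow<Long> = flow {
    for (remaining in totalSeconds downTo 0L) {
        emit(remaining)
        if (remaining > 0) delay(1_000) // no need to wait after the final value
    }
}

// Formats a number of seconds as mm:ss.
fun formatMmSs(seconds: Long): String =
    "%02d:%02d".format(seconds / 60, seconds % 60)

fun main() = runBlocking {
    // On Android, this collection would typically run in lifecycleScope or viewModelScope.
    countdownFlow(3).collect { println(formatMmSs(it)) }
    println("Done!")
}
```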

4 Comments

Use lifecycleScope instead to avoid leaking the Fragment or Activity.
Good solution, but I don't agree with GlobalScope. viewModelScope or lifecycleScope is much more preferable.
I just want to mention that this solution is not 100% exact. The countdown will run a bit longer than 120 seconds because the date formatting and setting the text on the TextView will also take some time. I guess in most use cases this won't be an issue, otherwise you should stick to the flow{} solution (combined with buffer(), conflate() or flowOn)
Is it a good idea to use main dispatcher for non UI relevant processing like timer?
5

Here's a possible solution using Kotlin Flow

fun tickFlow(millis: Long) = callbackFlow<Int> {
    val timer = Timer()
    var time = 0
    timer.scheduleAtFixedRate(
        object : TimerTask() {
            override fun run() {
                trySend(time) // non-throwing replacement for the deprecated offer()
                time += 1
            }
        },
        0,
        millis)
    awaitClose {
        timer.cancel()
    }
}

Usage

val job = CoroutineScope(Dispatchers.Main).launch {
   tickFlow(125L).collect {
      print(it)
   }
}

...

job.cancel()
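A comment below mentions swapping Timer for a ScheduledExecutorService when more accuracy is needed. The following is a minimal sketch of what that could look like; executorTickFlow is a hypothetical name, and the single-thread executor is an assumption made for illustration.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 0, 1, 2, ... at a fixed rate until the collector stops.
fun executorTickFlow(periodMillis: Long): Flow<Int> = callbackFlow {
    val executor = Executors.newSingleThreadScheduledExecutor()
    var time = 0 // only touched from the executor thread
    executor.scheduleAtFixedRate(
        Runnable { trySend(time++) }, // trySend does not throw; a value that cannot be sent is dropped
        0L, periodMillis, TimeUnit.MILLISECONDS
    )
    // Runs when the collector is cancelled or completes, which stops the executor.
    awaitClose { executor.shutdownNow() }
}

fun main() = runBlocking {
    executorTickFlow(125L)
        .take(4) // collects 0, 1, 2, 3 and then cancels the flow
        .collect { println(it) }
}
```

Shutting the executor down inside awaitClose is what ties its lifetime to the collecting coroutine, in the same way timer.cancel() does in the answer's code.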

5 Comments

You are wrapping Timer with coroutines, why?! This simply makes no sense; either use timer or coroutines
It could be useful for example in a view model that has its scope like CoroutineScope(Dispatchers.Main + viewModelJob). If you need to perform a network check periodically you could launch the tick coroutine using that scope and with all the other coroutines (such as network requests or database queries) and then cancel the viewModelJob once for all. By the way, if it is useless for you no problem, it's fair.
Just to be clear, canceling the coroutine won't do anything to the Timer; you have to make your flow cancellable(). Yet even if you make your flow cancellable(), canceling your flow and job won't stop the timer from "ticking". On top of that, Timer already uses another thread, so I don't really see the reason for wrapping it with a flow.
I confirm that with the above code the tick stops on job.cancel(). Just used it on a real case app in a Fragment.
@Farid I have a use case: 1) I already have code that uses coroutines, and 2) delay is not accurate enough. I ended up using this solution (and for accuracy I used ScheduledExecutorService instead of Timer).
4

Edit: Joffrey has edited his solution with a better approach.

Old :

Joffrey's solution works for me but I ran into a problem with the for loop.

I have to cancel my ticker in the for loop like this :

val ticker = ticker(500, 0)
for (event in ticker) {
    if (...) {
        ticker.cancel()
    } else {
        ...
    }
}

But ticker.cancel() was throwing a CancellationException because the for loop kept going after this.

I had to use a while loop to check if the channel was not closed to not get this exception.

val ticker = ticker(500, 0)
while (!ticker.isClosedForReceive && ticker.iterator().hasNext()) {
    if (...) {
        ticker.cancel()
    } else {
        ...
    }
}

4 Comments

Why don't you just break out of the loop if you know you want it to stop? You can then cancel the ticker outside of the loop, this worked fine for me. Also, you're creating a new iterator at each loop turn with this approach, this may not be what you want to do.
Sometimes we don't think of the simplest solutions... You are absolutely right, thanks!
No problem :) That being said, I didn't expect cancel() to fail when called from within the loop, so you taught me something on this one. I will need to investigate further to get to the bottom of this.
Well, with coroutines version 1.2.2 it didn't fail! But I upgraded to version 1.3.2 and now it does. Maybe it was supposed to fail with 1.2.2 and they fixed it, or it's a newly introduced bug...
3

Timer with START, PAUSE and STOP functions.

Usage:

val timer = Timer(millisInFuture = 10_000L, runAtStart = false)
timer.start()

Timer class:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow

enum class PlayerMode {
    PLAYING,
    PAUSED,
    STOPPED
}

class Timer(
    val millisInFuture: Long,
    val countDownInterval: Long = 1000L,
    runAtStart: Boolean = false,
    val onFinish: (() -> Unit)? = null,
    val onTick: ((Long) -> Unit)? = null
) {
    private var job: Job = Job()
    private val _tick = MutableStateFlow(0L)
    val tick = _tick.asStateFlow()
    private val _playerMode = MutableStateFlow(PlayerMode.STOPPED)
    val playerMode = _playerMode.asStateFlow()

    private val scope = CoroutineScope(Dispatchers.Default)

    init {
        if (runAtStart) start()
    }

    fun start() {
        if (_tick.value == 0L) _tick.value = millisInFuture
        job.cancel()
        job = scope.launch(Dispatchers.IO) {
            _playerMode.value = PlayerMode.PLAYING
            while (isActive) {
                if (_tick.value <= 0) {
                    job.cancel()
                    onFinish?.invoke()
                    _playerMode.value = PlayerMode.STOPPED
                    return@launch
                }
                delay(timeMillis = countDownInterval)
                _tick.value -= countDownInterval
                onTick?.invoke(this@Timer._tick.value)
            }
        }
    }

    fun pause() {
        job.cancel()
        _playerMode.value = PlayerMode.PAUSED
    }

    fun stop() {
        job.cancel()
        _tick.value = 0
        _playerMode.value = PlayerMode.STOPPED
    }
}

I took inspiration from here.

1 Comment

Why do you switch to dispatcher io?
2

Here is Flow version of Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) based on Joffrey's answer:

fun tickerFlow(start: Long,
               count: Long,
               initialDelayMs: Long,
               periodMs: Long) = flow<Long> {
    delay(initialDelayMs)

    // Emit exactly `count` values: start, start + 1, ..., start + count - 1
    val end = start + count - 1
    var counter = start
    while (counter <= end) {
        emit(counter)
        counter += 1

        delay(periodMs)
    }
}

//...

tickerFlow(1, 5, 0, 1_000L)


0

I made a Flow equivalent of Observable.intervalRange(0, 90, 0, 1, TimeUnit.SECONDS), which emits 90 items, one per second:

fun intervalRange(start: Long, count: Long, initialDelay: Long = 0, period: Long, unit: TimeUnit): Flow<Long> {
    return flow<Long> {
        require(count >= 0) { "count >= 0 required but it was $count" }
        require(initialDelay >= 0) { "initialDelay >= 0 required but it was $initialDelay" }
        require(period > 0) { "period > 0 required but it was $period" }

        val end = start + (count - 1)
        require(!(start > 0 && end < 0)) { "Overflow! start + count is bigger than Long.MAX_VALUE" }

        if (initialDelay > 0) {
            delay(unit.toMillis(initialDelay))
        }

        // Emit start..end, i.e. exactly `count` values
        var counter = start
        while (counter <= end) {
            emit(counter)
            counter += 1

            delay(unit.toMillis(period))
        }
    }
}

Usage:

lifecycleScope.launch {
    intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)
        .onEach {
            Log.d(TAG, "intervalRange: ${90 - it}")
        }
        .lastOrNull()
}


0

Used this recently to chunk values based on a timer and max buffer size.

private object Tick

@Suppress("UNCHECKED_CAST")
fun <T : Any> Flow<T>.chunked(size: Int, initialDelay: Long, delay: Long): Flow<List<T>> = flow {
    if (size <= 0) throw IllegalArgumentException("invalid chunk size $size - expected > 0")
    val chunkedList = mutableListOf<T>()
    if (delay > 0L) {
        merge(this@chunked, timerFlow(initialDelay, delay, Tick))
    } else {
        this@chunked
    }
        .collect {
            when (it) {
                is Tick -> {
                    if (chunkedList.isNotEmpty()) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
                else -> {
                    chunkedList.add(it as T)
                    if (chunkedList.size >= size) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
            }
        }
    if (chunkedList.isNotEmpty()) {
        emit(chunkedList.toList())
    }
}

fun <T> timerFlow(initialDelay: Long, delay: Long, o: T) = flow {
    if (delay <= 0) throw IllegalArgumentException("invalid delay $delay - expected > 0")
    if (initialDelay > 0) delay(initialDelay)
    while (currentCoroutineContext().isActive) {
        emit(o)
        delay(delay)
    }
}


0

You could do something like this...

class JitterTimer {
    fun scheduleAtRandom(
        initialDelay : Duration = 2.seconds,
        base: Duration = 3.seconds,
        interval: Duration = 0.5.seconds
    ) = flow {
        delay(initialDelay)
        while (true) {
            emit(Unit)
            val jitter = randomizeJitter(
                baseDelay = base.inWholeMilliseconds,
                interval = interval.inWholeMilliseconds
            )
            delay(jitter)
        }
    }.cancellable()
}

//Usage

timerJob?.cancel()
JitterTimer()
 .scheduleAtRandom(10.seconds, 3.seconds)
 .onEach {
    //Do some work
  }
 .flowOn(dispatcher)
 .catch { it.remoteLog("Recovery", "Attempts") }
 .launchIn(scope)
 .apply { timerJob = this }


-1

It's not using Kotlin coroutines, but if your use case is simple enough you can always just use something like a fixedRateTimer or timer (docs here), which are backed by the JVM's java.util.Timer.

I was using RxJava's interval for a relatively simple scenario and when I switched to using Timers I saw significant performance and memory improvements.

You can also run your code on the main thread on Android by using View.post() or its multiple variants.

The only real annoyance is you'll need to keep track of the old time's state yourself instead of relying on RxJava to do it for you.

But this will always be much faster (important if you're doing performance critical stuff like UI animations etc) and will not have the memory overhead of RxJava's Flowables.

Here's the question's code using a fixedRateTimer:


var currentTime: LocalDateTime = LocalDateTime.now()

fixedRateTimer(period = 5000L) {
    val newTime = LocalDateTime.now()
    if (currentTime.minute != newTime.minute) {
        post { // post the below code to the UI thread to update UI stuff
            setDateTime(newTime)
        }
        currentTime = newTime
    }
}


-1

private val updateLiveShowTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_PROGRAM_INFO_INTERVAL_SECONDS)
    }
}

private val updateShowProgressTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_SHOW_PROGRESS_INTERVAL_SECONDS)
    }
}

private val liveShow = updateLiveShowTicker
    .combine(channelId) { _, channelId -> programInfoRepository.getShow(channelId) }
    .catch { emit(LiveShow(application.getString(R.string.activity_channel_detail_info_error))) }
    .shareIn(viewModelScope, SharingStarted.WhileSubscribed(), replay = 1)
    .distinctUntilChanged()

My solution: use the Flow API to create your own ticker flows, as in the code above.
