如何使用Kotlin协程实现计时器

68

我想使用Kotlin协程实现计时器,类似于使用RxJava实现的:

       Flowable.interval(0, 5, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .map { LocalDateTime.now() }
                    .distinctUntilChanged { old, new ->
                        old.minute == new.minute
                    }
                    .subscribe {
                        setDateTime(it)
                    }

它会在每一分钟更新时发出LocalDateTime。


3
我认为你可以使用计时器通道:https://kotlinlang.org/docs/reference/coroutines/channels.html#ticker-channels - marstran
2
@marstran 不再了,它们现在已经过时了。 - Farid
13个回答

94

编辑: 请注意,原回答中建议的API现在已被标记为@ObsoleteCoroutineApi:

目前Ticker通道尚未与结构化并发集成,其API将来会有所改变。

您现在可以使用Flow API创建自己的Ticker流:

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
    delay(initialDelay)
    while (true) {
        emit(Unit)
        delay(period)
    }
}

您可以以与当前代码非常相似的方式使用它:

tickerFlow(5.seconds)
    .map { LocalDateTime.now() }
    .distinctUntilChanged { old, new ->
        old.minute == new.minute
    }
    .onEach {
        setDateTime(it)
    }
    .launchIn(viewModelScope) // or lifecycleScope or other

重要提示: 使用此处编写的代码时,tickerFlow 不会考虑处理元素所花费的时间,因此延迟可能不是固定的(这是元素处理之间的延迟)。如果您希望滚动新闻独立于每个元素的处理而运行,您可以使用buffer 或专用线程(例如通过 flowOn)。


原始回答

我认为它仍处于实验阶段,但您可以使用TickerChannel每隔 X 毫秒生成一个值:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

repeat(10) {
    tickerChannel.receive()
    val currentTime = LocalDateTime.now()
    println(currentTime)
}

如果您需要在“tick”每次执行某些操作时继续工作,您可以启动一个后台协程,该协程将从此通道读取并执行您想要的操作:

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)

launch {
    for (event in tickerChannel) {
        // the 'event' variable is of type Unit, so we don't really care about it
        val currentTime = LocalDateTime.now()
        println(currentTime)
    }
}

delay(1000)

// when you're done with the ticker and don't want more events
tickerChannel.cancel()

如果你想在循环内部停止,你可以简单地跳出循环,然后取消通道:

val ticker = ticker(500, 0)

var count = 0

for (event in ticker) {
    count++
    if (count == 4) {
        break
    } else {
        println(count)
    }
}

ticker.cancel()

有没有一种方法可以“取消取消”一个ticker?我如何暂停/恢复ticker? - Lifes
@Lifes,你可能需要一个“活动”状态变量来检查何时接收到一个tick。当你想要“暂停”时,可以将其设置为false,当你想要“恢复”时,将其设置回true。 - Joffrey
感谢您的快速回复。考虑到我的使用情况,我不希望它一直在运行,所以我会根据需要取消并重新创建它。 - Lifes
13
版本为"1.3.2"的ticker被标记为"ObsoleteCoroutinesApi",这意味着:在协程API中,标记了已过时的声明,这意味着相应声明的设计存在严重已知缺陷,并将在未来重新设计。简单来说,这些声明将来会被弃用,但目前还没有新的替代品,因此它们不能立即被弃用。 - aLx
@Shaked,你可以使用 tickerFlow(5.seconds).buffer(1).collect { doStuff() }(或在 tickerFlow() 的定义中添加 .buffer(1))来缓存下一个 ticker 的发射,同时等待处理时间。 - Joffrey
显示剩余2条评论

38
一个非常实用的方法是使用Kotlin Flows:
// Create the timer flow
val timer = (0..Int.MAX_VALUE)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) } // specify delay

// Consume it
timer.collect { 
    println("bling: ${it}")
}


请注意,此解决方案在 Int.MAX_VALUE2,147,483,647)时会超时,因此对于非常短的时间间隔值或极长的计时器值可能不是正确的解决方案。在这种情况下,请考虑使用 Long.MAX_VALUE(参见注释)。

4
如何在结束时收到通知? - Skizo-ozᴉʞS ツ
2
@Hassa 应该是被惰性创建的整数序列。否则,从0到Int.MAX_VALUE的所有整数将立即加载到内存中,这可能不是您想要的。 - Steffen Funke
如果计时器达到Int.MAX_VALUE会怎么样?它就结束了吗?:) 你等待,然后再重新创建?哈哈,我真的不太明白这个.. - Renetik
@Renetik 也可以使用Long.MAX_VALUE作为解决方案。这应该提供足够的时间,即使是1毫秒的间隔 ;) - Steffen Funke
我已经使用了@Raphael的答案,因为对于简单计时器来说已经足够好了。这个可能更精确一些,但是我不喜欢受内部计数器限制的计时器。我觉得方法不对。谢谢。 - Renetik
显示剩余6条评论

20

另一种可能的解决方案是将其作为可重用的 Kotlin 扩展的 CoroutineScope

fun CoroutineScope.launchPeriodicAsync(
    repeatMillis: Long,
    action: () -> Unit
) = this.async {
    if (repeatMillis > 0) {
        while (isActive) {
            action()
            delay(repeatMillis)
        }
    } else {
        action()
    }
}

然后用法如下:

var job = CoroutineScope(Dispatchers.IO).launchPeriodicAsync(100) {
  //...
}

然后中断它:

job.cancel()

另一点说明:我们在此认为action是非阻塞的且不需要时间。


9
由于有delay()调用,这里并不重要,但通常我们应该避免在协程中使用while (true),而是优先使用while(isActive)以正确支持取消操作。 - Joffrey
1
@Joffrey 这只是一个例子,随意修改以使其更好。 - Raphael C
1
使用 async() 而不是 launch() 的原因是什么? - Phileo99
@Phileo99 我认为你可以两种方式都做,但如果你使用Async,它会返回一个Deferred<T>,这比launch {}给你更多的选项,例如await()。不确定在这种情况下是否有用,但我认为它并不会增加太多开销。Deferred扩展了Job,因此任何可以使用launch异步执行的操作也可以使用Deferred。 - AlexW.H.B.
3
请注意,连续的 action() 调用之间的时间间隔不是定义的 repeatMillis 时间,而是 repeatMillis 加上 action() 执行所需的时间。因此,只要 action() 不太耗时,这个解决方案就可以正常工作。通过使用带有 buffer()conflate()flowOn 的流,我们可以获得大致恒定的时间间隔。 - Lukas Lechner
显示剩余3条评论

11

你可以像这样创建一个倒计时器

GlobalScope.launch(Dispatchers.Main) {
            val totalSeconds = TimeUnit.MINUTES.toSeconds(2)
            val tickSeconds = 1
            for (second in totalSeconds downTo tickSeconds) {
                val time = String.format("%02d:%02d",
                    TimeUnit.SECONDS.toMinutes(second),
                    second - TimeUnit.MINUTES.toSeconds(TimeUnit.SECONDS.toMinutes(second))
                )
                timerTextView?.text = time
                delay(1000)
            }
            timerTextView?.text = "Done!"
        }

4
请改用 lifecycleScope,以避免泄露 Fragment 或 Activity。 - Tenfour04
1
好的解决方案,但我不同意GlobalScope。viewModelScope或lifecycleScope更可取。 - Taxist Samael
1
我只想提一下,这个解决方案并不是100%准确的。倒计时会比120秒多一点,因为日期格式化和在“TextView”上设置文本也需要一些时间。我猜在大多数情况下,这不会成为问题,否则你应该坚持使用“flow {}”解决方案(结合“buffer()”,“conflate()”或“flowOn”)。 - Lukas Lechner

6
这里是使用 Kotlin Flow 的可能解决方案。
fun tickFlow(millis: Long) = callbackFlow<Int> {
    val timer = Timer()
    var time = 0
    timer.scheduleAtFixedRate(
        object : TimerTask() {
            override fun run() {
                try { offer(time) } catch (e: Exception) {}
                time += 1
            }
        },
        0,
        millis)
    awaitClose {
        timer.cancel()
    }
}

使用方法

val job = CoroutineScope(Dispatchers.Main).launch {
   tickFlow(125L).collect {
      print(it)
   }
}

...

job.cancel()

3
你为什么要用协程来包装计时器?这根本没有意义;要么使用计时器,要么使用协程。 - Farid
例如,在视图模型中,它的作用域类似于CoroutineScope(Dispatchers.Main + viewModelJob),这可能很有用。 如果您需要定期执行网络检查,则可以使用该范围启动tick协程,并与所有其他协程(例如网络请求或数据库查询)一起运行,然后一次性取消viewModelJob。 顺便说一句,如果对您无用,那没关系,这是公平的。 - Dario Pellegrini
1
只是为了明确,取消协程不会对计时器产生任何影响,您必须使您的流cancellable()。然而,即使您使您的流cancellable(),取消您的流和作业也无法停止计时器的“滴答声”。除此之外,计时器已经在使用另一个线程,我真的不明白将其包装在流中的原因。 - Farid
1
我确认使用上述代码,当执行job.cancel()时,计时器停止。我在一个Fragment中的真实应用程序中使用了它。 - Dario Pellegrini
@Farid 我有一个使用案例:1)我已经有了带有协程的代码 2)延迟不够准确。最终我使用了这个解决方案(为了准确性,我使用了ScheduledExecutorService而不是Timer) - undefined

4

编辑:Joffrey已经用更好的方法更新了他的解决方案。

旧的:

Joffrey 的解决方案对我有用,但是在 for 循环中遇到了问题。

我必须像这样在 for 循环中取消我的计时器:

            val ticker = ticker(500, 0)
            for (event in ticker) {
                if (...) {
                    ticker.cancel()
                } else {
                    ...
                    }
                }
            }

但是ticker.cancel()抛出了一个CancellationException异常,因为for循环在此之后继续执行。

我必须使用while循环来检查通道是否未关闭以避免出现此异常。

                val ticker = ticker(500, 0)
                while (!ticker.isClosedForReceive && ticker.iterator().hasNext()) {
                    if (...) {
                        ticker.cancel()
                    } else {
                        ...
                        }
                    }
                }

1
如果你知道你想要停止循环,为什么不直接使用 break 退出循环呢?然后你可以在循环外取消计时器,这对我来说很有效。此外,你正在每次循环中创建一个新的迭代器,这可能不是你想要做的事情。 - Joffrey
有时候我们没有想到最简单的解决方案...你说得对,谢谢! - Benjamin Ledet
没问题 :) 话虽如此,我没想到在循环内部调用 cancel() 会失败,所以你教会了我一些东西。我需要进一步调查才能找到原因。 - Joffrey
好吧,使用协程的版本1.2.2时没有出错!但是我升级到了版本1.3.2,现在却出错了。也许在1.2.2版本中它本来就应该出错,然后他们修复了这个问题,或者是引入了一个bug... - Benjamin Ledet

3

具有启动、暂停和停止功能的计时器。

用法:

val timer = Timer(millisInFuture = 10_000L, runAtStart = false)
timer.start()

Timer类:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow

enum class PlayerMode {
    PLAYING,
    PAUSED,
    STOPPED
}

class Timer(
    val millisInFuture: Long,
    val countDownInterval: Long = 1000L,
    runAtStart: Boolean = false,
    val onFinish: (() -> Unit)? = null,
    val onTick: ((Long) -> Unit)? = null
) {
    private var job: Job = Job()
    private val _tick = MutableStateFlow(0L)
    val tick = _tick.asStateFlow()
    private val _playerMode = MutableStateFlow(PlayerMode.STOPPED)
    val playerMode = _playerMode.asStateFlow()

    private val scope = CoroutineScope(Dispatchers.Default)

    init {
        if (runAtStart) start()
    }

    fun start() {
        if (_tick.value == 0L) _tick.value = millisInFuture
        job.cancel()
        job = scope.launch(Dispatchers.IO) {
            _playerMode.value = PlayerMode.PLAYING
            while (isActive) {
                if (_tick.value <= 0) {
                    job.cancel()
                    onFinish?.invoke()
                    _playerMode.value = PlayerMode.STOPPED
                    return@launch
                }
                delay(timeMillis = countDownInterval)
                _tick.value -= countDownInterval
                onTick?.invoke(this@Timer._tick.value)
            }
        }
    }

    fun pause() {
        job.cancel()
        _playerMode.value = PlayerMode.PAUSED
    }

    fun stop() {
        job.cancel()
        _tick.value = 0
        _playerMode.value = PlayerMode.STOPPED
    }
}

我受到这里的启发。


你为什么要转向使用 Dispatcher IO? - Arkaha

2

这是基于Joffrey的回答,使用Flow版本的Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

fun tickerFlow(start: Long,
               count: Long,
               initialDelayMs: Long,
               periodMs: Long) = flow<Long> {
    delay(initialDelayMs)

    var counter = start
    while (counter <= count) {
        emit(counter)
        counter += 1

        delay(periodMs)
    }
}

//...

tickerFlow(1, 5, 0, 1_000L)

0
最近在编程中使用它,基于定时器和最大缓冲区大小来分块值。
private object Tick

@Suppress("UNCHECKED_CAST")
fun <T : Any> Flow<T>.chunked(size: Int, initialDelay: Long, delay: Long): Flow<List<T>> = flow {
    if (size <= 0) throw IllegalArgumentException("invalid chunk size $size - expected > 0")
    val chunkedList = mutableListOf<T>()
    if (delay > 0L) {
        merge(this@chunked, timerFlow(initialDelay, delay, Tick))
    } else {
        this@chunked
    }
        .collect {
            when (it) {
                is Tick -> {
                    if (chunkedList.isNotEmpty()) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
                else -> {
                    chunkedList.add(it as T)
                    if (chunkedList.size >= size) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                    }
                }
            }
        }
    if (chunkedList.isNotEmpty()) {
        emit(chunkedList.toList())
    }
}

fun <T> timerFlow(initialDelay: Long, delay: Long, o: T) = flow {
    if (delay <= 0) throw IllegalArgumentException("invalid delay $delay - expected > 0")
    if (initialDelay > 0) delay(initialDelay)
    while (currentCoroutineContext().isActive) {
        emit(o)
        delay(delay)
    }
}

0

复制了Observable.intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)(每秒发出一个项目,共90秒):

fun intervalRange(start: Long, count: Long, initialDelay: Long = 0, period: Long, unit: TimeUnit): Flow<Long> {
        return flow<Long> {
            require(count >= 0) { "count >= 0 required but it was $count" }
            require(initialDelay >= 0) { "initialDelay >= 0 required but it was $initialDelay" }
            require(period > 0) { "period > 0 required but it was $period" }

            val end = start + (count - 1)
            require(!(start > 0 && end < 0)) { "Overflow! start + count is bigger than Long.MAX_VALUE" }

            if (initialDelay > 0) {
                delay(unit.toMillis(initialDelay))
            }

            var counter = start
            while (counter <= count) {
                emit(counter)
                counter += 1

                delay(unit.toMillis(period))
            }
        }
    }

使用方法:

lifecycleScope.launch {
intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)
                .onEach {
                    Log.d(TAG, "intervalRange: ${90 - it}")
                }
                .lastOrNull()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接