我该如何使用Tokio来实现一组定时器,在多个线程中定期重置和取消?当计时器到期时,将执行回调函数。
与Go的time.AfterFunc类似的API是我所需要的:
我找到的唯一一个实现了(足够)类似API的板条箱是timer,它通过生成2个线程的方式来实现。当计时器经常重置时,这很快变得不可行。
显而易见的答案是使用Tokio,问题是如何优雅地做到这一点。
一种选择是每次更新计时器时都生成一个新的绿色线程,并使用原子取消上一个计时器,通过在此原子上条件化回调的执行,例如这个伪Rust:
问题在于,我为已经取消的计时器维护状态,可能要维护几分钟。此外,这似乎也不太优雅。
除了
不清楚如何在多线程应用程序中使用此库,即:
与Go的time.AfterFunc类似的API是我所需要的:
package main
import (
"fmt"
"time"
)
func main() {
t := time.AfterFunc(time.Hour, func() {
// happens every 2 seconds with 1 second delay
fmt.Println("fired")
})
for {
t.Reset(time.Second)
time.Sleep(time.Second * 2)
}
}
我找到的唯一一个实现了(足够)类似API的板条箱是timer,它通过生成2个线程的方式来实现。当计时器经常重置时,这很快变得不可行。
显而易见的答案是使用Tokio,问题是如何优雅地做到这一点。
一种选择是每次更新计时器时都生成一个新的绿色线程,并使用原子取消上一个计时器,通过在此原子上条件化回调的执行,例如这个伪Rust:
tokio::run({
// for every timer spawn with a new "cancel" atomic
tokio::spawn({
Delay::new(Instant::now() + Duration::from_millis(1000))
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(|_| {
if !cancelled.load(Ordering::Acquire) {
println!("fired");
}
Ok(())
})
})
})
问题在于,我为已经取消的计时器维护状态,可能要维护几分钟。此外,这似乎也不太优雅。
除了
tokio::time::Delay
之外,tokio::time::DelayQueue
也似乎适用。特别是,通过使用从“插入”返回的Key
引用来重置和取消计时器的能力。不清楚如何在多线程应用程序中使用此库,即:
这将在任务通过其键取消计时器和任务从返回值表示插入并用作删除和重置的参数。请注意,Key是令牌,并且一旦通过调用poll到达或调用remove将值从队列中删除,就会被重用。此时,调用者必须小心,不要再次使用返回的Key,因为它可能引用队列中的另一个项。
DelayQueue
流中消耗计时器事件之间创建竞争条件,导致恐慌或取消不相关的计时器。