如何使用Tokio生成多个可取消计时器?

3
我该如何使用Tokio来实现一组定时器,在多个线程中定期重置和取消?当计时器到期时,将执行回调函数。
与Go的time.AfterFunc类似的API是我所需要的:
package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.AfterFunc(time.Hour, func() {
        // happens every 2 seconds with 1 second delay
        fmt.Println("fired")
    })

    for {
        t.Reset(time.Second)
        time.Sleep(time.Second * 2)
    }
}

我找到的唯一一个实现了(足够)类似API的板条箱是timer,它通过生成2个线程的方式来实现。当计时器经常重置时,这很快变得不可行。
显而易见的答案是使用Tokio,问题是如何优雅地做到这一点。
一种选择是每次更新计时器时都生成一个新的绿色线程,并使用原子取消上一个计时器,通过在此原子上条件化回调的执行,例如这个伪Rust:
tokio::run({
    // for every timer spawn with a new "cancel" atomic
    tokio::spawn({
        Delay::new(Instant::now() + Duration::from_millis(1000))
            .map_err(|e| panic!("timer failed; err={:?}", e))
            .and_then(|_| {
                if !cancelled.load(Ordering::Acquire) {
                    println!("fired");
                }
                Ok(())
            })
    })
})

问题在于,我为已经取消的计时器维护状态,可能要维护几分钟。此外,这似乎也不太优雅。
除了tokio::time::Delay之外,tokio::time::DelayQueue也似乎适用。特别是,通过使用从“插入”返回的Key引用来重置和取消计时器的能力。
不清楚如何在多线程应用程序中使用此库,即:

返回值表示插入并用作删除和重置的参数。请注意,Key是令牌,并且一旦通过调用poll到达或调用remove将值从队列中删除,就会被重用。此时,调用者必须小心,不要再次使用返回的Key,因为它可能引用队列中的另一个项。

这将在任务通过其键取消计时器和任务从DelayQueue流中消耗计时器事件之间创建竞争条件,导致恐慌或取消不相关的计时器。
1个回答

3

您可以使用 Select 组合器和 Tokio 一起使用,处理IT技术方面的内容。它会返回第一个已完成的 future 的结果,然后忽略/停止轮询其他未完成的 future。

作为第二个 future,我们可以使用来自 oneshot::channel 的接收器来创建一个信号,以结束我们的组合器 future。

use futures::sync::oneshot;
use futures::*;
use std::thread;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

fn main() {
    let (interrupter, interrupt_handler) = oneshot::channel::<()>();

    //signal to cancel delayed call
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500)); //increase this value more than 1000ms to see is delayed call is working or not.
        interrupter
            .send(())
            .expect("Not able to cancel delayed future");
    });

    let delayed = Delay::new(Instant::now() + Duration::from_millis(1000))
        .map_err(|e| panic!("timer failed; err={:?}", e))
        .and_then(|_| {
            println!("Delayed Call Executed!");

            Ok(())
        });

    tokio::run(delayed.select(interrupt_handler).then(|_| Ok(())));
}

游乐场


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接