如何在循环中运行多个Tokio异步任务而不使用tokio::spawn?

3
我制作了一个同时显示天气的LED时钟。我的程序循环执行几个不同的操作,每个操作有不同的间隔时间:
  • 每50毫秒更新LED,
  • 每1秒检查光线水平(以调整亮度),
  • 每10分钟获取天气,
  • 还有一些其他的操作,但这些都不重要。
更新LED是最关键的:当获取天气信息时,我不希望此过程被延迟。这应该不成问题,因为获取天气信息主要是异步HTTP调用。
以下是我拥有的代码:
let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
    tokio::select! {
      _ = measure_light_stream.tick() => {
        let light = lm.get_light();
        light_smooth.sp = light;
      },
      _ = update_weather_stream.tick() => {
        let fetched_weather = weather_service.get(&config).await;
        // Store the fetched weather for later access from the displaying function.
        weather_clock.weather = fetched_weather.clone();
      },
      _ = update_leds_stream.tick() => {
        // Some code here that actually sets the LEDs.
        // This code accesses the weather_clock, the light level etc.
      },
    }
}

我发现代码没有按照我的预期工作 - 获取天气信息阻塞了循环的执行。我明白原因是 tokio::select!的文档说,一旦表达式update_weather_stream.tick()完成,其他分支就会被取消。
那么,在等待网络获取天气信息时,如何使LED继续更新呢?我发现我可以使用tokio::spawn来启动一个单独的非阻塞“线程”来获取天气信息,但接下来我遇到了不可发送的weather_service,更别提不能在线程之间共享的weather_clock问题。我不想增加这些复杂性,我希望所有内容都在一个单一的线程中运行,就像select!一样。 可重现的示例
use std::time::Duration;
use tokio::time::{interval, sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut slow_stream = interval(Duration::from_secs(3));
    let mut fast_stream = interval(Duration::from_millis(200));
    // Note how access to this data is straightforward, I do not want
    // this to get more complicated, e.g. care about threads and Send.
    let mut val = 1;
    loop {
        tokio::select! {
          _ = fast_stream.tick() => {
            println!(".{}", val);
          },
          _ = slow_stream.tick() => {
            println!("Starting slow operation...");
            // The problem: During this await the dots are not printed.
            sleep(Duration::from_secs(1)).await;
            val += 1;
            println!("...done");
          },
        }
    }
}

1
可能在这里使用 spawn 是最好的想法。要在任务之间通信,可以使用 mpsc 通道。我也在 tokio 文档中错过了这样的例子。也许你可以创建一个上游问题来请求这种类型的问题。我很难找到任何关于这个的信息... - hellow
@hellow 我也想生成任务,但是 OP 说 weather_service 没有实现 Send,这使得任务变得更加困难(如果不使用类似 spawn_local 的东西)。weather_clock 也不应该在任务之间共享。 - Shepmaster
很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在哪些 crates(及其版本)、types、traits、fields等。如果可能的话,您可以在Rust Playground上尝试重现错误,否则可以在全新的Cargo项目中进行,然后[编辑]您的问题以包括额外的信息。这里有一些Rust特定的MRE提示,您可以使用它们来缩小您的原始代码以便在此处发布。谢谢! - Shepmaster
@Shepmaster 添加了一个示例,尽管我不太清楚playground使用的版本是什么。我相信这并不是很重要。 - TPReal
2个回答

7
你可以使用 tokio::join! 在同一个任务中并发地运行多个异步操作。
这里是一个示例:
async fn measure_light(halt: &Cell<bool>) {
    while !halt.get() {
        let light = lm.get_light();
        // ....

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn blink_led(halt: &Cell<bool>) {
    while !halt.get() {
        // LED blinking code

        tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
    }
}

async fn poll_weather(halt: &Cell<bool>) {
    while !halt.get() {
        let weather = weather_service.get(&config).await;
        // ...

        tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
    }
}

// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
    tokio::time::sleep(Duration::from_secs(10)).await;
    halt.set(true);
}

async fn main() {
    let halt = Cell::new(false);
    tokio::join!(
        measure_light(&halt),
        blink_led(&halt),
        poll_weather(&halt),
        terminate(&halt),
    );
}

如果你正在使用tokio::TcpStream或其他非阻塞IO,那么它应该允许并发执行。
我已经添加了一个Cell标志作为停止执行的示例。您可以使用相同的技术在join分支之间共享任何可变状态。
编辑:可以使用tokio::select!完成相同的操作。您代码的主要区别在于“业务逻辑”实际上在select等待的futures中。 select允许您放弃未完成的futures,而不是等待它们自行退出(因此不需要halt终止标志)。
async fn main() {
    tokio::select! {
        _ = measure_light() => {},
        _ = blink_led() = {},
        _ = poll_weather() => {},
    }
}

我会使用适当的同步原语,例如AtomicBool,而不是Cell - hellow
@hellow,由于这都是在同一个线程中完成的,因此在这里使用CellRefCell都可以。 - stepan
我会考虑这个。缺点是所有循环需要完成它们当前的休眠才能意识到停止发生了,这意味着最多需要10分钟的关闭时间,这并不是真正可接受的。 - TPReal
@TPReal 我刚意识到这段代码也可以与 select 一起使用,见底部。在这种情况下并发运行更多的是关于重新排列代码,而不是 join/select。 - stepan
然后你可以添加另一个分支,例如 tokio::signal::unix::signal(...).recv(),它会很好地中断整个过程,并且实际上它将是唯一能够完成的分支。看起来非常优雅。 - TPReal

1
这里提供一种具体的解决方案,基于Stepan答案的第二部分。
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Cell is an acceptable complication when accessing the data.
    let val = std::cell::Cell::new(1);
    tokio::select! {
      _ = async {loop {
        println!(".{}", val.get());
        sleep(Duration::from_millis(200)).await;
      }} => {},
      _ = async {loop {
        println!("Starting slow operation...");
        // The problem: During this await the dots are not printed.
        sleep(Duration::from_secs(1)).await;
        val.set(val.get() + 1);
        println!("...done");
        sleep(Duration::from_secs(3)).await;
      }} => {},
    }
}

Playground link


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接