为什么在接连调用 crossbeam_channel::select 和 tokio::spawn 时,tokio::spawn 会有延迟?

4

我正在创建一个任务,这个任务将会派生其他任务。其中一些任务需要一定的时间来完成,因此它们不能被等待,但是它们可以并行运行:

src/main.rs

use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            let loop_id = counter.clone();
            tokio::spawn(async move { // why this one was not fired?
                println!("inner task {}", loop_id);
            }); // .await.unwrap(); - solves issue, but this is long task which cannot be awaited
            println!("loop {}", loop_id);
            select! {
                recv(r) -> rr => {
                    // match rr {
                    //     Ok(ee) => {
                    //         println!("received from channel {}", loop_id);
                    //         tokio::spawn(async move {
                    //             println!("received from channel task {}", loop_id);
                    //         });
                    //     },
                    //     Err(e) => println!("{}", e),
                    // };
                },
                // more recv(some_channel) -> 
            }
            counter = counter + 1;
        }
    });

    // let s_clone = s.clone();
    // tokio::spawn(async move {
    //     s_clone.send(2).unwrap();
    // });

    loop {
        // rest of the program
    }
}

我注意到了奇怪的行为。这个输出:

loop 0

我本以为它也会输出inner task 0
如果我向通道发送一个值,输出将是:
loop 0
inner task 0
loop 1

这里缺少了内部任务1

为什么内部任务会延迟一次循环后再启动?

我第一次注意到这种行为是在“从通道接收任务”中出现一个延迟的一次循环,但是当我缩小代码以准备样本时,这种情况开始发生在“内部任务”中。可能值得一提的是,如果我将第二个tokio::spawn直接写在另一个tokio::spawn后面,只有最后一个会出现这个问题。在调用tokio::spawnselect!时,是否有什么需要注意的地方?是什么导致了这一次循环的延迟?

Cargo.toml 依赖项

[dependencies]
tokio = { version = "0.2", features = ["full"] }
crossbeam = "0.7"

Rust 1.46,Windows 10
1个回答

4
select!是阻塞的,而tokio::spawn文档中说明

生成的任务可能在当前线程上执行,也可能被发送到另一个线程上执行。

在这种情况下,select!“future”实际上是一个阻塞函数,并且spawn不使用新线程(无论是第一次调用还是循环内部的调用)。 由于您没有告诉tokio您将要阻止,因此tokio认为不需要另一个线程(从tokio的角度来看,您只有3个不应阻止的futures,那么您为什么还需要另一个线程呢?)。

解决方案是为select!-ing闭包使用tokio::task::spawn_blocking(它将不再是未来,因此async move {}现在是move || {})。 现在,tokio将知道此功能实际上会阻止,并将其移动到另一个线程中(同时在其他执行线程中保留所有实际的futures)。

use crossbeam::crossbeam_channel::{bounded, select};

#[tokio::main]
async fn main() {
    let (s, r) = bounded::<usize>(1);

    tokio::task::spawn_blocking(move || {
        // ...
    });

    loop {
        // rest of the program
    }
}

Playground链接

另一种可能的解决方案是使用非阻塞通道,比如tokio::sync::mpsc,你可以在上面使用await并获得期望的行为,就像这个Playground示例中的直接 recv().await 或使用 tokio::select!

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut s, mut r) = mpsc::channel::<usize>(1);

    tokio::spawn(async move {
        loop {
            // ...
            tokio::select! {
                Some(i) = r.recv() => {
                    println!("got = {}", i);
                }
            }
        }
    });

    loop {
        // rest of the program
    }
}

玩耍链接


非常感谢您的出色解释!这个程序按预期运行。 - Krzysiu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接