如何在Tokio的非主线程中运行异步任务?

20
use std::thread;
use tokio::task; // 0.3.4

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        task::spawn(async {
            println!("123");
        });
    })
    .join();
}

当我编译时,我会得到一个警告:

warning: unused `std::result::Result` that must be used
  --> src/main.rs:6:5
   |
6  | /     thread::spawn(|| {
7  | |         task::spawn(async {
8  | |             println!("123");
9  | |         });
10 | |     })
11 | |     .join();
   | |____________^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: this `Result` may be an `Err` variant, which should be handled

执行时出现错误:

thread '<unnamed>' panicked at 'must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`', src/main.rs:7:9

你在这里的实际目标是什么?为什么要尝试在不同的线程上生成该任务?我认为答案取决于你想要做什么。 - Frxstrem
@Frxstrem 我需要从一个非主线程(t)启动另一个线程,并且线程(t)应该继续运行。在此之前,我使用了线程池。类似于 https://repl.it/repls/AssuredWellmadeParentheses 现在我决定用async/await Tokio替换线程池。 - ibse
1
真正的问题是:你为什么要混合使用线程和任务? - mcarton
如果您使用tokio,最好在任何地方都使用它。任务不是线程,不能从任何地方生成,只能从由tokio管理的线程中生成。您可能可以使用tokio通道在两者之间进行通信,但这需要大量额外的工作和非常不同的代码结构。 - Richard Matheson
1
你需要获取运行时的句柄,并将其传递给线程。你可能需要手动创建运行时才能做到这一点。然后,该句柄将处理从调用它的线程到tokio工作线程的任务传送。 - user1937198
我假设非主线程是由你无法控制的某些外部因素创建的。 - user1937198
2个回答

23
关键是需要获取一个Tokio Handle。这是对Runtime的引用,它允许您从运行时之外生成异步任务。
使用#[tokio::main]时,最简单的获取Handle的方法是通过Handle::current在生成另一个线程之前,然后将句柄提供给可能希望启动异步任务的每个线程:
use std::thread;
use tokio::runtime::Handle; // 0.3.4

#[tokio::main]
async fn main() {
    let threads: Vec<_> = (0..3)
        .map(|thread_id| {
            let handle = Handle::current();

            thread::spawn(move || {
                eprintln!("Thread {} started", thread_id);

                for task_id in 0..3 {
                    handle.spawn(async move {
                        eprintln!("Thread {} / Task {}", thread_id, task_id);
                    });
                }

                eprintln!("Thread {} finished", thread_id);
            })
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }
}

你还可以创建一个全局可变单例Mutex<Option<Handle>>,将其初始化为None,然后在tokio::main函数中尽早设置为Some。然后,当需要时,可以获取该全局变量,解包它,并克隆Handle
use once_cell::sync::Lazy; // 1.5.2

static HANDLE: Lazy<Mutex<Option<Handle>>> = Lazy::new(Default::default);

*HANDLE.lock().unwrap() = Some(Handle::current());

let handle = HANDLE.lock().unwrap().as_ref().unwrap().clone();

另请参见:


10
我有一个处理作业的应用程序,它公开了一个Web API以添加和处理作业,但API请求不应等待作业完成(这可能需要一段时间)。我使用Server-Sent Events广播作业结果。这意味着主API服务器在main中执行,带有#[tokio::main],但是任务执行器应该在哪里运行?在作业执行器中,我将有大量等待:例如下载。它们将干扰Web API服务器。关键问题是如何同时启动两个执行?
在这种情况下,您需要使用thread::spawn创建一个单独的线程,在其中创建Tokio执行器。您会收到的错误是,在第二个线程内没有Tokio执行器(运行时)。您需要手动创建一个并告诉它运行您的任务。更简单的方法是使用Runtime API:
use tokio::runtime::Runtime; // 0.2.23

// Create the runtime
let rt = Runtime::new().unwrap();

// Spawn a future onto the runtime
rt.spawn(async {
    println!("now running on a worker thread");
});

在您的主线程中,使用#[tokio::main]已经可以使用执行器。在添加此属性之前,运行时是手动创建的。
如果您想坚持使用async/await哲学,可以使用join
use tokio; // 0.2.23

#[tokio::main]
async fn main() {
    let (_, _) = tokio::join!(start_server_listener(), start_job_processor());
}

这就是为什么大多数答案都在质疑你的方法。虽然很少见,但我相信有一些场景需要异步运行时在另一个线程上,同时也能够手动配置运行时的好处。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接