如何将任务添加到在另一个线程上运行的Tokio事件循环中?

6
1个回答

7
根据文档所述:
返回的句柄可用于在此运行时上生成任务,并且可以克隆以允许将Handle移动到其他线程。
以下是在一个线程中启动事件循环并在第二个线程上生成任务的示例。
use futures::future; // 0.3.5
use std::{thread, time::Duration};
use tokio::{runtime::Runtime, time}; // 0.2.21

fn main() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();

    let tokio_thread = thread::spawn(move || {
        let mut runtime = Runtime::new().expect("Unable to create the runtime");

        eprintln!("Runtime created");

        // Give a handle to the runtime to another thread.
        handle_tx
            .send(runtime.handle().clone())
            .expect("Unable to give runtime handle to another thread");

        // Continue running until notified to shutdown
        runtime.block_on(async {
            shutdown_rx.await.expect("Error on the shutdown channel");
        });

        eprintln!("Runtime finished");
    });

    let another_thread = thread::spawn(move || {
        let handle = handle_rx
            .recv()
            .expect("Could not get a handle to the other thread's runtime");

        eprintln!("Another thread created");

        let task_handles: Vec<_> = (0..10)
            .map(|value| {
                // Run this future in the other thread's runtime
                handle.spawn(async move {
                    eprintln!("Starting task for value {}", value);
                    time::delay_for(Duration::from_secs(2)).await;
                    eprintln!("Finishing task for value {}", value);
                })
            })
            .collect();

        // Finish all pending tasks
        handle.block_on(async move {
            future::join_all(task_handles).await;
        });

        eprintln!("Another thread finished");
    });

    another_thread.join().expect("Another thread panicked");

    shutdown_tx
        .send(())
        .expect("Unable to shutdown runtime thread");

    tokio_thread.join().expect("Tokio thread panicked");
}

Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Finishing task for value 0
Finishing task for value 5
Finishing task for value 4
Finishing task for value 3
Finishing task for value 9
Finishing task for value 2
Finishing task for value 1
Finishing task for value 7
Finishing task for value 8
Finishing task for value 6
Another thread finished
Runtime finished

Tokio 0.1的解决方案可以在此帖子的修订历史中找到

另请参阅:


调用 handle_rx.recv() 的开销应该相对较小,对吗?我在想是否应该使用 lazy_static 或者动态访问是否足够。 - jhpratt
也许对于一般用例(包括我的),更大的问题是如何在文件之间共享tx/rx通道?显而易见的方法是通过pub static,但是lazy_static不允许解构元组,并且元组的字段是私有的。 - jhpratt
@jhpratt 是的,我得到单个Handle的传递应该是非常轻量级的。我不明白你的其他问题:“文件”在这里不是一个有意义的单位。你需要以某种方式为每个线程提供Handle的副本,并且这将根据你如何启动它们而变化。 - Shepmaster
其实我有点想通了。我最终做的是创建一个可变静态变量(Option<Handle>)。这需要使用 unsafe,但考虑到事物的严格排序,我并不认为这是个问题。如果克隆成为问题,我肯定会研究一下 Rc 等等。 - jhpratt
1
@JanusTroelsen 已更新。两个答案都不需要用于生成任务的通道。这些通道用于在线程之间传输句柄并告诉运行时何时退出。如果您有不同的组织方式,您的代码可能不需要这些通道。 - Shepmaster
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接