我想要在Rocket服务器旁边启动一个Tokio事件循环,然后稍后向该循环添加事件。我阅读了如何在新线程上启动tokio::Delay以允许主循环继续运行?,但我仍然不清楚如何实现我的目标。
我想要在Rocket服务器旁边启动一个Tokio事件循环,然后稍后向该循环添加事件。我阅读了如何在新线程上启动tokio::Delay以允许主循环继续运行?,但我仍然不清楚如何实现我的目标。
Handle
移动到其他线程。use futures::future; // 0.3.5
use std::{thread, time::Duration};
use tokio::{runtime::Runtime, time}; // 0.2.21
fn main() {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let (handle_tx, handle_rx) = std::sync::mpsc::channel();
let tokio_thread = thread::spawn(move || {
let mut runtime = Runtime::new().expect("Unable to create the runtime");
eprintln!("Runtime created");
// Give a handle to the runtime to another thread.
handle_tx
.send(runtime.handle().clone())
.expect("Unable to give runtime handle to another thread");
// Continue running until notified to shutdown
runtime.block_on(async {
shutdown_rx.await.expect("Error on the shutdown channel");
});
eprintln!("Runtime finished");
});
let another_thread = thread::spawn(move || {
let handle = handle_rx
.recv()
.expect("Could not get a handle to the other thread's runtime");
eprintln!("Another thread created");
let task_handles: Vec<_> = (0..10)
.map(|value| {
// Run this future in the other thread's runtime
handle.spawn(async move {
eprintln!("Starting task for value {}", value);
time::delay_for(Duration::from_secs(2)).await;
eprintln!("Finishing task for value {}", value);
})
})
.collect();
// Finish all pending tasks
handle.block_on(async move {
future::join_all(task_handles).await;
});
eprintln!("Another thread finished");
});
another_thread.join().expect("Another thread panicked");
shutdown_tx
.send(())
.expect("Unable to shutdown runtime thread");
tokio_thread.join().expect("Tokio thread panicked");
}
Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Finishing task for value 0
Finishing task for value 5
Finishing task for value 4
Finishing task for value 3
Finishing task for value 9
Finishing task for value 2
Finishing task for value 1
Finishing task for value 7
Finishing task for value 8
Finishing task for value 6
Another thread finished
Runtime finished
另请参阅:
handle_rx.recv()
的开销应该相对较小,对吗?我在想是否应该使用lazy_static
或者动态访问是否足够。 - jhprattpub static
,但是lazy_static
不允许解构元组,并且元组的字段是私有的。 - jhprattHandle
的传递应该是非常轻量级的。我不明白你的其他问题:“文件”在这里不是一个有意义的单位。你需要以某种方式为每个线程提供Handle
的副本,并且这将根据你如何启动它们而变化。 - ShepmasterOption<Handle>
)。这需要使用unsafe
,但考虑到事物的严格排序,我并不认为这是个问题。如果克隆成为问题,我肯定会研究一下Rc
等等。 - jhpratt