理想情况下,一个独立的任务会执行 I/O 操作,相关的 future 会轮询 I/O 线程来获取完成状态。
是的,这是异步执行的推荐方法。请注意,这不仅限于 I/O 操作,而是适用于任何长时间运行的同步任务!
Futures crate
ThreadPool
类型是为此创建的1。
在这种情况下,您可以将工作分配给池中运行。池本身会执行工作以检查工作是否已完成,并返回实现了 Future
特性的类型。
use futures::{
executor::{self, ThreadPool},
future,
task::{SpawnError, SpawnExt},
};
use std::{thread, time::Duration};
async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
pool.spawn_with_handle(async {
thread::sleep(Duration::from_secs(3));
3
})?
.await;
Ok(seconds)
}
fn main() -> Result<(), SpawnError> {
let pool = ThreadPool::new().expect("Unable to create threadpool");
let a = delay_for(&pool, 3);
let b = delay_for(&pool, 1);
let c = executor::block_on(async {
let (a, b) = future::join(a, b).await;
Ok(a? + b?)
});
println!("{}", c?);
Ok(())
}
您可以看到总时间只有3秒钟:
% time ./target/debug/example
4
real 3.010
user 0.002
sys 0.003
1 — 有一些讨论认为目前的实现可能不是最适用于阻塞操作的,但暂时足够。
Tokio
这里我们使用task::spawn_blocking
use futures::future;
use std::{thread, time::Duration};
use tokio::task;
async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
task::spawn_blocking(move || {
thread::sleep(Duration::from_secs(seconds));
seconds
})
.await?;
Ok(seconds)
}
#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
let a = delay_for(3);
let b = delay_for(1);
let (a, b) = future::join(a, b).await;
let c = a? + b?;
println!("{}", c);
Ok(())
}
请参阅Tokio文档中的CPU绑定任务和阻塞代码。
补充说明
请注意,这不是睡眠的有效方式,它只是一些阻塞操作的占位符。如果你确实需要睡眠,请使用像futures-timer或tokio::time::sleep
之类的东西。更多详细信息请参见为什么Future::select首先选择睡眠时间较长的future?
两种解决方案都不是最优的,不能充分发挥绿色线程模型的优势
没错 - 因为你没有异步操作!你试图结合两种不同的方法论,必须得有一个丑陋的部分来在它们之间进行转换。
第二种解决方案没有通过反应器框架提供的执行程序
我不确定你在这里的意思。有一个由block_on
或tokio::main
隐式创建的执行程序。线程池有一些内部逻辑来检查线程是否完成,但这只会在用户的执行程序poll
它时被触发。
futures-cpupool
。 - Joe Clay