如何在future-rs中封装阻塞I/O的最佳方法?

18
我阅读了tokio文档,并想知道封装昂贵同步I/O的最佳方法。
使用反应堆框架,我们得到了绿色线程模型的优势:少量操作系统线程通过执行器处理大量并发任务。
Tokio的未来模型是需求驱动的,这意味着未来本身将轮询其内部状态以提供有关其完成情况的信息;允许反压和取消功能。据我所知,未来的轮询阶段必须是非阻塞的才能正常工作。
我想要封装的I/O可以看作是长时间的原子和昂贵的操作。理想情况下,一个独立的任务将执行I/O,并且相关的未来将轮询I/O线程以获取完成状态。
我看到的两个选项是:
- 将阻塞I/O包含在未来的poll函数中。 - 生成一个操作系统线程来执行I/O,并使用未来机制轮询其状态,如文档中所示
据我所知,这两种解决方案都不是最优的,也不能充分利用绿色线程模型(第一种方案在文档中不建议使用,第二种方案则不能通过反应堆框架提供的执行器)。是否有其他解决方案?

2
选项3:让长时间运行的操作在线程池上运行,该线程池在完成后向未来发出信号(从IO线程轮询)。据我所知,tokio已经支持这种方式,使用了某种ThreadPool执行器(而不是IO执行器)。 - Matthias247
2
根据@Matthias247的评论,用于在线程池上运行futures的crate是futures-cpupool - Joe Clay
1个回答

23

理想情况下,一个独立的任务会执行 I/O 操作,相关的 future 会轮询 I/O 线程来获取完成状态。

是的,这是异步执行的推荐方法。请注意,这不仅限于 I/O 操作,而是适用于任何长时间运行的同步任务!

Futures crate

ThreadPool 类型是为此创建的1

在这种情况下,您可以将工作分配给池中运行。池本身会执行工作以检查工作是否已完成,并返回实现了 Future 特性的类型。

use futures::{
    executor::{self, ThreadPool},
    future,
    task::{SpawnError, SpawnExt},
}; // 0.3.1, features = ["thread-pool"]
use std::{thread, time::Duration};

async fn delay_for(pool: &ThreadPool, seconds: u64) -> Result<u64, SpawnError> {
    pool.spawn_with_handle(async {
        thread::sleep(Duration::from_secs(3));
        3
    })?
    .await;
    Ok(seconds)
}

fn main() -> Result<(), SpawnError> {
    let pool = ThreadPool::new().expect("Unable to create threadpool");

    let a = delay_for(&pool, 3);
    let b = delay_for(&pool, 1);

    let c = executor::block_on(async {
        let (a, b) = future::join(a, b).await;

        Ok(a? + b?)
    });

    println!("{}", c?);
    Ok(())
}
您可以看到总时间只有3秒钟:
% time ./target/debug/example
4

real    3.010
user    0.002
sys     0.003

1 — 有一些讨论认为目前的实现可能不是最适用于阻塞操作的,但暂时足够。

Tokio

这里我们使用task::spawn_blocking

use futures::future; // 0.3.15
use std::{thread, time::Duration};
use tokio::task; // 1.7.1, features = ["full"]

async fn delay_for(seconds: u64) -> Result<u64, task::JoinError> {
    task::spawn_blocking(move || {
        thread::sleep(Duration::from_secs(seconds));
        seconds
    })
    .await?;
    Ok(seconds)
}

#[tokio::main]
async fn main() -> Result<(), task::JoinError> {
    let a = delay_for(3);
    let b = delay_for(1);

    let (a, b) = future::join(a, b).await;
    let c = a? + b?;

    println!("{}", c);

    Ok(())
}

请参阅Tokio文档中的CPU绑定任务和阻塞代码

补充说明

请注意,这不是睡眠的有效方式,它只是一些阻塞操作的占位符。如果你确实需要睡眠,请使用像futures-timertokio::time::sleep之类的东西。更多详细信息请参见为什么Future::select首先选择睡眠时间较长的future?

两种解决方案都不是最优的,不能充分发挥绿色线程模型的优势

没错 - 因为你没有异步操作!你试图结合两种不同的方法论,必须得有一个丑陋的部分来在它们之间进行转换。

第二种解决方案没有通过反应器框架提供的执行程序

我不确定你在这里的意思。有一个由block_ontokio::main隐式创建的执行程序。线程池有一些内部逻辑来检查线程是否完成,但这只会在用户的执行程序poll它时被触发。


感谢您的回答。我所说的执行器是核心事件循环,如果我理解正确的话,它基本上不会生成任何线程。 虽然我认为我开始理解这个问题了:核心事件循环和CPU池执行器是互补的东西,最有效的取决于情况(核心事件循环用于低级快速轮询 - 如操作系统套接字轮询 - 和CPU池用于重型IO,如网络或磁盘)。 - Momh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接