我如何同步返回在异步Future中计算的值?

62

我正在尝试使用Hyper获取HTML页面的内容,并希望同步返回未来输出的结果。我意识到我本可以选择一个更好的例子,因为同步HTTP请求已经存在,但我更感兴趣的是是否能够从异步计算中返回值。

extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio;

use futures::{future, Future, Stream};
use hyper::Client;
use hyper::Uri;
use hyper_tls::HttpsConnector;

use std::str;

fn scrap() -> Result<String, String> {
    let scraped_content = future::lazy(|| {
        let https = HttpsConnector::new(4).unwrap();
        let client = Client::builder().build::<_, hyper::Body>(https);

        client
            .get("https://hyper.rs".parse::<Uri>().unwrap())
            .and_then(|res| {
                res.into_body().concat2().and_then(|body| {
                    let s_body: String = str::from_utf8(&body).unwrap().to_string();
                    futures::future::ok(s_body)
                })
            }).map_err(|err| format!("Error scraping web page: {:?}", &err))
    });

    scraped_content.wait()
}

fn read() {
    let scraped_content = future::lazy(|| {
        let https = HttpsConnector::new(4).unwrap();
        let client = Client::builder().build::<_, hyper::Body>(https);

        client
            .get("https://hyper.rs".parse::<Uri>().unwrap())
            .and_then(|res| {
                res.into_body().concat2().and_then(|body| {
                    let s_body: String = str::from_utf8(&body).unwrap().to_string();
                    println!("Reading body: {}", s_body);
                    Ok(())
                })
            }).map_err(|err| {
                println!("Error reading webpage: {:?}", &err);
            })
    });

    tokio::run(scraped_content);
}

fn main() {
    read();
    let content = scrap();

    println!("Content = {:?}", &content);
}

这个示例代码已经编译通过,并且对 read() 的调用成功,但是对scrap()的调用会导致以下错误信息的恐慌:

Content = Err("Error scraping web page: Error { kind: Execute, cause: None }")

我知道在调用future对象的.wait()方法之前没有正确启动任务,但是我找不到正确的方法来启动它,甚至不确定是否可能。

3个回答

102

标准库的 Future

让我们使用这个简洁易懂、可重现的示例

async fn example() -> i32 {
    42
}

调用executor::block_on函数:

use futures::executor; // 0.3.1

fn main() {
    let v = executor::block_on(example());
    println!("{}", v);
}

Tokio

在任何函数(不仅限于main)上使用tokio::main属性,将其从异步函数转换为同步函数:

use tokio; // 0.3.5

#[tokio::main]
async fn main() {
    let v = example().await;
    println!("{}", v);
}

tokio::main 是一个宏,它将这个函数:

#[tokio::main]
async fn main() {}

转化为:

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { {} })
}

这里使用了 Runtime::block_on 作为基础,因此你也可以将其写成:

use tokio::runtime::Runtime; // 0.3.5

fn main() {
    let v = Runtime::new().unwrap().block_on(example());
    println!("{}", v);
}

进行测试时,您可以使用tokio::test

async-std

main函数从异步函数转换为同步函数,请在main函数上使用async_std::main属性:

use async_std; // 1.6.5, features = ["attributes"]

#[async_std::main]
async fn main() {
    let v = example().await;
    println!("{}", v);
}

对于测试,您可以使用async_std::test

未来 0.1

让我们将其用作我们的最小可重现示例

use futures::{future, Future}; // 0.1.27

fn example() -> impl Future<Item = i32, Error = ()> {
    future::ok(42)
}

对于简单情况,您只需要调用 wait

fn main() {
    let s = example().wait();
    println!("{:?}", s);
}

然而,这带有一个非常严重的警告:

该方法不适用于调用事件循环或类似的I/O情况,因为它会阻止事件循环继续进行(这会阻塞线程)。只有在保证与此future关联的阻塞工作将由另一个线程完成时,才应调用此方法。

Tokio

如果您正在使用Tokio 0.1,则应使用Tokio的Runtime::block_on方法:

use tokio; // 0.1.21

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().expect("Unable to create a runtime");
    let s = runtime.block_on(example());
    println!("{:?}", s);
}

如果你查看block_on的实现,它实际上会把future的结果通过通道发送,然后在该通道上调用wait!这是可以的,因为Tokio保证将未来运行到完成。

另请参见:


1
在 futures 0.1 中,对于简单情况,你只需要调用 wait 是不是很清楚:它仅在 future 立即解决时才起作用。当你有一些需要在未来解决的事情时,即使是简单情况,你仍然需要运行时。 - attdona
@Shepmaster,在Rust 1.45中是否有一种标准的方法(不使用其他crate)来完成这个任务? - Amani
4
如果你认为在 Cargo.toml 中添加一行是一个“路障”,那么 Rust 可能不是你喜欢使用的语言。虽然有可能更多的东西会被移动到标准库中,但并没有真正强烈的推动力去做这件事,尤其是当有多种方法可以解决问题而且很容易添加依赖时。 - Shepmaster
另外,请不要过度使用标准库。而且,需要添加一行到Cargo.toml也不是这样做的理由。 - chpio
请注意,您需要在cargo.toml中添加futures以获取block_on功能,如下所示:futures = { version="0", features=["executor"] } - Chris
显示剩余2条评论

2

使用Tokio对我有效:

tokio::runtime::Runtime::new()?.block_on(fooAsyncFunction())?;

2
作为搜索引擎中“如何在Rust中从同步调用异步”的顶级结果,我决定在此分享我的解决方案。我认为这可能会有所帮助。
正如@Shepmaster所提到的,在版本0.1中,futures crate有一个美妙的方法.wait(),可以用于从同步函数调用异步函数。然而,这个必备的方法已经从后来的crate版本中删除了。
幸运的是,重新实现它并不难:
trait Block {
    fn wait(self) -> <Self as futures::Future>::Output
        where Self: Sized, Self: futures::Future
    {
        futures::executor::block_on(self)
    }
}

impl<F,T> Block for F
    where F: futures::Future<Output = T>
{}

之后,您只需按照以下步骤操作:
async fn example() -> i32 {
    42
}

fn main() {
    let s = example().wait();
    println!("{:?}", s);
}

请注意,这带有原始.wait()中Shepmaster的回答中解释的所有警告。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接