在Tokio运行时的上下文中,我如何在非异步方法内等待从异步方法调用的future?

24

我正在使用Tokio 1.1进行异步操作。我有一个带有#[tokio::main]asyncmain,因此我已经在运行时操作。

main调用一个非异步方法,我想在其中使用await等待一个future(具体来说,我正在从datafusion数据框中收集)。这个非异步方法有一个由trait规定的签名,返回一个结构体,而不是Future<Struct>。据我所知,我不能将其标记为异步。

如果我尝试调用df.collect().await;,编译器会报错,指出我调用await的方法不是async

如果我尝试像这样使用新运行时中的block_on

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

我遇到了运行时恐慌:

无法在运行时内启动另一个运行时。这是因为函数(例如 block_on)尝试在当前线程被用于驱动异步任务时阻塞当前线程。

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会得到一个新的运行时恐慌:

'not currently running on a Tokio 0.2.x runtime.'

这很奇怪,因为我正在使用 Tokio v1.1。

这比应该的要难。我正处于异步上下文中,并且感觉编译器应该知道这一点并允许我从方法中调用 .await - 唯一的代码路径会从 async 块内部调用此方法。我是否漏掉了一些简单的方法?


在同步函数内使用awaiting根本不起作用。你可以使用tokio::spawn_blocking来生成一个阻塞任务。 - Ibraheem Ahmed
@Shepmaster 我的非异步方法有一个由trait规定的签名,返回一个结构体,而不是Future<Struct>。据我所知,我不能将其标记为async。我会仔细看看其他问题,并在原始问题上进行更新,如果有区别的话。我想我尝试过thread::spawn,但不太记得为什么会有问题。 - growse
@Ibraheem,我不认为我看到了spawn_blocking,我会查一下并更新。谢谢! - growse
1
@IbraheemAhmed - spawn_blocking 似乎返回一个 JoinHandle,仍需要 await-ed - 除非我漏掉了什么? - growse
让我们在聊天中继续这个讨论 - Shepmaster
1个回答

13
在异步上下文中,我觉得编译器应该知道这一点,并允许我在方法内部调用.await,但无论是否在运行时上下文中,在同步函数内部基本上不可能使用await。await会被转换为yield points,并且异步函数将被转换为状态机来利用这些yield points执行异步计算。如果没有将函数标记为async,则无法进行此转换。如果我正确理解了你的问题,你有以下代码:
#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}
问题在于您不能从bar中等待df.collect,因为它没有被标记为async。如果您可以修改Trait的签名,那么您可以使用如何在Trait中定义异步方法?中提到的解决方法,使Trait::bar成为一个异步方法。 如果您无法更改Trait的签名,则会出现问题。异步函数永远不应花费太长时间而不达到.await。正如如何封装非异步代码中的阻塞I/O最佳方法中解释的那样,您可以在转换为非异步代码时使用spawn_blocking
#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}
现在你需要一种方法来执行df.collect,而不需要等待完成。你提到你尝试创建嵌套运行时来解决这个问题:
"If I try and block_on the future from a new runtime ... I get a panic"
然而,tokio不允许您创建嵌套的运行时。您可以创建一个新的独立运行时,如How can I create a Tokio runtime inside another Tokio runtime所述。但是,生成嵌套的运行时将效率低下。
相反,您可以获得对当前运行时的句柄:
let handle = Handle::current();

输入运行时上下文:

handle.enter();

然后使用futures::executor::block_on将未来任务运行至完成:

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}
进入Tokio运行时上下文将解决您先前遇到的错误: ``` 如果我尝试futures :: executor :: block_on(df.collect())。unwrap();,我会得到一个新的运行时panic “not currently running on a Tokio 0.2.x runtime” ``` 如果可以,请尽量避免这样做。最佳解决方案是将`Trait :: bar`标记为`async`并将`await`视为正常操作。包括上述提到的任何其他解决方案,都涉及阻塞当前线程直到给定的future完成。 Credit @AliceRyhl for the explanation

3
谢谢 - 我之前不知道这些都是可能的。最终,你是正确的 - 重构以实现异步操作是正确的做法。 - growse
很好的解释。我是 Rust 的新手,在我的程序基本上是单线程应用程序的情况下,我使用的第三方软件具有异步 API。当我想要为异步流创建一个包装器,使其与期望实现“Read”特性的代码兼容时,问题就出现了。使用 futures::executor::block_on(...) 的“hack”可以为我解决这个问题,但是您的解释让我觉得我应该尽量避免使用它? - mhvelplund
如果您的应用程序是同步的,但需要使用异步库,则使用某种形式的 block_on 是可以的。如果您的应用程序是异步的,则几乎从不使用阻塞当前线程是好的。您必须通过使用 spawn_blocking 或类似方法来小心地与阻塞代码进行交互。 - Ibraheem Ahmed
即使使用多线程运行时,您是否可以拥有多个 block_on/spawn_blocking 任务?我可能错了,但我感觉我有两个任务尝试 block_on(1个HTTP服务器,然后是该HTTP服务器中需要执行异步操作的1个路由),但它无法正常工作/我无法弄清楚。 - Brandon Ros

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接