如何在Rust中等待异步函数调用列表?

23

我有一个async函数列表,我想要并发执行它们,然后等待它们全部完成。目前我拥有的可工作代码是:

async fn start_consumers(&self) {
    for consumer in &self.consumers {
        consumer.consume().await;
    }
}

这并不完全准确,因为函数是串行执行的。我正在寻找像join!这样的东西,但它适用于动态向量。有了它,我应该能够编写类似以下的内容:

async fn start_consumers(&self) {
    let mut v = Vec::new();
    for consumer in &self.consumers {
        consumer.consume();
    }
    join!(v);
}

目前join!只支持元组。我正在寻找替代方法。类似于JavaScript中的Promise.all()的东西。


虽然tokio::task::JoinSet已经存在,但它仍未稳定化(https://github.com/tokio-rs/tokio/issues/4535)。 - dbr
5个回答

15

futures crate中有一个join_all函数,可以等待一组futures:

use futures::future::join_all;

async fn start_consumers(&self) {
    let mut v = Vec::new();
    for consumer in &self.consumers {
        v.push(consumer.consume());
    }
    join_all(v).await;
 }

这个方法存在的箱子版本已经被撤下了。这个方法已经不再起作用了。 - undefined

6

我也在同一天问了类似的问题,但我的情况是我有一个包装在 Future 中的 Result。因此,我不得不使用 try_join_all 而不是 join_all


3
join_all/try_join_all可以做到这一点,但输出是一个收集未来结果的Vec。在上述修改后的示例中,组合的Future生成一个Vec<()>,它不会导致分配,即使扩展此向量的操作也应在发布版本中优化为无操作。
即使在需要输出的情况下,按照异步流处理它们以及在等待所有数据被收集之前进行处理也是值得的。为此,您可以使用FuturesOrderedFuturesUnordered,具体取决于您是否关心维护原始Futures顺序在流输出中产生的影响,还是更喜欢按完成顺序接收输出。FuturesUnordered不需要缓冲结果,因此可能比由相同Futures组成的快速完成。


0
最简单的方法是使用一个mpsc通道,在这个通道中,你不需要发送消息,只需要等待通道被关闭即可,当每个发送者都被删除时,通道会自动关闭。
在这里可以看到使用tokio的示例here

-2
如果您想要从一个普通的同步函数中 await/join 所有的 Future 且不关心它们的结果,您可以这样写:
futures::executor::block_on(async{futures::join!(future1, future2, future3, ...)});

您可以使用宏 block_all 来更加方便地使用:
macro_rules! block_all {
    ($($future:expr),*) => {{
        futures::executor::block_on(async{futures::join!($($future),*)})   
    }};
}

使用方法:

block_all!(future1, future2, ...);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接