如何在向量中连接所有期货而不像join_all函数那样在失败时取消连接?

13

我有一个由调用async函数创建的future的Vec。在将所有futures添加到向量之后,我想要等待整个集合,获取结果的列表或每个完成的回调。

我可以简单地循环或迭代遍历future向量,并在每个future上调用.await,这样可以正确处理错误并且不会使futures::future::join_all取消其他future,但是我相信有更符合习惯的方式来完成此任务。

我还想能够在完成时处理futures,因此如果我从前几个得到足够的信息,我可以取消剩余的未完成的futures并且不必等待它们并且丢弃其结果,无论出现错误与否。如果我按顺序迭代向量,则无法实现此目标。

我要寻找的是一个回调(闭包等),让我能够在结果到达时累积结果,以便我可以适当地处理错误或者从回调中确定我不需要其余的future并且取消它们。

我可以看出这会让借用检查器头痛:试图在异步引擎的回调中修改Vec中的future。

有许多Stack Overflow的问题和Reddit帖子解释了join_all如何在future列表上进行连接,但如果其中一个失败则取消其余内容,并且异步引擎可能会生成线程,也可能不会,如果它们这样做则是糟糕的设计。


1
很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在哪些 crates(及其版本)、types、traits、fields等。如果可能的话,您可以在Rust Playground上尝试重现错误,否则可以在全新的Cargo项目中进行,然后[编辑]您的问题以包括额外的信息。这里有一些Rust特定的MRE提示,您可以使用它们来缩小您的原始代码以便在此处发布。谢谢! - Shepmaster
这意味着你仍处于研究阶段。另一个问题是不清楚你所说的“join_all”是哪个。就我所知,我找到的join_all是在futures crate中的一个,但由于future::join_all对future的项是不可知的,如果其中一个future产生错误,它不会取消现有的futures。 - E net4
1
我没有提供代码示例,因为我还没有。与之相比,“我有一个由调用异步函数创建的未来Vec”表明您有选择不共享的代码。 - Shepmaster
1
@E_net4willhelpyouout 一个旧版本的future::join_all在失败时会取消其他futures. - John Kugelman
也许我在阅读旧的文档。 - stu
显示剩余4条评论
4个回答

13
使用 futures::select_all
use futures::future; // 0.3.4

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn might_fail(fails: bool) -> Result<i32> {
    if fails {
        Ok(42)
    } else {
        Err("boom".into())
    }
}

async fn many() -> Result<i32> {
    let raw_futs = vec![might_fail(true), might_fail(false), might_fail(true)];
    let unpin_futs: Vec<_> = raw_futs.into_iter().map(Box::pin).collect();
    let mut futs = unpin_futs;

    let mut sum = 0;

    while !futs.is_empty() {
        match future::select_all(futs).await {
            (Ok(val), _index, remaining) => {
                sum += val;
                futs = remaining;
            }
            (Err(_e), _index, remaining) => {
                // Ignoring all errors
                futs = remaining;
            }
        }

        if sum > 42 {
            // Early exit
            return Ok(sum);
        }
    }

    Ok(sum)
}

此操作将轮询集合中的所有Future,返回第一个不处于挂起状态的Future及其索引以及其余未处理或未轮询的Futures。然后,您可以匹配Result并处理成功或失败的情况。

在循环内调用select_all。这使您能够从函数中提前退出。当您退出时,futs向量将被丢弃,而丢弃Future则会取消它。


1
现在这并不是必要的,因为自从join_all自futures 0.3版本以来就不会失败,并且符合OP的要求。 - dessalines

1

正如@kmdreko、@John Kugelman和@e-net4-the-comment-flagger在评论中提到的那样,使用futures crate版本0.3中的join_all。futures版本0.2中的join_all具有问题描述中所述的取消行为,但是futures crate版本0.3中的join_all没有。


0

为了快速/简便地解决问题,而不必操作 select_all。你可以使用 join_all,但是返回的不是 Result<T> 而是 Result<Result<T>> - 总是对顶层结果返回 Ok


-3

或者你可以使用join_all

来自文档的示例:

use futures::future::join_all;

async fn foo(i: u32) -> u32 { i }

let futures = vec![foo(1), foo(2), foo(3)];

assert_eq!(join_all(futures).await, [1, 2, 3]);

3
原始问题特别抱怨join_all,称其在第一个错误处停止。但是,在当前版本的函数中(甚至在发布时根据评论),这是不正确的,但您至少应该指出前提不正确。 - kmdreko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接