如何在循环中生成异步方法?

15

我有一个对象向量,其中包含一个 resolve() 方法,该方法使用 reqwest 查询外部 Web API。在对每个对象调用 resolve() 方法后,我想打印出每个请求的结果。

这是我的半异步代码,它可以编译和工作(但实际上并不是完全异步):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

我尝试使用tokio::join!来生成所有异步调用并等待它们完成,但我可能做错了什么:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

这是我遇到的错误:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

如何同时调用所有实例的resolve()方法?


这段代码反映了答案-现在我正在处理一些我不太理解的借用检查器错误-我应该使用'static注释我的某些变量吗?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

for_each旨在原地执行,不返回任何内容。您想要的是将每个项目映射到调用resolve以产生Future,然后将它们收集到向量中。从那里开始,您需要等待所有任务完成。 - Alexey Larionov
然而,最后一部分并不容易,使用join可以实现并发,但不一定是并行的。可能会在这里找到一些解决方法。 - Alexey Larionov
很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在哪些 crates(及其版本)、types、traits、fields等。如果可能的话,您可以在Rust Playground上尝试重现错误,否则可以在全新的Cargo项目中进行,然后[编辑]您的问题以包括额外的信息。这里有一些Rust特定的MRE提示,您可以使用它们来缩小您的原始代码以便在此处发布。谢谢! - Shepmaster
1个回答

16

既然您想要以并行方式等待未来结果,那么可以将它们spawn成单独的任务并行运行。由于它们彼此独立以及与生成它们的线程无关,因此您可以以任何顺序等待它们的处理。

理想情况下,您应该编写类似以下代码:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

但是这将被借用检查器拒绝,因为item.resolve()返回的future持有对item的借用引用。该引用被传递给tokio::spawn(),后者将其移交给另一个线程,编译器无法证明item将超过该线程的生命周期。(当您想将对本地数据的引用发送到线程时,也会遇到同样的问题。)
有几种可能的解决方案; 我认为最优雅的一种是将项目移动到传递给tokio::spawn()的异步闭包中,并在任务完成后让任务将它们交还给您。基本上,您消耗items向量以创建任务,并立即从等待的结果中重新构建它:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

playground中运行可执行代码。
请注意,futures包含一个类似于您所需的join_all函数,但它会轮询单个futures而不确保它们并行运行。我们可以编写一个通用的join_parallel函数,它使用join_all,同时使用tokio::spawn来实现并行执行:
async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

使用这个函数,回答问题所需的代码就简化为:
let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

再次,在playground中运行的可执行代码。


1
我并不特别关心结果,因为resolve()方法不返回任何东西,而是改变了对象内部的值。我正在尝试使用spawn,但由于我需要可变对象,所以在借用检查器方面存在一些问题。 - Djent
1
@Djent 只要在所有句柄创建后等待它们,您就可以安全地忽略结果(请参见编辑后的答案)。您能否编辑问题以包括借用检查器错误? - user4815162342
1
我已更新答案。我还是Rust的新手,不太理解借用检查器的所有内容(欢迎推荐书外的好的知识来源)。 - Djent
1
@Djent 尝试使用 for item in &itemsfor item in items.iter() 来打印它们。如此编写,for 循环正在消耗 items,而这些元素仍被任务借用。 - user4815162342
2
谢谢您的耐心等待。现在我更明白发生了什么。这个解决方案非常聪明,我喜欢匿名函数处理resolve()方法的返回类型,因此我不必更改它。 - Djent
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接