如何使用tokio::join同时运行多个任务?

23

假设有一个存储在 Vec 中的未来(future),其长度由运行时决定,你需要并发地加入这些未来,那么该怎么做呢?

显然,根据tokio::join文档中的示例,手动指定Vec的每个可能的长度,如1、2、3等,并处理好相应情况即可。

extern crate tokio;

let v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// to join these futures concurrently one possible way is 

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...

我也注意到文档中tokio::join!接受列表作为参数,当我使用以下语法时

tokio::join!(v);

或类似的东西

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

它就是不起作用。

现在出现的问题是,是否有更有效地加入这些未来的方法,还是我错过了一些文件中提到的东西?


tokio::join! 接受可变数量的异步表达式,而不是单个异步值的切片/向量。 - Masklinn
我在尝试自己解决这个问题时,误解了Tokio文档。我假设一个异步代码块(异步函数或异步表达式或其他)将被视为Rust中的futures。我的问题是如何以更清晰的方式并发地加入这些长度不同的“异步表达式”。 - bruceyuan
3个回答

24
你可以使用 futures::future::join_all 将你的未来对象集合“合并”为一个单一的未来对象,当所有的子未来对象都解决时,它才会解决。

谢谢。问题解决了。我没有找到对称的“join_any(…)”,它在其他地方吗?还是不存在...? - zertyz
找到了 https://docs.rs/tokio/latest/tokio/macro.select.html,但当第一个完成时它会取消其他分支... - zertyz

22

join_alltry_join_all以及来自同一crate futures的更加灵活的FuturesOrderedFuturesUnordered工具被执行为单一任务。如果组成的future很少同时准备好执行工作,则这可能是可以接受的,但是如果您想要利用多线程运行时的CPU并行性,则考虑将单个future作为单独的任务进行生成,然后等待任务完成。

Tokio 1.21.0或更高版本:JoinSet

最近的Tokio版本中,您可以使用JoinSet来获得最大的灵活性,包括中止所有任务的能力。当放弃JoinSet时,集合中的任务也会被中止。

use tokio::task::JoinSet;

let mut set = JoinSet::new();

for fut in v {
    set.spawn(fut);
}

while let Some(res) = set.join_next().await {
    let out = res?;
    // ...
}

旧的API

使用 tokio::spawn 生成任务,并等待连接句柄:

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

您还可以使用 FuturesOrderedFuturesUnordered 组合器以流的形式异步处理输出:

use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

使用这种方式等待任务的一个注意点是,当产生任务的未来对象(例如一个异步块),可能拥有返回的JoinHandle并在其被丢弃时,任务并不会被取消。需要使用JoinHandle::abort方法显式地取消任务。


7
一个完整的例子:
#[tokio::main]
async fn main() {
    let tasks = (0..5).map(|i| tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await; // simulate some work
        i * 2
    })).collect::<FuturesUnordered<_>>();

    let result = futures::future::join_all(tasks).await;
    println!("{:?}", result); // [Ok(8), Ok(6), Ok(4), Ok(2), Ok(0)]
}

游乐场


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接