tokio::select! 但针对Future的Vec

15
我有一个包含多个future的Vec,我想并发地执行它们(但不一定是同时)。基本上,我正在寻找一种类似于tokio::select!select函数,但接受一个future集合;或者,一种类似于futures::join_all的函数,但在第一个future完成后返回。
另一个要求是,一旦一个future完成,我可能想要将一个新的future添加到Vec中。
使用这样的函数,我的代码大致如下:
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

// This pseudo-implementation simply removes the last
// future and awaits it. I'm looking for something that
// instead polls all futures until one is finished, then
// removes that future from the Vec and returns it.
async fn select<F, O>(futures: &mut Vec<F>) -> O
where
    F: Future<Output=O>
{
    let future = futures.pop().unwrap();
    future.await
}

#[tokio::main]
async fn main() {
    let mut futures = vec![
        wait(500),
        wait(300),
        wait(100),
        wait(200),
    ];
    while !futures.is_empty() {
        let finished = select(&mut futures).await;
        println!("Waited {}ms", finished);
        if some_condition() {
            futures.push(wait(200));
        }
    }
}


也许可以使用流,即 stream::iter(futures).try_for_each_concurrentfuture::try_join_all - Martin Gallagher
@MartinGallagher 这是一个有趣的想法 - 我已经发布了一个原型作为答案,但目前还不够优雅。 - Florian Brucker
2个回答

19

这正是futures::stream::FuturesUnordered的用途(我是通过查看StreamExt::for_each_concurrent的源代码发现的):

use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, Instant};

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    futures.push(wait(500));
    futures.push(wait(300));
    futures.push(wait(100));
    futures.push(wait(200));
    
    let start_time = Instant::now();

    let mut num_added = 0;
    while let Some(wait_time) = futures.next().await {
        println!("Waited {}ms", wait_time);
        if num_added < 3 {
            num_added += 1;
            futures.push(wait(200));
        }
    }
    
    println!("Completed all work in {}ms", start_time.elapsed().as_millis());
}

playground

如果您正在使用Tokio,请注意:正如@Bryan Larsen在一条评论中所指出的那样,将FuturesUnordered与Tokio结合使用可能会导致性能问题的风险。本文提供了更多细节,并表示该问题应该在futures crate的最新版本(0.3.19及更高版本)中得以解决。然而,使用Tokio的用户最好使用Tokio的JoinSet。上述示例则如下所示:

use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::{sleep, Instant};

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

#[tokio::main]
async fn main() {
    let mut futures = JoinSet::new();
    futures.spawn(wait(500));
    futures.spawn(wait(300));
    futures.spawn(wait(100));
    futures.spawn(wait(200));

    let start_time = Instant::now();

    let mut num_added = 0;
    while let Some(result) = futures.join_next().await {
        let wait_time = result.unwrap();
        println!("Waited {}ms", wait_time);
        if num_added < 3 {
            num_added += 1;
            futures.spawn(wait(200));
        }
    }

    println!(
        "Completed all work in {}ms",
        start_time.elapsed().as_millis()
    );
}

playground

1
谢谢你的回答。在tokio中有类似的东西吗? - Charlie 木匠
4
@Charlie木匠,Tokio在这里能带来什么好处?您可以在Tokio运行时中使用“futures”crate而不会遇到任何问题 ;) - Nicolas Dusart
你帮我节省了很多时间,非常感谢你! - Tadeo Hepperle
1
FuturesUnordered在与Tokio结合使用时存在很多风险。如果您正在使用Tokio,建议使用JoinSet。https://news.ycombinator.com/item?id=29911141 - Bryan Larsen
@BryanLarsen 谢谢你的信息!我已经扩展了我的回答,包括对JoinSet的解释。 - Florian Brucker

2
这是一个基于流和StreamExt::for_each_concurrent的工作原型,正如马丁·加拉格在评论中建议的那样。
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;

use futures::stream::{self, StreamExt};
use futures::{channel::mpsc, sink::SinkExt};

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

#[tokio::main]
async fn main() {
    let (mut sink, futures_stream) = mpsc::unbounded();

    let start_futures = vec![wait(500), wait(300), wait(100), wait(200)];

    let num_futures = RwLock::new(start_futures.len());

    sink.send_all(&mut stream::iter(start_futures.into_iter().map(Ok)))
        .await
        .unwrap();

    let sink_lock = RwLock::new(sink);

    futures_stream
        .for_each_concurrent(None, |fut| async {
            let wait_time = fut.await;
            println!("Waited {}", wait_time);
            if some_condition() {
                println!("Adding new future");
                let mut sink = sink_lock.write().await;
                sink.send(wait(100)).await.unwrap();
            } else {
                let mut num_futures = num_futures.write().await;
                *num_futures -= 1;
            }
            let num_futures = num_futures.read().await;
            if *num_futures <= 0 {
                // Close the sink to exit the for_each_concurrent
                sink_lock.write().await.close().await.unwrap();
            }
        })
        .await;
}

虽然这种方法可行,但它的缺点是我们需要维护一个单独的剩余未来计数器,以便我们可以关闭 sink -- 没有 Vec 的未来,我们可以检查它是否为空。关闭 sink 需要另一个锁。
考虑到我对 Rust 还比较新,如果能让这种方法更优雅一些,我也不会感到惊讶。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接