使用Tokio生成非静态future

7

我有一个异步方法,它应该并行执行一些futures,并且只有在所有futures完成后才返回。然而,它通过引用传递了一些数据,这些数据的生命周期不如'static长(它会在主方法某个时刻被丢弃)。从概念上讲,它类似于这个(Playground):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

现在,tokio希望传递给 spawn 的 futures 具有'static 生命周期的有效性,因为我可以在不停止 future 的情况下放弃句柄。这意味着我的上面的示例会产生此错误消息:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

所以我的问题是:如何生成仅对当前上下文有效的未来,并等待它们全部完成?

1
这个回答解决了你的问题吗?在tokio中生成具有非静态生命周期的任务 - Ibraheem Ahmed
为什么不直接复制u64s呢?在这里根本不需要使用引用。 - pretzelhammer
相关:async_scoped - Ibraheem Ahmed
另外,传递给 spawn 的内容不需要具有 'static 生命周期,它们只需要由 'static 生命周期限定即可。这是一种常见的Rust生命周期误解 - pretzelhammer
由于在这种情况下,Tokio 0.2和0.3的答案是相同的,因此修改您的问题以适用于Tokio的两个版本是有意义的。 - Alice Ryhl
显示剩余3条评论
3个回答

15

在异步 Rust 中无法生成非'static未来。这是因为任何异步函数都可能随时取消,因此无法保证调用者确实超越了生成的任务。

虽然有各种各样的包允许范围内生成异步任务,但是这些包无法从异步代码中使用。它们所允许的是从非异步代码生成作用域内的异步任务。这不会违反上述问题,因为生成它们的非异步代码不能随时取消,因为它不是异步的。

一般有两种方法:

  1. 使用Arc而不是普通引用来生成'static任务。
  2. 使用 futures 包中的并发原语而不是生成。

通常要生成静态任务并使用Arc,您必须拥有相关的值的所有权。这意味着,由于您的函数通过引用接受了参数,因此如果没有克隆数据,您将无法使用此技术。

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

请注意,如果您有对数据的可变引用,并且该数据为Sized,即不是切片,则可以暂时拥有该数据的所有权。

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

另一个选择是使用来自futures crate的并发原语。它们具有使用非'static数据的优点,但缺点是任务将无法同时在多个线程上运行。

对于许多工作流程而言,这是完全可以接受的,因为异步代码应该大部分时间都在等待IO。

一种方法是使用FuturesUnordered。这是一个特殊的集合,可以存储许多不同的futures,并且它有一个next函数,可以同时运行所有futures,并在其中任何一个完成时返回。(只有在导入了StreamExt时才可用next函数)

您可以像这样使用它:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

注意:FuturesUnordered必须在共享值之后定义。否则,您将会因为它们以错误的顺序被丢弃而得到一个借用错误。


另一种方法是使用Stream。使用流,您可以使用buffer_unordered。这是一个内部使用FuturesUnordered的实用工具。

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

请注意,在这两种情况下,导入StreamExt都很重要,因为它提供了一些在没有导入扩展特性的流上不可用的方法。


1

目前现有的答案可以概括为:

只要将非静态 future 限制在与调用者相同的线程中运行,就可以“生成”它。

这让我感到不满意。至少从表面上看,似乎应该可以完全生成一个作用域限定的 future,就像可以生成作用域限定的线程一样。原来 tokio 在 结构化并发 的名义下探索了这个想法。不幸的是,他们未能真正使其工作,主要是因为(如果我理解正确的话)目前无法以非阻塞和符合惯用方式强制执行作用域。这条评论对此进行了更详细的解释。


0

在使用线程进行并行编程的代码中,可以通过使用transmute扩展生命周期来避免复制。以下是一个示例:

fn main() {
    let now = std::time::Instant::now();
    let string = format!("{now:?}");
    println!(
        "{now:?} has length {}",
        parallel_len(&[&string, &string]) / 2
    );
}

fn parallel_len(input: &[&str]) -> usize {
    // SAFETY: this variable needs to be static, because it is passed into a thread,
    // but the thread does not live longer than this function, because we wait for
    // it to finish by calling `join` on it.
    let input: &[&'static str] = unsafe { std::mem::transmute(input) };
    let mut threads = vec![];
    for txt in input {
        threads.push(std::thread::spawn(|| txt.len()));
    }
    threads.into_iter().map(|t| t.join().unwrap()).sum()
}

看起来这对于异步代码也应该是可行的,但我不太确定。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接