使用tokio的异步Rust互联未来的集合

3

问题陈述

我想在异步 Rust 中实现一个有向无环计算图框架,即一个互连的计算“节点”图,每个节点从前驱节点获取输入,并为后继节点产生输出。我计划通过生成一组Future,每个计算节点都有一个,同时允许未来之间的依赖关系来实现这个框架。然而,在使用async实现这个框架时,我已经陷入了编译器错误的困境。

最小示例

这是我想做的最小示例尝试。有一个单独的浮点数列表values,任务是创建一个新列表output,其中output[i] = values[i] + output[i - 2]。这是我的尝试:

use std::sync;

fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
    val1 + val2
}

fn example_async(values: &Vec<f32>) -> Vec<f32> {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
    for (i, value) in values.iter().enumerate() {
        let future = {
            let join_handles = join_handles.clone();
            async move {
                if i < 2 {
                    *value
                } else {
                    let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
                    some_complicated_expensive_fn(*value, prev_value)
                }
            }
        };
        join_handles.lock().unwrap().push(runtime.spawn(future));
    }
    join_handles
        .lock()
        .unwrap()
        .iter_mut()
        .map(|join_handle| runtime.block_on(join_handle).unwrap())
        .collect()
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_example() {
        let values = vec![1., 2., 3., 4., 5., 6.];
        println!("{:?}", super::example_async(&values));
    }
}

我收到了有关未锁定的“Mutex”不能发送的错误:
error: future cannot be sent between threads safely
  --> sim/src/compsim/runtime.rs:23:51
   |
23 |         join_handles.lock().unwrap().push(runtime.spawn(future));
   |                                                   ^^^^^ future created by async block is not `Send`
   |
   = help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
  --> sim/src/compsim/runtime.rs:18:38
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
  --> sim/src/compsim/runtime.rs:18:88
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ----------------------------                      ^
   |                                      |
   |                                      has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
  --> sim/src/compsim/runtime.rs:18:38
   |
18 |                     let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
   |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

这很有道理,在Tokio文档中可以看到,您可以使用tokio::task::Mutex,但是,a)我不确定如何使用,b)我想知道是否有更好的整体方法。非常感谢您的帮助!谢谢。

1
只需浏览您的代码,就可以发现您没有进行任何异步操作。 对于这种情况,建议使用“rayon”。 - Chayim Friedman
并行化的意义何在,当每个值仍需等待前一个值时? - Chayim Friedman
2
这是一个理想化的问题版本,当我尝试实现主要问题陈述时遇到的问题,依赖图可能会更加复杂。此外,有两个独立的执行链,因此并行性是可能的:偶数条目和奇数条目。 - Josh Burkart
1个回答

1
编译器提示你无法在锁定 join_handle 的情况下跨越等待点,因为任务可能会在 .await 后被另一个线程接管,并且锁必须在同一线程中锁定和解锁。您可以通过使锁的生命周期更短来解决此问题,例如通过将每个句柄保存在 Option 中,在等待之前 取出 它。但是,然后您会遇到等待 JoinHandle消耗 它的问题 - 您将收到任务返回的值,并且会失去该句柄,因此无法将其返回到向量中。(这是 Rust 值具有单个所有者的结果,因此一旦句柄将值传递给您,它就不再拥有该值并且已变得无用。)
句柄基本上像生成任务结果的一次性通道。由于您需要在另一个地方使用结果,因此可以单独创建一个 一次性通道 的向量,以保留另一个结果副本,可以由需要它们的任务等待。
pub fn example_async(values: &[f32]) -> Vec<f32> {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let (txs, rxs): (Vec<_>, Vec<_>) = (0..values.len())
        .map(|_| {
            let (tx, rx) = tokio::sync::oneshot::channel();
            (Mutex::new(Some(tx)), Mutex::new(Some(rx)))
        })
        .unzip();
    let txs = Arc::new(txs);
    let rxs = Arc::new(rxs);

    let mut join_handles = vec![];
    for (i, value) in values.iter().copied().enumerate() {
        let txs = Arc::clone(&txs);
        let rxs = Arc::clone(&rxs);
        let future = async move {
            let result = if i < 2 {
                value
            } else {
                let prev_rx = rxs[i - 2].lock().unwrap().take().unwrap();
                let prev_value = prev_rx.await.unwrap();
                some_complicated_expensive_fn(value, prev_value)
            };
            let tx = txs[i].lock().unwrap().take().unwrap();
            tx.send(result).unwrap(); // here you'd use result.clone() for non-Copy result
            result
        };
        join_handles.push(runtime.spawn(future));
    }
    join_handles
        .into_iter()
        .map(|handle| runtime.block_on(handle).unwrap())
        .collect()
}

Playground


太棒了,这是一个很好的方法。 - Josh Burkart
1
@JoshBurkart 我已经编辑了答案,使用更高效的一次性通道,这应该足以满足这个用例。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接