问题陈述
我想在异步 Rust 中实现一个有向无环计算图框架,即一个互连的计算“节点”图,每个节点从前驱节点获取输入,并为后继节点产生输出。我计划通过生成一组Future
,每个计算节点都有一个,同时允许未来之间的依赖关系来实现这个框架。然而,在使用async
实现这个框架时,我已经陷入了编译器错误的困境。
最小示例
这是我想做的最小示例尝试。有一个单独的浮点数列表values
,任务是创建一个新列表output
,其中output[i] = values[i] + output[i - 2]
。这是我的尝试:
use std::sync;
fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
val1 + val2
}
fn example_async(values: &Vec<f32>) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
for (i, value) in values.iter().enumerate() {
let future = {
let join_handles = join_handles.clone();
async move {
if i < 2 {
*value
} else {
let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
some_complicated_expensive_fn(*value, prev_value)
}
}
};
join_handles.lock().unwrap().push(runtime.spawn(future));
}
join_handles
.lock()
.unwrap()
.iter_mut()
.map(|join_handle| runtime.block_on(join_handle).unwrap())
.collect()
}
#[cfg(test)]
mod tests {
#[test]
fn test_example() {
let values = vec![1., 2., 3., 4., 5., 6.];
println!("{:?}", super::example_async(&values));
}
}
我收到了有关未锁定的“Mutex”不能发送的错误:
error: future cannot be sent between threads safely
--> sim/src/compsim/runtime.rs:23:51
|
23 | join_handles.lock().unwrap().push(runtime.spawn(future));
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
--> sim/src/compsim/runtime.rs:18:88
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ---------------------------- ^
| |
| has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
这很有道理,在Tokio文档中可以看到,您可以使用
tokio::task::Mutex
,但是,a)我不确定如何使用,b)我想知道是否有更好的整体方法。非常感谢您的帮助!谢谢。