如何接受异步函数作为参数?

34
我希望能够复制闭包/函数作为参数传递的行为和人机工程学,就像map一样:iterator.map(|x| ...)
我注意到一些库代码允许传递异步功能,但这种方法不允许我传递参数:
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,

spawn(async { foo().await });

I'm hoping to do one of the following:

iterator.map(async |x| {...});

async fn a(x: _) {}
iterator.map(a)

1
我认为你需要使用Stream才能实现此目的,可以看一下crates futures。 - Mario Santini
@MarioSantini https://docs.rs/async-std/1.5.0/async_std/stream/trait.Stream.html#method.any 它似乎并不明显地表明它们所接受的函数是异步的,而最终结果似乎是异步的。 - Skarlett
1
我的意思只是:如果你需要异步迭代一个集合,你可能需要使用流(stream),这是一种可以进行迭代的特性。 - Mario Santini
3个回答

59

async函数实际上被解糖为返回impl Future。一旦你知道了这一点,就只需要结合现有的Rust技术来接受一个函数/闭包,从而得到一个带有两个泛型类型的函数:

use std::future::Future;

async fn example<F, Fut>(f: F)
where
    F: FnOnce(i32, i32) -> Fut,
    Fut: Future<Output = bool>,
{
    f(1, 2).await;
}

这也可以写成

use std::future::Future;

async fn example<Fut>(f: impl FnOnce(i32, i32) -> Fut)
where
    Fut: Future<Output = bool>,
{
    f(1, 2).await;
}

如果在 Dart 中省略了 .await,则会导致 -> Future<Output = Future<Output = bool>> 的结果被自动展平为 Future<Output = bool>,因此 .await 是隐含的。到目前为止,我还没有找到 Rust 是否也是这样做的内容。 - Günter Zöchbauer
1
@GünterZöchbauer 我并不知道有这样的情况。在Rust中,这种隐式的事情对我来说有点过于"松散"了,所以如果真的加入了,我会感到惊讶的。 - Shepmaster

9
async |...| expr闭包语法现在可以在夜间频道上使用,启用特性async_closure
#![feature(async_closure)]

use futures::future;
use futures::Future;
use tokio;

pub struct Bar;

impl Bar {
    pub fn map<F, T>(&self, f: F)
    where
        F: Fn(i32) -> T,
        T: Future<Output = Result<i32, i32>> + Send + 'static,
    {
        tokio::spawn(f(1));
    }
}

async fn foo(x: i32) -> Result<i32, i32> {
    println!("running foo");
    future::ok::<i32, i32>(x).await
}

#[tokio::main]
async fn main() {
    let bar = Bar;
    let x = 1;

    bar.map(foo);

    bar.map(async move |x| {
        println!("hello from async closure.");
        future::ok::<i32, i32>(x).await
    });
}

请查看2394-async_await RFC了解更多细节。


1
虽然已经太晚了,但我希望这段代码能帮助到其他人。在我的情况下,我有一些预定义的函数,我无法更改它们(func0,func1,...),我需要按顺序运行它们直到最后一个。如果其中一个失败,整个操作必须终止。因此,我创建了一个函数数组,然后在循环中执行每个函数。
use std::future::Future;
use std::pin::Pin;    

type FuncResult = dyn Future<Output=Result<(), String>>;

async fn apply_funcs(mut start: usize, end: usize) {
   let funcs = [func0];
   let mut result = true;
   while start < end && result {
      result = func_wrapper(funcs[start]).await;
      start += 1;
   }
}

async fn func_wrapper(f: impl FnOnce() -> FuncResult) -> bool {
   match f().await {
      Ok(_) => { println!("ok"); }
      Err(e) => {
         println!("{e}");
         return false;
      }
   }
   true
}

async fn func0() -> Result<(), String> {
   todo!()
}

这段代码失败了:期望是dyn Future,但找到的是future 我找到了两种解决方法:
1:通过静态分派,即通过泛型:
async fn apply_funcs(mut start: usize, end: usize) {
   let funcs = [func0];
   let mut result = true;
   while start < end && result {
      result = func_wrapper(funcs[start]).await;
      start += 1;
   }
}

async fn func_wrapper<Fut>(f: impl FnOnce() -> Fut) -> bool
   where Fut: Future<Output=Result<(), String>> {
   match f().await {
      Ok(_) => { println!("ok"); }
      Err(e) => {
         println!("{e}");
         return false;
      }
   }
   true
}

async fn func0() -> Result<(), String> {
   todo!()
}

2: 通过定义另一个闭包来实现动态调度:
type FuncResult = Pin<Box<dyn Future<Output=Result<(), String>>>>;

async fn apply_funcs(mut start: usize, end: usize) {
   let funcs = [
      || -> FuncResult {
         Box::pin(func0())// no await here
      }
   ];
   let mut result = true;
   while start < end && result {
      result = func_wrapper(funcs[start]).await;
      start += 1;
   }
}

async fn func_wrapper(f: impl FnOnce() -> FuncResult) -> bool {
   match f().await {
      Ok(_) => { println!("ok"); }
      Err(e) => {
         println!("{e}");
         return false;
      }
   }
   true
}

async fn func0() -> Result<(), String> {
   todo!()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接