在使用tokio::time::timeout之后再次等待未来

5

背景:
我有一个使用tokio::process在tokio运行时中生成具有句柄的子进程的过程。

它还负责在杀死子进程后释放资源,并根据文档(std::process::Child, tokio::process::Child),这需要父进程等待并通过wait()(或在tokio中使用await)等待该进程。

并非所有进程对SIGINTSIGTERM的响应都相同,因此在发送SIGKILL之前,我想给子进程一些时间来终止。

期望解决方案:

    pub async fn kill(self) {
        // Close input
        std::mem::drop(self.stdin);

        // Send gracefull signal
        let pid = nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t);
        nix::sys::signal::kill(pid, nix::sys::signal::SIGINT);

        // Give the process time to die gracefully
        if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
        {
            // Kill forcefully
            nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL);
            self.process.await;
        }
    }

然而,会出现以下错误:

error[E0382]: use of moved value: `self.process`
  --> src/bin/multi/process.rs:46:13
   |
42 |         if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), self.process).await
   |                                                                                 ------------ value moved here
...
46 |             self.process.await;
   |             ^^^^^^^^^^^^ value used here after move
   |
   = note: move occurs because `self.process` has type `tokio::process::Child`, which does not implement the `Copy` trait

如果我遵循并删除 self.process.await,我会发现子进程仍然在 ps 中占用资源。

问题:
如何等待一段时间并执行操作,并在时间到期后再次 await?

注:
我通过设置一个始终在两秒钟后发送 SIGKILL 的 tokio 计时器来解决了我的直接问题,并在底部只有一个 self.process.await。但是,这种解决方案并不理想,因为在计时器运行时可能会生成另一个具有相同 PID 的进程。

编辑:
添加一个 最小可重现示例 (playground)

async fn delay() {
    for _ in 0..6 {
        tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
        println!("Ping!");
    }
}

async fn runner() {
    let delayer = delay();
    if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), delayer).await {
        println!("Taking more than two seconds");
        delayer.await;
    }
}
2个回答

7

您需要传递一个可变引用。但是,为了使其可变引用实现Future,您首先需要固定未来。

pin_mut 是一个很好的帮手,可以从futures crate中重新导出。

use futures::pin_mut;

async fn delay() {
    for _ in 0..6 {
        tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
        println!("Ping!");
    }
}

async fn runner() {
    let delayer = delay();
    pin_mut!(delayer);
    if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), &mut delayer).await {
        println!("Taking more than two seconds");
        delayer.await;
    }
}

1
这很直截了当! :) 另外,我能够避免带入整个板条箱,因为 pin_mut 是一个相当简单的宏。我添加了链接到你的答案,这样其他人就可以更容易地找到板条箱(和宏的实现)。 - mr.celo
谢谢,这对我有帮助,但你能解释一下为什么这个方法有效吗?timeout的文档说:“如果未来在持续时间之前完成,则返回完成的值。否则,返回错误并取消未来”。那么如果它被pin_mut了,为什么它没有被取消呢? - user511824

1
一个通用的包装器可以在这些情况下重复使用:
use std::time::Duration;
use futures::pin_mut;
use tokio::time::timeout;

pub async fn on_slow<T, S: FnOnce()>(
    future: impl Future<Output = T>,
    duration: Duration,
    fn_on_slow: S,
) -> T {
    pin_mut!(future);
    if let Ok(result) = timeout(duration, &mut future).await {
        result
    } else {
        fn_on_slow();
        future.await
    }
}

// usage
async fn runner() {
  on_slow(
    delay(), // some long running future WITHOUT .await
    Duration::from_secs(2),
    || println!("Taking more than two seconds")
  ).await
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接