如何在tokio::select!中等待一个Future而不移动它?

3

我正在尝试使用Rust的tokio停止异步TCP连接和数据包读取。我已经编写了一种使用channelselect在CTRL+C或超时事件上停止循环的方法,但是在循环中select会导致移动并且无法编译。

use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    tokio::spawn(async {
        tokio::select!{
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(10)) => ()
        }
    });

    let _ = connect(dst);
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            biased;
            _ = shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        tokio::time::sleep(Duration::from_secs(2)).await;
                    }
                }
            }
        }
    }
}

error[E0382]: use of moved value: `shutdown`
  --> src/main.rs:23:17
   |
20 | async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
   |                  -------- move occurs because `shutdown` has type `tokio::sync::oneshot::Receiver<()>`, which does not implement the `Copy` trait
...
23 |             _ = shutdown => return None,
   |                 ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.

我做了一些更改,但无法解决错误。
  • _ = &shutdown => return None → 特质Future未为&tokio::sync::oneshot::Receiver<()>实现。是的,没错。
  • _ = shutdown.into_future() => return None,使用use std::future::IntoFuture; → 不稳定的库特性。

如何编写一种从外部优雅地停止这种异步重试循环的方法?


1
尝试从 &mut shutdown 中进行选择。我在stackoverflow上找不到重复的内容,但是有人已经在这里回答了。 - Caesar
1
正如@Caesar所指出的那样,提供了&mut T的全面实现,如此所示在这里。值得注意的是,&mut T是不同于&TT的类型,如果没有被全面实现覆盖,每个类型都需要实现一个特征。 - Henry Gomersall
1个回答

4
在使用 select!时添加&mut将不会导致移动。 这也在select!教程的Resuming an async operation中提到。
use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    tokio::spawn(async {
        tokio::select!{
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(5)) => ()
        }
        eprintln!("interrupting");
        src.send(()).unwrap();
    });

    let _ = connect(dst).await;
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(mut shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            biased;
            _ = &mut shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                }
            }
        }
    }
}

上述代码可被中断,在5秒后停止执行。

ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
interrupting
progoram finished

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接