如何测试绑定到tokio TcpStream的未来?

7

我有一个将TCP流用LinesCodec封装在Framed中的未来计划。

当我试图在测试中进行封装时,大约20%的时间会发生未来阻塞,但由于没有任何东西在监听我要连接的套接字,我希望始终获取错误:

thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.

这是我使用过的测试代码:

#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8

use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;

struct MyFuture {
    addr: SocketAddr,
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(TcpStream::connect(&self.addr).poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture { addr: addr }
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

示例程序

我在其他语言中看到过测试二进制本身提供异步返回结果的机制,但是我没有找到在Rust中使用类似机制的好方法。

3个回答

5
一种简单的测试异步代码的方法是为每个测试使用专门的运行时:启动它,等待未来完成并在测试结束时关闭运行时。
#[test]
fn my_case() {

    // setup future f
    // ...

    tokio::run(f);
}

我不知道Rust生态系统中是否已经有了统一的模式;请参阅这个讨论,了解面向未来的代码的测试支持的演变。

为什么你的代码不能按预期工作

当你调用poll()时,将查询未来以检查是否有可用值。

如果没有可用值,则会注册一个兴趣,以便在发生可以解决未来的事件时再次调用poll()

当调用MyFuture::poll()时:

  1. TcpStream::connect创建一个新的future TcpStreamNew
  2. TcpStreamNew::poll立即被调用,只会在步骤1中创建future时调用一次
  3. 未来超出范围,因此下次调用MyFuture::poll时,您永远无法解决先前创建的futures。
你已经注册了对未来的兴趣,如果第一次轮询没有解决问题,则不会再次询问(轮询)已解决的值或错误。
“不确定性”行为的原因是因为第一个poll有时会立即解析为ConnectionRefused错误,有时它会永远等待未来的连接事件或无法检索的失败。
看看Tokio使用的mio::sys::unix::tcp::TcpStream
 impl TcpStream {
     pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
         set_nonblock(stream.as_raw_fd())?;

         match stream.connect(addr) {
             Ok(..) => {}
             Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
             Err(e) => return Err(e),
         }

         Ok(TcpStream {
             inner: stream,
         })
     }

当你在非阻塞套接字上连接时,系统调用可能会立即连接/失败或返回EINPROGRESS,在这种情况下,必须触发轮询以检索错误的值。

也许这个答案需要稍微升级一下?使用 Tokio 1.0,编译器报错:"tokio::run(f);(run) not found in tokio"。 - zertyz

4
问题不在于测试,而在于实现。
这个工作正常的测试用例基于你的测试用例,并没有使用自定义的未来实现,只调用了 TcpStream::connect()。它可以像你预期的那样正常工作。
extern crate futures;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;
    use std::net::SocketAddr;
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = TcpStream::connect(&addr)
            .and_then(|f| {
                println!("connected");
                f.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

playground

您在poll()方法中一遍又一遍地连接到同一个端点。这不是Future的工作方式。 poll()方法将被重复调用,并期望在某个时间点返回Ok(Async::Ready(..))Err(..)

如果每次调用poll()时都初始化一个新的TCP连接,则很难及时完成。

这是一个修改后的示例,可以实现您的预期:

#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;

struct MyFuture {
    tcp: ConnectFuture,
}

impl MyFuture {
    fn new(addr: SocketAddr) -> MyFuture {
        MyFuture {
            tcp: TcpStream::connect(&addr),
        }
    }
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(self.tcp.poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture::new(addr)
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

我不确定您将来的打算,但我无法评论是否是正确的方法。


虽然我认为这个答案也非常高质量,但我认为被采纳的答案展示了一种在框架中运行 futures 的方法,对我来说更有价值,更符合问题的要求。当我看到另一个答案时,我正准备接受你的答案,这是一个艰难的决定,因为我确实承认你回答得又快又正确。如果这让你失望,我很抱歉,同时也非常感谢你的回复。 - Martin

1

在某种程度上,您可以使用Tokio的测试库来使这更加容易; 它支持在单元测试中使用async/await。

#[tokio::test]
async fn my_future_test() {
  let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
  match MyFuture { addr }.poll().await {
    Ok(f) => assert!("something good")
    Err(e) => assert!("something bad")
  }
}

https://docs.rs/tokio/0.3.3/tokio/attr.test.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接