如何使用Tokio从TcpStream读取单个数据包?

3

我正在尝试使用tokio接收单个数据包:

extern crate tokio;
extern crate tokio_io;

use tokio::net::{TcpListener};
use tokio::prelude::*;

use std::net::SocketAddr;
fn main() {
    let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |mut socket| {
            let mut bytes = vec![];
            bytes.reserve(1024);
            let processor = socket.read_buf(&mut bytes).into_future()
                .and_then(move |_size| {
                    println!("bytes: {:?}", bytes);
                    Ok(())
                })
                .map_err(|_| ());;
            tokio::spawn(processor)
        });
    tokio::run(done);
}

这段代码打印一个空数据包。我该如何修改代码以打印接收到的带有数据的数据包?

1
嗯...非常奇怪。我在编写代码时遇到了问题。如果我删除bytes.reserve(1024);,那么有时候我会得到完整的消息。很可能我们需要添加一些条件来检查消息的大小。 - undefined
我已经有了90%的答案,马上会发布。我觉得我之前的评论是错误的,所以删除了它。 - undefined
请不要在你的问题中放置答案。欢迎您在下方回答自己的问题,甚至接受该答案。这样做更好,因为它允许其他人提供答案,并由社区投票评选。 - undefined
2个回答

1

对于我自己而言,我几乎找到了答案。非常有帮助的类似问题

struct AsWeGetIt<R>(R);

impl<R> Stream for AsWeGetIt<R>
    where
        R: AsyncRead,
{
    type Item = BytesMut;
    type Error = std::io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let mut buf = BytesMut::with_capacity(1000);

        self.0
            .read_buf(&mut buf)
            .map(|async| async.map(|_| Some(buf)))
    }
}
....
let processor = AsWeGetIt(socket).into_future()
.and_then(|(bytes,_)|  {
    println!("bytes: {:?}", bytes);
    Ok(())
}).map_err(|_| ());

为了更好地理解如何在没有单独的结构的情况下进行操作...以及地图使用的原因和目的是什么?


0

如果你的目标真的是收到一个数据包,我认为你已经成功了!

我已经测试了几次这个程序,并且得到了回应。我正在使用以下进行测试:

nc 127.0.0.1 8080 <<< hello

运行了几次后,我得到了以下输出:

Listening on: 0.0.0.0:8080
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: []
bytes: [104, 101, 108, 108, 111, 10]
bytes: [104, 101, 108, 108, 111, 10]

正如你所见,有时候我们已经有了数据,而有时候则没有。我认为你在测试中只是运气不佳,在发送任何数据之前就收到了TCP响应?

我大约90%确定TCP流可以包含空数据包,这就是我们看到的情况。(如果有人在这方面有更多知识,请随意编辑答案或评论)。


要修复你的程序,也许你需要重新考虑你的目标。

仅读取一个TCP数据包很少会有帮助。通常情况下,你希望读取一定数量的字节,并在数据到达时进行处理。我对TCP的理解是它是一串字节流,而不是真正的数据包流。数据包只是将字节从一个地方传输到另一个地方的一种方式,并且它们可以是任意长度而不会破坏兼容性。"一个数据包" 是一个相当模糊的概念。

下面是一个示例,使用tokio::io::read_exact函数读取流的前16个字节:

extern crate tokio;

use tokio::net::TcpListener;
use tokio::prelude::*;

use std::net::SocketAddr;

fn main() {
    let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |mut socket| {
            // this function deals with bytes a bit differently and will just fill the
            // buffer exactly rather than adding onto the end.
            let mut bytes = vec![0; 16];
            let processor = tokio::io::read_exact(socket, bytes)
                .and_then(move |(socket, bytes)| {
                    println!("bytes: {:?}", bytes);
                    Ok(())
                })
                .map_err(|_| ());
            tokio::spawn(processor)
        });
    tokio::run(done);
}

1
read_exact解决了另一个问题。因此,我们总是等待确切的大小。以下是在阻塞模式下读取时如何准确了解正在发生的情况。 据我所知,空消息无法传递(无论如何,它们都不会进入用户空间)。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接