使用TcpStream强制进行非阻塞读取

12
我有一个线程,维护着一个套接字列表,我想遍历这个列表,看看是否有可读的内容,如果有-就对其进行操作,如果没有-就继续到下一个。问题是,一旦我遇到第一个节点,所有的执行都会停止,直到有数据可读为止。
我正在使用std :: io :: Read :: read(&mut self,buf:&mut [u8])-> Result
来自doc “此函数不提供关于它是否阻止等待数据的任何保证,但如果对象需要阻止读取但无法阻止,则通常会通过Err返回值发出信号。”
深入研究源代码,TcpStream的Read实现是
impl Read for TcpStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
}

这句话的意思是“引发了什么”。
pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
    let fd = self.fd();
    let dolock = || self.lock_nonblocking();
    let doread = |nb| unsafe {
        let flags = if nb {c::MSG_DONTWAIT} else {0};
        libc::recv(fd,
                   buf.as_mut_ptr() as *mut libc::c_void,
                   buf.len() as wrlen,
                   flags) as libc::c_int
    };
    read(fd, self.read_deadline, dolock, doread)
}

最后,调用read<T, L, R>(fd: sock_t, deadline: u64, mut lock: L, mut read: R)
在这里,可以看到循环非阻塞读取直到数据被检索或发生错误。
是否有一种方法可以使用TcpStream强制进行非阻塞读取?

为什么不为每个套接字启动一个线程? - oli_obk
@ker 这个实现是针对预计连接时间为5-30分钟的,并且应该能够处理大约200k并发连接。我假设线程数量是不好的,但目前正在尝试找出一种计算方法,因为这将是备选方案。 - nathansizemore
你可能想要看一下AsyncIO库,例如https://github.com/carllerche/mio。 - Levans
看起来是一个很好的库,但我并不需要在流上所有的io都是异步的。我想我只需在本项目中实现一个单独的特性来满足这个特定的目的。 - nathansizemore
1个回答

9

更新的答案

需要注意的是,从Rust 1.9.0开始,std::net::TcpStream已经添加了功能:

fn set_nonblocking(&self, nonblocking: bool) -> Result<()>

原始答案

我无法使用TcpStream,也不想引入一个单独的IO操作库,所以决定在使用之前将文件描述符设置为非阻塞状态,然后执行系统调用来读写。这绝对不是最安全的解决方案,但比实现新的IO库要少些工作,即使MIO看起来很好。

extern "system" {
    fn read(fd: c_int, buffer: *mut c_void, count: size_t) -> ssize_t;
}

pub fn new(user: User, stream: TcpStream) -> Socket {

    // First we need to setup the socket as Non-blocking on POSIX
    let fd = stream.as_raw_fd();
    unsafe {
        let ret_value = libc::fcntl(fd,
            libc::consts::os::posix01::F_SETFL,
            libc::consts::os::extra::O_NONBLOCK);

        // Ensure we didnt get an error code
        if ret_value < 0 {
            panic!("Unable to set fd as non-blocking")
        }
    }

    Socket {
        user: user,
        stream: stream
    }
}

pub fn read(&mut self) {
    let count = 512 as size_t;
    let mut buffer = [0u8; 512];
    let fd = self.stream.as_raw_fd();

    let mut num_read = 0 as ssize_t;
    unsafe {
        let buf_ptr = buffer.as_mut_ptr();
        let void_buf_ptr: *mut c_void = mem::transmute(buf_ptr);
        num_read = read(fd, void_buf_ptr, count);
        if num_read > 0 {
            println!("Read: {}", num_read);
        }

        println!("test");
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接