如何在Rust中读取子进程的输出而不阻塞?

34
我正在使用 Rust 编写一个小型的 ncurses 应用程序,需要与子进程通信。我已经用 Common Lisp 写了一个原型。我尝试重写它,因为 CL 对于这样一个小工具使用了大量内存。
我在尝试弄清楚如何与子进程交互方面遇到了一些困难。
我目前的做法大致如下:
1. 创建进程:

let mut program = match Command::new(command)
    .args(arguments)
    .stdin(Stdio::piped())
    .stdout(Stdio::piped())
    .stderr(Stdio::piped())
    .spawn()
{
    Ok(child) => child,
    Err(_) => {
        println!("Cannot run program '{}'.", command);
        return;
    }
};
  • 将其传递给一个无限循环(直到用户退出),该循环读取和处理输入并侦听输出,就像这样(并将其写入屏幕):

  • fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }
    
    调用read_to_string会阻塞程序直到进程退出。据我所见,read_to_endread也似乎会阻塞。如果我尝试运行像ls这样立即退出的命令,它可以正常工作,但是对于像pythonsbcl这样不会退出的命令,只有手动杀死子进程后才能继续执行。
    基于这个答案,我修改了代码以使用BufReader:
        fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
            match program.stdout.as_mut() {
                Some(out) => {
                    let buf_reader = BufReader::new(out);
                    for line in buf_reader.lines() {
                        match line {
                            Ok(l) => {
                                output_viewer.append_string(l);
                            }
                            Err(_) => return,
                        };
                    }
                }
                None => return,
            }
        }
    
    然而,问题仍然存在。它会读取所有可用的行,然后阻塞。由于该工具应该与任何程序一起使用,因此在尝试读取之前,无法猜测输出何时结束。似乎也没有办法为BufReader设置超时。
    4个回答

    28

    默认情况下,流是阻塞的。TCP/IP 流、文件系统流、管道流,它们都是阻塞的。当你要求流提供一块字节时,它会停止并等待直到它获得了给定数量的字节或发生其他事件(一个interrupt,一个流结束,一个错误)。

    操作系统渴望将数据返回给读取进程,因此如果你只想等待下一行并在其到来时立即处理它,则 Shepmaster 在Unable to pipe to or from spawned child process more than once(以及他在这里的答案中提出的方法)可以工作。
    虽然从理论上讲,它不必须工作,因为操作系统允许在read中让BufReader等待更多数据,但在实践中,操作系统更喜欢早期的“短读取”而不是等待。

    这种基于BufReader的简单方法在处理多个流(例如子进程的stdoutstderr)或多个进程时会变得更加危险。例如,当子进程等待您排空其stderr管道而您的进程被阻塞等待其空的stdout时,基于BufReader的方法可能会死锁。

    同样,在不想让程序无限期地等待子进程时,也不能使用BufReader。也许您想要在子进程仍在工作并且没有输出时显示进度条或计时器。

    如果您的操作系统不愿意立即将数据返回给进程(更喜欢“完整读取”而不是“短读取”),则不能使用基于BufReader的方法,因为在这种情况下,子进程打印的最后几行可能会落入灰色区域:操作系统已经收到它们,但它们不足以填充BufReader的缓冲区。

    BufReader 受限于 Read 接口允许它对流进行的操作,它的阻塞程度与底层流一样。为了效率,它会分块 读取 输入,告诉操作系统填满其可用缓冲区。
    你可能想知道为什么在这里分块读取数据如此重要,为什么 BufReader 不能逐字节读取数据。问题在于,为了从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们与其隔离工作,以避免在进程出现问题时干扰它。因此,为了调用操作系统,需要转换到“内核模式”,这可能还涉及“上下文切换”。这就是为什么调用操作系统以读取每个单独字节是昂贵的原因。我们希望尽可能少地调用操作系统,并以批处理方式获取流数据。

    要在不阻塞的情况下等待流,您需要使用非阻塞流。MIO 承诺为管道提供所需的非阻塞流支持, 最有可能是使用PipeReader, 但我目前还没有检查过。

    流的非阻塞特性应该使得无论操作系统是否偏好“短读取”,都可以按块读取数据。由于非阻塞流永远不会阻塞,如果流中没有数据,它只会简单地告诉您这一点。

    在没有非阻塞流的情况下,你必须使用线程来执行阻塞读取,使其在单独的线程中执行,从而不会阻塞主线程。你可能还需要逐字节读取流,以便能够立即响应行分隔符,以防操作系统不喜欢“短读取”。这里有一个可工作的示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78
    P.S. 这里是一个函数示例,可以通过共享字节向量来监视程序的标准输出。
    use std::io::Read;
    use std::process::{Command, Stdio};
    use std::sync::{Arc, Mutex};
    use std::thread;
    
    /// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
    fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
    where
        R: Read + Send + 'static,
    {
        let out = Arc::new(Mutex::new(Vec::new()));
        let vec = out.clone();
        thread::Builder::new()
            .name("child_stream_to_vec".into())
            .spawn(move || loop {
                let mut buf = [0];
                match stream.read(&mut buf) {
                    Err(err) => {
                        println!("{}] Error reading from stream: {}", line!(), err);
                        break;
                    }
                    Ok(got) => {
                        if got == 0 {
                            break;
                        } else if got == 1 {
                            vec.lock().expect("!lock").push(buf[0])
                        } else {
                            println!("{}] Unexpected number of bytes: {}", line!(), got);
                            break;
                        }
                    }
                }
            })
            .expect("!thread");
        out
    }
    
    fn main() {
        let mut cat = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("!cat");
    
        let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
        let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
        let mut stdin = match cat.stdin.take() {
            Some(stdin) => stdin,
            None => panic!("!stdin"),
        };
    }
    

    我正在使用一些辅助工具来控制SSH会话:

    try_s! (stdin.write_all (b"echo hello world\n"));
    try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
    

    注意,在async-std中对read调用使用await也会阻塞。只是它不是阻塞系统线程,而是阻塞一条未来链(本质上是无栈绿色线程)。poll_read是非阻塞接口。在async-std#499中,我已经询问开发人员是否从这些API中保证了短读取。

    注意,在Nom中可能存在类似的问题:“我们希望告诉IO侧根据解析器的结果(不完整或不完整)进行补充”

    P.S. 或许看看 crossterm 中流读取是如何实现的会很有趣。在 Windows 上,poll.rs 中使用了本地的 WaitForMultipleObjects。在 unix.rs 中使用了 mio 的 poll


    感谢您的解释,我会研究MIO,如果不行的话,我将使用单独的线程。 - jkiiski

    17

    Tokio的Command

    下面是使用tokio 0.2的示例:

    use std::process::Stdio;
    use futures::StreamExt; // 0.3.1
    use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]
    
    #[tokio::main]
    async fn main() {
        let mut cmd = Command::new("/tmp/slow.bash")
            .stdout(Stdio::piped()) // Can do the same for stderr
            .spawn()
            .expect("cannot spawn");
    
        let stdout = cmd.stdout().take().expect("no stdout");
        // Can do the same for stderr
    
        // To print out each line
        // BufReader::new(stdout)
        //     .lines()
        //     .for_each(|s| async move { println!("> {:?}", s) })
        //     .await;
    
        // To print out each line *and* collect it all into a Vec
        let result: Vec<_> = BufReader::new(stdout)
            .lines()
            .inspect(|s| println!("> {:?}", s))
            .collect()
            .await;
    
        println!("All the lines: {:?}", result);
    }
    

    Tokio-Threadpool

    这是一个使用 tokio 0.1 和 tokio-threadpool 的示例。我们使用 blocking 函数在一个线程中启动进程,然后使用 stream::poll_fn 将其转换为流。

    use std::process::{Command, Stdio};
    use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
    use tokio_threadpool; // 0.1.13
    
    fn stream_command_output(
        mut command: Command,
    ) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
        // Ensure that the output is available to read from and start the process
        let mut child = command
            .stdout(Stdio::piped())
            .spawn()
            .expect("cannot spawn");
        let mut stdout = child.stdout.take().expect("no stdout");
    
        // Create a stream of data
        stream::poll_fn(move || {
            // Perform blocking IO
            tokio_threadpool::blocking(|| {
                // Allocate some space to store anything read
                let mut data = vec![0; 128];
                // Read 1-128 bytes of data
                let n_bytes_read = stdout.read(&mut data).expect("cannot read");
    
                if n_bytes_read == 0 {
                    // Stdout is done
                    None
                } else {
                    // Only return as many bytes as we read
                    data.truncate(n_bytes_read);
                    Some(data)
                }
            })
        })
    }
    
    fn main() {
        let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
    
        let mut runtime = Runtime::new().expect("Unable to start the runtime");
    
        let result = runtime.block_on({
            output_stream
                .map(|d| String::from_utf8(d).expect("Not UTF-8"))
                .fold(Vec::new(), |mut v, s| {
                    print!("> {}", s);
                    v.push(s);
                    Ok(v)
                })
        });
    
        println!("All the lines: {:?}", result);
    }
    

    这里有许多可能的权衡可以做。例如,总是分配128字节并不理想,但它易于实现。

    支持

    以下是slow.bash的参考:

    #!/usr/bin/env bash
    
    set -eu
    
    val=0
    
    while [[ $val -lt 10 ]]; do
        echo $val
        val=$(($val + 1))
        sleep 1
    done
    

    另请参阅:


    2
    如果Unix支持足够好,您还可以将两个输出流设置为非阻塞,并像使用TcpStreamset_nonblocking函数一样对它们进行轮询。由Command生成的ChildStdoutChildStderrStdio(并包含文件描述符),您可以直接修改这些句柄的读取行为以使其非阻塞。基于jcreekmore/timeout-readwrite-rsanowell/nonblock-rs的工作,我使用此包装器来修改流处理程序:
    extern crate libc;
    use std::io::Read;
    use std::os::unix::io::AsRawFd;
    use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};
    
    fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
    where
        H: Read + AsRawFd,
    {
        let fd = handle.as_raw_fd();
        let flags = unsafe { fcntl(fd, F_GETFL, 0) };
        if flags < 0 {
            return Err(std::io::Error::last_os_error());
        }
        let flags = if nonblocking{
            flags | O_NONBLOCK
        } else {
            flags & !O_NONBLOCK
        };
        let res = unsafe { fcntl(fd, F_SETFL, flags) };
        if res != 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }
    

    你可以像处理其他非阻塞流一样管理这两个流。以下示例基于 polling crate,它使得处理读事件和使用BufReader进行行读取变得非常容易:
    use std::process::{Command, Stdio};
    use std::path::PathBuf;
    use std::io::{BufReader, BufRead};
    use std::thread;
    extern crate polling;
    use polling::{Event, Poller};
    
    fn main() -> Result<(), std::io::Error> {
        let path = PathBuf::from("./worker.sh").canonicalize()?;
    
        let mut child = Command::new(path)
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("Failed to start worker");
    
        let handle = thread::spawn({
            let stdout = child.stdout.take().unwrap();
            set_nonblocking(&stdout, true)?;
            let mut reader_out = BufReader::new(stdout);
    
            let stderr = child.stderr.take().unwrap();
            set_nonblocking(&stderr, true)?;
            let mut reader_err = BufReader::new(stderr);
    
            move || {
                let key_out = 1;
                let key_err = 2;
                let mut out_closed = false;
                let mut err_closed = false;
    
                let poller = Poller::new().unwrap();
                poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();
    
                let mut line = String::new();
                let mut events = Vec::new();
                loop {
                    // Wait for at least one I/O event.
                    events.clear();
                    poller.wait(&mut events, None).unwrap();
    
                    for ev in &events {
                        // stdout is ready for reading
                        if ev.key == key_out {
                            let len = match reader_out.read_line(&mut line) {
                                Ok(len) => len,
                                Err(e) => {
                                    println!("stdout read returned error: {}", e);
                                    0
                                }
                            };
                            if len == 0 {
                                println!("stdout closed (len is null)");
                                out_closed = true;
                                poller.delete(reader_out.get_ref()).unwrap();
                            } else {
                                print!("[STDOUT] {}", line);
                                line.clear();
                                // reload the poller
                                poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                            }
                        }
    
                        // stderr is ready for reading
                        if ev.key == key_err {
                            let len = match reader_err.read_line(&mut line) {
                                Ok(len) => len,
                                Err(e) => {
                                    println!("stderr read returned error: {}", e);
                                    0
                                }
                            };
                            if len == 0 {
                                println!("stderr closed (len is null)");
                                err_closed = true;
                                poller.delete(reader_err.get_ref()).unwrap();
                            } else {
                                print!("[STDERR] {}", line);
                                line.clear();
                                // reload the poller
                                poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
                            }
                        }
                    }
    
                    if out_closed && err_closed {
                        println!("Stream closed, exiting process thread");
                        break;
                    }
                }
            }
        });
    
        handle.join().unwrap();
        Ok(())
    }
    

    此外,与EventFd上的包装器一起使用时,可以轻松地在另一个线程中停止进程,而无需阻塞或主动轮询,并且仅使用单个线程。
    编辑:根据我的测试,看起来polling crate会自动将已轮询的句柄设置为非阻塞模式。如果您想直接使用nix::poll对象,则set_nonblocking函数仍然很有用。

    2
    我遇到了许多使用情况,其中与子进程交互的行分隔文本非常有用,因此我为此编写了一个箱子interactive_process
    我认为原始问题早已解决,但我认为这可能对其他人有帮助。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接