如何作为一个流消耗进程的标准输出,而不会阻塞?

3
在Java(或clojure)中,我想启动一个外部进程并将其stdout作为流进行消耗。理想情况下,我希望每次外部进程刷新其输出流时都能消耗该过程的输出流,但不确定如何实现以及如何在不阻塞的情况下实现它。
在处理外壳进程的Java ProcessPipeInputStream时(例如{{link1:Unix ProcessPipeInputStream}}),我发现继承的 InputStream 方法有点低级,难以使用,并且不确定是否有一种非阻塞方式可以每次从流中消耗生产者端刷新或以非阻塞方式进行其他操作。
许多代码示例在无限循环中阻塞输出流,从而独占用于监听的线程。我希望完全避免这种阻塞行为。
底线是:
是否有一种非阻塞方式可以在输入流上被通知,每次生产方刷新时都会发生?
1个回答

1
你需要创建一个单独的线程来从这样的流中消费,以便你的程序的其余部分可以并行地执行其预定任务。
class ProcessOutputReader implements Runnable {

   private InputStream processOutput;

   public ProcessOutputReader(final InputStream processOutput) {
       this.processOutput = processOutput;
   } 

   @Override
   public void run() {
      int nextByte;
      while ((nextByte = processOutput.read()) != -1) {
        // do whatever you need to do byte-by-byte.
        processByte(nextByte);
      }
   }
}

class Main {
   public static void main(final String[] args) {
       final Process proc = ...; 
       final ProcessOutputReader reader = new ProcessOutputReader(proc.getInputStream());
       final Thread processOutputReaderThread = new Thread(reader);
       processOutputReaderThread.setDaemon(true); // allow the VM to terminate if this is the only thread still active.
       processOutputReaderThread.start();
       ...
       // if you wanna wait for the whole process output to be processed at some point you can do this:
       try {
         processOutputReaderThread.join();
       } catch (final InterruptedException ex) {
         // you need to decide how to recover from if your wait was interrupted.
       }
   }
}

如果你想以每个刷新为一个整体来处理,而不是逐字节处理...我不确定是否有100%的保证能够捕获到每个进程刷新。毕竟,进程自己的IO框架软件(Java、C、Python等)可能会以不同的方式处理“flush”操作,也许你最终接收到的是该外部进程中任何给定刷新的多个字节块。
无论如何,你可以尝试使用InputStreamavailable方法来实现:
   @Override
   public void run() {
      int nextByte;
      while ((nextByte = processOutput.read()) != -1) {
        final int available = processOutput.available();
        byte[] block = new byte[available + 1];
        block[0] = nextByte;
        final int actuallyAvailable = processOutput.read(block, 1, available);
        if (actuallyAvailable < available) { 
          if (actuallyAvailable == -1) { 
            block = new byte[] { nextByte };
          } else {
            block = Arrays.copyOf(block, actuallyAvailable + 1);
          } 
        }
        // do whatever you need to do on that block now.
        processBlock(block);
      }
   }

我不100%确定,但我认为不能相信available会返回一个保证的字节数下限,可以在不被阻塞的情况下检索,也不能保证下一次的read操作将返回请求的那些available字节数;这就是为什么上面的代码要检查实际读取的字节数(actuallyAvailable)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接