异步运行进程并读取标准输出和标准错误流

14

我有一些代码可以异步执行一个进程并从stdout和stderr读取信息,然后在进程完成时进行处理。它看起来像这样:

Process process = builder.start();

    Thread outThread = new Thread(() -> {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            // Read stream here
        } catch (Exception e) {
        }
    });

    Thread errThread = new Thread(() -> {
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
        // Read stream here
      } catch (Exception e) {
      }
    });

    outThread.start();
    errThread.start();

    new Thread(() -> {
      int exitCode = -1;
      try {
        exitCode = process.waitFor();
        outThread.join();
        errThread.join();
      } catch (Exception e) {
      }

    // Process completed and read all stdout and stderr here
    }).start();

我的问题在于我需要使用3个线程来实现异步的"运行和获取输出"任务 - 我不知道为什么,但是我感觉使用3个线程不太合适。我可以从一个线程池中分配这些线程,但那仍然会阻塞这些线程。

有没有什么方法可以通过NIO将此减少到更少的线程(1个?)?我所能想到的任何方法都需要不断旋转一个线程(除非我加入一些睡眠),但我也不想这样做...

注意:我需要在进行过程中读取(而不是在进程停止时),并且我需要将stdin与stderr分开,因此无法使用重定向。


我认为你不需要那个最后的线程,除非你需要在循环中运行多个线程。 - Elliott Frisch
1
@ElliottFrisch - 你所看到的是在一个名为 runAsync(String command) 的方法内部,我需要立即返回(它是异步的)。我该如何处理进程的退出代码以及流的读取完成,而不需要额外的线程? - Cheetah
考虑到这一点,我认为你需要所有三个线程。 - Elliott Frisch
5个回答

4

既然您已经指定需要在操作过程中查看输出,那么就没有非多线程解决方案。

不过您可以将线程数减少到一个,除了主线程之外:

Process process = builder.start();
Thread errThread = new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
      // Read stream here
    } catch (Exception e) {
    }
});
errThread.start();

try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
        // Read stream here
} catch (Exception e) {
}
// we got an end of file, so there can't be any more input.  Now we need to wait for stderr/process exit.

int exitCode = -1;
try {
    exitCode = process.waitFor();
    errThread.join();
} catch (Exception e) {
}

// Process completed

如果您真的不需要在进程结束之前处理错误/输出,您可以简化它并只使用主线程如下所示:

    File stderrFile = File.createTempFile("tmpErr", "out");
    File stdoutFile = File.createTempFile("tmpStd", "out");
    try {
        ProcessBuilder builder = new ProcessBuilder("ls /tmp");
        Process p = builder.start();
        int exitCode = -1;
        boolean done = false;
        while (!done) {
            try {
                exitCode = p.waitFor();
                done = true;
            } catch (InterruptedException ie) {
                System.out.println("Interrupted waiting for process to exit.");
            }
        }
        BufferedReader err = new BufferedReader(new FileReader(stderrFile));
        BufferedReader in = new BufferedReader(new FileReader(stdoutFile));
        ....
    } finally {
        stderrFile.delete();
        stdoutFile.delete();
    }

这可能不是一个好主意,如果你从调用的进程生成了大量输出,它可能会耗尽磁盘空间...但由于不必再启动另一个线程,所以速度可能会稍微快一些。

2
如果您在进程中没有实际从 err 中读取内容,那么可能很快就会被卡住。 - assylias
它正在从两个方面读取。基本上这里唯一的改进是您不需要单独的线程等待进程退出。它无法退出,直到stdout关闭。 - Alcanzar
这不是正确的方法,因为它只适用于非常小的输出。如果输出相对较大(可能超过1kB?),你的主线程肯定会被卡住。 - Lucio Paiva
@LucioPaiva 怎么做呢?“在这里读取流”应该涉及某种循环。 - OrangeDog

3
假设您不介意将输入和错误流合并,您可以只使用一个线程来处理:
builder.redirectErrorStream(true); //merge input and error streams
Process process = builder.start();

Thread singleThread = new Thread(() -> {
  int exitCode = -1;
  //read from the merged stream
  try (BufferedReader reader = 
              new BufferedReader(new InputStreamReader(process.getInputStream()))) {
    String line;
    //read until the stream is exhausted, meaning the process has terminated
    while ((line = reader.readLine()) != null) {
      System.out.println(line); //use the output here
    }
    //get the exit code if required
    exitCode = process.waitFor();
  } catch (Exception e) { }
}).start();

1
请阅读底部的注释 :) - Cheetah
我错过了 - 我认为你现在没什么可做的了。为什么一开始使用3个线程会有问题呢? - assylias

1

请查看来自OstermillerUtils的ExecHelper

这个想法是等待进程完成的线程不仅仅等待,而且如果有可用的输入,则从stdout和stderr读取输入,并定期检查进程是否已完成。

如果您不使用stdout和stderr的输入进行任何重处理,那么您可能不需要额外的线程来处理输入。只需复制ExecHelper并添加一些额外的函数/方法来处理任何新输入即可。我以前就这样做过,以显示进程输出,而进程正在运行时,这并不难做到(但我丢失了源代码)。

如果您确实需要单独的线程来处理输入,请确保在更新或读取这些缓冲区时同步输出和错误StringBuffers。

另一件你可能想考虑的事情是添加一个中止超时时间。这有点难以实现,但对我非常有价值:如果进程花费太多时间,该进程将被销毁,从而确保没有任何东西挂起。你可以在这个旧(过时?)示例 this gist 中找到。


0

你必须做出妥协。以下是你的选项:

A. 你可以使用2个线程(而不是3个)来完成它:

第一个线程:

  1. stdout 读取,直到 readline 返回 null
  2. 调用 Process.waitFor()
  3. join 线程#2

第二个线程:

  1. stderr 读取,直到 readline 返回 null

B. 合并流并使用 Debian 的 annotate-output 来区分这两个流。

http://manpages.debian.org/cgi-bin/man.cgi?query=annotate-output&sektion=1

C. 如果它是短暂的进程,只需等待其结束。

D. 如果它是长期运行的进程,则可以在读取器之间旋转,并在中间加入一些睡眠时间。


0
对于使用Kotlin的人来说,这个问题旧了,但有了新的答案。使用Kotlin协程看起来很简单:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import java.io.BufferedReader
import java.io.InputStreamReader

const val CONSOLE_MAX_LIFE = 1000000L

@OptIn(ExperimentalCoroutinesApi::class)
fun f(scope: CoroutineScope) {
    fun executeCommand(command: String): Process {
        val builder = ProcessBuilder()
        return builder.command(*command.split(" ").toTypedArray()).start()
    }
    val process = executeCommand("cmd args")
    val stdoutReader = BufferedReader(InputStreamReader(process.inputStream))
    val stderrReader = BufferedReader(InputStreamReader(process.errorStream))

    val stdoutChannel = scope.produce(Dispatchers.IO) {
        var line: String
        while (stdoutReader.readLine().also { line = it } != null) { send(line) }
    }
    val stderrChannel = scope.produce(Dispatchers.IO) {
        var line: String
        while (stderrReader.readLine().also { line = it } != null) { send(line) }
    }
    scope.launch(Dispatchers.IO) {
        while (true) {
            val line = stdoutChannel.receive()
            if (line.contains("xyz"))
                setProgressIndication(Pair(.5f, "5/10"))
        }
    }
    scope.launch(Dispatchers.IO) {
        while (true) {
            val line = stderrChannel.receive()
            if (line.contains("abc"))
                throw Exception()
        }
    }
    scope.launch (Dispatchers.IO) {
        process.onExit().get()
        stdoutChannel.cancel()
        stderrChannel.cancel()
    }
    scope.launch (Dispatchers.IO) {
        delay(CONSOLE_MAX_LIFE)
        process.destroy()
    }
}

这个使用6个协程,2个用于标准输出,2个用于标准错误。一个用于发送通道,每个标准输出/错误一个用于接收,最后一个用于在进程终止后关闭通道,并且一个用于终止终端。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接