Vertx - 如何异步从可执行程序读取流

10

我有一个生成器 g,一旦我从控制台运行它,它就开始将数据写入控制台输出(stdout),然后休眠 x 秒并继续,形成了一串数据。

我想让我的程序运行 g 并将其输出绑定到 Java Vertx 应用程序作为流输入。

我希望所有的读取都是异步完成,我该如何实现?

这是我正在做的事情:

public class InputHandler extends AbstractVerticle {

    final String command = "path";

    @Override
    public void start() throws Exception {
        Runtime r = Runtime.getRuntime();
        Process p;     // Process tracks one external native process
        BufferedReader is;  // reader for output of process
        String line;
        p = r.exec(command);
        System.out.println("In Main after exec");
         is = new BufferedReader(new InputStreamReader(p.getInputStream()));

        while ((line = is.readLine()) != null)
           try {
               System.out.println(line);
           }catch (Exception ex){
               System.out.println(ex);
           }
    }
}

并且这是被抛出的异常:

io.vertx.core.VertxException: Thread blocked
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:255)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at io.vertx.example.InputHandler.start(InputHandler.java:27)
    at io.vertx.core.AbstractVerticle.start(AbstractVerticle.java:106)
    at io.vertx.core.impl.DeploymentManager.lambda$doDeploy$8(DeploymentManager.java:483)
    at io.vertx.core.impl.DeploymentManager$$Lambda$8/884603232.handle(Unknown Source)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
    at io.vertx.core.impl.ContextImpl$$Lambda$9/154173878.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
2个回答

4
你应该使用一个"工作 verticle"。如此处所述https://vertx.io/preview/docs/vertx-core/java/#blocking_code,它们可以处理阻塞代码。
例如,你应该以这种方式部署你的 verticle:
vertx.deployVerticle(YourVerticle.class.getName(), new DeploymentOptions().setWorker(true));

10x @Gregoire,但是我想要实现的是在进程p的流上设置一个处理程序,它将监听stdout,并且每次流产生消息时,我都会在处理程序中异步捕获它。 - yoav.str
1
完全不相关:我看到了你最近在分类队列上的一些评论,例如:https://stackoverflow.com/questions/52071887/how-to-integrate-start-track-in-php... 简而言之:只有在其他人可以编辑问题时才进行编辑分类。任何需要所有者改进内容的事情都不应该被归为“编辑”。我查看了你的几个其他分类,老实说:我认为大多数都是错误的。所以请考虑更加小心,并学习相关材料。 - GhostCat

1
所以,我发现了如何解决这个问题:

 public void start() throws Exception {
        Runtime r = Runtime.getRuntime();
        Process p = r.exec(command);
        AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, vertx.nettyEventLoopGroup(), p.getInputStream());
        asyncInputStream.handler(buffer ->
                lineHandler(buffer.getString(0, buffer.length() - 1))
        );
    }

我需要深入了解如何使用 org.wisdom.framework.vertx.AsyncInputStream,因此您需要在pom XML中添加这个库:

<wisdomVertxEngin.version>0.10.0</wisdomVertxEngin.version>

<dependency>
  <groupId>org.wisdom-framework</groupId>
  <artifactId>wisdom-vertx-engine</artifactId>
  <version>${wisdomVertxEngin.version}</version>
</dependency>

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接