线程中断无法结束阻塞调用输入流读取。

10

我正在使用RXTX从串口读取数据。读取是在以下方式生成的线程中完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

SerialReader类实现了Runnable接口,它只是无限循环地从端口读取数据,并将数据组装成有用的包,然后将其发送给其他应用程序。但是,我已将其简化为以下简单形式:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

当用户点击停止按钮时,以下功能应该理论上触发,以关闭输入流并跳出阻塞的byteChan.read(buffer)调用。 代码如下:
public void stop() {
  t.interrupt();
  serial.close();
}

然而,当我运行这段代码时,即使输入流关闭,我从未遇到过ClosedByInterruptException异常。此外,调用serial.close()会阻塞执行,因为底层输入流仍在读取调用中阻塞。我尝试使用byteChan.close()替换中断调用,这应该会导致AsynchronousCloseException异常,但我得到了相同的结果。如果您能提供任何帮助并告诉我哪里有问题,我将不胜感激。
3个回答

9
通过简单的包装你无法将不支持可中断 I/O 的流转换为“InterruptibleChannel”(而且,“ReadableByteChannel” 不扩展“InterruptibleChannel”)。你必须查看底层“InputStream”的契约。关于其结果的可中断性,“SerialPort.getInputStream()” 说了什么?如果没有说明,应该假定它忽略中断。对于任何没有明确支持可中断性的 I/O,唯一的选择通常是从另一个线程关闭流。这可能会立即在阻塞在对流的调用上的线程中引发“IOException”(尽管它可能不是“AsynchronousCloseException”)。然而,即使如此,这也极大地依赖于“InputStream”的实现 - 而底层操作系统也可能是一个因素。请注意“newChannel()”返回的“ReadableByteChannelImpl”类上的源代码注释。
  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮

在我的例子中,Channels.newChannel(<输入流>) 返回一个 ReadableByteChannelImpl 类型的对象,该对象实现了 ReadableByteChannel 接口(但更重要的是扩展了 AbstractInterruptibleChannel 实现了 InterruptibleChannel 接口)。 - JDS
1
糟糕...按下回车键提交评论。无论如何,对byteChan进行instanceof检查,与InterruptibleChannel相比返回true。另外,由于不太清楚,调用stop()的操作是在生成读取循环线程的线程中执行的。 - JDS
@JDS - ...但是,它还没有工作,对吧?请看我的更新。该通道不可中断,您可能没有合适的选项来退出RXTX读取。 - erickson

5

RXTX SerialInputStream(由serial.getInputStream()调用返回的内容)支持超时方案,这种方案最终解决了我所有的问题。在创建新的SerialReader对象之前添加以下内容,可以使读取不再无限期地阻塞:

serial.enableReceiveTimeout(1000);

在SerialReader对象中,我需要做一些更改以直接从InputStream读取而不是创建ReadableByteChannel,但现在,我可以轻松停止和重新启动读取器。


1
我正在使用下面的代码关闭RXTX。我运行启动和关闭测试,似乎工作正常。我的读取器看起来像这样:
private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

感谢

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接