Java:用于读写行的异步I/O通道

6
我有一个应用程序,它使用 BufferedReaderPrintStream 包装 java.net.Socket 对象的 InputStreamOutputStream 进行同步读写文本行。因此,我可以使用 BufferedReader.readLine()PrintStream.println() 方法,让 Java 库将输入拆分为行并为我格式化输出。

现在我想用异步 IO 替换这个同步 IO。所以我一直在研究AsynchronousSocketChannel,该通道允许异步读写字节。现在,我想要包装类,以便我可以使用字符串异步读/写行。

我在 Java 库中找不到这样的包装类。在编写自己的实现之前,我想问是否有其他库允许包装 AsynchronousSocketChannel 并提供异步文本 IO。


1
如果你没有从套接字中读取数据,那么谁会接收到完整的那一行呢?@giorgio-b - Andrew Henle
@Jägermeister:当有数据时,它会读取一些内容。只要没有从连接的另一端发送数据,就不会读取任何数据。 - giorgio-b
1
@EJP:我的问题是一个普遍的问题。如果我有同步字节IO和各种同步文本IO的包装器,我希望在异步IO上也有相同的功能。 - giorgio-b
@EJP:实际上我在文档中没有找到类似的内容。这就是为什么我想问一下是否有其他Java库提供类似功能的原因。 - giorgio-b
2
这就是我说的。它不在那里。关于离线资源的问题在这里不属于讨论范围。 - user207421
显示剩余9条评论
1个回答

1
你可以像这样做:

你可以这样做

public void nioAsyncParse(AsynchronousSocketChannel channel, final int bufferSize) throws IOException, ParseException, InterruptedException {
    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
    BufferConsumer consumer = new BufferConsumer(byteBuffer, bufferSize);
    channel.read(consumer.buffer(), 0l, channel, consumer);
}


class BufferConsumer implements CompletionHandler<Integer, AsynchronousSocketChannel> {

        private ByteBuffer bytes;
        private StringBuffer chars;
        private int limit;
        private long position;
        private DateFormat frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        public BufferConsumer(ByteBuffer byteBuffer, int bufferSize) {
            bytes = byteBuffer;
            chars = new StringBuffer(bufferSize);
            frmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            limit = bufferSize;
            position = 0l;
        }

        public ByteBuffer buffer() {
            return bytes;
        }

        @Override
        public synchronized void completed(Integer result, AsynchronousSocketChannel channel) {

            if (result!=-1) {
                bytes.flip();
                final int len = bytes.limit();
                int i = 0;
                try {
                    for (i = 0; i < len; i++) {
                        byte by = bytes.get();
                        if (by=='\n') {
                            // ***
                            // The code used to process the line goes here
                            // ***
                            chars.setLength(0);
                        }
                        else {
                            chars.append((char) by);
                        }
                    }
                }
                catch (Exception x) {
                    System.out.println("Caught exception " + x.getClass().getName() + " " + x.getMessage() + " i=" + String.valueOf(i) + ", limit=" + String.valueOf(len) + ", position="+String.valueOf(position));
                }

                if (len==limit) {
                    bytes.clear();
                    position += len;
                    channel.read(bytes, position, channel, this);
                }
                else {
                    try {
                        channel.close();
                    }
                    catch (IOException e) { }
                    bytes.clear();
                    buffers.add(bytes);
                }
            }
            else {
                try {
                    channel.close();
                }
                catch (IOException e) { }
                bytes.clear();
                buffers.add(bytes);
            }
        }

        @Override
        public void failed(Throwable e, AsynchronousSocketChannel channel) {
        }
};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接