Java中的非阻塞文件IO

9

我希望能够向已创建的命名管道写入数据,而不会因为读取方没有响应而阻塞。我的读取方是另一个应用程序,可能会意外关闭。如果读取方关闭了,我希望写入方能够继续向该命名管道写入数据。在Java中可以使用类似下面的代码实现:

fopen(fPath, O_NONBLOCK)

这样当读者再次访问时,它可以从上一次失败的地方继续阅读。


1
我不确定你是否可以使用管道完成这个操作。我相信如果读取器或写入器被关闭,那么该管道就会失效。这是来自PipedOutputStream的Javadoc的引用:“如果正在从连接的管道输入流读取数据字节的线程不再存活,则称管道已损坏”。我认为无法修复损坏的管道。 - Jim Tough
2
@Jim Tough 他在谈论一个命名管道(http://www.linuxjournal.com/article/2156),而不是Java SDK中的PipedOutputStream。 - Alfred
1
不是您可以接受的答案? - Alfred
你是真的指的UNIX命名管道吗?就像使用mkfifo创建的那种? - slim
4个回答

8

首先,我会回答你的问题。接下来,我将展示一个使用阻塞IO解决你的问题的代码片段。

你的问题

我想向已创建的命名管道写入数据,但不希望在读取器上发生阻塞。

你不需要非阻塞IO来解决你的问题。我认为它甚至不能帮助你解决问题。阻塞IO也可以很好地运行(可能甚至比非阻塞IO更好,因为并发性较低)。另外,阻塞IO更容易编程。你的读取器可以/应该保持阻塞。

我的读取器是另一个应用程序,它可能会崩溃。如果读取器确实崩溃了,我希望编写器应用程序继续向命名管道写入数据。这样当读取器重新启动时,它可以从失败的位置恢复。

只需将消息放入阻塞队列中。然后只有在读取器正在从命名管道中读取数据时(由于阻塞IO的缘故自动发生),才向命名管道写入数据。当您使用阻塞队列时,无需使用非阻塞文件IO。数据是异步从阻塞队列传递到读取器,这将把您的数据从编写器发送到读取器。

类似于Java中的fopen(fPath,O_NONBLOCK)

你不需要在读取器上使用非阻塞IO,即使你使用了它。只需使用阻塞IO即可。

代码片段

我创建了一个小片段,我相信它演示了您的需求。

组件:

  • Writer.java:作为示例从控制台读取行。当你启动程序时,输入文本后跟回车键,它会将其发送到你的命名管道。如果必要,编写器将恢复写入。
  • Reader.java:从你的命名管道(Writer.java)读取行。
  • 命名管道:我假设你已经在同一目录下创建了一个名为“pipe”的管道。

Writer.java

import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Writer {
    private final BlockingDeque<StringBuffer> queue;
    private final String filename;

    public static void main(String[] args) throws Exception {
        final Console console = System.console();
        final Writer writer = new Writer("pipe");

        writer.init();

        while(true) {
            String readLine = console.readLine();
            writer.write(new StringBuffer(readLine));
        }
    }

    public Writer(final String filename){
        this.queue = new LinkedBlockingDeque<StringBuffer>();
        this.filename = filename;
    }

    public void write(StringBuffer buf) {
        queue.add(buf);
    }

    public void init() {
        ExecutorService single = Executors.newSingleThreadExecutor();

        Runnable runnable = new Runnable() {
            public void run() {
                while(true) {
                    PrintWriter w = null;
                    try {
                        String toString = queue.take().toString();
                        w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
                        w.println(toString);
                    } catch (Exception ex) {
                        Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };

        single.submit(runnable);
    }
}

Reader.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Reader {
    private final BufferedReader br;

    public Reader(final String filename) throws FileNotFoundException {
        br = new BufferedReader(new FileReader(filename));
    }

    public String readLine() throws IOException {
        return br.readLine();
    }

    public void close() {
        try {
            br.close();
        } catch (IOException ex) {
            Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void main(String[] args) throws FileNotFoundException {
        Reader reader = new Reader("pipe");
        while(true) {
            try {
                String readLine = reader.readLine();
                System.out.println("readLine = " + readLine);
            } catch (IOException ex) {
                reader.close();
                break;
            }
        }
    }
}

2
如果您希望管道保持活动并排队消息,那么您可能需要一个消息系统而不是原始管道。在Java中,标准API称为“Java Messaging System”(JMS),有许多标准实现--其中我见过最常见的是Apache ActiveMQ。如果您想要一个跨平台、类似于套接字的接口,具有缓冲和恢复功能,我建议使用0MQ,虽然它不是“纯Java”,但它有许多语言的绑定和出色的性能。

2
如果Java中有非阻塞文件I/O,那就好了,但实际上并没有。对于一个未被读取的命名管道的写入操作将返回零并且不会写入任何内容。因此,非阻塞并不是解决方案的一部分。
此外,命名管道具有有限的缓冲区大小。无论是否存在读取进程,它们都不是无限队列。我同意建议研究JMS。

有非阻塞文件IO。搜索NIO(http://en.wikipedia.org/wiki/New_I/O)。 - Alfred
4
NIO和Java方面的权威参考资料不是维基百科,而是Javadoc,不过这并没有在Javadoc中明确说明。同样地,维基百科也没有提到这一点。另外,NIO是“New I/O”的缩写,尽管维基百科上没有这样写。 - user207421
2
我知道这是一个旧的问答,但为了正确起见,值得指出Java 7有AsynchronousFileChannel - nilskp
2
@nilskp 为了准确起见,值得指出的是异步 I/O 不同于非阻塞 I/O,并且对一个未被读取的管道进行异步写入将永远不会终止,这也使其不成为解决方案的一部分。 - user207421

-1

您应该能够像对待其他文件一样,在UNIX FIFO上使用NIO的异步write

 AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
 Future<Integer> writeFuture = channel.write(...);

...或者...

 channel.write(..., myCompletionHandler);

然而,我不清楚当FIFO无法接受写入时您希望发生什么。您想要它缓冲吗?如果是,您需要在Java程序中提供它。您希望它超时吗?在Java文件写入操作中没有简单的超时选项。

这些并非不能克服的问题。如果您下定决心,您可能可以使某些东西正常工作。但我想知道,如果您只使用TCP套接字或JMS队列,您是否不会觉得生活更容易。


2
如果您查看AsynchronousFileChannel的javadoc,它会说明:"AsynchronousFileChannel与线程池相关联..."。因此,这个类并没有真正提供非阻塞API,只是异步的... - Jaime Casero
在UNIX FIFO上使用AsynchronousFileChannel的主要问题是write()操作(以及read())需要一个写位置,然后在写入之前使用seek()对文件描述符进行定位 - 这对于FIFO来说是不可行的。 - Guss

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接