我希望能够向已创建的命名管道写入数据,而不会因为读取方没有响应而阻塞。我的读取方是另一个应用程序,可能会意外关闭。如果读取方关闭了,我希望写入方能够继续向该命名管道写入数据。在Java中可以使用类似下面的代码实现:
fopen(fPath, O_NONBLOCK)
这样当读者再次访问时,它可以从上一次失败的地方继续阅读。
我希望能够向已创建的命名管道写入数据,而不会因为读取方没有响应而阻塞。我的读取方是另一个应用程序,可能会意外关闭。如果读取方关闭了,我希望写入方能够继续向该命名管道写入数据。在Java中可以使用类似下面的代码实现:
fopen(fPath, O_NONBLOCK)
这样当读者再次访问时,它可以从上一次失败的地方继续阅读。
首先,我会回答你的问题。接下来,我将展示一个使用阻塞IO解决你的问题的代码片段。
我想向已创建的命名管道写入数据,但不希望在读取器上发生阻塞。
你不需要非阻塞IO来解决你的问题。我认为它甚至不能帮助你解决问题。阻塞IO也可以很好地运行(可能甚至比非阻塞IO更好,因为并发性较低)。另外,阻塞IO更容易编程。你的读取器可以/应该保持阻塞。
我的读取器是另一个应用程序,它可能会崩溃。如果读取器确实崩溃了,我希望编写器应用程序继续向命名管道写入数据。这样当读取器重新启动时,它可以从失败的位置恢复。
只需将消息放入阻塞队列中。然后只有在读取器正在从命名管道中读取数据时(由于阻塞IO的缘故自动发生),才向命名管道仅写入数据。当您使用阻塞队列时,无需使用非阻塞文件IO。数据是异步从阻塞队列传递到读取器,这将把您的数据从编写器发送到读取器。
类似于Java中的fopen(fPath,O_NONBLOCK)
你不需要在读取器上使用非阻塞IO,即使你使用了它。只需使用阻塞IO即可。
我创建了一个小片段,我相信它演示了您的需求。
组件:
Writer.java
import java.io.BufferedWriter;
import java.io.Console;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Writer {
private final BlockingDeque<StringBuffer> queue;
private final String filename;
public static void main(String[] args) throws Exception {
final Console console = System.console();
final Writer writer = new Writer("pipe");
writer.init();
while(true) {
String readLine = console.readLine();
writer.write(new StringBuffer(readLine));
}
}
public Writer(final String filename){
this.queue = new LinkedBlockingDeque<StringBuffer>();
this.filename = filename;
}
public void write(StringBuffer buf) {
queue.add(buf);
}
public void init() {
ExecutorService single = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() {
public void run() {
while(true) {
PrintWriter w = null;
try {
String toString = queue.take().toString();
w = new PrintWriter(new BufferedWriter(new FileWriter(filename)), true);
w.println(toString);
} catch (Exception ex) {
Logger.getLogger(Writer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
};
single.submit(runnable);
}
}
Reader.java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Reader {
private final BufferedReader br;
public Reader(final String filename) throws FileNotFoundException {
br = new BufferedReader(new FileReader(filename));
}
public String readLine() throws IOException {
return br.readLine();
}
public void close() {
try {
br.close();
} catch (IOException ex) {
Logger.getLogger(Reader.class.getName()).log(Level.SEVERE, null, ex);
}
}
public static void main(String[] args) throws FileNotFoundException {
Reader reader = new Reader("pipe");
while(true) {
try {
String readLine = reader.readLine();
System.out.println("readLine = " + readLine);
} catch (IOException ex) {
reader.close();
break;
}
}
}
}
您应该能够像对待其他文件一样,在UNIX FIFO上使用NIO的异步write
:
AsynchronousFileChannel channel = AsynchronousFileChannel.open(...);
Future<Integer> writeFuture = channel.write(...);
...或者...
channel.write(..., myCompletionHandler);
然而,我不清楚当FIFO无法接受写入时您希望发生什么。您想要它缓冲吗?如果是,您需要在Java程序中提供它。您希望它超时吗?在Java文件写入操作中没有简单的超时选项。
这些并非不能克服的问题。如果您下定决心,您可能可以使某些东西正常工作。但我想知道,如果您只使用TCP套接字或JMS队列,您是否不会觉得生活更容易。
AsynchronousFileChannel
的主要问题是write()
操作(以及read()
)需要一个写位置,然后在写入之前使用seek()
对文件描述符进行定位 - 这对于FIFO来说是不可行的。 - Guss