如何使用AsynchronousFileChannel读取大文件?

5
  Path file = Paths.get("c:/large.log");
  AsynchronousFileChannel channel = AsynchronousFileChannel.open(file);
  final ByteBuffer buffer = ByteBuffer.allocate(1000);
  channel.read(buffer, 0, buffer,
      new CompletionHandler<Integer, ByteBuffer>() {
        public void completed(Integer result, ByteBuffer attachment) {
          System.out.println(new String(buffer.array()));
        }
  }); 

以这种方式,我可以读取 large.log 的前 1000 字节。如果我不想分配更大的字节数组(如 ByteBuffer.allocate(1000*1000)),该怎么读取后续日志呢?因为我认为这会导致 OutOfMemory。
能否有人给我提供样例代码呢?谢谢。
补充:我可以使用 JIO 循环读取大文件,因为我可以检查 java.io.BufferedReader.read() 的返回值。但我不知道如何使用 NIO2。
4个回答

6

这是一个可行的 hack。

需要注意以下几点:

  1. 我刚刚使用了你的 buffer.array() 作为输出。我必须使用 buffer.clear() 来重置位置,以便异步读取能够看到有1000个备用字节,但这并不会清除数组中现有的数据。结果当你在文件末尾时,如果读取的字节数少于1000个字节,则会打印整个缓冲区:刚刚读取的所有内容以及剩余的1000个字节。在实际应用中,您需要采取一些措施(如使用result或缓冲区的位置)来解决这个问题。
  2. 由于我无法弄清楚原因,类变量 buffercompleted 方法内部可以正常工作,但同样是类变量的 channel 是 null。我还没有找出为什么。所以我修改了它,使其将 channel 作为 attachment 传递而不是 buffer。对我来说还是毫无意义。
  3. 异步读取线程不足以保持 jvm 运行。所以我只是在 main 方法的末尾放置了一个 read。按下 Enter 退出。
  4. 类变量 pos 维护您从中读取文件的位置。
  5. 当你在 complete 方法中发起另一个异步读取时,魔法就会发生。这就是我放弃匿名类并实现接口本身的原因。
  6. 您需要将路径改回您自己的路径。

玩得开心。

import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
import java.io.IOException;

public class TryNio implements CompletionHandler<Integer, AsynchronousFileChannel> {

       // need to keep track of the next position.
        int pos = 0;
        AsynchronousFileChannel channel =  null;
        ByteBuffer buffer = null;

        public void completed(Integer result, AsynchronousFileChannel attachment) {
                 // if result is -1 means nothing was read.
                if (result != -1) {
                        pos += result;  // don't read the same text again.
                                        // your output command.
                        System.out.println(new String(buffer.array()));

                        buffer.clear();  // reset the buffer so you can read more.
                }
                        // initiate another asynchronous read, with this.
                attachment.read(buffer, pos , attachment, this );


        }
        public void failed(Throwable exc,
                        AsynchronousFileChannel attachment) {
                System.err.println ("Error!");
                exc.printStackTrace();
        }

        public void doit() {
                Path file = Paths.get("/var/log/syslog");
                AsynchronousFileChannel channel =  null;
                try {
                        channel = AsynchronousFileChannel.open(file);
                } catch (IOException e) {
                        System.err.println ("Could not open file: " + file.toString());
                        System.exit(1); // yeah.  heh.
                }
                buffer = ByteBuffer.allocate(1000);

                 // start off the asynch read. 
                channel.read(buffer, pos , channel, this );
                // this method now exits, thread returns to main and waits for user input.
        }

        public static void main (String [] args) {
                TryNio tn = new TryNio();
                tn.doit();
             // wait fur user to press a key otherwise java exits because the 
             // asynch thread isn't important enough to keep it running.
                try { System.in.read(); } catch (IOException e) { }
        }
}

我已经执行了这个示例代码。但它打印的内容比实际日志文件中的内容多。 - liam xu
2
System.out.print(new String(buffer.array(),0,result));否则会打印出无用的数据。 - liam xu
没错,而且请注意这个问题在原始代码中也存在。 - user207421
想指出上述读取文件的方法在我看来非常低效。它会递归地生成新线程来处理每个异步调用。如果你想异步读取一个大文件(这不是一个坏主意),那么最好只生成一个线程来完成任务,即使用普通的Runnable和join()等。也许有一种更新、更好的方法来实现它,但上面的代码对我来说并不是一个好方法。 - Hawkeye Parker
再思考一下,我可以想象出需要同时生成多个线程来读取单个文件的情况。但是,上面的代码同步地生成异步读取线程,等待每个线程完成后才生成新的线程。再次强调,这似乎没有意义。像这样的代码对我来说更有意义。 - Hawkeye Parker
1
通道必须为空的原因是,完成处理程序在不同的线程中执行,并且根据Java内存模型,除非您显式将其标记为volatile/通过锁/VarHandles保护,否则其他线程无法看到最新状态。只是为了测试,请将类级字段标记为volatile并查看它们是否仍为空(它们不应为空)。请注意,volatile是过于粗糙的工具。如果性能是标准,您会想要使用VarHandles。 - Ashok Koyi

1

GregHNZ的解决方案非常好,由于我在不同的项目中需要多次使用这种代码,我最终将其放入了一个辅助库RxIo中,并将其发布到了Maven Central Repository,并且也可在RxIo github仓库中获取。使用RxIo,您可以使用RxIo实用类来读取文件的所有字节,如下所示:

AsyncFiles
    .readAllBytes(Paths.get("input.txt"))
    .thenApply(bytes -> { /*... use bytes... */});

readAllBytes(Path file)方法会分配一个默认大小为262144的ByteBuffer,但你可以使用readAllBytes(Path file, int bufferSize)来指定不同的值。

你可以在单元测试文件夹中查看其他用例。


0
如果文件中还有剩余内容,可以在completionHandler中启动另一个读取操作。但是我建议使用比1000更大的缓冲区,至少8192。

可以运行。当然,您必须清除缓冲区,并在读取中增加位置参数,因此需要处理一些最终变量,但它是可以完成的。 - user207421

0

利用文件中的位置和文件大小,异步读操作需要在完成处理程序中反复调用以读取整个文件。每次读取操作完成后,需要使用读取的字节数增加位置。

以下是完成的异步读取整个文件的完成处理程序方法。有关完整示例,请参见 http://www.zoftino.com/java-asynchronous-io-nio2

public void completed(Integer result, ByteBuffer attachment) {
    try {
        bb.flip();
        System.out.println("bytea red "+bb.limit());

        if(afc.size() > position) {
            position = position + bb.limit();
            bb.clear();
            //pass the same completion handler
            afc.read(bb, position, bb, this);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }       
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接