作为Java 8流的一部分,读取文件列表

4

我有一个(可能很长)的二进制文件列表,我希望能够惰性地读取它们。由于文件太多而无法加载到内存中,因此我目前将它们作为MappedByteBuffer使用FileChannel.map()进行了读取,但这可能并不是必需的。我希望readBinaryFiles(...)方法返回一个Java 8 Stream,以便在访问它们时可以惰性加载文件列表。

    public List<FileDataMetaData> readBinaryFiles(
    List<File> files, 
    int numDataPoints, 
    int dataPacketSize )
    throws
    IOException {

    List<FileDataMetaData> fmdList = new ArrayList<FileDataMetaData>();

    IOException lastException = null;
    for (File f: files) {

        try {
            FileDataMetaData fmd = readRawFile(f, numDataPoints, dataPacketSize);
            fmdList.add(fmd);
        } catch (IOException e) {
            logger.error("", e);
            lastException = e;
        }
    }

    if (null != lastException)
        throw lastException;

    return fmdList;
}


//  The List<DataPacket> returned will be in the same order as in the file.
public FileDataMetaData readRawFile(File file, int numDataPoints, int dataPacketSize) throws IOException {

    FileDataMetaData fmd;
    FileChannel fileChannel = null;
    try {
        fileChannel = new RandomAccessFile(file, "r").getChannel();
        long fileSz = fileChannel.size();
        ByteBuffer bbRead = ByteBuffer.allocate((int) fileSz);
        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSz);

        buffer.get(bbRead.array());
        List<DataPacket> dataPacketList = new ArrayList<DataPacket>();

        while (bbRead.hasRemaining()) {

            int channelId = bbRead.getInt();
            long timestamp = bbRead.getLong();
            int[] data = new int[numDataPoints];
            for (int i=0; i<numDataPoints; i++) 
                data[i] = bbRead.getInt();

            DataPacket dp = new DataPacket(channelId, timestamp, data);
            dataPacketList.add(dp);
        }

        fmd = new FileDataMetaData(file.getCanonicalPath(), fileSz, dataPacketList);

    } catch (IOException e) {
        logger.error("", e);
        throw e;
    } finally {
        if (null != fileChannel) {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    return fmd;
}

readBinaryFiles(...)返回fmdList.Stream()并不能实现这一点,因为文件内容已经被读入内存,我将无法做到惰性读取。

其他读取多个文件内容作为Stream的方法都依赖于使用Files.lines(),但我需要读取二进制文件。

如果Scala或golang对此用例有更好的支持,我也可以考虑使用它们来实现。

我希望能够获得有关如何惰性读取多个二进制文件内容的任何指针。


你想将文件作为一个长的、连续的字节流来读取吗? - biziclop
3
如果您从readRawFile方法的签名中删除“throws IOException”,则return files.stream().map(f -> readRawFile(f, numDataPoints, dataPacketSize))就足够了。您可以让该方法在内部捕获IOException并将其包装在UncheckedIOException中。(延迟执行的问题在于异常也需要被延迟处理。) - VGR
1
当下一步操作是将整个缓冲区复制到堆字节缓冲区中时,使用映射的字节缓冲区就没有意义了。这正是read所做的。 - Holger
@VGR - 如果您把您的评论变成一个答案,我会接受它。 - Dean Schulze
4个回答

1

在读取整个文件以构建FileDataMetaData实例时,无法进行任何懒惰的阅读,因为你正在读取文件内部。如果要构建FileDataMetaData实例而不必读取整个文件,则需要对该类进行大量重构。

但是,在该代码中有几件事情需要清理,甚至特定于Java 7而不是Java 8,即您不再需要使用RandomAccessFile绕道打开通道,并且有try-with-resources以确保正确关闭。此外,请注意您使用内存映射没有意义。将整个内容复制到堆ByteBuffer后,没有任何懒惰可言。与在通道上使用堆ByteBuffer调用read时发生的情况完全相同,只是JRE可以在read情况下重用缓冲区。

为了让系统管理页面,您需要从映射的字节缓冲区中读取。根据系统,这可能仍然不如重复读取小块到堆字节缓冲区。
public FileDataMetaData readRawFile(
    File file, int numDataPoints, int dataPacketSize) throws IOException {

    try(FileChannel fileChannel=FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
        long fileSz = fileChannel.size();
        MappedByteBuffer bbRead=fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSz);
        List<DataPacket> dataPacketList = new ArrayList<>();
        while(bbRead.hasRemaining()) {
            int channelId = bbRead.getInt();
            long timestamp = bbRead.getLong();
            int[] data = new int[numDataPoints];
            for (int i=0; i<numDataPoints; i++) 
                data[i] = bbRead.getInt();
            dataPacketList.add(new DataPacket(channelId, timestamp, data));
        }
        return new FileDataMetaData(file.getCanonicalPath(), fileSz, dataPacketList);
    } catch (IOException e) {
        logger.error("", e);
        throw e;
    }
}

根据这种方法构建流很简单,只需处理已检查的异常即可:
public Stream<FileDataMetaData> readBinaryFiles(
    List<File> files, int numDataPoints, int dataPacketSize) throws IOException {
    return files.stream().map(f -> {
        try {
            return readRawFile(f, numDataPoints, dataPacketSize);
        } catch (IOException e) {
            logger.error("", e);
            throw new UncheckedIOException(e);
        }
    });
}

我不需要对单个文件的内容进行惰性计算。读取整个文件的内容(通常<10 MBytes)是可以的。我需要惰性计算来避免一次性加载所有文件。我已经编辑了我的问题陈述以澄清这一点。你修改的readBinaryFiles就是我需要的,类似于VGR的解决方案。使用MappedByteBuffer读取文件只是快速而肮脏地重用/修改编写文件代码的方法。 - Dean Schulze

0
我不知道这个方法的性能如何,但你可以使用包装在内部的。这将有效地连接你的文件。如果你从每个文件创建一个,那么整个过程应该得到适当的缓冲。

0

这应该足够了:

return files.stream().map(f -> readRawFile(f, numDataPoints, dataPacketSize));

如果你愿意从readRawFile方法的签名中删除throws IOException,那么你可以这样做。你可以让该方法在内部捕获IOException并将其包装在UncheckedIOException中。(延迟执行的问题在于异常也需要被延迟处理。)


0

VGR的评论的基础上,我认为他的基本解决方案是:

return files.stream().map(f -> readRawFile(f, numDataPoints, dataPacketSize))

是正确的,因为它将惰性处理文件(并且如果从map()操作结果调用短路终端操作,则会停止执行)。我还建议在readRawFile的实现中略微不同,利用资源尝试和InputStream,这将不会将整个文件加载到内存中:

public FileDataMetaData readRawFile(File file, int numDataPoints, int dataPacketSize)
  throws DataPacketReadException { // <- Custom unchecked exception, nested for class

  FileDataMetadata results = null;

  try (FileInputStream fileInput = new FileInputStream(file)) {
    String filePath = file.getCanonicalPath();
    long fileSize = fileInput.getChannel().size()

    DataInputStream dataInput = new DataInputStream(new BufferedInputStream(fileInput);

    results = new FileDataMetadata(
      filePath, 
      fileSize,
      dataPacketsFrom(dataInput, numDataPoints, dataPacketSize, filePath);
  }

  return results;
}

private List<DataPacket> dataPacketsFrom(DataInputStream dataInput, int numDataPoints, int dataPacketSize, String filePath)
    throws DataPacketReadException { 

  List<DataPacket> packets = new 
  while (dataInput.available() > 0) {
    try {
      // Logic to assemble DataPacket
    }
    catch (EOFException e) {
      throw new DataPacketReadException("Unexpected EOF on file: " + filePath, e);
    }
    catch (IOException e) {
      throw new DataPacketReadException("Unexpected I/O exception on file: " + filePath, e);
    }
  }

  return packets;
}

这样可以减少代码量,并确保在出现错误时关闭文件。


请注意,原始代码读取的数据项不是DataPacket实例列表的一部分。考虑到这一点,将该方法拆分为两个并没有什么好处。此外,您认为需要单独处理EOFException吗? - Holger
原始代码仅从文件中读取进入“DataPacket”实例(channelId、时间戳和数据点)的信息。进入“FileDataMetaData”的其他信息要么传递到“readRawFile”方法中,要么从“File”或“FileInputStream”派生。 - J. Gregory Wright
至于异常的区分,那是一种风格问题。我更喜欢在堆栈跟踪的第一项消息中看到区别(DataPacketReadException),而不是有一个通用的消息需要我深入堆栈跟踪来查看顶层异常的原因。EOFException 表示存在数据问题,而 IOException 表示读取文件时出现问题。但如果你喜欢,你完全可以使用多个 catch 块。 - J. Gregory Wright
你说得对,其他信息并没有从流中读取,是我的错。不过,在readRawFile中仍然有可能出现会产生IOException的操作,因此在子程序dataPacketsFrom中处理IOException是不够的... - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接