缓存随机访问文件 Java

23
RandomAccessFile对于文件的随机访问非常慢。通常会读到关于在其上实现缓冲层的内容,但是在线上找到这样的代码并不容易。
因此我的问题是:你们是否知道任何开源实现此类的指针或者可以分享自己的实现?
如果这个问题可以成为一个有用的链接和代码的集合,那就太好了。我相信这个问题肯定被很多人共享,但还没有得到SUN公司的妥善解决。
请不要提及MemoryMapping,因为文件可能比Integer.MAX_VALUE大得多。

1
让我看看是否理解正确,你的意思是 java.nio.MemoryByteBuffer 不够好,因为它只能容纳 Integer.MAX_VALUE 字节。是这样吗? - Edwin Dalorzo
那是大约2个千兆字节的缓冲区内存。你的文件有多大,你有多少可用内存? - Edwin Dalorzo
1
你想要缓冲什么/如何缓冲?通常情况下,你会缓冲一个流,但如果你想访问多个Gig文件中的任意点,你到底想要存储哪些数据?我猜答案会给你解决方案(例如,“我总是想预加载随机点之后的1K数据”)。 - Will Iverson
@edalorzo:是的,这就是问题。我的文件有数十个GB。 - marcorossi
@Will:是的,那是最典型的想法。一种类似预先读取的行为。我的记录由标头和一些有效负载组成。因此,我可以读取int、long和short类型的字段来组成我的标头,而其中一些字段包含接下来的有效负载块的大小。所以这是许多read*()和一些read(byte[])s。主要是标头+有效载荷的情况。我心中所想的实现方式与添加BufferInputStream类型的行为并没有太大不同。 - marcorossi
6个回答

15

您可以使用以下代码从 RandomAccessFile 创建 BufferedInputStream:

 RandomAccessFile raf = ...
 FileInputStream fis = new FileInputStream(raf.getFD());
 BufferedInputStream bis = new BufferedInputStream(fis);

需要注意的一些事项:

  1. 关闭FileInputStream将会同时关闭RandomAccessFile,反之亦然。
  2. RandomAccessFile和FileInputStream指向相同的位置,因此从FileInputStream读取数据将会改变RandomAccessFile的文件指针,反之亦然。

你可能想要使用的方式类似于:

RandomAccessFile raf = ...
FileInputStream fis = new FileInputStream(raf.getFD());
BufferedInputStream bis = new BufferedInputStream(fis);

//do some reads with buffer
bis.read(...);
bis.read(...);

//seek to a a different section of the file, so discard the previous buffer
raf.seek(...);
bis = new BufferedInputStream(fis);
bis.read(...);
bis.read(...);

8
我采用了类似的方法,使用getFD方法。但是我没有构建一个BufferedInputStream,而是先构建了一个FileReader,然后再构建了一个BufferedReader。这样就可以访问比RandomAccessFile提供的更快(也可能更UTF友好?)的readLine方法。 - Jeff Terrell Ph.D.
2
@JeffTerrellPh.D. 我尝试使用 BufferedReader,并注意到 RandomAccessFile.getFilePointer 方法在多次调用 BufferedReader.readLine() 方法后返回相同的位置。这可能是因为 BufferedReader 可能会在单个 readLine() 调用中在内部将文件指针向前推进很远。 - Shailesh Pratapwar

13

即使文件大小超过Integer.MAX_VALUE,我认为没有理由不使用java.nio.MappedByteBuffer。

显然你不能为整个文件定义一个单一的MappedByteBuffer。但是你可以有几个MappedByteBuffers访问不同的文件区域。

在FileChannel.map中,位置和大小的定义类型为long,这意味着您可以提供超过Integer.MAX_VALUE的值,唯一需要注意的是缓冲区的大小不能超过Integer.MAX_VALUE。

因此,您可以像这样定义多个映射:

buffer[0] = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,2147483647L);
buffer[1] = fileChannel.map(FileChannel.MapMode.READ_WRITE,2147483647L, Integer.MAX_VALUE);
buffer[2] = fileChannel.map(FileChannel.MapMode.READ_WRITE, 4294967294L, Integer.MAX_VALUE);
...

简而言之,大小不能超过Integer.MAX_VALUE,但起始位置可以位于文件的任何位置。

在书籍《Java NIO》中,作者Ron Hitchens指出:

通过内存映射机制访问文件,即使使用通道,也比传统方法读取或写入数据要高效得多。不需要进行显式的系统调用,这可能耗费时间。更重要的是,操作系统的虚拟内存系统会自动缓存内存页面。这些页面将使用系统内存缓存,并不会占用JVM内存堆的空间。

一旦内存页面变为有效状态(从磁盘加载),就可以再次以全硬件速度访问它,无需发出另一个系统调用来获取数据。包含索引或其他经常被引用或更新的节的大型结构化文件可以从内存映射中受益。当与文件锁定结合使用以保护关键部分和控制事务原子性时,您开始看到如何将内存映射缓冲区好好利用。

我真的怀疑您是否能找到比这更好的第三方API。也许您可以找到基于此架构编写的API来简化工作。

您是否觉得这种方法适合您?


1
好的方法,但是你应该有重叠的缓冲区,这样你就可以读取在2G边界上的记录。 - Anon
1
这是一个可能的解决方案,本来要在另一个问题中询问。一种有效的方法是为大文件包装多个MappedByteBuffer。在这里,我更倾向于使用缓冲区方法,类似于https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java或http://minddumped.blogspot.com/2009/01/buffered-javaiorandomaccessfile.html。 - marcorossi
http://minddumped.blogspot.com/2009/01/buffered-javaiorandomaccessfile.html 很好!感谢marcorossi。 - le Mandarin
当使用那种形式的映射文件时,您无法关闭或扩展它,您可以使用larray。如果您需要以可移植的方式扩展文件,请参见下面的答案。 - Luke

3

2
RandomAccessFile对于文件的随机访问速度相当慢。你经常会看到有关在其上实现缓冲层的代码,但是在线上找到这样的代码并不容易。
实际上,在线上是可以找到这样的代码的。例如,JAI源代码中的jpeg2000就有一个实现,而更加非限制性的实现则在以下链接中提供:http://www.unidata.ucar.edu/software/netcdf-java/
Java文档如下:

http://www.unidata.ucar.edu/software/thredds/v4.3/netcdf-java/v4.0/javadoc/ucar/unidata/io/RandomAccessFile.html


2
如果你的文件在GB级别,使用内存映射文件肯定会提高速度。我提到的缓冲RandomAccessFile实现非常适合小文件,并且对内存要求较低。内存映射文件需要大量RAM来完成它们的魔法。 - javatothebone
唯一的问题是我必须依赖整个库来使用一个类。这就是问题所在。不过,还是谢谢提供链接。 - marcorossi

1

如果您正在64位机器上运行,则内存映射文件是最佳方法。只需将整个文件映射到等大小的缓冲区数组中,然后根据需要为每个记录选择一个缓冲区(即edalorzo的答案,但您希望有重叠的缓冲区,以便不会跨越边界的记录)。

如果您在32位JVM上运行,则只能使用RandomAccessFile。但是,您可以使用它来读取包含整个记录的byte[],然后使用ByteBuffer从该数组中检索单个值。最坏的情况下,您应该需要进行两次文件访问:一次用于检索记录的位置/大小,一次用于检索记录本身。

但是,请注意,如果创建大量的byte[],则可能会开始对垃圾收集器造成压力,并且如果您在文件中反复跳动,则仍然会受到IO限制。


1
@edalorzo - 这是由于32位硬件的限制。在64位机器上,您的虚拟地址空间足够大,可以映射整个文件。在32位机器上,您必须不断重新映射文件的部分,并且可能会遇到GC问题(映射的文件由垃圾收集器取消映射,应该取消映射一个文件以便您有空间映射另一个文件,但在这样做时可能进行完整的收集)。 - Anon
是的,我正好在寻找像你的32位解决方案这样的东西。看看我对edalorzo的评论。第一个问题是在许多不同位置进行小读取的mmapping(与mmapping的大小和成本相比)不太合理。 - marcorossi
1
@marcorossi:读取文件时不会映射文件的某些部分,而是会映射整个文件。这可能会对您有所帮助:http://kdgcommons.svn.sourceforge.net/viewvc/kdgcommons/trunk/src/main/java/net/sf/kdgcommons/io/MappedFileBuffer.java?revision=20&view=markup - kdgregory
@kdgregory:看起来很有趣,但我无法内存映射100+GIG文件。另外,您如何处理缓冲区之间的重叠数据?似乎您没有处理这种情况。 - marcorossi
@marcorossi:为什么你不能内存映射100+G?如果你有64位处理器、操作系统和JVM,那么你应该没有问题。至于重叠缓冲区,这就是该类的全部意义:你可以在缓冲区之间获得高达1Gb的重叠。 - kdgregory
显示剩余2条评论

0
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * Adds caching to a random access file.
 * 
 * Rather than directly writing down to disk or to the system which seems to be
 * what random access file/file channel do, add a small buffer and write/read from
 * it when possible. A single buffer is created, which means reads or writes near 
 * each other will have a speed up. Read/writes that are not within the cache block 
 * will not be speed up. 
 * 
 *
 */
public class BufferedRandomAccessFile implements AutoCloseable {

    private static final int DEFAULT_BUFSIZE = 4096;

    /**
     * The wrapped random access file, we will hold a cache around it.
     */
    private final RandomAccessFile raf;

    /**
     * The size of the buffer
     */
    private final int bufsize;

    /**
     * The buffer.
     */
    private final byte buf[];


    /**
     * Current position in the file.
     */
    private long pos = 0;

    /**
     * When the buffer has been read, this tells us where in the file the buffer
     * starts at.
     */
    private long bufBlockStart = Long.MAX_VALUE;


    // Must be updated on write to the file
    private long actualFileLength = -1;

    boolean changeMadeToBuffer = false;

    // Must be update as we write to the buffer.
    private long virtualFileLength = -1;

    public BufferedRandomAccessFile(File name, String mode) throws FileNotFoundException {
        this(name, mode, DEFAULT_BUFSIZE);
    }

    /**
     * 
     * @param file
     * @param mode how to open the random access file.
     * @param b size of the buffer
     * @throws FileNotFoundException
     */
    public BufferedRandomAccessFile(File file, String mode, int b) throws FileNotFoundException {
        this(new RandomAccessFile(file, mode), b);
    }

    public BufferedRandomAccessFile(RandomAccessFile raf) throws FileNotFoundException {
        this(raf, DEFAULT_BUFSIZE);
    }

    public BufferedRandomAccessFile(RandomAccessFile raf, int b) {
        this.raf = raf;
        try {
            this.actualFileLength = raf.length();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.virtualFileLength = actualFileLength;
        this.bufsize = b;
        this.buf = new byte[bufsize];
    }

    /**
     * Sets the position of the byte at which the next read/write should occur.
     * 
     * @param pos
     * @throws IOException
     */
    public void seek(long pos) throws IOException{
        this.pos = pos;
    }

    /**
     * Sets the length of the file.
     */
    public void setLength(long fileLength) throws IOException {
        this.raf.setLength(fileLength);
        if(fileLength < virtualFileLength) {
            virtualFileLength = fileLength;
        }
    }

    /**
     * Writes the entire buffer to disk, if needed.
     */
    private void writeBufferToDisk() throws IOException {
        if(!changeMadeToBuffer) return;
        int amountOfBufferToWrite = (int) Math.min((long) bufsize, virtualFileLength - bufBlockStart);
        if(amountOfBufferToWrite > 0) {
            raf.seek(bufBlockStart);
            raf.write(buf, 0, amountOfBufferToWrite);
            this.actualFileLength = virtualFileLength;
        }
        changeMadeToBuffer = false;
    }

    /**
     * Flush the buffer to disk and force a sync.
     */
    public void flush() throws IOException {
        writeBufferToDisk();
        this.raf.getChannel().force(false);
    }

    /**
     * Based on pos, ensures that the buffer is one that contains pos
     * 
     * After this call it will be safe to write to the buffer to update the byte at pos,
     * if this returns true reading of the byte at pos will be valid as a previous write
     * or set length has caused the file to be large enough to have a byte at pos.
     * 
     * @return true if the buffer contains any data that may be read. Data may be read so long as
     * a write or the file has been set to a length that us greater than the current position.
     */
    private boolean readyBuffer() throws IOException {
        boolean isPosOutSideOfBuffer = pos < bufBlockStart || bufBlockStart + bufsize <= pos;

        if (isPosOutSideOfBuffer) {

            writeBufferToDisk();

            // The buffer is always positioned to start at a multiple of a bufsize offset.
            // e.g. for a buf size of 4 the starting positions of buffers can be at 0, 4, 8, 12..
            // Work out where the buffer block should start for the given position. 
            long bufferBlockStart = (pos / bufsize) * bufsize;

            assert bufferBlockStart >= 0;

            // If the file is large enough, read it into the buffer.
            // if the file is not large enough we have nothing to read into the buffer,
            // In both cases the buffer will be ready to have writes made to it.
            if(bufferBlockStart < actualFileLength) {
                raf.seek(bufferBlockStart);
                raf.read(buf);
            }

            bufBlockStart = bufferBlockStart;
        }

        return pos < virtualFileLength;
    }

    /**
     * Reads a byte from the file, returning an integer of 0-255, or -1 if it has reached the end of the file.
     * 
     * @return
     * @throws IOException 
     */
    public int read() throws IOException {
        if(readyBuffer() == false) {
            return -1;
        }
        try {
            return (buf[(int)(pos - bufBlockStart)]) & 0x000000ff ; 
        } finally {
            pos++;
        }
    }

    /**
     * Write a single byte to the file.
     * 
     * @param b
     * @throws IOException
     */
    public void write(byte b) throws IOException {
        readyBuffer(); // ignore result we don't care.
        buf[(int)(pos - bufBlockStart)] = b;
        changeMadeToBuffer = true;
        pos++;
        if(pos > virtualFileLength) {
            virtualFileLength = pos;
        }
    }

    /**
     * Write all given bytes to the random access file at the current possition.
     * 
     */
    public void write(byte[] bytes) throws IOException {
        int writen = 0;
        int bytesToWrite = bytes.length;
        {
            readyBuffer();
            int startPositionInBuffer = (int)(pos - bufBlockStart);
            int lengthToWriteToBuffer = Math.min(bytesToWrite - writen, bufsize - startPositionInBuffer);
            assert  startPositionInBuffer + lengthToWriteToBuffer <= bufsize;

            System.arraycopy(bytes, writen,
                            buf, startPositionInBuffer,
                            lengthToWriteToBuffer);
            pos += lengthToWriteToBuffer;
            if(pos > virtualFileLength) {
                virtualFileLength = pos;
            }
            writen += lengthToWriteToBuffer;
            this.changeMadeToBuffer = true;
        }

        // Just write the rest to the random access file
        if(writen < bytesToWrite) {
            writeBufferToDisk();
            int toWrite = bytesToWrite - writen;
            raf.write(bytes, writen, toWrite);
            pos += toWrite;
            if(pos > virtualFileLength) {
                virtualFileLength = pos;
                actualFileLength = virtualFileLength;
            }
        }
    }

    /**
     * Read up to to the size of bytes,
     * 
     * @return the number of bytes read.
     */
    public int read(byte[] bytes) throws IOException {
        int read = 0;
        int bytesToRead = bytes.length;
        while(read < bytesToRead) {

            //First see if we need to fill the cache
            if(readyBuffer() == false) {
                //No more to read;
                return read;
            }

            //Now read as much as we can (or need from cache and place it
            //in the given byte[]
            int startPositionInBuffer = (int)(pos - bufBlockStart);
            int lengthToReadFromBuffer = Math.min(bytesToRead - read, bufsize - startPositionInBuffer);

            System.arraycopy(buf, startPositionInBuffer, bytes, read, lengthToReadFromBuffer);

            pos += lengthToReadFromBuffer;
            read += lengthToReadFromBuffer;
        }

        return read;
    }

    public void close() throws IOException {
        try {
            this.writeBufferToDisk();
        } finally {
            raf.close();
        }
    }

    /**
     * Gets the length of the file.
     * 
     * @return
     * @throws IOException
     */
    public long length() throws IOException{
        return virtualFileLength;
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接