Java NIO - 内存映射文件

7

我最近看到了这篇文章,很好地介绍了内存映射文件及其如何在两个进程之间共享。下面是读取文件的进程代码:

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MemoryMapReader {

 /**
  * @param args
  * @throws IOException 
  * @throws FileNotFoundException 
  * @throws InterruptedException 
  */
 public static void main(String[] args) throws FileNotFoundException, IOException, InterruptedException {

  FileChannel fc = new RandomAccessFile(new File("c:/tmp/mapped.txt"), "rw").getChannel();

  long bufferSize=8*1000;
  MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, 0, bufferSize);
  long oldSize=fc.size();

  long currentPos = 0;
  long xx=currentPos;

  long startTime = System.currentTimeMillis();
  long lastValue=-1;
  for(;;)
  {

   while(mem.hasRemaining())
   {
    lastValue=mem.getLong();
    currentPos +=8;
   }
   if(currentPos < oldSize)
   {

    xx = xx + mem.position();
    mem = fc.map(FileChannel.MapMode.READ_ONLY,xx, bufferSize);
    continue;   
   }
   else
   {
     long end = System.currentTimeMillis();
     long tot = end-startTime;
     System.out.println(String.format("Last Value Read %s , Time(ms) %s ",lastValue, tot));
     System.out.println("Waiting for message");
     while(true)
     {
      long newSize=fc.size();
      if(newSize>oldSize)
      {
       oldSize = newSize;
       xx = xx + mem.position();
       mem = fc.map(FileChannel.MapMode.READ_ONLY,xx , oldSize-xx);
       System.out.println("Got some data");
       break;
      }
     }   
   }

  }

 }

}

我有一些关于这种方法的评论/问题:
如果我们只在空文件上执行读取器,即运行
  long bufferSize=8*1000;
  MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, 0, bufferSize);
  long oldSize=fc.size();

这将分配8000字节,现在文件将被扩展。返回的缓冲区限制为8000,位置为0,因此,读取器可以继续读取空数据。发生这种情况后,读者将停止,因为currentPos == oldSize
假设现在写入程序进来了(代码被省略,因为大部分都很简单,可以参考网站)- 它使用相同的缓冲区大小,因此它将先写入8000个字节,然后再分配另外8000个字节,扩展文件。现在,如果我们假设该过程在此时暂停,然后回到读取器,那么读取器将看到文件的新大小并分配剩余部分(因此从位置8000到1600),然后再次开始读取,并读取出另一个垃圾数据……
我有一点困惑,是否有一种方法可以同步这两个操作。就我所看到的,调用map的任何操作可能会用一个真正的空缓冲区(填充为零)扩展文件,或者写入程序可能只是扩展了文件,但还没有写入任何内容……

1
每当我看到“写”和“共享数据”时,我就会想到需要同步。 - duffymo
我不知道你所说的“是否有一种同步方式”的意思,但是打开大量内存映射文件或多次打开同一个文件都是非常糟糕的想法,因为垃圾回收的原因,没有明确定义的时间可以释放相关的内存。而且,像8k这样的微小数量进行映射并没有特别的优势:您可以使用缓冲流,它们默认具有那么多的缓冲区,并且没有关于文件扩展时该怎么做的麻烦。当只有一个非常大的文件时,内存映射文件最好只用于很少的数量。 - user207421
好的,明白了 - 打开一个大文件。不过,这是IPC的方法,所以我想知道如何实现一种方式,即一个进程写入,另一个进程读取,但我们要确保在读取之前知道另一个进程实际上已经写入了某些内容。这就是我所说的同步。 - Bober02
这是一个文件而不是一个管道。仅使用mmap()将无法实现同步。示例代码使用(丑陋的)繁忙轮询来实现。 - eckes
3个回答

13

我经常使用内存映射文件进行进程间通信。我不建议使用Holger的第一或第二种方法,而是使用他的第三种方法。但一个关键点可能是,我只和单个写入者一起工作 - 如果你有多个写入者,事情会变得更加复杂。

文件的开头是一个标题部分,其中包含所需的任何标题变量,最重要的是指向已写入数据结尾的指针。写入者应始终在写入数据后更新此标题变量,读取者不应超过此变量进行读取。所有主流CPU都具有的称为“缓存一致性”的东西将保证读取者看到与写入顺序相同的内存写入,因此如果遵循这些规则,读取器将永远不会读取未初始化的内存。(例外情况是读取器和写入器在不同的服务器上 - 缓存一致性在这里不起作用。不要尝试在不同服务器之间实现共享内存!)

您可以随意更新文件结尾指针的频率 - 它全部在内存中,不涉及任何I/O,因此您可以在每个记录或每个消息中更新它。

ByteBuffer具有'getInt()'和'putInt()'方法的绝对字节偏移版本,因此在处理内存映射文件时,我从不使用相对版本来读取和写入文件结尾标记。

当您已经拥有共享内存时,您不应该使用文件大小或其他进程间通信方法来通信文件结尾标记,也没有必要或好处。


+1. 你可以说明如何防止两个写入者同时在同一位置扩展文件。我也看到过使用头文件来实现锁本身的情况。 - user207421
请记住,我们不会在共享内存中存储POJO,而是使用“getInt()”等方法,并且这里的所有内容都是隐式易失性的。 - Tim Cooper
同意,你需要使用关键字“volatile”来确保存储顺序得到尊重。但无论如何,我们现在谈论的是Java中的内存映射文件,对吧?使用.getInt()等方法?如果你说这些调用可以被重新排序,我会非常惊讶。 - Tim Cooper
@TimCooper,你们两个都是对的,但是在两个不同的话题上。对于Linux,写入mmap共享内存对其他进程是立即可见的,具有缓存一致性,如果你只通过getInt()读取一个标志,那么完全没有问题,它是原子的并且始终最新,我想这就是你所说的。但是,如果你写入了两个标志并且顺序很重要,你需要使用内存屏障来排除CPU负载/存储/WC缓冲区,否则会发生重新排序。缓存一致性确保了立即的可见性,但它不能保证重新排序,这就是Eloff/JasonN所谈论的。 - Daniel
1
这个问题明确提到了“java”和“进程间通信”。因此,我们正在谈论MappedByteBuffer和getInt()。为什么每个人都在谈论只与C++和/或线程相关的问题呢? - Tim Cooper
显示剩余3条评论

5

请查看我的库Mappedbus (http://github.com/caplogic/mappedbus),它使得多个Java进程(JVMs)能够按顺序向同一个内存映射文件写入记录。

这是Mappedbus如何解决多个写入者之间的同步问题:

  • 文件的前8个字节构成了一个称为limit的字段。该字段指定实际写入文件的数据量。读取器将使用volatile轮询limit字段,以查看是否有新记录可读取。

  • 当编写者想要向文件添加记录时,它将使用fetch-and-add指令原子地更新limit字段。

  • 当limit字段增加时,读取器将知道有新数据可读取,但更新limit字段的编写器可能尚未在记录中写入任何数据。为避免这个问题,每个记录都包含一个初始字节,构成commit字段。

  • 当编写者完成记录编写后,它将设置commit字段(使用volatile),并且只有在读取器看到已设置commit字段时,才开始读取记录。

顺便说一下,该解决方案仅在Linux x86上使用Oracle的JVM进行验证。它很可能不适用于所有平台。


是的,你使用CAS的解决方案更好。你使用了Unsafe.compareAndSwapInt(...)吗? - Daniel
是的,确切地说是compareAndSwapLong。请查看MappedBusWriter.java中的allocate方法。 - MikaelJ
最初的解决方案使用了CAS,但现在使用fetch-and-add作为优化。 - MikaelJ

3
有几种方法。
  1. Let the writer acquire an exclusive Lock on the region that has not been written yet. Release the lock when everything has been written. This is compatible to every other application running on that system but it requires the reader to be smart enough to retry on failed reads unless you combine it with one of the other methods

  2. Use another communication channel, e.g. a pipe or a socket or a file’s metadata channel to let the writer tell the reader about the finished write.

  3. Write at a position in the file a special marker (being part of the protocol) telling about the written data, e.g.

    MappedByteBuffer bb;
    …
    // write your data
    
    bb.force();// ensure completion of all writes
    bb.put(specialPosition, specialMarkerValue);
    bb.force();// ensure visibility of the marker
    

你的意思是在通道上使用 FileLock 吗? - Bober02
是的,我指的是 FileLock - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接