Java NIO - Memory mapped files

Posted 2019-02-13 17:02

Question:

I recently came across this article, which provides a nice introduction to memory-mapped files and how they can be shared between two processes. Here is the code for a process that reads in the file:

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MemoryMapReader {

 /**
  * @param args
  * @throws IOException 
  * @throws FileNotFoundException 
  * @throws InterruptedException 
  */
 public static void main(String[] args) throws FileNotFoundException, IOException, InterruptedException {

  FileChannel fc = new RandomAccessFile(new File("c:/tmp/mapped.txt"), "rw").getChannel();

  long bufferSize=8*1000;
  MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, 0, bufferSize);
  long oldSize=fc.size();

  long currentPos = 0;
  long xx=currentPos;

  long startTime = System.currentTimeMillis();
  long lastValue=-1;
  for(;;)
  {

   while(mem.hasRemaining())
   {
    lastValue=mem.getLong();
    currentPos +=8;
   }
   if(currentPos < oldSize)
   {

    xx = xx + mem.position();
    mem = fc.map(FileChannel.MapMode.READ_ONLY,xx, bufferSize);
    continue;   
   }
   else
   {
     long end = System.currentTimeMillis();
     long tot = end-startTime;
     System.out.println(String.format("Last Value Read %s , Time(ms) %s ",lastValue, tot));
     System.out.println("Waiting for message");
     while(true)
     {
      long newSize=fc.size();
      if(newSize>oldSize)
      {
       oldSize = newSize;
       xx = xx + mem.position();
       mem = fc.map(FileChannel.MapMode.READ_ONLY,xx , oldSize-xx);
       System.out.println("Got some data");
       break;
      }
     }   
   }

  }

 }

}

I have, however, a few comments/questions regarding that approach:

Suppose we run only the reader, on an empty file, i.e. we execute

  long bufferSize=8*1000;
  MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, 0, bufferSize);
  long oldSize=fc.size();

This maps 8000 bytes and, because the file is empty and the channel is opened in "rw" mode, extends the file to 8000 bytes. The returned buffer has a limit of 8000 and a position of 0, so the reader proceeds to read 8000 bytes of empty (zero-filled) data. After that the reader stops, as currentPos == oldSize.
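
The file-extension behaviour described above can be checked in isolation. The following is a minimal sketch, assuming an OpenJDK-style implementation (the Javadoc for FileChannel.map leaves the behaviour unspecified when the requested region lies outside the file); the class name, method name and temp file are made up for illustration.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

final class MapExtendsFileDemo {

    /**
     * Maps `bytes` bytes of a fresh, empty temp file read-only and
     * returns {size before map, size after map, first long in the buffer}.
     */
    static long[] mapEmptyFile(int bytes) {
        try {
            Path tmp = Files.createTempFile("mapped", ".bin"); // hypothetical scratch file
            tmp.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(tmp.toFile(), "rw");
                 FileChannel fc = raf.getChannel()) {
                long before = fc.size();
                // Same call as in the reader: READ_ONLY mapping on an "rw" channel.
                MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, 0, bytes);
                long after = fc.size();
                long first = mem.getLong(0); // absolute read, does not move the position
                return new long[] {before, after, first};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = mapEmptyFile(8 * 1000);
        System.out.println("size before map = " + r[0]
                + ", size after map = " + r[1]
                + ", first long = " + r[2]);
    }
}
```

If the size after the call equals the mapped length, the reader's oldSize is already 8000 before any writer has run, which is why the first pass reads only zeros.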

Now suppose the writer comes in (its code is omitted, as most of it is straightforward and can be found on the website). It uses the same buffer size, so it first writes 8000 bytes, then maps another 8000, extending the file. If this process pauses at this point and we go back to the reader, the reader sees the new file size, maps the remainder (from position 8000 to 16000) and starts reading again, reading in more garbage...

I am a bit confused about whether there is a way to synchronize those two operations. As far as I can see, any call to map might extend the file with an empty buffer (filled with zeros), or the writer might have just extended the file but not yet written anything into it...

Answer 1:

There are several ways.

  1. Let the writer acquire an exclusive lock on the region that has not been written yet, and release the lock when everything has been written. This is compatible with every other application running on that system, but it requires the reader to be smart enough to retry on failed reads, unless you combine it with one of the other methods.

  2. Use another communication channel, e.g. a pipe or a socket or a file’s metadata channel to let the writer tell the reader about the finished write.

  3. Write a special marker at a known position in the file (as part of the protocol) that tells the reader about the written data, e.g.

    MappedByteBuffer bb;
    …
    // write your data
    
    bb.force();// ensure completion of all writes
    bb.put(specialPosition, specialMarkerValue);
    bb.force();// ensure visibility of the marker
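
Option 1 from the list above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the region starts at offset 0, and file locks are held on behalf of the whole JVM, so the reader's tryLock only returns null when a different process holds the lock. Here the writer and reader run one after the other in the same process.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

final class LockedRegionSketch {

    /** Writer: lock the not-yet-written region exclusively, fill it, then release the lock. */
    static void writeLocked(Path file, long start, int count, long firstValue) {
        long length = (long) Long.BYTES * count;
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel fc = raf.getChannel()) {
            FileLock lock = fc.lock(start, length, false); // false = exclusive
            try {
                // Mapping may extend the file; the region is already locked at this point.
                MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_WRITE, start, length);
                for (int i = 0; i < count; i++) {
                    mem.putLong(i * Long.BYTES, firstValue + i);
                }
                mem.force(); // ask the OS to write the mapped changes to storage
            } finally {
                lock.release();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reader: empty means "locked by another process or not created yet, retry later". */
    static OptionalLong tryReadFirst(Path file, long start, int count) {
        long length = (long) Long.BYTES * count;
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel fc = raf.getChannel()) {
            FileLock lock = fc.tryLock(start, length, true); // true = shared
            if (lock == null) {
                return OptionalLong.empty();
            }
            try {
                if (fc.size() < start + length) {
                    return OptionalLong.empty(); // region does not exist yet
                }
                MappedByteBuffer mem = fc.map(FileChannel.MapMode.READ_ONLY, start, length);
                return OptionalLong.of(mem.getLong(0));
            } finally {
                lock.release();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes 1000 longs starting at 100 into a temp file, then reads the first one back. */
    static long demo() {
        try {
            Path tmp = Files.createTempFile("locked-region", ".bin"); // hypothetical scratch file
            tmp.toFile().deleteOnExit();
            writeLocked(tmp, 0, 1000, 100L);
            return tryReadFirst(tmp, 0, 1000).orElse(-1L);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("first value read back: " + demo());
    }
}
```

The reader still has to cope with an empty result by retrying, which is the "smart enough to retry" requirement mentioned in option 1.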
    


Answer 2:

I do a lot of work with memory-mapped files for interprocess communication. I would not recommend Holger's #1 or #2, but his #3 is what I do. But a key point is perhaps that I only ever work with a single writer - things get more complicated if you have multiple writers.

The start of the file is a header section with whatever header variables you need, most importantly a pointer to the end of the written data. The writer should always update this header variable after writing a piece of data, and the reader should never read beyond this variable. A thing called "cache coherency", which all mainstream CPUs have, will guarantee that the reader sees memory writes in the same sequence in which they were written, so the reader will never read uninitialised memory if you follow these rules. (An exception is where the reader and writer are on different servers: cache coherency doesn't work there. Don't try to implement shared memory across different servers!)

There is no limit to how frequently you can update the end-of-file pointer. It's all in memory and there won't be any I/O involved, so you can update it after each record or each message you write.

ByteBuffer has versions of the 'getInt()' and 'putInt()' methods which take an absolute byte offset, so that's what I use for reading and writing the end-of-file marker. I never use the relative versions when working with memory-mapped files.

You should not use the file size or yet another interprocess mechanism to communicate the end-of-file marker: there is no need for it and no benefit when you already have shared memory.
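
The header scheme described in this answer might look roughly like the sketch below. The layout is an assumption made for illustration (an int at byte 0 holding the number of data bytes written, data starting at byte 8), as are all the names. It uses the plain absolute accessors the answer mentions; note that the Java memory model itself makes no promises about ordering between processes, so this relies on the hardware behaviour the answer describes. For demonstration the writer and reader are two mappings of the same file in one process.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

final class EndPointerSketch {
    static final int WRITTEN_OFFSET = 0; // hypothetical header field: data bytes written so far
    static final int HEADER_SIZE = 8;    // hypothetical: data starts right after the header

    /** Single writer: store the record first, then publish it by advancing the header field. */
    static void append(ByteBuffer mem, long value) {
        int written = mem.getInt(WRITTEN_OFFSET);
        mem.putLong(HEADER_SIZE + written, value);
        mem.putInt(WRITTEN_OFFSET, written + Long.BYTES);
    }

    /** Reader: number of complete records beyond the bytes it has already consumed. */
    static int availableRecords(ByteBuffer mem, int consumedBytes) {
        return (mem.getInt(WRITTEN_OFFSET) - consumedBytes) / Long.BYTES;
    }

    /** Reader: fetch the record that starts `consumedBytes` into the data area. */
    static long recordAt(ByteBuffer mem, int consumedBytes) {
        return mem.getLong(HEADER_SIZE + consumedBytes);
    }

    /** Returns {records visible before writing, records visible after, record 0, record 1}. */
    static long[] demo() {
        try {
            Path tmp = Files.createTempFile("end-pointer", ".bin"); // hypothetical scratch file
            tmp.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(tmp.toFile(), "rw");
                 FileChannel fc = raf.getChannel()) {
                MappedByteBuffer writer = fc.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                MappedByteBuffer reader = fc.map(FileChannel.MapMode.READ_ONLY, 0, 4096);
                long before = availableRecords(reader, 0); // zero-filled header means "empty"
                append(writer, 11L);
                append(writer, 22L);
                long after = availableRecords(reader, 0);
                return new long[] {before, after, recordAt(reader, 0), recordAt(reader, Long.BYTES)};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("before=" + r[0] + " after=" + r[1] + " records=" + r[2] + "," + r[3]);
    }
}
```

Storing a byte count rather than an absolute position means a freshly extended, zero-filled file already reads as "nothing written", so the reader never consumes the zeros the question worries about.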



Answer 3:

Check out my library Mappedbus (http://github.com/caplogic/mappedbus) which enables multiple Java processes (JVMs) to write records in order to the same memory mapped file.

Here's how Mappedbus solves the synchronization problem between multiple writers:

  • The first eight bytes of the file make up a field called the limit. This field specifies how much data has actually been written to the file. The readers will poll the limit field (using volatile) to see whether there's a new record to be read.

  • When a writer wants to add a record to the file it will use the fetch-and-add instruction to atomically update the limit field.

  • When the limit field has increased, a reader will know there's new data to be read, but the writer which updated the limit field might not yet have written any data in the record. To avoid this problem each record contains an initial byte which makes up the commit field.

  • When a writer has finished writing a record it will set the commit field (using volatile) and the reader will only start reading a record once it has seen that the commit field has been set.

(BTW, the solution has only been verified to work on Linux x86 with Oracle's JVM. It most likely won't work on all platforms).
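
The limit/commit idea from the bullets above can be illustrated with the standard VarHandle API (Java 9 or later). This is not Mappedbus's actual code or record layout: the names, the 16-byte record and the 4-byte commit flag (instead of a single byte, because byte-buffer view VarHandles do not cover byte) are assumptions made for the sketch, and the limit is stored relative to the first record.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

final class LimitCommitSketch {
    private static final VarHandle LONG =
            MethodHandles.byteBufferViewVarHandle(long[].class, ByteOrder.nativeOrder());
    private static final VarHandle INT =
            MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());

    static final int LIMIT_OFFSET = 0;   // first eight bytes: the limit field
    static final int FIRST_RECORD = 8;   // hypothetical: records start after the limit
    static final int RECORD_SIZE = 16;   // hypothetical: 4-byte commit, 4 bytes padding, 8-byte payload
    static final int PAYLOAD_OFFSET = 8; // payload position inside a record

    /** Writer step 1: atomically reserve the next record (fetch-and-add on the limit). */
    static int claim(ByteBuffer mem) {
        long previous = (long) LONG.getAndAdd(mem, LIMIT_OFFSET, (long) RECORD_SIZE);
        return FIRST_RECORD + (int) previous;
    }

    /** Writer step 2: fill the payload, then set the commit flag with a volatile write. */
    static void writeAndCommit(ByteBuffer mem, int record, long payload) {
        LONG.set(mem, record + PAYLOAD_OFFSET, payload);
        INT.setVolatile(mem, record, 1);
    }

    /** Reader: the payload if the record has been claimed and committed, otherwise empty. */
    static OptionalLong poll(ByteBuffer mem, int record) {
        long limit = (long) LONG.getVolatile(mem, LIMIT_OFFSET);
        if (record >= FIRST_RECORD + limit) {
            return OptionalLong.empty(); // nobody has claimed this record yet
        }
        if ((int) INT.getVolatile(mem, record) != 1) {
            return OptionalLong.empty(); // claimed, but the writer has not committed
        }
        return OptionalLong.of((long) LONG.get(mem, record + PAYLOAD_OFFSET));
    }

    /** Polls before the claim, between claim and commit, and after the commit. */
    static String demo() {
        try {
            Path tmp = Files.createTempFile("limit-commit", ".bin"); // hypothetical scratch file
            tmp.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(tmp.toFile(), "rw");
                 FileChannel fc = raf.getChannel()) {
                MappedByteBuffer writer = fc.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                MappedByteBuffer reader = fc.map(FileChannel.MapMode.READ_ONLY, 0, 4096);
                boolean beforeClaim = poll(reader, FIRST_RECORD).isPresent();
                int record = claim(writer);
                boolean beforeCommit = poll(reader, FIRST_RECORD).isPresent();
                writeAndCommit(writer, record, 42L);
                long value = poll(reader, FIRST_RECORD).getAsLong();
                return beforeClaim + "," + beforeCommit + "," + value;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The two-stage check in poll mirrors the bullets: the limit tells the reader that a slot has been reserved, and the commit flag tells it that the reserved slot actually contains data.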