不使用锁原子地交换两个数值

5

我正在创建一个仅追加数据的数据结构,它驻留在内存中,并将记录序列化为字节数组追加到该内存中。 我需要这个数据结构是线程安全且非常快速的,所以我想出了以下代码,目前运行良好(这是伪代码,实际版本更复杂并执行其他操作,但只是为了了解思路)。

public sealed class MemoryList : IDisposable
{
    private int nextOffset = 0;
    private readonly MemoryMappedFile file;
    private readonly MemoryMappedViewAccessor va;

    public MemoryList(uint capacity)
    {
        // Some checks on capacity here
        var mapName = Guid.NewGuid().ToString("N");
        this.file = MemoryMappedFile.CreateNew(mapName, capacity);
        this.va = file.CreateViewAccessor(0, capacity);
    }

    public void AppendMessage(byte[] messagePayload)
    {
        if (messagePayload == null) 
            throw new ArgumentNullException(nameof(messagePayload));
        if (messagePayload.Length == 0)
            throw new ArgumentOutOfRangeException(nameof(messagePayload));

        if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
        {
            this.va.Write(offsetToWriteTo, messagePayload.Length);
            this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
        }
    }

    private bool TryReserveCapacity(int dataLength, out long reservedOffset)
    {
        // reserve enough room to store data + its size
        var packetSize = sizeof(int) + dataLength;
        reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;

        if (this.nextOffset <= this.va.Capacity)
            return true;
        reservedOffset = -1;
        return false;
    }

    public void Dispose()
    {
        file?.Dispose();
        va?.Dispose();
    }
}

这个方法运行非常快且效果很好,无论我怎么尝试都不能破坏它。

现在我需要的是每个追加消息时TryReserveCapacity方法输出每个消息的逻辑索引。 所以对于第一条消息获取索引0,对于第二条消息获取索引1等等。 这会导致使用两次调用Interlocked,一个用于offset,另一个用于messageIndex。这些调用显然不是线程安全的,我可能会遇到竞争条件,从而导致以下情况。

MI: 101,Offset: 10000 MI: 100,Offset: 10500

有没有什么方法可以保证没有MI会比具有更大偏移量的另一个MI更大?而且所有这些都不使用任何锁?

因此,基本上我们如何更改以下方法才能正确地执行?

private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
    // reserve enough room to store data + its size
    var packetSize = sizeof(int) + dataLength;
    reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
    messageId = Interlocked.Increment(ref this.currentMessageId);

    if (this.nextOffset <= this.va.Capacity)
        return true;
    reservedOffset = -1;
    return false;
}

注:我知道示例代码存在字节序问题,但是请将其视为伪代码以说明问题。


messageId 是如何使用的?它需要递增吗?你能将 reservedOffset 作为 ID 返回吗? - Sinatr
是的,它需要是增量的。我不能使用reservedOffset作为id / index。一种方法是向后计算到这个保留索引之前有多少“消息”,但这对我来说似乎不是最好的选择。 - Mihail Shishkov
如果您可以将ID和偏移量打包成64位,则可以执行以下操作:1)使用Interlocked.Read()读取它们;2)正常增加ID并添加到偏移量中;3)尝试使用Interlocked.CompareExchange交换新的打包值,如果返回的值与步骤1不同(这意味着其他线程在您之前已经保留了容量),则重复步骤2。 - Sinatr
不幸的是,messageId 必须是 long 类型,但如果我创建自己的版本的 Read 方法,它可以使用 CompareExchange(ref object location1, object value, object comparand) 读取包含这两个字段的结构体。但是这样会涉及到一些装箱操作。 - Mihail Shishkov
还有针对引用类型的CompareExchange<T>(如果将id和offset打包到class中),但这样会产生分配。 - Sinatr
1个回答

1

如果这不是直接解决您的主要问题(非锁定原子性),请见谅,但我看到您正在使用 MemoryMappedFileMemoryMappedViewAccessor 类来操作内存映射文件。

我真的不知道当前的 .NET Framework 是否已经解决了这个问题,但在我们三年前编写的代码库中,我们发现使用这些类进行内存映射文件操作的性能非常差(如果我没记错的话,大约比使用 Win32 API 和直接指针操作映射内存慢 7 倍),即使在一个受控的 C++/CLI 类中。

我强烈建议您测试一下这种方法,您可能会惊喜地发现性能提升了很多(就像我们一样),也许性能提升是如此显著,以至于它可以承担达到所需原子性的标准锁定成本。

如果您想探索这条路,请参考下面这段代码,其中显示了该技术的基本原理。

Int32 StationHashStorage::Open() {
   msclr::lock lock(_syncRoot);
   if( _isOpen )
      return 0;
   String^ fileName = GetFullFileName();

   _szInBytes = ComputeFileSizeInBytes(fileName);
   String^ mapExtension = GetFileExtension();
   String^ mapName = String::Format("{0}{1}_{2}", _stationId, _date.ToString("yyyyMMdd"), mapExtension);

   marshal_context context;
   LPCTSTR pMapName = context.marshal_as<const TCHAR*>(mapName);

   {
      msclr::lock lock( _openLock );
         // Try to see if another storage instance has requested the same memory-mapped file and share it
         _hMapping = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, pMapName);
         if( !_hMapping ) {
            // This is the first instance acquiring the file
            LPCTSTR pFileName = context.marshal_as<const TCHAR*>(fileName);
            // Try to open the existing file, or create new one if not exists
            _hFile = CreateFile(pFileName, 
                                GENERIC_READ | GENERIC_WRITE, 
                                FILE_SHARE_READ,
                                NULL,
                                OPEN_ALWAYS,
                                FILE_ATTRIBUTE_NORMAL,
                                NULL);
            if( !_hFile )
               throw gcnew IOException(String::Format(Strings::CreateFileFailed, GetLastError(), _stationId));
            _hMapping = CreateFileMapping(_hFile, 
                                          NULL,
                                          PAGE_READWRITE | SEC_COMMIT,
                                          0,
                                          _szInBytes,
                                          pMapName);
            if( !_hMapping ) 
               throw gcnew IOException(String::Format(Strings::CreateMappingFailed, GetLastError(), _stationId));
            _usingSharedFile = false;
         } else {
            _usingSharedFile = true;
         }
      }

// _pData gives you access to the entire requested memory range, you can directly
// dereference it,  memcopy it, etc.

   _pData = (UInt32*)::MapViewOfFile(_hMapping, FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0);

   if( !_pData ) 
      throw gcnew IOException(String::Format(Strings::MapViewOfFileFailed, ::GetLastError(), _stationId));

   // warm-up the view by touching every page
   Int32 dummy = 0;
   for( int i = 0; i < _szInBytes / sizeof(Int32); i+= 1024 ) {
      dummy ^=  _pData[i];
   }
   // return the dummy value to prevent the optimizer from removing the apparently useless loop
   _isOpen = true;
   return dummy;
}

void StationHashStorage::Cleanup() {
     if( !_disposed ) {
      // dispose unmanaged resources here
      if( _pData ) {
         if( !UnmapViewOfFile(_pData) ) 
            LOG_ERROR(Strings::UnmapViewOfFileFailed, ::GetLastError(), _stationId);
         _pData = NULL;
      }

      if( _hMapping ) {
         if( !CloseHandle(_hMapping) ) 
            LOG_ERROR(Strings::CloseMappingFailed, ::GetLastError(), _stationId);
         _hMapping = NULL;
      }


      if( _hFile ) {
         if( !CloseHandle(_hFile) ) 
            LOG_ERROR(Strings::CloseFileFailed, ::GetLastError(), _stationId);
         _hFile = NULL;
      }
      _disposed = true;
   }
}

现在,关于你的真正问题。是否可能将生成的ID作为数据流的一部分嵌入其中? 我的想法是这样的:
1. 使用已知的虚拟值(例如0xffffffff)预先写入内存的全部内容。 2. 使用当前容量检查原子逻辑。 3. 在编写消息有效负载后,立即编写计算出的消息ID(您的容量检查需要考虑此额外数据)。 4. 不使用Interlocked.Add获取下一个ID,而是进入循环,检查当前消息之前的内存(上一个消息ID),直到它与您已知的虚拟值不同为止。退出循环后,当前消息ID将是读取值+1。
这将要求对第一个插入的消息进行一些特殊操作(因为它需要在流中种植第一个ID标记)。如果您正在使用长ID并且处于32位模式,则还需要小心确保您的ID流读取和写入是原子的。

祝你好运,我真的鼓励你尝试Win32 API,如果事情有所改善,那将非常有趣!如果您需要C++/CLI代码方面的帮助,请随时联系我。


谢谢,我一定会尝试的。 - Mihail Shishkov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接