我正在创建一个仅追加数据的数据结构,它驻留在内存中,并将记录序列化为字节数组追加到该内存中。 我需要这个数据结构是线程安全且非常快速的,所以我想出了以下代码,目前运行良好(这是伪代码,实际版本更复杂并执行其他操作,但只是为了了解思路)。
public sealed class MemoryList : IDisposable
{
private int nextOffset = 0;
private readonly MemoryMappedFile file;
private readonly MemoryMappedViewAccessor va;
public MemoryList(uint capacity)
{
// Some checks on capacity here
var mapName = Guid.NewGuid().ToString("N");
this.file = MemoryMappedFile.CreateNew(mapName, capacity);
this.va = file.CreateViewAccessor(0, capacity);
}
public void AppendMessage(byte[] messagePayload)
{
if (messagePayload == null)
throw new ArgumentNullException(nameof(messagePayload));
if (messagePayload.Length == 0)
throw new ArgumentOutOfRangeException(nameof(messagePayload));
if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
{
this.va.Write(offsetToWriteTo, messagePayload.Length);
this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
}
}
private bool TryReserveCapacity(int dataLength, out long reservedOffset)
{
// reserve enough room to store data + its size
var packetSize = sizeof(int) + dataLength;
reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
if (this.nextOffset <= this.va.Capacity)
return true;
reservedOffset = -1;
return false;
}
public void Dispose()
{
file?.Dispose();
va?.Dispose();
}
}
这个方法运行非常快且效果很好,无论我怎么尝试都不能破坏它。
现在我需要的是每个追加消息时TryReserveCapacity
方法输出每个消息的逻辑索引。
所以对于第一条消息获取索引0,对于第二条消息获取索引1等等。
这会导致使用两次调用Interlocked
,一个用于offset
,另一个用于messageIndex
。这些调用显然不是线程安全的,我可能会遇到竞争条件,从而导致以下情况。
MI: 101,Offset: 10000 MI: 100,Offset: 10500
有没有什么方法可以保证没有MI会比具有更大偏移量的另一个MI更大?而且所有这些都不使用任何锁?
因此,基本上我们如何更改以下方法才能正确地执行?
private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
// reserve enough room to store data + its size
var packetSize = sizeof(int) + dataLength;
reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
messageId = Interlocked.Increment(ref this.currentMessageId);
if (this.nextOffset <= this.va.Capacity)
return true;
reservedOffset = -1;
return false;
}
注:我知道示例代码存在字节序问题,但是请将其视为伪代码以说明问题。
messageId
是如何使用的?它需要递增吗?你能将reservedOffset
作为 ID 返回吗? - SinatrInterlocked.Read()
读取它们;2)正常增加ID并添加到偏移量中;3)尝试使用Interlocked.CompareExchange
交换新的打包值,如果返回的值与步骤1不同(这意味着其他线程在您之前已经保留了容量),则重复步骤2。 - SinatrCompareExchange<T>
(如果将id和offset打包到class
中),但这样会产生分配。 - Sinatr