我正在将TPL Dataflow应用于生产代码之前进行实验。生产代码是一个经典的生产者/消费者系统——生产者生成与金融领域相关的消息,消费者处理这些消息。
我感兴趣的是,如果在某个时刻,生产者生成的速度远远快于消费者处理的速度,系统会保持多稳定(是否会崩溃或发生什么情况),更重要的是,在这种情况下该怎么办。
因此,为了拥有类似的简单应用程序,我提出了以下方案。
现在,为了模拟“不那么快”的消费者 - 我添加了
执行这个操作会在20秒左右导致OOM异常。然后我添加了更多的消费者(增加到10个ActionBlocks),这样能够获得更多的时间,但最终还是会导致相同的OOM异常。我还注意到GC承受了巨大的压力(VS 2015诊断工具显示GC几乎一直运行),所以我引入了对象池(非常简单,本质上是存储项目的ConcurrentBag)用于Item,但仍然遇到了同样的问题(抛出OOM异常)。为了详细说明内存中有什么,为什么会耗尽内存,以下是一些细节:
- 最大尺寸的对象类型是SingleProducerSingleConsumerQueue + Segment 和ConcurrentQueue + Segment - 我看到BufferBlock的InputBuffer中充满了Items(Count = 14,562,296) - 由于我为ActionBlock设置了BoundedCapacity,它们的输入缓冲区也接近于配置数(InputCount = 99,996)
为了确保较慢的生产者能够使消费者跟上步伐,我让生产者在迭代之间睡眠:
我感兴趣的是,如果在某个时刻,生产者生成的速度远远快于消费者处理的速度,系统会保持多稳定(是否会崩溃或发生什么情况),更重要的是,在这种情况下该怎么办。
因此,为了拥有类似的简单应用程序,我提出了以下方案。
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Item
是一个非常简单的类。
internal class Item
{
public Item(string itemId)
{
ItemId = itemId;
}
public string ItemId { get; }
}
GenerateItem
仅仅是实例化 Item
对象。
static Item GenerateItem()
{
return new Item(Guid.NewGuid().ToString());
}
现在,为了模拟“不那么快”的消费者 - 我添加了
ProcessItem
来等待100ms
。static async Task ProcessItem(Item item)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"Processing #{item.ItemId} item.");
}
执行这个操作会在20秒左右导致OOM异常。然后我添加了更多的消费者(增加到10个ActionBlocks),这样能够获得更多的时间,但最终还是会导致相同的OOM异常。我还注意到GC承受了巨大的压力(VS 2015诊断工具显示GC几乎一直运行),所以我引入了对象池(非常简单,本质上是存储项目的ConcurrentBag)用于Item,但仍然遇到了同样的问题(抛出OOM异常)。为了详细说明内存中有什么,为什么会耗尽内存,以下是一些细节:
- 最大尺寸的对象类型是SingleProducerSingleConsumerQueue + Segment 和ConcurrentQueue + Segment - 我看到BufferBlock的InputBuffer中充满了Items(Count = 14,562,296) - 由于我为ActionBlock设置了BoundedCapacity,它们的输入缓冲区也接近于配置数(InputCount = 99,996)
为了确保较慢的生产者能够使消费者跟上步伐,我让生产者在迭代之间睡眠:
for (int i = 0; i < int.MaxValue; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(50));
bufferBlock.SendAsync(GenerateItem());
}
它运行良好 - 没有抛出异常,内存使用率一直很低,我再也没有看到任何GC压力。
所以我有几个问题
- 我在尝试使用TPL Dataflow构建块复制非常快的生产者/慢的消费者场景时是否存在固有错误?
- 是否有方法使其正常工作并避免OOM异常。
- 关于如何在TPL Dataflow上下文中处理这种情况(非常快的生产者/慢的消费者)的最佳实践的任何评论/链接。
- 我的理解是 - 由于消费者无法跟上,
BufferBlock
的内部缓冲区非常快地填满了消息,并持有消息直到某些消费者回来请求下一条消息,因此应用程序由于BufferBlock
的内部缓冲区被填满而耗尽内存 - 您同意吗?
我正在使用Microsoft.Tpl.Dataflow
包 - 版本4.5.24。
.NET 4.5(C#6)。 进程为32位。
false
(表示消息未被接受)。如果您使用await SendAsync
将消息发送到一个已满的块,它将(异步)等待有空间然后发布该消息。请参见此答案。 - Stephen Cleary