TPL Dataflow - 生产者非常快,消费者不太快导致OutOfMemory异常

5
我正在将TPL Dataflow应用于生产代码之前进行实验。生产代码是一个经典的生产者/消费者系统——生产者生成与金融领域相关的消息,消费者处理这些消息。
我感兴趣的是,如果在某个时刻,生产者生成的速度远远快于消费者处理的速度,系统会保持多稳定(是否会崩溃或发生什么情况),更重要的是,在这种情况下该怎么办。
因此,为了拥有类似的简单应用程序,我提出了以下方案。
var bufferBlock = new BufferBlock<Item>();

var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
                        ,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();

Item是一个非常简单的类。

internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}

GenerateItem 仅仅是实例化 Item 对象。

static Item GenerateItem()
{
   return new Item(Guid.NewGuid().ToString());
}

现在,为了模拟“不那么快”的消费者 - 我添加了ProcessItem来等待100ms
static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}

执行这个操作会在20秒左右导致OOM异常。然后我添加了更多的消费者(增加到10个ActionBlocks),这样能够获得更多的时间,但最终还是会导致相同的OOM异常。我还注意到GC承受了巨大的压力(VS 2015诊断工具显示GC几乎一直运行),所以我引入了对象池(非常简单,本质上是存储项目的ConcurrentBag)用于Item,但仍然遇到了同样的问题(抛出OOM异常)。为了详细说明内存中有什么,为什么会耗尽内存,以下是一些细节:
- 最大尺寸的对象类型是SingleProducerSingleConsumerQueue + Segment 和ConcurrentQueue + Segment - 我看到BufferBlock的InputBuffer中充满了Items(Count = 14,562,296) - 由于我为ActionBlock设置了BoundedCapacity,它们的输入缓冲区也接近于配置数(InputCount = 99,996)
为了确保较慢的生产者能够使消费者跟上步伐,我让生产者在迭代之间睡眠:
for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}

它运行良好 - 没有抛出异常,内存使用率一直很低,我再也没有看到任何GC压力。

所以我有几个问题

  1. 我在尝试使用TPL Dataflow构建块复制非常快的生产者/慢的消费者场景时是否存在固有错误?
  2. 是否有方法使其正常工作并避免OOM异常。
  3. 关于如何在TPL Dataflow上下文中处理这种情况(非常快的生产者/慢的消费者)的最佳实践的任何评论/链接。
  4. 我的理解是 - 由于消费者无法跟上,BufferBlock的内部缓冲区非常快地填满了消息,并持有消息直到某些消费者回来请求下一条消息,因此应用程序由于BufferBlock的内部缓冲区被填满而耗尽内存 - 您同意吗?

我正在使用Microsoft.Tpl.Dataflow包 - 版本4.5.24。 .NET 4.5(C#6)。 进程为32位。

2个回答

16
你很好地指出了问题: BufferBlock 会填充其输入缓冲区直到遇到OOM。
为了解决这个问题,您应该在缓冲块中添加一个 BoundedCapacity 选项。这将自动为你节流生产者(不需要在你的生产者中使用 Thread.Sleep )。

谢谢确认!您能否评论一下BufferBlock的“BoundedCapacity”行为?特别是当内部缓冲区已满时,传入的消息会发生什么?它们会被丢弃吗?(目前似乎是这种情况)。Stephen,您的书中是否有更详细的介绍? - Michael
1
@Michael:消息永远不会被丢弃。如果您向一个已满的块“Post”一条消息,它将返回false(表示消息未被接受)。如果您使用await SendAsync将消息发送到一个已满的块,它将(异步)等待有空间然后发布该消息。请参见此答案 - Stephen Cleary

2
下面的代码存在潜在的严重问题:
for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem()); // Don't do this!
}

方法SendAsync返回一个Task对象,该对象在内存方面比您发送到块中的实际项要重得多。在特定示例中,返回的任务始终已完成,因为BufferBlock具有无限容量,因此任务的内存占用实际上为零(始终返回相同的缓存Task<bool>实例)。但是,在使用小BoundedCapacity值配置块之后,事情很快会变得有趣(以不愉快的方式)。每次调用SendAsync都将很快开始返回一个未完成的Task,每次都不同,每个任务的内存占用约为200字节(如果还使用CancellationToken参数,则每个任务的内存占用为300字节)。这显然不会很好地扩展。

解决方案是按照预期使用SendAsync。这意味着它应该被等待:

for (int i = 0; i < int.MaxValue; i++)
{
    await bufferBlock.SendAsync(GenerateItem()); // It's OK now
}

这样,生产者将会异步阻塞,直到块中有足够的空间来容纳发送的项目。希望这正是你想要的。否则,如果您不想阻塞生产者,请不要使用异步的SendAsync方法,而应该使用同步的Post方法代替:
for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        bool accepted = bufferBlock.Post(item); // Synchronous call
        if (accepted) break; // Break the inner loop
        if (bufferBlock.Completion.IsCompleted) return; // Break both loops

        // Here do other things for a while, before retrying to post the item
    }
}

或者,您可以使用较低级别的 OfferMessage 方法(而不是 PostSendAsync):
for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
            new DataflowMessageHeader(1L), item, null, false);
        if (offerResult == DataflowMessageStatus.Accepted) break;
        if (offerResult == DataflowMessageStatus.DecliningPermanently) return;

        // Here do other things for a while, before retrying to offer the item
    }
}

“魔数”1L是TPL Dataflow 源代码内部声明的一个值,表示:
一个众所周知的消息 ID,用于发送仅一条消息的代码,或者对于确切的消息 ID 并不重要的情况。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接