TPL数据流块消耗了所有可用内存。

9
我有一个TransformManyBlock块,其设计如下:
  • 输入:文件路径
  • 输出:文件内容的IEnumerable,每行一个
我正在处理一个超大文件(61GB),因为它太大无法放入内存,所以我将此块和所有下游块的BoundedCapacity设置为非常低的值(例如1)。然而,该块显然会贪婪地迭代IEnumerable,这会消耗计算机上所有可用的内存,使每个进程停滞不前。该块的输出计数继续无限增加,直到我终止进程。
我该怎么做才能防止块以这种方式消耗IEnumerable
编辑:以下是说明问题的示例程序:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

如果您使用的是64位计算机,请确保在Visual Studio中清除“首选32位”选项。我的电脑上有16GB的RAM,这个程序会立即消耗所有可用的字节。


老实说,我没有时间在这里和你争论 - 祝你好运。 - Random Dev
如果您仔细阅读本节的其余部分,您会发现它并不像您想象的那样工作 - 您的“firstBlock”始终提供其可以生成的所有内容 - 如果您绑定第二个块,它只会拒绝第二个输入并稍后获取它。 - Random Dev
3个回答

6
你好像误解了TPL Dataflow的工作方式。
BoundedCapacity限制了你可以向块中发布的项目数量。在你的情况下,这意味着将单个char发布到TransformManyBlock中,将单个字符串发布到ActionBlock中。
因此,你将单个项目发布到TransformManyBlock,它随后返回1024 * 1024个字符串,并尝试将其传递给ActionBlock,而ActionBlock一次只能接受一个字符串。其余的字符串将保留在TransformManyBlock的输出队列中。
你可能想要做的是创建一个单独的块,并按流式方式发布项目,等待(同步或异步)容量达到时:
private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}

1
谢谢。最终我创建了一个新的块,它封装了一个源ActionBlock和一个目标BufferBlock。该操作块使用您建议的SendAsync来填充缓冲区。对外部世界而言,它的行为类似于我想要的TransformManyBlock。 - Brian Berns
@brianberns:不好意思问这个问题是否很蠢,但是"await block.SendAsync(item)"和 "block.Post(item)"有什么区别? - Bugmaster
2
@Bugmaster 这绝不是一个愚蠢的问题:https://dev59.com/jmYr5IYBdhLWcg3wi6o_#13605979 - i3arnon
2
@i3arnon:谢谢,我没有意识到Post()会立即返回,无论如何,我以为它会阻塞直到消息被消耗。糟糕! - Bugmaster

1
似乎要创建一个输出受限的 TransformManyBlock,需要三个内部块:
  1. 一个 TransformBlock 接收输入并产生可能并行运行的 IEnumerables。
  2. 一个非并行的 ActionBlock 列举生成的 IEnumerables,并传播最终结果。
  3. 一 个 BufferBlock 存储最终结果,遵守所需的BoundedCapacity
略微棘手的部分是如何传播第二个块的完成,因为它不直接链接到第三个块。在下面的实现中,方法PropagateCompletion根据库的源代码编写。
public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, Task<IEnumerable<TOutput>>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));

    var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
        dataflowBlockOptions);
    var output = new BufferBlock<TOutput>(dataflowBlockOptions);
    var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
    {
        if (results == null) return;
        foreach (var result in results)
        {
            var accepted = await output.SendAsync(result).ConfigureAwait(false);
            if (!accepted) break; // If one is rejected, the rest will be rejected too
        }
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        SingleProducerConstrained = true,
    });

    input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateCompletion(middle, output);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try
        {
            await source.Completion.ConfigureAwait(false);
        }
        catch { }

        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
        item => Task.FromResult(transform(item)), dataflowBlockOptions);
}

使用示例:

var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
    c => GetSequence(c), options);

0

如果管道的输出比后续比率低,则消息将在管道上累积,直到内存耗尽或达到某个队列限制。 如果消息具有显着的大小,则进程很快就会因为内存不足而饥饿。

BoundedCapacity设置为1将导致如果队列已经有一条消息,则消息被队列拒绝。这在批处理等情况下不是期望的行为。请查看此post以获取更多信息。

这个工作测试说明了我的观点:

//Change BoundedCapacity to +1 to see it fail
[TestMethod]
public void stackOverflow()
{      
    var total = 1000;
    var processed = 0;
    var block = new ActionBlock<int>(
       (messageUnit) =>
       {
           Thread.Sleep(10);
           Trace.WriteLine($"{messageUnit}");
           processed++;
       },
        new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 } 
   );

    for (int i = 0; i < total; i++)
    {
        var result = block.SendAsync(i);
        Assert.IsTrue(result.IsCompleted, $"failed for {i}");
    }

    block.Complete();
    block.Completion.Wait();

    Assert.AreEqual(total, processed);
}

我的方法是对帖子进行节流,这样管道中的消息不会积累太多。

以下是一种简单的方法。通过这种方式,数据流可以以全速处理消息,但是消息不会积累,从而避免过度的内存消耗。

//Should be adjusted for specific use.
public void postAssync(Message message)
{

    while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
    {
        Thread.Sleep(200);
        //Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace. 
        //This is the perfect place to force garbage collector to release memory.

    }
    block1.SendAssync(message)
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接