意外行为- TPL DataFlow BatchBlock在TriggerBatch执行时拒绝项目

10

当您创建带有有限容量的批处理块并在同时发布新项目时调用triggerBatch-在触发批处理执行期间,发布新项目将失败。

调用Trigger batch(每隔X时间)是为了确保数据不会在块中延迟太长时间,在传入数据流暂停或减慢的情况下。

以下代码将输出一些“发布失败”事件。例如:

    public static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);

        var producerTask = Task.Factory.StartNew(() =>
        {
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });

        var triggerBatchTask = Task.Factory.StartNew(() =>
            {                    
                //Trigger Batch..
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            });

        producerTask.Wait();
        triggerBatchTask.Wait();
    }

    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }

请注意,只有当batchBlock是有界的时,才能重现此情况。

我是否遗漏了什么或者这是batchBlock的一个问题?


有点相关:如果排队项目的数量小于BatchSize,如何在超时后自动调用TriggerBatch? - Theodor Zoulias
1个回答

5
BatchBlock 不会真正地拒绝一个元素,而是试图将其推迟。但对于 Post() 方法来说,推迟是不可能的。解决这个问题的简单方法是使用 await batchBlock.SendAsync(i) 替换 batchBlock.Post(i)(同时需要将 Task.Factory.StartNew(() => 改为 Task.Run(async () =>)。
为什么会出现这种情况?根据源代码,如果 BatchBlock 是有界的,则会异步处理 TriggerBatch(),在处理过程中不会接受新的元素。
无论如何,在有界块上,您不应该期望 Post() 总是返回 true,如果块已满,则 Post() 也会返回 false

同时,我正在使用另一种解决方案,引入另一个块来接受失败,并最终在两个块上以串行方式调用triggerbatch。对于您提出的解决方案-等待和异步将创建一个任务来处理每个传入的项目,当您有大量事件突发时,这可能会导致内存不足问题,许多任务将无限制地被创建。 - Al Yaros
1
@AlYaros 不会的。如果该项被接受,您将获得一个缓存的“Task”,因此不会有任何分配。如果该项被推迟,您所展示的代码将不会添加新项,直到它被接受。如果在您的实际代码中使用“await”会导致问题,那么我认为您应该能够解决这些问题,否则即使没有使用“await”,您也将遇到问题。 - svick
1
顺便说一下,感谢您的评论 :) 我不确定在任务内存消耗方面这是否绝对安全.. 我会查看您建议的源代码并进行一些测试。无论发布结果如何,任务是否在执行块代码之前创建? - Al Yaros
如果未达到有界容量,我也期望该Post请求将被接受,而不考虑TriggerBatch处理。 - shlomiw
@svick从源代码中可以看到,TriggerBatchOfferMessage方法在lock(IncomingLock)下执行。除此之外,还有什么可以防止在执行TriggerBatch()时导致Post()失败的因素吗?我进行了一些测试,发现如果将batchTrigger的超时时间设置为1000ms,则与将其设置为较低值(如<100ms)相比,'Post()'返回false的次数要少得多。 - Girish

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接