TPL Dataflow, from BroadcastBlock to BatchBlock

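I'm having trouble connecting BroadcastBlock(s) to BatchBlocks. The scenario is that the sources are BroadcastBlocks and the recipients are BatchBlocks.

In the simplified code below, only one of the attached ActionBlocks executes. I even set the batchSize of each BatchBlock to 1 to illustrate the problem.
If I set Greedy to true, both ActionBlocks execute, but that is not what I want, because it lets a BatchBlock proceed even though its batch is not complete yet. Any ideas?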
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        // My possible sources are BroadcastBlocks. Could be more
        var source1 = new BroadcastBlock<int>(z => z);

        // batch 1
        // can be many potential sources, one for now
        // I want all sources to arrive first before proceeding
        var batch1 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false }); 
        var batch1Action = new ActionBlock<int[]>(arr =>
        {
            // this does not run sometimes
            Console.WriteLine("Received from batch 1 block!");
            foreach (var item in arr)
            {
                Console.WriteLine("Received {0}", item);
            }
        });

        batch1.LinkTo(batch1Action, new DataflowLinkOptions() { PropagateCompletion = true });

        // batch 2
        // can be many potential sources, one for now
        // I want all sources to arrive first before proceeding
        var batch2 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false  });
        var batch2Action = new ActionBlock<int[]>(arr =>
        {
            // this does not run sometimes
            Console.WriteLine("Received from batch 2 block!");
            foreach (var item in arr)
            {
                Console.WriteLine("Received {0}", item);
            }
        });
        batch2.LinkTo(batch2Action, new DataflowLinkOptions() { PropagateCompletion = true });

        // connect source(s)
        source1.LinkTo(batch1, new DataflowLinkOptions() { PropagateCompletion = true });
        source1.LinkTo(batch2, new DataflowLinkOptions() { PropagateCompletion = true });

        // fire
        source1.SendAsync(3);

        Task.WaitAll(new Task[] { batch1Action.Completion, batch2Action.Completion });

        Console.ReadLine();
    }
}

I think setting Greedy to true is the correct solution. If your concern is that smaller batches will be created, that won't happen. - svick
1 Answer


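It looks like there is a flaw in the internal mechanics of the TPL Dataflow library that support the non-greedy functionality. When a BatchBlock configured as non-greedy is offered messages by its linked blocks, it postpones them instead of accepting them, and it keeps an internal queue of the messages it has postponed. When the number of postponed messages reaches its BatchSize, it tries to consume them, and if it succeeds, it propagates them downstream as expected. The problem is that source blocks like BroadcastBlock and BufferBlock stop offering more messages to a block that has postponed a previously offered message, until that block consumes this single message. The combination of these two behaviors results in a deadlock. No progress can be made, because the BatchBlock waits for more messages to be offered before consuming the postponed ones, while the BroadcastBlock waits for the postponed message to be consumed before offering more...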

This happens only when the BatchSize is larger than 1 (which is the typical configuration of this block).

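Here is a demonstration of the problem. It uses the more common BufferBlock as the source instead of a BroadcastBlock, and posts 10 messages into a pipeline of three blocks. The expected behavior is that the messages flow through the pipeline and reach the last block. In reality nothing happens: all the messages stay stuck in the first block.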

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    static void Main(string[] args)
    {
        var bufferBlock = new BufferBlock<int>();

        var batchBlock = new BatchBlock<int>(batchSize: 2,
            new GroupingDataflowBlockOptions() { Greedy = false });

        var actionBlock = new ActionBlock<int[]>(batch =>
            Console.WriteLine($"Received: {String.Join(", ", batch)}"));

        bufferBlock.LinkTo(batchBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });

        batchBlock.LinkTo(actionBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });

        for (int i = 1; i <= 10; i++)
        {
            var accepted = bufferBlock.Post(i);
            Console.WriteLine(
                $"bufferBlock.Post({i}) {(accepted ? "accepted" : "rejected")}");
            Thread.Sleep(100);
        }

        bufferBlock.Complete();
        actionBlock.Completion.Wait(millisecondsTimeout: 1000);
        Console.WriteLine();
        Console.WriteLine($"bufferBlock.Completion: {bufferBlock.Completion.Status}");
        Console.WriteLine($"batchBlock.Completion:  {batchBlock.Completion.Status}");
        Console.WriteLine($"actionBlock.Completion: {actionBlock.Completion.Status}");
        Console.WriteLine($"bufferBlock.Count: {bufferBlock.Count}");
    }
}

Output:

bufferBlock.Post(1) accepted
bufferBlock.Post(2) accepted
bufferBlock.Post(3) accepted
bufferBlock.Post(4) accepted
bufferBlock.Post(5) accepted
bufferBlock.Post(6) accepted
bufferBlock.Post(7) accepted
bufferBlock.Post(8) accepted
bufferBlock.Post(9) accepted
bufferBlock.Post(10) accepted

bufferBlock.Completion: WaitingForActivation
batchBlock.Completion:  WaitingForActivation
actionBlock.Completion: WaitingForActivation
bufferBlock.Count: 10

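My guess is that the internal offer-consume-reserve-release mechanism has been tuned for maximum efficiency in supporting the BoundedCapacity functionality, which is critical for many applications, while the rarely used Greedy = false functionality has not been tested thoroughly.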

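The good news is that in your case you don't actually need to set Greedy to false. A BatchBlock in the default greedy mode will not propagate a batch with fewer messages than the configured BatchSize, unless it has been marked as completed and is propagating any leftover messages, or you manually call its TriggerBatch method at some arbitrary moment. The intended use of the non-greedy configuration is to prevent resource starvation in complex graph scenarios, where there are dependencies among multiple blocks.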
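A minimal sketch of that suggestion, assuming the System.Threading.Tasks.Dataflow package: the question's topology (one BroadcastBlock feeding two BatchBlocks) with the BatchBlocks left in their default greedy mode, plus a small TriggerBatch demo. The helper names `RunGreedyBroadcast` and `RunTriggerBatch`, and the message and batch counts, are illustrative choices, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Hypothetical helper: the question's topology (BroadcastBlock -> two BatchBlocks),
    // but with the BatchBlocks in their default greedy mode.
    // Returns the batches seen by each downstream ActionBlock, formatted as text.
    public static (string, string) RunGreedyBroadcast(int messageCount, int batchSize)
    {
        var source = new BroadcastBlock<int>(x => x);
        var batch1 = new BatchBlock<int>(batchSize); // Greedy = true is the default
        var batch2 = new BatchBlock<int>(batchSize);

        // An ActionBlock processes one item at a time by default (MaxDegreeOfParallelism = 1),
        // so appending to a plain List<T> from inside its delegate is safe here.
        var received1 = new List<string>();
        var received2 = new List<string>();
        var action1 = new ActionBlock<int[]>(b => received1.Add($"[{String.Join(", ", b)}]"));
        var action2 = new ActionBlock<int[]>(b => received2.Add($"[{String.Join(", ", b)}]"));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(batch1, propagate);
        source.LinkTo(batch2, propagate);
        batch1.LinkTo(action1, propagate);
        batch2.LinkTo(action2, propagate);

        for (int i = 1; i <= messageCount; i++) source.Post(i);

        // Completing the source propagates completion to the BatchBlocks,
        // which then emit any leftover items as a final, smaller batch.
        source.Complete();
        action1.Completion.Wait();
        action2.Completion.Wait();

        return (String.Join(" ", received1), String.Join(" ", received2));
    }

    // Hypothetical helper: TriggerBatch releases a partial batch without completing the block.
    // Returns whether a batch was available before the trigger, and the triggered batch.
    public static (bool, string) RunTriggerBatch()
    {
        var batch = new BatchBlock<int>(batchSize: 3);
        batch.Post(1);
        batch.Post(2);

        // Only 2 of 3 items have arrived, so no batch has been produced yet.
        bool readyBefore = batch.TryReceive(out int[] _);

        batch.TriggerBatch();
        int[] partial = batch.Receive(); // blocks until the triggered batch is available
        return (readyBefore, $"[{String.Join(", ", partial)}]");
    }

    static void Main()
    {
        var (out1, out2) = RunGreedyBroadcast(messageCount: 5, batchSize: 2);
        Console.WriteLine($"batch1 received: {out1}");
        Console.WriteLine($"batch2 received: {out2}");

        var (readyBefore, partial) = RunTriggerBatch();
        Console.WriteLine($"batch ready before TriggerBatch: {readyBefore}; after: {partial}");
    }
}
```

With five messages and BatchSize = 2, each ActionBlock should receive two full batches while the source is active, and the leftover `5` only as a final batch once completion reaches its BatchBlock. Because both targets accept greedily, the BroadcastBlock never has to wait on a postponed message, so the deadlock described above cannot arise.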


Content provided by Stack Overflow.