使用BufferBlock<T>在数据流网络中的好处

26

我想知道使用与一个或多个ActionBlock链接的BufferBlock是否有其他好处,而不仅仅是限流(使用BoundedCapacity),而直接将消息发送到ActionBlock(只要不需要限流)。

3个回答

27

如果你只想将项从一个块转发到其他几个块,那么你不需要使用BufferBlock

但是,在某些情况下,它是有用的。例如,如果您有一个复杂的数据流网络,您可能希望从较小的子网络构建它,每个子网络在其自己的方法中创建。为此,您需要一种表示块组的方式。在您提到的情况下,从该方法返回单个BufferBlock(可能作为ITargetBlock)将是一种简单的解决方案。

另一个BufferBlock有用的例子是如果您想将项目从多个源块发送到多个目标块。如果您将BufferBlock用作中介,您就不必将每个源块连接到每个目标块。

我确定还有许多其他例子可以使用BufferBlock。当然,如果您在您的情况下看不到使用它的原因,则不要使用。


我确实认为使用BufferBlocks是在数据流块之间进行通信的“更清洁”的方式,但是使用BufferBlocks是否值得(如果有)开销呢? - Dimitri
1
这取决于你自己。如果你觉得它可以让你的代码更清晰,那就去做吧。虽然会有一些额外开销,但我认为除非你真的很在意性能,否则不应该有太大影响。 - svick

25
补充svick的答案,bufferblock还有另一个好处。如果您有一个带有多个输出链接的块,并且想在它们之间平衡,则必须将输出块转换为有界容量为1,并添加bufferblock来处理队列。
这是我们计划要做的:
一些代码块将使用其Post(T t)方法将数据发布到BufferBlock中。
此BufferBlock与3个ActionBlock实例相关联,使用BufferBlock的LinkTo t)方法。
请注意,BufferBlock不会向其链接到的所有目标块交付输入数据的副本。相反,它只向一个目标块交付副本。在这里,我们期望当一个目标正在忙于处理请求时,它将被移交给另一个目标。现在让我们参考下面的代码:
static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();

    ActionBlock<int> a1 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(100);
        Console.WriteLine("Action A1 executing with value {0}", a);
    });

    ActionBlock<int> a2 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A2 executing with value {0}", a);
    });

    ActionBlock<int> a3 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A3 executing with value {0}", a);
    });

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
        {
            int i = 0;
            while (i < 10)
            {
                Thread.Sleep(50);
                i++;
                bb.Post(i);
            }
        }
    );

    t.Start();
    Console.Read();
}

执行以上代码会产生以下输出:
- 执行操作A1,值为1 - 执行操作A1,值为2 - 执行操作A1,值为3 - 执行操作A1,值为4 - 执行操作A1,值为5 - 执行操作A1,值为6 - 执行操作A1,值为7 - 执行操作A1,值为8 - 执行操作A1,值为9 - 执行操作A1,值为10
这表明只有一个目标实际上在执行所有数据,即使忙碌(由于故意添加了Thread.Sleep(100))。为什么?
这是因为所有的目标块默认都是贪婪的性质,即使它们无法处理数据,它们也会缓冲输入。为了改变这种行为,我们在初始化ActionBlock时在DataFlowBlockOptions中设置了Bounded Capacity为1,如下所示。
static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();
    ActionBlock<int> a1 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(100);
            Console.WriteLine("Action A1 executing with value {0}", a);
        }
        , new ExecutionDataflowBlockOptions {BoundedCapacity = 1});
    ActionBlock<int> a2 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A2 executing with value {0}", a);
        }
        , new ExecutionDataflowBlockOptions {BoundedCapacity = 1});
    ActionBlock<int> a3 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A3 executing with value {0}", a);
        }
        , new ExecutionDataflowBlockOptions {BoundedCapacity = 1});

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
    {
        int i = 0;
        while (i < 10)
        {
            Thread.Sleep(50);
            i++;
            bb.Post(i);
        }
    });

    t.Start();
    Console.Read();
}

这个程序的输出结果如下:
  • 执行操作 A1,值为1
  • 执行操作 A2,值为3
  • 执行操作 A1,值为2
  • 执行操作 A3,值为6
  • 执行操作 A3,值为7
  • 执行操作 A3,值为8
  • 执行操作 A2,值为5
  • 执行操作 A3,值为9
  • 执行操作 A1,值为4
  • 执行操作 A2,值为10
这很明显是数据在三个 ActionBlock 中按预期进行的分布。

2
无法编译第二个示例。 - Nathan
1
嗨, 如果将每个ActionBlock的BoundedCapacity属性设置为“1”,而不是设置所有这些配置设置,就可以实现“贪婪”行为。 - Jose Roberto Araujo
我更新了这个答案,去掉了贪婪属性,因为它只适用于分组数据流块。理想情况下,我想为分组情况创建一个单独的示例,并使用greedy = false,因为这也是批量块的一个好用例。 - VoteCoffee

5
不,第二个示例无法编译,原因有几个:仅对“分组”数据流块设置greedy=false是可能的,而不是执行块;然后必须通过GroupingDataflowBlockOptions设置它,而不是DataflowBlockOptions;然后将其设置为属性值“{ Greedy = false }”,而不是构造函数参数。
如果您想限制操作块的容量,请通过设置DataflowBlockOptions的BoundedCapacity属性值来实现(尽管正如OP所述,他们已经意识到此选项)。像这样:
var a1 = new ActionBlock<int>(
            i => doSomeWork(i), 
            new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
        );

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接