缓冲块(BufferBlock)和动作块(ActionBlock)使用有界容量(BoundedCapacity)时不会使用最大DOP。

7

I have this code:

var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);

    await Task.Delay(1000);

    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = -1
});

data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 3; id++)
{
    Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
    data.SendAsync(id).Wait();
    Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}

data.Complete();

Task.WhenAll(data.Completion, action.Completion).Wait();

这段代码会输出如下结果:

[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End

即使 ActionBlock 具有无限制的 DOP,为什么它仍然无法并行工作?

1个回答

8
你的 ActionBlock 看起来似乎只有有限程度的并行性,这是因为它的 BoundedCapacity 为1。与 InputCount 不同,BoundedCapacity 包括正在处理的项目。这很容易证明:
var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

await block.SendAsync(4); // Adds a new item
await block.SendAsync(4); // Blocks forever

这意味着,当你将 MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 设置时,该块无法同时接受多个项目,从而实际上限制了您的并行度。您可以通过设置更大的 BoundedCapacity 来解决这个问题:
var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
    await Task.Delay(1000);
    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

好的,但我也将DOP设置为-1。我认为Dataflow应该根据需要创建尽可能多的ActionBlock副本,这意味着即使一个ActionBlock实例获得了一个项目并且有另一个项目进来 - 它也可以创建另一个ActionBlock实例来处理新项目。就像这样。不是吗? - Michael Logutov
1
@MichaelLogutov 没有块会复制自己。除非你创建更多,否则只有一个实例。块使用任务进行并行处理,但如果限制其容量,则无法容纳足够的项目以并行处理它们。 - i3arnon
1
谢谢。我不知道那个。而且文档在这一点上真的很缺乏。 - Michael Logutov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接