TransformBlock永远不会完成

17

我正在努力理解TPL Dataflow模块中的“完成(completion)”概念。特别是,TransformBlock似乎从来没有完成过。为什么呢?

示例程序

我的代码计算了从1到1000的所有整数的平方。我使用了一个BufferBlock和一个TransformBlock。稍后在我的代码中,我等待TransformBlock完成。但是该块实际上从未完成过,我不明白为什么。

static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            bufferBlock.Post(number);
        }

        bufferBlock.Complete();

        // This line never completes
        calculatorBlock.Completion.Wait();

        // Unreachable code
        IList<int> results;
        if (calculatorBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}

一开始我以为自己创建了死锁,但事实并非如此。当我在调试器中检查calculatorBlock.Completion任务时,它的Status属性被设置为WaitingForActivation。那一刻我的大脑死机了。

3个回答

14

你的管道停止工作的原因是BufferBlockTransformBlock都显然不会在排空所有项目之前完成(尽管我猜测这是IPropagatorBlock的期望行为,但我没有找到相关文档)。

可以通过一个更简单的示例来验证这一点:

var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();

除非在完成之前添加 bufferBlock.Receive();,否则此代码块将无限期地阻塞。

如果您在阻塞之前从管道中移除项目,可以通过您的TryReceiveAll代码块、连接另一个ActionBlock到管道、将TransformBlock转换为ActionBlock或任何其他方式来解决此问题,这样就不会再出现阻塞。


关于您的特定解决方案,似乎根本不需要BufferBlockTransformBlock,因为块有自己的输入队列,而且您也不使用TransformBlock的返回值。这可以通过仅使用ActionBlock来实现:

var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 8});
foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}
block.Complete();
block.Completion.Wait();

2
文档只是这样说的:在数据流块上调用“Complete”之后,该块将完成,并且在处理完所有先前可用的数据后,其完成任务将进入最终状态。我认为“processed”意味着“转换”。我从未想过它意味着“转换和接收”。 - Steven Liekens
1
@StevenLiekens 我自己也感到相当惊讶。自从你问了这个问题以来,我一直在测试BufferBlock并查看源代码。 - i3arnon

11

我现在认为我理解了。只有当以下条件满足时,TransformBlock 实例才被视为“完成”:

  1. 已调用 TransformBlock.Complete()
  2. InputCount == 0 – 块将其转换应用于每个传入元素
  3. OutputCount == 0 – 所有转换后的元素已离开输出缓冲区

在我的程序中,没有与源 TransformBlock 相连的目标块,因此源块永远无法刷新其输出缓冲区。

作为解决方法,我添加了第二个BufferBlock,用于存储转换后的元素。

static void Main(string[] args)
{
    var inputBufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    var outputBufferBlock = new BufferBlock<int>();
    using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            inputBufferBlock.Post(number);
        }

        inputBufferBlock.Complete();
        calculatorBlock.Completion.Wait();

        IList<int> results;
        if (outputBufferBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}

2

TransformBlock需要一个ITargetBlock,它可以将变换结果发布到该目标块。

 var writeCustomerBlock = new ActionBlock<int>(c => Console.WriteLine(c));
        transformBlock.LinkTo(
            writeCustomerBlock, new DataflowLinkOptions
            {
                PropagateCompletion = true
            });

在此之后,它完成了。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接