如何正确管理TPL Dataflow中的完成状态。

8
我创建了一个类似于网络爬虫的程序来生成我需要管理的1000多个 Web 服务的报告。为此,我创建了一个 TPL Dataflow Pipeline 来管理获取和处理数据。 我想象中的 Pipeline 程序看起来有点像这样(我的绘画技能很差 :D): The Pipeline 我已经创建了实现,并且一切都正常工作,直到我启动整个 Pipeline。我将500个对象作为输入放入 Pipeline 中,并期望程序会运行一段时间,但是程序在移动到执行块后停止执行。 检查程序流程后,我发现完成速度太快,以至于传播到Dispose块。 我创建了一个小的示例项目,使用相同的 Pipeline 来检查是我的输入类实现还是 Pipeline 本身出了问题。示例代码如下:
public class Job
{
    public int Ticker { get; set; }

    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    {
        Console.WriteLine("Trying to finish");
        return Ticker == 1000000;
    }

    public void IntermediateCleanUp()
    {
        Console.WriteLine("intermediate Cleanup");
        Ticker = Ticker - 120;
    }

    public void finalCleanUp()
    {
        Console.WriteLine("Final Cleanup");
        Ticker = -1;
    }
}

这是我的输入类,输入到准备阶段。
public class Dataflow
{
    private TransformBlock<Job, Job> _preparationsBlock;

    private BufferBlock<Job> _balancerBlock;

    private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };

    private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    private TransformBlock<Job, Job> _typeATickBlock;

    private TransformBlock<Job, Job> _typeBTickBlock;

    private TransformBlock<Job, Job> _writeBlock;

    private TransformBlock<Job, Job> _intermediateCleanupBlock;

    private ActionBlock<Job> _finalCleanupBlock;

    public async Task Process()
    {
        CreateBlocks();

        ConfigureBlocks();

        for (int i = 0; i < 500; i++)
        {
            await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
        }
        _preparationsBlock.Complete();

        await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
    }

    private void CreateBlocks()
    {
        _preparationsBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Prepare();
            return job;
        }, _options);

        _balancerBlock = new BufferBlock<Job>(_options);

        _typeATickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            return job;
        }, _options);

        _typeBTickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            await job.Tick();
            return job;
        }, _options);

        _writeBlock = new TransformBlock<Job, Job>(job =>
        {
            Console.WriteLine(job.Ticker);
            return job;
        }, _options);

        _finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);

        _intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
        {
            job.IntermediateCleanUp();
            return job;
        }, _options);
    }

    private void ConfigureBlocks()
    {
        _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

        _balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
        _balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

        _typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
        _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

        _typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

        _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
        _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

        _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
    }
}

这是我的数据流水线,代表了上面的"艺术品" :D。所有这些都在我的调度程序中执行,该程序在Programm.cs中启动:

public class Scheduler
{
    private readonly Timer _timer;

    private readonly Dataflow _flow;


    public Scheduler(int intervall)
    {
        _timer = new Timer(intervall);
        _flow = new Dataflow();
    }

    public void Start()
    {
        _timer.AutoReset = false;
        _timer.Elapsed += _timer_Elapsed;
        _timer.Start();
    }

    private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            _timer.Stop();
            Console.WriteLine("Timer stopped");
            await _flow.Process().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            Console.WriteLine("Timer started again.");
            _timer.Start();
        }
    }
}

class Program
{
    static  void Main(string[] args)
    {
        var scheduler = new Scheduler(1000);
        scheduler.Start();

        Console.ReadKey();

    }
}

我收到的控制台输出是: 计时器已停止 准备中 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 尝试提交 尝试完成
似乎程序在那时停止工作,因为我没有触发任何断点或取得任何进展。我认为我的所有块都已收到完成信号,因此停止接受任何新项。因此我的问题是:如何管理完成信号,使管道只在没有更多工作要做时结束?

2
我大声朗读给我的女友听:“计时器停止准备滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成尝试提交尝试完成”,她说这听起来像是一台电脑在做爱 :) :) - CodingYoshi
不错。"真正的"程序听起来不像这样。我只是不想在这里发布5000行代码,因为那样可能不太容易调试,会有很多额外的开销。 - Brezelmann
4
你的问题没有问题,只是当她那么说时,我觉得很有趣。只是想让气氛轻松一些。 - CodingYoshi
实现这一点的方法是为您的任务处理一个计数器,这样您就可以了解每个发现的任务何时开始,并且只有在完成管道后才能完成。 - VMAtm
如果您不打算处理类型b,则应将它们发送到NullTarget。这将丢弃所有消息并防止积压。 - JSteward
显示剩余2条评论
1个回答

4
您的流程存在主要问题,即反馈循环到您的滴答块。这会导致两个问题。
  • 后压
  • 完成流程

首先:后压

_typeATickBlock与自身相连时,一旦达到容量,它将停止接受所有消息。在您的情况下,容量为4,这意味着一旦输出缓冲区中有3个消息正在处理和一个被处理,它将停止接受和传递消息。您可以通过将以下行添加到块中来查看此情况:

Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");

将会输出:

Tick Block 0/3

为了解决这个问题,您可以添加任何缓冲块,例如Buffer或Transform。关键是缓冲区的有界容量。在您的情况下,每个消息都需要重新路由回tick块。因此,您需要知道您的容量需要匹配任何给定时间的消息数量。在这种情况下,容量为500。
_printingBuffer = new TransformBlock<Job, Job>(job =>
{
    Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
    return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });

在您的实际代码中,您可能不知道这个值,而Unbounded可能是避免锁定管道的最佳选项,但是您可以根据输入的数量来调整此值。

第二:完成流程

在流水线中使用反馈循环进行完成传播比仅设置链接选项更加困难。一旦完成到达刻度块,它就停止接受所有消息,即使仍需要处理这些消息。为了避免这种情况,您需要暂停传播,直到所有消息都通过循环。首先,在刻度块之前停止传播,然后检查参与循环的每个块上的缓冲区。然后,一旦所有缓冲区都为空,就将完成和故障传播到块。

_balancerBlock.Completion.ContinueWith(tsk =>
{
    while (!_typeATickBlock.Completion.IsCompleted)
    {
        if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
        && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
        {
            _typeATickBlock.Complete();
        }
    }
});

最后

您的完整的ConfigureBlocks设置和插入缓冲区应该看起来像这样。请注意,我只传递了完成状态而不是故障,并删除了B类型分支。

private void ConfigureBlocks()
{
    _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

    _balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

    _balancerBlock.Completion.ContinueWith(tsk =>
    {
        while (!_typeATickBlock.Completion.IsCompleted)
        {
            if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
            {
                _typeATickBlock.Complete();
            }
        }
    });

    _typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
    _printingBuffer.LinkTo(_typeATickBlock);
    _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());            

    _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
    _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

    _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}

我之前写过一篇博客,讲述如何通过反馈循环来处理完成情况。博客已经不再更新,但是你可以从WayBackMachine中找到它。

在复杂的流程中寻找完成:反馈循环


好答案(还有好的博客:D)。检查输入和输出队列的想法非常好。但是我有两个小问题。我不应该也检查_writeBlock和_intermediateCleanupBlock的队列吗?因为它们也会创建一个循环到_typeABlock。那么正在处理的作业呢?它们计入输入或输出计数吗?问题背后的想法是,通常一个作业可能需要长时间运行(考虑4分钟)在_typeABlock中,并且不能在仅一轮后提交。因此它需要再次返回。 - Brezelmann
1
啊,是的,我只加入了最少量的检查来确保每个时刻的快速处理时间能够正常运行您的流程。您说得对,您将不得不检查循环中的每个块,现在计算新缓冲区,您有4个块,每个块都需要进行正在处理的计数器检查和缓冲区检查,以确保没有项目正在处理,也没有项目在等待。您可以想象这会增加很多代码,而博客文章则详细介绍了如何抽象出一些额外的复杂性。简而言之,数据流循环并不容易。 - JSteward
1
如果您对如何实现这些策略有具体问题,我可以在回答中发布代码片段。另一个选择是使用DataflowEx库,它有反馈循环模式的另一种实现方式以及更多功能,我建议您深入了解。在处理数据流时,它提供了许多有用的功能。 - JSteward
1
具体来说,这可能有助于Cyclic graph and ring completion detection - JSteward
1
@TheodorZoulias,是的,我也不满意,应该有一个更健壮的解决方案,但我最近没有花时间开发它。随时欢迎您编辑或添加答案以供未来的谷歌搜索者参考。我认为问题归结为“数据流循环很难”,哈哈。 - JSteward
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接