我创建了一个类似于网络爬虫的程序来生成我需要管理的1000多个 Web 服务的报告。为此,我创建了一个 TPL Dataflow Pipeline 来管理获取和处理数据。
我想象中的 Pipeline 程序看起来有点像这样(我的绘画技能很差 :D):
我已经创建了实现,并且一切都正常工作,直到我启动整个 Pipeline。我将500个对象作为输入放入 Pipeline 中,并期望程序会运行一段时间,但是程序在移动到执行块后停止执行。
检查程序流程后,我发现完成速度太快,以至于传播到Dispose块。
我创建了一个小的示例项目,使用相同的 Pipeline 来检查是我的输入类实现还是 Pipeline 本身出了问题。示例代码如下:
这是我的输入类,输入到准备阶段。
我收到的控制台输出是: 计时器已停止 准备中 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 尝试提交 尝试完成
似乎程序在那时停止工作,因为我没有触发任何断点或取得任何进展。我认为我的所有块都已收到完成信号,因此停止接受任何新项。因此我的问题是:如何管理完成信号,使管道只在没有更多工作要做时结束?
![The Pipeline](https://istack.dev59.com/3W3kU.webp)
public class Job
{
public int Ticker { get; set; }
public Type Type { get; }
public Job(Type type)
{
Type = type;
}
public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}
public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}
public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}
public bool IsFinished()
{
Console.WriteLine("Trying to finish");
return Ticker == 1000000;
}
public void IntermediateCleanUp()
{
Console.WriteLine("intermediate Cleanup");
Ticker = Ticker - 120;
}
public void finalCleanUp()
{
Console.WriteLine("Final Cleanup");
Ticker = -1;
}
}
这是我的输入类,输入到准备阶段。
public class Dataflow
{
private TransformBlock<Job, Job> _preparationsBlock;
private BufferBlock<Job> _balancerBlock;
private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4
};
private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
private TransformBlock<Job, Job> _typeATickBlock;
private TransformBlock<Job, Job> _typeBTickBlock;
private TransformBlock<Job, Job> _writeBlock;
private TransformBlock<Job, Job> _intermediateCleanupBlock;
private ActionBlock<Job> _finalCleanupBlock;
public async Task Process()
{
CreateBlocks();
ConfigureBlocks();
for (int i = 0; i < 500; i++)
{
await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
}
_preparationsBlock.Complete();
await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
}
private void CreateBlocks()
{
_preparationsBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Prepare();
return job;
}, _options);
_balancerBlock = new BufferBlock<Job>(_options);
_typeATickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
return job;
}, _options);
_typeBTickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
await job.Tick();
return job;
}, _options);
_writeBlock = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine(job.Ticker);
return job;
}, _options);
_finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);
_intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
{
job.IntermediateCleanUp();
return job;
}, _options);
}
private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);
_balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
_balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);
_typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
_typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());
_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
}
这是我的数据流水线,代表了上面的"艺术品" :D。所有这些都在我的调度程序中执行,该程序在Programm.cs中启动:
public class Scheduler
{
private readonly Timer _timer;
private readonly Dataflow _flow;
public Scheduler(int intervall)
{
_timer = new Timer(intervall);
_flow = new Dataflow();
}
public void Start()
{
_timer.AutoReset = false;
_timer.Elapsed += _timer_Elapsed;
_timer.Start();
}
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
_timer.Stop();
Console.WriteLine("Timer stopped");
await _flow.Process().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Console.WriteLine("Timer started again.");
_timer.Start();
}
}
}
class Program
{
static void Main(string[] args)
{
var scheduler = new Scheduler(1000);
scheduler.Start();
Console.ReadKey();
}
}
我收到的控制台输出是: 计时器已停止 准备中 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 滴答声 尝试提交 尝试完成 尝试提交 尝试完成
似乎程序在那时停止工作,因为我没有触发任何断点或取得任何进展。我认为我的所有块都已收到完成信号,因此停止接受任何新项。因此我的问题是:如何管理完成信号,使管道只在没有更多工作要做时结束?