当其他任务完成时通知任务

4

.Net TPL专家们,

注意:不能使用DataFlow库;不允许使用附加组件。

我有四个任务,如下图所示:

enter image description here

  • 任务1(数据生产者)- 从大文件中读取记录(>500000条记录),并将记录添加到BlockingCollection中。

  • 任务2、任务3(数据消费者)- 每个任务从BlockingCollection中获取记录。每个任务对从BlockingCollection中获取的记录执行一些工作(与网络相关),完成后,每个任务都可以向结果队列添加一个记录。处理顺序不重要。

  • 任务4(结果处理器)- 从结果队列中获取记录,并写入输出文件。

然后等待任务完成,即:

Task.WhenAll( t1, t2, t3, t4 )

我有一个生产者任务,多个消费者任务和一个保存结果的任务。
我的问题是:
如何在任务2和3完成时通知任务4,以便任务4也知道何时结束?
我找到了许多示例,可以将数据从一个任务移动到另一个任务,以线性“管道”方式运行,但没有找到任何说明上述问题的示例。我的最初想法是向任务4“注册”任务2和3,并监视每个已注册任务的状态 - 当任务2和3不再运行时,任务4可以停止(如果结果队列也为空)。
提前致谢。

1
您无法为项目添加TPL Dataflow NuGet Package(https://www.nuget.org/packages/Microsoft.Tpl.Dataflow)吗? - VMAtm
对于这个特定的项目,不允许使用TPL Dataflow。 - bdcoder
TPL Dataflow现在已经内置于.NET平台(.NET Core)中。 - Theodor Zoulias
3个回答

0

如果您也使用BlockingCollection作为results_queue,那么您可以通过使用属性BlockingCollection.IsCompleted和BlockingCollection.IsAddingCompleted来实现这些通知。

流程如下:

  • task1在输入集合中没有更多记录时调用方法BlockingCollection.CompleteAdding()。
  • task2和task3定期检查输入集合上的IsCompleted属性。当输入集合为空且生产者调用了CompleteAdding()方法时,此属性为true。在此属性为true后,任务2和任务3完成,并可以在结果队列上调用CompleteAdding()方法并完成其工作。
  • task4可以在结果队列中到达记录时处理它们,或者可以等待结果队列的IsAddingCompleted属性变为true,然后开始处理。当结果队列上的IsCompleted属性为true时,task4的工作就完成了。

编辑: 我不确定您是否熟悉这些IsCompleted和IsAddingCompleted属性。它们是不同的,并且非常适合您的情况。我认为除了BlockingCollection属性之外,您不需要任何其他同步元素。如果需要额外的解释,请随时提问!

    BlockingCollection<int> inputQueue;
    BlockingCollection<int> resultQueue;

    public void StartTasks()
    {
        inputQueue = new BlockingCollection<int>();
        resultQueue = new BlockingCollection<int>();

        Task task1 = Task.Run(() => Task1());
        Task task2 = Task.Run(() => Task2_3());
        Task task3 = Task.Run(() => Task2_3());
        Task[] tasksInTheMiddle = new Task[] { task2, task3 };
        Task waiting = Task.Run(() => Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding()));
        Task task4 = Task.Run(() => Task4());

        //Waiting for tasks to finish
    }
    private void Task1()
    {
        while(true)
        {
            int? input = ReadFromInputFile();
            if (input != null)
            {
                inputQueue.Add((int)input);
            }
            else
            {
                inputQueue.CompleteAdding();
                break;
            }
        }
    }

    private void Task2_3()
    {
        while(inputQueue.IsCompleted)
        {
            int input = inputQueue.Take();
            resultQueue.Add(input);
        }
    }

    private void Task4()
    {
        while(resultQueue.IsCompleted)
        {
            int result = resultQueue.Take();
            WriteToOutputFile(result);
        }
    }

以上的工作方式不太清楚。即使在到达输入文件结尾后,任务2和任务3仍然可能会向结果队列添加记录。我真正需要知道的是,当任务2和任务3完成(运行完毕)时,因此我正在考虑监视这些任务的状态,以确保所有结果都已完成。 - bdcoder
我现在已经阅读了Scott Chamberlain的评论。最好的方法可能是将我的解决方案与他的结合起来。如果可以并行处理,那么等待任务2和3完成后再进行最终的任务4处理就没有意义了。但从他的解决方案中,最好使用另一个任务(Continue.WhenAll)在结果队列上设置CompleteAdding - 这样你就真正确定这些任务何时完成了。在任务2和3仍在写入的同时,任务4可以调用结果队列上的Take()方法并同时添加到输出文件中。 - Thomas
在Scott的解决方案中,只需放置他的方法调用:Stage2MonitorStart(t2,t3);,这样它就不会阻止Task4Start的调用。当然,如果您的实现是在处理任务完成之前甚至可以写入输出文件,则可以在中间执行。 - Thomas

0
您所描述的任务可以很好地适用于TPL Dataflow库,这是一个TPL的小附加组件(它可以通过nuget包包含在项目中,支持.NET 4.5),您只需轻松引入类似以下的流程即可(根据BroadcastBlock的评论更新了代码):
var buffer = new BroadcastBlock<string>();
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ });

我不确定你的解决方案逻辑,所以我认为你只是在这里操作字符串。你应该异步发送所有传入的数据作为第一个块(如果你Post你的数据,如果缓冲区过载,消息将被丢弃),并在它们之间链接块,就像这样:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var s in IncomingData)
{
    await buffer.SendAsync(s);
}
buffer.Complete();

如果你的消费者都需要处理所有项目,那么你应该使用BroadcastBlock(可能会出现一些有关保证传递的问题),另一个选项是通过消费者过滤消息(也许通过消息ID除以消费者数量的余数),但在这种情况下,你应该链接到另一个消费者,该消费者将“捕获”由于某种原因未被消费的所有消息。
如你所见,块之间的链接是完全传播的,因此在此之后,你可以简单地附加到.Completion 任务属性以获取resultsProcessor:
resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ });

请注意,BufferBlock 只会将项目提供给第一个消费者,这不是 OP 所期望的。为了克服这个问题,您应该将 BufferBlock 链接到 TransmitBlock,并将 TransmitBlock 链接到每个消费者。 - Eyal Perry
请注意,SendAsync应该被等待。 - Eyal Perry
@EyalPerry TransmitBlock - 你指的是什么块?我从来没有见过那个。你是不是指 BroadCastBlock - VMAtm
是的,抱歉。今天过得有点累了 :) - Eyal Perry
我没有建议使用BroadcastBlock的原因是它会将当前消息提供给所有消费者,这可能会让操作员感到意外。也许可以通过一些谓词来链接块,我会在答案中加入这个建议。 - VMAtm
1
如果我正确理解了 OP 的意思,那么这正是他想要的。他确实说过两个消费者都会收到物品。 - Eyal Perry

0

这是对Thomas所说的内容的一点扩展。

通过使用BlockingCollection,您可以在其上调用GetConsumingEnumerable(),并将其视为正常的foreach循环。这将让您的任务“自然”结束。唯一需要做的是添加一个额外的任务来监视任务2和3何时结束,并在它们上调用完成添加。

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    retrun Task4Start();
}

public void Task1Start()
{
    Task.Run(()=>
    {
        foreach(var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        _stageOneBlockingCollection.CompleteAdding();
    }
}

public Task Stage2Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    }
}

void Stage2MonitorStart(params Task[] tasks)
{
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
}

public Task Stage4Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    }
}

这个程序会执行精确的行为。所有任务都是并发运行的,注意在所有函数中都有 Task.Run。从 RunProcess 返回的 Task 是一个代表整个过程完成的任务。 - Scott Chamberlain
是的 - 看起来你是赢家 - 非常感谢!! - bdcoder

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接