TPL Dataflow中使用BroadcastBlock出现重复异常

32

我尝试使用TPL数据流来创建一个管道。到目前为止一切都工作正常,我的管道定义如下(虽然我的问题只涉及广播器、提交成功和提交失败):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

我遇到的问题与异常传播有关。因为我的BroadcastBlock将其完成(及因此产生的任何故障)传播到两个块,如果确实发生异常,它就会被传播到这两个块。因此,当我执行以下操作时:

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

我最终得到了一个包含两个异常的聚合异常。目前我能做的最好的办法是对它们进行筛选,即:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

但我在想是否有更好的方法来做到这一点。也就是说,如果只发生一个异常,我只想抛出一个异常。我是 Dataflow 的新手,所以正在发现所有的约定。


1
submissionSucceededsubmissionFailed应该分别作为不同的块吗?如果将它们合并成一个块,并在其中检查state.PostSucceeded,那么问题就会得到解决。 - svick
是的,在这种情况下很有可能会发生。但更普遍的情况是,每当流程分裂时都会发生。例如,在下面的链接中 - 如果广播器中抛出异常,您最终会在存储处理器中得到重复的异常:http://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/ - bornfromanegg
这里有一篇来自 MSDN 的关于异常处理的好文章:http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235570.aspx。 - Alex
1个回答

1

我写了一个TPL数据流示例(https://github.com/squideyes/PodFetch),它采用了稍微不同的完成和错误处理方法。这是Program.cs中从第171到201行的相关代码:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

正如您所看到的,我将几个TPL块连接在一起,然后使用HandleCompletion扩展方法来处理完成。
    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

非常重要的是,当我完成将对象传递给链中的第一个块时,我调用scraper.Complete()。有了这个,HandleCompletion扩展方法就可以处理继续执行。由于我正在等待fetcher(链中的最后一个块)完成,因此很容易在try/catch中捕获任何结果错误。

我相信您的HandleCompletion方法在底层执行了与PropagateCompletion相同的操作。您为什么选择这种方式呢?我不认为它解决了我的问题,但有趣的是,它确实突出了我的问题,因为很明显可以通过调用多个目标来复制异常。 - bornfromanegg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接