我尝试使用TPL数据流来创建一个管道。到目前为止一切都工作正常,我的管道定义如下(虽然我的问题只涉及广播器、提交成功和提交失败):
// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));
// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);
我遇到的问题与异常传播有关。因为我的BroadcastBlock将其完成(及因此产生的任何故障)传播到两个块,如果确实发生异常,它就会被传播到这两个块。因此,当我执行以下操作时:
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
我最终得到了一个包含两个异常的聚合异常。目前我能做的最好的办法是对它们进行筛选,即:
try
{
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}
但我在想是否有更好的方法来做到这一点。也就是说,如果只发生一个异常,我只想抛出一个异常。我是 Dataflow 的新手,所以正在发现所有的约定。
submissionSucceeded
和submissionFailed
应该分别作为不同的块吗?如果将它们合并成一个块,并在其中检查state.PostSucceeded
,那么问题就会得到解决。 - svick