将多个TransformBlock链接到单个BatchBlock TPL

3

我使用 TPL Dataflow 构建了两个管道:

TransformBlock => TransformBlock => BatchBlock => ....

TransformBlock => BatchBlock => TransformBlock => ....

我想要完成

            / => Transform Block => TransformBlock => BatchBlock => ....
BatchBlock /
           \
            \ => Transform Block => BatchBlock => TransformBlock => ....

然而,只有第一个流水线被执行。
我的代码
batchMediaBlock.LinkTo(pipelineA.FirstBlock, new DataflowLinkOptions {PropagateCompletion = true});
batchMediaBlock.LinkTo(pipelineB.FirstBlock, new DataflowLinkOptions {PropagateCompletion = true});

我该如何实现这个目标?

你想让初始的BatchBlock将每个消息发送到两个管道中,还是只发送到较不繁忙的那一个? - Theodor Zoulias
2个回答

2
默认情况下,TPL Dataflow 中的链接被视为贪婪的,因此第一个目标始终获得消息并从先前块的输出中删除它,这就是为什么您的第二个块没有收到任何消息的原因。可以通过 BroadcastBlock<T> 来解决这种情况,该块确保在允许覆盖元素之前将当前元素广播到任何链接的目标。
您还应该注意,这个块会复制消息。
所以您基本上应该在批量块后添加一个广播,但是!您不应该将广播块的完成传播到消费者 - 只有第一个完成会得到。您应该为广播添加一个 ContinueWith 处理程序,如 @JSteward 建议的那样。

2

BatchBlock之后,您需要一个BroadcastBlock。但请注意,完成状态只会传播到其中一个TransformBlock。以下是部分示例来处理完成状态:

using System.Threading.Tasks.Dataflow;

namespace MyDataflow {
    class MyDataflow {

        public void HandlingCompletion() {
            var batchBlock = new BatchBlock<int>(10);
            var broadcastBlock = new BroadcastBlock<int[]>(_ => _);
            var xForm1 = new TransformBlock<int[], int[]>(_ => _);
            var xForm2 = new TransformBlock<int[], int[]>(_ => _);

            batchBlock.LinkTo(broadcastBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            broadcastBlock.LinkTo(xForm1);
            broadcastBlock.LinkTo(xForm1);

            broadcastBlock.Completion.ContinueWith(broadcastBlockCompletionTask => {
                if (!broadcastBlockCompletionTask.IsFaulted) {
                    xForm1.Complete();
                    xForm2.Complete();
                }else {
                    ((IDataflowBlock)xForm1).Fault(broadcastBlockCompletionTask.Exception);
                    ((IDataflowBlock)xForm2).Fault(broadcastBlockCompletionTask.Exception);
                }

            });

            xForm1.Completion.ContinueWith(async _ => {
                try {
                    await xForm2.Completion;
                    //continue passing completion / fault on to rest of pipeline
                } catch  {

                }
            });

        }
    }
}

或者,如果您的管道再也不会收敛,您可以在继续 BroacastBlock 之后单独处理每个管道的完成。所提供的示例将同时完成管道中的每个步骤,同步地流动完成。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接