TPL数据流:有界容量和等待完成

5

为了简单起见,以下我复制了一个LINQPad脚本作为现实生活场景:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = 25;

var action1 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay1); Interlocked.Increment(ref counter1);});
var action2 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay2); Interlocked.Increment(ref counter2);});

var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);

batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions); 
var bufferBlock2 = new BufferBlock<string>(dbOptions); 

bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);

bcBlock.LinkTo(bufferBlock1, dlOptions);
bcBlock.LinkTo(bufferBlock2, dlOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

mainBlock.Dump("Main Block");
bcBlock.Dump("Broadcast Block");
bufferBlock1.Dump("Buffer Block 1");
bufferBlock2.Dump("Buffer Block 2");
actionBlock1.Dump("Action Block 1");
actionBlock2.Dump("Action Block 2");

foreach(var i in Enumerable.Range(1, total))
  await mainBlock.SendAsync(i, cts.Token);

mainBlock.Complete();

await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

counter1.Dump("Counter 1");
counter2.Dump("Counter 2");

我对这段代码有两个问题:
  1. 虽然我将所有适当块的BoundedCapacity限制为10个元素,但似乎我几乎可以一次性推送所有1,000,000条消息。这是预期行为吗?
  2. 尽管整个网络都配置为传播完成,但似乎在调用mainBlock.Complete()后立即完成了所有块。我希望counter1counter2变量都等于total。是否有一种方法可以实现这种行为?
1个回答

12

是的,这是期望的行为,因为 BroadcastBlock 的缘故:

提供一个缓冲区,最多存储一个元素,每当新的消息到达时,它就会被下一个消息覆盖。

这意味着,如果你将 BroadcastBlock 与带有 BoundedCapacity 的块链接起来,你将会丢失消息。

要解决这个问题,你可以创建一个自定义块,像 BroadcastBlock 一样工作,但确保将所有消息传递到目标。不过这并不是一件简单的事情,所以你可能满足于一个更简单的变体(最初来自 我的旧回答):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

在你的情况下使用方法为:

var bcBlock = CreateGuaranteedBroadcastBlock(
    new[] { bufferBlock1, bufferBlock2 }, dbOptions);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接