基于TPL Dataflow的BroadcastCopyBlock,具有可靠传递功能

5
我很乐意就以下TPL Dataflow中BroadcastCopyBlock的实现提供一些见解。 BroadcastCopyBlock将接收到的消息复制到所有已注册的消费者,并保证将消息传递给所有在其接收到消息时连接到该块的消费者。与BroadcastBlock不同的是,如果下一个消息在前面的消息被传递给所有消费者之前到达,则BroadcastBlock不能保证消息的传递。


我的主要关注点是消息的保留和释放。如果接收块决定不处理消息会发生什么?我的理解是这会导致内存泄漏,因为消息将无限期地保留。我在考虑应该如何标记未使用的消息,但我不确定如何操作。我在考虑一些人工消息汇(没有动作的ActionBlock),或者我可以将消息标记为已丢弃吗?

对于该实现的进一步见解也将受到赞赏。

这可能几乎是以下问题的重复,但我希望使用自己的类而不是创建块的方法。那样是否被认为是不好的风格?
TPL Dataflow中保证传递的BroadcastBlock

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target, that subscribed to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();


    public BrodcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));

        In.Completion.ContinueWith(task =>
                                   {
                                       if (task.Exception == null)
                                           Complete();
                                       else
                                           Fault(task.Exception);
                                   }
          );
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target"></param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;

        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}

这与 BroadcastBlock 有什么不同? - svick
@svick的BroadcastBlock不会复制消息。第一个接收到消息的消费者会从队列中删除它。 - crazy_crank
要么我完全误解了BroadcastBlock的概念,要么就是其他人没有理解它。 - crazy_crank
这就是使它成为“广播”的原因,它向所有链接的块发送消息。 - svick
是的,但据我所知,只有一个接收器可以处理该消息。一旦其中一个接收器处理了该消息,它就会从队列中删除。 - crazy_crank
嗯,我现在真的很困惑,可能是因为我看了一个奇怪的教程之类的东西...但是默认的 BroadcastBlock 对我来说实际上仍然不起作用,因为我需要消息的可靠传递,而根据我所知,BroadcastBlock 并不能保证这一点。我将重新表述问题。 - crazy_crank
2个回答

10

TL/DR
你的实现在 ActionBlock 中使用了 Post 方法,如果目标拒绝消息,则仍会丢失数据。改用SendAsync 方法,并且可能不需要实现所有这些方法,只需要实现 ITargetBlock<in TInput> 接口即可。


回到你的主要问题之前,我想澄清一些东西。我认为你对 TPL Dataflow 库中的一些选项感到困惑,我想在这里稍作解释。你提到的行为 The first consumer, which receives the message, deletes it from the queue 不是关于 BroadcastBlock 的,而是关于多个消费者链接到一个 ISourceBlock 上,比如 BufferBlock

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this one will go only for one consumer, no console output present
buffer.Post(1);

BroadcastBlock 是什么,它的作用就是像下面这段代码一样广播数据:

private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Unbounded Block: {i}");
        });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

输出将会是:

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

然而,只有当传入数据的速度小于处理数据的速度时,才能这样做,否则由于缓冲区增长会在很快消耗内存,正如您在问题中所述。让我们看看如果我们使用ExecutionDataflowBlockOptions 来限制慢块的传入数据缓冲区会发生什么:

private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Bounded Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

输出结果将会是:

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

正如您所看到的,我们慢速块丢失了最后一条消息,这不是我们想要的结果。原因是默认情况下BroadcastBlock 使用Post 方法传递消息。根据官方介绍文档:

  • Post
    • 一个异步的扩展方法,用于向目标块发布数据。无论该数据是否被接受,它都会立即返回但它不允许目标在以后的时间消耗该消息
  • SendAsync
    • 一个扩展方法,支持缓冲区的异步发送到目标块。对目标的Post操作是异步的,但如果目标要推迟提供的数据,则没有地方可以缓冲数据,而目标必须拒绝。 SendAsync 允许将数据异步发送到缓冲区,如果目标推迟执行,它将能够从用于此一次异步发布消息的临时缓冲区检索推迟的数据

因此,这个方法可以帮助我们完成任务,让我们引入一些封装器ActionBlock,它们恰好做到了我们想要的——SendAsync数据给我们真正的处理器:

private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

输出结果将为

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

但是这种等待永远不会结束——我们的基本包装器不会传播与链接块相关的完成,并且ActionBlock无法链接到任何东西。我们可以尝试等待包装器完成:

private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW finite Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}

输出结果将会是:

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

这绝对不是我们想要的结果 - ActionBlock 完成了所有工作,最后一条消息的发布不会被等待。此外,在Sleep方法结束之前,我们甚至看不到第二条消息!所以你一定需要自己的实现。

现在,最后,关于你的代码的一些想法:

  1. 你不需要实现如此大量的方法 - 你的包装器将被用作ITargetBlock<in TInput>,因此只需实现该接口即可。
  2. 你的实现在ActionBlock中使用了Post方法,如我们所见,这可能导致数据丢失,如果消费者端出现问题。考虑使用SendAsync方法。
  3. 在进行上述更改后,你应该评估你的数据流性能 - 如果你有许多异步等待数据传递,你可能会遇到性能和/或内存问题。这应该通过一些高级设置来解决,这些设置在链接文档中讨论。
  4. 你对Completion任务的实现实际上颠倒了数据流的顺序 - 你正在等待目标完成,我认为这不是好的实践 - 你可能应该为你的数据流创建一个结束块(这甚至可以是NullTarget块,它只是同步地丢弃输入消息),并等待其完成。

@VMTAtm 感谢您详细的回答,您真的帮了我很多。我只有几个后续问题:关于1:为什么我不应该实现ISourceBlock?毕竟,我想能够将其他块链接到我的BroadcastBlockbroadcastBlock.LinkTo(consumerBlock))?关于4:您能详细解释一下吗?我会认为,我的实现会等待,直到所有源块完成,然后将完成事件向前传播到其_OutBlocks(如果propagateCompletion设置为true,则将完成传递给BroadcastBlock的消费块)。 - crazy_crank
  1. 根据您提供的代码,我并没有看出来这是有意为之的行为,但如果确实是这样的话,那么您应该实现它。
  2. 在您的代码中,您将Completion任务作为任务提供,它等待所有outputs完成,而不是源。如果某些代码附加到此任务,则仅在outputs完成后才会执行,这可能是意外的。
- VMAtm
  1. 好的。
  2. 我有点困惑了。In块是我BroadcastBlock的目标,所以如果有人调用sourceblock.LinkTo(myBroadcastBlock),它将被订阅。请参见我的实现中的“LinkTo”方法。我可能有一个误解,但在我看来,我正在等待先前块的完成,然后才完成BroadcastBlock本身?
- crazy_crank
1
哦,我现在明白你的意思了。你是在谈论任务本身的“完成”事件。当然,那是完全错误的。 - crazy_crank

2

我想补充一下 VMAtm的出色回答,在 BoundedWrapperInfiniteCase 中,您可以手动传播完成状态。在调用 broadcast.SendAsync() 前添加以下行,然后等待两个操作完成以使操作包装器完成内部操作:

slowActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
        else slowAction.Complete();
    });
fastActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
        else fastAction.Complete();
    });

e.g.

var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });

// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
        else slowAction.Complete();
    });
fastActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
        else fastAction.Complete();
    });

for (var i = 0; i < 3; ++i)
    broadcast.SendAsync(i);
broadcast.Complete();

// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);

输出结果与VMAtm的答案相同,但是所有任务都将正确完成。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接