BroadcastBlock with guaranteed delivery in TPL Dataflow

11
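I have a stream of data that I process in several different ways... so I would like to send a copy of each message I receive to multiple targets, so that these targets can execute in parallel... However, the data streams in much faster than my targets can process it, and there is a lot of it, so I need to set BoundedCapacity on my blocks. Without BoundedCapacity I would quickly run out of memory.
The problem, however, is that BroadcastBlock drops a message if a target cannot accept it (because of its BoundedCapacity).
What I need is a BroadcastBlock that does not drop messages, but instead essentially refuses additional input until it has delivered the current message to every target and is ready to receive more.
Is there something like that, or has anybody written a custom block that behaves this way?
2 Answers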
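To make the problem concrete, here is a minimal sketch of the dropping behavior described above. The names `BroadcastDropDemo` and `RunAsync`, the 50 ms delay, and the count of 100 messages are illustrative assumptions, not part of the original question. The exact number of messages that reach the target depends on timing, so no specific output is claimed.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDropDemo
{
    // Posts 100 messages through a BroadcastBlock into a slow, bounded target
    // and returns how many of them the target actually processed.
    public static async Task<int> RunAsync()
    {
        var received = new ConcurrentQueue<int>();

        // BroadcastBlock keeps only the most recent message; a target that is
        // full when a message is offered may never see that message.
        var broadcast = new BroadcastBlock<int>(x => x);

        var slowTarget = new ActionBlock<int>(async x =>
        {
            await Task.Delay(50); // simulate slow processing (assumed value)
            received.Enqueue(x);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcast.LinkTo(slowTarget,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 100; i++)
        {
            broadcast.Post(i); // always accepted; older values get overwritten
        }

        broadcast.Complete();
        await slowTarget.Completion;

        // Typically far fewer than 100, because most messages were overwritten
        // while the target was busy.
        return received.Count;
    }
}
```

Running `await BroadcastDropDemo.RunAsync()` should normally return only a handful of messages, which is the loss this question is trying to avoid.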

10

It is fairly simple to build what you are asking for using ActionBlock and SendAsync(), something like:

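using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets)
{
    // Snapshot the targets so later changes to the sequence have no effect.
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            // SendAsync waits until each bounded target accepts the item,
            // so nothing is dropped and back-pressure reaches the sender.
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}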

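This is the most basic version, but extending it to support a mutable list of targets, completion propagation, or a cloning function should be easy.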
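As one possible illustration of those extensions, the sketch below adds a cloning function and completion propagation to the same ActionBlock and SendAsync approach. The names `GuaranteedBroadcast`, `Create`, `clone`, and `boundedCapacity` are hypothetical and do not come from the answer. Treat it as a sketch under those assumptions rather than a finished block.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedBroadcast
{
    // Hypothetical helper: same idea as the basic version, plus a cloning
    // function and propagation of completion (or fault) to every target.
    public static ITargetBlock<T> Create<T>(
        IEnumerable<ITargetBlock<T>> targets,
        Func<T, T> clone,
        int boundedCapacity = 1)
    {
        if (targets == null) throw new ArgumentNullException(nameof(targets));
        if (clone == null) throw new ArgumentNullException(nameof(clone));
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(async item =>
        {
            foreach (var target in targetsList)
            {
                // Each target gets its own copy; SendAsync waits for acceptance.
                await target.SendAsync(clone(item));
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Once every buffered item has been sent, complete or fault the targets.
        block.Completion.ContinueWith(t =>
        {
            foreach (var target in targetsList)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);

        return block;
    }
}
```

With this shape, the caller completes only the returned block and then awaits the `Completion` of each target. The targets are completed only after the ActionBlock has drained its own buffer, so items waiting in that buffer are not left behind.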


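If I set BoundedCapacity to something greater than 1, I lose messages... do I need to negotiate with the sender (or perhaps the targets) to keep this from happening? - Brian Rice
I don't see how this could lose messages, assuming you are not using a plain BroadcastBlock anywhere. Could you post sample code somewhere that demonstrates the problem? - svick
My mistake: I should have added Completion.ContinueWith between the source and the ActionBlock, and between the ActionBlock and each target... I was still doing it between the source and the targets, skipping the ActionBlock... which could leave items behind in the ActionBlock's buffer. - Brian Rice
So... I wrote some code that essentially replaces a TransformBlock with an ActionBlock plus SendAsync (as suggested)... when I run it as BufferBlock-TransformBlock-ActionBlock it takes 7 seconds, and as BufferBlock-ActionBlock/SendAsync-ActionBlock it takes 10 seconds... so performance drops somewhat... I wish I had the experience to write a real GuaranteedBroadcastBlock. - Brian Rice
I have uploaded a VS 2012 project to http://www.brianrice.com/downloads/permanent/test2.zip... anyone who wants to look at my results or help improve it is welcome! :) (see the comments in Form1.cs) - Brian Rice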

0
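This is a polished version of svick's idea. The GuaranteedDeliveryBroadcastBlock class below is an (almost) complete substitute for the built-in BroadcastBlock. Linking and unlinking targets at any time is supported.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class GuaranteedDeliveryBroadcastBlock<T> :
    ITargetBlock<T>, ISourceBlock<T>, IPropagatorBlock<T, T>
{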
    private class Subscription
    {
        public readonly ITargetBlock<T> Target;
        public readonly bool PropagateCompletion;
        public readonly CancellationTokenSource CancellationSource;

        public Subscription(ITargetBlock<T> target,
            bool propagateCompletion,
            CancellationTokenSource cancellationSource)
        {
            Target = target;
            PropagateCompletion = propagateCompletion;
            CancellationSource = cancellationSource;
        }
    }

    private readonly object _locker = new object();
    private readonly Func<T, T> _cloningFunction;
    private readonly CancellationToken _cancellationToken;
    private readonly ITargetBlock<T> _actionBlock;
    private readonly List<Subscription> _subscriptions = new List<Subscription>();
    private readonly Task _completion;
    private CancellationTokenSource _faultCTS
        = new CancellationTokenSource(); // Is nullified on completion

    public GuaranteedDeliveryBroadcastBlock(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        _cloningFunction = cloningFunction
            ?? throw new ArgumentNullException(nameof(cloningFunction));
        dataflowBlockOptions ??= new DataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;

        _actionBlock = new ActionBlock<T>(async item =>
        {
            Task sendAsyncToAll;
            lock (_locker)
            {
                var allSendAsyncTasks = _subscriptions
                    .Select(sub => sub.Target.SendAsync(
                        _cloningFunction(item), sub.CancellationSource.Token));
                sendAsyncToAll = Task.WhenAll(allSendAsyncTasks);
            }
            await sendAsyncToAll;
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
            TaskScheduler = dataflowBlockOptions.TaskScheduler,
        });

        var afterCompletion = _actionBlock.Completion.ContinueWith(t =>
        {
            lock (_locker)
            {
                // PropagateCompletion
                foreach (var subscription in _subscriptions)
                {
                    if (subscription.PropagateCompletion)
                    {
                        if (t.IsFaulted)
                            subscription.Target.Fault(t.Exception);
                        else
                            subscription.Target.Complete();
                    }
                }
                // Cleanup
                foreach (var subscription in _subscriptions)
                {
                    subscription.CancellationSource.Dispose();
                }
                _subscriptions.Clear();
                _faultCTS.Dispose();
                _faultCTS = null; // Prevent future subscriptions to occur
            }
        }, TaskScheduler.Default);

        // Ensure that any exception in the continuation will be surfaced
        _completion = Task.WhenAll(_actionBlock.Completion, afterCompletion);
    }

    public Task Completion => _completion;

    public void Complete() => _actionBlock.Complete();

    void IDataflowBlock.Fault(Exception ex)
    {
        _actionBlock.Fault(ex);
        lock (_locker) _faultCTS?.Cancel();
    }

    public IDisposable LinkTo(ITargetBlock<T> target,
        DataflowLinkOptions linkOptions)
    {
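        if (target == null) throw new ArgumentNullException(nameof(target));
        if (linkOptions == null)
            throw new ArgumentNullException(nameof(linkOptions));
        if (linkOptions.MaxMessages != DataflowBlockOptions.Unbounded)
            throw new NotSupportedException();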
        Subscription subscription;
        lock (_locker)
        {
            if (_faultCTS == null) return new Unlinker(null); // Has completed
            var cancellationSource = CancellationTokenSource
                .CreateLinkedTokenSource(_cancellationToken, _faultCTS.Token);
            subscription = new Subscription(target,
                linkOptions.PropagateCompletion, cancellationSource);
            _subscriptions.Add(subscription);
        }
        return new Unlinker(() =>
        {
            lock (_locker)
            {
                // The subscription may have already been removed
                if (_subscriptions.Remove(subscription))
                {
                    subscription.CancellationSource.Cancel();
                    subscription.CancellationSource.Dispose();
                }
            }
        });
    }

    private class Unlinker : IDisposable
    {
        private readonly Action _action;
        public Unlinker(Action disposeAction) => _action = disposeAction;
        void IDisposable.Dispose() => _action?.Invoke();
    }

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _actionBlock.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target, out bool messageConsumed)
            => throw new NotSupportedException();

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();
}

Missing features: the IReceivableSourceBlock<T> interface is not implemented, and linking with the MaxMessages option is not supported.

This class is thread-safe.


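This is not at all what svick posted. It is far too complicated for a simple cloning operation. Even in the simple cloning loop the lock is unnecessary: there is only one worker task, so why lock? To handle DOP>1? Why would DOP>1 be needed? And even if it were, why use a lock instead of channels? The whole point of using Dataflow is to avoid locks. - Panagiotis Kanavos
@PanagiotisKanavos The GuaranteedDeliveryBroadcastBlock is based on svick's idea of having a central ActionBlock that distributes the messages to the targets. I just packaged this functionality in a neat IPropagatorBlock implementation, to make it easier to use. The lock is required to allow targets to be linked and unlinked concurrently. Without some form of synchronization I could not claim at the end that "This class is thread-safe." And the lock is the simplest and most easily understood tool for that. - Theodor Zoulias
@PanagiotisKanavos By the way, the lock is there only to ensure the thread-safety of the class. It is not expected to be contended, so its overhead should be negligible. The built-in TPL Dataflow blocks also use locks internally when they receive messages. - Theodor Zoulias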
