TPL DataFlow,如何在链接块时设置优先级?

5
使用TPL.DataFlow块,能否将两个或多个源链接到单个ITargetBlock(例如ActionBlock)并优先处理这些源?
例如:
// Two independent source blocks (constructor arguments elided in the question).
BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
// The single target block that both sources are linked to below.
ActionBlock<string> a = new ...

// Goal: somehow force messages in b1 to be processed before any message of b2, always.
// Both sources are linked to the same target here.
b1.LinkTo (a);
b2.LinkTo (a);

只要b1中有消息,我希望将它们提供给“a”,一旦b1为空,就会将b2中的消息推送到“a”。有什么好的想法吗?
2个回答

15

TPL Dataflow本身没有这样的功能。

我能想象到最简单的方法是创建一个结构,封装三个块:高优先级输入、低优先级输入和输出。这些块将是简单的BufferBlock,还有一个基于优先级转发来自两个输入到输出的方法,在后台运行。

代码可能如下所示:

/// <summary>
/// Merges a high-priority and a low-priority input into a single output.
/// Whenever both inputs hold an item, the high-priority item is forwarded first.
/// </summary>
/// <typeparam name="T">Type of the messages flowing through the block.</typeparam>
public class PriorityBlock<T>
{
    private readonly BufferBlock<T> highPriorityTarget;

    /// <summary>Input for messages that must be forwarded first.</summary>
    public ITargetBlock<T> HighPriorityTarget
    {
        get { return highPriorityTarget; }
    }

    private readonly BufferBlock<T> lowPriorityTarget;

    /// <summary>Input for messages forwarded only when no high-priority message is waiting.</summary>
    public ITargetBlock<T> LowPriorityTarget
    {
        get { return lowPriorityTarget; }
    }

    private readonly BufferBlock<T> source;

    /// <summary>Output that emits the merged stream of messages.</summary>
    public ISourceBlock<T> Source
    {
        get { return source; }
    }

    /// <summary>
    /// Creates the three internal buffers and starts the background forwarding loop.
    /// </summary>
    public PriorityBlock()
    {
        // A capacity of 1 keeps items in the upstream blocks as long as possible,
        // so the priority decision is made as late as possible.
        var options = new DataflowBlockOptions { BoundedCapacity = 1 };

        highPriorityTarget = new BufferBlock<T>(options);
        lowPriorityTarget = new BufferBlock<T>(options);
        source = new BufferBlock<T>(options);

        Task.Run(() => ForwardMessages());
    }

    /// <summary>
    /// Moves items from the two inputs to the output, preferring the high-priority
    /// input, until both inputs are completed and drained; then completes the output.
    /// </summary>
    private async Task ForwardMessages()
    {
        while (true)
        {
            Task<bool> highAvailable = highPriorityTarget.OutputAvailableAsync();
            Task<bool> lowAvailable = lowPriorityTarget.OutputAvailableAsync();

            Task<bool> first = await Task.WhenAny(highAvailable, lowAvailable);

            if (!await first)
            {
                // The input that answered first is completed and empty. That alone
                // does not mean we are done: the other input may still be open and
                // merely idle, so wait for its answer before deciding.
                Task<bool> other = first == highAvailable ? lowAvailable : highAvailable;

                if (!await other)
                {
                    // Both inputs are completed and drained.
                    source.Complete();
                    return;
                }
            }

            T item;

            // Always probe the high-priority input first, even if the low-priority
            // input was the one that signalled availability.
            if (highPriorityTarget.TryReceive(out item)
                || lowPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
        }
    }
}

使用方法如下:

// b1 feeds the high-priority input and b2 the low-priority input;
// the action block "a" consumes the merged output.
b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);

为了使这个功能正常工作,a 的 BoundedCapacity 也必须设置为 1(或至少是一个非常小的数字)。
这段代码的注意事项是它可能会引入两条消息的延迟(一条在输出块中等待,一条在 SendAsync() 中等待)。因此,如果您有一个长列表的低优先级消息,突然出现一个高优先级消息,那么它将仅在已经等待的这两条低优先级消息之后被处理。
如果这对你来说是个问题,它可以得到解决。但我认为这需要更复杂的代码,涉及 TPL Dataflow 的较不公开的部分,比如 OfferMessage()

谢谢,但在我的情况下,我必须直接处理高优先级的消息。事实证明,DataFlow 对我的用例来说太慢了,所以我现在有一个手动解决方案。 - Roger Johansson

2
这里有一个实现了PriorityBufferBlock<T>类的例子,它比低优先级项目更频繁地传播高优先级项目。该类的构造函数有一个priorityPrecedence参数,用于定义每个低优先级项目将传播多少个高优先级项目。如果此参数的值为1.0(最小有效值),则没有真正的优先级可言。如果此参数的值为Double.PositiveInfinity,只要队列中有高优先级项目,就不会传播任何低优先级项目。如果此参数具有更常见的值,例如5.0,则每5个高优先级项目将传播一个低优先级项目。
该类在内部维护两个队列,一个用于高优先级项目,一个用于低优先级项目。选择要传播的项目时不考虑每个队列中存储的项目数量,唯一的例外是其中一个队列为空:此时另一个队列中的所有项目都可以按需自由传播。只有当两个内部队列都非空时,priorityPrecedence参数才会影响类的行为;如果只有一个队列有项目,PriorityBufferBlock<T>的行为就像普通的BufferBlock<T>。
/// <summary>
/// A propagator block that stores incoming items in two internal queues
/// (high priority and low priority) and hands them to its targets according
/// to a configurable precedence ratio.
/// </summary>
/// <remarks>
/// The dataflow mechanics (bounding, linking, completion) are delegated to an
/// inner <c>TransformBlock&lt;T, int&gt;</c> that emits one dummy zero per stored
/// item; the real items live in the two queues.
/// </remarks>
/// <typeparam name="T">Type of the items buffered by the block.</typeparam>
public class PriorityBufferBlock<T> : IPropagatorBlock<T, T>,
    IReceivableSourceBlock<T>
{
    // Inner block: accepts T, enqueues it, and outputs a dummy int per item.
    private readonly IPropagatorBlock<T, int> _block;
    private readonly Queue<T> _highQueue = new();
    private readonly Queue<T> _lowQueue = new();
    private readonly Predicate<T> _hasPriorityPredicate;
    private readonly double _priorityPrecedence;
    // Running count used by GetSelectedQueue to decide when a low-priority
    // item is due while both queues are non-empty.
    private double _priorityCounter = 0;

    // The high-priority queue instance doubles as the lock object for both
    // queues and for _priorityCounter.
    private object Locker => _highQueue;

    /// <summary>Initializes the block.</summary>
    /// <param name="hasPriorityPredicate">Returns true for items that belong
    /// in the high-priority queue.</param>
    /// <param name="priorityPrecedence">How many high-priority items are
    /// propagated per low-priority item while both queues are non-empty.
    /// Must be at least 1.0.</param>
    /// <param name="dataflowBlockOptions">Optional options; only
    /// BoundedCapacity, CancellationToken and MaxMessagesPerTask are copied
    /// to the inner block.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="hasPriorityPredicate"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="priorityPrecedence"/> is less than 1.0.</exception>
    public PriorityBufferBlock(Predicate<T> hasPriorityPredicate,
        double priorityPrecedence,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        ArgumentNullException.ThrowIfNull(hasPriorityPredicate);
        if (priorityPrecedence < 1.0)
            throw new ArgumentOutOfRangeException(nameof(priorityPrecedence));
        _hasPriorityPredicate = hasPriorityPredicate;
        _priorityPrecedence = priorityPrecedence;
        dataflowBlockOptions ??= new();
        // Each accepted item is routed to one of the queues; the inner block
        // only carries a placeholder 0 for it.
        _block = new TransformBlock<T, int>(item =>
        {
            bool hasPriority = _hasPriorityPredicate(item);
            Queue<T> selectedQueue = hasPriority ? _highQueue : _lowQueue;
            lock (Locker) selectedQueue.Enqueue(item);
            return 0;
        }, new()
        {
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = dataflowBlockOptions.CancellationToken,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask
        });
        // When the inner block completes, clear both queues and then surface
        // the inner block's own completion task (via Unwrap).
        this.Completion = _block.Completion.ContinueWith(completion =>
        {
            Debug.Assert(this.Count == 0 || !completion.IsCompletedSuccessfully);
            lock (Locker) { _highQueue.Clear(); _lowQueue.Clear(); }
            return completion;
        }, default, TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
    }

    /// <summary>Task set up in the constructor: it follows the inner block's
    /// completion after both queues have been cleared.</summary>
    public Task Completion { get; private init; }
    /// <summary>Forwards the completion request to the inner block.</summary>
    public void Complete() => _block.Complete();
    void IDataflowBlock.Fault(Exception exception) => _block.Fault(exception);
    /// <summary>Total number of items currently held in both queues.</summary>
    public int Count
    {
        get { lock (Locker) return _highQueue.Count + _lowQueue.Count; }
    }

    // Picks the queue the next item should come from. If one queue is empty
    // the other is chosen; otherwise the low queue is chosen once the counter
    // would exceed the precedence, and the high queue is chosen in all other cases.
    // The counter is only updated when forDequeue is true, so a Peek followed
    // by a Dequeue under the same lock selects the same queue.
    private Queue<T> GetSelectedQueue(bool forDequeue)
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Queue<T> selectedQueue;
        if (_highQueue.Count == 0)
            selectedQueue = _lowQueue;
        else if (_lowQueue.Count == 0)
            selectedQueue = _highQueue;
        else if (_priorityCounter + 1 > _priorityPrecedence)
            selectedQueue = _lowQueue;
        else
            selectedQueue = _highQueue;
        if (forDequeue)
        {
            // With only one non-empty queue there is no contention, so the
            // counter is reset.
            if (_highQueue.Count == 0 || _lowQueue.Count == 0)
                _priorityCounter = 0;
            // After a low-priority turn, subtract precedence + 1 rather than
            // resetting to zero, so a fractional remainder carries over.
            else if (++_priorityCounter > _priorityPrecedence)
                _priorityCounter -= _priorityPrecedence + 1;
        }
        return selectedQueue;
    }

    // Returns the next item without removing it. Caller must hold the lock
    // and at least one queue must be non-empty.
    private T Peek()
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Debug.Assert(_highQueue.Count > 0 || _lowQueue.Count > 0);
        return GetSelectedQueue(false).Peek();
    }

    // Removes and returns the next item, advancing the priority counter.
    // Caller must hold the lock and at least one queue must be non-empty.
    private T Dequeue()
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Debug.Assert(_highQueue.Count > 0 || _lowQueue.Count > 0);
        return GetSelectedQueue(true).Dequeue();
    }

    // Adapter linked to the inner block in place of the real target: it
    // receives the dummy int messages and offers the corresponding real item
    // to the real target, presenting the parent block as the source.
    private class TargetProxy : ITargetBlock<int>
    {
        private readonly PriorityBufferBlock<T> _parent;
        private readonly ITargetBlock<T> _realTarget;

        public TargetProxy(PriorityBufferBlock<T> parent, ITargetBlock<T> target)
        {
            Debug.Assert(parent is not null);
            _parent = parent;
            _realTarget = target ?? throw new ArgumentNullException(nameof(target));
        }

        // The proxy has no completion of its own; reading it is not supported.
        public Task Completion => throw new NotSupportedException();
        public void Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception error) => _realTarget.Fault(error);

        DataflowMessageStatus ITargetBlock<int>.OfferMessage(
            DataflowMessageHeader messageHeader, int messageValue,
            ISourceBlock<int> source, bool consumeToAccept)
        {
            Debug.Assert(messageValue == 0);
            if (consumeToAccept) throw new NotSupportedException();
            // The lock is held across the offer so that the item peeked here
            // is the same one dequeued when the target accepts.
            lock (_parent.Locker)
            {
                T realValue = _parent.Peek();
                DataflowMessageStatus response = _realTarget.OfferMessage(
                    messageHeader, realValue, _parent, consumeToAccept);
                if (response == DataflowMessageStatus.Accepted) _parent.Dequeue();
                return response;
            }
        }
    }

    /// <summary>Links the block to <paramref name="target"/> by linking the
    /// inner block to a proxy that wraps the target.</summary>
    public IDisposable LinkTo(ITargetBlock<T> target,
        DataflowLinkOptions linkOptions)
            => _block.LinkTo(new TargetProxy(this, target), linkOptions);

    // Incoming offers are passed straight to the inner block.
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
            => _block.OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);

    // NOTE(review): ConsumeMessage/ReserveMessage/ReleaseReservation each wrap
    // the target in a fresh TargetProxy, so the inner block is handed a
    // different object than the proxy created in LinkTo — confirm that
    // reservation-based (non-greedy) targets behave correctly.
    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        // The dummy value from the inner block is discarded; the real item
        // comes from the queues.
        _ = _block.ConsumeMessage(messageHeader, new TargetProxy(this, target),
            out messageConsumed);
        if (messageConsumed) lock (Locker) return Dequeue();
        return default;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => _block.ReserveMessage(messageHeader, new TargetProxy(this, target));

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => _block.ReleaseReservation(messageHeader, new TargetProxy(this, target));

    /// <summary>Attempts to synchronously receive the next item.</summary>
    /// <param name="filter">Must be null; filtering is not supported.</param>
    /// <param name="item">The received item, or default when none was available.</param>
    /// <returns>true if an item was received; otherwise false.</returns>
    /// <exception cref="NotSupportedException"><paramref name="filter"/> is not null.</exception>
    public bool TryReceive(Predicate<T> filter, out T item)
    {
        if (filter is not null) throw new NotSupportedException();
        // Taking one dummy value from the inner block entitles us to one real item.
        if (((IReceivableSourceBlock<int>)_block).TryReceive(null, out _))
        {
            lock (Locker) item = Dequeue(); return true;
        }
        item = default; return false;
    }

    /// <summary>Attempts to synchronously receive all available items.</summary>
    /// <param name="items">The received items in priority order, or default
    /// when none were available.</param>
    /// <returns>true if at least one item was received; otherwise false.</returns>
    public bool TryReceiveAll(out IList<T> items)
    {
        if (((IReceivableSourceBlock<int>)_block).TryReceiveAll(out IList<int> items2))
        {
            // Dequeue exactly as many real items as dummy values were received.
            T[] array = new T[items2.Count];
            lock (Locker)
                for (int i = 0; i < array.Length; i++)
                    array[i] = Dequeue();
            items = array; return true;
        }
        items = default; return false;
    }
}

使用示例:

// Items for which x.HasPriority is true are treated as high priority;
// 2.5 is the priorityPrecedence argument.
var bufferBlock = new PriorityBufferBlock<SaleOrder>(x => x.HasPriority, 2.5);

以上实现支持内置的BufferBlock<T>的所有功能,唯一的例外是不支持带有非空filter参数的TryReceive。该块的核心功能被委派给一个内部的TransformBlock<T, int>,对于存储在两个队列之一中的每个项目,它都包含一个虚拟的零值。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接