下面是一个
PriorityBufferBlock<T>
类的实现,它传播高优先级项目的频率高于低优先级项目。该类的构造函数有一个
priorityPrecedence
参数,用于定义每个低优先级项目将传播多少个高优先级项目。如果此参数的值为
1.0
(最小有效值),则没有真正的优先级可言。如果此参数的值为
Double.PositiveInfinity
,只要队列中有高优先级项目,就不会传播任何低优先级项目。如果此参数具有更常见的值,例如
5.0
,则每5个高优先级项目将传播一个低优先级项目。
该类在内部维护两个队列,一个用于高优先级项目,一个用于低优先级项目。每个队列中存储的项目数量不在考虑之列,除非两个队列之一为空——在这种情况下,另一个队列中的所有项目都可以按需自由传播。只有当两个内部队列都非空时,
priorityPrecedence
参数才会影响类的行为。否则,如果只有一个队列有项目,则
PriorityBufferBlock<T>
的行为就像普通的
BufferBlock<T>
。
/// <summary>
/// A buffer block that keeps two internal queues (high and low priority) and
/// propagates high-priority items more frequently than low-priority ones.
/// </summary>
/// <remarks>
/// Message bookkeeping (offering, linking, reservation, completion) is delegated
/// to an internal <c>TransformBlock&lt;T, int&gt;</c> that emits one dummy <c>0</c>
/// for every item stored in one of the two queues. Whenever a dummy value leaves
/// the inner block, the real item is taken from the queue selected by the
/// priority rule implemented in <see cref="GetSelectedQueue"/>.
/// </remarks>
/// <typeparam name="T">The type of the buffered items.</typeparam>
public class PriorityBufferBlock<T> : IPropagatorBlock<T, T>,
    IReceivableSourceBlock<T>
{
    private readonly IPropagatorBlock<T, int> _block;
    private readonly Queue<T> _highQueue = new();
    private readonly Queue<T> _lowQueue = new();
    private readonly Predicate<T> _hasPriorityPredicate;
    private readonly double _priorityPrecedence;
    private double _priorityCounter = 0;

    // One proxy per real target. The inner block identifies the holder of a
    // reservation by reference, so ReserveMessage, ConsumeMessage and
    // ReleaseReservation must all present the SAME proxy instance for a given
    // target. Keys are held weakly, so targets are not kept alive by the cache.
    // (Fully qualified because the file's using directives are not visible here.)
    private readonly System.Runtime.CompilerServices
        .ConditionalWeakTable<ITargetBlock<T>, TargetProxy> _proxies = new();

    // Both queues and the priority counter are guarded by this single monitor.
    private object Locker => _highQueue;

    /// <summary>Initializes a new <see cref="PriorityBufferBlock{T}"/>.</summary>
    /// <param name="hasPriorityPredicate">
    /// Returns <c>true</c> for items that belong to the high-priority queue.
    /// </param>
    /// <param name="priorityPrecedence">
    /// How many high-priority items are propagated per low-priority item while
    /// both queues are non-empty. Must be at least 1.0.
    /// </param>
    /// <param name="dataflowBlockOptions">
    /// Optional options; only <c>BoundedCapacity</c>, <c>CancellationToken</c> and
    /// <c>MaxMessagesPerTask</c> are forwarded to the inner block.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="hasPriorityPredicate"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="priorityPrecedence"/> is less than 1.0.
    /// </exception>
    public PriorityBufferBlock(Predicate<T> hasPriorityPredicate,
        double priorityPrecedence,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        ArgumentNullException.ThrowIfNull(hasPriorityPredicate);
        if (priorityPrecedence < 1.0)
        {
            throw new ArgumentOutOfRangeException(nameof(priorityPrecedence));
        }

        _hasPriorityPredicate = hasPriorityPredicate;
        _priorityPrecedence = priorityPrecedence;
        dataflowBlockOptions ??= new();

        // The transform stores the real item in the matching queue and emits a
        // dummy 0, so the inner block's output count mirrors the queued items.
        _block = new TransformBlock<T, int>(item =>
        {
            bool hasPriority = _hasPriorityPredicate(item);
            Queue<T> selectedQueue = hasPriority ? _highQueue : _lowQueue;
            lock (Locker)
            {
                selectedQueue.Enqueue(item);
            }
            return 0;
        }, new()
        {
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = dataflowBlockOptions.CancellationToken,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask
        });

        // Mirror the inner block's completion, clearing any leftover items first.
        this.Completion = _block.Completion.ContinueWith(completion =>
        {
            Debug.Assert(this.Count == 0 || !completion.IsCompletedSuccessfully);
            lock (Locker)
            {
                _highQueue.Clear();
                _lowQueue.Clear();
            }
            return completion;
        }, default, TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
    }

    /// <summary>A task that mirrors the completion of the inner block.</summary>
    public Task Completion { get; private init; }

    /// <summary>Signals the inner block that no more items will be accepted.</summary>
    public void Complete() => _block.Complete();

    void IDataflowBlock.Fault(Exception exception) => _block.Fault(exception);

    /// <summary>Total number of items currently stored in both queues.</summary>
    public int Count
    {
        get
        {
            lock (Locker)
            {
                return _highQueue.Count + _lowQueue.Count;
            }
        }
    }

    /// <summary>
    /// Chooses the queue the next item comes from. Must be called under the lock.
    /// </summary>
    /// <param name="forDequeue">
    /// When <c>true</c> the priority counter is advanced (or reset), because the
    /// caller is about to remove the item; when <c>false</c> the call is a pure peek.
    /// </param>
    private Queue<T> GetSelectedQueue(bool forDequeue)
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Queue<T> selectedQueue;
        if (_highQueue.Count == 0)
        {
            selectedQueue = _lowQueue;
        }
        else if (_lowQueue.Count == 0)
        {
            selectedQueue = _highQueue;
        }
        else if (_priorityCounter + 1 > _priorityPrecedence)
        {
            // Enough high-priority items went out; it is a low-priority turn.
            selectedQueue = _lowQueue;
        }
        else
        {
            selectedQueue = _highQueue;
        }

        if (forDequeue)
        {
            if (_highQueue.Count == 0 || _lowQueue.Count == 0)
            {
                // Precedence only matters while both queues have items.
                _priorityCounter = 0;
            }
            else if (++_priorityCounter > _priorityPrecedence)
            {
                // Carry the fractional remainder so that non-integer precedence
                // values (e.g. 2.5) are honored on average.
                _priorityCounter -= _priorityPrecedence + 1;
            }
        }
        return selectedQueue;
    }

    /// <summary>Returns, without removing it, the item that would be dequeued next.</summary>
    private T Peek()
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Debug.Assert(_highQueue.Count > 0 || _lowQueue.Count > 0);
        return GetSelectedQueue(false).Peek();
    }

    /// <summary>Removes and returns the next item according to the priority rule.</summary>
    private T Dequeue()
    {
        Debug.Assert(Monitor.IsEntered(Locker));
        Debug.Assert(_highQueue.Count > 0 || _lowQueue.Count > 0);
        return GetSelectedQueue(true).Dequeue();
    }

    /// <summary>
    /// Returns the single proxy associated with <paramref name="target"/>,
    /// creating it on first use.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    private TargetProxy GetProxy(ITargetBlock<T> target)
    {
        ArgumentNullException.ThrowIfNull(target);
        return _proxies.GetValue(target, key => new TargetProxy(this, key));
    }

    /// <summary>
    /// Adapter presented to the inner block in place of the real target: it
    /// receives the dummy <c>int</c> and offers the real item to the real target.
    /// </summary>
    private class TargetProxy : ITargetBlock<int>
    {
        private readonly PriorityBufferBlock<T> _parent;
        private readonly ITargetBlock<T> _realTarget;

        public TargetProxy(PriorityBufferBlock<T> parent, ITargetBlock<T> target)
        {
            Debug.Assert(parent is not null);
            _parent = parent;
            _realTarget = target ?? throw new ArgumentNullException(nameof(target));
        }

        public Task Completion => throw new NotSupportedException();

        public void Complete() => _realTarget.Complete();

        void IDataflowBlock.Fault(Exception error) => _realTarget.Fault(error);

        DataflowMessageStatus ITargetBlock<int>.OfferMessage(
            DataflowMessageHeader messageHeader, int messageValue,
            ISourceBlock<int> source, bool consumeToAccept)
        {
            Debug.Assert(messageValue == 0);
            if (consumeToAccept)
            {
                throw new NotSupportedException();
            }

            lock (_parent.Locker)
            {
                // Offer the item that WOULD be dequeued; remove it only if the
                // real target accepts it on the spot.
                T realValue = _parent.Peek();
                DataflowMessageStatus response = _realTarget.OfferMessage(
                    messageHeader, realValue, _parent, consumeToAccept);
                if (response == DataflowMessageStatus.Accepted)
                {
                    _parent.Dequeue();
                }
                return response;
            }
        }
    }

    /// <summary>Links this block to <paramref name="target"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    public IDisposable LinkTo(ITargetBlock<T> target,
        DataflowLinkOptions linkOptions)
        => _block.LinkTo(GetProxy(target), linkOptions);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
        => _block.OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        // The same proxy that (possibly) holds the reservation must be used here.
        _ = _block.ConsumeMessage(messageHeader, GetProxy(target),
            out messageConsumed);
        if (messageConsumed)
        {
            lock (Locker)
            {
                return Dequeue();
            }
        }
        return default;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
        => _block.ReserveMessage(messageHeader, GetProxy(target));

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
        => _block.ReleaseReservation(messageHeader, GetProxy(target));

    /// <summary>Attempts to synchronously receive the next item.</summary>
    /// <param name="filter">Must be null; filtering is not supported.</param>
    /// <param name="item">The received item, or the default value on failure.</param>
    /// <returns><c>true</c> if an item was received.</returns>
    /// <exception cref="NotSupportedException"><paramref name="filter"/> is not null.</exception>
    public bool TryReceive(Predicate<T> filter, out T item)
    {
        if (filter is not null)
        {
            throw new NotSupportedException();
        }

        if (((IReceivableSourceBlock<int>)_block).TryReceive(null, out _))
        {
            lock (Locker)
            {
                item = Dequeue();
            }
            return true;
        }

        item = default;
        return false;
    }

    /// <summary>Attempts to synchronously receive all available items.</summary>
    /// <param name="items">The received items in propagation order, or null on failure.</param>
    /// <returns><c>true</c> if at least one item was received.</returns>
    public bool TryReceiveAll(out IList<T> items)
    {
        if (((IReceivableSourceBlock<int>)_block).TryReceiveAll(out IList<int> items2))
        {
            T[] array = new T[items2.Count];
            lock (Locker)
            {
                for (int i = 0; i < array.Length; i++)
                {
                    array[i] = Dequeue();
                }
            }
            items = array;
            return true;
        }

        items = default;
        return false;
    }
}
使用示例:
// Usage example: items for which x.HasPriority is true go to the high-priority queue.
// With a precedence of 2.5, while both queues are non-empty, about 2.5 high-priority
// items are propagated for every low-priority item.
var bufferBlock = new PriorityBufferBlock<SaleOrder>(x => x.HasPriority, 2.5);
以上实现支持内置的BufferBlock<T>
的所有功能,但不支持传入非空filter
的TryReceive
。该块的核心功能被委派给一个内部的TransformBlock<T, int>
,它为存储在两个队列之一中的每个项目各保存一个占位用的零值。