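Using the Dataflow CTP (in the TPL)

Is there a way to call BatchBlock.TriggerBatch automatically after a timeout if the number of currently queued or postponed items is less than the BatchSize?

Even better: this timeout should be reset to 0 each time the block receives a new item.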
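Yes, you can accomplish this rather elegantly by chaining blocks together. In this case, set up a TransformBlock and link it "before" the BatchBlock. The code would look something like this: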
    // One-shot timer; TimerCallback takes an (unused) state argument.
    Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());

    // Every item passing through restarts the 5-second countdown.
    TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
    {
        triggerBatchTimer.Change(5000, Timeout.Infinite);
        return value;
    });

    timeoutTransformBlock.LinkTo(yourBatchBlock);
    yourBufferBlock.LinkTo(timeoutTransformBlock);
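A minimal runnable sketch of the TransformBlock + Timer pattern above, assuming int items, a batch size of 100 and a 200 ms timeout (values chosen only for the demo; the class and method names are hypothetical). Only three items are posted, so the batch size is never reached and the partial batch can only be released by the timer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class InactivityBatchDemo
{
    // Returns the batch flushed by the timer, or null if no batch was produced.
    public static async Task<int[]> RunAsync()
    {
        var batchBlock = new BatchBlock<int>(100); // batch size is never reached here

        // One-shot timer: when it fires, flush whatever the BatchBlock currently holds.
        using var timer = new Timer(_ => batchBlock.TriggerBatch());

        // Every item passing through restarts the 200 ms countdown.
        var timeoutBlock = new TransformBlock<int, int>(value =>
        {
            timer.Change(200, Timeout.Infinite);
            return value;
        });
        timeoutBlock.LinkTo(batchBlock);

        foreach (int i in new[] { 1, 2, 3 })
            timeoutBlock.Post(i);

        // Wait well past the timeout. Complete() is never called, so only the timer can flush.
        await Task.Delay(1000);
        return batchBlock.TryReceive(out int[] batch) ? batch : null;
    }
}
```

Awaiting InactivityBatchDemo.RunAsync() should return the partial batch 1, 2, 3. The 200 ms timeout is deliberately much larger than the time needed to move an item through the TransformBlock, which keeps the race condition discussed in this thread out of the picture.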
Here is a CreateBatchBlock method that uses the DataflowBlock.Encapsulate method to create a dataflow block that encapsulates the timer+batch functionality. Besides the new timeout argument, the CreateBatchBlock method also supports all the options available to the normal BatchBlock constructor.

    public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
        int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
    {
        dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
        var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
        var transformBlock = new TransformBlock<T, T>((T value) =>
        {
            timer.Change(timeout, Timeout.Infinite);
            return value;
        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = dataflowBlockOptions.CancellationToken,
            EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
            NameFormat = dataflowBlockOptions.NameFormat,
            TaskScheduler = dataflowBlockOptions.TaskScheduler
        });
        transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
        {
            PropagateCompletion = true
        });
        return DataflowBlock.Encapsulate(transformBlock, batchBlock);
    }
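Note: As pointed out by @Jeff in a comment, there is a race condition with this approach. If the timeout is very small (in the range of milliseconds), the transformBlock will race with the timer to pass the data on to the batchBlock, and the timer may fire before the batchBlock has anything in it yet. Worst case, we hang indefinitely: no more messages end up in the queue because they are waiting on a few previous ones to complete, but there is one straggler in the latest buffer that will never trigger.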
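Here is an alternative: a BatchUntilInactiveBlock<T> class that offers the full range of BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the previous CreateBatchBlock implementation, while behaving similarly. It is not affected by the race condition mentioned previously.

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>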
    /// Provides a dataflow block that batches inputs into arrays.
    /// A batch is produced when the number of currently queued items becomes equal
    /// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
    /// </summary>
    public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
        IReceivableSourceBlock<T[]>
    {
        private readonly BatchBlock<T> _source;
        private readonly Timer _timer;
        private readonly TimeSpan _timeout;

        public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
            GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
            _timer = new Timer(_ => _source.TriggerBatch());
            _timeout = timeout;
        }

        public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
            timeout, new GroupingDataflowBlockOptions())
        { }

        public int BatchSize => _source.BatchSize;
        public TimeSpan Timeout => _timeout;
        public Task Completion => _source.Completion;
        public int OutputCount => _source.OutputCount;

        public void Complete() => _source.Complete();

        void IDataflowBlock.Fault(Exception exception)
            => ((IDataflowBlock)_source).Fault(exception);

        public IDisposable LinkTo(ITargetBlock<T[]> target,
            DataflowLinkOptions linkOptions)
                => _source.LinkTo(target, linkOptions);

        public void TriggerBatch() => _source.TriggerBatch();

        public bool TryReceive(Predicate<T[]> filter, out T[] item)
            => _source.TryReceive(filter, out item);

        public bool TryReceiveAll(out IList<T[]> items)
            => _source.TryReceiveAll(out items);

        DataflowMessageStatus ITargetBlock<T>.OfferMessage(
            DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
            bool consumeToAccept)
        {
            var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);
            // Restart the inactivity timer only once the item is already inside the BatchBlock.
            if (offerResult == DataflowMessageStatus.Accepted)
                _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
            return offerResult;
        }

        T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
            ITargetBlock<T[]> target, out bool messageConsumed)
                => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                    target, out messageConsumed);

        bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
            ITargetBlock<T[]> target)
                => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

        void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
            ITargetBlock<T[]> target)
                => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
    }
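Note: the _timer is scheduled immediately after the BatchBlock<T> has been offered and has accepted a message. There is no window of time between scheduling the timer and offering the message, so there is no race.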
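The "restart the timer only after the message was accepted" idea can also be sketched without a custom block, for the case where the producer posts straight into the BatchBlock: the timer is restarted only after Post returns true, so the item is already inside the block when the countdown starts. This is a simplification of the class above (it covers Post only, not items arriving through LinkTo), and the helper PostAndResetTimer, the class name and the 100 ms timeout are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostThenScheduleDemo
{
    // Hypothetical helper: restart the inactivity timer only after the BatchBlock accepted the item.
    static bool PostAndResetTimer<T>(BatchBlock<T> block, Timer timer, TimeSpan timeout, T item)
    {
        bool accepted = block.Post(item);
        if (accepted)
            timer.Change(timeout, Timeout.InfiniteTimeSpan);
        return accepted;
    }

    public static async Task<int[]> RunAsync()
    {
        var batchBlock = new BatchBlock<int>(10);
        using var timer = new Timer(_ => batchBlock.TriggerBatch());
        var timeout = TimeSpan.FromMilliseconds(100);

        for (int i = 1; i <= 4; i++)
            PostAndResetTimer(batchBlock, timer, timeout, i);

        // Wait (up to 2 s) for the partial batch that the timer flushes.
        return await batchBlock.ReceiveAsync(TimeSpan.FromSeconds(2));
    }
}
```

Here the four items should come back as a single batch 1, 2, 3, 4, even though the batch size is 10.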
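Deviations from the ideal behavior can only be observed when the CreateBatchBlock<T>/BatchUntilInactiveBlock<T> block cannot propagate its batches immediately, for example if the linked downstream block is bounded and has already reached its maximum capacity.

Thanks to Drew Marsh for the idea of using a TransformBlock, which helped me a lot with a recent solution. However, I believe the timer needs to be reset AFTER the batch block, i.e. after a batch has been triggered, either by reaching the batch size or by the TriggerBatch method being called explicitly in the timer callback. If you reset the timer every time you get a single item, it can keep being reset over and over without ever triggering a batch (constantly pushing the "dueTime" of the timer further out).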
That would make the code snippet look like this:
    // Periodic timer, so it keeps firing even if a tick finds the BatchBlock empty;
    // it is restarted each time a batch has been emitted.
    Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, 5000);

    TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
    {
        triggerBatchTimer.Change(5000, 5000);
        return value;
    });

    yourBufferBlock.LinkTo(yourBatchBlock);
    yourBatchBlock.LinkTo(timeoutTransformBlock);
    timeoutTransformBlock.LinkTo(yourActionBlock);

    // Start the producer which is populating the BufferBlock etc.
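A sketch of the difference, assuming a steady producer that posts one item every 50 ms into a BatchBlock whose size is never reached. With a 200 ms inactivity timeout that is reset on every item, the timer would normally never expire while the producer is busy. The version below instead uses a periodic 200 ms timer that is restarted after each emitted batch (here in the consumer rather than in a separate TransformBlock), so batches keep flowing. All names and timings are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeriodicFlushDemo
{
    // Returns how many batches were emitted while the producer was still posting.
    public static async Task<int> RunAsync()
    {
        var period = TimeSpan.FromMilliseconds(200);
        var batchBlock = new BatchBlock<int>(1000); // never filled in this demo
        using var timer = new Timer(_ => batchBlock.TriggerBatch(), null, period, period);

        var batches = new ConcurrentQueue<int[]>();
        var consumer = new ActionBlock<int[]>(batch =>
        {
            batches.Enqueue(batch);
            timer.Change(period, period); // restart the period after every emitted batch
        });
        batchBlock.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        // Steady producer: one item every ~50 ms, always within a 200 ms inactivity window.
        for (int i = 0; i < 30; i++)
        {
            batchBlock.Post(i);
            await Task.Delay(50);
        }
        int emittedWhileBusy = batches.Count;

        batchBlock.Complete(); // flushes the leftover items as a final batch
        await consumer.Completion;
        return emittedWhileBusy;
    }
}
```

RunAsync should report several batches emitted during the roughly 1.5 seconds of posting; with a per-item reset and the same timings, you would normally see none until Complete is called.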
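Here is a method that wraps a BatchBlock into a block that pushes out batches at least once every timeout seconds. An item that enters the BatchBlock is propagated at most timeout seconds later.

    public static IPropagatorBlock<T, T[]> CreateTimeoutBatchBlock<T>(BatchBlock<T> batchBlock, int timeout)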
    {
        var timespan = TimeSpan.FromSeconds(timeout);
        var timer = new Timer(
            _ => batchBlock.TriggerBatch(),
            null,
            timespan,
            timespan);
        var transformBlock = new TransformBlock<T[], T[]>(
            value =>
            {
                // Reset the timer when a batch has been triggered
                timer.Change(timespan, timespan);
                return value;
            });
        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        return DataflowBlock.Encapsulate(batchBlock, transformBlock);
    }
You can use the link options to propagate completion:

    _transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
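A small sketch of what PropagateCompletion buys in this kind of pipeline, with hypothetical values: a TransformBlock feeding a BatchBlock of size 5, receiving 7 items. Completing only the head block is enough, and the BatchBlock emits the 2 leftover items as a final, smaller batch when it completes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionDemo
{
    public static async Task<List<int[]>> RunAsync()
    {
        var transformBlock = new TransformBlock<int, int>(x => x * 10);
        var batchBlock = new BatchBlock<int>(5);
        var batches = new List<int[]>();
        var sink = new ActionBlock<int[]>(b => batches.Add(b)); // runs one batch at a time by default

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        transformBlock.LinkTo(batchBlock, propagate);
        batchBlock.LinkTo(sink, propagate);

        for (int i = 1; i <= 7; i++)
            transformBlock.Post(i);

        // Completing only the head: completion flows downstream, and the BatchBlock
        // outputs its 2 leftover items as a final batch instead of holding them forever.
        transformBlock.Complete();
        await sink.Completion;
        return batches;
    }
}
```

The returned list should hold two batches: 10, 20, 30, 40, 50 and 60, 70.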
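The DataflowBlock.Encapsulate method is convenient, and usually much shorter than implementing the IPropagatorBlock<TInput,TOutput> interface directly. On the other hand, including functionality beyond what the IPropagatorBlock interface offers becomes quite awkward (you have to use out Action parameters or something similar). Also, having the IReceivableSourceBlock<TOutput> interface available is sometimes handy. For example, it allows converting a dataflow block to an IAsyncEnumerable<TOutput> sequence, as shown here. - Theodor Zoulias

I named the class BatchUntilInactiveBlock<T> because the original name (TimeoutBatchBlock<T>) is, in my opinion, more suitable for the other behavior (activating the timer when the first item arrives, not the last). - Theodor Zoulias

DataflowBlock.Encapsulate returns an implementation of the IPropagatorBlock<TInput,TOutput> interface that also implements the IReceivableSourceBlock<TOutput> interface (source code). It's not obvious, but if you cast it with ((IReceivableSourceBlock<TOutput>)encapsulatedBlock), the cast will succeed. This makes the DataflowBlock.Encapsulate method more attractive, since it eliminates one of its presumed disadvantages. - Theodor Zoulias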
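A sketch of the cast described in the last comment, assuming .NET 5 or later, where the DataflowBlock.ReceiveAllAsync extension method (which requires an IReceivableSourceBlock<T>) is available. The encapsulated doubling-and-batching block and the class name are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateReceivableDemo
{
    public static async Task<List<int[]>> RunAsync()
    {
        var doubler = new TransformBlock<int, int>(x => x * 2);
        var batcher = new BatchBlock<int>(2);
        doubler.LinkTo(batcher, new DataflowLinkOptions { PropagateCompletion = true });

        IPropagatorBlock<int, int[]> encapsulated = DataflowBlock.Encapsulate(doubler, batcher);

        // Not visible in the declared type, but the cast succeeds at run time.
        var receivable = (IReceivableSourceBlock<int[]>)encapsulated;

        for (int i = 1; i <= 4; i++)
            encapsulated.Post(i);
        encapsulated.Complete(); // completes the doubler; completion propagates to the batcher

        // Consume the block as an IAsyncEnumerable<int[]>.
        var batches = new List<int[]>();
        await foreach (int[] batch in receivable.ReceiveAllAsync())
            batches.Add(batch);
        return batches;
    }
}
```

The result should be the batches 2, 4 and 6, 8. Without the cast, the declared IPropagatorBlock<int, int[]> type does not expose ReceiveAllAsync.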