如何在超时后自动调用TriggerBatch,如果排队的项目数量小于BatchSize?

21

在TPL中使用Dataflow CTP

是否有一种方法,在超时后,如果当前排队或延迟的项目数量小于BatchSize,自动调用BatchBlock.TriggerBatch

更好的是:每当该块接收到新项目时,此超时应重置为0。

5个回答

29

是的,您可以通过链接块来优雅地完成此操作。在这种情况下,您需要设置一个TransformBlock,并将其“链接”到BatchBlock之前。代码应该如下所示:

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);

8
这是Drew Marsh's想法的经过审查的版本。此实现使用DataflowBlock.Encapsulate方法创建一个封装了定时器+批处理功能的数据流块。除了新的参数timeout之外,CreateBatchBlock方法还支持所有可用于普通BatchBlock构造函数的选项。
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}

注意:正如@Jeff在评论中指出的那样in a comment,这种方法存在竞争条件。如果timeout非常小(在毫秒级别),transformBlock将与timer竞争将数据传递给batchBlock,而timer可能会在batchBlock还没有任何内容之前先触发。最糟糕的情况是,我们会无限期地挂起。没有更多的消息进入队列,因为它们正在等待一些先前的消息完成,但是在最新的缓冲区中有一个落伍者永远不会触发。


下面是一个提供了整个BatchBlock<T>功能范围的BatchUntilInactiveBlock<T>类的替代方案。这个实现是对BatchBlock<T>实例的一个薄包装。它比之前的CreateBatchBlock实现具有更少的开销,同时具有类似的行为。它不受之前提到的竞争条件的影响。
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
            => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}

_timerBatchBlock<T>被提供并接受消息后立即被调度。在调度计时器和提供消息之间没有时间窗口,因此不存在竞争。


免责声明:上述实现的行为并不理想,因为它们即使在不应该的情况下也会产生短批次。理想的行为是只有在批次可以立即传播到下游消费者时才产生短批次。产生短批次然后只将它们存储在块的输出缓冲区中并没有太多意义。只有在严格地推动CreateBatchBlock<T>/BatchUntilInactiveBlock<T>时,才能观察到与理想行为的偏差,例如如果链接的下游块是有界的,并且已经达到了其最大容量。

1
哈哈,是的,那是我。我确实看过那个帖子,但你在这里提供的完全自包含、"数据流纯"的解决方案是网络上最好的(在我看来)。封装方法很好用,我对它非常满意,无论它有多少额外开销,都不会成为我的特定用途的瓶颈。事实上,从美学角度来看,我仍然更喜欢它,乐高积木的方法是自然的选择。 - allmhuran
1
@allmhuran 是的,DataflowBlock.Encapsulate 方法很方便,通常比直接实现 IPropagatorBlock<TInput,TOutput> 接口要简短得多。另一方面,如果包含超出 IPropagatorBlock 接口提供的附加功能,则变得非常笨拙(必须使用 out Action 参数或类似的东西)。此外,有时候可以使用 IReceivableSourceBlock<TOutput> 接口也很方便。例如,它允许将数据流块转换为 IAsyncEnumerable<TOutput> 序列,如此处所示。 - Theodor Zoulias
1
@allmhuran 我将类的名称更改为 BatchUntilInactiveBlock<T>,因为原始名称 (TimeoutBatchBlock<T>) 在我看来更适合于其他行为(当第一个项目到达时激活计时器,而不是最后一个)。 - Theodor Zoulias
1
大家好,@allmhuran。有一个更正。今天我学到了,DataflowBlock.Encapsulate 返回一个实现 IPropagatorBlock<TInput,TOutput> 接口的实现,同时也实现了 IReceivableSourceBlock<TOutput> 接口(源代码)。虽然不是很明显,但如果你将其强制转换为 ((IReceivableSourceBlock<TOutput>)encapsulatedBlock),转换将会成功。这使得 DataflowBlock.Encapsulate 方法变得更加吸引人,因为它消除了其中一个被认为是缺点的问题。 - Theodor Zoulias
1
非常钦佩您回来并继续改进这个(和其他)主题的信息!这是我第一次考虑使用“封装”来做任何事情,因此我根本没有彻底探索它,我绝对没有意识到这一点。肯定很有用。 - allmhuran
显示剩余8条评论

4

感谢Drew Marsh提供了使用TransformBlock的想法,这在最近一个解决方案中对我非常有帮助。 然而,我认为在批处理块之后需要重置计时器(即在批处理大小达到或在计时器回调中显式调用TriggerBatch方法之后)。 如果每次获取单个项目时都重置计时器,则可能会不断地重置多次而根本不触发批处理(不断将“dueTime”推迟在计时器上)。

这将使代码片段如下所示:

Timer triggerBatchTimer = new Timer(() => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock)
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.

两种计时方法针对不同的场景都是有意义的。 - Theodor Zoulias
1
我认为这种方法本身存在一个缺陷:
  1. 发布n<batchSize个工作项后,计时器将触发处理该块;
  2. 在批处理完成后,计时器将再次被触发;
  3. 如果在第2步中的计时器已经过去之后发布了一个工作项,则它将等待,直到有n> = batchSize个工作项可供处理。
- Graeme

3
这是一个建立在之前答案基础上的解决方案。该方法将现有的BatchBlock封装起来,其中一个推出批次的频率至少与timeout一样频繁。
其他答案没有处理当定时器触发时批处理块中没有项目的情况。在这种情况下,其他解决方案会等待批次填满。我们在非生产环境中遇到了这个问题,这使得测试变得更加困难。这个解决方案确保在将项目发布到BatchBlock后,最多在timeout秒后传播出去。
public static IPropagatorBlock<T, T[]> CreateTimeoutBatchBlock<T>(BatchBlock<T> batchBlock, int timeout)
{
    var timespan = TimeSpan.FromSeconds(timeout);
    var timer = new Timer(
        _ => batchBlock.TriggerBatch(),
        null,
        timespan,
        timespan);
    var transformBlock = new TransformBlock<T[], T[]>(
        value =>
        {
            // Reset the timer when a batch has been triggered
            timer.Change(timespan, timespan);
            return value;
        });
    batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    return DataflowBlock.Encapsulate(batchBlock, transformBlock);
}

这确实可以避免竞态条件,但它的缺点是计时器会重复触发,即使管道处于空闲状态。老实说,我开始怀疑TPL Dataflow是否是一种适用于这种工作(或任何工作)的强大工具。它的组件之间连接过于松散。 - Theodor Zoulias
1
我认为定时器的重复触发是一个小的缺点,因为如果队列为空,它基本上什么也不做。但我同意你的另一个观点,当工作流程适应现有结构时,TPL对我来说效果最好。 - undefined

-1

你可以使用链接选项

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接