Batching on duration or threshold using TPL Dataflow


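I have implemented a producer..consumer pattern using TPL Dataflow. The use case is that the code reads messages from a Kafka bus. For efficiency, we need to process messages in batches when writing them to the database.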

Is there a way in TPL Dataflow to hold on to messages and fire when either a size or a duration threshold is hit?

For example, the current implementation posts each message as soon as it is pulled from the queue:

    postedSuccessfully = targetBuffer.Post(msg.Value);

Use BatchBlock. A BatchBlock collects messages until the batch size is reached and then emits them as one group for downstream processing. - JSteward
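Thanks. BatchBlock will collect the messages, but I also need to emit them when a certain time threshold is reached. Is there an option to specify a maximum number of messages or a timeout threshold? - Ashish Bhatia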
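There's no built-in timeout option, but you can flush the block with a timer. There are also max-groups and capacity options (MaxNumberOfGroups, BoundedCapacity) that may help with your other requirements. - JSteward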
@AshishBhatia Why not use Reactive Extensions? Buffer lets you batch by count or by time period, e.g. mySequence.Buffer(TimeSpan.FromSeconds(1)) - Panagiotis Kanavos
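A minimal sketch of the size-only BatchBlock suggestion from the comments, assuming plain integers stand in for the Kafka message values (`BatchBlockSizeDemo` and `RunAsync` are hypothetical names). It groups items into batches of up to `batchSize`; calling `Complete()` flushes the leftovers as a smaller final batch, but nothing is flushed on a timer, which is exactly the gap the question asks about:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockSizeDemo
{
    // Posts itemCount integers into a BatchBlock and returns the batches it emits.
    public static async Task<List<int[]>> RunAsync(int batchSize, int itemCount)
    {
        var batches = new List<int[]>();
        var batchBlock = new BatchBlock<int>(batchSize);

        // ActionBlock runs one item at a time by default, so List.Add is safe here
        var sink = new ActionBlock<int[]>(batch => batches.Add(batch));
        batchBlock.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
            batchBlock.Post(i); // stands in for targetBuffer.Post(msg.Value)

        // Completing the block emits the remaining items as a final, smaller batch
        batchBlock.Complete();
        await sink.Completion;
        return batches;
    }
}
```

For example, `RunAsync(5, 7)` should yield two batches, `0,1,2,3,4` and `5,6`.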
4 Answers

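Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its time span expires.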
Dataflow blocks are designed to work with System.Reactive. A block can be converted to an observable or an observer with the DataflowBlock.AsObservable() and AsObserver() extension methods.
This makes building a buffering block very easy:
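// Usings for CreateBuffer and the Main example below (System.Reactive is a NuGet package)
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    // Receives individual items from upstream
    var inBlock = new BufferBlock<TIn>();
    // Holds the finished batches until downstream takes them
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)          // close a batch on 'count' items or when 'timeSpan' expires
            .Where(batch => batch.Count > 0)  // idle time windows produce empty lists; drop them
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    // Expose the two blocks as a single propagator block
    return DataflowBlock.Encapsulate(inBlock, outBlock);
}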

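This method uses two buffer blocks, one for input and one for output. Buffer() reads from the input block (the observable) and writes a batch to the output block (the observer) whenever the batch is full or the time span expires.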
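By default, Rx works on the current thread. Calling ObserveOn(TaskPoolScheduler.Default) tells it to process the data on task-pool threads.

Example

This code creates a buffer block that emits a batch every 5 items or every 1 second. It first posts 7 items, waits 1.1 seconds, and then posts another 7 items. Each batch is written to the console together with the thread ID: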
static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
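    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    // Wait until printBlock has handled the last batch; bufferBlock.Completion
    // can finish before the final batch has been printed
    await printBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();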
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6

Is it possible to wait for outBlock to be empty before writing more data? - Ariel Moraes
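That's already available. You can bound any block by setting its BoundedCapacity option, and post to it with await block.SendAsync(...). If the block is full, SendAsync waits asynchronously, whereas Post returns false in the same situation. If you set BoundedCapacity=1, a new buffer is only posted once the previous one has been processed. - Panagiotis Kanavos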
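A small sketch of the Post versus SendAsync behaviour described in the comment above, assuming a `BufferBlock<int>` with `BoundedCapacity = 1` stands in for `outBlock` (`BoundedCapacityDemo` is a hypothetical name):

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Compares Post and SendAsync against a BufferBlock that can hold a single item.
    public static async Task<(bool firstPost, bool secondPost, bool sendWasPending,
        bool sendAccepted, int[] received)> RunAsync()
    {
        var block = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);   // accepted: the block has room for one item
        bool secondPost = block.Post(2);  // declined immediately: the block is full

        // SendAsync does not drop the item; its task stays pending while the block is full
        Task<bool> send = block.SendAsync(2);
        bool sendWasPending = !send.IsCompleted;

        int first = await block.ReceiveAsync();  // taking the item frees the only slot...
        bool sendAccepted = await send;          // ...so the postponed SendAsync completes with true
        int second = await block.ReceiveAsync();

        return (firstPost, secondPost, sendWasPending, sendAccepted, new[] { first, second });
    }
}
```

The expected result is `firstPost == true`, `secondPost == false`, a `SendAsync` task that is still pending until the first item is received and then completes with `true`, and the items `1, 2` in order. In a real pipeline the downstream consumer plays the role of the `ReceiveAsync` calls.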

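While there's no built-in timeout, you can wire up a timer that calls TriggerBatch whenever the downstream pipeline has waited long enough for a batch, and reset the timer every time a batch flows through. The BatchBlock takes care of the rest.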
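For example, this sample always ends up with batches of size 1, even though the BatchBlock would normally wait for 10 elements: items arrive every 2 seconds, while the timeout is 1 second. The timeout forces the BatchBlock to flush whatever it currently holds.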
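using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        // Normally waits for 10 items before emitting a batch
        var batchBlock = new BatchBlock<int>(10);

        // After 1 second without a batch, force out whatever is queued
        var timeOut = TimeSpan.FromSeconds(1);
        using var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();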

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

Output:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
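The timer in the test above does nothing more than call TriggerBatch. A deterministic sketch without a timer (`TriggerBatchDemo` is a hypothetical name, and it assumes `itemCount < batchSize`) shows that the BatchBlock holds the items until TriggerBatch forces them out as a smaller batch:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TriggerBatchDemo
{
    // Posts fewer items than batchSize, so no batch is produced on its own,
    // then forces one out with TriggerBatch, as the timer's Elapsed handler does.
    public static async Task<(int outputBeforeTrigger, int[] batch)> FlushPartialBatchAsync(
        int batchSize, int itemCount)
    {
        var batchBlock = new BatchBlock<int>(batchSize);
        for (int i = 0; i < itemCount; i++)
            batchBlock.Post(i);

        // No batch is available yet because the size threshold was not reached
        int outputBeforeTrigger = batchBlock.OutputCount;

        // Emit whatever is currently queued as a smaller batch
        batchBlock.TriggerBatch();
        int[] batch = await batchBlock.ReceiveAsync();
        return (outputBeforeTrigger, batch);
    }
}
```

With `FlushPartialBatchAsync(10, 3)`, `OutputCount` is 0 before the trigger and the received batch is `0,1,2`.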

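Here is a slightly different approach. The tricky part of this problem is knowing when the BatchBlock<T> emits a batch, so that the internal timer can be deactivated. The solution I chose is to intercept every ITargetBlock<T[]> that gets linked to the BatchBlock<T>, wrap it in a TargetWrapper, and have the TargetWrapper forward the batches it receives to the real target.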

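The TimeoutBatchBlock<T> class below offers the full functionality of BatchBlock<T>: it has the same API and supports all the same options. It is a thin wrapper around a BatchBlock<T> instance (plus one TargetWrapper instance per linked target).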
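using System;
using System.Collections.Generic;
using System.Threading;              // System.Threading.Timer
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>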
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to batchSize, or when a timeout period has elapsed after receiving the first
/// item in the batch.
/// </summary>
public class TimeoutBatchBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly TimeSpan _timeout;
    private readonly Timer _timer;
    private bool _timerEnabled;

    public TimeoutBatchBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        // Arguments validation omitted
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timeout = timeout;
        _timer = new Timer(_ => _source.TriggerBatch());
        _timerEnabled = false;
    }

    public TimeoutBatchBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _source.LinkTo(new TargetWrapper(target, this), linkOptions);
    }

    private class TargetWrapper : ITargetBlock<T[]>
    {
        private readonly ITargetBlock<T[]> _realTarget;
        private readonly TimeoutBatchBlock<T> _parent;

        public TargetWrapper(ITargetBlock<T[]> realTarget, TimeoutBatchBlock<T> parent)
        {
            _realTarget = realTarget;
            _parent = parent;
        }

        public Task Completion => _realTarget.Completion;
        public void Complete() => _realTarget.Complete();
        public void Fault(Exception exception) => _realTarget.Fault(exception);

        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
            T[] messageValue, ISourceBlock<T[]> source, bool consumeToAccept)
        {
            var offerResult = _realTarget.OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);
            if (offerResult == DataflowMessageStatus.Accepted)
                _parent.DeactivateTimerIfActive(); // The block emitted a new batch
            return offerResult;
        }
    }

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    private void SetTimerState(bool enabled)
    {
        lock (_timer)
        {
            if (enabled == _timerEnabled) return;
            _timer.Change(
                enabled ? _timeout : System.Threading.Timeout.InfiniteTimeSpan,
                System.Threading.Timeout.InfiniteTimeSpan);
            _timerEnabled = enabled;
        }
    }
    private void ActivateTimerIfInactive() => SetTimerState(true);
    private void DeactivateTimerIfActive() => SetTimerState(false);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            ActivateTimerIfInactive(); // The block received a new message
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}

A custom BatchBlock<T> with a timer that behaves slightly differently (BatchUntilInactiveBlock<T>) can be found here. - Theodor Zoulias


I guess you could use something like this; it's basically just a BatchBlock combined with a timeout.

BatchBlockEx

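using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>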
{
   private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();

   private readonly BatchBlock<T> _base;

   private readonly CancellationToken _cancellationToken;

   private readonly int _triggerTimeMs;

   public BatchBlockEx(int batchSize, int triggerTimeMs)
   {
      _triggerTimeMs = triggerTimeMs;
      _base = new BatchBlock<T>(batchSize);
      PollReTrigger();
   }

   public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
   {
      _triggerTimeMs = triggerTimeMs;
      _cancellationToken = dataflowBlockOptions.CancellationToken;
      _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
      PollReTrigger();
   }

   public int BatchSize => _base.BatchSize;

   public int OutputCount => _base.OutputCount;

   public Task Completion => _base.Completion;

   public void Complete() => _base.Complete();

   void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);

   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);

   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);

   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);

   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);

   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      _asyncAutoResetEvent.Set();
      return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);

   public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);

   public override string ToString() => _base.ToString();

   public void TriggerBatch() => _base.TriggerBatch();

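   // Background loop: each offered message signals the event; the loop then waits
   // _triggerTimeMs and calls TriggerBatch. It only exits when the CancellationToken
   // from the options is cancelled; otherwise it stays suspended on the event.
   private void PollReTrigger()
   {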
      async Task Poll()
      {
         try
         {
            while (!_cancellationToken.IsCancellationRequested)
            {
               await _asyncAutoResetEvent.WaitAsync()
                                          .ConfigureAwait(false);

               await Task.Delay(_triggerTimeMs, _cancellationToken)
                           .ConfigureAwait(false); 
               TriggerBatch();
            }
         }
         catch (TaskCanceledException)
         {
            // nope
         }
      }

      Task.Run(Poll, _cancellationToken);
   }
}

AsyncAutoResetEvent

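// Async counterpart of AutoResetEvent: Set releases one pending waiter, or leaves the
// event signaled so that the next WaitAsync completes immediately.
public class AsyncAutoResetEvent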
{
   private static readonly Task _completed = Task.FromResult(true);
   private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
   private bool _signaled;

   public Task WaitAsync()
   {
      lock (_waits)
      {
         if (_signaled)
         {
            _signaled = false;
            return _completed;
         }

         var tcs = new TaskCompletionSource<bool>();
         _waits.Enqueue(tcs);
         return tcs.Task;
      }
   }

   public void Set()
   {
      TaskCompletionSource<bool> toRelease = null;

      lock (_waits)
         if (_waits.Count > 0)
            toRelease = _waits.Dequeue();
         else if (!_signaled)
            _signaled = true;

      toRelease?.SetResult(true);
   }
}

Page content provided by Stack Overflow. The English original is available via the original link.