我已经使用 TPL Dataflow 实现了生产者-消费者模式。使用场景是代码从 Kafka 总线中读取消息。为了提高效率,在发送到数据库时,我们需要批量处理消息。
在TPL数据流中是否有一种方式可以保留消息,并在达到大小或持续时间阈值时触发?
例如,当前的实现在从队列中拉出消息后立即发布该消息。
postedSuccessfully = targetBuffer.Post(msg.Value);
我已经使用 TPL Dataflow 实现了生产者-消费者模式。使用场景是代码从 Kafka 总线中读取消息。为了提高效率,在发送到数据库时,我们需要批量处理消息。
在TPL数据流中是否有一种方式可以保留消息,并在达到大小或持续时间阈值时触发?
例如,当前的实现在从队列中拉出消息后立即发布该消息。
postedSuccessfully = targetBuffer.Post(msg.Value);
/// <summary>
/// Creates a propagator block that groups incoming items into lists. The grouping is
/// delegated to the Rx <c>Buffer(timeSpan, count)</c> operator, so a list is produced
/// when <paramref name="count"/> items have been collected or when
/// <paramref name="timeSpan"/> has elapsed.
/// </summary>
/// <typeparam name="TIn">Type of the items to batch.</typeparam>
/// <param name="timeSpan">Time window passed to the Rx buffer operator.</param>
/// <param name="count">Maximum number of items per batch.</param>
/// <returns>
/// A block whose target side is an input <c>BufferBlock</c> and whose source side is an
/// output <c>BufferBlock</c> holding the batches.
/// </returns>
public static IPropagatorBlock<TIn, IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan, int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();
    var outObserver = outBlock.AsObserver();

    inBlock.AsObservable()
        .Buffer(timeSpan, count)
        // The time-based buffer also closes when the window elapses with nothing in it;
        // drop those empty lists so an idle source does not trigger empty downstream work.
        .Where(batch => batch.Count > 0)
        // Hand the batches over on a task-pool thread.
        .ObserveOn(TaskPoolScheduler.Default)
        .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);
}
Buffer() 从输入块(可观察对象)读取数据并写入输出块(观察者)。ObserveOn(TaskPoolScheduler.Default) 告诉它在任务池线程上处理数据。
示例
此代码创建一个缓冲块,可容纳 5 个项或 1 秒的数据。它首先发布 7 个项,然后等待 1.1 秒钟,然后再发布另外 7 个项。每个批次与线程 ID 一起写入控制台:
/// <summary>
/// Demo entry point: posts two bursts of seven items through a 5-item / 1-second
/// buffer and prints every batch together with the id of the printing thread.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    // Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items => printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    // Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
    for (int i = 0; i < 7; i++)
    {
        bufferBlock.Post(i.ToString());
    }

    await Task.Delay(1100);

    for (int i = 7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }

    bufferBlock.Complete();
    Console.WriteLine($"Finishing");

    // Wait for the LAST block in the pipeline. The buffer completing only means its
    // batches were handed over; the print block may still be writing the final one.
    // Completion reaches printBlock because the link uses PropagateCompletion.
    await printBlock.Completion;

    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}
/// <summary>
/// Writes one batch to the console as a comma-separated line, followed by the
/// managed id of the thread that performs the write.
/// </summary>
/// <param name="items">The batch to print.</param>
static void printOut(IEnumerable<string> items)
{
    int threadId = Thread.CurrentThread.ManagedThreadId;
    string joined = string.Join(",", items);
    Console.WriteLine($"{joined} on {threadId}");
}
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
BatchBlock
会为您处理其余的事情。BatchBlock
中的任何内容。
/// <summary>
/// Example showing a <c>BatchBlock</c> combined with a timer that forces a batch
/// out when no full batch has been produced within the timeout.
/// </summary>
public class BatchBlockExample
{
    /// <summary>
    /// Sends five items two seconds apart into a batch block of size 10 with a
    /// one-second timeout timer, and prints the size of every batch received.
    /// </summary>
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);
        var timeOut = TimeSpan.FromSeconds(1);

        // The timer is disposable and keeps raising Elapsed until stopped; scope it
        // to the test so it cannot keep calling TriggerBatch after the test ends.
        using (var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds))
        {
            timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();

            var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
            {
                // Reset the timeout since we got a batch
                timeOutTimer.Stop();
                timeOutTimer.Start();
                Console.WriteLine($"Batch Size: {x.Count()}");
            });

            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
            timeOutTimer.Start();

            foreach (var item in Enumerable.Range(0, 5))
            {
                await Task.Delay(2000);
                await batchBlock.SendAsync(item);
            }

            batchBlock.Complete();
            await actionBlock.Completion;
        }
    }
}
输出:
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
需要知道 BatchBlock<T> 何时发出批次,以便停用内部计时器。我选择的解决方案是:每次将 ITargetBlock<T[]> 链接到 BatchBlock<T> 时,在中间插入一个 TargetWrapper 进行拦截,并将 TargetWrapper 接收到的批次传递给真正的目标。
下面的 TimeoutBatchBlock<T> 类提供了 BatchBlock<T> 的全部功能。它具有所有 API 并支持所有选项。它是一个薄包装器,包装了一个 BatchBlock<T> 实例(加上每个链接目标的一个 TargetWrapper 实例)。
/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to batchSize, or when a timeout period has elapsed after receiving the first
/// item in the batch.
/// </summary>
public class TimeoutBatchBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    // The wrapped block that does the actual batching.
    private readonly BatchBlock<T> _source;
    private readonly TimeSpan _timeout;
    // One-shot timer (period is always infinite); also used as the lock object
    // guarding _timerEnabled.
    private readonly Timer _timer;
    private bool _timerEnabled;

    /// <summary>Initializes the block with explicit grouping options.</summary>
    /// <param name="batchSize">Number of items per full batch.</param>
    /// <param name="timeout">
    /// Due time used when arming the timer after an item is accepted.
    /// </param>
    /// <param name="dataflowBlockOptions">Options forwarded to the inner block.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="timeout"/> is negative and not the infinite time span.
    /// </exception>
    public TimeoutBatchBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        // batchSize and dataflowBlockOptions are passed straight to the BatchBlock<T>
        // constructor. The timeout is checked here so that a bad value fails at
        // construction instead of inside Timer.Change when the first item arrives.
        if (timeout < TimeSpan.Zero
            && timeout != System.Threading.Timeout.InfiniteTimeSpan)
        {
            throw new ArgumentOutOfRangeException(nameof(timeout));
        }

        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timeout = timeout;
        _timer = new Timer(_ =>
        {
            // The one-shot timer has just elapsed, so record it as inactive BEFORE
            // triggering. Otherwise, when the triggered batch is not accepted by a
            // wrapped target right away (postponed, or no target linked), the flag
            // would stay true and the timer would never be armed again.
            DeactivateTimerIfActive();
            _source.TriggerBatch();
        });
        _timerEnabled = false;
    }

    /// <summary>Initializes the block with default grouping options.</summary>
    /// <param name="batchSize">Number of items per full batch.</param>
    /// <param name="timeout">
    /// Due time used when arming the timer after an item is accepted.
    /// </param>
    public TimeoutBatchBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    /// <summary>Batch size of the inner block.</summary>
    public int BatchSize => _source.BatchSize;

    /// <summary>The timeout supplied at construction.</summary>
    public TimeSpan Timeout => _timeout;

    /// <summary>Completion task of the inner block.</summary>
    public Task Completion => _source.Completion;

    /// <summary>Output count reported by the inner block.</summary>
    public int OutputCount => _source.OutputCount;

    /// <summary>Forwards to the inner block's <c>Complete</c>.</summary>
    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    /// <summary>
    /// Links the inner block to <paramref name="target"/> through a wrapper that
    /// notices accepted batches and deactivates the timer.
    /// </summary>
    /// <param name="target">The real target of the batches.</param>
    /// <param name="linkOptions">Link options forwarded to the inner block.</param>
    /// <returns>The disposable returned by the inner block's <c>LinkTo</c>.</returns>
    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _source.LinkTo(new TargetWrapper(target, this), linkOptions);
    }

    /// <summary>
    /// Sits between the inner block and a real target: forwards every call to the
    /// real target and tells the parent when an offered batch was accepted.
    /// </summary>
    private class TargetWrapper : ITargetBlock<T[]>
    {
        private readonly ITargetBlock<T[]> _realTarget;
        private readonly TimeoutBatchBlock<T> _parent;

        public TargetWrapper(ITargetBlock<T[]> realTarget, TimeoutBatchBlock<T> parent)
        {
            _realTarget = realTarget;
            _parent = parent;
        }

        public Task Completion => _realTarget.Completion;

        public void Complete() => _realTarget.Complete();

        public void Fault(Exception exception) => _realTarget.Fault(exception);

        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
            T[] messageValue, ISourceBlock<T[]> source, bool consumeToAccept)
        {
            var offerResult = _realTarget.OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);
            if (offerResult == DataflowMessageStatus.Accepted)
            {
                _parent.DeactivateTimerIfActive(); // The block emitted a new batch
            }
            return offerResult;
        }
    }

    /// <summary>Forwards to the inner block's <c>TriggerBatch</c>.</summary>
    public void TriggerBatch() => _source.TriggerBatch();

    /// <summary>Forwards to the inner block's <c>TryReceive</c>.</summary>
    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    /// <summary>Forwards to the inner block's <c>TryReceiveAll</c>.</summary>
    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    // Arms the timer with the configured timeout (enabled) or parks it (disabled).
    // Does nothing when the requested state equals the recorded one.
    private void SetTimerState(bool enabled)
    {
        lock (_timer)
        {
            if (enabled == _timerEnabled)
            {
                return;
            }
            _timer.Change(
                enabled ? _timeout : System.Threading.Timeout.InfiniteTimeSpan,
                System.Threading.Timeout.InfiniteTimeSpan);
            _timerEnabled = enabled;
        }
    }

    private void ActivateTimerIfInactive() => SetTimerState(true);

    private void DeactivateTimerIfActive() => SetTimerState(false);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
        {
            ActivateTimerIfInactive(); // The block received a new message
        }
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
        => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
            target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
        => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
我猜你可以使用类似这样的东西,基本上它只是 BatchBlock 与超时(Timeout)的组合。
BatchBlockEx
/// <summary>
/// A thin wrapper around <c>BatchBlock</c> that additionally calls
/// <c>TriggerBatch</c> a fixed delay after an item has been offered, so a partial
/// batch is flushed even when the batch size is not reached.
/// </summary>
/// <typeparam name="T">Type of the items to batch.</typeparam>
public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>
{
    // Signalled on every offered message; wakes the background poll loop.
    private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();
    private readonly BatchBlock<T> _base;
    private readonly CancellationToken _cancellationToken;
    private readonly int _triggerTimeMs;

    /// <summary>Creates the block with default grouping options.</summary>
    /// <param name="batchSize">Number of items per full batch.</param>
    /// <param name="triggerTimeMs">Delay in milliseconds before a batch is forced.</param>
    public BatchBlockEx(int batchSize, int triggerTimeMs)
        : this(batchSize, triggerTimeMs, new GroupingDataflowBlockOptions())
    {
    }

    /// <summary>Creates the block with explicit grouping options.</summary>
    /// <param name="batchSize">Number of items per full batch.</param>
    /// <param name="triggerTimeMs">Delay in milliseconds before a batch is forced.</param>
    /// <param name="dataflowBlockOptions">
    /// Options forwarded to the inner block; its cancellation token also stops the poll loop.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="dataflowBlockOptions"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="triggerTimeMs"/> is less than -1.</exception>
    public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        if (dataflowBlockOptions == null)
        {
            throw new ArgumentNullException(nameof(dataflowBlockOptions));
        }

        // Task.Delay rejects values below -1; failing here is better than having the
        // background loop die silently on its first delay.
        if (triggerTimeMs < -1)
        {
            throw new ArgumentOutOfRangeException(nameof(triggerTimeMs));
        }

        _triggerTimeMs = triggerTimeMs;
        _cancellationToken = dataflowBlockOptions.CancellationToken;
        _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        PollReTrigger();
    }

    /// <summary>Batch size of the inner block.</summary>
    public int BatchSize => _base.BatchSize;

    /// <summary>Output count reported by the inner block.</summary>
    public int OutputCount => _base.OutputCount;

    /// <summary>Completion task of the inner block.</summary>
    public Task Completion => _base.Completion;

    /// <summary>Forwards to the inner block's <c>Complete</c>.</summary>
    public void Complete() => _base.Complete();

    void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);

    /// <summary>Links the inner block directly to <paramref name="target"/>.</summary>
    public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        // Signal on every offer, whatever the inner block answers, then forward.
        _asyncAutoResetEvent.Set();
        return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <summary>Forwards to the inner block's <c>TryReceive</c>.</summary>
    public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);

    /// <summary>Forwards to the inner block's <c>TryReceiveAll</c>.</summary>
    public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);

    public override string ToString() => _base.ToString();

    /// <summary>Forwards to the inner block's <c>TriggerBatch</c>.</summary>
    public void TriggerBatch() => _base.TriggerBatch();

    // Starts the background loop: wait for a signal, wait the trigger delay, force a batch.
    private void PollReTrigger()
    {
        async Task Poll()
        {
            try
            {
                while (!_cancellationToken.IsCancellationRequested)
                {
                    // Wait for the next offered message, but also wake up when the
                    // block finishes so the loop does not stay parked forever.
                    var signal = _asyncAutoResetEvent.WaitAsync();
                    await Task.WhenAny(signal, _base.Completion)
                        .ConfigureAwait(false);
                    if (!signal.IsCompleted)
                    {
                        return; // The block completed while idle
                    }

                    await Task.Delay(_triggerTimeMs, _cancellationToken)
                        .ConfigureAwait(false);
                    TriggerBatch();
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the normal way to stop the loop
            }
        }

        Task.Run(Poll, _cancellationToken);
    }
}
AsyncAutoResetEvent
/// <summary>
/// An awaitable auto-reset event: each <see cref="Set"/> releases exactly one
/// waiter, or is remembered (at most once) for the next call to
/// <see cref="WaitAsync"/> when nobody is waiting.
/// </summary>
public class AsyncAutoResetEvent
{
    // Shared, already-completed task returned when the event is signalled.
    private static readonly Task _completed = Task.FromResult(true);

    // Pending waiters in arrival order; also used as the lock object.
    private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
    private bool _signaled;

    /// <summary>
    /// Returns a task that completes when the event is signalled. If a signal is
    /// already stored it is consumed and a completed task is returned.
    /// </summary>
    /// <returns>A task representing the wait.</returns>
    public Task WaitAsync()
    {
        lock (_waits)
        {
            if (_signaled)
            {
                _signaled = false;
                return _completed;
            }

            // Run continuations asynchronously so that the awaiting code does not
            // execute inline on the thread that calls Set().
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waits.Enqueue(tcs);
            return tcs.Task;
        }
    }

    /// <summary>
    /// Releases the oldest waiter if there is one; otherwise stores the signal.
    /// </summary>
    public void Set()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waits)
        {
            if (_waits.Count > 0)
            {
                toRelease = _waits.Dequeue();
            }
            else
            {
                _signaled = true;
            }
        }

        // Complete the waiter outside the lock.
        toRelease?.SetResult(true);
    }
}
BatchBlock。
BatchBlock 将收集消息,直到达到批处理大小,然后发出一组消息以进行下游处理。 - JSteward
mySequence.Buffer(TimeSpan.FromSeconds(1))。 - Panagiotis Kanavos