我有一个由多个块组成的Dataflow管道。
当元素通过我的处理管道时,我希望按字段 A
将它们分组。为此,我使用一个具有高 BoundedCapacity
的 BatchBlock
。在其中存储我的元素,直到我决定释放它们。因此,我调用TriggerBatch()
方法。
private void Forward(TStronglyTyped data)
{
if (ShouldCreateNewGroup(data))
{
GroupingBlock.TriggerBatch();
}
GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
这是样子。 问题在于,批量生产的产品有时会包含下一个帖子中的元素,这不应该出现在此处。
为了说明:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
在这个点上,我希望我的批次是 {A,A,A}
,但实际上它是 {A,A,A,B}
就像 TriggerBatch()
是异步的,而 SendAsync
实际上是在批处理实际制作之前执行的。
我该如何解决这个问题?
显然,我不想在那里放置 Task.Wait(x)
(我尝试过了,它可以工作,但性能当然会变差)。