BatchBlock生成批处理,其中包含在TriggerBatch()之后发送的元素。

4

我有一个由多个块组成的Dataflow管道。 当元素通过我的处理管道时,我希望按字段 A 将它们分组。为此,我使用一个具有高 BoundedCapacityBatchBlock。在其中存储我的元素,直到我决定释放它们。因此,我调用TriggerBatch()方法。

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

 GroupingBlock.SendAsync(data).Wait(SendTimeout);
}

这是样子。 问题在于,批量生产的产品有时会包含下一个帖子中的元素,这不应该出现在此处。
为了说明:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);

在这个点上,我希望我的批次是 {A,A,A},但实际上它是 {A,A,A,B}

就像 TriggerBatch() 是异步的,而 SendAsync 实际上是在批处理实际制作之前执行的。

我该如何解决这个问题? 显然,我不想在那里放置 Task.Wait(x)(我尝试过了,它可以工作,但性能当然会变差)。


1
你没有解释如何调用“Forward”,但几乎可以肯定的是,在调用“ShouldCreate”和“TriggerBatch”之间,另一个消息被发布到了它上面。这并没有什么问题,只是它应该工作的方式。你不应该试图从外部触发BatchBlock。避免这种问题的唯一方法是从内部触发它。使用DataflowBlock.Encapsulate创建自定义块,将ActionBlock公开为输入,将BatchBlock或BufferBlock公开为输出。在ActionBlock中,检查输入,然后添加消息或触发批处理。 - Panagiotis Kanavos
1
请查看此示例,它使用ActionBlock创建了一个SlidingWindow块,使用Queue进行存储,并使用BufferBlock进行输出。链接为https://msdn.microsoft.com/en-us/library/hh228606(v=vs.110).aspx - Panagiotis Kanavos
“Forward”是从前面的ActionBlock中调用的,该ActionBlock紧接着BatchBlock执行。我已经禁止了并行处理,所以每个块应该一次处理一个消息,是吗? - wojciech_rak
谁在向BatchBlock发布呢?它不能与ActionBlock链接,那么它从哪里获取数据呢?无论如何,您不需要BatchBlock,可以使用简单的队列、列表等,在适当的时候仅发布所有缓存对象的数组。这就是SlidingWindow示例所做的。 - Panagiotis Kanavos
你说得对。我稍微修改了SlidingWindow的示例。 在“ActionBlock”部分,我检查当前数据是否应该被推出。现在一切都按照我的意愿运作。谢谢! - wojciech_rak
2个回答

5
我也遇到了这个问题,因为我试图在错误的地方调用TriggerBatch。如前所述,使用DataflowBlock.Encapsulate的SlidingWindow示例是解决方法,但需要一些时间来适应,因此我想分享我的完成块。
我的ConditionalBatchBlock创建最多为指定大小的批次,并在满足特定条件时可能提前结束。在我的特定场景中,我需要创建100个批次,但总是在检测到数据中的某些更改时创建新批次。
public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    var queue = new Queue<T>();

    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition
        if (condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

    // send any remaining items
    target.Completion.ContinueWith(async t =>
    {
        if (queue.Any())
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}
condition参数在你的情况下可能更简单。我需要查看队列以及当前项,才能确定是否创建一个新的批处理。
我是这样使用的:
public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await ReadDataAsync<T>(conditionalBatchBlock);

    await actionBlock.Completion;
}

0
这是 Loren Paulsen 的 CreateConditionalBatchBlock 方法的专门版本。该方法接受一个 Func<TItem, TKey> keySelector 参数,并在收到具有不同键的项时发出新的批处理。
public static IPropagatorBlock<TItem, TItem[]> CreateConditionalBatchBlock<TItem, TKey>(
    Func<TItem, TKey> keySelector,
    DataflowBlockOptions dataflowBlockOptions = null,
    int maxBatchSize = DataflowBlockOptions.Unbounded,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    if (maxBatchSize < 1 && maxBatchSize != DataflowBlockOptions.Unbounded)
        throw new ArgumentOutOfRangeException(nameof(maxBatchSize));

    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;
    var options = new ExecutionDataflowBlockOptions();
    if (dataflowBlockOptions != null)
    {
        options.BoundedCapacity = dataflowBlockOptions.BoundedCapacity;
        options.CancellationToken = dataflowBlockOptions.CancellationToken;
        options.MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask;
        options.TaskScheduler = dataflowBlockOptions.TaskScheduler;
    }

    var output = new BufferBlock<TItem[]>(options);

    var queue = new Queue<TItem>(); // Synchronization is not needed
    TKey previousKey = default;

    var input = new ActionBlock<TItem>(async item =>
    {
        var key = keySelector(item);
        if (queue.Count > 0 && !keyComparer.Equals(key, previousKey))
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
        queue.Enqueue(item);
        previousKey = key;

        if (queue.Count == maxBatchSize)
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
    }, options);

    _ = input.Completion.ContinueWith(async t =>
    {
        if (queue.Count > 0)
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
        if (t.IsFaulted)
        {
            ((IDataflowBlock)output).Fault(t.Exception.InnerException);
        }
        else
        {
            output.Complete();
        }
    }, TaskScheduler.Default);

    return DataflowBlock.Encapsulate(input, output);
}

这里可以找到一个更“重”的自定义BatchBlock实现:链接 - Theodor Zoulias
关于“不需要同步”的评论。ActionBlock是有意创建的,其MaxDegreeOfParallelism = 1?这就是为什么不需要同步的原因吗? - ben92
@ben92 是的,完全正确。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接