如何分批消费 BlockingCollection<T>

17

我已经写了一些代码来消费队列中所有的等待项。与其逐个处理这些项目,按集合方式处理所有等待的项目更有意义。

我这样声明了我的队列。

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);

然后,在一个消费者线程上,我计划以批的方式读取这些项目,就像这样:

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

这种方法缺乏使用GetConsumingEnumerable的实用性,我不禁想知道是否有更好的方法,或者我的方法是否存在缺陷。

是否有更好的方式来批量消费 BlockingCollection<T>


1
你可以在 ConsumingEnumerable 上执行 Take(50),但这样会失去 50 毫秒超时的效果。所以选择更重要的事情。 - H H
@HenkHolterman,你说得对,我实际上不需要那个,如果物品生产速度更快,这将会成为问题。 - Jodrell
@HenkHolterman,问题已相应编辑。 - Jodrell
不需要更快的速度,但速度过慢会有问题。您当前的代码(无超时)不太适合被 ConsumingEnumerable 替换。 - H H
@HenkHolterman,我想这取决于我是否想等待更大的批次,如果是这样,就需要使用不同的方法。 - Jodrell
1
这就是我所说的做出选择的意思:最小/最大批量大小和最小/最大等待时间。 - H H
3个回答

6
一种解决方案是使用来自System.Threading.Tasks.Dataflow(已包含在.net core 3+中)的BufferBlock<T>。它不使用GetConsumingEnumerable(),但仍然允许您具有相同的实用性,主要包括:
  • 允许使用多个(对称和/或非对称)消费者和生产者进行并行处理
  • 线程安全(允许上述操作)-无需担心竞争条件
  • 可以通过取消标记和/或集合完成进行取消
  • 消费者会阻塞,直到数据可用,避免了在轮询上浪费CPU周期
还有一个BatchBlock<T>,但它会限制您的批量大小。
var buffer = new BufferBlock<Item>();
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
        //process items
}

以下是工作示例,演示了以下内容:
  • 多个对称消费者并行处理可变长度批次
  • 多个对称生产者(在这个例子中不是真正的并行操作)
  • 当生产者完成时完成收集
  • 为了让示例简短,我没有演示使用CancellationToken的情况
  • 等待生产者和/或消费者完成的能力
  • 能够从不允许异步的区域调用,例如构造函数
  • Thread.Sleep()调用不是必需的,但有助于模拟在更费力的场景中发生的一些处理时间
  • Task.WaitAll()和Thread.Sleep()都可以选择转换为它们的异步等效
  • 不需要使用任何外部库
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            consumers.Add(Task.Factory.StartNew(async () =>
            {
                // need to copy this due to lambda variable capture
                var num = i; 
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}:    " + 
                            items.Aggregate((a, b) => a + ", " + b));

                        // real life processing would take some time
                        await Task.Delay(500); 
                }

                Console.WriteLine($"Consumer {num} complete");
            }));

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100); 
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0 + (1000 * i); j < 500 + (1000 * i); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10); 
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete(); 

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray()); 
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}

这是代码的现代化和简化版本。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // may also use the async equivalent
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
            int id,
            IReceivableSourceBlock<string> buffer)
    {
        while (await buffer.OutputAvailableAsync())
        {
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " + 
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}

示例的样式和质量可以改进,例如使其适合屏幕并使用await Task.Delay而不是Thread.Sleep,我认为还有其他改进。 - Jodrell
感谢您的反馈。我已经重新格式化以消除水平滚动条。我曾考虑使用await Task.Delay()代替Thread.Sleep(),但我想展示这段代码可以从构造函数或其他不允许异步的地方创建。您觉得呢? - vlee
我认为你应该使用 async Main,并且不应该在构造函数中使用 sleep 或 delay。 - Jodrell
我禁不住调整了你的代码,请查看编辑后的答案。 - Jodrell
Sleep()/Delay()函数是为了模拟现实生活中更加密集的处理。因此,我想在示例中包含它(尽管在生产中应该用真正的代码替换它)。同时,我不想让任何人误以为异步是运行此代码所必需的。感谢您发布异步版本的代码! - vlee

2
虽然在某些方面不如ConcurrentQueue<T>,但我的LLQueue<T>允许使用AtomicDequeueAll方法进行批量出列,该方法可在单个(原子和线程安全的)操作中从队列中取出所有当前项,并将其放入一个非线程安全集合以供单个线程使用。该方法是专为需要批量读取操作的情况而设计的。
虽然这不是阻塞的,但它可以很容易地用来创建一个阻塞集合:
public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

以下是一个起点,并不包含以下内容:

  1. 处理处置时待处理的读取器。
  2. 担心多个读取器可能会因为一次写入而同时触发的潜在竞争(它只认为偶尔出现的空结果可枚举是可以接受的)。
  3. 对写入设置任何上限。

这些都可以添加,但我想保持最少的一些实际用途,希望在以上定义的限制条件内没有漏洞。


我在想,如果你创建了一个实现IProducerConsumer<IEnumerable<T>>BatchedQueue<T>,或者LLQueue<T>实现了它,那么我就在想是不是我错过了什么,还是框架本身存在问题。 - Jodrell
LLQueue<T> 实现了 IProducerConsumerCollection<T> 接口,但是直接在阻塞集合中使用它意味着我们失去了对 AtomicDequeueAll() 的访问,而这是它在这里比其他任何东西都更具优势的唯一方法。你可以尝试将 LLQueue<T> 包装在一个实现了 IProducerConsumer<IEnumerable<T>> 接口的类中,通过调用 EnqueueRange()AtomicDequeueAll() 来实现,但是 EnqueueRange() 不是原子操作,因此可能会出现等待读取时实际上有项目可供读取的情况,因为阻塞集合没有意识到这一点。 - Jon Hanna
“TryAdd” 需要对整个 “IEnumerable<T>” 原子化。 - Jodrell
没错。EnqueueRange() 只是在重复调用 Enqueue() 上的一种便利方式。如果您不介意它与其他 Enqueue() 交错使用,那么这很好,但如果我们将其包装在某个东西中并且该东西希望它是原子性的,则无用。话虽如此,我想 BlockingCollection 不允许读取,直到写入返回,因此在这方面它可能实际上是可以的。但是,我真的不喜欢将预期行为拉伸得太多的想法。 - Jon Hanna
实际上,我看了一下它,发现我忘记了我的代码:EnqueueRange()目前是原子的,但我刚刚决定不保证它,因为我不确定这是否是最好的方法,所以我想保留更改的权利。使用当前的代码,您绝对可以创建一个既能够添加又能够删除的“BlockingCollection<IEnumerable<T>>”原子集合。 - Jon Hanna

0

不,没有更好的方法。你的方法基本上是正确的。

为了使用方便,你可以将“分批处理”的功能包装在一个扩展方法中。下面的实现在整个枚举过程中使用相同的 List<T> 作为缓冲区,以防止在每次迭代时分配新的缓冲区。它还包括一个 maxSize 参数,允许限制发出的批次大小:

/// <summary>
/// Consumes the items in the collection in batches. Each batch contains all
/// the items that are immediately available, up to a specified maximum number.
/// </summary>
public static IEnumerable<T[]> GetConsumingEnumerableBatch<T>(
    this BlockingCollection<T> source, int maxSize,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
    if (source.IsCompleted) yield break;
    var buffer = new List<T>();
    while (source.TryTake(out var item, Timeout.Infinite, cancellationToken))
    {
        Debug.Assert(buffer.Count == 0);
        buffer.Add(item);
        while (buffer.Count < maxSize && source.TryTake(out item))
            buffer.Add(item);
        T[] batch = buffer.ToArray();
        int batchSize = batch.Length;
        buffer.Clear();
        yield return batch;
        if (batchSize < buffer.Capacity >> 2)
            buffer.Capacity = buffer.Capacity >> 1; // Shrink oversized buffer
    }
}

使用示例:

foreach (Item[] batch in this.items.GetConsumingEnumerableBatch(Int32.MaxValue))
{
    // Process the batch
}

每当发出的批次小于缓冲区容量的四分之一时,缓冲区会缩小一半。这将使缓冲区保持在控制范围内,以防在枚举过程中某个时刻它变得过大。

if (source.IsCompleted) yield break 行的意图是复制内置的 GetConsumingEnumerable 方法的行为,当它提供了一个已取消的令牌,并且集合为空并已完成。

在取消的情况下,没有缓冲的消息会丢失。只有当 buffer 为空时才检查 cancellationToken

此答案的第一个版本中可以找到一个不带内存管理功能的更简单的实现。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接