生产者/消费者模式与批处理生产者

5
我正在尝试实现一个相当简单的多生产者单消费者应用程序。经过调查,我找到了BlockingCollection<T>这个有用的类,它使我能够实现下面的长时间运行的消费者任务。
var c1 = Task.Factory.StartNew(() =>
{
    var buffer = new List<int>(BATCH_BUFFER_SIZE);

    foreach (var value in blockingCollection.GetConsumingEnumerable())
    {
        buffer.Add(value);
        if (buffer.Count == BATCH_BUFFER_SIZE)
        {
            ProcessItems(buffer);
            buffer.Clear();
        }
    }
});
ProcessItems 函数将缓冲区提交到数据库,并且以批处理的方式进行操作。然而,这种解决方案并不是最优的。在繁忙生产期间,可能需要一段时间才能填满缓冲区,这意味着数据库已经过时。
更理想的解决方案是使用30秒计时器运行任务,或者使用超时终止 foreach
我尝试了计时器的想法,并得出了以下结论:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);

private static void TimerElapsed(object state)
{
    var buffer = new List<int>();
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();

    foreach (var value in collection)
    {
        buffer.Add(value);
    }

    ProcessItems(buffer);
    buffer.Clear();
}

这里有一个明显的问题,即 foreach 将会被阻塞直到结束,这违背了计时器的初衷。

有没有人能提供一个方向?我基本上需要定期快照 BlockingCollection 并处理其中的内容以清除它。也许 BlockingCollection 不是正确的类型?

1个回答

6

不要在定时器回调中使用GetConsumingEnumerable,而是使用以下方法之一将结果添加到列表中,直到它返回false或达到满意的批处理大小。

BlockingCollection.TryTake Method (T) - 可能是您需要的,您不希望执行进一步的等待。

BlockingCollection.TryTake Method (T, Int32)

BlockingCollection.TryTake Method (T, TimeSpan)

您可以轻松地将其提取为扩展(未经测试):

public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
     // Argument checking.

     T next;
     var result = new List<T>();

     while(result.Count < maxSize && collection.TryTake(out next))
     {
         result.Add(next);
     }

     return result;
}

我选择了这个实现,它完美地工作。它每x秒运行一次,并且只推送可用的内容(无需等待)。感谢您的帮助。https://gist.github.com/AntSwift/d2faa4f43d1a6d594172 - Ant Swift
现在可以在 https://github.com/AntSwift/AS.BlockingCollectionDemo 上获取完整的可工作版本。 - Ant Swift
我认为你的代码示例可能会受到竞态条件的影响,因为计时器机制存在。如果你的消费者无法在计时器周期内完成任务,计时器将会触发并产生另一个消费者。 - Andrew Harry
2
@Andrew:这取决于所使用的计时器实现,这超出了本答案的范围,本回答仅涉及阻塞集合本身。 - Ani

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接