使用TPL DataFlow的BlockingCollection<T>批处理

5

有没有一种方法可以批处理来自阻塞集合的一组项。 例如:

我有一个消息总线发布者调用blockingCollection.Add()

还有一个消费线程是这样创建的:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });

然而,我只希望当阻塞集合中有10个项目时才写入控制台,而GetConsumingEnumerable()在每个项目添加后总是触发。我可以自己编写队列来实现此功能,但如果可能的话,我想使用阻塞集合。


2
@HansPassant:但这将导致第11个项目及以后的项目不被处理,是吗? - Jon
2个回答

4
一个快速的解决方案可能是这样的:
public class ConsoleQueue
{
    private readonly List<string> _values = new List<string>();

    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}

然后,您可以像这样使用它。
        var queue = new ConsoleQueue();

        Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
            {
                queue.Push(value);
            }
        });

您可以轻松扩展它以涵盖线程安全等内容


1
+1 - 另一个选择是使用DataFlow BatchBlock,它已经实现了线程安全的批处理。 - Dimitri
1
谢谢,这个完美地运作了。 - The Unculled Badger

4

不确定项目要求是什么,但我建议使用TPL DataFlow BatchBlock

您可以实例化一个BatchBlock<string>,将其绑定到一个ActionBlock<string>,然后将其发布到批处理块。

伪代码可能如下所示:

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray=>{ 
    foreach(var msg in msgArray) 
        Console.Writeline(msg);
});

bb.LinkTo(ab);

foreach (string value in blockingCollection.GetConsumingEnumerable())
{
      bb.Post(value);
}

使用DataFlow甚至可以使用BufferBlock替换BlockingCollection,或者直接将数据发布到缓冲块中而不必先添加到阻塞集合中,因为批处理块已经是线程安全的。


谢谢Dmitri。我在bb.LinkTo(ab)这里遇到了类型转换问题。无法将BatchBlock<string>转换为ISourceBlock<string>... - The Unculled Badger
我的错。ActionBlock<string> 必须是 ActionBlock<string[]>,因为它从批处理块接收字符串数组。请查看更新的答案。 - Dimitri
1
这个被标记为正确的,因为我通过TPL DataFlow学到了一些新东西!谢谢Dmitri。 - The Unculled Badger
根据MSDN(https://msdn.microsoft.com/en-us/library/hh194745%28v=vs.110%29.aspx)的说明,`BatchBlock`不是线程安全的(请参见链接页面底部)。 - Coder1095

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接