有没有一种.NET队列类可以允许一次性出队多个项目?

19

我相信一个很常见的情况是有一个待处理的项目队列,需要每次处理N个。

例如...如果我们有23个项目,并且应该每次处理10个,那么就像这样:

Process batch of 10
Process batch of 10
Process batch of 3
我可以用多种方式解决这个问题。我的问题是:.NET框架是否提供了任何专门解决这种情况的类? Queue类非常完美,但它不允许一次出队多个项目。

2
你不能只是在循环中出队10个项目,然后处理它们吗?我是否从你的问题中漏掉了什么,使这种方法不可行? - xxbbcc
1
为什么不直接将完整的10个批次加入队列? - Random Dev
你可以构建一个扩展方法,按块出队并返回一个IEnumerable的出队对象。 - Andrew Whitaker
这些可以被无序或混合地排队吗?有些排队系统具有会话的概念(Service Broker),其中消息可以被逻辑上分组成一个对话,因此它们可以由单个读取器处理,而其他消息可以由其各自的读取器处理。 - Matthew Whited
4个回答

28

你可以在 Queue<T> 上创建一个扩展方法:

public static class QueueExtensions
{
    public static IEnumerable<T> DequeueChunk<T>(this Queue<T> queue, int chunkSize) 
    {
        for (int i = 0; i < chunkSize && queue.Count > 0; i++)
        {
            yield return queue.Dequeue();
        }
    }
}

用法:

var q = new Queue<char>();
q.DequeueChunk(10) // first 10 items
q.DequeueChunk(10) // next 10 items

示例:https://dotnetfiddle.net/OTcIZX


小心。我不相信return yield是线程安全的。我建议在处理出列时更加明确。public static IEnumerable<T> DequeueChunk<T>(this Queue<T> queue, int chunkSize) { var result = new List<T>(); for (var i = 0; i < chunkSize && queue.Count > 0; i++) { result.Add(queue.Dequeue()); } return result; } - Simon K

6

使用Linq,可以在.NET中通过使用Enumerable.Range() 方法和Select()扩展方法来实现此目标:

var chunk = Enumerable.Range(0, chuckCount).Select(i => queue.Dequeue()).ToList();

这是通过生成一个整数的枚举器,然后对新枚举器中的每个整数出队列一个项目实现的。通过调用ToList()确保立即执行该操作。


3
如果队列小于chunkCount,这将会崩溃,因此您需要防范这种情况。 - Lodewijk

1
TPL Dataflow库提供BatchBlock < T > ,它将输入消息序列分组为所需大小的块。
 var bb = new BatchBlock<int>(10);
 var ab = new ActionBlock<int[]>((Action<int[]>)chunk=>HandleChunk(chunk));  

 bb.LinkTo(ab, new DataflowLinkOptions(){PropogateCompletion = true});

 for(int i = 0; i < 23; ++i)
 {
     bb.Post(i);
 }

 bb.Complete();
 ab.Completion.Wait();

感谢您的回答。只是为了明确:当您向“BatchBlock”发布时,您正在分配要处理的数据。但每次只会处理10个。因此,可能“Post”将阻塞线程,直到某些项目返回。对吗?还是并行的?它可能是并行的,因为它来自TPL库。 - Andre Pena
@andrerpena 这是正确的。默认情况下,ActionBlock按顺序处理消息。您可以通过在选项中指定MaxDegreeOfParallelism来更改此行为。 - alexm
我明白了,谢谢。所以,我假设chunk=>HandleChunck(chunk)这个操作将会对每10个项目执行一次(chunk将包含10个项目)。但是,它如何知道应该处理最后3个项目,即使我们没有达到30个项目?这是因为Complete方法吗?只有在调用Complete时,进程才会开始吗? - Andre Pena
@andrerpena:小小的澄清:BatchBlock.Post() 不是阻塞的。它会维护一个内部缓冲区,直到消息被分组成块并被目标块(ActionBlock)消耗。 - alexm
那么,Complete() 方法会启动“消耗”吗? - Andre Pena
显示剩余2条评论

-1

我可能只会使用一个简化版本,从队列中出列并在特定时间间隔内使用计时器或系统中可用的任何东西执行操作。

所以经过10秒钟,如果有10个项目,则出列并处理;或者如果有100个,则同样如此。所有这些都取决于负载、正在完成的工作等因素,以及您尝试实现的延迟和响应的期望等等......

我认为您使用队列是为了不需要立即结果,而是需要在较长时间内执行的内容,例如数据聚合。

然后这就没有严格的SLA窗口。这可能需要一些基准测试和测试才能理解负载是什么,以及首选时间间隔是多少。

    public static IEnumerable<T> DequeueAvailable<T>(this Queue<T> queue)
    {
        for (int i = 0; i < queue.Count; i++)
        {
            yield return queue.Dequeue();
        }
    }

1
这种方法按照原文的写法是行不通的。每次迭代时,queue.Count都会减少,因此你只会出队列大约一半的项目(即当i超过queue.Count时)。循环应该改为while (queue.Count > 0) - nollidge

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接