.NET 4.0 中的并发优先队列

34

看起来.NET 4.0 在并发方面有很多改进,可能需要使用并发优先队列。在框架内是否有可重用的体面优先队列实现?


请查看:http://msdn.microsoft.com/zh-cn/library/system.collections.concurrent.aspx - Mitch Wheat
2
@Mitch:不幸的是,我正在寻找_priority_队列实现(http://en.wikipedia.org/wiki/Priority_queue)。普通的FIFO队列无法满足我的需求。 - user166010
1
你可能需要自己编写代码。一个相对简单的方法是使用一个正常队列数组,优先级递减。 - Steven Sudit
我已经将我的评论改为了答案。 - Steven Sudit
相关:https://dev59.com/-nVD5IYBdhLWcg3wE3Jo - Mechanical snail
显示剩余2条评论
9个回答

21

2
该实现似乎存在一个错误。如果您将其包装在BlockingCollection中并调用Add 5次,则会将项目放入公开集合中的错误顺序。如果您深入挖掘blockingConcurrentPriorityQueue的私有成员,您会发现底层CPQ本身包含正确顺序的正确数据。但是公开集合是无序的(CPQ包含0,1,2,3,4; 公开集合包含0,4,3,2,1)。因此,请勿将此版本用作Blocking Collection的一部分。 - Joe
1
但是要用哪一个呢?你知道有更好的选择吗? - ironic
@Joe:我也尝试将它封装到BlockingCollection中,但我无法重现您的问题......因此如果您有更多关于如何重现该问题的详细信息-我将不胜感激! - Timur Sadykov

7
您可能需要自行编写代码。相对较简单的方法是创建一个正则队列数组,优先级递减。

基本上,您将插入适当优先级的队列中。然后,在消费者端,您将按优先级从高到低的顺序遍历列表,检查队列是否非空,如果是,则消耗一个条目。


1
+1,我忘了提到如何实现,但我认为它非常直观 :) 没有这个,我的答案就不完整。 - Nick Martyshchenko
1
这不是一个很好的实现。可以通过在插入任何队列后发出信号的ManualResetEvent来改进它,以便消费者可以轮询一次,然后等待信号触发。在实践中,最好等待有限而相对较短(可能为四分之一秒),以避免错过信号的情况。 - Steven Sudit
Paw的实现锁定不必要,但总体结构是一个很好的起点。 - Steven Sudit

5
也许您可以使用我自己实现的PriorityQueue。它实现了比通常的推入/弹出/查看更多的功能,这些功能是我在需要时实现的。它还具有用于并发的锁定。
非常感谢对代码的评论 :)
public class PriorityQueue<T> where T : class
{
    private readonly object lockObject = new object();
    private readonly SortedList<int, Queue<T>> list = new SortedList<int, Queue<T>>();

    public int Count
    {
        get
        {
            lock (this.lockObject)
            {
                return list.Sum(keyValuePair => keyValuePair.Value.Count);
            }
        }
    }

    public void Push(int priority, T item)
    {
        lock (this.lockObject)
        {
            if (!this.list.ContainsKey(priority))
                this.list.Add(priority, new Queue<T>());
            this.list[priority].Enqueue(item);
        }
    }
    public T Pop()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
            {
                T obj = this.list.First().Value.Dequeue();
                if (this.list.First().Value.Count == 0)
                    this.list.Remove(this.list.First().Key);
                return obj;
            }
        }
        return null;
    }
    public T PopPriority(int priority)
    {
        lock (this.lockObject)
        {
            if (this.list.ContainsKey(priority))
            {
                T obj = this.list[priority].Dequeue();
                if (this.list[priority].Count == 0)
                    this.list.Remove(priority);
                return obj;
            }
        }
        return null;
    }
    public IEnumerable<T> PopAllPriority(int priority)
    {
        List<T> ret = new List<T>();
        lock(this.lockObject)
        {
            if (this.list.ContainsKey(priority))
            {
                while(this.list.ContainsKey(priority) && this.list[priority].Count > 0)
                    ret.Add(PopPriority(priority));
                return ret;
            }
        }
        return ret;
    }
    public T Peek()
    {
        lock (this.lockObject)
        {
            if (this.list.Count > 0)
                return this.list.First().Value.Peek();
        }
        return null;
    }
    public IEnumerable<T> PeekAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            foreach (KeyValuePair<int, Queue<T>> keyValuePair in list)
                ret.AddRange(keyValuePair.Value.AsEnumerable());
        }
        return ret;
    }
    public IEnumerable<T> PopAll()
    {
        List<T> ret = new List<T>();
        lock (this.lockObject)
        {
            while (this.list.Count > 0)
                ret.Add(Pop());
        }
        return ret;
    }
}

2
除了不遵循.NET约定外,它看起来是正确的但速度较慢。 这种缓慢是由于锁住所有内容,而 .NET 4.0 并发队列是无锁的。请参见:http://www.codethinked.com/post/2010/02/04/NET-40-and-System_Collections_Concurrent_ConcurrentQueue.aspx - Steven Sudit
无论如何,总体结构加一分。 - Steven Sudit
你所说的不遵循.NET约定是什么意思? - Paw Baltzersen
我在考虑将 SortedList 中的 Queue 改为 ConcurrentQueue,然后只在向 SortedList 添加或删除新项时进行锁定,而不是在处理 Queues 时进行锁定。但是,当检查给定优先级是否存在于 SortedList 中时,仍然需要进行锁定,因此这并没有太大帮助。 - Paw Baltzersen
这个实现怎么样 https://dev59.com/-nVD5IYBdhLWcg3wE3Jo#4994931?哪个更好? - Kiquenet
@Kiquenet 取决于您是否需要我解决方案提供的通用功能。 - Paw Baltzersen

3

好的,已经过去了7年,但为了后人,我想用我的实现来回答。

文档:可选等待简单易用的并发优先队列

源代码:Github

NuGet包

  • 无锁,
  • 高度并发,
  • 存储项类型通用,
  • 优先级类型通用,但受限于由.NET枚举表示的优先级,强类型优先级,
  • 在构建期间明确定义优先级的降序,
  • 能够检测项数和每个优先级级别的项数,
  • 能够按优先级降序出队,
  • 能够覆盖出队优先级级别,
  • 可能可等待,
  • 可能基于优先级可等待,

2
最好链接到具有优先队列的库,而不是链接到包含可下载zip文件的CodeProject文章,该文件包含对包含优先队列的nuget包的引用。或者您可以添加一个抽象级别,并通过bit.ly或goo.gl进一步混淆它。 - Lasse V. Karlsen
这篇文章解释了一切。它展示了如何使用它以及从github/nuget下载它。我并不强迫任何人从文章中下载zip文件。我只是觉得在文章中提供简单的例子很好。在文章的结尾,有指向github源代码和nuget的链接。我在github文档方面有些落后,明年我会继续努力。 - ipavlu

1

1

由于所有当前的答案都已过时或没有可行的解决方案,因此有一个 MSDN上的实现是可用的。请注意,此实现中较低优先级的内容会被首先处理。


1
该实现似乎存在严重问题,请参见https://dev59.com/P5nga4cB1Zd3GeqPb6kY。 - Douglas

0
选项:
1)如果您的队列永远不会变得很大,请使用堆并为每个插入和删除锁定整个结构。
2)如果您的队列将变得很大,可以使用此类算法:

http://www.research.ibm.com/people/m/michael/ipl-1996.pdf

这个算法允许多个线程同时使用堆结构,而不会因为一次锁定整个堆而导致损坏或死锁,因为它支持对树的部分进行细粒度锁定。您需要进行基准测试,以确定额外锁定和解锁操作的开销是否比锁定整个堆产生的争用更大。

3)如果您想完全避免锁定,另一个算法(在上面的链接中提到)建议使用请求的FIFO队列(可以轻松实现且无需锁定),以及一个单独的线程,该线程是唯一接触堆的东西。您需要测量以查看使用同步对象在线程之间切换焦点的开销与纯直接锁定的开销相比,哪种方式的开销更大。

在开始之前,值得一试的是,先看看使用锁定的简单实现会有多糟糕。它可能不是最有效的实现,但如果它仍然比您所需的执行速度快几个数量级,那么维护的便利性(即任何人,包括您自己一年后,都能够简单地查看代码并理解其作用)可能会超过在排队机制中花费的微小CPU时间。

希望这可以帮助您 :-)


稍微解释一下——值得注意的是,算法中锁操作的数量与树的高度成正比,即O(lg n)。因此,每次您想要添加一个锁定/解锁操作,都需要将您排队的项目数量翻倍 - Jonathan Gilbert
另外,为了澄清第3点,我没有直接阅读引用的论文,但我认为其想法是,向想要排队或出队项目的消费者呈现的前端基本上是实际请求运行的线程的适配器。在任一情况下,使用类似ManualResetEvent的请求结构构建并放入FIFO中。然后等待事件。请求处理器线程拾取它,执行工作,并在继续之前设置事件。这样可以序列化对堆的所有访问而不使用锁定,但阻塞操作和线程切换可能同样糟糕。 - Jonathan Gilbert
值得注意的是,只有消费者需要等待他们的请求,而生产者可以“发射并忘记”。 :-) - Jonathan Gilbert

0
最近,我正在创建一个状态机,其中需要时间戳事件。与仅仅是简单的时钟滴答声不同,我需要带有自己ID的定时事件,以便我可以区分一个事件和另一个事件。
研究这个问题让我想到使用优先队列的想法。我可以按任何顺序将定时事件及其信息入队,优先队列会正确地排序事件。计时器会定期检查优先队列,以查看是否到了队列头部的事件触发时间。如果是,它将出队事件并调用与之关联的委托。这种方法正是我所需要的。
在CodeProject上搜索。

https://www.codeproject.com/Articles/13295/A-Priority-Queue-in-C

我发现已经有一个优先队列[^]类被写出来了。然而,我想到我可以很容易地使用我的老朋友跳表来编写自己的优先队列。这样做的好处是出队操作只需要O(1)的时间,而平均入队操作仍然是log(n)。我认为以这种方式使用跳表是足够新颖的,值得一篇独立的文章。

所以这就是它。我希望你觉得它有趣。


0

我找到了一个很好的并发优先队列的例子这里。希望它能对你有所帮助。

var priorityQueue = new ConcurrentPriorityQueue<TKey, TValue>();

在此队列的上下文中,TKey 可以是 int 值或任何实现 IComparable 接口的其他对象。
要消费这样的队列,您可以执行以下操作:
var priorityQueue = new ConcurrentPriorityQueue<int, object>(); 

// Add elements
priorityQueue.Enqueue(2, elementP2); 
priorityQueue.Enqueue(1, elementP1);

// Here you will receive elementP1
bool result = priorityQueue.TryDequeue(out KeyValuePair<int, object> element);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接