任务并行库中的优先级队列

7

是否有将任务以不同优先级添加到TPL运行时的先前工作?

如果没有,一般来说,我应该如何实现这个功能?

理想情况下,我计划使用生产者-消费者模式将“待办”工作添加到TPL中。有时,我可能会发现需要将低优先级的工作升级为高优先级的工作(相对于其他工作)。

如果有人有一些搜索关键字,可以在评论中提出,因为我还没有找到能够满足我的需求的代码。


你想要只有低/高优先级吗?还是你想要一个固定数量的优先级,例如最高、高、普通、低、最低?或者你想要能够为每个任务分配1到100之间或1到非常大的数字之间的优先级?第一种情况很容易实现,但第二种情况就不那么简单了。 - Servy
如果您找不到先前的工作,这里有一个实现自定义块的指南:http://blogs.msdn.com/b/pfxteam/archive/2011/12/05/10244302.aspx - Eric J.
@EricJ。我不明白自定义数据流块与问题有什么关系。 - svick
据我所知,数据流是TPL的一个子集,不适用于一般的TPL使用。了解它对其他工作很有好处。 - makerofthings7
相关:具有优先级的并发集合 - Theodor Zoulias
2个回答

4
这里提供了一个相对简单的并发实现,基于一个相对简单的优先队列。其思路是,创建一个排序集合,其中包含了实际项目和优先级的一组对,但给定了一个只比较优先级的比较器。构造函数接受一个计算给定对象优先级的函数。
至于实际实现,它们没有高效地实现,我只是在所有操作周围加上了“锁”。创建更有效的实现将阻止使用SortedSet作为优先队列,并重新实现其中一个可以有效地同时访问的优先队列不会那么容易。
要更改项的优先级,您需要从集合中删除该项,然后再次添加。要查找项而不必迭代整个集合,您需要知道旧优先级以及新优先级。
public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>
{
    private object key = new object();
    private SortedSet<Tuple<T, int>> set;

    private Func<T, int> prioritySelector;

    public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
    {
        this.prioritySelector = prioritySelector;
        set = new SortedSet<Tuple<T, int>>(
            new MyComparer<T>(comparer ?? Comparer<T>.Default));
    }

    private class MyComparer<T> : IComparer<Tuple<T, int>>
    {
        private IComparer<T> comparer;
        public MyComparer(IComparer<T> comparer)
        {
            this.comparer = comparer;
        }
        public int Compare(Tuple<T, int> first, Tuple<T, int> second)
        {
            var returnValue = first.Item2.CompareTo(second.Item2);
            if (returnValue == 0)
                returnValue = comparer.Compare(first.Item1, second.Item1);
            return returnValue;
        }
    }

    public bool TryAdd(T item)
    {
        lock (key)
        {
            return set.Add(Tuple.Create(item, prioritySelector(item)));
        }
    }

    public bool TryTake(out T item)
    {
        lock (key)
        {
            if (set.Count > 0)
            {
                var first = set.First();
                item = first.Item1;
                return set.Remove(first);
            }
            else
            {
                item = default(T);
                return false;
            }
        }
    }

    public bool ChangePriority(T item, int oldPriority, int newPriority)
    {
        lock (key)
        {
            if (set.Remove(Tuple.Create(item, oldPriority)))
            {
                return set.Add(Tuple.Create(item, newPriority));
            }
            else
                return false;
        }
    }

    public bool ChangePriority(T item)
    {
        lock (key)
        {
            var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));

            if (object.Equals(result.Item1, item))
            {
                return ChangePriority(item, result.Item2, prioritySelector(item));
            }
            else
            {
                return false;
            }
        }
    }

    public void CopyTo(T[] array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array[index++] = item;
            }
        }
    }

    public T[] ToArray()
    {
        lock (key)
        {
            return set.Select(pair => pair.Item1).ToArray();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return ToArray().AsEnumerable().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array.SetValue(item, index++);
            }
        }
    }

    public int Count
    {
        get { lock (key) { return set.Count; } }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public object SyncRoot
    {
        get { return key; }
    }
}

一旦你有了一个IProducerConsumerCollection<T>实例,该对象就可以在BlockingCollection<T>内部作为内部支持对象使用,以便拥有更易于使用的用户界面。


1
将此与BlockingCollection结合使用会提供一个奇怪的接口,我认为。您需要通过BlockingCollection进行添加和删除,但通过ConcurrentPriorityQueue更改优先级。我认为应该封装它。 - svick
@svick,理想情况下是这样的。你有没有看到其他解决问题的方法,除了基本上重新编写BlockingCollection但加入一些额外的方法? - Servy
不,我没有看到其他的方法,我认为如果你想这样做的话,用额外的封装方式是正确的方法。 - svick
1
此外,似乎当比较器认为两个项目相等时,SortedSet 将它们视为完全相等。这意味着您不能在队列中具有相同优先级的两个不同项。您可以通过首先按 Item2 排序,然后再按 Item1 排序(这需要 T 是可比较的)来解决这个问题。 - svick
我更喜欢使用 var first = set.Min 而不是 var first = set.First();Min 属性应该会稍微更有效率一些。 - Theodor Zoulias

3

ParallelExtensionsExtras包含多个自定义TaskScheduler,可以直接使用或作为您自己调度程序的基础,尤其是有两个调度程序可能对您有用:

  • QueuedTaskScheduler,它允许您以不同的优先级调度Task,但不允许更改已排队的Task的优先级。
  • ReprioritizableTaskScheduler,它没有不同的优先级,但允许您将特定的Task移动到队列的前面或后面。(虽然更改优先级的时间复杂度为当前等待的Task数量的O(n),如果同时有许多Task,这可能会成为一个问题。)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接