是否有将任务以不同优先级添加到TPL运行时的先前工作?
如果没有,一般来说,我应该如何实现这个功能?
理想情况下,我计划使用生产者-消费者模式将“待办”工作添加到TPL中。有时,我可能会发现需要将低优先级的工作升级为高优先级的工作(相对于其他工作)。
如果有人有一些搜索关键字,可以在评论中提出,因为我还没有找到能够满足我的需求的代码。
是否有将任务以不同优先级添加到TPL运行时的先前工作?
如果没有,一般来说,我应该如何实现这个功能?
理想情况下,我计划使用生产者-消费者模式将“待办”工作添加到TPL中。有时,我可能会发现需要将低优先级的工作升级为高优先级的工作(相对于其他工作)。
如果有人有一些搜索关键字,可以在评论中提出,因为我还没有找到能够满足我的需求的代码。
public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>
{
private object key = new object();
private SortedSet<Tuple<T, int>> set;
private Func<T, int> prioritySelector;
public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
{
this.prioritySelector = prioritySelector;
set = new SortedSet<Tuple<T, int>>(
new MyComparer<T>(comparer ?? Comparer<T>.Default));
}
private class MyComparer<T> : IComparer<Tuple<T, int>>
{
private IComparer<T> comparer;
public MyComparer(IComparer<T> comparer)
{
this.comparer = comparer;
}
public int Compare(Tuple<T, int> first, Tuple<T, int> second)
{
var returnValue = first.Item2.CompareTo(second.Item2);
if (returnValue == 0)
returnValue = comparer.Compare(first.Item1, second.Item1);
return returnValue;
}
}
public bool TryAdd(T item)
{
lock (key)
{
return set.Add(Tuple.Create(item, prioritySelector(item)));
}
}
public bool TryTake(out T item)
{
lock (key)
{
if (set.Count > 0)
{
var first = set.First();
item = first.Item1;
return set.Remove(first);
}
else
{
item = default(T);
return false;
}
}
}
public bool ChangePriority(T item, int oldPriority, int newPriority)
{
lock (key)
{
if (set.Remove(Tuple.Create(item, oldPriority)))
{
return set.Add(Tuple.Create(item, newPriority));
}
else
return false;
}
}
public bool ChangePriority(T item)
{
lock (key)
{
var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));
if (object.Equals(result.Item1, item))
{
return ChangePriority(item, result.Item2, prioritySelector(item));
}
else
{
return false;
}
}
}
public void CopyTo(T[] array, int index)
{
lock (key)
{
foreach (var item in set.Select(pair => pair.Item1))
{
array[index++] = item;
}
}
}
public T[] ToArray()
{
lock (key)
{
return set.Select(pair => pair.Item1).ToArray();
}
}
public IEnumerator<T> GetEnumerator()
{
return ToArray().AsEnumerable().GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void CopyTo(Array array, int index)
{
lock (key)
{
foreach (var item in set.Select(pair => pair.Item1))
{
array.SetValue(item, index++);
}
}
}
public int Count
{
get { lock (key) { return set.Count; } }
}
public bool IsSynchronized
{
get { return true; }
}
public object SyncRoot
{
get { return key; }
}
}
一旦你有了一个IProducerConsumerCollection<T>
实例,该对象就可以在BlockingCollection<T>
内部作为内部支持对象使用,以便拥有更易于使用的用户界面。
BlockingCollection
结合使用会提供一个奇怪的接口,我认为。您需要通过BlockingCollection
进行添加和删除,但通过ConcurrentPriorityQueue
更改优先级。我认为应该封装它。 - svickBlockingCollection
但加入一些额外的方法? - ServySortedSet
将它们视为完全相等。这意味着您不能在队列中具有相同优先级的两个不同项。您可以通过首先按 Item2
排序,然后再按 Item1
排序(这需要 T
是可比较的)来解决这个问题。 - svickParallelExtensionsExtras包含多个自定义TaskScheduler
,可以直接使用或作为您自己调度程序的基础,尤其是有两个调度程序可能对您有用:
QueuedTaskScheduler
,它允许您以不同的优先级调度Task
,但不允许更改已排队的Task
的优先级。ReprioritizableTaskScheduler
,它没有不同的优先级,但允许您将特定的Task
移动到队列的前面或后面。(虽然更改优先级的时间复杂度为当前等待的Task
数量的O(n),如果同时有许多Task
,这可能会成为一个问题。)