有一个固定大小的任务队列。线程想要将任务加入队列。如果队列已满,它们应该等待。线程的顺序应该保持不变:如果线程1带着任务1,之后线程2带着任务2,那么任务1应该在任务2之前进入队列。
其他线程想要出队任务并执行它。如果队列为空,它们应该等待,而且它们的顺序也应该保持不变:如果t3在t4之前到达,则t3应该在t4之前将任务加入队列。
如何实现这个功能(使用伪代码)?
System.Collections.Concurrent
,其中的类运行得非常好 - 我无法从它们那里获得任何错误。基于Jeffrey Richter's book信息的解决方案:
基本代码(C#):
internal sealed class SynchronizedQueue<T> {
private readonly Object m_lock = new Object();
private readonly Queue<T> m_queue = new Queue<T>();
public void Enqueue(T item) {
Monitor.Enter(m_lock);
// After enqueuing an item, wake up any/all waiters
m_queue.Enqueue(item);
Monitor.PulseAll(m_lock);
Monitor.Exit(m_lock);
}
public T Dequeue() {
Monitor.Enter(m_lock);
// Loop while the queue is empty (the condition)
while (m_queue.Count == 0)
Monitor.Wait(m_lock);
// Dequeue an item from the queue and return it for processing
T item = m_queue.Dequeue();
Monitor.Exit(m_lock);
return item;
}
}
这个类是线程安全的,但仍然没有检查顺序 - 这里有很多实现方式。来自同一本书:
ConcurrentQueue
和ConcurrentStack
是无锁的;它们都在内部使用Interlocked
方法来操作集合。
因此,您必须删除Monitor
类的使用,并为您的线程提供检查以成为下一个要排队项目的方法。
这可以通过在私有字段中维护当前添加者和当前队列长度的数量来完成。您应该使这些字段volatile。
您应该使用Interlocked.Exchange来获取您的当前添加者,并使用Interlocked.Read来获取您的当前队列长度。
之后,您就有了一个唯一的数字用于您的线程 - 当前长度+当前添加者。使用SpinWait类在当前长度不等于您的数字时旋转,之后排队项目并离开Enqueue方法。
为了同步访问有限数量的资源,通常使用信号量。可以搜索谷歌来获取更多信息。
困难的部分是要保持阻塞线程的顺序。
我发现这个项目包含一个 C# 的 FifoSemaphore
:http://dcutilities.codeplex.com
numEmptySpaces
信号量以访问队列,由于信号量等待队列不可能采用除FIFO之外的任何实现方式,因此这种行为可能会发生,但在大多数信号量实现中并不保证。我不确定标准的BlockingCollection
是否可以用于此操作。您可能能够通过序列号对条目进行“OrderBy”排序,但我不确定此操作是否会锁定队列 - 它应该会,但是...此外,序列号必须添加为BlockingCollection后代中的私有成员,并将原子递增结果作为每个任务的状态维护 - 您必须将其添加到Task
成员中。
我会尝试构建自己的BlockingQueue
类,其中包含一个“正常”的队列、一些信号量和互斥量来实现此操作,在获取numEmptySpaces单元和队列互斥量后,按序列号顺序将新任务插入队列中。然后可以将原子递增结果组装成堆栈/自动变量。
这可能是一个面试问题的合理选择,但我必须受到解雇的威胁才会在生产代码中实际实现它。很难想象什么情况下它可能是必要的。在我所能想到的所有情况中,额外开销和争用的缺点都超过了可疑的优势。
我对试图在出队/执行端显式维护任何排序方式也有类似的保留意见。试图确保以序列号顺序到达出队任务中的某个“检查点”将会很混乱。这将需要任务的合作,它需要一个私有同步对象成员来在到达其检查点时发出信号。千万不要尝试这样做 :)