使用并发功能使队列 Queue<T> 可用

10

我之前有一个相关的问题,我提供了解决方案;但是,因为我使用的是.Net 3.5版本,无法使用ConcurrentQueue<T>。我需要Queue<T>来允许并发操作。我阅读了这个问题,如果队列中没有任何项,而线程方法尝试出队列,就会遇到问题。

现在我的任务是确定是否可以派生自己的并发队列类。这是我想到的:

public sealed class ConcurrentQueue : Queue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;
    private ICollection que;

    new public void Enqueue(DataTable Table)
    {
        lock (que.SyncRoot)
        {
            base.Enqueue(Table);
        }

        OnTableQueued(new TableQueuedEventArgs(Dequeue()));
    }

    //  this is where I think I will have a problem...
    new public DataTable Dequeue()
    {
        DataTable table;

        lock (que.SyncRoot)
        {
            table = base.Dequeue();
        }

        return table;
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

当 DataTable 被排队时,EventArgs会将出列的表格传递给事件订阅者。这种实现是否会为我提供线程安全的队列?


3
"que" 完全没有用。你应该锁定一个 "readonly object key = new object();"。 - SLaks
@SLaks:我根据MSDN上的http://msdn.microsoft.com/en-us/library/bb344892.aspx实现了`ICollection quelock(que.SyncRoot)`。 - IAbstract
你根本不需要它。如果您有需要为同一集合锁定的不连续代码片段,则SyncRoot很有用。在您的情况下,quenull。您只需要在方法中锁定单个对象即可。 - SLaks
我看不出你在这里为什么要使用队列,因为它会立即在同一线程上出列。如果您想让工作线程处理出列,您可以使用生产者/消费者队列模式。在SO上有很多例子。此外,我假设只要您入列就会抛出空引用异常。 - dashton
5个回答

3

如果您需要使用new来隐藏基类中的方法,通常这意味着您应该使用组合而不是继承...

这里有一个简单的同步队列,它不使用继承,但仍然依赖于标准Queue<T>的行为:

public class ConcurrentQueue<T> : ICollection, IEnumerable<T>
{
    private readonly Queue<T> _queue;

    public ConcurrentQueue()
    {
        _queue = new Queue<T>();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (SyncRoot)
        {
            foreach (var item in _queue)
            {
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (SyncRoot)
        {
            ((ICollection)_queue).CopyTo(array, index);
        }
    }

    public int Count
    {
        get
        { 
            // Assumed to be atomic, so locking is unnecessary
            return _queue.Count;
        }
    }

    public object SyncRoot
    {
        get { return ((ICollection)_queue).SyncRoot; }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public void Enqueue(T item)
    {
        lock (SyncRoot)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock(SyncRoot)
        {
            return _queue.Dequeue();
        }
    }

    public T Peek()
    {
        lock (SyncRoot)
        {
            return _queue.Peek();
        }
    }

    public void Clear()
    {
        lock (SyncRoot)
        {
            _queue.Clear();
        }
    }
}

显式接口实现和将其转换为_queue有什么作用? - abatishchev
1
在迭代器中进行lock操作是很危险的。 - SLaks
1
@dboarman,只要您通过此类型的变量使用您的类,那就没问题。但是如果您通过类型为“Queue<T>”的变量使用它,您的“new”方法将不会被调用。 - Thomas Levesque
1
@ThomasпјҡеҸӘжңүеңЁдҪ и°ғз”Ёиҝӯд»ЈеҷЁзҡ„IEnumerator<T>зҡ„Dispose()ж–№жі•ж—¶пјҢй”ҒжүҚдјҡиў«йҮҠж”ҫгҖӮhttps://dev59.com/questions/kEzSa4cB1Zd3GeqPiwOw#2274773 - SLaks
@SLaks,这正是我想要的...但我意识到这可能会成为一个问题。另一方面,如果您在枚举期间不锁定,另一个线程可以修改集合,从而使枚举器无效。我不确定哪个更糟... - Thomas Levesque
显示剩余3条评论

3

很不幸,我受到(内部政策的)约束,只能使用 .Net 3.5 中使用的基本库 - 这基本上使所有开发人员都使用相同的库。如果我尝试使用TPLib,我会被标记为流氓。否则,这是一个不错的选择。 - IAbstract
3
那是一个“疯狂”的政策。重新发明轮子是最糟糕的生产力浪费之一,也是 bug 的主要来源。 - SLaks
4
这种政策通常是由对编程一无所知的人强制执行的... - Thomas Levesque
1
实际上,当 CTP 不再可用且反应式扩展不受支持时,该策略就变得有意义了。 - IAbstract

2

在最初的问题之后一段时间,我知道(这个问题出现在另一个问题右侧的“相关”中),但是在类似情况下,我采用了以下方法。虽然不太适合CPU缓存使用,但它简单、无锁、线程安全,如果操作之间经常存在大的间隔,那么CPU缓存使用可能并不重要,而且分配的接近性可能会减少影响:

internal sealed class LockFreeQueue<T>
{
  private sealed class Node
  {
    public readonly T Item;
    public Node Next;
    public Node(T item)
    {
      Item = item;
    }
  }
  private volatile Node _head;
  private volatile Node _tail;
  public LockFreeQueue()
  {
    _head = _tail = new Node(default(T));
  }
#pragma warning disable 420 // volatile semantics not lost as only by-ref calls are interlocked
  public void Enqueue(T item)
  {
    Node newNode = new Node(item);
    for(;;)
    {
      Node curTail = _tail;
      if (Interlocked.CompareExchange(ref curTail.Next, newNode, null) == null)   //append to the tail if it is indeed the tail.
      {
        Interlocked.CompareExchange(ref _tail, newNode, curTail);   //CAS in case we were assisted by an obstructed thread.
        return;
      }
      else
      {
        Interlocked.CompareExchange(ref _tail, curTail.Next, curTail);  //assist obstructing thread.
      }
    }
  }    
  public bool TryDequeue(out T item)
  {
    for(;;)
    {
      Node curHead = _head;
      Node curTail = _tail;
      Node curHeadNext = curHead.Next;
      if (curHead == curTail)
      {
        if (curHeadNext == null)
        {
          item = default(T);
          return false;
        }
        else
          Interlocked.CompareExchange(ref _tail, curHeadNext, curTail);   // assist obstructing thread
      }
      else
      {
        item = curHeadNext.Item;
        if (Interlocked.CompareExchange(ref _head, curHeadNext, curHead) == curHead)
        {
          return true;
        }
      }
    }
  }
#pragma warning restore 420
}

2

当你入队时,需要同时出队你的项目。
你需要使用参数来触发事件。

它是否真正线程安全取决于你如何使用它。
如果你检查Count或检查是否为空,它就不是线程安全的,也无法轻松地使其线程安全。
如果你不这样做,你可能可以使用比队列更简单的东西。


预计排队速度比出队速度快(但我还没有数据来确认)。排队也在主线程上执行。当事件触发时,我使用一个动作 BeginInvoke 来异步运行出队过程。 - IAbstract
另外……我从来没有检查过Count,也没有看到需要这样做的必要,因为我几乎立即在工作线程上出队。除非我使用List<T>,否则我不知道有什么比Queue<T>更简单的了——我几乎这样做了,但是Queue<T>类似乎拥有我需要的一切,当然,除了入队时触发的事件。 - IAbstract
看起来你根本不需要队列;你可以将表作为参数直接传递给 BeginInvoke - SLaks
1
@IAbstract 如果您想创建一个在项目队列(项目到达)上引发的事件,可以通过创建自定义事件处理程序EventHandler来实现。下面是一个使用NET 3.5的示例:https://www.experts-exchange.com/questions/28768639/C-I-need-a-FIFO-list-thread-in-order-to-provide-a-buffer-for-printing.html - Willy

0
在你的Enqueue方法中的这一行代码 OnTableQueued(new TableQueuedEventArgs(Dequeue())); 中,
请使用Peek而不是Dequeue。
应该改为

OnTableQueued(new TableQueuedEventArgs(base.Peek()));


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接