非阻塞并发集合?

8
System.Collections.Concurrent中有一些新的集合,非常适用于多线程环境。但是它们有点受限制。要么它们会阻塞直到有一个项目可用,要么它们会返回default(T)(TryXXX方法)。
我需要一个线程安全的集合,但不是阻塞调用线程,而是使用回调通知我至少有一个项目可用。
我的当前解决方案是使用BlockingCollection,但使用带有委托的APM来获取下一个元素。换句话说,我创建了一个指向从集合中Take的方法的委托,并使用BeginInvoke执行该委托。
不幸的是,我必须在我的类中保留很多状态才能实现这一点。更糟糕的是,这个类不是线程安全的;它只能被单个线程使用。我正在维护性的边缘上,我希望不要这样做。
我知道有一些库可以使我在不添加任何引用外部版本4框架的情况下轻松完成我在这里所做的事情(我相信反应框架就是其中之一),但我希望实现我的目标而不添加任何引用。
是否有任何更好的模式可以使用,而不需要外部引用,以实现我的目标?
简而言之:
是否有满足以下要求的模式:
“我需要向集合发出信号,表明我已准备好下一个元素,并在下一个元素到达时使集合执行回调,而不会阻塞任何线程。”

这个会是线程安全的吗?在委托被调用之前,有什么阻止可用项目变得不可用?你的总体目标是什么(例如,一个排队系统)? - Adam Houldsworth
@Adam 对于消耗物品的观点很好。委托会获取从集合中移除的物品。因此,委托的执行会被阻塞,直到从集合中Take出一个物品,并且该物品是传递给EndInvoke的object。总体目标有点复杂;基本上我必须使工作流程处于空闲状态,直到物品变得可用。您不能阻止工作流程的执行,因此仅仅Take一个物品是行不通的,因为调用会被阻塞。我必须创建一个书签,然后将其传递给扩展程序。扩展程序调用委托,在回调中恢复书签。 - user1228
不幸的是,我对工作流程的经验很少 - 尝试在你的问题中添加这个细节,它可能会引起某人的兴趣 :-) - Adam Houldsworth
@Adam 工作流程部分并不是很相关;它只是我需求的原因。我没有包括它是因为我觉得要么会混淆,要么会得到"你做错了"的回答/评论,或者他们会试图回答关于工作流程而不是异步的问题。 - user1228
2个回答

5

我认为有两个可能的解决方案。虽然我并不是特别满意,但它们至少提供了一个合理的替代APM方法。

第一个解决方案虽然不符合你的无阻塞线程要求,但我认为它相当优雅,因为你可以注册回调函数,并按循环轮流调用它们,但你仍然可以像通常一样调用TakeTryTake来获取BlockingCollection中的项。该代码在每次请求项时都强制注册回调。这是集合的信号机制。这种方法的好处是,对Take的调用不会像在我的第二个解决方案中那样被饿死。

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

第二个函数符合你对非阻塞线程的要求。请注意,它将回调函数的调用转移到了线程池中。我这样做是因为我认为如果它同步执行,那么锁定时间会更长,导致 AddRegisterForTake 函数出现瓶颈。我仔细检查过了,我认为它不会出现死锁(即有一个项目和一个回调可用,但回调从未被执行),但你可能需要自己检查一下。唯一的问题是,调用 Take 将会被饿死,因为回调函数总是优先执行。
public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}

谢谢您的回答,但它略微不是我要找的。这就是我目前正在做的事情,但是使用了将APM推入集合中的代码(您提供的代码)。我想我的问题的关键是APM不符合我的要求,它只是我使用的实现。我的要求需要一种模式,该模式提供了解决方案来回答以下问题:“如何向集合发出信号,表明我已准备好下一个元素,并在到达下一个元素时使集合执行回调,而不会阻塞任何线程?” - user1228
我有点意识到那不是你想要的。这是一个有趣的问题。太糟糕了 Add 不是 virtual,否则你也许可以在那里注入通知。也许你可以使用其中一个阻塞队列实现作为起点。问题是你必须小心如何传递那个通知,否则另一个消费者会先获取该项。如果我有时间,我今天可能会试一下。如果你找到答案,请自己发布一个回答。我不知道...你可能会发现抛弃它并引用另一个库更容易。 - Brian Gideon
通知应该包含下一个元素,并由通知器控制。也许这个想法是一个集合是错误的;只有通过这种机制才能提供下一个项目,从而避免两个观察者争夺单个项目的问题。换句话说,一个观察者不能使用机制A来获取下一个项目(即T Pop()),而另一个观察者已经注册了回调。 - user1228
@Will:现在看一下。我有两个不同的想法。我对第二个例子可能出现的死锁情况有些担心,但据我所知,它似乎对于多个生产者和多个消费者是安全的。 - Brian Gideon
谢谢。我可能最终会适应其中一些内容。我希望框架中有类似我所需的东西,但事实并非如此。 - user1228

3
这个怎么样?(命名可能需要改进。请注意,这尚未经过测试。)
public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}

刚刚刷新页面看到了@Brian的NotifyingBlockingCollection。看起来他和我同时想出了大致相同的解决方案。 - Jack Leitch
是的,我们在这方面肯定是有同样的想法,特别是关于将回调函数的调用从当前线程中移除。 - Brian Gideon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接