C#生产者/消费者/观察者模式?

6
我有一个生产者/消费者队列,但是其中有特定类型的对象。因此,不是任何消费者都可以消费添加的对象。我不想为每种类型创建一个特定的队列,因为种类太多。(这有点扩展了生产者/消费者的定义,但我不确定正确的术语是什么。)
是否有一种名为EventWaitHandle的东西,允许带参数的脉冲?例如:myHandle.Set(AddedType = "foo")。现在,我正在使用Monitor.Wait,然后每个消费者检查脉冲是否确实是针对他们的,但这似乎有点无意义。
以下是我现在的伪代码版本:
class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}

从上面可以看出,当字典中添加其他内容时,我可能会获得脉冲。 我只关心当MyType被添加到字典中时的情况。 有没有办法做到这一点? 这不是很重要,但是例如,现在我必须手动处理超时,因为每次锁定获取都可能在超时时间内成功,但是MyType从未在timeout内添加到字典中。


有一个想法,每种对象类型都可以有自己的锁对象,这样你就可以Monitor.Pulse适当的对象了? - C.Evenhuis
@deltreme:是的,这是可能的,但我不想为每种类型创建一个新对象。 - Xodarap
我不确定您所说的“有太多类型需要为每个类型创建单独的队列”是什么意思。这样做是自然而然的,而且我不确定有很多单独的队列会比其他任何事件处理机制更低效。 - user24359
@ Isaac:一个用户说“我想要X”。 X涉及以这样的方式聚合一些数据,以便生产者/消费者有意义。但是,可能再也没有人请求X了,因此仅针对X拥有队列是毫无意义的,并且由于我们无法预测用户想要什么,这也可能是不可能的。您可以将其视为事件带参数的等效方式-即使有一种通用类型的事件(“生产者制造了某些东西”),我们也希望以不同的方式处理该事件。 - Xodarap
@Isaac:我已经添加了一个示例;这可能会更清楚地说明我的意思。很有可能我在解决问题的方法上出现了错误。 - Xodarap
1
这样怎么样:每当有脉冲时,它就会激活下一个消费者从睡眠中醒来。此时,他可以“寻找”可用的内容,如果不是他需要的类型,他可以回去睡觉,让其他人接替他的位置。如果这不影响任何性能,我认为这是一个简单的方法。 - decyclone
2个回答

3

这是一个有趣的问题。听起来解决方案的关键在于阻塞式优先队列(Priority Queue)。Java有PriorityBlockingQueue,但是遗憾的是.NET BCL没有相应的实现。不过一旦你有了它,实现就很容易。

class MyWorker 
{
    public string MyType {get; set;}
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork()
    {
        while(true)
        {
            MyInfo value;
            if (data.TryTake(MyType, timeout, out value))
            {
                // OK, do work
            }
        }
    }
}

实现一个PriorityBlockingQueue并不是非常困难。通过利用AddTake风格的方法,按照与BlockingCollection相同的模式,我想出了以下代码。
public class PriorityBlockingQueue<TKey, TValue>
{
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>();

    public void Add(TKey key, TValue value)
    {
        lock (m_Dictionary)
        {
            m_Dictionary.Add(key, value);
            Monitor.Pulse(m_Dictionary);
        }
    }

    public TValue Take(TKey key)
    {
        TValue value;
        TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value);
        return value;
    }

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value)
    {
        value = default(TValue);
        DateTime initial = DateTime.UtcNow;
        lock (m_Dictionary)
        {
            while (!m_Dictionary.TryGetValue(key, out value))
            {
                if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important!
                TimeSpan span = timeout - (DateTime.UtcNow - initial);
                if (!Monitor.Wait(m_Dictionary, span))
                {
                    return false;
                }
            }
            m_Dictionary.Remove(key);
            return true;
        }
    }
}

这只是一个快速实现,并且存在一些问题。首先,我没有进行任何测试。其次,它使用红黑树(通过SortedDictionary)作为底层数据结构。这意味着TryTake方法的复杂度为O(log(n))。优先队列通常具有O(1)的删除复杂度。优先队列的典型数据结构是,但我发现跳表在实践中实际上更好,原因有几个。由于这两者都不存在于.NET BCL中,所以我在这种情况下仍然使用了SortedDictionary,尽管它的性能较差。
我应该在这里指出,这并没有真正解决无意义的Wait/Pulse行为。它只是封装在PriorityBlockingQueue类中。但是,至少这肯定会清理您代码的核心部分。

看起来你的代码似乎没有处理每个键对应多个对象的情况,但是通过在添加到字典时使用Queue<MyInfo>而不是普通的MyInfo很容易解决这个问题。


有趣的想法,封装起来很好。你使用SortedDictionary而不是Dictionary有什么原因吗?只是为了更快的插入时间吗? - Xodarap
@Xodarap:嗯,好问题。我最初想到的是你需要一个不接受键值的Take重载函数,它将从字典中删除第一项。但是,你需要一个排序过的字典才能正确地实现这个功能。这实际上就是优先队列的工作原理。我想你可以使用一个普通的字典,只需将包装类称为BlockingDictionary即可。很好的发现。 - Brian Gideon

1

看起来你想将生产者/消费者队列与观察者模式相结合 - 通用的消费者线程或线程从队列中读取,然后将事件传递给所需的代码。在这种情况下,您实际上不会发出观察者信号,而是在消费者线程确定谁对给定的工作项感兴趣时调用它。

.Net中的观察者模式通常使用C#事件来实现。您只需要调用对象的事件处理程序,一个或多个观察者将通过它被调用。目标代码首先必须通过将自身添加到事件以便在到达工作时获得通知来向被观察对象注册自己。


这听起来像是我应该有一个 Dictionary<type,pulseObject>,并且当 type 更新时只需执行 Monitor.Pulse(Dictionary[type])?看起来我可以删除观察者,然后从生产者手动执行此操作。(我同意它会起作用,但我希望我可以使用一个幕后的消费者集合。) - Xodarap
如果您使用 Monitor 来激活目标对象,则通过队列访问单独的消费者线程似乎是多余的。 - Steve Townsend

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接