使用weak_ptr实现观察者模式

16

我正在尝试从观察者模式中编写一个安全的Subject类。我想知道是否使用weak_ptr是以这种方式存储IObserver实例的最佳方法,使得:

  • 在释放后不可能再使用IObserver实例。
  • Subject类不会保留应该被释放的IObserver引用(过期监听器问题)。
  • Subject类必须是线程安全的。

不幸的是,我们的编码标准规定我们不能使用boost。我猜我在前世是个坏人。幸运的是,我可以使用C++11(Visual Studio 2012附带的内容)。

这是一个示例Observer类。

// Observer interface that supports notify() method
class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

// Concrete observer implementation that prints a message
class Observer : public IObserver
{
public:
    Observer( const std::string& message) : m_message( message ){}

    void notify() const {
        printf( "%s\r\n", m_message.c_str() );
    }

private:
    std::string m_message;
};

这里是Subject类。

// Subject which registers observers and notifies them as needed.
class Subject
{
public:
    // Use shared_ptr to guarantee the observer is valid right now
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        m_observers.push_back( o );
    }

    void unregisterObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        // Code to remove the observer from m_observersMutex
    }

    // This is a method that is run in its own thread that notifies observers of some event
    void doNotify()
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        // Notify any valid observers of events.
        std::for_each( m_observers.cbegin(), m_observers.cend(), 
            []( const std::weak_ptr<IObserver>& o )
        {
            auto observer = o.lock();
            if ( observer ) {
                observer->notify();
            } 
        } );

        // Remove any dead observers.  These are ones which have expired().
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(), 
            []( const std::weak_ptr<IObserver>& o )
        {
            return o.expired();
        } ), m_observers.end() );

    }


private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_observersMutex;
};

这里有一些涉及 Subject 的代码示例:

int main(int argc, wchar_t* argv[])
{

    Subject subject;
    auto observerHello = std::make_shared<Observer>( "Hello world" );
    subject.registerObserver( observerHello );
    {
        // Create a scope to show unregistration.
        auto observerBye = std::make_shared<Observer>( "Good bye" );
        subject.registerObserver( observerBye );

        subject.doNotify();
    }
    printf( "%s\r\n", "Observer good bye is now be destructed" );
    subject.doNotify();
    return 0;
}

我使用weak_ptr是否线程安全?从这里https://dev59.com/z0vSa4cB1Zd3GeqPf6K-#2160422我认为是安全的。

这是解决监听器过期问题的合法方法吗?


2012年的C++标准不是已经支持范围for循环了吗?你为什么还在使用for_each?(无关问题,呵呵) - David
这只是一种习惯,没有其他原因,只是因为我习惯了这样做。 - Steve
1个回答

14

我会对你的doNotify持有一些保留态度——假设你触发的观察者中有一个添加或删除其他观察者,会发生不好的事情(包括崩溃)。或者阻塞在另一个线程上,该线程试图添加一个观察者?这也会导致问题(死锁!)。

这个问题很棘手,基本上是一个重入问题。

永远不要在持有锁时控制代码。在调用回调时持有锁是不允许的。

所以,至少:

获取锁,然后复制列表,然后解锁。在进行此复制时,您还可以从原始列表和副本列表中删除过期的观察者。

然后从已复制的列表触发观察者。

这样仍然存在一些未解决的问题。例如,移除观察者并不能保证它将来不会被调用!它只是意味着它最终不会被调用。

这有多重要取决于您如何使用监听器。

可能有效的一种方法是任务队列,其中包括添加/删除/通知/killthread事件(使killthread成为队列中的任务可以使关闭变得不那么麻烦)。现在所有同步都在队列上。如果您不想编写非阻塞无锁队列,则通知代码可以简单地加锁,std::move队列,解锁,然后继续执行它。或者您可以编写一个队列,使得pop在有东西可读取时才阻塞,而push不会阻塞。

一个快速而简单的“复制和广播”的示例可能如下:

std::vector<std::shared_ptr<IObserver>> targets;
{
  std::lock_guard<std::mutex> guard( m_observersMutex );
  m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(), 
        [&targets]( const std::weak_ptr<IObserver>& o )
    {
      std::shared_ptr<IObserver> ptr = o.lock();
      if (ptr) {
        targets.push_back(ptr);
        return false;
      } else {
        return true;
      }
    } ), m_observers.end() );
}

for( auto& target:targets ) {
  target->notify();
}

非常有帮助,谢谢 :) 目前我会使用复制和广播,因为在移除观察者后接收事件并不是什么大问题。不过我需要阅读一下非阻塞无锁队列的相关知识。 - Steve

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接