如何在Rust中实现观察者模式?

37

我有一个可观察的集合和一个观察者。我希望观察者是trait Observer的特征实现。可观察对象应该能够在发生某些事件时通知每个观察者。这应该说明我的意图:

struct A {
    observables: Vec<Observable>,
}

impl A {
    fn new() -> A {
        A {
            observables: vec![],
        }
    }
}

trait Observer {
    fn event(&mut self, _: &String);
}

impl Observer for A {
    fn event(&mut self, ev: &String) {
        println!("Got event from observable: {}", ev);
    }
}

struct Observable {
    observers: Vec<dyn Observer>, // How to contain references to observers? (this line is invalid)
}

impl Observable {
    fn new() -> Observable {
        Observable {
            observers: Vec::new(),
        }
    }

    fn add_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.push(o);
    }

    fn remove_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.remove(o);
    }

    fn notify_observers(&self, ev: &String) {
        for o in &mut self.observers {
            o.event(ev);
        }
    }
}

(播放区)

我收到了错误信息:

error[E0277]: the size for values of type `(dyn Observer + 'static)` cannot be known at compilation time
  --> src/lib.rs:24:5
   |
24 |     observers: Vec<dyn Observer>, // How to contain references to observers?
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `(dyn Observer + 'static)`
   = note: to learn more, visit <https://doc.rust-lang.org/book/second-edition/ch19-04-advanced-types.html#dynamically-sized-types-and-the-sized-trait>
   = note: required by `std::vec::Vec`

这只是我想要做的东西的模拟。我已经有了Java、Python和C++中的类似代码,但我不知道如何在Rust中实现观察者模式。我相信我的问题在于在可观察对象内存储观察者对象的引用。


1
Observer 如何从 Observable 注销?Observer 的生命周期如何处理? - Matthieu M.
@MatthieuM。我也想知道,因为它与问题的答案有关。对不起,我不明白你的第二个问题。 - Victor Polevoy
2
在users.rust-lang.org上进行跨帖:https://users.rust-lang.org/t/how-can-i-correctly-implement-observer-pattern-in-rust/6058 - oli_obk
请查看http://stackoverflow.com/q/37504162/5189607。 - malbarbo
5个回答

36

根据实现选择的不同,观察者模式可能会带来所有权方面的挑战。

在垃圾收集语言中,通常情况下Observable将引用Observer(以便通知它),而Observer将引用Observable(以便取消注册)...这会在所有权方面造成一些挑战(谁能比谁活得更久?),还有整个“注销时通知”的问题。

在Rust(和C++)中,我建议避免循环引用。


简单的解决方案

ObservableObserver具有不同的生命周期,彼此之间没有彼此拥有或预计彼此比对方活得更久的情况。

use std::rc::Weak;

struct Event;

trait Observable {
    fn register(&mut self, observer: Weak<dyn Observer>);
}

trait Observer {
    fn notify(&self, event: &Event);
}
关键是将Observer分配到一个Rc中,然后将Weak(弱引用)交给Observable

如果需要在Event上修改Observer,则需要内部可变性或包装成RefCell(将Weak<RefCell<dyn Observer>>传递给Observable

在通知时,Observable会定期意识到有死亡的弱引用(Observer已经消失),然后可以懒惰地删除它们。


还有其他解决方案,例如使用代理(与事件循环非常相似),从推模式转换为拉模式(即(1)生成所有事件,(2)处理所有事件),但这些方法略有不同于传统的观察者模式,并且有不同的优缺点,因此我不会在这里尝试处理它们全部。

这是一个不错的解决方案。如果您不打算在所有可观察对象都消失之前更改(构建或删除)观察者,则还可以使用生命周期来表示所有观察者必须比可观察对象存在更长时间。尽管这会相当受限制。 - Michael Younkin

20

我使用了回调函数。它简单而功能强大,没有生命周期问题或类型擦除。

我尝试了Weak<dyn Observer>,但是

  1. 它需要一个Rc
  2. 你必须重复代码以创建不同的观察者结构体。
  3. 它需要类型擦除
pub struct Notifier<E> {
    subscribers: Vec<Box<dyn Fn(&E)>>,
}

impl<E> Notifier<E> {
    pub fn new() -> Notifier<E> {
        Notifier {
            subscribers: Vec::new(),
        }
    }

    pub fn register<F>(&mut self, callback: F)
    where
        F: 'static + Fn(&E),
    {
        self.subscribers.push(Box::new(callback));
    }

    pub fn notify(&self, event: E) {
        for callback in &self.subscribers {
            callback(&event);
        }
    }
}

7
这个“static”是否意味着只能使用静态函数?如果是这样,我们就不能使用运行时函数了,对吗?可以举个例子吗?例如,我有一个观察者:Observer<E>,其中Observer<E>有一个方法do_something(&self, event: &E)。 - Robert Cutajar
1
@Rbjz 'static' 限制了 F 的生命周期,意味着 F 类型必须在完整的运行时范围内可用。 - uwu
它能够移除订阅者/观察者吗? - chenzhongpu
2
这是GTK使用的方法,但它会导致许多问题,因为您无法访问任何非静态数据。例如,当“Notifier”是要注册回调的结构体的字段时,您无法将对象上的方法用作回调,这是一种常见做法。 - Ben Jaguar Marshall
显示剩余2条评论

9
这是我基于这个问题的答案和许多痛苦和折磨实现的。我使用弱引用来存储观察者,并使用 RefCell 来调用可变的 notify()
我使用 Arc,因为我的监听器可能会从任何线程调用。如果您只使用单个线程,则可以使用 Rc
每次调用 dispatch() 时,它将检查是否有任何已消失的弱引用监听器。 如果有任何监听器,则会清除该监听器列表。
pub enum Event {} // You make Event hold anything you want to fire 

pub trait Listener {
    fn notify(&mut self, event: &Event);
}

pub trait Dispatchable<T>
    where T: Listener
{
    fn register_listener(&mut self, listener: Arc<RefCell<T>>);
}

pub struct Dispatcher<T>
    where T: Listener
{
    /// A list of synchronous weak refs to listeners
    listeners: Vec<Weak<RefCell<T>>>,
}

impl<T> Dispatchable<T> for Dispatcher<T>
    where T: Listener
{
    /// Registers a new listener
    fn register_listener(&mut self, listener: Arc<RefCell<T>>) {
        self.listeners.push(Arc::downgrade(&listener));
    }
}

impl<T> Dispatcher<T>
    where T: Listener
{
    pub fn new() -> Dispatcher<T> {
        Dispatcher { listeners: Vec::new() }
    }

    pub fn num_listeners(&self) -> usize {
        self.listeners.len()
    }

    pub fn dispatch(&mut self, event: Event) {
        let mut cleanup = false;
        // Call the listeners
        for l in self.listeners.iter() {
            if let Some(mut listener_rc) = l.upgrade() {
                let mut listener = listener_rc.borrow_mut();
                listener.notify(&event);
            } else {
                println!("Cannot get listener, cleanup necessary");
                cleanup = true;
            }
        }
        // If there were invalid weak refs, clean up the list
        if cleanup {
            println!("Dispatcher is cleaning up weak refs");
            self.listeners.retain(|ref l| {
                // Only retain valid weak refs
                let got_ref = l.clone().upgrade();
                match got_ref {
                    None => false,
                    _ => true,
                }
            });
        }
    }
}

这里是一个单元测试代码片段,用于测试它。该测试来自一张卡牌游戏库,我的Event枚举具有FlopDealtGameFinished变量。该测试创建并注册我的监听器,并确保在调度FlopDealt时被调用。作用域部分用于测试监听器超出作用域后的弱引用行为。我触发另一个事件并计算侦听器数量以确保列表已清除。
use std::time::Instant;

#[derive(Debug)]
pub enum Event {
    FlopDealt,
    GameFinished { ended: Instant },
}

struct MyListener {
    pub flop_dealt: bool,
}

impl Listener for MyListener {
    fn notify(&mut self, event: &Event) {
        println!("Notify called with {:?}", event);
        if let Event::FlopDealt = event {
            println!("Flop dealt");
            self.flop_dealt = true;
        }
    }
}

#[test]
fn events_register() {
    let mut d: Dispatcher<MyListener> = Dispatcher::new();

    {
        let listener_rc = Arc::new(RefCell::new(MyListener { flop_dealt: false }));
        d.register_listener(listener_rc.clone());
        d.dispatch(Event::FlopDealt);

        let flop_dealt = listener_rc.borrow().flop_dealt;
        println!("Flop was {}dealt", if flop_dealt { "" } else { "not " });
        assert_eq!(flop_dealt, true);
        assert_eq!(d.num_listeners(), 1);
    }

    // Listener should disappear
    d.dispatch(Event::GameFinished {
        ended: Instant::now(),
    });
    assert_eq!(d.num_listeners(), 0);
}

1

Rust 设计模式 https://github.com/lpxxn/rust-design-pattern

trait IObserver {
    fn update(&self);
}

trait ISubject<'a, T: IObserver> {
    fn attach(&mut self, observer: &'a T);
    fn detach(&mut self, observer: &'a T);
    fn notify_observers(&self);
}

struct Subject<'a, T: IObserver> {
    observers: Vec<&'a T>,
}
impl<'a, T: IObserver + PartialEq> Subject<'a, T> {
    fn new() -> Subject<'a, T> {
        Subject {
            observers: Vec::new(),
        }
    }
}

impl<'a, T: IObserver + PartialEq> ISubject<'a, T> for Subject<'a, T> {
    fn attach(&mut self, observer: &'a T) {
        self.observers.push(observer);
    }
    fn detach(&mut self, observer: &'a T) {
        if let Some(idx) = self.observers.iter().position(|x| *x == observer) {
            self.observers.remove(idx);
        }
    }
    fn notify_observers(&self) {
        for item in self.observers.iter() {
            item.update();
        }
    }
}

#[derive(PartialEq)]
struct ConcreteObserver {
    id: i32,
}
impl IObserver for ConcreteObserver {
    fn update(&self) {
        println!("Observer id:{} received event!", self.id);
    }
}

fn main() {
    let mut subject = Subject::new();
    let observer_a = ConcreteObserver { id: 1 };
    let observer_b = ConcreteObserver { id: 2 };

    subject.attach(&observer_a);
    subject.attach(&observer_b);
    subject.notify_observers();

    subject.detach(&observer_b);
    subject.notify_observers();
}

输出

Observer id:1 received event!
Observer id:2 received event!
Observer id:1 received event!

2
observers: Vec<&'a T> 意味着只能使用一种类型的观察者,因此它不是一个实用的实现。 - chenzhongpu
1
此外,观察者必须比可观察的主体存在时间更长。在某些用例中这可能是可以接受的,但如果您想要能够动态添加/删除它们,您可能更喜欢使用弱引用的解决方案。 - hans_meine

-1

如果您仍然对此感兴趣,我写了this答案:

我尝试了this的方法,并且它对我很有效,它非常简单:

  • 定义您的对象struct
  • 定义您的Listeners,
  • 定义您的标准函数,让我们称之为Extensions,
  • 通过执行Self::Fn<Listener>将添加Emitter选项到Extensions

我在playground中使用的相同代码如下,我只是在rust forum中解决了它:

// 1. Define your object
//#[derive(Debug)]
pub struct Counter {
 count: i32,
}

// 2. (Optional), if do not want to use `#[derive(Debug)]` 
//    you can define your own debug/release format
impl std::fmt::Debug for Counter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Counter `count` is: {}", self.count)
    }
}

// 3. Define your Listeners trait 
trait EventListener {
     fn on_squared() {
        println!("Counter squared")
     }
     fn on_increased(amount: i32) {
        println!("Counter increased by {}", amount)
     }
     fn on_decreased(self, amount: i32);
}

// 4. Implement your Listeners trait to your object
impl EventListener for Counter {
    fn on_decreased(self, amount: i32) {
        println!("Counter reduced from {} to {}", &self.count, &self.count - amount)
    }
}

// 5. (Recommended), Define your standard functions/Extensions/Emitters
//    trait signatures
trait EventEmitter {
    fn square(&mut self);
    fn increase(&mut self, amount: i32);
    fn decrease(&mut self, amount: i32);
    fn change_by(&mut self, amount: i32);
}

// 6. Implement your standard functions/Extensions/Emitters trait to your object
impl EventEmitter for Counter {
    fn square(&mut self) { 
        self.count = self.count.pow(2);
        Self::on_squared();      // This is Event Emitter, calling the Listner
    }
    fn increase(&mut self, amount: i32) { 
        self.count = self.count + amount; 
        Self::on_increased(amount);   // This is Event Emitter, calling the Listner
    }
    fn decrease(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count - amount;
        Self::on_decreased(Self {count: initial_value}, amount);  // This is Event Emitter, calling the Listner
    }
    fn change_by(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count + amount;
        match amount {
            x if x > 0 => Self::on_increased(amount),   // This is Event Emitter, calling the Listner
            x if x < 0 => Self::on_decreased(Self {count: initial_value},  // This is Event Emitter, calling the Listneramount.abs()),
            _   => println!("No changes")
        }
    }
}

// 7. Build your main function
fn main() {
    let mut x = Counter { count: 5 };
    println!("Counter started at: {:#?}", x.count);
    x.square();   // Call the extension, which will automatically trigger the listner
    println!("{:?}", x);
    x.increase(3);
    println!("{:?}", x);
    x.decrease(2);
    println!("{:?}", x);
    x.change_by(-1);
    println!("{:?}", x);
}

并获得以下输出:

Counter started at: 5
Counter squared
Counter `count` is: 25
Counter increased by 3
Counter `count` is: 28
Counter reduced from 28 to 26
Counter `count` is: 26
Counter reduced from 26 to 25
Counter `count` is: 25

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接