How can I implement the observer pattern in Rust?

2020-05-23 01:42发布

问题:

I have an observable collection and an observer. I want the observer to be a trait implementation of trait Observer. The observable object should be able to notify each observer when some event occurs. This should explain my intentions:

struct A {
    observables: Vec<Observable>,
}

impl A {
    fn new() -> A {
        A {
            observables: vec![],
        }
    }
}

trait Observer {
    fn event(&mut self, _: &String);
}

impl Observer for A {
    fn event(&mut self, ev: &String) {
        println!("Got event from observable: {}", ev);
    }
}

struct Observable {
    observers: Vec<dyn Observer>, // How to contain references to observers? (this line is invalid)
}

impl Observable {
    fn new() -> Observable {
        Observable {
            observers: Vec::new(),
        }
    }

    fn add_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.push(o);
    }

    fn remove_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.remove(o);
    }

    fn notify_observers(&self, ev: &String) {
        for o in &mut self.observers {
            o.event(ev);
        }
    }
}

(Playground)

I get the error:

error[E0277]: the size for values of type `(dyn Observer + 'static)` cannot be known at compilation time
  --> src/lib.rs:24:5
   |
24 |     observers: Vec<dyn Observer>, // How to contain references to observers?
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `(dyn Observer + 'static)`
   = note: to learn more, visit <https://doc.rust-lang.org/book/second-edition/ch19-04-advanced-types.html#dynamically-sized-types-and-the-sized-trait>
   = note: required by `std::vec::Vec`

This is just a mock-up of what I want to do. I have code like this in Java, Python, and C++, but I don't know how to implement the observer pattern in Rust. I believe my problem is in storing a reference to observer objects inside observable objects.

回答1:

The Observer pattern, depending on implementation choices, may pose an ownership challenge.

In garbage collected languages it is typical to have the Observable referring to the Observer (to notify it) and the Observer referring to the Observable (to unregister itself)... this causes some challenges in terms of ownership (who outlives whom?) and there is this whole "notification on un-registering" thing.

In Rust (and C++), I advise avoiding cycles.


Simple solution

The Observable and Observer have distinct lifetimes, none owning the other or being expected to outlive the other.

use std::rc::Weak;

struct Event;

trait Observable {
    fn register(&mut self, observer: Weak<dyn Observer>);
}

trait Observer {
    fn notify(&self, event: &Event);
}

The key is to allocate the Observer into a Rc and then hand over Weak (weak references) to the Observable.

If the Observer needs be modified on the Event, then either it needs internal mutability or it needs to be wrapped into a RefCell (passing Weak<RefCell<dyn Observer>> to the Observable).

When notifying, the Observable will regularly realize that there are dead weak-references (the Observer has disappeared), it can remove those then, lazily.


There are other solutions, such as using a Broker (quite similar to an event loop), moving from push mode to pull mode (i.e. (1) generate all events, (2) treat all of them), however these depart a bit from the traditional Observer Pattern and have different pluses/minuses so I will not attempt to treat them all here.



回答2:

I used a callback function. It's simple and powerful and there are no lifetime issues or type erasure.

I tried Weak<dyn Observer>, but

  1. It needs an Rc
  2. You have to repeat code to create a different observer struct.
  3. It requires type erasure
pub struct Notifier<E> {
    subscribers: Vec<Box<dyn Fn(&E)>>,
}

impl<E> Notifier<E> {
    pub fn new() -> Notifier<E> {
        Notifier {
            subscribers: Vec::new(),
        }
    }

    pub fn register<F>(&mut self, callback: F)
    where
        F: 'static + Fn(&E),
    {
        self.subscribers.push(Box::new(callback));
    }

    pub fn notify(&self, event: E) {
        for callback in &self.subscribers {
            callback(&event);
        }
    }
}


回答3:

This is my implementation based on the answers to this question and much pain and suffering. I use a weak reference to store the observer and a RefCell to be able to call a mutable notify().

I'm using Arc because my listener could be called from any thread. If you were using a single thread, you could use Rc.

Every time dispatch() is called, it will check if there are any weakly referenced listeners which have disappeared. If there are any it will clean up the listener list.

pub enum Event {} // You make Event hold anything you want to fire 

pub trait Listener {
    fn notify(&mut self, event: &Event);
}

pub trait Dispatchable<T>
    where T: Listener
{
    fn register_listener(&mut self, listener: Arc<RefCell<T>>);
}

pub struct Dispatcher<T>
    where T: Listener
{
    /// A list of synchronous weak refs to listeners
    listeners: Vec<Weak<RefCell<T>>>,
}

impl<T> Dispatchable<T> for Dispatcher<T>
    where T: Listener
{
    /// Registers a new listener
    fn register_listener(&mut self, listener: Arc<RefCell<T>>) {
        self.listeners.push(Arc::downgrade(&listener));
    }
}

impl<T> Dispatcher<T>
    where T: Listener
{
    pub fn new() -> Dispatcher<T> {
        Dispatcher { listeners: Vec::new() }
    }

    pub fn num_listeners(&self) -> usize {
        self.listeners.len()
    }

    pub fn dispatch(&mut self, event: Event) {
        let mut cleanup = false;
        // Call the listeners
        for l in self.listeners.iter() {
            if let Some(mut listener_rc) = l.upgrade() {
                let mut listener = listener_rc.borrow_mut();
                listener.notify(&event);
            } else {
                println!("Cannot get listener, cleanup necessary");
                cleanup = true;
            }
        }
        // If there were invalid weak refs, clean up the list
        if cleanup {
            println!("Dispatcher is cleaning up weak refs");
            self.listeners.retain(|ref l| {
                // Only retain valid weak refs
                let got_ref = l.clone().upgrade();
                match got_ref {
                    None => false,
                    _ => true,
                }
            });
        }
    }
}

Here is a unit test code snippet that exercises it.

The test is from a card game library where my Event enum has FlopDealt and GameFinished variants. The test creates and registers my listener, and ensures that it was called when FlopDealt is dispatched. The scoped section is so I can test the weak reference behaviour after the listener goes out of scope. I fire another event and count the number of listeners to ensure the list was cleaned out.

use std::time::Instant;

#[derive(Debug)]
pub enum Event {
    FlopDealt,
    GameFinished { ended: Instant },
}

struct MyListener {
    pub flop_dealt: bool,
}

impl Listener for MyListener {
    fn notify(&mut self, event: &Event) {
        println!("Notify called with {:?}", event);
        if let Event::FlopDealt = event {
            println!("Flop dealt");
            self.flop_dealt = true;
        }
    }
}

#[test]
fn events_register() {
    let mut d: Dispatcher<MyListener> = Dispatcher::new();

    {
        let listener_rc = Arc::new(RefCell::new(MyListener { flop_dealt: false }));
        d.register_listener(listener_rc.clone());
        d.dispatch(Event::FlopDealt);

        let flop_dealt = listener_rc.borrow().flop_dealt;
        println!("Flop was {}dealt", if flop_dealt { "" } else { "not " });
        assert_eq!(flop_dealt, true);
        assert_eq!(d.num_listeners(), 1);
    }

    // Listener should disappear
    d.dispatch(Event::GameFinished {
        ended: Instant::now(),
    });
    assert_eq!(d.num_listeners(), 0);
}


回答4:

rust design patterns https://github.com/lpxxn/rust-design-pattern

trait IObserver {
    fn update(&self);
}

trait ISubject<'a, T: IObserver> {
    fn attach(&mut self, observer: &'a T);
    fn detach(&mut self, observer: &'a T);
    fn notify_observers(&self);
}

struct Subject<'a, T: IObserver> {
    observers: Vec<&'a T>,
}
impl<'a, T: IObserver + PartialEq> Subject<'a, T> {
    fn new() -> Subject<'a, T> {
        Subject {
            observers: Vec::new(),
        }
    }
}

impl<'a, T: IObserver + PartialEq> ISubject<'a, T> for Subject<'a, T> {
    fn attach(&mut self, observer: &'a T) {
        self.observers.push(observer);
    }
    fn detach(&mut self, observer: &'a T) {
        if let Some(idx) = self.observers.iter().position(|x| *x == observer) {
            self.observers.remove(idx);
        }
    }
    fn notify_observers(&self) {
        for item in self.observers.iter() {
            item.update();
        }
    }
}

#[derive(PartialEq)]
struct ConcreteObserver {
    id: i32,
}
impl IObserver for ConcreteObserver {
    fn update(&self) {
        println!("Observer id:{} received event!", self.id);
    }
}

fn main() {
    let mut subject = Subject::new();
    let observer_a = ConcreteObserver { id: 1 };
    let observer_b = ConcreteObserver { id: 2 };

    subject.attach(&observer_a);
    subject.attach(&observer_b);
    subject.notify_observers();

    subject.detach(&observer_b);
    subject.notify_observers();
}

output

Observer id:1 received event!
Observer id:2 received event!
Observer id:1 received event!


回答5:

I wrote this answer if you are still interested about it:

I tried this way, and it worked fine with me, it is as simple as:

  • Define your object struct
  • Define your Listeners,
  • Define your standard functions, let's call them Extensions,
  • Define the add Emitter option to the Extensions by execution Self::Fn<Listener>

The same code I used in the playground is below, I just solved it in the rust forum:

// 1. Define your object
//#[derive(Debug)]
pub struct Counter {
 count: i32,
}

// 2. (Optional), if do not want to use `#[derive(Debug)]` 
//    you can define your own debug/release format
impl std::fmt::Debug for Counter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Counter `count` is: {}", self.count)
    }
}

// 3. Define your Listeners trait 
trait EventListener {
     fn on_squared() {
        println!("Counter squared")
     }
     fn on_increased(amount: i32) {
        println!("Counter increased by {}", amount)
     }
     fn on_decreased(self, amount: i32);
}

// 4. Implement your Listeners trait to your object
impl EventListener for Counter {
    fn on_decreased(self, amount: i32) {
        println!("Counter reduced from {} to {}", &self.count, &self.count - amount)
    }
}

// 5. (Recommended), Define your standard functions/Extensions/Emitters
//    trait signatures
trait EventEmitter {
    fn square(&mut self);
    fn increase(&mut self, amount: i32);
    fn decrease(&mut self, amount: i32);
    fn change_by(&mut self, amount: i32);
}

// 6. Implement your standard functions/Extensions/Emitters trait to your object
impl EventEmitter for Counter {
    fn square(&mut self) { 
        self.count = self.count.pow(2);
        Self::on_squared();      // This is Event Emitter, calling the Listner
    }
    fn increase(&mut self, amount: i32) { 
        self.count = self.count + amount; 
        Self::on_increased(amount);   // This is Event Emitter, calling the Listner
    }
    fn decrease(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count - amount;
        Self::on_decreased(Self {count: initial_value}, amount);  // This is Event Emitter, calling the Listner
    }
    fn change_by(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count + amount;
        match amount {
            x if x > 0 => Self::on_increased(amount),   // This is Event Emitter, calling the Listner
            x if x < 0 => Self::on_decreased(Self {count: initial_value},  // This is Event Emitter, calling the Listneramount.abs()),
            _   => println!("No changes")
        }
    }
}

// 7. Build your main function
fn main() {
    let mut x = Counter { count: 5 };
    println!("Counter started at: {:#?}", x.count);
    x.square();   // Call the extension, which will automatically trigger the listner
    println!("{:?}", x);
    x.increase(3);
    println!("{:?}", x);
    x.decrease(2);
    println!("{:?}", x);
    x.change_by(-1);
    println!("{:?}", x);
}

And got the below output:

Counter started at: 5
Counter squared
Counter `count` is: 25
Counter increased by 3
Counter `count` is: 28
Counter reduced from 28 to 26
Counter `count` is: 26
Counter reduced from 26 to 25
Counter `count` is: 25