Building a Sensor Monitoring System using RX

2019-09-14 14:54发布

问题:

See Merging multiple custom observables in RX for background.

My scenario is that I have a number of arbitrary sensors (hardware). I have written some pluggable modules that can connect to these sensors in C#. They currently each use a thread to run an acquisition routine on a timer. The larger goal is to change polling to RX as well once I understand how!

There is a requirement to monitor these sensors in groups so I was thinking there would be an aggregated topic where a monitor could subscribe to for updates from a particular group of sensors (Temperature, Signal Strength etc) and potentially make changes to the behaviour of the system based on the readings from the sensors.

Additionally each sensor would possibly connect to a logging observer to log their current state and the monitor would connect to a logging observer to log its decisions

The same design pattern would apply to any new sensor ,monitor or logger we introduce.

Sample code below:

using System;
using System.Threading;
using System.Collections.Generic;

namespace Soln
    {
    class MainClass
    {
        public static void Main (string[] args)
        {
            Console.WriteLine ("Hello World!");
            var sensorA = new ASensor ();
            sensorA.Start ();

            var sensorB = new BSensor ();
            sensorB.Start ();

            var list = new List<ICustomEventHandler<string>> ();
            list.Add (sensorA);
            list.Add (sensorB);

            var strObserver = new StringObserver (list);
            strObserver.StartMonitor ();
            Console.Read ();
            sensorA.Stop ();
            sensorB.Stop ();
        }
    }

    //its a modular framework so every module implements
    //this interface to interface to a core that loads them up etc
    public interface IPlugin
    {
        bool Start();
        void Stop();
    }

    public interface ICustomEventHandler<T>
    {
        event MyEventHandler<T> SomethingHappened;
    }
    //most sensors inherit from a base class and
    //most create a thread to work in. 
    //The base interface also has an event that it uses to transmit
    //notifications. The actual eventhandler is genericised so
    //can be anything from a primitive to an actual object. Each plugin
    //can additionally transmit multiply types but this is a basic example.
    //hopefully once i can understand how rx works better , i can change the event handling to an IObservable interface
    public abstract class Plugin<T>:IPlugin,ICustomEventHandler<T>
    {   
        Thread oThread;

        protected volatile bool _continueWorking = false;
        #region IPlugin implementation
        public bool Start ()
        {
            oThread = new Thread (DoWork);
            _continueWorking = true;

            oThread.Start ();
            return true;
        }
        protected abstract void DoWork();

        public void Stop ()
        {
            _continueWorking = false;
        }

        protected void RaiseEvent(T eventMessage)
        {
            if (SomethingHappened != null) {
                SomethingHappened (eventMessage);
                Console.WriteLine (eventMessage);
            }
        }
        #endregion
        public event MyEventHandler<T> SomethingHappened;
    }

    public class ASensor:Plugin<string>
    {
        protected override void DoWork()
        {
            //can't share the code for company reasons
            while (_continueWorking) {
                Console.WriteLine (" A doing some work");
                Thread.Sleep (1000);
                RaiseEvent ("ASensor has an event");
            }
        }

    }
    public delegate void MyEventHandler<T>(T foo);

    public class BSensor:Plugin<string>
    {
        protected override void DoWork()
        {
            //can't share the code for company reasons
            while (_continueWorking) {
                Console.WriteLine ("B doing some work");
                Thread.Sleep (1000);
                RaiseEvent ("BSensor has an event");
            }
        }
    }
    //the observer should be strongly typed and take a list of 
    //plugins to monitor. At least those are my current thoughts,happy
    //to find a better way. There could be multiple observers all monitoring
    //the same plugins for different purposes
    public abstract class Observer<T>
    {
        protected List<ICustomEventHandler<T>> Plugins;
        protected Observer(List<ICustomEventHandler<T>> plugins)
        {
            Plugins = plugins;
        }
        //use rx to subscribe to all events 
        public abstract void StartMonitor ();
    }

    public class StringObserver:Observer<string>
    {

        public StringObserver(List<ICustomEventHandler<string>> plugins)
            :base(plugins)
        {
        }

        //subscribe to all plugin events in list using rx merge?
        //monitor and log to file
        public override void StartMonitor ()
        {
            //throw new NotImplementedException ();
        }
    }
}

Thanks for reading

回答1:

This isn't completely a direct answer, but it might give you some idea where your code can head.

Because you haven't given us the code for your sensors I can't really give you a concrete solution, but if you asked me to convert your current code to Rx then I'd probably do something like this:

Func<string, IObservable<string>> generate = t =>
    Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => t);

var subject = new Subject<IObservable<string>>();

using (var subscription = subject.Merge().Subscribe(Console.WriteLine))
{
    subject.OnNext(generate("A doing some work"));
    subject.OnNext(generate("B doing some work"));
    Console.ReadLine();
}

Now the Func<string, IObservable<string>> was just a bit of duplication removal, but without it I can replicate the functionality of your code in 5 lines (which includes a Console.ReadLine();).

Can you show us the sensor code please?