Timer RXjava: control system function

2019-07-21 01:56发布

问题:

I have a problem with RXjava. My system has three observable that send signals (are temperature sensors) to a listener that applies the function zip and later calculates the average of the values received.

I have to implement a function that, after a number of milliseconds that the average temperature is out of range according to a parameter "t", the system signals the presence of an anomaly.

For example:

a = anomaly
x = average value
- = Second

if t = 3:

x-x-x-x-a-a-x => ok
x-x-x-a-a-a-x => ko

My code is here:

public class Example extends Thread {

@override
public void run() {
    /*Create 3 observable*/
    Observable<Double> alfa = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    Observable<Double> bravo = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    Observable<Double> charlie = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new ObservableTempStream().start();
    });

    /*Create 1 observable that apply func avg with zip*/
    ConnectableObservable<Double> averageTempStream = Observable.zip(
            alfa, bravo, charlie,
            (Double a, Double b, Double c) -> ((a + b + c) / 3)).publish();

    averageTempStream.connect();


    averageTempStream.subscribe((Double v) -> {

            if ((v) < (averageTempSensors - threshold)
                || (v) > (averageTempSensors + threshold)) {
                System.out.println("Value out of threshold:  " + v);
            } else {
                System.out.println("Value avg it's ok: " + v);
            }
        }, (Throwable t) -> {
            System.out.println("error  " + t);
        }, () -> {
            System.out.println("Completed");
        });
}

}

What strategy can be applied to solve this problem?
Is there any function that can be used with asynchronous stream?

In my code:
I report the presence of the error whenever the average is out of range (in practice one of the sensors has sent a spike). Instead I have to signal an error only when the average is out of range for more than "t" seconds.

thanks a lot

回答1:

What about this:

averageTempStream.map((Double v) -> {    // check whether the value is ok
    (v > averageTempSensors - threshold) && (v < averageTempSensors + threshold)
})
.distinctUntilChanged()                  // ignore subsequent identical values
                                         // (e. g. "x-x-x-x-a-a-x" becomes "x- - - -a- -x")
.debounce(t, TimeUnit.SECONDS)           // only emit a value if it not followed by
                                         // another one within t seconds
                                         // (e. g. "x- - - -a- -x" becomes " - -x- - - - - -x",
                                         // because the final x comes within t seconds of the a and thus prevents it from being passed down the chain)
.subscribe((Boolean ok) -> {
    if (ok) {
        System.out.println("Value avg is ok!");
    } else {
        System.out.println("Value out of threshold!");
    }
}, (Throwable t) -> {
    System.out.println("error  " + t);
}, () -> {
    System.out.println("Completed");
});

Notice that, of course, all items emitted by debounce are delayed by t seconds (how else would it know that there's no newer item coming along within that interval?) - so the ok-signals are delayed, too. You could overcome that by (1) filtering the above stream to remove all the ok-signals and (2) merge it with stream that contains only (undelayed) ok-signals.



标签: rx-java