I have a problem with RXjava. My system has three observable that send signals (are temperature sensors) to a listener that applies the function zip and later calculates the average of the values received.
I have to implement a function that, after a number of milliseconds that the average temperature is out of range according to a parameter "t", the system signals the presence of an anomaly.
For example:
a = anomaly
x = average value
- = Second
if t = 3:
x-x-x-x-a-a-x => ok
x-x-x-a-a-a-x => ko
My code is here:
public class Example extends Thread {
@override
public void run() {
/*Create 3 observable*/
Observable<Double> alfa = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> bravo = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
Observable<Double> charlie = Observable.create((
Subscriber<? super Double> subscriber) -> {
new ObservableTempStream().start();
});
/*Create 1 observable that apply func avg with zip*/
ConnectableObservable<Double> averageTempStream = Observable.zip(
alfa, bravo, charlie,
(Double a, Double b, Double c) -> ((a + b + c) / 3)).publish();
averageTempStream.connect();
averageTempStream.subscribe((Double v) -> {
if ((v) < (averageTempSensors - threshold)
|| (v) > (averageTempSensors + threshold)) {
System.out.println("Value out of threshold: " + v);
} else {
System.out.println("Value avg it's ok: " + v);
}
}, (Throwable t) -> {
System.out.println("error " + t);
}, () -> {
System.out.println("Completed");
});
}
}
What strategy can be applied to solve this problem?
Is there any function that can be used with asynchronous stream?
In my code:
I report the presence of the error whenever the average is out of range (in practice one of the sensors has sent a spike).
Instead I have to signal an error only when the average is out of range for more than "t" seconds.
thanks a lot