How to calculate a moving average in RxJava

Posted 2019-04-07 00:26

Question:

In the finance domain, we often need to calculate a moving-window aggregate over a stream of time-series data. Take the moving average as an example. Say we have the following data stream (T is the timestamp and V is the actual value):

[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,V10],......

To calculate a moving average with a window of 3 from the stream, we get:

avg([T0,V0],[T1,V1],[T2,V2]),
avg([T1,V1],[T2,V2],[T3,V3]),
avg([T2,V2],[T3,V3],[T4,V4]),
avg([T3,V3],[T4,V4],[T5,V5]),
avg([T4,V4],[T5,V5],[T6,V6]),...

To calculate the moving average, it seems we could:

  1. build an Observable from the original stream
  2. build a second Observable from it by aggregating the values into groups
  3. use an aggregate operator to calculate the final results from the Observable in step 2.

Steps 1 and 3 are trivial to implement. For step 2, however, the current RxJava does not seem to have a built-in operator that produces moving-window groups: the window/groupBy operators do not seem to fit this case, and I did not find an easy way to compose a solution from existing operators. Can anyone suggest how to do this in RxJava in an "elegant" fashion?
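For reference, the three steps can be sketched in plain Java without RxJava, with a List standing in for the stream. The class and method names (SlidingWindowSketch, slidingWindows, averages) are made up for this illustration, and only full windows are produced, which is an assumption about the desired behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; a List stands in for the stream (step 1).
class SlidingWindowSketch {

    // Step 2: group the values into overlapping windows of `size`, advancing by one.
    // Only full windows are produced; an input shorter than `size` yields no windows.
    static List<List<Double>> slidingWindows(List<Double> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<Double>> windows = new ArrayList<>();
        for (int start = 0; start + size <= values.size(); start++) {
            windows.add(new ArrayList<>(values.subList(start, start + size)));
        }
        return windows;
    }

    // Step 3: aggregate each window into its arithmetic mean.
    static List<Double> averages(List<List<Double>> windows) {
        List<Double> result = new ArrayList<>();
        for (List<Double> window : windows) {
            double sum = 0.0;
            for (double v : window) {
                sum += v;
            }
            result.add(sum / window.size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
        System.out.println(averages(slidingWindows(values, 3))); // prints [2.0, 3.0, 4.0, 5.0]
    }
}
```

This materialises every window up front, so it only shows the shape of the computation; a streaming solution would produce the windows incrementally.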

Answer 1:

RxJava version: 0.15.1

import java.util.List;                                                          
import rx.Observable;                                                           
import rx.util.functions.Action1;                                               

class Bar {                                                                     

    public static void main(String args[]) {                                    

        Integer arr[] = {1, 2, 3, 4, 5, 6}; // N = 6                            
        Observable<Integer> oi = Observable.from(arr);                          

        // 1.- bundle 3, skip 1                                                 
        oi.buffer(3, 1)                                                         
        /**                                                                     
         * 2.- take only the first X bundles                                    
         * When bundle 3, X = N - 2 => 4                                        
         * When bundle 4, X = N - 3 => 3                                        
         * When bundle a, X = N - (a-1)                                         
         */                                                                     
          .take(4)                                                              
        // 3.- calculate average                                                
          .subscribe(new Action1<List<Integer>>() {                             
            @Override                                                           
            public void call(List<Integer> lst) {                               
                int sum = 0;                                                    
                for(int i = 0; i < lst.size(); i++) {                           
                    sum += lst.get(i);                                          
                }                                                               

                System.out.println("MA(3) " + lst +                             
                                   " => " + sum / lst.size());                  
            }                                                                   
        });                                                                     

    }                                                                           

}  

Sample output:

MA(3) [1, 2, 3] => 2
MA(3) [2, 3, 4] => 3
MA(3) [3, 4, 5] => 4
MA(3) [4, 5, 6] => 5
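Two details of the code above are worth checking by hand: the take count X = N - (a-1), and the fact that sum / lst.size() divides two ints, so any fractional part of an average would be discarded (the sample data happens to give whole numbers). A small plain-Java sketch of both, with hypothetical names (WindowCountSketch and its methods are not part of RxJava):

```java
// Hypothetical helper class, written only to illustrate the arithmetic.
class WindowCountSketch {

    // Number of full windows of size `a` over `n` items when advancing by one:
    // n - (a - 1), or 0 when the input is shorter than one window (assumes a >= 1).
    static int fullWindowCount(int n, int a) {
        return Math.max(0, n - (a - 1));
    }

    // Integer average, as in the answer: int / int drops the fractional part.
    static int intAverage(int[] window) {
        int sum = 0;
        for (int v : window) {
            sum += v;
        }
        return sum / window.length;
    }

    // Floating-point average: casting the divisor to double keeps the fraction.
    static double doubleAverage(int[] window) {
        int sum = 0;
        for (int v : window) {
            sum += v;
        }
        return sum / (double) window.length;
    }

    public static void main(String[] args) {
        System.out.println(fullWindowCount(6, 3));                   // prints 4
        System.out.println(intAverage(new int[] {1, 2, 3, 4}));      // prints 2
        System.out.println(doubleAverage(new int[] {1, 2, 3, 4}));   // prints 2.5
    }
}
```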



Answer 2:

I'd do it like this:

public static Observable<Double> movingAverage(Observable<Double> o, int N) {
    return o.window(N, 1).flatMap(
        new Func1<Observable<Double>, Observable<Double>>() {
            public Observable<Double> call(Observable<Double> window) {
                return Observable.averageDoubles(window);
            }
        }
    );
}
  • I use window (which emits Observables, which only consume a constant amount of memory) rather than buffer (which emits Lists, which consume memory for each of their items).
  • This is an example of how you can use combinator operators instead of writing your own loops, something you should always consider when using Observables.
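To make the memory point concrete outside RxJava: a moving average over the last n values only needs those n values plus a running sum, however long the stream is. The following plain-Java sketch illustrates that idea; it is not how window or buffer are implemented, and RunningAverageSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration: keeps at most `size` values and a running sum.
class RunningAverageSketch {
    private final int size;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    RunningAverageSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Feeds one value and returns the average of the last `size` values,
    // or null while fewer than `size` values have been seen.
    Double next(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > size) {
            sum -= window.removeFirst(); // drop the value that left the window
        }
        if (window.size() < size) {
            return null;
        }
        return sum / size;
    }

    public static void main(String[] args) {
        RunningAverageSketch ma = new RunningAverageSketch(3);
        for (double v : new double[] {1, 2, 3, 4, 5, 6}) {
            Double avg = ma.next(v);
            if (avg != null) {
                System.out.println(avg);
            }
        }
    }
}
```

Updating the sum incrementally can accumulate floating-point rounding error over a very long stream, so recomputing the sum per window may be preferable where exactness matters.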

Update: If you want to filter out the windows at the end of the stream that have fewer than n elements, you could do it like this:

def movingAverage(o: Observable[Double], n: Int): Observable[Double] = {
  class State(val sum: Double, val n: Int)
  o.window(n, 1).flatMap(win => 
    win.foldLeft(new State(0.0, 0))((s, e) => new State(s.sum + e, s.n + 1))
       .filter(s => s.n == n)
       .map(s => s.sum/s.n))
}

(I chose Scala because it's shorter to write, but you can do the same in Java; just note that Scala's foldLeft is called reduce in Java.)
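The same fold, filter, map pipeline can be sketched in plain Java over a List, to show the shape of the logic without depending on a particular RxJava version. This uses java.util.stream rather than Observables, and FoldFilterSketch, State and fold are hypothetical names; the short windows at the end are generated on purpose, mirroring the trailing windows described above, so that the filter has something to remove.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of fold -> filter -> map, not RxJava code.
class FoldFilterSketch {

    // Immutable accumulator mirroring the Scala State(sum, n) class.
    static final class State {
        final double sum;
        final int count;

        State(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        State add(double value) {
            return new State(sum + value, count + 1);
        }
    }

    // Left fold of one window into a State, starting from (0.0, 0).
    static State fold(List<Double> window) {
        State state = new State(0.0, 0);
        for (double v : window) {
            state = state.add(v);
        }
        return state;
    }

    static List<Double> movingAverage(List<Double> values, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        return IntStream.range(0, values.size())
                // one window per start index; the last n - 1 windows are short
                .mapToObj(i -> values.subList(i, Math.min(i + n, values.size())))
                .map(FoldFilterSketch::fold)
                .filter(s -> s.count == n)   // keep only full windows
                .map(s -> s.sum / s.count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
        System.out.println(movingAverage(values, 3)); // prints [2.0, 3.0, 4.0, 5.0]
    }
}
```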