In finance domain, we usually need to calculate the moving-window aggregate value from a stream of time series data, use moving average as an example, say we have the following data stream(T is time stamp and V is the actual vlaue):
[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,1V0],......
to calculate a moving average 3 from the stream we get:
avg([T0,V0],[T1,V1],[T2,V2]),
avg([T1,V1],[T2,V2],[T3,V3]),
avg([T2,V2],[T3,V3],[T4,V4]),
avg([T3,V3],[T4,V4],[T5,V5]),
avg([T4,V4],[T5,V5],[T6,V6]),...
To calculate the moving average, it seems like we could do it by :
- build a Observable from the original stream
- build a Observable from the original stream by aggregate the values into groups
- using aggregate operator to calculate the final results from Observable in step 2.
Step 1 and 3 is trivial to implement, however, for step 2 it seems like current RxJava do not have build-in operator to produce moving-windows groups, the window/groupBy operator seems not fit in this case, and I did not find a easy way to compose a solution from existing operators, can any one suggest how to do this in RxJava in a "elegantly" fashion?
RxJava version: 0.15.1
Sample output:
I'd do it like this:
window
(which emits Observables, which only consume a constant amount of memory) and notbuffer
(which emits Lists, which consume memory for each of their item).Update: If you want to filter out the windows at the end of the stream which have less than
n
elements, you could do it like this:(I chose Scala because it's shorter to write, but in Java, you can do the same, just note that Scala's
foldLeft
is calledreduce
in Java).