I have a stream that emits numbers x. What I want is dx (the difference in x), so I need a rolling buffer which emits x_n and x_(n-1) so I can map to dx = x_n - x_(n-1). In a marble diagram this would look like ...
SOURCE --A------B------C--------D------F--G-----
RESULT ---------AB-----BC-------CD-----DF-FG----
This would be handy for other operations like rolling averages etc.
I have checked the operator docs but can't seem to find anything similar. sample is sort of close but is time dependent. buffer is also close, but it strictly queues values with no overlap between the buffers.
I'm using RxJS.
RXJS 4
You may not even need a custom buffer for this; bufferWithCount with a skip of 1 might work for you (of course, I don't know any details of your stream):
const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);
observable
  // buffers of 2 items, starting a new buffer every 1 item
  .bufferWithCount(2, 1)
  .subscribe(all => {
    console.log(all);
  });
RXJS 5
RxJS has the bufferCount operator which works as follows ...
const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);
const bufferSize = 2;
const overlap = 1;
observable.bufferCount(bufferSize, overlap)
  .subscribe(vals => console.log(vals));
// Results in,
//
// ["A", "B"]
// ["B", "C"]
// ...
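overlap is actually the sample frequency, i.e. how often a new buffer is started (RxJS 5 names this parameter startBufferEvery), not the number of items shared between buffers. So, for example, in the above case if overlap = 2 then we would get the normal, non-overlapping buffer behaviour.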
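To make the effect of that second argument concrete, here is a small plain-array sketch of the behaviour described above. It is only a model, assuming the source emits a finite list of values and then completes; bufferCountModel is a made-up helper for illustration, not part of RxJS.

```javascript
// Plain-array model of bufferCount(bufferSize, startBufferEvery).
// bufferCountModel is a hypothetical helper, not an RxJS API.
function bufferCountModel(values, bufferSize, startBufferEvery) {
  // When the second argument is omitted, a new buffer starts every bufferSize items.
  const step = startBufferEvery || bufferSize;
  const buffers = [];
  for (let start = 0; start < values.length; start += step) {
    // slice() stops at the end of the array, so trailing buffers may be short.
    buffers.push(values.slice(start, start + bufferSize));
  }
  return buffers;
}

const letters = ["A", "B", "C", "D", "E", "F"];

// Start a new buffer on every item: consecutive buffers share one value.
const overlapping = bufferCountModel(letters, 2, 1);
console.log(overlapping);
// [["A","B"],["B","C"],["C","D"],["D","E"],["E","F"],["F"]]

// Start a new buffer every 2 items: plain, non-overlapping buffers.
const plain = bufferCountModel(letters, 2, 2);
console.log(plain);
// [["A","B"],["C","D"],["E","F"]]
```

Note the short trailing ["F"] buffer in the first result; when only full pairs are wanted, it has to be filtered out.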
Two options that will work regardless of the version:
Better
Pairwise
Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });
bufferCount (or bufferWithCount)
This does exist in RxJS 4 as well as RxJS 5:
version == 5.*.* ===> bufferCount
version >= 4.1.0 ===> bufferCount
version < 4.1.0 ===> bufferWithCount
Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  // Buffers of length 2, starting a new buffer every 1 item (i.e. overlapping)
  .bufferCount(2, 1)
  // the last buffer will have only the last item so drop it
  .filter(x => x.length === 2)
  .subscribe(all => {
    console.log(all);
  });
observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);
we can do
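observable
  // _.last is the lodash/underscore helper; the accumulator keeps [previous, current]
  .scan((x, y) => [_.last(x), y], [])
  // the first emission has no previous value yet, so drop it
  .filter((x) => x[0] !== undefined)
  .subscribe(all => {
    console.log(all);
  });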
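As a rough illustration of why the filter is needed, the scan step can be traced on a plain array. This is only a sketch: scanModel and last are hypothetical stand-ins for the scan operator and for _.last, assuming scan emits every intermediate accumulator value.

```javascript
// Stand-in for _.last: returns the final element, or undefined for an empty array.
const last = (arr) => arr[arr.length - 1];

// Plain-array model of scan(accumulator, seed): collects every intermediate state.
// scanModel is a hypothetical helper, not an RxJS API.
function scanModel(values, accumulator, seed) {
  const states = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulator(acc, value);
    states.push(acc);
  }
  return states;
}

const steps = scanModel(["A", "B", "C"], (x, y) => [last(x), y], []);
console.log(steps);
// [[undefined,"A"],["A","B"],["B","C"]]

// Dropping the first state leaves only complete pairs.
const pairs = steps.filter((x) => x[0] !== undefined);
console.log(pairs);
// [["A","B"],["B","C"]]
```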
or if a hot observable:
observable
  .zip(observable.skip(1))
  .subscribe(all => {
    console.log(all);
  });
or if a cold observable:
observable
  .publish(source_ => source_.zip(source_.skip(1)))
  .subscribe(all => {
    console.log(all);
  });
or easiest:
observable
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });
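To get from pairs to the dx the question asks for, each [previous, current] pair still has to be mapped to a difference; with RxJS itself that would presumably be a .map(([prev, curr]) => curr - prev) after .pairwise(). The sketch below shows the same idea on a plain array, along with a simple rolling average. pairwiseModel and rollingMean are hypothetical helpers, and the sample numbers are invented for illustration.

```javascript
// Plain-array stand-in for pairwise(): one [previous, current] pair
// for every value after the first. Hypothetical helper, not an RxJS API.
function pairwiseModel(values) {
  const pairs = [];
  for (let i = 1; i < values.length; i++) {
    pairs.push([values[i - 1], values[i]]);
  }
  return pairs;
}

// Mean of each full window of `size` consecutive values (hypothetical helper).
function rollingMean(values, size) {
  const means = [];
  for (let end = size; end <= values.length; end++) {
    const win = values.slice(end - size, end);
    means.push(win.reduce((sum, v) => sum + v, 0) / size);
  }
  return means;
}

const xs = [1, 4, 10, 19, 31]; // invented sample data

// dx = x_n - x_(n-1)
const dx = pairwiseModel(xs).map(([prev, curr]) => curr - prev);
console.log(dx); // [3, 6, 9, 12]

// Rolling average over windows of 3 values.
const means = rollingMean(xs, 3);
console.log(means); // [5, 11, 20]
```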