How do I obtain a rolling buffer of the last two i

Posted 2019-07-11 03:55

Question:

I have a stream that emits numbers x. What I want is dx (the difference in x), so I need a rolling buffer that emits x_n and x_(n-1), which I can then map to dx = x_n - x_(n-1). In a marble diagram this would look like:

SOURCE --A------B------C--------D------F--G-----

RESULT ---------AB-----BC-------CD-----DF-FG----

This would be handy for other operations like rolling averages etc.

I have checked the operator docs but can't seem to find anything similar. sample is sort of close but is time dependent. buffer is also close but it strictly queues values with no overlap between the buffers.

I'm using RxJS

Answer 1:

RXJS 4

You may not even need a custom buffer for this; bufferWithCount with a count of 2 and a skip of 1 might work for you (of course, I don't know the details of your stream):

const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

observable
  .bufferWithCount(2, 1)
  .subscribe(all => {
    console.log(all);
});




Answer 2:

RXJS 5

RxJS has the bufferCount operator which works as follows ...

const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

const bufferSize = 2;
const overlap = 1;
observable.bufferCount(bufferSize, overlap)
    .subscribe(vals => console.log(vals));

// Results in,
//
// ["A", "B"]
// ["B", "C"]
// ...

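Despite its name here, overlap is not the size of the overlap: the second argument (startBufferEvery in the RxJS 5 docs) is how often a new buffer is started. So, in the example above, setting overlap = 2 would start a buffer every two items and give the normal, non-overlapping buffer behaviour.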
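
To make the effect of the second argument concrete, here is a minimal, library-free sketch that models the buffering on a plain array. The helper name bufferCountModel is made up for illustration and is not part of RxJS; it only mimics how a new buffer is started at a fixed interval and how a buffer cut short by the end of the input is still emitted.

```javascript
// Hypothetical helper (not an RxJS API): models bufferCount on a finite array.
function bufferCountModel(items, bufferSize, startEvery) {
  const buffers = [];
  // A new buffer starts at every `startEvery`-th item.
  for (let start = 0; start < items.length; start += startEvery) {
    // Buffers cut short by the end of the input are still emitted.
    buffers.push(items.slice(start, start + bufferSize));
  }
  return buffers;
}

const letters = ["A", "B", "C", "D", "E", "F"];
// Start a buffer at every item: consecutive buffers share one element.
const overlapping = bufferCountModel(letters, 2, 1);
// Start a buffer every two items: no overlap at all.
const disjoint = bufferCountModel(letters, 2, 2);
console.log(overlapping);
console.log(disjoint);
```

In this model the overlapping case ends with a single-element buffer, which is why a length filter is often added when only complete pairs are wanted.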



Answer 3:

Two options that will work regardless of the version:

Better

Pairwise

Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });
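
To tie this back to the dx in the question, the following library-free sketch models the pairing on a plain array and maps each pair to a difference. The helper pairwiseModel and the sample values are assumptions made for illustration, not part of RxJS.

```javascript
// Hypothetical helper (not an RxJS API): pairs each item with its predecessor.
function pairwiseModel(items) {
  const pairs = [];
  // The first item has no predecessor, so pairing starts at index 1.
  for (let i = 1; i < items.length; i++) {
    pairs.push([items[i - 1], items[i]]);
  }
  return pairs;
}

// Made-up sample readings standing in for the stream of x values.
const xs = [1, 4, 9, 16];
// dx = x_n - x_(n-1), as described in the question.
const dxs = pairwiseModel(xs).map(([prev, curr]) => curr - prev);
console.log(dxs);
```

With the real operator, the same mapping would be a .map(([prev, curr]) => curr - prev) placed after .pairwise().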

bufferCount (or bufferWithCount)

This does exist in RxJS 4 as well as RxJS 5

version == 5.*.* ===> bufferCount

version >= 4.1.0 ===> bufferCount

version < 4.1.0 ===> bufferWithCount

Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  // Buffers of length 2, starting a new buffer at every item (so consecutive buffers overlap)
  .bufferCount(2, 1)
  // the last buffer will have only the last item, so drop it
  .filter(x => x.length === 2)
  .subscribe(all => {
    console.log(all);
  });
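
The same buffer-then-filter idea extends to the rolling averages mentioned in the question. The sketch below is library-free and models it on a plain array; slidingWindows and the sample values are hypothetical names and data chosen for illustration. It corresponds roughly to a buffer size of 3 with a new buffer started at every item, keeping only full-size buffers.

```javascript
// Hypothetical helper (not an RxJS API): full-size sliding windows over an array.
function slidingWindows(items, size) {
  const windows = [];
  // Stop once a full window no longer fits, which drops short trailing windows.
  for (let start = 0; start + size <= items.length; start++) {
    windows.push(items.slice(start, start + size));
  }
  return windows;
}

// Made-up sample readings.
const samples = [2, 4, 6, 8, 10];
// Average each window of three consecutive readings.
const rollingAverages = slidingWindows(samples, 3).map(
  (w) => w.reduce((sum, v) => sum + v, 0) / w.length
);
console.log(rollingAverages);
```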




Answer 4:

const observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

we can do:

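observable
  // _ is lodash: _.last(x) returns the last element of x (undefined for the empty seed)
  .scan((x, y) => [_.last(x), y], [])
  // drop the first pair, which has no previous value yet
  .filter((x) => x[0] !== undefined)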
  .subscribe(all => {
    console.log(all);
  });

or, if it is a hot observable:

observable
  .zip(observable.skip(1))
  .subscribe(all => {
    console.log(all);
  });

or, if it is a cold observable (publish shares a single subscription to the source between both sides of the zip):

observable
  .publish(source_ => source_.zip(source_.skip(1)))
  .subscribe(all => {
    console.log(all);
  });

or easiest:

observable
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });