How do I obtain a rolling buffer of the last two i

2019-07-11 04:09发布

I have a stream that emits numbers x. What I want is dx (difference in x) so I need a rolling buffer which emits x_n and x_(n-1) so I can map to dx = x_n - x_(n-1). In a marble diagram this would look like ...

SOURCE --A------B------C--------D------F--G-----

RESULT ---------AB-----BC-------CD-----DF-FG----

This would be handy for other operations like rolling averages etc.

I have checked the operator docs but can't seem to find anything similar. sample is sort of close but is time dependent. buffer is also close but it strictly queues values with no overlap between the buffers.

I'm using RxJS

4条回答
姐就是有狂的资本
2楼-- · 2019-07-11 04:34

RXJS 4

You maybe don't even need a buffer for this, a simple concatMap might work for you (of course I don't know any details of your stream:

observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

observable
  .bufferWithCount(2, 1)
  .subscribe(all => {
    console.log(all);
});

See live here

查看更多
Evening l夕情丶
3楼-- · 2019-07-11 04:38

RXJS 5

RxJS has the bufferCount operator which works as follows ...

observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"])

bufferSize = 2
overlap = 1
observable.bufferCount(bufferSize, overlap)
    .subscribe(vals => console.log(vals));

// Results in,
//
// ["A", "B"]
// ["B", "C"]
// ...

overlap is actually the sample frequency so for example in the above case if overlap = 2 then we would get the normal buffer behaviour.

查看更多
姐就是有狂的资本
4楼-- · 2019-07-11 04:44

Two options that will work regardless of the version:

Better

Pairwise

Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });

bufferCount (or bufferWithCount)

This does exist in RxJS 4 as well as RxJS 5

version == 5.* .* ===> bufferCount

version >= 4.1.0 ===> bufferCount

version < 4.1.0 ===> bufferWithCount

Rx.Observable.from(["A", "B", "C", "D", "E", "F"])
  // Buffers of length 2 and skip 1 item (i.e. overlap)
  .bufferCount(2, 1)
  // the last buffer will have only the last item so drop it
  .filter(x => x.length == 2)
  .subscribe(all => {
    console.log(all);
  });

See both here

查看更多
放荡不羁爱自由
5楼-- · 2019-07-11 04:56
observable = Rx.Observable.from(["A", "B", "C", "D", "E", "F"]);

we can do

observable
  .scan((x, y) => [_.last(x), y], [])
  .filter((x) => x[0] !== undefined)
  .subscribe(all => {
    console.log(all);
  });

or if a hot observable:

observable.
  zip(observable.skip(1))
  .subscribe(all => {
    console.log(all);
  });

or if a cold observable:

observable
  .publish(source_ => source_.zip(source_.skip(1)))
  .subscribe(all => {
    console.log(all);
  });

or easiest:

observable
  .pairwise()
  .subscribe(all => {
    console.log(all);
  });
查看更多
登录 后发表回答