I want to take 3 last elements from an observable. Let's say that my timeline looks like this:
--a---b-c---d---e---f-g-h-i------j->
where: a, b, c, d, e, f, g, h, i, j are emitted values
Whenever a new value is emitted I want to get it immediately, so it can look like this:
[a]
[a, b]
[a, b, c]
[b, c, d]
[c, d, e]
[d, e, f]
[e, f, g]
[f, g, h]
... and so on
I think that this is super useful. Imagine building a chat where you want to display 10 last messages. Whenever a new message comes you want to update your view.
My attempt: demo
You can use scan
for this:
from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u'])
.pipe(
scan((acc, val) => {
acc.push(val);
return acc.slice(-3);
}, []),
)
.subscribe(console.log);
This will print:
[ 'a' ]
[ 'a', 'b' ]
[ 'a', 'b', 'c' ]
[ 'b', 'c', 'd' ]
[ 'c', 'd', 'e' ]
...
[ 's', 't', 'u' ]
The bufferCount
won't do what you want. It'll emit only when each buffer is exactly === 3
which means you won't get any emission until you post at least 3 messages.
Jan 2019: Updated for RxJS 6
You can look at Observable#bufferCount function. One difference is that it wants at least 3 times to emit (first parameter, in this example).
const source = Rx.Observable.interval(1000);
const example = source.bufferCount(3,1)
const subscribe = example.subscribe(val => console.log(val));
<script src="https://unpkg.com/@reactivex/rxjs@5.4.3/dist/global/Rx.js"></script>