I'm using websocket to receive data frame from hardware.
The data frame is defined like this:
0xbb(head) ---data--- 0xee(tail)
the received data is store in Uint8Array, there maybe multiple frame:
var buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
and I can convert the array to observable:
var obs= Rx.Observable.from(buffer);
RxMarbles:
--0xbb--0--0--0--0xee--0xbb--1--1--1--0xee--0xbb--2--2--2--0xee------
------------------000------------------111------------------222------
How to use RxJS to split the observable?
Which operators to use?
Is RxJS the best practice for this situation?
const source = Rx.Observable
.from(['0xbb','0','0','0','0xee','0xbb','1','1','1','0xee','0xbb','3','3','3','0xee'])
.concatMap(i => Rx.Observable.of(i).delay(1));
source
.filter(i => i != '0xee' && i != '0xbb')
.buffer(source.filter(i => i === '0xee'))
.subscribe(val => console.log(val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
We need to convert the values to async by using concatMap(.. delay(1))
because otherwise the closingNotifier
of buffer is running faster then the ingestion of values and you end up with three empty array's. But since you receive these packets from a websocket you are already async.
This code is not 100% foolproof, for instance what should happen when the external device does not emit 0xee
? We end up with the next message being concatenated to the previous message.
I think you could achieve what you want with scan()
:
const buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
const obs = Observable.from(buffer);
obs.scan((acc, v) => {
if (v === 0xbb) {
return [v];
} else {
acc.push(v);
return acc;
}
}, [])
.filter(acc => acc[acc.length - 1] === 0xee)
.subscribe(console.log);
This prints the following arrays:
[ 187, 0, 0, 0, 238 ]
[ 187, 1, 1, 1, 238 ]
[ 187, 3, 3, 3, 238 ]
However if you need to emit Observables with each data frame then follow the Mark's answer just use window()
instead of buffer()
.