I'm using websocket to receive data frame from hardware. The data frame is defined like this:
0xbb(head) ---data--- 0xee(tail)
the received data is store in Uint8Array, there maybe multiple frame:
var buffer = new Uint8Array([0xbb,0,0,0,0xee,0xbb,1,1,1,0xee,0xbb,3,3,3,0xee]);
and I can convert the array to observable:
var obs= Rx.Observable.from(buffer);
RxMarbles:
--0xbb--0--0--0--0xee--0xbb--1--1--1--0xee--0xbb--2--2--2--0xee------
------------------000------------------111------------------222------
How to use RxJS to split the observable? Which operators to use? Is RxJS the best practice for this situation?
We need to convert the values to async by using
concatMap(.. delay(1))
because otherwise theclosingNotifier
of buffer is running faster then the ingestion of values and you end up with three empty array's. But since you receive these packets from a websocket you are already async.This code is not 100% foolproof, for instance what should happen when the external device does not emit
0xee
? We end up with the next message being concatenated to the previous message.I think you could achieve what you want with
scan()
:This prints the following arrays:
However if you need to emit Observables with each data frame then follow the Mark's answer just use
window()
instead ofbuffer()
.