I have an observable get data from stream each time at size 512 each next I have to break it up to 200 char at other observable and keep [12] char in other buffer to concatenate with next block, I solve it by using new subject and for loop, I believe there maybe a better, more pretty solution.
received Observable ----------------------------------------
- 1st next [512] -------> [112] [200] [200] -------> [200] [200]
- 2nd next [512][112] --> [24][200][200] [88+112] --> [200] [200]
- 3rd next [512][24] --> [136] [200] [76+124] .....
nth iteration [512][194] --> [106][200][200][106+94] --> [200][200][200]
n+1th [512][6].......
maxValueSize = 200
this._sreamRecord$.subscribe(
{
next: (val) => {
const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
if (bufferToSend.length - i > maxValueSize) {
bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
} else {
completationBuffer = bufferToSend.slice(i, i + maxValueSize)
}
}
},
complete() {
if (completationBuffer.length) {
bufferStreamer.next(completationBuffer)
}
bufferStreamer.complete()
}
})
You may want to consider a solution along these lines
The idea is to split each
streamRecord
withlodash
chunk
function after having concatenated the previous remainder, i.e. the array left as tail from the split of the previousstreamRecord
.This is done using the function
splitInChunksWithRemainder
, which is an higher level function, i.e. a function which returns a function, in this case after having set theremainder
coming from the previous split.UPDATE after comment
If you need to emit also the last
newRemainder
, than you can consider a slightly more complex solution such as the followingWe have introduced the
pipeableChain
function. In this function we save the remainder which is returned by the execution ofsplitInChunksWithRemainder
. Once the source Observable completes, we add a last notification via theconcat
operator. As you see, we have also to use thedefer
operator to make sure we create the Observable only when the Observer subscribes, i.e. after the source Observable completes. Withoutdefer
the Observable passed toconcat
as parameter would be created when the source Observable is initially subscribed, i.e. whenlastRemainder
is still undefined.