I have an observable that receives data from a stream in blocks of size 512 on each next. I have to break each block into 200-char pieces, emit those on another observable, and keep the leftover [12] chars in a separate buffer to concatenate with the next block. I solved it with a new Subject and a for loop, but I believe there may be a better, prettier solution.
Received Observable:
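import { Subject } from 'rxjs'

const maxValueSize = 200
// Subject that re-emits the data in chunks of at most maxValueSize
const bufferStreamer = new Subject<Buffer>()
// Tail of the previous block, waiting to be prepended to the next one
let completationBuffer: Buffer = Buffer.alloc(0)

this._sreamRecord$.subscribe({
  next: (val) => {
    const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
    for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
      if (bufferToSend.length - i > maxValueSize) {
        bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
      } else {
        // Last piece of this block: hold it back for the next block
        completationBuffer = bufferToSend.slice(i, i + maxValueSize)
      }
    }
  },
  complete() {
    // Flush whatever is left once the source completes
    if (completationBuffer.length) {
      bufferStreamer.next(completationBuffer)
    }
    bufferStreamer.complete()
  }
})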
You may want to consider a solution along these lines:
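import * as _ from 'lodash';
import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Returns a function that splits a record into full chunks, after
// prepending the remainder left over from the previous record.
const splitInChunksWithRemainder = (remainder: Array<any>) => {
  return (streamRecord: Array<any>) => {
    const streamRecordWithRemainder = remainder.concat(streamRecord);
    const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
    const last = chunks[chunks.length - 1];
    let newRemainder: Array<any> = [];
    // Guard against an empty input, where there is no last chunk
    if (last && last.length !== maxValueSize) {
      newRemainder = last;
      chunks.length = chunks.length - 1;
    }
    return {chunks, newRemainder};
  };
}

let f = splitInChunksWithRemainder([]);

this._sreamRecord$.pipe(
  // concatMap keeps the chunks in order; the inner Observable is synchronous
  concatMap(s => {
    const res = f(s);
    f = splitInChunksWithRemainder(res.newRemainder);
    return from(res.chunks);
  })
)
.subscribe(console.log);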
The idea is to split each streamRecord with the lodash chunk function after having concatenated the previous remainder, i.e. the array left as the tail from the split of the previous streamRecord.
This is done using the function splitInChunksWithRemainder, which is a higher-order function, i.e. a function which returns a function, in this case after having captured the remainder coming from the previous split.
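To make the remainder hand-off concrete, here is a minimal, dependency-free sketch of the same idea. It assumes plain arrays rather than Buffers, `splitWithRemainder` is a hypothetical helper name (not part of lodash or RxJS), and a chunk size of 3 is used only to keep the example short.

```typescript
// Hypothetical helper: prepend the previous remainder to `input`, cut the
// result into full chunks of `size`, and return the incomplete tail separately.
function splitWithRemainder<T>(
  remainder: T[],
  input: T[],
  size: number
): { chunks: T[][]; newRemainder: T[] } {
  const all = remainder.concat(input);
  const fullCount = Math.floor(all.length / size);
  const chunks: T[][] = [];
  for (let i = 0; i < fullCount; i++) {
    chunks.push(all.slice(i * size, (i + 1) * size));
  }
  // Whatever does not fill a whole chunk is carried over to the next call.
  return { chunks, newRemainder: all.slice(fullCount * size) };
}

// Two consecutive records of 4 items each, chunk size 3.
const first = splitWithRemainder<number>([], [1, 2, 3, 4], 3);
// first.chunks is [[1, 2, 3]], first.newRemainder is [4]
const second = splitWithRemainder(first.newRemainder, [5, 6, 7, 8], 3);
// second.chunks is [[4, 5, 6]], second.newRemainder is [7, 8]
```

The tail `[4]` of the first record becomes the head of the first chunk of the second record, which is exactly what the closure returned by splitInChunksWithRemainder carries from one emission to the next.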
UPDATE after comment
If you also need to emit the last newRemainder, then you can consider a slightly more complex solution such as the following:
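import * as _ from 'lodash';
import { Observable, EMPTY, defer, from, of } from 'rxjs';
// Note: in RxJS 7+ the concat operator is deprecated in favor of concatWith
import { concat, concatMap } from 'rxjs/operators';

const splitInChunksWithRemainder = (remainder: Array<any>) => {
  return (streamRecord: Array<any>) => {
    const streamRecordWithRemainder = remainder.concat(streamRecord);
    const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
    const last = chunks[chunks.length - 1];
    let newRemainder: Array<any> = [];
    // Guard against an empty input, where there is no last chunk
    if (last && last.length !== maxValueSize) {
      newRemainder = last;
      chunks.length = chunks.length - 1;
    }
    return {chunks, newRemainder};
  };
}

const pipeableChain = () => (source: Observable<any>) => {
  let f = splitInChunksWithRemainder([]);
  // Stays undefined until the source emits its first record
  let lastRemainder: any[] | undefined;
  return source.pipe(
    concatMap(s => {
      const res = f(s);
      lastRemainder = res.newRemainder;
      f = splitInChunksWithRemainder(lastRemainder);
      return from(res.chunks);
    }),
    // On completion, emit the final remainder unless it is missing or empty
    concat(defer(() =>
      lastRemainder && lastRemainder.length ? of(lastRemainder) : EMPTY
    ))
  )
}

this._sreamRecord$.pipe(
  pipeableChain()
)
.subscribe(console.log);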
We have introduced the pipeableChain function. In this function we save the remainder returned by each execution of splitInChunksWithRemainder. Once the source Observable completes, we add a last notification via the concat operator.
As you can see, we also have to use the defer operator to make sure the Observable is created only when concat subscribes to it, i.e. after the source Observable completes. Without defer, the Observable passed to concat as a parameter would be created while the chain is being assembled, i.e. when lastRemainder is still undefined.
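The timing issue can be reproduced without RxJS. The sketch below is only an analogy, not RxJS code: `eager` stands for an `of(lastRemainder)` evaluated while the chain is being built, and `deferred` stands for the factory passed to defer, which runs only when it is finally called. All names other than lastRemainder are hypothetical.

```typescript
let lastRemainder: number[] | undefined;

// Eager: the variable is read right now, while it is still undefined.
const eager = lastRemainder;

// Deferred: the variable is read only when the factory is invoked.
const deferred = (): number[] | undefined => lastRemainder;

// Later, the "source" produces its final remainder.
lastRemainder = [7, 8];

const eagerResult = eager;         // captured too early
const deferredResult = deferred(); // sees the value assigned above
```

Here `eagerResult` keeps the value read before the assignment, while `deferredResult` picks up the final remainder, which is the behavior defer provides inside the pipe.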