Break up a buffer into fixed-size chunks with RxJS

Posted 2019-08-13 07:14

I have an observable that receives data from a stream, one block of size 512 per next. I need to break each block up into chunks of 200, emit those on another observable, and keep the leftover (for example [112] after the first block) in a separate buffer so it can be concatenated with the next block. I solved it with a new Subject and a for loop, but I believe there may be a better, cleaner solution.

received Observable ----------------------------------------

  • 1st next [512] -------> [112] [200] [200] -------> [200] [200]
  • 2nd next [512][112] --> [24][200][200] [88+112] --> [200] [200]
  • 3rd next [512][24] --> [136] [200] [76+124] .....
  • nth iteration [512][194] --> [106][200][200][106+94] --> [200][200][200]

  • n+1th [512][6].......
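
To make the arithmetic in the diagram explicit, here is a minimal sketch that only counts sizes. It assumes every block is 512 long and that a chunk is emitted as soon as 200 are available; traceChunking is a hypothetical helper name, not part of the code in this post.

```typescript
// Hypothetical helper: for each incoming block, report how many full chunks
// can be emitted and how much is carried over to the next block.
function traceChunking(
    blockSizes: number[],
    chunkSize: number
): Array<{ chunks: number; remainder: number }> {
    let carried = 0; // leftover from the previous block
    return blockSizes.map(size => {
        const total = carried + size;
        const chunks = Math.floor(total / chunkSize);
        carried = total % chunkSize;
        return { chunks, remainder: carried };
    });
}

const trace = traceChunking([512, 512, 512], 200);
console.log(trace);
// 1st block: 2 chunks, 112 left; 2nd: 3 chunks, 24 left; 3rd: 2 chunks, 136 left
```

The remainders 112, 24 and 136 match the leftovers shown for the first three iterations above.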

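const maxValueSize = 200
// Leftover bytes carried over to the next block
let completationBuffer: Buffer = Buffer.alloc(0)
// Subject that emits the chunks
const bufferStreamer = new Subject<Buffer>()

this._streamRecord$.subscribe(
    {
        next: (val) => {
            // Prepend the leftover from the previous block
            const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
            for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
                if (bufferToSend.length - i > maxValueSize) {
                    // A full chunk with more data after it: emit it now
                    bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
                } else {
                    // Last piece (at most maxValueSize long): keep it for the next block
                    completationBuffer = bufferToSend.slice(i, i + maxValueSize)
                }
            }
        },
        complete() {
            // Flush whatever is left when the source completes
            if (completationBuffer.length) {
                bufferStreamer.next(completationBuffer)
            }
            bufferStreamer.complete()
        }
    })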

1 answer
别忘想泡老子
#2 · 2019-08-13 08:21

You may want to consider a solution along these lines

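import * as _ from 'lodash';
import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const maxValueSize = 200; // chunk size from the question

// Returns a function that splits a record into full chunks, after prepending
// the remainder left over from the previous record.
const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        let newRemainder: Array<any> = [];
        // Hold back an incomplete last chunk as the new remainder.
        // The length check avoids reading from an empty chunks array.
        if (chunks.length > 0 && chunks[chunks.length - 1].length !== maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

let f = splitInChunksWithRemainder([]);

this._streamRecord$.pipe(
    // concatMap emits the chunks of each record in order; unlike switchMap,
    // it never cancels an inner Observable
    concatMap(s => {
        const res = f(s);
        f = splitInChunksWithRemainder(res.newRemainder);
        return from(res.chunks);
    })
)
.subscribe(console.log);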

The idea is to split each streamRecord with the lodash chunk function after prepending the previous remainder to it, i.e. the array left over as the tail when the previous streamRecord was split.

This is done by splitInChunksWithRemainder, which is a higher-order function, i.e. a function that returns a function, in this case after capturing the remainder coming from the previous split.
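
As a rough illustration of how the remainder travels from one call to the next, here is a lodash-free sketch of the same idea. The name splitWithRemainder and the explicit size parameter are assumptions made for this example, and the chunk size of 3 is chosen only to keep the values readable.

```typescript
type SplitResult<T> = { chunks: T[][]; newRemainder: T[] };

// Hypothetical variant of the higher-order function: the outer call fixes the
// chunk size and the previous remainder, the inner call splits one record.
const splitWithRemainder = <T>(size: number, remainder: T[]) =>
    (record: T[]): SplitResult<T> => {
        const all = remainder.concat(record);
        // Length covered by full chunks; anything beyond it is the new remainder
        const fullLength = all.length - (all.length % size);
        const chunks: T[][] = [];
        for (let i = 0; i < fullLength; i += size) {
            chunks.push(all.slice(i, i + size));
        }
        return { chunks, newRemainder: all.slice(fullLength) };
    };

let split = splitWithRemainder<number>(3, []);
const first = split([1, 2, 3, 4, 5]);
// first.chunks is [[1, 2, 3]], first.newRemainder is [4, 5]

// Build the next splitter around the remainder of the previous call
split = splitWithRemainder<number>(3, first.newRemainder);
const second = split([6, 7, 8, 9, 10]);
// second.chunks is [[4, 5, 6], [7, 8, 9]], second.newRemainder is [10]
```

Rebuilding the splitter after every record keeps the state explicit: each inner function is pure with respect to the remainder it was created with.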

UPDATE after comment

If you also need to emit the last newRemainder, then you can consider a slightly more complex solution such as the following:

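import * as _ from 'lodash';
import { EMPTY, Observable, defer, from, of } from 'rxjs';
import { concat, concatMap } from 'rxjs/operators';

const maxValueSize = 200; // chunk size from the question

// Returns a function that splits a record into full chunks, after prepending
// the remainder left over from the previous record.
const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        let newRemainder: Array<any> = [];
        // Hold back an incomplete last chunk as the new remainder.
        // The length check avoids reading from an empty chunks array.
        if (chunks.length > 0 && chunks[chunks.length - 1].length !== maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

const pipeableChain = () => (source: Observable<any>) => {
    let f = splitInChunksWithRemainder([]);
    let lastRemainder: any[];
    return source.pipe(
        // concatMap emits the chunks of each record in order
        concatMap(s => {
            const res = f(s);
            lastRemainder = res.newRemainder;
            f = splitInChunksWithRemainder(lastRemainder);
            return from(res.chunks);
        }),
        // After the source completes, emit the final remainder if there is one.
        // defer delays reading lastRemainder until that moment.
        // (The concat operator is deprecated in RxJS 7, where concatWith replaces it.)
        concat(defer(() => lastRemainder && lastRemainder.length ? of(lastRemainder) : EMPTY))
    )
}

_streamRecord$.pipe(
    pipeableChain()
)
.subscribe(console.log);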

We have introduced the pipeableChain function. In this function we save the remainder returned by each execution of splitInChunksWithRemainder. Once the source Observable completes, we append a last notification via the concat operator. As you can see, we also have to use the defer operator to make sure the Observable is created only when it is subscribed to, i.e. after the source Observable completes. Without defer, the Observable passed to concat as a parameter would be created when the source Observable is initially subscribed, i.e. when lastRemainder is still undefined.
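
The same save-the-remainder and flush-on-completion logic can also be sketched without RxJS, working on bytes as in the question. This is a hedged sketch, not the answer's implementation: createChunker, push and flush are hypothetical names, and Uint8Array is used because Node's Buffer is a subclass of it, so Buffers can be passed in directly. flush reads the remainder only when it is called, which is the role defer plays above.

```typescript
interface Chunker {
    push(block: Uint8Array): Uint8Array[]; // full chunks available after this block
    flush(): Uint8Array[];                 // final remainder, if any
}

// Hypothetical stateful splitter: keeps the leftover bytes between calls.
function createChunker(chunkSize: number): Chunker {
    let remainder: Uint8Array = new Uint8Array(0);
    return {
        push(block: Uint8Array): Uint8Array[] {
            // Concatenate the previous remainder and the new block
            const all = new Uint8Array(remainder.length + block.length);
            all.set(remainder, 0);
            all.set(block, remainder.length);
            const fullLength = all.length - (all.length % chunkSize);
            const chunks: Uint8Array[] = [];
            for (let i = 0; i < fullLength; i += chunkSize) {
                // subarray creates a view on `all`, without copying
                chunks.push(all.subarray(i, i + chunkSize));
            }
            remainder = all.subarray(fullLength);
            return chunks;
        },
        flush(): Uint8Array[] {
            const tail = remainder;
            remainder = new Uint8Array(0);
            return tail.length > 0 ? [tail] : [];
        },
    };
}

const chunker = createChunker(200);
const emitted: number[] = [];
for (const block of [new Uint8Array(512), new Uint8Array(512)]) {
    for (const chunk of chunker.push(block)) emitted.push(chunk.length);
}
// Source "completes": emit whatever is left
for (const chunk of chunker.flush()) emitted.push(chunk.length);
console.log(emitted); // [200, 200, 200, 200, 200, 24]
```

In an RxJS pipeline, push would be called from inside concatMap and flush from the deferred Observable that is concatenated after the source.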
