How to convert a stream into a generator without leaking resolve outside of the promise

Posted 2019-04-10 01:28

Question:

I have a stream and I need to convert it to a generator, so an uploader can consume the generic generator.

This means turning:

stream.on('data', chunk => ...);

to:

generator = streamGenerator(stream);
chunk = await generator.next()
...

better yet:

chunk = yield streamGenerator;

Overall my best attempt requires leaking the resolve from a promise and I'd like to avoid that:

function streamToIterable(chunkSize, stream) {
    let collector = [];
    let value = [];
    let done = false;
    let _resolve;
    let promise = new Promise(resolve => _resolve = resolve);
    stream.on('data', chunk => {
        collector = collector.concat(chunk);
        if (collector.length >= chunkSize) {
            value = collector.splice(0, chunkSize);
            _resolve(value);
            stream.pause();
        }
    });
    stream.on('end', () => {
        _resolve(collector);

        // With done set to true, the next iteration will ignore 'value' and end the loop
        done = true;
    });
    stream.resume();

    return {
        next: () => ({
            value: promise.then(chunk => {
                stream.resume();
                promise = new Promise(resolve => _resolve = resolve);
                return chunk;
            }),
            done,
        }),
    };
}

function* streamToGenerator(chunkSize, stream) {
    const iterator = streamToIterable(chunkSize, stream);
    let next = iterator.next();
    while (!next.done) {
        yield next.value;
        next = iterator.next();
    }
}

Usage in a generator for uploading chunks:

for (const chunkData of generator()) {
    let result = yield uploadPost(url, formData, onChunkProgress(chunkIndex));
    // ...
}

This is in a redux-saga, so next() isn't called on the generator until the yielded promise has resolved.
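
For reference, a rough sketch of how the saga side could look, assuming uploadPost returns a promise and onChunkProgress is a progress-callback factory as in the snippet above; CHUNK_SIZE and buildFormData are hypothetical placeholders, not part of the original code:

function* uploadSaga(url, stream) {
    let chunkIndex = 0;
    for (const chunkPromise of streamToGenerator(CHUNK_SIZE, stream)) {
        // redux-saga suspends on a yielded promise, so the chunk data is
        // available before the upload for that chunk starts
        const chunkData = yield chunkPromise;
        const formData = buildFormData(chunkData); // hypothetical helper
        yield uploadPost(url, formData, onChunkProgress(chunkIndex));
        chunkIndex += 1;
    }
}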

Answer 1:

You cannot avoid storing the resolve function in a mutable variable if you want to use a single event listener that resolves different promises. You could simplify the promise creation by using the once method similar to the following:

function streamToIterator(stream) {
    let done = false;
    const end = new Promise(resolve => {
        stream.once('end', resolve);
    }).then(e => {
        done = true;
    });

    return {
        [Symbol.iterator]() { return this; },
        next() {
            const promise = new Promise(resolve => {
                stream.once('data', value => {
                    resolve(value);
                    stream.pause();
                });
                stream.resume();
            });

            return {
                value: Promise.race([promise, end]),
                done,
            };
        },
    };
}

Of course, you are doing the racing between end and data yourself, you resume the stream before next is called the first time and, most importantly, you do the chunking yourself, so this might not be applicable to your situation.

Apart from that, I'd recommend checking out the buffering internals of Node.js streams; it might be easier to read chunks of a certain size using a lower-level API than data events.
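
For illustration, a rough sketch of that lower-level approach using the 'readable' event and stream.read(size); readChunk and chunkSize are just assumed names here, and this is a sketch of the idea rather than a complete implementation:

function readChunk(stream, chunkSize) {
    return new Promise(resolve => {
        const tryRead = () => {
            // read(size) returns null until `size` bytes are buffered,
            // or the remaining bytes once the stream has ended
            const chunk = stream.read(chunkSize);
            if (chunk !== null) {
                stream.removeListener('readable', tryRead);
                stream.removeListener('end', onEnd);
                resolve(chunk);
            }
        };
        const onEnd = () => {
            stream.removeListener('readable', tryRead);
            resolve(null); // no more data
        };
        stream.on('readable', tryRead);
        stream.once('end', onEnd);
        tryRead();
    });
}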

Also, you should definitely have a look at the asynchronous iteration proposal for ES next. The iterable interface you're trying to implement is very similar, and surely they either already have, or would really welcome, an example of making a Node readable stream iterable.
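
For reference, newer Node versions expose Symbol.asyncIterator on readable streams, which is essentially the interface that proposal describes, so a stream can be consumed directly with for await...of (the logging below is purely illustrative):

async function consume(stream) {
    // for await...of pulls one chunk at a time and applies backpressure
    for await (const chunk of stream) {
        console.log('received', chunk.length, 'bytes');
    }
}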



Answer 2:

EDIT: this answer is only required if you have a volatile stream that doesn't pause right away, and therefore also doesn't have an event system that supports "once". It also allows asynchronous yielding.

I greatly changed my previous answer and this one works.

This one uses two arrays: one of promises and another of resolves, which allows a queue of data that is bi-directional.

So if you iterate faster than the stream, all promises will resolve when they receive data, and if the stream is faster than you iterate, you'll have resolved promises waiting for the iterator.

function streamToAsyncIterator(chunkSize, stream) {
    let done = false;
    let endPromise = new Promise(resolve => {
        // signal that the stream has ended
        stream.on('end', () => {
            resolve({ value: undefined, done: true });
        });
    });

    //two-track queue for expecting and sending data with promises
    let dataPromises = [];
    let dataResolves = [];
    stream.on('data', value => {
        const dataResolve = dataResolves.shift();
        if (dataResolve) {
            dataResolve({ value, done: false });
        } else {
            dataPromises.push(Promise.resolve({ value, done: false }));
        }
        stream.pause();
    });

    return {
        [Symbol.asyncIterator]() {
            return this;
        },
        //TODO handle return() to close the stream
        next() {
            if (done) return Promise.resolve({ done });

            stream.resume();

            let dataPromise = dataPromises.shift();
            if (!dataPromise) {
                dataPromise = new Promise(resolve => dataResolves.push(resolve));
            }

            return Promise.race([dataPromise, endPromise])
                // done must be set in the resolution of the race, or done could complete the generator before the last iteration of data.
                .then(next => {
                    if (next.done) {
                        done = true;
                    }
                    return next;
                });
        },
    };
}

async function* streamToAsyncGenerator(chunkSize, stream) {
    const iterator = streamToAsyncIterator(chunkSize, stream);
    let next = await iterator.next();
    while (!next.done) {
        yield next.value;
        // Delete is needed to release resources.
        // Without delete, you'll get a memory error at 2GB.
        delete next.value;
        next = await iterator.next();
    }
};

EDIT: I removed the collector, which has nothing to do with the question, and I added the delete, which is necessary because GC doesn't appear to run with an array of iterators. This should be the final answer, as it works swell for me.
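
A usage sketch, assuming a Node readable stream such as one from fs.createReadStream; the path, chunk size, and sendChunk uploader below are placeholders, not part of the answer:

const fs = require('fs');

async function uploadFile(path) {
    const stream = fs.createReadStream(path);
    for await (const chunk of streamToAsyncGenerator(1024 * 1024, stream)) {
        // the stream stays paused between iterations, so each chunk is
        // fully handled before the next one is pulled
        await sendChunk(chunk); // hypothetical uploader
    }
}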