How to convert a Node.js async streaming callback into an async generator?

Posted 2020-03-28 17:38

Question:

I have a function that streams data in batches via a callback.

Before fetching the next batch, the function awaits the promise the callback returned for the current batch. The function as a whole returns a promise that resolves once all batches are finished.

(I'm using TypeScript annotations to help with readability)

async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}
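For experimenting, here is a hypothetical in-memory stand-in with the same signature as callbackStream. The name makeCallbackStream and the hard-coded batches are assumptions for illustration, not the asker's real data source. It only moves on to the next batch after the callback's promise resolves, which is the backpressure contract described above.

```typescript
// Hypothetical stand-in for callbackStream: hands out the given batches one
// at a time and waits for the callback's promise before moving on.
function makeCallbackStream(batches: number[][]) {
  return async function callbackStream(
    fn: (batch: Array<number>) => Promise<void>,
  ): Promise<void> {
    for (const batch of batches) {
      // A real implementation would fetch the batch here (e.g. from a database).
      await fn(batch);
    }
  };
}

// Usage: log each batch, pretending the processing takes a little while.
const demoStream = makeCallbackStream([[1, 2], [3], [4, 5, 6]]);
demoStream(async batch => {
  console.log("got batch", batch);
  await new Promise<void>(resolve => setTimeout(resolve, 10));
}).then(() => console.log("all batches done"));
```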

How do I turn this function into an async generator that yields one value at a time?

async function* generatorStream(): AsyncIterableIterator<number> {}

This has proven to be quite a difficult task.

I've toyed around with this problem and built something that works, but it's very convoluted, and I can't justify merging this code and making others on my team deal with it.


Here's my current implementation:

I'm using this helper function that creates a "deferred" promise, which helps with passing promises around between callbacks.

interface DeferredPromise<T> {
    resolve: (value: T) => void
    reject: (error: any) => void
    promise: Promise<T>
}

function deferred<T>(): DeferredPromise<T> {
    let resolve
    let reject
    const promise = new Promise<T>((res, rej) => {
        resolve = res
        reject = rej
    })
    return {
        resolve: resolve as (value: T) => void,
        reject: reject as (error: any) => void,
        promise,
    }
}
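A short usage sketch of such a helper: one piece of code waits on the promise while another settles it later from outside the executor. This version uses definite-assignment assertions (`!`) instead of casts; otherwise it follows the helper above.

```typescript
interface DeferredPromise<T> {
  resolve: (value: T) => void;
  reject: (error: any) => void;
  promise: Promise<T>;
}

function deferred<T>(): DeferredPromise<T> {
  let resolve!: (value: T) => void;
  let reject!: (error: any) => void;
  // The executor runs synchronously, so both functions are assigned
  // before `new Promise` returns.
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { resolve, reject, promise };
}

// Usage: the consumer subscribes first, the producer resolves later.
const ready = deferred<string>();
ready.promise.then(msg => console.log("received:", msg));
setTimeout(() => ready.resolve("hello"), 0); // prints "received: hello"
```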

Next, I have this hairball of logic that linearizes the promise callbacks into a chain, where each promise resolves to a batch along with a next function that returns another promise for the following batch.

type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done

async function chainedPromises() {
    let deferred = PromiseUtils.deferred<Result>()

    callbackStream(async batch => {
        const next = PromiseUtils.deferred<null>()
        deferred.resolve({
            done: false,
            value: batch,
            next: () => {
                deferred = PromiseUtils.deferred<Result>()
                next.resolve(null)
                return deferred.promise
            },
        })
        await next.promise
    }).then(() => {
        deferred.resolve({ done: true })
    })

    return deferred.promise
}
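The chain built by chainedPromises is essentially a linked list of promises. To make its shape visible without the callback and deferred plumbing, here is a hypothetical chainFromArray (an illustration, not part of the question) that builds the same Done/More structure from an in-memory array, plus a collect helper that walks it by following next until a Done node appears.

```typescript
type Done = { done: true };
type More = { done: false; value: Array<number>; next: () => Promise<Result> };
type Result = More | Done;

// Hypothetical helper: builds the Result chain from an array of batches.
async function chainFromArray(batches: number[][], index = 0): Promise<Result> {
  if (index >= batches.length) return { done: true };
  return {
    done: false,
    value: batches[index],
    // Calling next() is what "asks" for the following link.
    next: () => chainFromArray(batches, index + 1),
  };
}

// Walk the chain until a Done node appears, collecting every batch.
async function collect(first: () => Promise<Result>): Promise<number[][]> {
  const out: number[][] = [];
  let next = first;
  while (true) {
    const result = await next();
    if (result.done) return out;
    out.push(result.value);
    next = result.next;
  }
}

collect(() => chainFromArray([[1, 2], [3]])).then(batches => console.log(batches));
```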

From here, creating a generator that yields one item at a time isn't very difficult:

async function* generatorStream(): AsyncIterableIterator<number> {
    let next = chainedPromises
    while (true) {
        const result = await next()
        if (result.done) {
            return
        }
        for (const item of result.value) {
            yield item
        }
        next = result.next
    }
}

I think we can all agree that the intermediate chainedPromises function is very confusing and convoluted. Is there any way to transform callbackStream into generatorStream that is easy to understand and easy to follow? I don't mind using a library if it's well established, but I would also appreciate a simple implementation from first principles.
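One possible first-principles sketch keeps the idea of handing each batch over together with a "release" function, but folds the chain into a single async generator. The names fromCallbackStream, CallbackStream and Handoff are hypothetical, and the sketch assumes the stream awaits each callback promise before producing the next batch, as described in the question.

```typescript
// Signature of the question's callbackStream, made generic.
type CallbackStream<T> = (fn: (batch: Array<T>) => Promise<void>) => Promise<void>;

// One handoff from producer to consumer: a batch plus a way to let the
// producer continue, or a marker that the stream has finished.
type Handoff<T> = { done: false; batch: Array<T>; release: () => void } | { done: true };

async function* fromCallbackStream<T>(stream: CallbackStream<T>): AsyncGenerator<T, void, undefined> {
  let deliver!: (h: Handoff<T>) => void;
  let fail!: (error: unknown) => void;
  // Create a fresh promise for the next handoff and remember how to settle it.
  const arm = (): Promise<Handoff<T>> => {
    const p = new Promise<Handoff<T>>((res, rej) => {
      deliver = res;
      fail = rej;
    });
    p.catch(() => {}); // `await pending` still throws; this only avoids unhandled-rejection noise
    return p;
  };
  let pending = arm();

  // An async generator's body only starts on the first next() call,
  // so nothing is fetched until the consumer asks for a value.
  stream(batch => new Promise<void>(resolve => deliver({ done: false, batch, release: () => resolve() }))).then(
    () => deliver({ done: true }),
    error => fail(error),
  );

  while (true) {
    const h = await pending;
    if (h.done) return;
    pending = arm(); // re-arm before releasing the producer
    yield* h.batch;  // hand out the values one at a time
    h.release();     // only now let the producer fetch the next batch
  }
}
```

Usage is a plain `for await (const n of fromCallbackStream(callbackStream))`. This sketch does not handle early termination: if the consumer breaks out of the loop, release() is never called and the producer simply stays paused.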

Answer 1:

No, I don't think there's a way to implement this transformation that's easy to understand and easy to follow. However, I would recommend dropping the deferreds (you're never rejecting anyway) and just using the promise constructor. Also, I'd rather implement an asynchronous generator directly.

function queue() {
    let resolve = () => {};
    const q = {
        // resolve the current promise with x, then replace it with a fresh one
        put(x) {
            resolve(x);
            q.promise = new Promise(r => { resolve = r; });
        },
        promise: null,
    };
    q.put(); // generate first promise
    return q;
}
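function toAsyncIterator(callbackStream) {
    const query = queue();  // consumer -> producer: "fetch the next batch"
    const result = queue(); // producer -> consumer: "here is a batch"
    let end = null;         // settles once callbackStream has finished
    return {
        [Symbol.asyncIterator]() { return this; },
        next(x) {
            // Grab the promise for the next batch before releasing the producer.
            const nextBatch = result.promise;
            query.put(x);
            if (end === null) {
                // Start lazily: if the stream delivered its first batch before
                // the first next() call, that batch would be skipped.
                end = callbackStream(batch => {
                    result.put(batch);
                    return query.promise; // wait until next() is called again
                }).then(value => ({value, done: true}));
                end.catch(e => void e); // prevent unhandled promise rejection warnings
            }
            return Promise.race([
                end,
                nextBatch.then(value => ({value, done: false})),
            ]);
        }
    };
}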
async function* batchToAsyncIterator(batchCallbackStream) {
    for await (const batch of toAsyncIterator(batchCallbackStream)) {
        // for (const val of batch) yield val;
        // or simpler:
        yield* batch;
    }
}
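A typed TypeScript sketch of this queue-based approach, with annotations in the style of the question. It starts the stream on the first next() call and reads the pending result promise before releasing the producer, so the first batch cannot be missed. The names Slot and CallbackStream, and the demo stream, are assumptions for illustration.

```typescript
type CallbackStream<T> = (fn: (batch: Array<T>) => Promise<void>) => Promise<void>;

// A single-slot "queue": put(value) resolves the current promise and replaces it.
interface Slot<T> { promise: Promise<T>; put(value: T): void }

function queue<T>(): Slot<T> {
  let resolve!: (value: T) => void;
  const q: Slot<T> = {
    promise: new Promise<T>(r => { resolve = r; }),
    put(value) {
      resolve(value);
      q.promise = new Promise<T>(r => { resolve = r; });
    },
  };
  return q;
}

function toAsyncIterator<T>(stream: CallbackStream<T>): AsyncIterableIterator<Array<T>> {
  const query = queue<undefined>();  // consumer -> producer: "fetch the next batch"
  const result = queue<Array<T>>();  // producer -> consumer: "here is a batch"
  let end: Promise<IteratorResult<Array<T>>> | undefined;
  const start = (): Promise<IteratorResult<Array<T>>> => {
    const p = stream(batch => {
      result.put(batch);
      return query.promise; // resolves on the next call to next()
    }).then((): IteratorResult<Array<T>> => ({ done: true, value: undefined }));
    p.catch(() => {}); // the rejection still reaches next(); this only silences warnings
    return p;
  };
  const it: AsyncIterableIterator<Array<T>> = {
    [Symbol.asyncIterator]() { return it; },
    next() {
      const nextBatch = result.promise; // read before the producer can replace it
      query.put(undefined);             // release the producer if it is waiting
      if (end === undefined) end = start();
      return Promise.race([
        end,
        nextBatch.then((value): IteratorResult<Array<T>> => ({ done: false, value })),
      ]);
    },
  };
  return it;
}

async function* batchToAsyncIterator<T>(stream: CallbackStream<T>): AsyncGenerator<T, void, undefined> {
  for await (const batch of toAsyncIterator(stream)) yield* batch;
}

// Usage with a hypothetical in-memory stream.
const demo: CallbackStream<number> = async fn => {
  for (const batch of [[1, 2], [3]]) await fn(batch);
};
(async () => {
  for await (const n of batchToAsyncIterator(demo)) console.log(n);
})();
```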