How can I Interleave / merge async iterables?

2019-02-12 17:04发布

问题:

Suppose I have some async iterable objects like this:

// Promisified sleep function: the returned promise fulfils with `ms` once
// `ms` milliseconds have elapsed (setTimeout forwards its extra argument).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms, ms));

const a = {
  // Async iterable: yields 'a' at once, 'b' after sleep(1000), and 'c' after
  // a further sleep(2000).
  async *[Symbol.asyncIterator]() {
    const schedule = [[0, 'a'], [1000, 'b'], [2000, 'c']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
  },
};

const b = {
  // Async iterable: after sleep(6000) yields 'i' and then 'j' back to back,
  // then 'k' after a further sleep(2000).
  async *[Symbol.asyncIterator]() {
    const schedule = [[6000, 'i'], [0, 'j'], [2000, 'k']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
  },
};

const c = {
  // Async iterable: yields 'x' at once, 'y' after sleep(2000) and 'z' after
  // a further sleep(8000); then, after sleep(10000), it throws.
  async *[Symbol.asyncIterator]() {
    const schedule = [[0, 'x'], [2000, 'y'], [8000, 'z']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
    await sleep(10000);
    throw new Error('You have gone too far! ');
  },
};

Now, suppose I can concat them like this:

// Concatenation: every item of a, then every item of b, then every item of c.
const abcs = async function * () {
  for (const source of [a, b, c]) {
    yield * source;
  }
};

The (first 9) items yielded will be:

(async () => {
  // Collect only the first `limit` items, then leave the loop early.
  const limit = 9;
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    if (xs.length === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));

// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]

But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield as quickly as possible.

How can I rewrite this loop so that xs are yielded as soon as possible, ignoring order?


It is also possible that a, b or c are infinite sequences, so the solution must not require all elements to be buffered into an array.

回答1:

There is no way to write this with a loop statement. async/await code always executes sequentially; to do things concurrently you need to use promise combinators directly. For plain promises there is Promise.all; for async iterators there is nothing (yet), so we need to write it on our own:

/**
 * Merges several async iterables into one async generator that yields every
 * value as soon as any source produces it (arrival order, not source order).
 *
 * If the consumer stops early (e.g. `break` in `for await`) or a source
 * throws, every source that has not finished is closed via its `return()`.
 * Values passed to `next()` and calls to `throw()` are not forwarded.
 *
 * @param {Iterable<AsyncIterable>} iterable - the async iterables to merge.
 * @returns {AsyncGenerator} yields the merged values; its final return value
 *   is an array holding each source's own return value, indexed like the input.
 */
async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    // Placeholder for a finished source: it never settles, so it never wins a race.
    const never = new Promise(() => {});
    // Tag each pending next() result with its source index so the winner of
    // Promise.race can be identified and re-armed.
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                // Request the next value before yielding so the source keeps
                // working while the consumer handles this one.
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        // Close every source that has not finished. return() is deliberately
        // not awaited: an async generator queues it behind its pending next(),
        // which may take arbitrarily long.
        asyncIterators.forEach((asyncIterator, index) => {
            if (nextPromises[index] === never) {
                return;
            }
            // Nobody will read this pending result any more; handle a possible
            // rejection so it is not reported as an unhandled rejection.
            nextPromises[index].catch(() => {});
            if (typeof asyncIterator.return === 'function') {
                Promise.resolve()
                    .then(() => asyncIterator.return())
                    .catch(() => {}); // best effort: the consumer has already left
            }
        });
    }
    return results;
}

Notice that combine does not support passing values into next or cancellation through .throw or .return.

You can call it like

(async () => {
  // Print every merged value as soon as any of a, b or c produces it.
  const merged = combine([a, b, c]);
  for await (const value of merged) {
    console.log(value);
  }
})().catch(console.error);


回答2:

If I change abcs to accept the generators to process, I come up with the following (see the inline comments):

/**
 * Merges the given async iterables into one async generator that yields each
 * value as soon as any source produces it (arrival order, not source order).
 *
 * If the consumer stops early (e.g. `break` in `for await`) or a source
 * throws, every source that has not finished is closed via its `return()`.
 *
 * @param {...AsyncIterable} gens - the async iterables to merge; the same
 *   iterable may be passed more than once and is then iterated independently.
 * @yields {*} the values of all sources, in the order they become available.
 */
const abcs = async function * (...gens) {
  // Worker function to queue up the next result of one source
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Maps each source's state object ({it, result}) to the promise for its
  // next result. The state object itself is the key, so two sources never
  // collide even when the same iterable is passed twice.
  const sources = new Map();
  try {
    // Get every source's iterator and start its first iteration
    for (const gen of gens) {
      const entry = {it: gen[Symbol.asyncIterator]()};
      sources.set(entry, queueNext(entry));
    }
    // While we still have any sources, race the current promise of
    // the sources we have left
    while (sources.size) {
      const winner = await Promise.race(sources.values());
      if (winner.result.done) {
        // Completed the sequence: drop it from sources
        sources.delete(winner);
      } else {
        // Grab the value to yield and queue up the next one, then yield
        const {value} = winner.result;
        sources.set(winner, queueNext(winner));
        yield value;
      }
    }
  } finally {
    // Close every source that has not finished. return() is deliberately not
    // awaited: an async generator queues it behind its pending next(), which
    // may take arbitrarily long.
    for (const [entry, pending] of sources) {
      // Nobody will read this result any more; handle a possible rejection
      // so it is not reported as an unhandled rejection.
      pending.catch(() => {});
      if (typeof entry.it.return === 'function') {
        Promise.resolve()
          .then(() => entry.it.return())
          .catch(() => {}); // best effort: the consumer has already left
      }
    }
  }
};

Live Example:

// Promisified sleep function: the promise fulfils with `ms` after `ms` milliseconds.
const sleep = (ms) => {
  const wakeUpLater = (resolve) => setTimeout(() => resolve(ms), ms);
  return new Promise(wakeUpLater);
};

const a = {
  // Async iterable: yields 'a' at once, 'b' after sleep(1000), and 'c' after
  // a further sleep(2000).
  async *[Symbol.asyncIterator]() {
    const schedule = [[0, 'a'], [1000, 'b'], [2000, 'c']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
  },
};

const b = {
  // Async iterable: after sleep(6000) yields 'i' and then 'j' back to back,
  // then 'k' after a further sleep(2000).
  async *[Symbol.asyncIterator]() {
    const schedule = [[6000, 'i'], [0, 'j'], [2000, 'k']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
  },
};

const c = {
  // Async iterable: yields 'x' at once, 'y' after sleep(2000), and 'z' after
  // a further sleep(8000).
  async *[Symbol.asyncIterator]() {
    const schedule = [[0, 'x'], [2000, 'y'], [8000, 'z']];
    for (const [delay, item] of schedule) {
      if (delay > 0) await sleep(delay);
      yield item;
    }
  },
};

/**
 * Merges the given async iterables into one async generator that yields each
 * value as soon as any source produces it (arrival order, not source order).
 *
 * If the consumer stops early (e.g. `break` in `for await`) or a source
 * throws, every source that has not finished is closed via its `return()`.
 *
 * @param {...AsyncIterable} gens - the async iterables to merge; the same
 *   iterable may be passed more than once and is then iterated independently.
 * @yields {*} the values of all sources, in the order they become available.
 */
const abcs = async function * (...gens) {
  // Worker function to queue up the next result of one source
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Maps each source's state object ({it, result}) to the promise for its
  // next result. The state object itself is the key, so two sources never
  // collide even when the same iterable is passed twice.
  const sources = new Map();
  try {
    // Get every source's iterator and start its first iteration
    for (const gen of gens) {
      const entry = {it: gen[Symbol.asyncIterator]()};
      sources.set(entry, queueNext(entry));
    }
    // While we still have any sources, race the current promise of
    // the sources we have left
    while (sources.size) {
      const winner = await Promise.race(sources.values());
      if (winner.result.done) {
        // Completed the sequence: drop it from sources
        sources.delete(winner);
      } else {
        // Grab the value to yield and queue up the next one, then yield
        const {value} = winner.result;
        sources.set(winner, queueNext(winner));
        yield value;
      }
    }
  } finally {
    // Close every source that has not finished. return() is deliberately not
    // awaited: an async generator queues it behind its pending next(), which
    // may take arbitrarily long.
    for (const [entry, pending] of sources) {
      // Nobody will read this result any more; handle a possible rejection
      // so it is not reported as an unhandled rejection.
      pending.catch(() => {});
      if (typeof entry.it.return === 'function') {
        Promise.resolve()
          .then(() => entry.it.return())
          .catch(() => {}); // best effort: the consumer has already left
      }
    }
  }
};

(async () => {
  console.log("start");
  // Log each merged value the moment any of a, b or c produces it.
  const merged = abcs(a, b, c);
  for await (const value of merged) {
    console.log(value);
  }
  console.log("done");
})().catch(error => console.error(error));
/* Removes the height cap on elements with class .as-console-wrapper —
   presumably the snippet's console output area (TODO confirm). */
.as-console-wrapper {
  max-height: 100% !important;
}



回答3:

I solved this using async generators. (I wish I had found this question a few days ago; it would have saved me some time.) I will gladly hear opinions and criticism.

/**
 * Merges several async iterators (e.g. async generator objects) into one
 * async generator that yields each value as soon as any source produces it.
 *
 * A source's completion (`return`) value is not yielded as an item. If the
 * consumer stops early (e.g. `break` in `for await`), every source that has
 * not finished is closed via its `return()`.
 *
 * @param {...AsyncIterator} gens - the sources; each must have a `next()`
 *   method returning a promise of an iterator result.
 * @yields {*} values from all sources, in the order they become available.
 */
async function* mergen(...gens) {
  // Pair every pending result with the generator it came from, so the winner
  // of a race can be asked for its next value.
  const nextOf = (gen) => gen.next().then((result) => ({...result, gen}));
  const promises = gens.map(nextOf);
  // Sources that have not reported `done` yet; closed if we exit early.
  const live = new Set(gens);

  try {
    while (promises.length > 0) {
      const {index, value: {value, done, gen}} = await race(promises);
      promises.splice(index, 1);
      if (done) {
        // A finished source's completion value is not one of its items.
        live.delete(gen);
        continue;
      }
      promises.push(nextOf(gen));
      yield value;
    }
  } finally {
    for (const gen of live) {
      if (typeof gen.return === 'function') {
        // Not awaited: an async generator queues return() behind its pending
        // next(). Best effort: the consumer has already left.
        Promise.resolve().then(() => gen.return()).catch(() => {});
      }
    }
  }
}

/**
 * Like Promise.race, but fulfils with `{index, value}` so the caller knows
 * which entry of `promises` settled first. Rejects with the reason of the
 * first entry to reject (instead of hanging); never settles for an empty array.
 *
 * @param {Array<Promise|*>} promises - non-promise values are treated as
 *   already-fulfilled promises.
 * @returns {Promise<{index: number, value: *}>}
 */
function race(promises) {
  return Promise.race(
    promises.map((p, index) => Promise.resolve(p).then((value) => ({index, value})))
  );
}

It took me a long time to figure this out, and I got so excited that I put it in an npm package :) https://www.npmjs.com/package/mergen



回答4:

I hope I understood your question correctly; here's how I'd approach it:

// Drains a, b and c concurrently and collects every item into one array in
// the order the items arrive. Note: all items are buffered and only logged
// once every source has finished, so this needs finite sources.
const results = [];

Promise.all([ a, b, c ].map(async function(source) {
    for await (const item of source) {
        results.push(item);
    }
}))
.then(() => console.log(results))
// A source that throws would otherwise become an unhandled rejection.
.catch(error => console.error(error));

I tried it with three normal arrays:

// Plain arrays used as sample sources; `for await` also accepts sync iterables.
const a = [ 1, 2, 3 ];
const b = [ 4, 5, 6 ];
const c = [ 7, 8, 9 ];

And it resulted in [1, 4, 7, 2, 5, 8, 3, 6, 9].