Run Plenty of promises sequentially

2019-08-14 05:23发布

问题:

A nodejs project.I've tried to run plenty(about 100k) task sequently with promises. What I can do is converting it to a workOnebyOne function with Q. Is there a better way to do this?

function workOnebyOne(items, worker) {
  var deferred = Q.defer()

  function _doNext() {
    if (items.length === 0) {
      deferred.resolve()
      return
    }
    var item = items[0]
    synchronize(worker, item)
      .then(function (result) {
        items = items.slice(1)
        deferred.notify({
          item: item,
          result: result
        })
        _doNext()
      }, function () {
        items = items.slice(1)
        _doNext()
      })
  }

  _doNext()

  return deferred.promise
}

utils.workOnebyOne(tasks, workerFunction)

回答1:

You are basically re-implementing queuing here. In Bluebird promises (Which are also much faster and consume a lot less memory which helps with 100K tasks) you'd use Promise.each.

In Q you can typically use .reduce on the tasks array to queue them at once - however with 100K elements creating a 100K promise queue in Q promises would crash node (again, this is Q, Bluebird or when promises) would handle it just fine). This (incorrect here) solution would look something like:

var res = tasks.reduce(function(p, c){
    return p.then(function(){ return workerFunction(c); });
}, Q());

For short queues ( < 500 promises in Q) this works nicely.

So because of the old library choice and because of the large number of promises involved you can't realistically solve it elegantly, using a callback queue like approach is pretty close to your only way. I'd also avoid notify as it is being removed (even from Q) and is generally a bad API (doesn't compose well).



回答2:

I've spent some time finding simple and elegant solution. I have found only some hints and discussions but without ready example. Finally I have found discussion on https://github.com/kriskowal/q/issues/606 and as a result what have worked for me can be isolated and generalised like this:

function workOneByOne(items, someAsyncFuntionReturningPromise) {
  var lastResultPromise = items
    .map(function(item) {
      return function(previousResult) {
        /*
         * this function has to:
         * - process result from previous item processed if any
         * - call the async job
         * - return promise of the job done
         */
        if (previousResult) {
          // process result here
        }

        return someAsyncFuntionReturningPromise(item);
    }})
    .reduce(Q.when, Q());

    return lastResultPromise;
}

and in case there is no function returning promise available you can call above with

workOneByOne(items, Q.nfbind(someAsyncFunctionWithCallback))


回答3:

Ultra simple solution if await / async is available.

// functions is an array of functions that return a promise.
async function runInSequence(functions) {
  const results = [];

  for (const fn of functions) {
    results.push(await fn());
  }

  return results;
}

And we can use it like this:

/**
 * Waits "delay" miliseconds before resolving the promise.
 * @param {Number} delay The time it takes to resolve the promise.
 * @param {any} value The resolving value.
 */
function promisedFunction(delay, value) {
  return new Promise(resolve => {
    setTimeout(() => resolve(value), delay);
  });
}

console.time("execution");
runInSequence([
    promisedFunction.bind(this, 1000, 1),
    promisedFunction.bind(this, 1000, 2),
    promisedFunction.bind(this, 1000, 3)
]).then(results => {
    console.timeEnd("execution");
    console.log(results);
});

Working RunKit example here.