Use Promise.all() in AWS lambda

2019-07-08 10:01发布

问题:

I don't understand why my promise.all() is not executed in Lambda while in development already work.

On AWS Lambda, it's already timeout because Promise.all() are not completed and I don't receive the new messages in my SQS queue.

However, the console.log(promises) returned : 2017-01-29T22:55:46.191Z 0e82eeaf-e676-11e6-b69d-73a6bbd86272 [ Promise { <pending> } ]

var url = require('url');
var AWS = require('aws-sdk');
// var Promise = require("bluebird");

exports.handler = function (event, context, callback) {

    AWS.config = {
        region: 'us-east-1',
        apiVersions : {
            sqs: '2012-11-05'
        }
    };

    if (typeof Promise === 'undefined') {
        AWS.config.setPromisesDependency(require('bluebird'));
    }

    var sqs = new AWS.SQS(); // système de queue
    var date = new Date();

    var knex = require('knex')({
        client: 'mysql',
        connection: {
            host : event.database.host,
            user : event.database.user,
            password : event.database.pwd,
            database : event.database.name
        }
    });

    var sqsQueueUrl = event.sqsQueueUrl;
    var keywords = event.keywords;

    knex.select('keywords').from('keyword')
        .timeout(1000)
        .then(function(results) 
        {
            var keywordsInDbArray = returnArrayWithOnlyKeywords(results);

            var keywordsWillBeStored = [];

            for (var i in keywords) 
            {
                var isInDb = keywordsInDbArray.indexOf(keywords[i]);

                if(isInDb == -1)
                    keywordsWillBeStored.push({
                        keywords: keywords[i],
                        'created_at': date, 
                        'updated_at': date
                    });
            }
            if(keywordsWillBeStored.length > 0)
            {
                knex.batchInsert('keyword', keywordsWillBeStored, keywordsWillBeStored.length)
                    .then(function(firstId) 
                    { 
                        var newKeywordsStored = dataStoredInDb(firstId, keywordsWillBeStored);
                        console.log(newKeywordsStored);

                        sendKeywordInqueue(newKeywordsStored);
                    })
                    .catch(function(error) 
                    { 
                        console.error(error);
                        context.fail(error);
                        knex.destroy();
                    });
            }
            else
            {
                context.succeed('Aucun nouveau keyword a enregistré');
            }
        })
        .catch(function(error) 
        {
            console.error(error);
            context.fail(error);
            knex.destroy();
        });

    function sendKeywordInqueue(array)
    {
        var promises = [];
        for( var i in array)
        {
            var promise = sqs.sendMessage({
                MessageBody: JSON.stringify(array[i]),
                QueueUrl: sqsQueueUrl
            }).promise().catch(function(error) 
            { 
                console.error(error);
            });
            promises.push(promise);
        }

        console.log(promises);

        Promise.all(promises)
            .catch(function(error) 
            { 
                console.error(error);
            })
            .then(function() 
            {
                callback(null, "DONE");
                context.succeed('Il y a eu '+array.length+' nouveau keywords enregistré et qui va être scrapé');
                knex.destroy();
            });
    }

    function dataStoredInDb(firstIdSavedInDb, arrayWithKeywords) 
    {
        id = firstIdSavedInDb -1;
        return arrayWithKeywords.map(function (obj, i) {
            var rObj = {};
            rObj.id = id + i + 1;
            rObj.keywords = obj.keywords;
            return rObj;
        })
    }

    function returnArrayWithOnlyKeywords(resultDb)
    {
        var dataKeywords = [];

        for(var key in resultDb)
        {
            dataKeywords.push(resultDb[key].keywords);
        }

        return dataKeywords;
    }
};

回答1:

If anyone comes back to this, you need to await Promise.all ie:

const res = await Promise.all([
    Do_Stuff(),
    Do_more_stuff()
    ])
    .then(() => callback(null,{
        'statusCode': 200,
        'headers' : {'Access-Control-Allow-Origin' : '*'},
        'body' : All Good)
      })
    .catch(() => callback('Something went wrong'))
  )


回答2:

I had the same issue and ended up introducing a polyfill until this gets resolved:

function promiseAll(promises) {
  return new Promise((resolve, reject) => {
    const result = [];
    let count = promises.length;
    const checkDone = () => {
      if (--count === 0) resolve(result);
    };
    promises.forEach((promise, i) => {
      promise
        .then(x => {
          result[i] = x;
        }, reject)
        .then(checkDone);
    });
  });
}

[source: https://codereview.stackexchange.com/questions/134224/pseudo-promise-all-polyfill]



回答3:

We have to see your actual lambda handler, but you probably exit the lambda before your async operations finish. Have you tried the callback parameter?

exports.handler = function(event, context, callback) {
    handleWhatever(context, callback);
};

Then after you finished, you can call

callback(null, "DONE");


回答4:

You can try it with this:

var url = require('url');
var AWS = require('aws-sdk');
var knex = require('knex');
AWS.config = {
  region: 'us-east-1',
  apiVersions : {
    sqs: '2012-11-05'
  }
};
var sqs = new AWS.SQS(); // système de queue

exports.handler = function (event, context, callback) {
  var db = knex({
    client: 'mysql',
    connection: {
      host : event.database.host,
      user : event.database.user,
      password : event.database.pwd,
      database : event.database.name
    }
  });

  function destroyDb (out) {
    return db.destroy().catch(function (err) {
      console.error('Unable to close connection to database', err)
    }).return(out)
  }

  return performKeywordQuery(db, event)
  .then(destroyDb)
  .then(context.succeed)
  .catch(function (error) {
    return destroyDb(error).then(context.fail)
  })
};

function performKeywordQuery (db, event) {
  var sqsQueueUrl = event.sqsQueueUrl;
  var keywords = event.keywords;

  return db.select('keywords').from('keyword')
  .timeout(1000)
  .then(function(results) 
  {
    var date = new Date();
    var keywordsInDbArray = returnArrayWithOnlyKeywords(results);

    var keywordsWillBeStored = [];

    for (var i in keywords) 
    {
      var isInDb = keywordsInDbArray.indexOf(keywords[i]);

      if(isInDb == -1)
      keywordsWillBeStored.push({
        keywords: keywords[i],
        created_at: date, 
        updated_at: date
      });
    }
    if (keywordsWillBeStored.length > 0)
    {
      return knex.batchInsert('keyword', keywordsWillBeStored, keywordsWillBeStored.length)
      .then(function(firstId) 
      { 
        var newKeywordsStored = dataStoredInDb(firstId, keywordsWillBeStored);
        console.log(newKeywordsStored);

        return Promise.all(newKeywordsStored.map(function (body) {
          return sqs.sendMessage({
            MessageBody: JSON.stringify(body),
            QueueUrl: sqsQueueUrl
          }).promise()
        }));
      })
      .then(function (array) {
        return 'Il y a eu '+array.length+' nouveau keywords enregistré et qui va être scrapé';
      })
    }
    else
    {
      return 'Aucun nouveau keyword a enregistré';
    }
  })
}

function dataStoredInDb(firstIdSavedInDb, arrayWithKeywords) 
{
  var id = firstIdSavedInDb -1;
  return arrayWithKeywords.map(function (obj, i) {
    var rObj = {};
    rObj.id = id + i + 1;
    rObj.keywords = obj.keywords;
    return rObj;
  })
}

function returnArrayWithOnlyKeywords(resultDb)
{
  var dataKeywords = [];

  for(var key in resultDb)
  {
    dataKeywords.push(resultDb[key].keywords);
  }

  return dataKeywords;
}