How can I improve this nested asynchronous loop?

2019-08-29 04:54发布

The problem is as follows. I have an array of objects like so:

// Work list: one single-key object per db, mapping the db id to the ids of
// the documents to process in that db.
let myObj = [
  { db1: ['doc1', 'doc2', 'doc3'] },
  { db2: ['doc4', 'doc5'] },
  { db3: ['doc7', 'doc8', 'doc9', 'doc10'] },
];

Note that this is a data structure I decided to use for the problem and can be changed if it can improve the overall implementation. The actual db and doc Ids are read from a text file formatted as below.

"db1","doc1"
"db1","doc2"
...

My app will iterate through the db list synchronously. Inside each db iteration, there will be an asynchronous iteration of the document list. Each document will be retrieved, processed and saved back to the db.

So basically at any given instance: one db, but multiple documents.

I have a working implementation of the above like so:

dbIterator: the synchronous outer loop to iterate dbs. The callback passed to docIterator will trigger the next iteration.

/**
 * Processes the dbs listed in `myObj` one at a time, starting at index `x`.
 *
 * Each entry of `myObj` is a single-key object `{ dbId: [docIds] }`. The
 * callback handed to `docIterator` starts the next db, so db N+1 is only
 * started once `docIterator` reports that db N is finished.
 *
 * @param {number} x - Index into `myObj` of the db to process next.
 */
const dbIterator = function (x) {
  if (x >= myObj.length) {
    logger.info('All dbs processed');
    return;
  }

  const dbObj = myObj[x];
  const dbId = Object.keys(dbObj)[0];
  // Recurse into dbIterator itself (the original called an undefined
  // `merchantIterator`, so the second db was never reached).
  docIterator(dbId, dbObj[dbId], () => dbIterator(x + 1));
};

docIterator: the asynchronous loop to iterate docs. The callback cb is called after all documents are processed. This is tracked via the docsProcessed and docsToBeProcessed variables.

/**
 * Retrieves and processes every document of one db concurrently, then calls
 * `cb` exactly once when all of them have settled.
 *
 * A failure while retrieving or processing a single document is logged and
 * does not stop the other documents; `cb` is still called afterwards. An
 * empty `docIds` list also results in `cb` being called.
 *
 * @param {string} dbId - Id of the db the documents belong to.
 * @param {string[]} docIds - Ids of the documents to process.
 * @param {Function} cb - Called with no arguments once every document is done.
 */
const docIterator = function (dbId, docIds, cb) {
  //create connection
  // TODO: replace with the real connection config for dbId (the right-hand
  // side was elided in the original snippet).
  const targetConnection = undefined;

  // One promise per document that never rejects: each failure is logged where
  // it happens, so Promise.all below only settles after every document is done.
  const handleDoc = (docId) =>
    Promise.resolve()
      .then(() => getDocument(docId, targetConnection))
      .then(
        (doc) =>
          Promise.resolve()
            .then(() => processDoc(doc, targetConnection))
            //if processing fails
            .catch(() => {
              logger.error('error when processing document');
            }),
        () => {
          logger.error('error when retrieving document: ');
        }
      );

  //asynchronous iteration of documents
  Promise.all(docIds.map(handleDoc)).then(() => cb());
};

processDoc: used to process and save an individual document. It returns a promise that is resolved when the document processing is done. That in turn increments docsProcessed and, once docsProcessed >= docsToBeProcessed, calls the callback passed into docIterator.

/**
 * Processes a single document and saves the result through `targetConnection`.
 *
 * Documents for which `shouldThisDocBeProcessed` returns a falsy value are
 * skipped. Otherwise the document is transformed with `logic` and written
 * with `targetConnection.insert`.
 *
 * @param {Object} doc - The document to process.
 * @param {Object} targetConnection - Connection exposing
 *   `insert(doc, id, callback)` with a Node-style `(error, response)` callback.
 * @returns {Promise<undefined>} Resolves once the document was skipped or the
 *   insert callback fired. A save error is logged and the promise still
 *   resolves; it only rejects if `shouldThisDocBeProcessed`, `logic` or
 *   `insert` throws synchronously.
 */
const processDoc = function (doc, targetConnection) {
  return new Promise(function (resolve) {
    if (!shouldThisDocBeProcessed(doc)) {
      resolve();
      return;
    }

    const updatedDoc = logic(doc);
    targetConnection.insert(updatedDoc, updatedDoc._id, function (error) {
      if (error) {
        logger.error('error when saving doc');
      } else {
        logger.info('updated successfully');
      }
      // Resolve in both cases: a failed save is reported via the log only.
      resolve();
    });
  });
};

This works as expected, but to me this implementation is sub-optimal and messy. I'm pretty sure it can be improved upon and, most importantly, this is a chance to better understand and implement solutions to synchronous and asynchronous problems.

I'm open to constructive criticism. So how can this be improved?

1条回答
我欲成王,谁敢阻挡
2楼-- · 2019-08-29 05:30

Maybe something like this?

An example implementation of throttle can be found here.

// Marker type that wraps the details of a failed item so failures can be
// told apart from successful results by checking `constructor`.
// This should be available in both modules so you can filter.
const Fail = function(details){this.details=details;};
// Example usage.
// NOTE(review): this requires docIterator to return a promise that resolves
// to the array of per-document results - confirm against docIterator.
// docIterator(dbId,docIds)
// .then(
//   results =>{
//     const failedResults = results.filter(
//       result => (result&&result.constructor)===Fail
//     );
//     const successfulResults = results.filter(
//       result => (result&&result.constructor)!==Fail
//     );
//   }
// )

/**
 * Retrieves and processes every document of one db concurrently.
 *
 * @param {string} dbId - Id of the db the documents belong to.
 * @param {string[]} docIds - Ids of the documents to process.
 * @returns {Promise<Array>} Resolves (never rejects) with one entry per
 *   docId, in the same order: the retrieved document when both retrieval and
 *   processing succeeded, otherwise a `Fail` whose `details` is
 *   `[error, docId]`.
 */
const docIterator = function (dbId, docIds) {
  //create connection
  // TODO: replace with the real connection config for dbId (this line was
  // commented out in the original snippet).
  const targetConnection = undefined;

  //asynchronous iteration of documents
  return Promise.all(
    docIds.map((docId) =>
      // Start from a resolved promise so that a synchronous throw in
      // getDocument is also turned into a Fail by the catch below.
      Promise.resolve()
        //if you use throttled you can do:
        // max10(
        //   ([docId,targetConnection])=>
        //     getDocument(docId,targetConnection)
        // )([docId, targetConnection])
        .then(() => getDocument(docId, targetConnection))
        .then((doc) =>
          //processDoc's own result is ignored; the document is passed on instead
          Promise.resolve(processDoc(doc, targetConnection)).then(() => doc)
        )
        .catch((err) => new Fail([err, docId]))
    )
  );
};
查看更多
登录 后发表回答