Iterating over a mongodb cursor serially (waiting

Posted 2019-01-30 14:14

Using mongoskin, I can do a query like this, which will return a cursor:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      });
});

However, I'd like to call some async functions for each document, and only move on to the next item on the cursor after each call has called back (similar to the eachSeries structure in the async.js module). For example:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               // externalAsyncFunction completed - now want to move to next doc
            });

      });
});

How could I do this?

Thanks

UPDATE:

I don't want to use toArray(), as this is a large batch operation and the results might not fit in memory in one go.

8 answers
甜甜的少女心
#2 · 2019-01-30 14:36

A more modern approach that uses async/await:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

Notes:

  • This may become even simpler once async iterators arrive.
  • You'll probably want to add try/catch for error checking.
  • The containing function should be async or the code should be wrapped in (async function() { ... })() since it uses await.
  • If you want, add await new Promise(resolve => setTimeout(resolve, 1000)); (pause for 1 second) at the end of the while loop to show that it does process docs one after the other.
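As a rough sketch of the first two notes, the loop can also be written with for await...of and wrapped in try/catch. This assumes a driver version whose cursors implement the async iteration protocol. To keep the example runnable without a database, fakeCursor is a hypothetical async generator standing in for db.collection("foo").find({}), and processAll and handler are illustrative names, not driver APIs.

```javascript
// Hypothetical stand-in for a cursor: an async generator that yields
// documents one by one, the way an async-iterable cursor would.
async function* fakeCursor(docs) {
  for (const doc of docs) {
    yield doc;
  }
}

// Runs handler on each document in order. The next document is not
// requested until the promise returned by handler has resolved.
async function processAll(cursor, handler) {
  let count = 0;
  try {
    for await (const doc of cursor) {
      await handler(doc);
      count++;
    }
  } catch (err) {
    // Report how many documents completed before the failure, then rethrow.
    console.error("Stopped after", count, "documents:", err.message);
    throw err;
  }
  return count;
}
```

With a real cursor, the call would look something like await processAll(db.collection("foo").find({}), handler) inside an async function.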
劳资没心,怎么记你
#3 · 2019-01-30 14:48

If someone is looking for a Promise way of doing this (as opposed to using callbacks of nextObject), here it is. I am using Node v4.2.2 and mongo driver v2.1.7. This is kind of an asyncSeries version of Cursor.forEach():

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    // Forward any cursor or iterator failure to the caller.
    cursor.next().then(processDoc).catch(reject);
  });
}

To use this, pass the cursor and an iterator that operates on each document asynchronously (like you would for Cursor.forEach). The iterator needs to return a promise, like most mongodb native driver functions do.

Say, you want to update all documents in the collection test. This is how you would do it:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
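})
.catch(function(err) {
  console.error(err);
  if (theDb) theDb.close();   // close the connection on failure as well
});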
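
For comparison, here is a sketch of the same helper written with async/await, which needs a newer Node version than the v4.2.2 mentioned above. Like the promise version, it relies only on cursor.next() resolving to null once the cursor is exhausted. arrayCursor is a hypothetical in-memory stand-in, included only so the snippet runs without a database.

```javascript
// Hypothetical in-memory stand-in for a cursor: next() resolves to the
// next document, or to null once all documents have been returned.
function arrayCursor(docs) {
  let i = 0;
  return {
    next: function() {
      return Promise.resolve(i < docs.length ? docs[i++] : null);
    }
  };
}

// Calls iterator(doc) for each document, one at a time, and resolves to
// the number of documents processed. Any rejection propagates to the caller.
async function forEachSeries(cursor, iterator) {
  let count = 0;
  let doc;
  while ((doc = await cursor.next()) != null) {
    await iterator(doc);
    count++;
  }
  return count;
}
```

Because the function is async, a failure in cursor.next() or in the iterator rejects the returned promise without any explicit reject wiring.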