I am trying to write a program that gets documents from a MongoDB database with Mongoose, processes them using an API, and then updates each document in the database with the result of the processing. I'm having trouble because I don't fully understand Node.js and asynchronous code. This is my code:
Model.find(function (err, tweets) {
  if (err) return err;
  for (var i = 0; i < tweets.length; i++) {
    console.log(tweets[i].tweet);
    api.petition(tweets[i].tweet)
      .then(function(res) {
        TweetModel.findOneAndUpdate({_id: tweets[i]._id}, {result: res}, function (err, tweetFound) {
          if (err) throw err;
          console.log(tweetFound);
        });
      })
      .catch(function(err) {
        console.log(err);
      })
  }
})
The problem is that inside the findOneAndUpdate() call, tweets[i] is undefined, so it can't find that _id. Any solution? Thanks
The core thing you are missing is that the Mongoose API methods also support Promises, but you seem to be copying from documentation or old examples that use callbacks. The solution is to convert to using Promises only.
Working with Promises
Model.find({}, { _id: 1, tweet: 1 }).then(tweets =>
  Promise.all(
    tweets.map(({ _id, tweet }) =>
      api.petition(tweet).then(result =>
        TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
          .then(updated => { console.log(updated); return updated; })
      )
    )
  )
)
.then(updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e));
Aside from the general conversion from callbacks, the main change is using Promise.all() to resolve the output of the Array.map() over the results from .find(), instead of the for loop. That is actually one of the biggest problems in your attempt: the for loop cannot control when the async functions resolve. It runs to completion before any .then() callback fires, and because i is declared with var, every callback sees the final value i === tweets.length, so tweets[i] is undefined. The other issue is "mixing callbacks", which is what we address here by only using Promises.
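The undefined lookup can be reproduced without a database. This is a minimal sketch, assuming a hypothetical `fakePetition()` that stands in for `api.petition()` by resolving on a later timer tick; it contrasts the question's `var` loop with the same loop using `let`.

```javascript
// Hypothetical stand-in for api.petition(): resolves on a later timer tick.
function fakePetition(text) {
  return new Promise(resolve => setTimeout(() => resolve(`processed:${text}`), 10));
}

const tweets = [{ _id: 1, tweet: "a" }, { _id: 2, tweet: "b" }];

// Same shape as the question's loop: `var i` is shared by every callback.
function withVar() {
  const seen = [];
  const pending = [];
  for (var i = 0; i < tweets.length; i++) {
    // By the time this .then() runs, the loop has finished and i === 2.
    pending.push(fakePetition(tweets[i].tweet).then(() => seen.push(tweets[i])));
  }
  return Promise.all(pending).then(() => seen);
}

// `let` creates a fresh binding of i for each iteration.
function withLet() {
  const seen = [];
  const pending = [];
  for (let i = 0; i < tweets.length; i++) {
    pending.push(fakePetition(tweets[i].tweet).then(() => seen.push(tweets[i])));
  }
  return Promise.all(pending).then(() => seen);
}

withVar().then(seen => console.log(seen)); // [ undefined, undefined ]
withLet().then(seen => console.log(seen.map(t => t._id))); // [ 1, 2 ]
```

Switching to `let` fixes the lookup, but it still leaves no clean way to know when every update has finished, which is what the `Promise.all()` version handles.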
Within the Array.map() we return the Promise from the API call, chained to the findOneAndUpdate() which actually updates the document. We also use new: true to return the modified document rather than the original one.
Promise.all() allows an "array of Promise" to resolve and returns an array of results, in the same order as the input array. These are what you see as updatedDocs. Another advantage is that the inner calls run in "parallel" rather than in series. This usually means faster completion, at the cost of more simultaneous load on the API and the database.
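A small timing sketch makes the "parallel" behaviour and the ordering guarantee visible. It assumes a hypothetical `fakePetition(text, ms)` that resolves after `ms` milliseconds in place of `api.petition()`; the delays are arbitrary.

```javascript
// Hypothetical stand-in for api.petition(): resolves after `ms` milliseconds.
function fakePetition(text, ms) {
  return new Promise(resolve => setTimeout(() => resolve(text.toUpperCase()), ms));
}

async function demo() {
  // The first call is the slowest, so it finishes last.
  const inputs = [["first", 150], ["second", 10], ["third", 50]];
  const started = Date.now();
  // map() starts every call immediately; Promise.all() waits for all of them.
  const results = await Promise.all(inputs.map(([text, ms]) => fakePetition(text, ms)));
  return { results, elapsed: Date.now() - started };
}

const run = demo();
run.then(({ results, elapsed }) => {
  console.log(results); // [ 'FIRST', 'SECOND', 'THIRD' ] (input order, not finish order)
  console.log(`elapsed: ${elapsed} ms`); // roughly 150 (the slowest call), not 210 (the sum)
});
```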
Note also that we use the "projection" of { _id: 1, tweet: 1 } to return only those two fields from the Model.find() result, because those are the only ones used in the remaining calls. This avoids returning the whole document for each result when you don't use the other values.
You could simply return the Promise from findOneAndUpdate(); I'm only adding the console.log() so you can see the output firing at that point. Normal production use should do without it:
Model.find({}, { _id: 1, tweet: 1 }).then(tweets =>
  Promise.all(
    tweets.map(({ _id, tweet }) =>
      api.petition(tweet).then(result =>
        TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
      )
    )
  )
)
.then(updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e));
Another "tweak" could be to use the "bluebird" implementation of Promise.map(), which combines the common Array.map() over Promises with the ability to control the "concurrency" of parallel calls:
const Promise = require("bluebird");

Model.find({}, { _id: 1, tweet: 1 }).then(tweets =>
  Promise.map(tweets, ({ _id, tweet }) =>
    api.petition(tweet).then(result =>
      TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
    ),
    { concurrency: 5 }
  )
)
.then(updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e));
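If you would rather not add bluebird, the same idea can be sketched in plain JavaScript. This is a swapped-in technique, not bluebird's implementation: a hypothetical `mapWithConcurrency()` helper runs a fixed number of "workers" that each pull the next unstarted item, and a hypothetical `fakePetition()` stands in for `api.petition()` while recording how many calls are in flight.

```javascript
// Hypothetical helper: maps `items` through async `mapper`, with at most
// `limit` calls in flight at once; results keep the input order.
async function mapWithConcurrency(items, mapper, limit) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start

  // Each worker repeatedly claims the next unstarted index until none remain.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      results[index] = await mapper(items[index], index);
    }
  }

  const workers = [];
  for (let w = 0; w < Math.min(limit, items.length); w++) workers.push(worker());
  await Promise.all(workers);
  return results;
}

// Hypothetical stand-in for api.petition() that records peak concurrency.
let inFlight = 0;
let peak = 0;
function fakePetition(text) {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise(resolve =>
    setTimeout(() => { inFlight--; resolve(`done:${text}`); }, 5)
  );
}

const tweets = Array.from({ length: 12 }, (_, i) => ({ _id: i, tweet: `t${i}` }));
const demo = mapWithConcurrency(tweets, ({ tweet }) => fakePetition(tweet), 5);
demo.then(results => console.log(results.length, peak)); // 12 5
```

Because every worker awaits its current call before claiming another index, the number of simultaneous calls never exceeds `limit`.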
An alternative to "parallel" would be executing in sequence. This might be considered if too many results cause too many simultaneous API calls and writes back to the database:
Model.find({}, { _id: 1, tweet: 1 }).then(tweets => {
  let updatedDocs = [];
  // Each step waits for the previous one, starting from an already-resolved Promise
  return tweets.reduce((o, { _id, tweet }) =>
    o.then(() => api.petition(tweet))
      .then(result => TweetModel.findByIdAndUpdate(_id, { result }, { new: true }))
      .then(updated => updatedDocs.push(updated)),
    Promise.resolve()
  ).then(() => updatedDocs);
})
.then(updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e));
There we use Array.reduce() to "chain" the promises together so they resolve sequentially. Note that the array of results is kept in scope and returned by the final .then() appended to the end of the chain; you need a technique like this to "collect" results from Promises that resolve at different points in that "chain".
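To see that the reduce() chain really runs one call at a time, here is a minimal sketch with in-memory stand-ins. `fakePetition()` (for `api.petition()`, here returning the text length) and `fakeUpdate()` (for `findByIdAndUpdate()`) are hypothetical, and `peak` records the highest number of API calls in flight.

```javascript
// Hypothetical stand-ins for api.petition() and the database update.
let inFlight = 0;
let peak = 0;
function fakePetition(text) {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise(resolve =>
    setTimeout(() => { inFlight--; resolve(text.length); }, 5)
  );
}
function fakeUpdate(_id, result) {
  return Promise.resolve({ _id, result });
}

function processSequentially(tweets) {
  const updatedDocs = []; // collected as each link of the chain resolves
  return tweets
    .reduce(
      (chain, { _id, tweet }) =>
        chain
          .then(() => fakePetition(tweet))
          .then(result => fakeUpdate(_id, result))
          .then(updated => { updatedDocs.push(updated); }),
      Promise.resolve() // seed: an already-resolved promise to chain from
    )
    .then(() => updatedDocs); // hand the collected array to the caller
}

const demo = processSequentially([
  { _id: 1, tweet: "hello" },
  { _id: 2, tweet: "hi" },
  { _id: 3, tweet: "hey there" },
]);
demo.then(docs => console.log(docs, peak)); // peak stays 1: one call at a time
```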
Async/Await
In modern environments (Node.js 7.6 and later, which includes every LTS release from Node.js 8.x onward) you have support for async/await. This lets you write the flow more naturally. Note that await must be used inside an async function (or at the top level of an ES module):
(async () => {
  try {
    let tweets = await Model.find({}, { _id: 1, tweet: 1 });
    let updatedDocs = await Promise.all(
      tweets.map(({ _id, tweet }) =>
        api.petition(tweet).then(result =>
          TweetModel.findByIdAndUpdate(_id, { result }, { new: true })
        )
      )
    );
    // Do something with results
  } catch (e) {
    console.error(e);
  }
})();
Or you could process sequentially, if resources are an issue:
(async () => {
  try {
    // Model.collection is the underlying driver collection, so this is a native cursor
    let cursor = Model.collection.find().project({ _id: 1, tweet: 1 });
    while (await cursor.hasNext()) {
      let { _id, tweet } = await cursor.next();
      let result = await api.petition(tweet);
      let updated = await TweetModel.findByIdAndUpdate(_id, { result }, { new: true });
      // do something with updated document
    }
  } catch (e) {
    console.error(e);
  }
})();
Note also that findByIdAndUpdate() can be used instead, since matching on _id is already implied, so you don't need a whole query document as the first argument.
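The same one-at-a-time flow can also be written with `for await...of`, which works with any async iterable. Cursors in recent versions of the MongoDB Node.js driver and Mongoose can be iterated this way, but check the version you use. This minimal sketch assumes hypothetical `fakeCursor()`, `fakePetition()` and `fakeUpdate()` stand-ins for the cursor, `api.petition()` and `findByIdAndUpdate()`.

```javascript
// Hypothetical stand-in for a database cursor: an async generator that
// yields one projected document at a time.
async function* fakeCursor(docs) {
  for (const doc of docs) {
    await new Promise(resolve => setTimeout(resolve, 1)); // simulate a fetch
    yield doc;
  }
}

// Hypothetical stand-ins for api.petition() and findByIdAndUpdate().
const fakePetition = async text => text.split(" ").length;
const fakeUpdate = async (_id, result) => ({ _id, result });

async function processCursor(cursor) {
  const updatedDocs = [];
  // for await...of pulls the next document only after the loop body
  // (API call + update) has finished, so the work is strictly sequential.
  for await (const { _id, tweet } of cursor) {
    const result = await fakePetition(tweet);
    updatedDocs.push(await fakeUpdate(_id, result));
  }
  return updatedDocs;
}

const demo = processCursor(fakeCursor([
  { _id: "a", tweet: "one" },
  { _id: "b", tweet: "two words" },
]));
demo.then(docs => console.log(docs));
// [ { _id: 'a', result: 1 }, { _id: 'b', result: 2 } ]
```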
BulkWrite
As a final note, if you don't actually need the updated documents in the response at all, then bulkWrite() is the better option, since it allows the writes to be sent to the server, generally, in a single request:
Model.find({}, { _id: 1, tweet: 1 }).then(tweets =>
  Promise.all(
    tweets.map(({ _id, tweet }) => api.petition(tweet).then(result => ({ _id, result })))
  ).then(results =>
    TweetModel.bulkWrite(
      results.map(({ _id, result }) =>
        ({ updateOne: { filter: { _id }, update: { $set: { result } } } })
      )
    )
  )
)
.catch(e => console.error(e));
Or via async/await syntax:
(async () => {
  try {
    let tweets = await Model.find({}, { _id: 1, tweet: 1 });
    let results = await Promise.all(
      tweets.map(({ _id, tweet }) => api.petition(tweet).then(result => ({ _id, result })))
    );
    let writeResult = await TweetModel.bulkWrite(
      results.map(({ _id, result }) =>
        ({ updateOne: { filter: { _id }, update: { $set: { result } } } })
      )
    );
  } catch (e) {
    console.error(e);
  }
})();
Pretty much all of the combinations shown above can be adapted to this, because the bulkWrite() method takes an "array" of instructions: you can construct that array from the processed API calls in any of the methods above.
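Since bulkWrite() only needs an array of instructions, building that array can be kept separate from how the results were produced. A minimal sketch with a hypothetical `toUpdateOps()` helper, using the same `{ updateOne: { filter, update } }` shape as the examples above; the resulting array is what you would pass to `TweetModel.bulkWrite()`.

```javascript
// Hypothetical helper: turns [{ _id, result }] into bulkWrite() instructions.
function toUpdateOps(results) {
  return results.map(({ _id, result }) => ({
    updateOne: { filter: { _id }, update: { $set: { result } } },
  }));
}

// Works the same whether `results` came from Promise.all(), a reduce() chain,
// or a sequential loop.
const ops = toUpdateOps([
  { _id: 1, result: "positive" },
  { _id: 2, result: "negative" },
]);
console.log(JSON.stringify(ops[0]));
// {"updateOne":{"filter":{"_id":1},"update":{"$set":{"result":"positive"}}}}
```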