I want to read a big file line by line and insert the data into a DB store. My function returns a Promise; inside it a stream is created, and the Promise is resolved when the stream's 'end' event fires. But that is not really what I want, because in stream.on('data') I run Promise.map() over each chunk's lines, and I want to be sure that all of those insertion ops have completed before resolve() is invoked. How can I build the right chain in this case?
const fs = require('fs');
const Promise = require('bluebird'); // Promise.map comes from Bluebird

var loadFromFile = (options) => new Promise(function (resolve, reject) {
    let stream = fs.createReadStream(options.filePath, {
        flags: 'r',
    });
    stream.on('data', (chunk) => {
        let lines = []; /* process data chunk to string Array */
        Promise.map(lines, (line) => {
            /* process and insert each line here */
        })
        .catch((err) => {
            reject(err);
        });
    });
    stream.on('end', () => {
        if (someBuisnessLogicCheckHere) {
            reject('Invalid data was found in file');
        }
        resolve(); // Here I am not sure that all inserts for each chunk in Promise.map have completed
    });
});
If you want to be sure you don't resolve until all of the promises in the mapping operation have resolved, wait for the promise that Promise.map
returns; see comments:
var loadFromFile = (options) => new Promise(function (resolve, reject) {
    let stream = fs.createReadStream(options.filePath, {
        flags: 'r',
    });
    let promises = []; // Array of promises we'll wait for
    stream.on('data', (chunk) => {
        let lines = []; /* process data chunk to string Array */
        promises.push( // Remember this promise
            Promise.map(lines, (line) => {
                /* process and insert each line here */
            })
            .catch((err) => {
                reject(err);
            })
        );
    });
    stream.on('end', () => {
        if (someBuisnessLogicCheckHere) {
            reject('Invalid data was found in file');
        }
        Promise.all(promises).then(() => { resolve(); }); // Wait for all of them before resolving
    });
});
Note that I didn't just do Promise.all(promises).then(resolve), because that would pass an array of resolutions to resolve, which your original code didn't share with the caller.
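To illustrate the difference (just a sketch, not part of the code above):

// Resolves the outer promise with the array of results from each Promise.map:
Promise.all(promises).then(resolve);

// Resolves the outer promise with undefined, like the original code:
Promise.all(promises).then(() => { resolve(); });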
If there's a possibility the array will get quite large while being filled with most already-resolved promises, you could remove them proactively as they resolve and only wait on the ones that are left at the end.
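Here is a minimal sketch of that idea, using the same Bluebird Promise.map and placeholder line processing as above; the pending set is purely illustrative and not part of the original code:

let pending = new Set(); // promises still in flight

stream.on('data', (chunk) => {
    let lines = []; /* process data chunk to string Array */
    let p = Promise.map(lines, (line) => {
        /* process and insert each line here */
    })
    .catch((err) => {
        reject(err);
    });
    pending.add(p);
    p.then(() => pending.delete(p)); // drop it as soon as it settles
});

stream.on('end', () => {
    Promise.all(pending).then(() => { resolve(); }); // only waits on whatever is still pending
});

Promise.all accepts any iterable (both natively and in Bluebird), so passing the Set directly works.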
If you need the file to be processed with automatic pause/resume (backpressure) handling, as the pipe method provides, take a look at scramjet. Your example would look much clearer with it:
const { StringStream } = require('scramjet');

var loadFromFile = (options) =>
    fs.createReadStream(options.filePath, {
        flags: 'r',
    })
    .pipe(new StringStream())                           // wrap the file stream in a scramjet StringStream
    .split(/\r?\n/)                                     // split into lines by regex
    .parse((line) => aPromiseReturningFunction(line))   // will wait until promises are resolved
    .map((data) => doSomeAsyncAndReturnPromise(data))   // will wait as above
    .accumulate((acc, resolved) => acc.push(resolved), [])
    .then((arr) => {
        if (someBuisnessLogicCheckHere) {
            return Promise.reject('Invalid data was found in file');
        }
        // Here all your items are saved (i.e. all Promises resolved)
    });
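Either way, loadFromFile returns a promise, so calling it looks the same for both versions (the file path here is just an example):

loadFromFile({ filePath: './big-data.txt' })
    .then(() => console.log('all lines inserted'))
    .catch((err) => console.error('load failed:', err));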