I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential size of the files, I'd like to use streaming.
This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise that is resolved when the file has been read to the end, or rejected on error.
So, I start with:
'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {
    // csv.parse(options) returns a transform stream; the input arrives through pipe()
    var parser = csv.parse({trim: true});
    passedStream.pipe(parser);

    // use readable or data event?
    parser.on('readable', function() {
        // call processor, which may be async
        // how do I throttle the amount of promises generated
    });

    var db = pgp(api.config.mailroom.fileMakerDbConfig);

    return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
    });
};
Now, I have two inter-related issues:
- I need to throttle the actual amount of data being processed, so as to not create memory pressures.
- The function passed as the processor param is often going to be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.
The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the readable handler after each read(), which means I create a huge number of promised database operations and eventually fault out because I hit a process memory limit.
Does anyone have a working example of this that I can use as a jumping-off point?
UPDATE: Probably more than one way to skin the cat, but this works:
'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {
    // some checks trimmed out for example
    var db = pgp(api.config.mailroom.fileMakerDbConfig);
    // csv.parse(options) returns a transform stream; the input arrives through pipe()
    var parser = csv.parse({trim: true});
    passedStream.pipe(parser);

    // pull up to fileParserConcurrency records, then pause the parser
    var readDataFromStream = function(index, data, delay) {
        var records = [];
        var record;
        do {
            record = parser.read();
            if (record != null)
                records.push(record);
        } while (record != null && (records.length < api.config.mailroom.fileParserConcurrency));
        parser.pause();
        if (records.length)
            return records;
    };

    // handle one page of records, then let the parser continue
    var processData = function(index, data, delay) {
        console.log('processData(' + index + ') > data: ', data);
        parser.resume();
    };

    parser.on('readable', function() {
        db.task(function(tsk) {
            // return the promise so the task does not finish before paging does
            return tsk.page(readDataFromStream, processData);
        });
    });

    return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
    });
};
Does anyone see a potential problem with this approach?
Find below a complete application that correctly executes the same kind of task as you want: it reads a file as a stream, parses it as CSV, and inserts each row into the database.
Note that the only thing I changed is using the csv-parse library instead of csv, as a better alternative.
Added use of the stream.read method from the spex library, which properly serves a Readable stream for use with promises.
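The general idea of serving a Readable stream to promise-based code can be sketched with Node's built-in stream module alone: read whatever is buffered, wait for the receiver's promise, then read again. This is only an illustration of that pattern, not the spex implementation; readWithReceiver and receiver are hypothetical names, and the stream is assumed to be in object mode (as a CSV parser's output would be).

```javascript
'use strict';
const { Readable } = require('stream');

// Hypothetical helper: hands records from an object-mode Readable to
// `receiver(index, records)` in batches and waits for the promise returned
// by the receiver before reading from the stream again.
function readWithReceiver(stream, receiver) {
  return new Promise((resolve, reject) => {
    let index = 0;     // number of receiver calls started so far
    let busy = false;  // true while a receiver promise is pending
    let ended = false; // true once the stream has emitted 'end'

    function drain() {
      if (busy) return; // the pending receiver will trigger the next drain
      const records = [];
      let record;
      while ((record = stream.read()) !== null) records.push(record);
      if (records.length === 0) {
        if (ended) resolve(index);
        return; // otherwise wait for the next 'readable' or 'end' event
      }
      busy = true;
      Promise.resolve()
        .then(() => receiver(index++, records))
        .then(
          () => { busy = false; drain(); },
          (err) => { stream.destroy(); reject(err); }
        );
    }

    stream.on('readable', drain);
    stream.on('end', () => { ended = true; if (!busy) resolve(index); });
    stream.on('error', reject);
  });
}
```

While the receiver is busy nothing is read, so the stream's internal buffer fills up to its high-water mark and the source is held back instead of piling up promises. Under these assumptions it could be called as readWithReceiver(passedStream.pipe(parser), function (index, records) { return saveRecords(records); }), where saveRecords stands in for whatever promise-based database call you use.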
You might want to look at promise-streams. It works with backpressure and everything.
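To show what backpressure buys you here, this is a rough sketch using only Node's built-in stream module rather than the promise-streams API (which I am not reproducing). processStream and processor are hypothetical names; the source is assumed to be an object-mode stream such as a CSV parser.

```javascript
'use strict';
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical helper: pipes `source` into an object-mode Writable whose
// write step waits for the promise returned by `processor(record)`.
function processStream(source, processor) {
  return new Promise((resolve, reject) => {
    const sink = new Writable({
      objectMode: true,
      highWaterMark: 16, // roughly how many records may queue up in the sink
      write(record, _encoding, callback) {
        // The next record is not handed over until callback has been called,
        // so only one processor promise is pending at any time.
        Promise.resolve()
          .then(() => processor(record))
          .then(() => callback(), callback);
      }
    });
    // pipeline wires up error handling and cleanup for both streams.
    pipeline(source, sink, (err) => (err ? reject(err) : resolve()));
  });
}
```

Once the sink's queue is full, the pipe stops pulling from the source, so memory use stays bounded no matter how large the file is. The returned promise resolves when everything has been processed and rejects on the first stream or processor error.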
So, in other words, you don't want streaming but some kind of data chunks? ;-)
Do you know https://github.com/substack/stream-handbook?
I think the simplest approach without changing your architecture would be some kind of promise pool, e.g. https://github.com/timdp/es6-promise-pool
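To make the promise pool idea concrete, here is a minimal hand-rolled sketch. It does not use the es6-promise-pool API; runPool, limit and worker are hypothetical names. It assumes the items come from a sync or async iterable, which includes Readable streams in current Node versions.

```javascript
'use strict';

// Hypothetical helper: calls `worker(item)` for every item of a sync or async
// iterable while keeping at most `limit` worker promises pending at once.
async function runPool(items, limit, worker) {
  const iterator = items[Symbol.asyncIterator]
    ? items[Symbol.asyncIterator]()
    : items[Symbol.iterator]();
  let failed = false; // set on the first error so idle lanes stop taking items

  // Each lane takes the next item only after its previous one has finished.
  async function lane() {
    while (!failed) {
      const { done, value } = await iterator.next();
      if (done) return;
      try {
        await worker(value);
      } catch (err) {
        failed = true;
        throw err;
      }
    }
  }

  // Start `limit` lanes; the result rejects with the first error, if any.
  await Promise.all(Array.from({ length: limit }, lane));
}
```

Because a lane only asks the iterator for another item when it is free, the source is never read faster than the workers can keep up. With the parser from the question, something like runPool(parser, api.config.mailroom.fileParserConcurrency, processor) would be the intended use, assuming a Node version where the parser stream is async iterable.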