Processing large CSV uploads in Node.js

Posted 2019-01-19 07:50

Per a prior thread here:

Node async loop - how to make this code run in sequential order?

...I'm looking for broader advice on processing large data upload files.

Scenario:

User uploads a very large CSV file with hundreds of thousands to millions of rows. It streams into an endpoint that uses multer:

const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

router.post("/", upload.single("upload"), (req, res) => {
    //...
});

Each row is transformed into a JSON object. That object is then mapped into several smaller ones, which need to be inserted into several different tables spread across, and accessed by, various microservice containers.

const async = require('async');

async.forEachOfSeries(data, (line, key, callback) => {
    let model = splitData(line);
    // save model.record1, model.record2, etc. sequentially,
    // then call callback() so the next row is processed
    callback();
});
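
For illustration, splitData does something along these lines (this is just the shape; the real field and record names differ):

// Illustrative only: maps one row into one small record per destination
// table/microservice. The real field/record names are different.
function splitData(line) {
  const row = typeof line === 'string' ? JSON.parse(line) : line;
  return {
    record1: { customerId: row.customerId, name: row.name },
    record2: { customerId: row.customerId, orderTotal: row.orderTotal }
  };
}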

It's obvious I'm going to run into memory limitations with this approach. What is the most efficient manner for doing this?

1 Answer

叛逆 · 2019-01-19 08:06

To avoid memory issues you need to process the file using streams - in plain words, incrementally.

You can do this with a combination of a CSV stream parser, to turn the binary contents into CSV rows, and through2, a stream utility that lets you control the flow of the stream.

The Process

The process goes as follows:

  • You acquire a stream of the data
  • You pipe it through the CSV parser
  • You pipe it through a through2 transform
  • You save each row in your database
  • When you're done saving, call cb() to move on to the next item.

I'm not familiar with multer, but here's an example that uses a stream from a file.

const fs = require('fs')
const csv = require('csv-stream')
const through2 = require('through2')

const stream = fs.createReadStream('foo.csv')
  .pipe(csv.createStream({
      endLine : '\n',
      columns : ['Year', 'Make', 'Model'],
      escapeChar : '"',
      enclosedChar : '"'
  }))
  .pipe(through2({ objectMode: true }, (row, enc, cb) => {
    // - `row` holds one parsed row of the CSV, e.g.
    //   `{ Year: '1997', Make: 'Ford', Model: 'E350' }`
    // - The stream won't process the *next* row until you call the
    //   callback `cb`.
    // - This lets us save the row in our database/microservice and, when
    //   we're done, call `cb()` to move on to the *next* row.
    saveIntoDatabase(row).then(() => {
      cb(null, true)
    })
    .catch(err => {
      cb(err, null)
    })
  }))
  .on('data', data => {
    console.log('saved a row')
  })
  .on('end', () => {
    console.log('end')
  })
  .on('error', err => {
    console.error(err)
  })

// Mock function that emulates saving the row into a database,
// asynchronously in ~500 ms
const saveIntoDatabase = row =>
  new Promise((resolve, reject) =>
    setTimeout(() => resolve(), 500))

The example foo.csv file contains:

1997,Ford,E350
2000,Mercury,Cougar
1998,Ford,Focus
2005,Jaguar,XKR
1991,Yugo,LLS
2006,Mercedes,SLK
2009,Porsche,Boxter
2001,Dodge,Viper

Notes

  • This approach avoids having to load the entire CSV in memory. As soon as a row is processed it goes out of scope/becomes unreachable, so it's eligible for garbage collection. This is what makes the approach so memory-efficient. Read the Streams Handbook for more info on streams.
  • You probably want to save/process more than one row per cycle. In that case, push some rows into an array, process/save the entire array, and then call cb to move on to the next chunk, repeating the process (see the batching sketch after these notes).
  • Streams emit events that you can listen on. The end/error events are particularly useful for reporting back whether the operation succeeded or failed.
  • Express works with streams by default - I'm almost certain you don't need multer at all (see the endpoint sketch below).
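
A minimal batching sketch along the lines of the second note above. It assumes the same csv-stream/through2 pipeline as the main example; BATCH_SIZE is arbitrary and saveBatchIntoDatabase is a hypothetical bulk-insert helper (mocked here the same way as saveIntoDatabase):

const through2 = require('through2')

const BATCH_SIZE = 500
let batch = []

// Buffers rows and saves them BATCH_SIZE at a time instead of one by one.
const batcher = through2(
  { objectMode: true },
  (row, enc, cb) => {
    batch.push(row)
    if (batch.length < BATCH_SIZE) return cb()
    const rows = batch
    batch = []
    saveBatchIntoDatabase(rows)
      .then(() => cb())
      .catch(cb)
  },
  cb => {
    // Flush whatever is left over when the CSV stream ends.
    if (batch.length === 0) return cb()
    saveBatchIntoDatabase(batch)
      .then(() => cb())
      .catch(cb)
  }
)

// Mock bulk save, mirroring the single-row mock above.
const saveBatchIntoDatabase = rows =>
  new Promise(resolve => setTimeout(resolve, 500))

// Drop it into the pipeline in place of the per-row through2 step:
// fs.createReadStream('foo.csv').pipe(csv.createStream({ ... })).pipe(batcher)

And on the last two notes: I haven't tested this against the exact setup in the question, but if you keep multer, switching it from memoryStorage to disk storage (the dest option) gives you a temporary file you can stream, and the route can respond off the stream's finish/error events. Treat this as a sketch rather than drop-in code; saveIntoDatabase is the same helper as in the main example:

const fs = require('fs')
const express = require('express')
const multer = require('multer')
const csv = require('csv-stream')
const through2 = require('through2')

const router = express.Router()
// `dest` makes multer write the upload to a temp file instead of keeping it in memory.
const upload = multer({ dest: 'uploads/' })

router.post('/', upload.single('upload'), (req, res) => {
  fs.createReadStream(req.file.path)
    .pipe(csv.createStream({ columns: ['Year', 'Make', 'Model'] }))
    .pipe(through2({ objectMode: true }, (row, enc, cb) => {
      // Save one row, then let the next one through.
      saveIntoDatabase(row)
        .then(() => cb())
        .catch(cb)
    }))
    // 'finish' fires once every row has gone through the transform (i.e. been saved).
    .on('finish', () => res.sendStatus(200))
    .on('error', err => res.status(500).send(err.message))
})

If the client can send the raw CSV as the request body rather than multipart/form-data, you could drop multer entirely and pipe req itself into csv.createStream(...), since req is a readable stream.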