I'm running a knex seed in Node and need to batch an additional query to my database due to restrictions on my server. I'm starting to get the hang of promises and async/await, but I'm having trouble getting it to work several levels deep (what's throwing me off in particular is that it seems to interfere with the batching in a way I can't quite make sense of). My seed file looks like this:
```js
exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
    let row = line.split(',');
    let event_id = await get_event_id(row[0]);
    return createImage(row, event_id);
  };

  async function run_batch(batch) {
    return Promise.all(batch.map(run_query));
  }

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1, 60); // skip csv header, then run first 59 lines
  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));
  console.log(_.flatten(images).length);
};
```
My database can handle 30 queries at a time. Everything resolves properly if I run a single batch, using `.slice(1, 30)` on the line where `lines` is defined. But running with 60 as above gives me `ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections`.
The script completes if I change the body of `run_batch` to `return batch.map(run_query)`, and it returns the correct number of entries (so it seems to be batching properly), but then the promises are still pending. What am I missing, and is there a more elegant way to do this?
In this line:

```js
let images = await Promise.all(batches.map(run_batch));
```

you are trying to run ALL the batches in parallel, which defeats your chunking entirely.

You could use a regular `for` loop with `await` instead of the `.map()` so you run a batch, wait for it to finish, then run the next batch.

FYI, you might benefit from any number of functions people have written for processing a large array with no more than N requests in flight at the same time. These do not require you to manually break the data into batches. Instead, they monitor how many requests are in flight at once: they start up your desired number of requests and, as one finishes, they start another, collecting the results for you.
- `runN(fn, limit, cnt, options)`: Loop through an API on multiple requests
- `pMap(array, fn, limit)`: Make several requests to an api that can only handle 20 at a time
- `rateLimitMap(array, requestsPerSec, maxInFlight, fn)`: Proper async method for max requests per second
- `mapConcurrent(array, maxConcurrent, fn)`: Promise.all() consumes all my ram

There are also features to do this built into the Bluebird promise library and the Async-promises library.
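As a rough illustration of what those helpers do internally, here is a sketch of a `mapConcurrent`-style function (the name and details here are my own, not the linked implementations): it starts up to `maxConcurrent` tasks and, each time one settles, starts the next, so you never have to pre-chunk the data.

```javascript
// Run fn(item, index) over the array with at most maxConcurrent
// promises in flight at once. Resolves with results in input order.
function mapConcurrent(array, maxConcurrent, fn) {
  return new Promise((resolve, reject) => {
    const results = new Array(array.length);
    let index = 0;    // next item to start
    let inFlight = 0; // tasks currently running
    let done = 0;     // tasks finished

    function runNext() {
      // Top up to the concurrency limit.
      while (inFlight < maxConcurrent && index < array.length) {
        const i = index++;
        inFlight++;
        Promise.resolve(fn(array[i], i)).then(result => {
          results[i] = result;
          inFlight--;
          done++;
          if (done === array.length) {
            resolve(results);
          } else {
            runNext(); // a slot freed up, start another task
          }
        }, reject);
      }
    }

    if (array.length === 0) resolve(results);
    else runNext();
  });
}

// Usage sketch: at most 30 queries in flight at once.
// mapConcurrent(lines, 30, run_query).then(images => console.log(images.length));
```

Bluebird's built-in equivalent is `Promise.map(array, fn, { concurrency: n })`.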