Massive inserts with pg-promise

Posted 2019-05-19 05:39

Question:

I'm using pg-promise and I want to make multiple inserts into one table. I've seen some solutions, like Multi-row insert with pg-promise and How do I properly insert multiple rows into PG with node-postgres?, and I could use pgp.helpers.concat to concatenate multiple inserts.

But now I need to insert a lot of measurements into a table, more than 10,000 records, and https://github.com/vitaly-t/pg-promise/wiki/Performance-Boost says: "How many records you can concatenate like this - depends on the size of the records, but I would never go over 10,000 records with this approach. So if you have to insert many more records, you would want to split them into such concatenated batches and then execute them one by one."

I read the whole article, but I can't figure out how to "split" my inserts into batches and then execute them one by one.

Thanks!

Answer 1:

UPDATE

Best is to read the following article: Data Imports.


As the author of pg-promise, I was compelled to finally provide the right answer to the question, as the one published earlier didn't really do it justice.

In order to insert a massive/infinite number of records, your approach should be based on method sequence, which is available within tasks and transactions.

var cs = new pgp.helpers.ColumnSet(['col_a', 'col_b'], {table: 'tableName'});

// returns a promise with the next array of data objects,
// while there is data, or an empty array when no more data left
function getData(index) {
    return new Promise((resolve, reject) => {
        if (/* still have data for the index */) {
            // - resolve with the next array of data
        } else {
            // - resolve with an empty array, if no more data left
            // - reject, if something went wrong
        }
    });
}

function source(index) {
    var t = this;
    return getData(index)
        .then(data => {
            if (data.length) {
                // while there is still data, insert the next bunch:
                var insert = pgp.helpers.insert(data, cs);
                return t.none(insert);
            }
            // returning nothing/undefined ends the sequence
        });
}

db.tx(t => t.sequence(source))
    .then(data => {
        // success
    })
    .catch(error => {
        // error
    });

This is the best approach to inserting a massive number of rows into the database, from the standpoint of both performance and load throttling.

All you have to do is implement your getData function according to the logic of your app, i.e. where your large data is coming from, based on the index of the sequence, returning some 1,000 - 10,000 objects at a time, depending on the size of the objects and data availability.
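For illustration, here is one minimal sketch of getData, assuming the records are already sitting in a large in-memory array; the names allRecords and batchSize are placeholders for your own data source, not part of the original example:

// hypothetical: 'allRecords' is a large in-memory array of {col_a, col_b} objects
var batchSize = 10000; // tune to the size of your objects

function getData(index) {
    // slice out the next batch; an empty array ends the sequence in source()
    var batch = allRecords.slice(index * batchSize, (index + 1) * batchSize);
    return Promise.resolve(batch);
}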

See also some API examples:

  • spex -> sequence
  • Linked and Detached Sequencing
  • Streaming and Paging

Related question: node-postgres with massive amount of queries.


And in cases where you need to acquire generated id-s of all the inserted records, you would change the two lines as follows:

// return t.none(insert);
return t.map(insert + ' RETURNING id', [], a => +a.id);

and

// db.tx(t => t.sequence(source))
db.tx(t => t.sequence(source, {track: true}))

just be careful, as keeping too many record id-s in memory can create an overload.
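Put together, a sketch of the id-collecting variant could look like the following; note that, if I read the spex sequence documentation correctly, {track: true} makes the sequence resolve with the tracked values, i.e. one array of ids per executed batch:

function source(index) {
    var t = this;
    return getData(index)
        .then(data => {
            if (data.length) {
                var insert = pgp.helpers.insert(data, cs);
                // resolve with the generated ids of this batch:
                return t.map(insert + ' RETURNING id', [], a => +a.id);
            }
        });
}

db.tx(t => t.sequence(source, {track: true}))
    .then(batches => {
        // 'batches' is an array of arrays of ids, one per executed batch;
        // flatten it if you need a single list (and mind the memory overhead):
        var allIds = [].concat.apply([], batches);
    })
    .catch(error => {
        // error
    });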



Answer 2:

I think the naive approach would work.

Try to split your data into multiple pieces of 10,000 records or less. I would try splitting the array using the solution from this post.

Then, multi-row insert each array with pg-promise and execute them one by one in a transaction.
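For the splitting itself, any chunking helper will do; a minimal sketch (the function name chunk and the batch size of 10,000 are arbitrary choices, not anything prescribed by pg-promise):

// split 'array' into pieces of at most 'size' elements
function chunk(array, size) {
    var result = [];
    for (var i = 0; i < array.length; i += size) {
        result.push(array.slice(i, i + size));
    }
    return result;
}

// values = [{col_a: 'a1', col_b: 'b1'}, {col_a: 'a2', col_b: 'b2'}, ...]
var splittedData = chunk(values, 10000);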

Edit: Thanks to @vitaly-t for the wonderful library and for improving my answer.

Also, don't forget to wrap your queries in a transaction, or else it will deplete the connection pool.

To do this, use the batch function from pg-promise to resolve all queries asynchronously:

// split your array here to get splittedData
var cs = new pgp.helpers.ColumnSet(['col_a', 'col_b'], {table: 'tmp'});

// splittedData = [..., [{col_a: 'a1', col_b: 'b1'}, {col_a: 'a2', col_b: 'b2'}]]
var queries = [];
for (var i = 0; i < splittedData.length; i++) {
    queries.push(pgp.helpers.insert(splittedData[i], cs));
}

db.tx(function (t) {
    // execute all the inserts within a single transaction
    return t.batch(queries.map(q => t.none(q)));
})
    .then(function (data) {
        // all records inserted successfully!
    })
    .catch(function (error) {
        // error;
    });