Limiting the number of parallel requests to Cassandra DB

Posted 2019-08-13 23:17

Question:

I'm currently parsing a file and extracting its data in order to push it into my database. To do that, I build an array of queries and execute them in a loop.

The problem is that I'm limited to 2048 parallel requests.

This is the code I made:

index.js =>

const ImportClient = require("./scripts/import_client_leasing")
const InsertDb = require("./scripts/insertDb")

const cassandra = require('cassandra-driver');
const databaseConfig = require('./config/database.json');


const authProvider = new cassandra.auth.PlainTextAuthProvider(databaseConfig.cassandra.username, databaseConfig.cassandra.password);

const db = new cassandra.Client({
    contactPoints: databaseConfig.cassandra.contactPoints,
    authProvider: authProvider
});

ImportClient.clientLeasingImport().then(queries => { // this function parses the data and returns an array of queries
    return InsertDb.Clients(db, queries);    // inserting into the database resolves once all the promises are done
}).then(result => {
    return db.shutdown();    // without a callback, shutdown() returns a promise
}).then(result => {
    console.log(result);
}).catch(error => {
    console.log(error)
});

insertDb.js =>

let DB;

module.exports = {
    Clients: function (db, queries) {
        DB = db;    // keep a module-level reference to the client
        return insertClients(queries);
    }
}

function insertClients(queries) {
    let promisesArray = [];

    // This fires every request at once, which is what hits the 2048 limit
    for (let i = 0; i < queries.length; i++) {
        promisesArray.push(new Promise(function (resolve, reject) {
            DB.execute(queries[i], function (err, result) {
                if (err) {
                    reject(err);
                } else {
                    resolve("success");
                }
            });
        }));
    }
    return Promise.all(promisesArray);
}

I tried multiple things, like adding an await that sets a timeout in my for loop every x seconds (but it doesn't work because I'm already inside a promise). I also tried p-queue and p-limit, but they don't seem to work either.
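For reference, a typical p-limit setup looks roughly like this (a sketch with an arbitrary cap of 100; it assumes a CommonJS-compatible version of p-limit):

const pLimit = require('p-limit');

const limit = pLimit(100); // at most 100 executions in flight at a time

function insertClients(queries) {
    // limit() queues each call and starts it only when a slot is free
    const promisesArray = queries.map(q => limit(() => DB.execute(q)));
    return Promise.all(promisesArray);
}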

I'm kind of stuck here; I think I'm missing something trivial, but I don't really see what.

Thanks

Answer 1:

When you submit several requests in parallel (the execute() function runs asynchronously), you end up queueing at one of several levels: on the driver side, in the network stack, or on the server side. Excessive queueing affects the total time each operation takes to complete. To get high throughput and low latency, you should limit the number of simultaneous requests at any given time, also known as the concurrency level.

When implementing this in your code, consider launching a fixed number of asynchronous executions, using your concurrency level as a cap, and only adding new operations once executions within that cap have completed.

Here is an example on how to limit the amount of concurrent executions when processing items in a loop: https://github.com/datastax/nodejs-driver/blob/master/examples/concurrent-executions/execute-in-loop.js

In a nutshell:

// Launch in parallel n async operations (n being the concurrency level)
for (let i = 0; i < concurrencyLevel; i++) {
  promises[i] = executeOneAtATime();
}

// ...
async function executeOneAtATime() {
  // ...
  // Execute queries asynchronously in sequence
  while (counter++ < totalLength) {
    await client.execute(query, params, options);
  }
}
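
Adapted to the array of ready-made query strings from the question, a minimal self-contained version could look like this (the cap of 512 is an arbitrary assumption chosen to stay well under the 2048 limit; tune it for your cluster):

const concurrencyLevel = 512; // arbitrary cap, well under the 2048 limit

async function insertClients(db, queries) {
    let index = 0;

    // Each worker awaits its current query before taking the next one,
    // so at most concurrencyLevel requests are in flight at any time
    async function executeOneAtATime() {
        while (index < queries.length) {
            const query = queries[index++];
            await db.execute(query);
        }
    }

    const workers = [];
    for (let i = 0; i < concurrencyLevel; i++) {
        workers.push(executeOneAtATime());
    }
    return Promise.all(workers);
}

Because the index is read and incremented synchronously between awaits, the workers never pick up the same query twice.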


Answer 2:

OK, so I found a workaround to reach my goal. I wrote all my queries to a file:

const fs = require('fs')
fs.appendFileSync('my_file.cql', queries[i] + "\n");

and then I used

child_process.exec("cqlsh --file my_file.cql", function (err, stdout, stderr) {});

to insert all my queries into Cassandra. Since cqlsh runs the statements in the file sequentially, the parallel-request limit is never reached.
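
Pieced together, the workaround might look like this (a sketch assuming cqlsh is installed and on the PATH and can reach the cluster; authentication may require extra flags such as -u/-p):

const fs = require('fs');
const { exec } = require('child_process');

// Append every statement to the file, one per line
for (let i = 0; i < queries.length; i++) {
    fs.appendFileSync('my_file.cql', queries[i] + "\n");
}

// cqlsh executes the statements one at a time, so the driver's
// parallel-request limit never comes into play
exec("cqlsh --file my_file.cql", function (err, stdout, stderr) {
    if (err) {
        console.error(stderr);
    }
});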