Cassandra slowed down with more nodes

Posted 2019-05-21 12:56

Question:

I set up a Cassandra cluster on AWS. I expected I/O throughput (number of reads/writes per second) to increase as more nodes are added, as advertised. Instead I got exactly the opposite: performance drops as new nodes are added.

Do you know of any typical issues that prevent it from scaling?

Here are some details:

I am loading a text file (15 MB) into the column family. Each line is a record, and there are 150,000 records. With 1 node, the load takes about 90 seconds. With 2 nodes, it takes 120 seconds. I can see that the data is spread across both nodes, but there is no increase in throughput.
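As a quick sanity check on those timings, the sketch below converts them into writes per second and average time per insert. The class and method names are made up for illustration; the only inputs are the record count and the two durations quoted above.

```java
public class LoadTimings {
    // Writes per second for a run that stored `records` rows in `seconds`.
    public static double writesPerSecond(int records, double seconds) {
        return records / seconds;
    }

    // Average wall-clock time spent per insert, in milliseconds.
    public static double millisPerWrite(int records, double seconds) {
        return seconds * 1000.0 / records;
    }

    public static void main(String[] args) {
        int records = 150000;
        System.out.printf("1 node:  %.0f writes/s, %.2f ms per insert%n",
                writesPerSecond(records, 90), millisPerWrite(records, 90));
        System.out.printf("2 nodes: %.0f writes/s, %.2f ms per insert%n",
                writesPerSecond(records, 120), millisPerWrite(records, 120));
    }
}
```

That works out to roughly 1,667 writes/s (0.6 ms per insert) on one node and 1,250 writes/s (0.8 ms per insert) on two. Because the loader below waits for each insert to return before sending the next one, the total time is essentially the record count multiplied by the per-insert latency.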

The source code is below:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;

public class WordGenCAS {
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;
        while ( (line = br.readLine()) != null) {
            // Replace single quotes so the line can be embedded in the CQL string
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            // Synchronous write: blocks until this insert has completed
            session.execute(cqlStatement2);
            lineCount++;
        }

        System.out.println("Total lines written: " + lineCount);
    }
}

The DB schema is the following:

CREATE KEYSPACE text_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };

USE text_ks;

CREATE TABLE text_table (
    id int,
    text_col text,
    primary key (id)
) WITH COMPACT STORAGE;

Thanks!

Answer 1:

Even though this is an old post, I think it's worth posting a solution for this (common) kind of problem.

As you've already discovered, loading data with a serial procedure is slow. What has been suggested to you is the right thing to do.

However, issuing a lot of queries without applying some sort of back pressure is asking for trouble: you are likely to lose data because of excessive load on the server (and, to some extent, on the driver).

This solution loads data with async calls and applies some back pressure on the client to avoid data loss.

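import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

public class WordGenCAS {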
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;

        // Futures of the queries that have been issued but not yet awaited
        List<Future<ResultSet>> futures = new ArrayList<>();

        // Loop
        while ( (line = br.readLine()) != null) {
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            lineCount++;

            // Add the future returned by the async method to the list
            futures.add(session.executeAsync(cqlStatement2));

            // Apply some back pressure once more than X queries are pending.
            // Change X (1000 here) to a value suitable for your cluster.
            while (futures.size() > 1000) {
                Future<ResultSet> future = futures.remove(0);
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        System.out.println("Total lines written: " + lineCount);
        System.out.println("Waiting for writes to complete...");

        // Wait until all writes are done.
        while (futures.size() > 0) {
            Future<ResultSet> future = futures.remove(0);
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

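        // Release the reader and the driver's resources so the JVM can exit
        // (Cluster.close() exists in driver 2.0 and later).
        br.close();
        cluster.close();

        System.out.println("Done!");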
    }
}
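
Another way to bound the number of pending writes is a counting semaphore: take a permit before each async call and give it back when the call completes. The sketch below is not the code from this answer and does not talk to Cassandra. `fakeAsyncWrite` is a hypothetical stand-in for `session.executeAsync(...)`, and the class name, pool size and limits are assumptions chosen only to make the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncLoader {

    // Hypothetical stand-in for session.executeAsync(...): the returned future
    // completes on a pool thread after a short simulated delay.
    static CompletableFuture<Void> fakeAsyncWrite(ExecutorService pool, int id) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    // Issues `total` writes while keeping at most `maxInFlight` of them pending.
    // Returns the highest number of writes that were pending at the same time.
    public static int load(int total, int maxInFlight) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producer while maxInFlight writes are already pending
                permits.acquire();
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);

                fakeAsyncWrite(pool, i).whenComplete((result, error) -> {
                    if (error != null) {
                        error.printStackTrace();
                    }
                    inFlight.decrementAndGet();
                    // Frees a slot so the producer can issue the next write
                    permits.release();
                });
            }
            // Taking every permit back means all pending writes have completed
            permits.acquire(maxInFlight);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int peak = load(1000, 32);
        System.out.println("Peak number of pending writes: " + peak);
    }
}
```

Compared with the list-based window above, the producer here resumes as soon as any pending write finishes, not only when the oldest one does. With the DataStax driver the permit would presumably be released from a listener registered on the future returned by `executeAsync`; that part is left out here and should be checked against the driver version in use.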