Cassandra cluster with bad insert performance and

Posted 2020-02-05 09:21

I have to store around 250 numerical values per second per client, which is about 900k values per hour. Recording will probably not run for the full day (more likely 5-10 hours a day), but I will partition my data by client id and the day the reading is made. The maximum row length comes to about 22-23M, which is still manageable. Nevertheless, my schema looks like this:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);
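
The sizing figures above can be checked with a few lines of arithmetic. This is a minimal sketch: the rate of 250 readings per second comes from the question, while `PartitionSizing` and its method names are hypothetical helpers introduced only for illustration.

```java
// Back-of-the-envelope sizing for one (clientid, date) partition.
// The 250 readings/second rate is taken from the question; the rest is derived.
final class PartitionSizing {
    static final long READINGS_PER_SECOND = 250;

    // Rows written to one partition for a recording that lasts `hours` hours.
    static long rowsForHours(long hours) {
        return READINGS_PER_SECOND * 3600L * hours;
    }

    // Milliseconds between two consecutive readings at the given rate.
    static long intervalMillis() {
        return 1000L / READINGS_PER_SECOND;
    }
}
```

`PartitionSizing.rowsForHours(1)` gives 900,000 rows and `PartitionSizing.rowsForHours(24)` gives 21,600,000, which is close to the estimate above; `intervalMillis()` gives the 4 ms spacing used in the test code.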

The keyspace uses NetworkTopologyStrategy with a replication factor of 2 (just for testing), and the snitch is GossipingPropertyFileSnitch. I know that a replication factor of 3 is closer to the production standard.

Next, I created a small cluster on the company's servers: three bare-metal virtualized machines with 2 CPUs x 2 cores, 16GB of RAM and plenty of disk space. I'm on a gigabit LAN with them. According to nodetool, the cluster is operational.

Here is the code I'm using to test my setup:

        Cluster cluster = Cluster.builder()
                .addContactPoint("192.168.1.100")
                .addContactPoint("192.168.1.102")
                .build();
        Session session = cluster.connect();
        DateTime time = DateTime.now();
        BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);

    try {

        ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

        String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement = session.prepare(insertQuery);
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

        //generating the entries
        for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
            time = time.plus(4); //4ms between each entry
            BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
            batch.add(bound);

            //The batch statement must have 65535 statements at most
            if (batch.size() >= 65534) {
                queryQueue.put(batch);
                batch = new BatchStatement();
            }
        }
        queryQueue.put(batch); //the last batch, perhaps shorter than 65535

        //storing the data
        System.out.println("Starting storing");
        while (!queryQueue.isEmpty()) {
            pool.execute(() -> {
                try {

                    long threadId = Thread.currentThread().getId();
                    System.out.println("Started: " + threadId);
                    BatchStatement statement = queryQueue.take();
                    long start2 = System.currentTimeMillis();
                    session.execute(statement);
                    System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
                } catch (Exception ex) {
                    System.out.println(ex.toString());
                }
            });

        }
        pool.shutdown();
        pool.awaitTermination(120,TimeUnit.SECONDS);


    } catch (Exception ex) {
        System.out.println(ex.toString());
    } finally {
        session.close();
        cluster.close();
    }

I came up with this code by reading posts here and on other blogs and websites. As I understand it, it is important for the client to use multiple threads, which is why I did it this way. I also tried using async operations.

The bottom line is this: no matter which approach I use, one batch executes in 5-6 seconds, although it can take up to 10. It takes the same time whether I send just one batch (so only ~65k columns) or use a dumb single-threaded application. Honestly, I expected a bit more, especially since I get more or less the same performance on my laptop with a local instance.
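
To put the timings above in perspective, the observed batch latency can be converted into rows per second. This is a rough sketch using only the numbers quoted in the question (65,534 statements per batch, 5-6 seconds per batch, 900,000 rows per hour of data); `BatchThroughput` and its methods are hypothetical names.

```java
// Converts the batch timings reported in the question into throughput figures.
final class BatchThroughput {
    // Whole rows written per second, given a row count and elapsed time in milliseconds.
    static long rowsPerSecond(long rows, long elapsedMillis) {
        return rows * 1000L / elapsedMillis;
    }

    // Number of batches needed to send totalRows when each batch holds at most batchSize rows.
    static long batchesNeeded(long totalRows, long batchSize) {
        return (totalRows + batchSize - 1) / batchSize;
    }
}
```

With these inputs, a 65,534-row batch that takes 5 to 6 seconds works out to roughly 10,900 to 13,100 rows per second, and one hour of data needs 14 such batches.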

The second, and maybe more important, issue is the exceptions I am running into unpredictably. These two:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)

and

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection has been closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection has been closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection has been closed))

Bottom line: am I doing something wrong? Should I reorganize the way I load data, or change the schema? I tried reducing the row length (so I have 12-hour rows), but that didn't make a big difference.
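
Shortening rows to 12 hours, as mentioned above, amounts to adding a time bucket to the partition key. The following is a minimal sketch of deriving such a bucket value, assuming UTC timestamps and a `yyyy-MM-dd:N` key format. `TimeBucket` and `bucketKey` are hypothetical names, and the key format is an assumption for illustration, not something the schema above requires.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Derives a partition-key bucket such as "2015-01-05:1" from an event time.
final class TimeBucket {
    // bucketHours is the bucket width in hours and must divide 24 evenly (1, 12, 24, ...).
    // The result is the UTC date followed by the index of the bucket within that day.
    static String bucketKey(Instant eventTime, int bucketHours) {
        if (bucketHours <= 0 || 24 % bucketHours != 0) {
            throw new IllegalArgumentException("bucketHours must divide 24: " + bucketHours);
        }
        ZonedDateTime utc = eventTime.atZone(ZoneOffset.UTC);
        int bucketIndex = utc.getHour() / bucketHours;
        return utc.toLocalDate() + ":" + bucketIndex;
    }
}
```

With `bucketHours = 12` a day is split into buckets 0 and 1; with `bucketHours = 1` the index is simply the hour of the day.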

============================== Update:

I was rude and forgot to paste an example of the code I used after the question was answered. It works reasonably well; however, I'm continuing my research with KairosDB and binary transfer with Astyanax. It looks like I can get much better performance with them than with CQL, although KairosDB can have some issues when it is overloaded (but I'm working on it) and Astyanax is a bit too verbose for my taste. Nevertheless, here is the code; I may be mistaken somewhere.

The number of semaphore slots has no effect on performance above 5000; it's almost constant.

    String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
    PreparedStatement preparedStatement = session.prepare(insertQuery);
    Semaphore semaphore = new Semaphore(15000);

    System.out.println("Starting " + Thread.currentThread().getId());
    DateTime time = DateTime.parse("2015-01-05T12:00:00");
    //generating the entries
    long start = System.currentTimeMillis();

    for (int i = 0; i < 900000; i++) { 

        BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
        semaphore.acquire();
        ResultSetFuture resultSetFuture = session.executeAsync(statement);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {

                semaphore.release();
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error: " + throwable.toString());
                semaphore.release();
            }
        });
        time = time.plus(4); //4ms between each entry
    }
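
The snippet above ends with the loop, so it does not show how the requests still in flight are awaited before the session is closed. A minimal sketch of the same semaphore throttle with an explicit drain step follows. To keep it runnable without a cluster, it uses the JDK's `CompletableFuture` in place of the driver's `ResultSetFuture`; `ThrottledWriter`, `writeAll` and the `asyncWrite` callback are hypothetical names, not part of the DataStax driver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Semaphore-based throttle for asynchronous writes (stand-in for session.executeAsync).
final class ThrottledWriter {
    // Starts `total` async writes with at most `maxInFlight` outstanding at any time,
    // waits until every write has completed, and returns the number of failed writes.
    static int writeAll(int total, int maxInFlight, IntFunction<CompletableFuture<Void>> asyncWrite) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger failures = new AtomicInteger();
        for (int i = 0; i < total; i++) {
            permits.acquireUninterruptibly();   // blocks while maxInFlight writes are pending
            CompletableFuture<Void> future;
            try {
                future = asyncWrite.apply(i);
            } catch (RuntimeException e) {      // the write could not even be started
                failures.incrementAndGet();
                permits.release();
                continue;
            }
            future.whenComplete((ignored, error) -> {
                if (error != null) {
                    failures.incrementAndGet();
                }
                permits.release();              // release on success and on failure
            });
        }
        // Drain: all permits can be taken only once no write is outstanding.
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
        return failures.get();
    }
}
```

With the driver, `asyncWrite` would wrap `session.executeAsync(...)`; adapting the driver's future to a `CompletableFuture` is left out here. Closing the session only after `writeAll` returns avoids cutting off requests that are still pending.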

1 answer
手持菜刀,她持情操
Reply #2 · 2020-02-05 09:34

What are your results using unlogged batching? Are you sure you want to use batch statements at all? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e
