I have built an importer for MongoDB and Cassandra. All operations of the importer are the same except for the last step, where the data is shaped to match the required Cassandra table schema or the desired MongoDB document structure. The write performance of Cassandra is really bad compared to MongoDB, and I think I'm doing something wrong.
My abstract importer class loads and reads all the data and passes it to the extending MongoDBImporter or CassandraImporter class, which sends it to the database. One database is targeted at a time, so there are no "dual" inserts to both C* and MongoDB at the same time. The importer runs on the same machine against the same number of nodes (6) in both cases.
The Problem:
The MongoDB import finished after 57 minutes. I ingested 10,000,000 documents (roughly 2,900 documents per second) and I expect about the same number of rows for Cassandra. My Cassandra importer has now been running for 2.5 hours and is only at 5,000,000 inserted rows (roughly 550 rows per second). I will wait for the importer to finish and add the actual finish time here.
How I import with Cassandra:
I prepare two statements once before ingesting data. Both statements are UPDATE queries because sometimes I have to append data to an existing list. My table is cleared completely before starting the import. The prepared statements get used over and over again.
PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
For every row, I create a BoundStatement and pass that statement to my "custom" batching method:
BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
bs = bs.bind();
//add data... with several bs.setXXX(..) calls
cassandraConnection.executeBatch(bs);
With MongoDB, I can insert 1000 documents (that's the maximum) at a time without problems. For Cassandra, the importer crashes at some point with com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large for just 10 of my statements. I'm using the code below to build the batches. I started with batch sizes of 1000, 500, 300, 200, 100, 50 and 20, but none of them worked. I then set the batch size down to 10 and it threw the exception again. Now I'm out of ideas why it's breaking.
private static final int MAX_BATCH_SIZE = 10;
private Session session;
private BatchStatement currentBatch;
...
@Override
public ResultSet executeBatch(Statement statement) {
if (session == null) {
throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
}
if (currentBatch == null) {
currentBatch = new BatchStatement(Type.UNLOGGED);
}
currentBatch.add(statement);
if (currentBatch.size() == MAX_BATCH_SIZE) {
ResultSet result = session.execute(currentBatch);
currentBatch = new BatchStatement(Type.UNLOGGED);
return result;
}
return null;
}
My C* schema looks like this
CREATE TYPE stream.event (
data_dbl frozen<map<text, double>>,
data_str frozen<map<text, text>>,
data_bool frozen<map<text, boolean>>
);
CREATE TABLE stream.data (
log_creator text,
date text, //date of the timestamp
ts timestamp,
log_id text, //some id
hour int, //just the hour of the timestamp
x double,
y double,
events list<frozen<event>>,
PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)
I sometimes need to add new events to an existing row, which is why I need a list of UDTs. My UDT contains three maps because the event creators produce different data (key/value pairs of type string/double/boolean). I am aware that the UDTs are frozen and that I cannot touch the maps of already ingested events. That's fine for me; I just sometimes need to add new events that have the same timestamp. I partition on the creator of the logs (some sensor name), the date of the record (e.g. "22-09-2016") and the hour of the timestamp (to distribute the data more while keeping related data close together in a partition).
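To make the partitioning concrete, the date and hour components of the partition key can be derived from a record's timestamp roughly as follows. This is only a sketch and not the importer's actual code: the class and method names are hypothetical, the dd-MM-yyyy pattern is inferred from the "22-09-2016" example, and UTC is an assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionKeys {
    // Assumed format, inferred from the example value "22-09-2016".
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("dd-MM-yyyy");

    /** "date" partition key component for a timestamp in epoch milliseconds (UTC assumed). */
    public static String dateOf(long epochMillis) {
        return toUtc(epochMillis).format(DATE_FORMAT);
    }

    /** "hour" partition key component (0-23) for the same timestamp (UTC assumed). */
    public static int hourOf(long epochMillis) {
        return toUtc(epochMillis).getHour();
    }

    private static ZonedDateTime toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long ts = ZonedDateTime.of(2016, 9, 22, 14, 30, 0, 0, ZoneOffset.UTC)
                .toInstant().toEpochMilli();
        // prints "22-09-2016 / 14"
        System.out.println(dateOf(ts) + " / " + hourOf(ts));
    }
}
```

All rows of one sensor that fall into the same hour of the same day therefore end up in the same partition.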
I'm using Cassandra 3.0.8 with the Datastax Java Driver, version 3.1.0 in my pom.
According to "What is the batch limit in Cassandra?", I should not increase the batch size by adjusting batch_size_fail_threshold_in_kb in my cassandra.yaml. So... what should I do, or what's wrong with my import?
UPDATE: I have adjusted my code to run async queries and store the currently running inserts in a list. Whenever an async insert finishes, it is removed from the list. When the list size exceeds a threshold and an error occurred in an earlier insert, the method waits in 500 ms steps until the number of running inserts is below the threshold. My code now automatically increases the threshold as long as no insert has failed.
But after streaming 3,300,000 rows, there were 280,000 inserts being processed at the same time, and no error had happened. This number of concurrently processed inserts looks too high. The 6 Cassandra nodes are running on commodity hardware that is 2 years old.
Is this high number (280,000 for 6 nodes) of concurrent inserts a problem? Should I add a variable like MAX_CONCURRENT_INSERT_LIMIT?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...
@Override
public void executeBatch(Statement statement) throws InterruptedException {
if (this.runningInsertList == null) {
this.runningInsertList = new ArrayList<>();
}
//Sleep while the currently processing number of inserts is too high
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
Thread.sleep(concurrentInsertSleepTime);
}
ResultSetFuture future = this.executeAsync(statement);
this.runningInsertList.add(future);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
runningInsertList.remove(future);
}
@Override
public void onFailure(Throwable t) {
concurrentInsertErrorOccured = true;
}
}, MoreExecutors.sameThreadExecutor());
if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
concurrentInsertLimit += 2000;
LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
}
return;
}
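For reference, a hard cap such as MAX_CONCURRENT_INSERT_LIMIT could be sketched with a java.util.concurrent.Semaphore instead of a growing list plus sleep loop. This is not my importer code: BoundedAsyncInserter and its methods are hypothetical names, and a plain CompletableFuture stands in for the driver's ResultSetFuture so the sketch runs without a cluster. Unlike the code above, it frees the slot on failure as well as on success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BoundedAsyncInserter {
    private final int maxConcurrentInserts;
    private final Semaphore permits;
    private final AtomicInteger failures = new AtomicInteger();

    public BoundedAsyncInserter(int maxConcurrentInserts) {
        this.maxConcurrentInserts = maxConcurrentInserts;
        this.permits = new Semaphore(maxConcurrentInserts);
    }

    /** Blocks while the cap is reached, then starts the async insert. */
    public CompletableFuture<Void> submit(Supplier<CompletableFuture<Void>> asyncInsert)
            throws InterruptedException {
        permits.acquire();
        CompletableFuture<Void> future;
        try {
            future = asyncInsert.get(); // stand-in for an executeAsync(statement) call
        } catch (RuntimeException e) {
            permits.release(); // nothing was started, so give the slot back
            throw e;
        }
        // Free the slot when the insert completes, whether it succeeded or failed.
        return future.whenComplete((result, error) -> {
            if (error != null) {
                failures.incrementAndGet();
            }
            permits.release();
        });
    }

    /** Waits until no insert is in flight by briefly taking every permit. */
    public void awaitIdle() throws InterruptedException {
        permits.acquire(maxConcurrentInserts);
        permits.release(maxConcurrentInserts);
    }

    public int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedAsyncInserter inserter = new BoundedAsyncInserter(4);
        for (int i = 0; i < 100; i++) {
            // Simulated insert: never more than 4 of these run at once.
            inserter.submit(() -> CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        inserter.awaitIdle();
        System.out.println("failed inserts: " + inserter.failureCount());
    }
}
```

With this shape the producer thread simply blocks in submit() once the cap is reached, so the number of in-flight inserts cannot grow without bound even when no error ever occurs.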