I'm having trouble generating data from Spark into Cassandra using DSE 4.5.3.
I have a cluster of 8 nodes (pretty powerful nodes) and I want to generate some test data from Spark.
My Spark job reads 5M rows from a Cassandra table (they represent one day of data), caches them in memory (32 GB of memory per node, so no problem there) and finally saves them n times into another Cassandra table to simulate more days of data.
val table = sc.cassandraTable[RecordData]("data", "one_day").cache
val firstDate = table.first.gets_dt_tm
val start = 1
val end = 10

for (i <- start to end) {
  table.map { row =>
    // modify row to increment the row timestamp day according to i
    Thread.sleep(2)
    row
  }.saveToCassandra("data", "ten_days")
}
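The modification in the map body is roughly like this (a simplified sketch; the case-class copy with an s_dt_tm field is illustrative, run inside the same loop over i):

// simplified sketch: shift each row's timestamp forward by i days,
// assuming RecordData is a case class with an s_dt_tm: java.util.Date field
val msPerDay = 24L * 60 * 60 * 1000
table.map { row =>
  row.copy(s_dt_tm = new java.util.Date(row.s_dt_tm.getTime + i * msPerDay))
}.saveToCassandra("data", "ten_days")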
I also added a sleep to slow down the write process, but it didn't help. The problem is that my cluster accumulates a lot of hints and I'm forced to repair the nodes continuously. Keep in mind that I need to generate 600 days of data.
This is the structure of my table:
CREATE TABLE ten_days (
    YEAR int,
    MONTH int,
    DAY int,
    ID decimal,
    ... other fields (including C_TRX_REF),
    S_DT_TM timestamp,
    PRIMARY KEY ((ID, C_TRX_REF), YEAR, MONTH, DAY, S_DT_TM)
);
ID and C_TRX_REF form a unique key within a single day, but not across multiple days. The distinct count of (ID, C_TRX_REF) is 5M.
S_DT_TM is a timestamp with second resolution so it's not unique in my dataset.
Why is the Spark write to Cassandra generating hints? Do you need more info? What are the best practices for writing millions of rows to Cassandra from Spark?
Thanks
The sleep in your statement is most likely not actually slowing down the execution of this query. Since the operations are applied on a per-partition basis, my guess is that the sleep simply pauses before an entire partition begins being written.
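If you really want to pause per partition rather than per row, a mapPartitions sketch like the one below makes that explicit (the write-tuning options mentioned next are the better tool, though):

// sketch: pause once per partition before its rows are written,
// instead of sleeping inside map for every row
table.mapPartitions { rows =>
  Thread.sleep(1000) // one pause per partition
  rows
}.saveToCassandra("data", "ten_days")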
As for the real issue: the only reason you will be generating hints is that one of your nodes is unable to keep up with the amount of data being written by your Spark job. It means that a node was unreachable during the execution of a mutation, so the coordinating node saved a serialized copy of the mutation for when the unreachable node came back online. You can throttle down the batch size and the number of concurrent writes using the connector's output options (for example spark.cassandra.output.batch.size.rows or spark.cassandra.output.concurrent.writes), which are documented here:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
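For example, a minimal sketch of applying those settings on the SparkConf (property names as documented in the linked guide; the host and values are placeholders to tune for your cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("generate-test-data")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // placeholder host
  .set("spark.cassandra.output.batch.size.rows", "64")   // smaller unlogged batches
  .set("spark.cassandra.output.concurrent.writes", "2")  // fewer in-flight batches per task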
But you can most likely get a bigger throughput improvement by making sure all of the hard drives in your cluster are SSDs, and that the commitlog and Spark directories are also on SSDs.