DataStax Enterprise: saveToCassandra generates a lot of hints

Asked 2019-04-15 12:31

I'm having trouble generating data from Spark into Cassandra using DSE 4.5.3.

I have a cluster of 8 nodes (pretty powerful nodes) and I want to generate some test data from Spark.

My Spark job reads 5M rows from a Cassandra table (representing one day of data), caches them in memory (32 GB of RAM per node, so no problem there) and finally saves them n times into another Cassandra table to simulate more days of data.

    import com.datastax.spark.connector._

    val table = sc.cassandraTable[RecordData]("data", "one_day").cache()
    val firstDate = table.first.gets_dt_tm
    val start = 1
    val end = 10

    for (i <- start to end) {
      table.map { row =>
        // modify the row to increment its timestamp day according to i
        Thread.sleep(2)
        row
      }.saveToCassandra("data", "ten_days")
    }
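For context, this is roughly what the day increment inside the map looks like; a minimal sketch assuming RecordData is a case class exposing the timestamp as a java.util.Date field named s_dt_tm (the real class has more fields, so treat the names here as illustrative, with i coming from the surrounding loop):

    // Sketch only: assumes RecordData is a case class with a java.util.Date field s_dt_tm.
    import java.util.Date

    val msPerDay = 24L * 60 * 60 * 1000

    table.map { row =>
      // shift the timestamp forward by i days to simulate a later day of data
      row.copy(s_dt_tm = new Date(row.s_dt_tm.getTime + i * msPerDay))
    }.saveToCassandra("data", "ten_days")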

I also added a sleep to slow down the write process, but it didn't help. The problem is that my cluster generates a lot of hints and I'm forced to repair nodes continuously. Keep in mind that I need to generate 600 days of data.

This is the structure of my table

CREATE TABLE ten_days (
    YEAR int,
    MONTH int,
    DAY int,
    ID decimal,
    ... other fields
    S_DT_TM timestamp,
    PRIMARY KEY ((ID, C_TRX_REF), YEAR, MONTH, DAY, S_DT_TM)
);

ID and C_TRX_REF form a unique key within a single day, but not across multiple days. The distinct count of (ID, C_TRX_REF) is 5M.

S_DT_TM is a timestamp with second resolution so it's not unique in my dataset.

Why is the Spark write to Cassandra generating hints? Do you need more info? What are the best practices for writing millions of rows to Cassandra from Spark?

Thanks


1 Answer

Summer. 凉城 · answered 2019-04-15 13:09

The sleep in your statement is most likely not actually slowing down the execution of this query. Since the operations are applied on a per-partition basis, my guess is that the sleep simply pauses before an entire partition begins being written.

Now for the real issue: the only reason you would be generating hints is that one of your nodes is unable to keep up with the amount of data being written by your Spark job. It means a node was unreachable during the execution of a mutation, so the coordinating node saved a serialized copy of the mutation for when the unreachable node comes back online. You can throttle down the batch size, and with it the amount of concurrent writes, using either of these connector settings:

spark.cassandra.output.batch.size.rows: number of rows per single batch; default is 'auto' which means the connector will adjust the number of rows based on the amount of data in each row

or

spark.cassandra.output.batch.size.bytes: maximum total size of the batch in bytes; defaults to 64 kB.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
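For example, a minimal sketch of how these settings could be passed through SparkConf; the property names are taken from the linked documentation, while the values (200 rows, 4 kB) are placeholder assumptions you would need to tune for your cluster:

    import org.apache.spark.{SparkConf, SparkContext}

    // Throttle the connector's batching before creating the SparkContext.
    val conf = new SparkConf()
      .setAppName("generate-test-data")
      .set("spark.cassandra.output.batch.size.rows", "200")    // fix the rows per batch instead of 'auto'...
      .set("spark.cassandra.output.batch.size.bytes", "4096")  // ...or cap the batch size in bytes (default 64 kB); pick one

    val sc = new SparkContext(conf)
    // then build the RDD and call saveToCassandra("data", "ten_days") as before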

But you can most likely increase your throughput more effectively by making sure all of the hard drives in your cluster are SSDs, and that the commitlog and Spark directories are also on SSDs.
