I'm using Spark 1.6.1, Cassandra 2.2.3 and Cassandra-Spark connector 1.6. .
I already tried to write to multi node cluster but with replication_factor:1
.
Now, I'm trying to write to 6-node cluster with one seed one and keyspace which has replication_factor
> 1 but Spark is not responding and he is refusing to do that.
As I mention, it works when I'm writing to coordinator with keyspace set to 1
.
This is an log which I'm getting and it always stops here or after half an hour he starts to cleaning accumulators and stops on fourth again.
16/08/16 17:07:03 INFO NettyUtil: Found Netty's native epoll transport in the classpath, using it
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.1 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.2:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.2 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.3:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.3 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.4:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.4 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.5:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.5 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.6:9042 added
16/08/16 17:07:04 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/08/16 17:07:05 INFO SparkContext: Starting job: take at CassandraRDD.scala:121
16/08/16 17:07:05 INFO DAGScheduler: Got job 3 (take at CassandraRDD.scala:121) with 1 output partitions
16/08/16 17:07:05 INFO DAGScheduler: Final stage: ResultStage 4 (take at CassandraRDD.scala:121)
16/08/16 17:07:05 INFO DAGScheduler: Parents of final stage: List()
16/08/16 17:07:05 INFO DAGScheduler: Missing parents: List()
16/08/16 17:07:05 INFO DAGScheduler: Submitting ResultStage 4 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18), which has no missing parents
16/08/16 17:07:05 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.3 KB, free 170.5 KB)
16/08/16 17:07:05 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.2 KB, free 174.7 KB)
16/08/16 17:07:05 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:43680 (size: 4.2 KB, free: 756.4 MB)
16/08/16 17:07:05 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/08/16 17:07:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18)
16/08/16 17:07:05 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
16/08/16 17:07:05 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 204, localhost, partition 0,NODE_LOCAL, 22553 bytes)
16/08/16 17:07:05 INFO Executor: Running task 0.0 in stage 4.0 (TID 204)
16/08/16 17:07:06 INFO Executor: Finished task 0.0 in stage 4.0 (TID 204). 2074 bytes result sent to driver
16/08/16 17:07:06 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 204) in 1267 ms on localhost (1/1)
16/08/16 17:07:06 INFO DAGScheduler: ResultStage 4 (take at CassandraRDD.scala:121) finished in 1.276 s
16/08/16 17:07:06 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/08/16 17:07:06 INFO DAGScheduler: Job 3 finished: take at CassandraRDD.scala:121, took 1.310929 s
16/08/16 17:07:06 INFO SparkContext: Starting job: take at CassandraRDD.scala:121
16/08/16 17:07:06 INFO DAGScheduler: Got job 4 (take at CassandraRDD.scala:121) with 4 output partitions
16/08/16 17:07:06 INFO DAGScheduler: Final stage: ResultStage 5 (take at CassandraRDD.scala:121)
16/08/16 17:07:06 INFO DAGScheduler: Parents of final stage: List()
16/08/16 17:07:06 INFO DAGScheduler: Missing parents: List()
16/08/16 17:07:06 INFO DAGScheduler: Submitting ResultStage 5 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18), which has no missing parents
16/08/16 17:07:06 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 8.4 KB, free 183.1 KB)
16/08/16 17:07:06 INFO MemoryStore: Block broadcast_8_piece0 stored as byt es in memory (estimated size 4.2 KB, free 187.3 KB)
16/08/16 17:07:06 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:43680 (size: 4.2 KB, free: 756.3 MB)
16/08/16 17:07:06 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/08/16 17:07:06 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 5 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18)
16/08/16 17:07:06 INFO TaskSchedulerImpl: Adding task set 5.0 with 4 tasks
16/08/16 17:07:06 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 205, localhost, partition 1,NODE_LOCAL, 22553 bytes)
16/08/16 17:07:06 INFO Executor: Running task 0.0 in stage 5.0 (TID 205)
16/08/16 17:07:07 INFO Executor: Finished task 0.0 in stage 5.0 (TID 205). 2074 bytes result sent to driver
16/08/16 17:07:07 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 205) in 706 ms on localhost (1/4)
16/08/16 17:07:14 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_7_piece0 on localhost:43680 in memory (size: 4.2 KB, free: 756.4 MB)
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 14
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 13
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost:43680 in memory (size: 7.1 KB, free: 756.4 MB)
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 12
16/08/16 17:32:40 INFO ContextCleaner: Cleaned shuffle 0
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 11
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 10
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 9
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 8
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 7
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 6
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 5
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 4
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:43680 in memory (size: 13.8 KB, free: 756.4 MB)
16/08/16 20:45:06 INFO SparkContext: Invoking stop() from shutdown hook
EDIT
This is snippet of code what am I doing exactly:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType, IntegerType};
object ff {
def main(string: Array[String]) {
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "127.0.0.1")
.setMaster("local[4]")
.setAppName("ff")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true")
.load("test.csv")
df.registerTempTable("ff_table")
//df.printSchema()
df.count
time {
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
.save()
}
def time[A](f: => A) = {
val s = System.nanoTime
val ret = f
println("time: " + (System.nanoTime - s) / 1e6 + "ms")
ret
}
}
}
Also, if I run nodetool describecluster
I got this results:
Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]
My keyspace configuration:
CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
I tried to insert in CLI on row for replication_factor
:3 and it's working, so every node can see each other.
Why Spark can't insert anything than, anyone idea?