I am using Spark 1.3.1 and have written a small program to filter data stored in Cassandra:
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import com.datastax.spark.connector._

val conf = new SparkConf()
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
// the filter runs in Spark, on rows already fetched from Cassandra
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
This program runs for a very long time, printing messages like
16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
If I terminate the program and change my code to
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
It still runs for a very long time, with messages like
16/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
So it seems the program always loads the entire Cassandra table into memory (or scans it completely) and only then applies the filter, which seems extremely inefficient. At roughly 20 seconds per task and 1350 tasks on a single local executor, that works out to something like 7.5 hours for a single count.
How can I write this code in a better way, so that Spark doesn't load the entire Cassandra table into an RDD (or scan it completely) before applying the filter?
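I'm aware the Spark Cassandra Connector exposes a .where method that is supposed to push a CQL predicate down to Cassandra, so that only matching rows are fetched. The sketch below is what I was hoping would work; note that it assumes "date" is a clustering column of foo.bar (as far as I can tell, the connector only allows .where on clustering or indexed columns, but I may be wrong about the exact requirement):

import com.datastax.spark.connector._
import org.joda.time.DateTime

// Sketch: push the predicate into the CQL query instead of filtering in Spark.
// Assumes "date" is a clustering column of foo.bar (illustrative, not
// verified against my actual schema).
val date = DateTime.now().minusHours(1)
val rdd2 = sc.cassandraTable("foo", "bar")
  .where("date > ?", date.toDate)
println(rdd2.count())

Is something like this the right approach, or does it only work for certain schemas?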