I have the following DataFrame: df
At some point I need to filter out items based on timestamps (milliseconds).
However, it is important to me to record how many rows were filtered (in case there are too many, I want to fail the job).
Naively I can do:
====== Lots of calculations on df ======
val df_filtered = df.filter($"ts" >= startDay && $"ts" <= endDay)
val filtered_count = df.count - df_filtered.count
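and then fail if the count is too high, e.g. (maxAllowed being a placeholder for whatever threshold makes sense for the job):
if (filtered_count > maxAllowed)
  sys.error(s"Too many rows were filtered: $filtered_count")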
However, this feels like complete overkill, since Spark will evaluate the whole execution tree three times (the filter and the two counts).
This task is really easy in Hadoop MapReduce, since I can maintain a counter for each filtered row.
Is there a more efficient way? I could only find accumulators, but I can't connect them to filter.
A suggested approach was to cache df before the filter; however, I would prefer to keep that as a last resort due to the DataFrame's size.
You can increment an accumulator inside the filter predicate itself, so the count is maintained in the same single pass. In Spark 1.6, DataFrame.filter only accepts Column expressions, so you have to drop down to the RDD API for a row-level predicate.
Spark 1.6.0 code:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  case class xxx(a: Int, b: Int)

  def main(args: Array[String]): Unit = {
    val df = sqlContext.createDataFrame(sc.parallelize(Seq(xxx(1, 1), xxx(2, 2), xxx(3, 3))))
    val acc = sc.accumulator[Long](0)

    // Row-level predicates are not available on DataFrame in 1.6, so filter the underlying RDD.
    val filteredRdd = df.rdd.filter(r => {
      if (r.getAs[Int]("a") > 2) {
        true
      } else {
        acc.add(1) // count every row the predicate rejects
        false
      }
    })

    // Rebuild a DataFrame from the filtered RDD, reusing the original schema.
    val filteredRddDf = sqlContext.createDataFrame(filteredRdd, df.schema)
    filteredRddDf.show() // an action must run before the accumulator has a meaningful value
    println(acc.value)
  }
}
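Here show() keeps only xxx(3, 3), and println(acc.value) prints 2, since two rows fail the a > 2 predicate.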
In Spark 2.x, Dataset.filter accepts a function directly, so no detour through the RDD API is needed.
Spark 2.x.x code:
import org.apache.spark.sql.SparkSession

object Main {
  val ss = SparkSession.builder().master("local[*]").getOrCreate()
  val sc = ss.sparkContext

  case class xxx(a: Int, b: Int)

  def main(args: Array[String]): Unit = {
    val df = ss.createDataFrame(sc.parallelize(Seq(xxx(1, 1), xxx(2, 2), xxx(3, 3))))
    val acc = sc.longAccumulator

    // Dataset.filter takes a function in 2.x, so the predicate can update the accumulator directly.
    val filteredDf = df.filter(r => {
      if (r.getAs[Int]("a") > 2) {
        true
      } else {
        acc.add(1) // count every row the predicate rejects
        false
      }
    }).toDF()

    filteredDf.show() // an action must run before the accumulator has a meaningful value
    println(acc.value)
  }
}
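One caveat: Spark only guarantees exactly-once accumulator updates inside actions; for updates made in transformations like filter, a retried or recomputed task can apply its increments more than once, so the count may overshoot. If an exact count matters more than avoiding an extra pass, here is a minimal sketch of an alternative (assuming Spark 2.x and the ts/startDay/endDay names from the question) that computes both counts in a single aggregation job:

import org.apache.spark.sql.functions.{col, count, lit, when}

// One job over df: total rows and rows inside the window, counted in a single pass.
val counts = df.agg(
  count(lit(1)).as("total"),
  // when(...) without otherwise yields null for non-matching rows, and count() skips nulls
  count(when(col("ts") >= startDay && col("ts") <= endDay, 1)).as("kept")
).first()
val filteredCount = counts.getAs[Long]("total") - counts.getAs[Long]("kept")

This still evaluates df twice overall (once for the counts, once for the actual filter downstream), but that beats the three evaluations of the naive version and sidesteps the accumulator's retry semantics.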