Scenario and Problem: I want to add two attributes to JSON object based on the look up table values and insert the JSON to Mongo DB. I have broadcast variable which holds look up table. However, i am not being able to access it inside foreachPartition as you can see in the code. It does not give me any error but simply does not display anything. Also, because of it i cant insert JSON to Mongo DB. I cant find any explanation to this behaviour. Any explanation or work around to make it work is much appreciated.
Here is my full code:
object ProcessMicroBatchStreams {
val calculateDistance = udf {
(lat: String, lon: String) =>
GeoHash.getDistance(lat.toDouble, lon.toDouble) }
val DB_NAME = "IRT"
val COLLECTION_NAME = "sensordata"
val records = Array[String]()
def main(args: Array[String]): Unit = {
if (args.length < 0) {
System.err.println("Usage: ProcessMicroBatchStreams <master> <input_directory>")
System.exit(1)
}
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
.set("spark.hadoop.validateOutputSpecs", "false")
/*.set("spark.executor.instances", "3")
.set("spark.executor.memory", "18g")
.set("spark.executor.cores", "9")
.set("spark.task.cpus", "1")
.set("spark.driver.memory", "10g")*/
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
.foreachRDD { rdd =>
//broadcastTable.value.show() // I can access broadcast value here
if (!rdd.partitions.isEmpty) {
val partitionedRDD = rdd.repartition(4)
partitionedRDD.foreachPartition {
partition =>
println("Inside Partition")
broadcastTable.value.show() // I cannot access broadcast value here
partition.foreach {
row =>
val items = row.split("\n")
items.foreach { item =>
val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME)
val jsonObject = new JSONObject(item)
val latitude = jsonObject.getDouble(Constants.LATITUDE)
val longitude = jsonObject.getDouble(Constants.LONGITUDE)
// The broadcast value is not being shown here
// However, there is no error shown
// I cannot insert the value into Mongo DB
val selectedRow = broadcastTable.value
.filter("geoCode LIKE '" + GeoHash.subString(latitude, longitude) + "%'")
.withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
.orderBy("Distance")
.select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1)
if (selectedRow.length != 0) {
jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0))
jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1))
}
else {
jsonObject.put(Constants.TRACK_KM, "NULL")
jsonObject.put(Constants.TRACK_NAME, "NULL")
}
val record = JSON.parse(jsonObject.toString()).asInstanceOf[DBObject]
mongoColl.insert(record)
}
}
}
}
}
sys.addShutdownHook {
ssc.stop(true, true)
}
ssc.start()
ssc.awaitTermination()
}
}