Broadcast Variables not showing inside Partitions

2019-08-11 06:15发布

问题:

Scenario and Problem: I want to add two attributes to JSON object based on the look up table values and insert the JSON to Mongo DB. I have broadcast variable which holds look up table. However, i am not being able to access it inside foreachPartition as you can see in the code. It does not give me any error but simply does not display anything. Also, because of it i cant insert JSON to Mongo DB. I cant find any explanation to this behaviour. Any explanation or work around to make it work is much appreciated.

Here is my full code:

object ProcessMicroBatchStreams {
val calculateDistance = udf { 
 (lat: String, lon: String) =>      
 GeoHash.getDistance(lat.toDouble, lon.toDouble) }
 val DB_NAME = "IRT"
 val COLLECTION_NAME = "sensordata"
 val records = Array[String]()

def main(args: Array[String]): Unit = {
  if (args.length < 0) {
  System.err.println("Usage: ProcessMicroBatchStreams <master> <input_directory>")
  System.exit(1)
}
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")
/*.set("spark.executor.instances", "3")
.set("spark.executor.memory", "18g")
.set("spark.executor.cores", "9")
.set("spark.task.cpus", "1")
.set("spark.driver.memory", "10g")*/

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)


ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
  //broadcastTable.value.show() // I can access broadcast value here
  if (!rdd.partitions.isEmpty) {
    val partitionedRDD = rdd.repartition(4)
    partitionedRDD.foreachPartition {
      partition =>
        println("Inside Partition")
        broadcastTable.value.show() // I cannot access broadcast value here
        partition.foreach {
          row =>
            val items = row.split("\n")
            items.foreach { item =>
              val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME)
              val jsonObject = new JSONObject(item)
              val latitude = jsonObject.getDouble(Constants.LATITUDE)
              val longitude = jsonObject.getDouble(Constants.LONGITUDE)

              // The broadcast value is not being shown here
              // However, there is no error shown
              // I cannot insert the value into Mongo DB
              val selectedRow = broadcastTable.value
                .filter("geoCode LIKE '" + GeoHash.subString(latitude, longitude) + "%'")
                .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                .orderBy("Distance")
                .select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1)
              if (selectedRow.length != 0) {
                jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0))
                jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1))
              }
              else {
                jsonObject.put(Constants.TRACK_KM, "NULL")
                jsonObject.put(Constants.TRACK_NAME, "NULL")
              }
              val record = JSON.parse(jsonObject.toString()).asInstanceOf[DBObject]
              mongoColl.insert(record)
            }
        }
    }
  }
}
sys.addShutdownHook {
  ssc.stop(true, true)
}

ssc.start()
ssc.awaitTermination()
}
}

回答1:

It looks like you're trying to broadcast an RDD. Try something like this:

broadCastVal = gpsLookUpTable.collect
broadCastTable = sc.broadcast(broadCastVal)

You should be able to get the value you're expecting.



回答2:

I am not totally sure about this but after two encounters as such i am writing this answer. I could broadcast a RDD but i am not able to access the value. If i create a list or treeMap, i am being able to broadcast and retrieve the value as well. I am not sure why. Although, i haven't found any where written that we cant broadcast a RDD.