MongoDBObject not being added to inside an RDD

Posted 2019-08-14 09:43

Question:

OK gurus, I've got a problem that doesn't make much sense to me. I'm stuck trying to save an object to MongoDB that looks roughly like this:

    {
      data: {
        baseball: [{timestamp: tweet}, {timestamp: another tweet}],
        football: [{timestamp: footballtweet}, {timestamp: differentfootballtweet}]
      },
      timeInterval: "last minute to this minute",   // I'm doing time-series data
      terms: ["football", "baseball"]
    }

See below for the loop I'm stuck on. Note the issue might have something to do with the RDD expiring; I tried to fix that by persisting it in memory, but I'm not sure what else to do.

    twitterStream.foreachRDD(rdd => {
      val entryToSave = MongoDBObject()
      val termWithTweets = MongoDBObject()
      rdd.persist()
      filters.foreach(term => {
        var listOfTweets = MongoDBObject()
        for (status <- rdd) {
          if (status.getText.contains(term)) {
            // listOfTweets += status
            // Why doesn't the line below actually add the key/value pair to the
            // variable defined outside the "for (status <- rdd)" loop? I know
            // (through debugging) that it does in fact append inside the loop.
            listOfTweets += (DateTime.now.toString() -> status.toString)
          }
        }
        // When I print listOfTweets outside the for loop it is empty. Why?
        println("outsideRDD", listOfTweets)
        termWithTweets += (term -> listOfTweets)
      })
      entryToSave += ("data" -> termWithTweets)
      entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString)
      entryToSave += ("terms" -> filters)
      collection.insert(entryToSave)
    })

I don't think this is a val/var issue, although it may be; I've tried it both ways.

Answer 1:

The computations on RDDs are distributed across the cluster. You can't update a variable that was created outside an RDD operation's closure from within that closure. The two live in different places: the variable is created on the Spark driver, but the closure runs on the workers. Spark serializes the closure and ships a copy of the captured variable to each worker, so each worker mutates only its own copy, and none of those changes ever travel back to the driver. Inside the closure the variable should be treated as read-only.
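
As a minimal illustration of the pitfall (a hypothetical counter, not your code):

    var count = 0                 // created on the driver
    rdd.foreach(_ => count += 1)  // each worker increments its own serialized copy
    println(count)                // still 0 on the driver, no matter the RDD's size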

Spark supports distributed accumulators that could be used in this case: Spark Accumulators.
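
A sketch of that route, assuming the Spark 2.x accumulator API, a SparkContext named sc, and a single filter term; note accumulators suit commutative, associative aggregations like counts and sums, not assembling the Mongo document itself:

    val matched = sc.longAccumulator("matchedTweets")    // registered on the driver
    rdd.foreach { status =>
      if (status.getText.contains(term)) matched.add(1)  // safe to update inside the closure
    }
    println(matched.value)  // readable on the driver once the action has run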

Another option (the one I'd prefer) is to transform the DStream into the desired data format and then use the foreachRDD method to persist it to secondary storage. This is a more functional way to approach the problem. It would look roughly like this:

    val filteredStream = twitterStream.filter(status => filters.exists(term => status.getText.contains(term)))
    val filteredStreamWithTs = filteredStream.map(status => (DateTime.now.toString(), status))
    filteredStreamWithTs.foreachRDD(rdd => /* write to Mongo */)
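
The write step is left open above; one hypothetical way to fill it in, mirroring the question's own Casbah usage. Here collect() pulls each micro-batch to the driver, which is fine for a minute's worth of filtered tweets but would not scale to large RDDs:

    filteredStreamWithTs.foreachRDD { rdd =>
      val tweets = rdd.collect()  // (timestamp, status) pairs, now on the driver
      val termWithTweets = MongoDBObject()
      filters.foreach { term =>
        val listOfTweets = MongoDBObject()
        tweets.foreach { case (ts, status) =>
          if (status.getText.contains(term)) listOfTweets += (ts -> status.toString)
        }
        termWithTweets += (term -> listOfTweets)  // plain driver-side mutation works here
      }
      val entryToSave = MongoDBObject(
        "data" -> termWithTweets,
        "timeInterval" -> (DateTime.lastMinute to DateTime.now).toString,
        "terms" -> filters
      )
      collection.insert(entryToSave)
    }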