-->

MongoDBObject没有被添加到RRD foreach循环卡斯巴阶apache的火花的内部(M

2019-10-21 02:53发布

好了大师的,我有这并不多大意义,我的一个问题。 我坚持努力的对象保存到MongoDB的,看起来像这样(大约)

{data:[baseball:[{timestamp (essentially):tweet},{timestamp: another tweet}]
       football:[{timestamp:footballtweet},{timestamp:differentfootballtweet}]
      ]
 timeInterval:"last minute to this minute" ( i'm doing timeseries data)
 terms:["football","baseball"]
}

见下面哪个环路IM卡上。 注意这个问题可能有一些做的RRD期满。 我试图通过在内存中坚持它来修复它,但我不知道该怎么办。

   twitterStream.foreachRDD(rrd => {
          val entryToSave = MongoDBObject()
          val termWithTweets = MongoDBObject()
          rrd.persist()
          filters.foreach(term =>{
          var listOfTweets = MongoDBObject()
            rrd.persist()
            for(status <- rrd){
              if(status.getText.contains(term)) {
    //            listOfTweets += status
//Why doesnt this line below actually add the key value pair to the variable
//defined outside of the "for(status <- rrd)" loop? I know ( through debugging)
//that it does in fact append inside the loop.

                listOfTweets += (DateTime.now.toString() -> status.toString)
              }
            }
//when I print the listOfTweets outside of the for loop it is empty, Why?
            println("outsideRRD",listOfTweets)
              termWithTweets += (term -> listOfTweets)
          })
          entryToSave += ("data" -> termWithTweets)
          entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString)
          entryToSave += ("terms" -> filters)
          collection.insert(entryToSave)
        })

我不认为这是一个VAL / VAR问题,虽然可能。 我已经尝试了两种方式

Answer 1:

在RDD的计算被分布在集群上。 你不能更新在该RDD操作关闭之外创建从RDD中的变量。 他们基本上是在两个不同的地方:该变量在Spark驱动程序创建,并在工作人员访问,并应被视为只读。

星火支持分布式cummulators可能在这种情况下使用: 星火Cummulators

另一种选择(一个我宁愿)是RDD的流变换成所需的数据格式和利用foreachRDD方法将其持续到二次存储。 这将是解决这个问题一个功能更强大的方式。 这将大致是这样的:

  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)


文章来源: MongoDBObject not being added to inside of an rrd foreach loop casbah scala apache spark