出于某种原因,我正在用下面的代码任务不能序列例外。 我使用本地SBT测试的火花运行此。
@RunWith(classOf[JUnitRunner])
class NQTest extends FeatureSpec with Matchers with Serializable {
val conf = new SparkConf().setAppName("NQ Market Makers Test").setMaster("local")
val sc = new SparkContext(conf)
...
val testData : RDD[(String, String)] = sc.textFile("testcases/NQIntervalsTestData").map { line => (line.split(":", 2)(0), line.split(":", 2)(1)) }
testData.persist();
def testDatasets(input : Int) = {
testData.filter(_ match {
case (s, _) => (s == "Test Case " + input)
case _ => false
}).map(x => x match {
case (_, line) => line
})
}
...
feature("NQIntervals") {
scenario("Test data sanity check") {
(testDatasets(1).collect()) should not be null
}
}
}
和异常:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.filter(RDD.scala:303)
at test.scala.org.<redacted>.NQTest$.testDatasets(NQTest.scala:31)
不像我在这里看到关于此异常的其他堆栈溢出的问题,这似乎是关于RDD本身,而不是我通过过滤功能。
例如,我们可以删除过滤器和完全映射,我们最终仍期间收集的异常。 我从谷歌上搜索我只能够找到答案涉及过滤器或地图,没有问题与RDD本身内部的非序列化对象的问题。
事情到目前为止,我已经试过:
- 删除了testDatasets方法内部的过滤器和地图,只是返回的TESTDATA集。 这导致被称为收集时发生异常。
- 除去单元测试框架完全,制成NQTest直接延伸序列化和写组成的一个线主方法
testDatasets(1).collect()
仍然相同的异常 - 删除
testData.persist()
还是一样的异常
任何有识之士将受到欢迎!