这里是我的函数计算均方根误差。 然而最后一行不能因为错误的编译Type mismatch issue (expected: Double, actual: Unit)
。 我尝试了许多不同的方法来解决这个问题,但还是没有成功。 有任何想法吗?
def calculateRMSE(output: DStream[(Double, Double)]): Double = {
val summse = output.foreachRDD { rdd =>
rdd.map {
case pair: (Double, Double) =>
val err = math.abs(pair._1 - pair._2);
err*err
}.reduce(_ + _)
}
// math.sqrt(summse) HOW TO APPLY SQRT HERE?
}
作为eliasah指出, foreach
(和foreachRDD
)没有返回值; 他们是唯一的副作用。 如果你想返回的东西,你需要map
。 根据关你的第二个解决方案:
val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)
它看起来更好,如果你犯了一个小功能吧:
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
val rmse = output.map(getRmse)
忽略空RDDS,
val rmse = output.filter(_.nonEmpty).map(getRmse)
这里是完全相同的序列作为换理解。 这只是在地图,flatMap和过滤语法糖,但我认为这是很容易当我第一次学习Scala了解:
val rmse = for {
rdd <- output
if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError
下面是总结错误,就像你第一次尝试的功能:
def calculateRmse(output: DStream[(Double, Double)]): Double = {
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}
编译器的约投诉nonEmpty
其实是DSTREAM的问题filter
的方法。 相反,在RDDS在DSTREAM经营, filter
是在对双打的操作(Double, Double)
你DSTREAM的类型参数给出。
我不知道足够的火花,说这是一个缺陷 ,但它是非常奇怪的。 Filter
和收藏上大多数其他操作通常在的foreach来定义 ,但DSTREAM实现不遵循相同的约定的那些功能; 其方法已过时foreach
和电流foreachRDD
过流的RDDS进行操作,但它的其他高阶方法没有 。
所以,我的方法是行不通的。 DSTREAM可能有一个很好的理由,是怪异(性能有关?)这可能是坏的方式做它foreach
:
def calculateRmse(ds: DStream[(Double, Double)]): Double = {
var totalError: Double = 0
def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError
ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))
totalError
}
但是,它的工程!
我设法做这个任务如下:
import org.apache.spark.mllib.evaluation.RegressionMetrics
output.foreachRDD { rdd =>
if (!rdd.isEmpty)
{
val metrics = new RegressionMetrics(rdd)
val rmse = metrics.rootMeanSquaredError
println("RMSE: " + rmse)
}
}