均值替换缺失值 - 星火数据帧均值替换缺失值 - 星火数据帧(Replace missing val

2019-05-12 02:17发布

我有一些遗漏值的星火数据帧。 我想通过与平均值为该列替换缺失值进行简单估算。 我是很新的火花,所以我一直在努力实现这个逻辑。 这是我设法到目前为止做的事:

一)要为单个列(假设柱A)做到这一点,这行代码似乎工作:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

二)但是,我一直无法弄清楚,如何为我的数据帧中的所有列做到这一点。 我是想出来的地图功能,但我相信它遍历一个数据帧中的每一行

三)关于被如此类似的问题- 在这里 。 虽然我喜欢的解决方案(使用汇总表和聚结),我非常希望知道是否有一种方法通过每列循环,从而做到这一点(我来自R,通过每个柱,使用更高阶的功能,像这样循环lapply似乎更自然的我)。

谢谢!

Answer 1:

火花> = 2.2

您可以使用org.apache.spark.ml.feature.Imputer (同时支持平均数和中位数的策略)。

斯卡拉

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

Python的

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)

火花<2.2

这个给你:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

哪里

df.columns.map(mean(_)): Array[Column] 

计算的平均为每列,

df.select(_: *).first.toSeq: Seq[Any]

收集汇总值,并将其转换行Seq[Any] (我知道这是不理想的,但是这是我们必须与之合作的API),

df.columns.zip(_).toMap: Map[String,Any] 

创建aMap: Map[String, Any]从列名称映射到其平均,最后:

df.na.fill(_): DataFrame

使用填充缺失值:

fill: Map[String, Any] => DataFrame 

DataFrameNaFunctions

为了则会忽略NaN条目可以更换:

df.select(df.columns.map(mean(_)): _*).first.toSeq

有:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq


Answer 2:

在PySpark归咎于(而不是平均值)中值<2.2

## filter numeric cols
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
   median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]

然后,应用na.fill

df_imputed = df.na.fill(median_dict)


Answer 3:

对于PySpark,这是我使用的代码:

mean_dict = { col: 'mean' for col in df.columns }
col_avgs = df.agg( mean_dict ).collect()[0].asDict()
col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() }
df.fillna( col_avgs ).show()

这四个步骤是:

  1. 创建字典mean_dict映射列名的总操作(均值)
  2. 计算平均值为每列,并将其保存为词典col_avgs
  3. 列名col_avgs开始avg(和结束) ,如avg(col1) 剥去括号出来。
  4. 填写数据框的列与使用该平均值col_avgs


文章来源: Replace missing values with mean - Spark Dataframe