How can i add multiple columns in Spark Datframe i

2020-07-24 05:44发布

问题:

I have set of columns names and need to add those columns in existing dataframe which is also very huge in size, i need to add the all columns from set to dataframe with StringType and default null value. I am following below approach but i found that when the number of columns and dataframe size is huge this affecting my performance. Is there any better way to this in spark? Note : Number of columns : ~500

import sparkSession.sqlContext.implicits._
var df = Seq(
  (1, "James"),
  (2, "Michael"),
  (3, "Robert"),
  (4, "Washington"),
  (5, "Jefferson")
).toDF("Id", "Name")
df.show(false)

val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
diff_set.foreach(x => {
  if (x.size > 0) {
    df = df.withColumn(x, lit(null)).withColumn(x, col(x).cast(StringType))
  }
})
df.show(false)
+---+----------+
|Id |Name      |
+---+----------+
|1  |James     |
|2  |Michael   |
|3  |Robert    |
|4  |Washington|
|5  |Jefferson |
+---+----------+

+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|Id |Name      |col7|col8|col3|col17|col6|col20|col2|col14|col16|col21|col15|col9|col10|col5|col1|col13|col19|col11|col22|col18|col4|col12|
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|1  |James     |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|2  |Michael   |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|3  |Robert    |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|4  |Washington|null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|5  |Jefferson |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+

回答1:

Use select

df
.select(
    df.columns.map(c => col(c).as(c)) ++ 
    diff_set.map(c => lit(null).cast("string").as(c)):_*
)
.show(false)

Use foldLeft

scala> df.show(false)
+---+----------+
|Id |Name      |
+---+----------+
|1  |James     |
|2  |Michael   |
|3  |Robert    |
|4  |Washington|
|5  |Jefferson |
+---+----------+
scala> val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
scala> 

diff_set
.foldLeft(df)((ddf,c) => 
    ddf
    .withColumn(c,lit(null).cast("string"))
)
.show(false)

+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|Id |Name      |col7|col8|col3|col17|col6|col20|col2|col14|col16|col21|col15|col9|col10|col5|col1|col13|col19|col11|col22|col18|col4|col12|
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|1  |James     |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|2  |Michael   |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|3  |Robert    |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|4  |Washington|null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|5  |Jefferson |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+

Comparison

Using foldLeft for 1000000 records - Time taken: 18017 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    diff_set.foldLeft(df)((ddf,c) => ddf.withColumn(c,lit(null).cast("string"))).show(false)
}

Using crossJoin for 1000000 records - Time taken: 13224 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    val dfb = Seq(("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null")).toDF(diff_set.toList:_*)
    df.crossJoin(dfb).show(false)
}

Using select for 1000000 records - Time taken: 8519 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    df.select(df.columns.map(c => col(c).as(c)) ++ diff_set.map(c => lit(null).cast("string").as(c)):_*).show
}


回答2:

So this is in PySpark.

df.select(
    '*', 
    *list(F.lit(None).alias(f'col{n}') for n in range(7,13))
).show()
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|Id |Name      |col7|col8|col3|col17|col6|col20|col2|col14|col16|col21|col15|col9|col10|col5|col1|col13|col19|col11|col22|col18|col4|col12|
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|1  |James     |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|2  |Michael   |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|3  |Robert    |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|4  |Washington|null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|5  |Jefferson |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+

This logic translates to Scala spark if you understand how to replace the list comprehension in Scala with map.

This is faster as it creates the 22 columns to be executed at once, rather that adding them in iterations as foldleft does.