I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
should become
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
This is my code
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
becomes
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
instead of
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
2 | Lucy | null
How can I explode my arrays so that I don't lose the null rows?
I am using Spark 1.5.2 and Java 8
Spark 2.2+
You can use explode_outer
function:
import org.apache.spark.sql.functions.explode_outer
df.withColumn("likes", explode_outer($"likes")).show
// +---+----+--------+
// | id|name| likes|
// +---+----+--------+
// | 1|Luke|baseball|
// | 1|Luke| soccer|
// | 2|Lucy| null|
// +---+----+--------+
Spark <= 2.1
In Scala but Java equivalent should be almost identical (to import individual functions use import static
).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}
val df = Seq(
(1, "Luke", Some(Array("baseball", "soccer"))),
(2, "Lucy", None)
).toDF("id", "name", "likes")
df.withColumn("likes", explode(
when(col("likes").isNotNull, col("likes"))
// If null explode an array<string> with a single null
.otherwise(array(lit(null).cast("string")))))
The idea here is basically to replace NULL
with an array(NULL)
of a desired type. For complex type (a.k.a structs
) you have to provide full schema:
val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")
val st = StructType(Seq(
StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast(st)))))
or
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Note:
If array Column
has been created with containsNull
set to false
you should change this first (tested with Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
You can use explode_outer()
function.
Following up on the accepted answer, when the array elements are a complex type it can be difficult to define it by hand (e.g with large structs).
To do it automatically I wrote the following helper method:
def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
val arrayFields = df.schema.fields
.map(field => field.name -> field.dataType)
.collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
.toMap
columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
.otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
}
Edit: it seems that spark 2.2 and newer have this built in.