This follows up on an earlier question:
Explode (transpose?) multiple columns in Spark SQL table
Suppose we have additional array columns, as below:
**userId someString varA varB varC varD**
1 "example1" [0,2,5] [1,2,9] [a,b,c] [red,green,yellow]
2 "example2" [1,20,5] [9,null,6] [d,e,f] [white,black,cyan]
The desired output is:
userId someString varA varB varC varD
1 "example1" 0 1 a red
1 "example1" 2 2 b green
1 "example1" 5 9 c yellow
2 "example2" 1 9 d white
2 "example2" 20 null e black
2 "example2" 5 6 f cyan
The answer there was to define a udf:
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
and to apply it with withColumn and explode:
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB")).show
If we need to extend the above answer to more columns, what is the easiest way to amend the code? Any help would be appreciated.
The approach with the zip udf seems OK, but you need to extend it for more collections. Unfortunately there is no really nice way to zip 4 Seqs, but this should work:
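import org.apache.spark.sql.functions.udf

// Fail fast if the arrays in one row do not all have the same length.
def assertSameSize(arrs: Seq[_]*): Unit = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

// Zip four arrays element-wise into a sequence of 4-tuples (fields _1 to _4).
val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})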
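For completeness, here is a sketch of how such a udf could be plugged into the withColumn/explode pattern from the question. It assumes Spark SQL is on the classpath and a local SparkSession; the object name Zip4Example, the helper zip4Fn and the one-row sample DataFrame are made up for illustration, and the size check is left out to keep it short. The tuple returned by the udf becomes a struct with fields _1 to _4, which is why the select reads vars._1 and so on.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, udf}

object Zip4Example {
  // Plain Scala function (hypothetical name): zips four sequences by index.
  // Assumes all four sequences have the same length.
  val zip4Fn: (Seq[Long], Seq[Long], Seq[String], Seq[String]) => Seq[(Long, Long, String, String)] =
    (xa, xb, xc, xd) => xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))

  // The same function wrapped as a Spark udf.
  val zip4 = udf(zip4Fn)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")          // assumption: local run for illustration
      .appName("zip4-example")
      .getOrCreate()
    import spark.implicits._

    // Illustrative sample row taken from the question (no null elements).
    val df = Seq(
      (1, "example1", Seq(0L, 2L, 5L), Seq(1L, 2L, 9L),
        Seq("a", "b", "c"), Seq("red", "green", "yellow"))
    ).toDF("userId", "someString", "varA", "varB", "varC", "varD")

    // One output row per array index; struct fields _1.._4 are renamed back.
    df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD")))
      .select(
        $"userId", $"someString",
        $"vars._1".alias("varA"), $"vars._2".alias("varB"),
        $"vars._3".alias("varC"), $"vars._4".alias("varD"))
      .show()

    spark.stop()
  }
}
```

The sample row deliberately contains no null elements; how nulls inside an array behave when the udf parameter is typed as Seq[Long] is worth checking on your own data before relying on this.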
I am assuming, based on your example, that varA, varB, varC and varD always have the same size.
scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input
scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result
scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)
scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)
scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]
scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString|      varA|        varB|     varC|                varD|
+-------+----------+----------+------------+---------+--------------------+
|      1|  example1| [0, 2, 5]|   [1, 2, 9]|[a, b, c]|[red, green, yellow]|
|      2|  example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+
scala> def getResult(row : Input) : Iterable[Result] = {
     |   val user_id = row.user_id
     |   val someString = row.someString
     |   val varA = row.varA
     |   val varB = row.varB
     |   val varC = row.varC
     |   val varD = row.varD
     |   val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
     |   seq.toSeq
     | }
getResult: (row: Input)Iterable[Result]
scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]
scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC|  varD|
+-------+----------+----+----+----+------+
|      1|  example1|   0|   1|   a|   red|
|      1|  example1|   2|   2|   b| green|
|      1|  example1|   5|   9|   c|yellow|
|      2|  example2|   1|   9|   d| white|
|      2|  example2|  20|null|   e| black|
|      2|  example2|   5|   6|   f|  cyan|
+-------+----------+----+----+----+------+
If the sizes of varA, varB, varC and varD can differ, that case needs to be handled separately.
You could iterate up to the maximum size and output null for any column that has no value at a given index, for example by handling the resulting exceptions.
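A minimal sketch of that padding idea in plain Scala (no Spark needed), under a few assumptions: the names PadExample, Result and explodePadded are hypothetical, and instead of catching index exceptions it uses Seq.lift, which returns None when an index is out of range. In a Dataset, Option fields set to None would be expected to show up as null.

```scala
object PadExample {
  // Hypothetical result type: Option marks values that may be missing.
  final case class Result(
      userId: Int,
      someString: String,
      varA: Option[Int],
      varB: Option[Int],
      varC: Option[String],
      varD: Option[String])

  // Produce one Result per index up to the longest input sequence.
  // lift(i) yields Some(value) when index i exists and None otherwise,
  // so shorter sequences are padded with None rather than throwing.
  def explodePadded(
      userId: Int,
      someString: String,
      varA: Seq[Int],
      varB: Seq[Int],
      varC: Seq[String],
      varD: Seq[String]): Seq[Result] = {
    val maxSize = Seq(varA.size, varB.size, varC.size, varD.size).max
    (0 until maxSize).map { i =>
      Result(userId, someString, varA.lift(i), varB.lift(i), varC.lift(i), varD.lift(i))
    }
  }

  def main(args: Array[String]): Unit = {
    // varB and varC are shorter than varA and varD in this illustrative call.
    explodePadded(1, "example1", Seq(0, 2, 5), Seq(1, 2), Seq("a"), Seq("red", "green", "yellow"))
      .foreach(println)
  }
}
```

The same function body could be called from the flatMap shown earlier if the case class fields were changed to Option types; whether padding or failing fast is the right choice depends on what a mismatch means in your data.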