I want to create an array of arrays. This is my data table:
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
  Testing(null, 21, 905),
  Testing("Noelia", 26, 1130),
  Testing("Pilar", 52, 1890),
  Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
I want to create an array out of the integer column "age". For that I use collect_list:
sqlContext.sql("SELECT collect_list(age) as age from df").show
But now I want to generate an array containing multiple arrays as created above:
sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
But this does not work. Should I use the function org.apache.spark.sql.functions.array instead? Any ideas?
Let's create the DataFrame the way you have created it above.
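A minimal sketch of that setup, reusing the question's case class and rows:
case class Testing(name: String, age: Int, salary: Int)

val df = sqlContext.createDataFrame(sc.parallelize(Array(
  Testing(null, 21, 905),
  Testing("Noelia", 26, 1130),
  Testing("Pilar", 52, 1890),
  Testing("Roberto", 31, 1450)
)))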
Here we can use the array_union function to achieve the desired result. array_union returns the union of all elements from the two input arrays, without duplicates. This function is available since Spark 2.4.0.
// Scala Ref : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
// Pyspark Ref : https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_union
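// A minimal sketch using the DataFrame API; resultDF is just an illustrative name
import org.apache.spark.sql.functions.{array_union, collect_list}

val resultDF = df.agg(
  array_union(collect_list("age"), collect_list("salary")).as("arrayInt")
)
resultDF.show(false)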
// Output (element order may vary, since collect_list makes no ordering guarantee)
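+---------------------------------------+
|arrayInt                               |
+---------------------------------------+
|[21, 26, 52, 31, 905, 1130, 1890, 1450]|
+---------------------------------------+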
I hope this helps.
OK, this couldn't be simpler. Let's consider the same data you are working on and go step by step from there.
As the error message says, collect_list takes just one argument. Let's check the documentation here. It actually takes one argument! But let's go further into the documentation of the functions object. You seem to have noticed that the array function allows you to create a new array column out of a Column or a repeated Column parameter. So let's use that:
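A minimal sketch, mirroring your original query:
sqlContext.sql("SELECT array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)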
The array function indeed creates a single column from the list of columns created beforehand by collect_list on both age and salary:
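With the sample data this prints something like the following (the inner arrays render as WrappedArray in older Spark versions):
+-------------------------------------------------------------------+
|arrayInt                                                           |
+-------------------------------------------------------------------+
|[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
+-------------------------------------------------------------------+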
Where do we go from here ?
You have to remember that a Row from a DataFrame is just another collection wrapped by a Row.
The first thing I'll do is work on that collection. So how do we flatten a WrappedArray[WrappedArray[Int]]? Scala is kind of magical: you just need to use .flatten.
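Here is a quick sketch you can try in the REPL; WrappedArray.make is just one way to build the nested structure by hand:
import scala.collection.mutable.WrappedArray

val nested: WrappedArray[WrappedArray[Int]] = Array(
  WrappedArray.make[Int](Array(21, 26, 52, 31)),
  WrappedArray.make[Int](Array(905, 1130, 1890, 1450))
)
nested.flatten // WrappedArray(21, 26, 52, 31, 905, 1130, 1890, 1450)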
Now let's wrap it in a UDF so we can use it on the DataFrame :
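A sketch of such a UDF; flattenSeq is a hypothetical name, and Spark hands array columns to a Scala UDF as Seq (concretely, WrappedArray):
// Register the UDF under the (hypothetical) name "flattenSeq"
sqlContext.udf.register("flattenSeq", (xs: Seq[Seq[Int]]) => xs.flatten)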
Since we registered the UDF, we can now use it inside the sqlContext:
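sqlContext.sql("SELECT flattenSeq(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)
which should print something like (again, collect_list gives no ordering guarantee):
+---------------------------------------+
|arrayInt                               |
+---------------------------------------+
|[21, 26, 52, 31, 905, 1130, 1890, 1450]|
+---------------------------------------+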
I hope this helps!