I have this data frame
df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
+-----+---------+
|store| values|
+-----+---------+
| 1|[1, 2, 3]|
| 1|[4, 5, 6]|
| 2| [2]|
| 2| [3]|
+-----+---------+
and I would like to convert into the follwing df:
+-----+-----------------+
|store| values |
+-----+-----------------+
| 1|[1, 2, 3,4, 5, 6]|
| 2| [2,3]|
+-----+-----------------+
I did this:
from pyspark.sql import functions as F
df.groupBy("store").agg(F.collect_list("values"))
but the solution has this WrappedArrays
+-----+----------------------------------------------+
|store|collect_list(values) |
+-----+----------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2 |[WrappedArray(2), WrappedArray(3)] |
+-----+----------------------------------------------+
Is there any way to transform the WrappedArrays
into concatenated arrays? Or can I do it differently?
Thank you!
You need a flattening UDF; starting from your own df
:
spark.version
# u'2.2.0'
from pyspark.sql import functions as F
import pyspark.sql.types as T
def fudf(val):
return reduce (lambda x, y:x+y, val)
flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))
df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store| collect_list(values) |
# +-----+----------------------------------------------+
# |1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
# |2 |[WrappedArray(2), WrappedArray(3)] |
# +-----+----------------------------------------------+
df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
df3.show(truncate=False)
# +-----+------------------+
# |store| values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
I would probably do it this way.
>>> df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
>>> df.show()
+-----+---------+
|store| values|
+-----+---------+
| 1|[1, 2, 3]|
| 1|[4, 5, 6]|
| 2| [2]|
| 2| [3]|
+-----+---------+
>>> df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x,y: x + y).toDF(['store','values']).show()
+-----+------------------+
|store| values|
+-----+------------------+
| 1|[1, 2, 3, 4, 5, 6]|
| 2| [2, 3]|
+-----+------------------+