I have data as shown below
-----------------------------
place | key | weights
----------------------------
amazon | lion | [ 34, 23, 56 ]
north | bear | [ 90, 45]
amazon | lion | [ 38, 30, 50 ]
amazon | bear | [ 45 ]
amazon | bear | [ 40 ]
I am trying to get a result like the one below
-----------------------------
place | key | average
----------------------------
amazon | lion1 | 36.0 #(34 + 38)/2
amazon | lion2 | 26.5 #(23 + 30)/2
amazon | lion3 | 53.0 #(50 + 56)/2
north | bear1 | 90 #(90)/1
north | bear2 | 45 #(45)/1
amazon | bear1 | 42.5 #(45 + 40)/2
I understand that first I have to group by the columns place and key, and then take the average of the array elements by index. For example, lion1 is the average of the first elements of the arrays [ 34, 23, 56 ] and [ 38, 30, 50 ].
I already have a solution using posexplode, but the problem is that in the real data the weights arrays are very long. Because posexplode adds one row per array element, the data size has increased enormously, from 10 million rows to 1.2 billion, and it cannot be computed in a reliable time on the present cluster.
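For reference, a minimal sketch of what such a posexplode-based approach might look like (the DataFrame name df and the column aliases are my assumptions, not code from the question):

```python
from pyspark.sql import functions as F

# posexplode emits one row per array element together with its position,
# which is what multiplies the row count on long weights arrays.
exploded = df.select("place", "key", F.posexplode("weights").alias("pos", "weight"))

current = (exploded
           .groupBy("place", "key", "pos")
           .agg(F.avg("weight").alias("average"))
           .withColumn("key", F.concat(F.col("key"), (F.col("pos") + 1).cast("string")))
           .drop("pos"))
```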
I think it is better to add more columns rather than more rows and then unpivot the columns, but I have no idea how to achieve that using PySpark or Spark SQL 2.2.1.
One option is to merge all the weights arrays for a given place, key combination into an array of arrays. On this array of arrays you can use a UDF which computes the per-index averages, and finally posexplode to get the desired result.
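A minimal sketch of this idea, assuming a DataFrame df with the columns from the question (the UDF name and its logic are mine, and arrays within a group are assumed to have equal length):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

# Average the i-th elements across the collected arrays of one (place, key) group.
def avg_by_index(arrays):
    return [float(sum(vals)) / len(vals) for vals in zip(*arrays)]

avg_by_index_udf = F.udf(avg_by_index, ArrayType(DoubleType()))

result = (df
          .groupBy("place", "key")
          .agg(F.collect_list("weights").alias("all_weights"))
          .select("place", "key",
                  F.posexplode(avg_by_index_udf("all_weights")).alias("pos", "average"))
          .withColumn("key", F.concat(F.col("key"), (F.col("pos") + 1).cast("string")))
          .drop("pos"))
```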
Alternatively, you can find the maximum number of elements in the weights array column with functions.size() and then expand that column, step by step:
Set up the data
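A sketch of the setup, recreating the sample data from the question:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question.
df = spark.createDataFrame(
    [("amazon", "lion", [34, 23, 56]),
     ("north",  "bear", [90, 45]),
     ("amazon", "lion", [38, 30, 50]),
     ("amazon", "bear", [45]),
     ("amazon", "bear", [40])],
    ["place", "key", "weights"])
```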
Find the max number of elements in the array column weights
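For example, with functions.size() and a global max aggregation (the variable name n is an assumption used in the following steps):

```python
from pyspark.sql import functions as F

# The largest array length determines how many index columns are needed.
n = df.select(F.max(F.size("weights")).alias("n")).collect()[0]["n"]
# n == 3 for the sample data above
```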
Convert the array column into n columns
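A sketch, continuing with df and n from the previous steps; getItem() returns null for positions beyond an array's length, so shorter arrays simply produce nulls:

```python
from pyspark.sql import functions as F

# One column per array position: weights[0] -> w1, weights[1] -> w2, ...
wide = df.select(
    "place", "key",
    *[F.col("weights").getItem(i).alias("w{}".format(i + 1)) for i in range(n)])
```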
Calculate the mean aggregation on the new columns
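Continuing the sketch; avg() ignores nulls, so the shorter arrays do not distort the averages:

```python
from pyspark.sql import functions as F

# Average each index column within every (place, key) group.
agg_df = wide.groupBy("place", "key").agg(
    *[F.avg("w{}".format(i + 1)).alias("avg{}".format(i + 1)) for i in range(n)])
```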
Unpivot the columns using select + union + reduce
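Finally, one possible unpivot: build one narrow DataFrame per index column and union them with reduce, dropping the all-null positions:

```python
from functools import reduce
from pyspark.sql import functions as F

# Select (place, key<i>, avg<i>) per index column, then union the pieces back together.
parts = [
    agg_df.select(
        "place",
        F.concat(F.col("key"), F.lit(str(i + 1))).alias("key"),
        F.col("avg{}".format(i + 1)).alias("average"))
    for i in range(n)]

result = reduce(lambda a, b: a.union(b), parts).where(F.col("average").isNotNull())
result.orderBy("place", "key").show()
```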