Use more than one collect_list in one query in Spa

Posted 2019-01-24 12:52

Question:

I have the following dataframe data:

root
 |-- userId: string 
 |-- product: string 
 |-- rating: double

and the following query:

val result = sqlContext.sql("select userId, collect_list(product), collect_list(rating) from data group by userId")

My question is: do the product and rating values in the aggregated arrays match each other? That is, do the product and the rating from the same row have the same index in their respective arrays?

Update: Starting with Spark 2.0.0, collect_list works on struct types, so a single collect_list over a combined column is enough. Before 2.0.0, collect_list could only be used on primitive types.

Answer 1:

I believe there is no explicit guarantee that all arrays will have the same order. Spark SQL applies multiple optimizations, and under certain conditions there is no guarantee that all aggregations are scheduled at the same time (one example is aggregation with DISTINCT). Since an exchange (shuffle) produces a nondeterministic order, it is theoretically possible for the order to differ between the arrays.

So while it should work in practice, it is risky and could introduce hard-to-detect bugs.

If you use Spark 2.0.0 or later, you can aggregate non-atomic columns with collect_list:

SELECT userId, collect_list(struct(product, rating)) FROM data GROUP BY userId
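For readers using the DataFrame API, the same idea might look roughly like the sketch below. It assumes Spark 2.0 or later and a local SparkSession. The object name PairedCollect, the helper pairedCollect, the column aliases (pairs, products, ratings) and the sample rows are made up for illustration and are not from the question. Each struct keeps a product together with its rating, so selecting a field from the collected array of structs yields two arrays that are aligned by index.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

object PairedCollect {
  // Hypothetical helper: collect (product, rating) structs per user, then
  // project each struct field out of the array. Both resulting arrays come
  // from the same array of structs, so index i in one matches index i in the other.
  def pairedCollect(data: DataFrame): DataFrame =
    data
      .groupBy("userId")
      .agg(collect_list(struct(col("product"), col("rating"))).as("pairs"))
      .select(
        col("userId"),
        col("pairs.product").as("products"), // array<string>
        col("pairs.rating").as("ratings")    // array<double>
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("paired-collect")
      .getOrCreate()
    import spark.implicits._

    // Illustrative sample rows with the schema from the question.
    val data = Seq(
      ("u1", "apple", 4.0),
      ("u1", "pear", 2.5),
      ("u2", "plum", 5.0)
    ).toDF("userId", "product", "rating")

    pairedCollect(data).show(false)
    spark.stop()
  }
}
```

The order of the pairs inside each array is still not deterministic, but a product and its rating can no longer drift apart.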

If you use an earlier version, you can try explicit partitioning and ordering:

WITH tmp AS (
  SELECT * FROM data DISTRIBUTE BY userId SORT BY userId, product, rating
)
SELECT userId, collect_list(product), collect_list(rating)
FROM tmp
GROUP BY userId
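A rough DataFrame-API counterpart of the query above is sketched here, using repartition for DISTRIBUTE BY and sortWithinPartitions for SORT BY. For runnability it is written against a SparkSession (Spark 2.x); on 1.6 the same DataFrame calls should be available, but the DataFrame would come from a HiveContext, which is an assumption on my part. The names SortedCollect and sortedCollect, the column aliases and the sample rows are hypothetical. As noted earlier, this is not documented to preserve the sort order through the aggregation, so treat it as a best-effort workaround.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list}

object SortedCollect {
  // Hypothetical helper mirroring the SQL: co-locate each user's rows,
  // sort them inside every partition, then aggregate both columns.
  def sortedCollect(data: DataFrame): DataFrame =
    data
      .repartition(col("userId"))                          // DISTRIBUTE BY userId
      .sortWithinPartitions("userId", "product", "rating") // SORT BY userId, product, rating
      .groupBy("userId")
      .agg(
        collect_list("product").as("products"),
        collect_list("rating").as("ratings")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sorted-collect")
      .getOrCreate()
    import spark.implicits._

    // Illustrative sample rows with the schema from the question.
    val data = Seq(
      ("u1", "pear", 2.5),
      ("u1", "apple", 4.0),
      ("u2", "plum", 5.0)
    ).toDF("userId", "product", "rating")

    sortedCollect(data).show(false)
    spark.stop()
  }
}
```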