I tried the new "pivot" function of 1.6 on a larger stacked dataset. It has 5,656,458 rows and the IndicatorCode
column has 1344 different codes.
The idea was to use pivot to "unstack" (in pandas terms) this data set and have a column for each IndicatorCode.
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

schema = StructType([
    StructField("CountryName", StringType(), True),
    StructField("CountryCode", StringType(), True),
    StructField("IndicatorName", StringType(), True),
    StructField("IndicatorCode", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Value", DoubleType(), True),
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
from pyspark.sql.functions import regexp_replace

# Dots in the codes would be interpreted as struct-field access when used as
# column names later, so replace them with underscores.
data2 = data.withColumn("IndicatorCode2", regexp_replace("IndicatorCode", r"\.", "_"))\
    .select(["CountryCode", "IndicatorCode2", "Year", "Value"])
# Pass the distinct values explicitly so pivot() does not have to compute them itself.
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
While this statement returned successfully (pivot is a transformation, so nothing is computed yet), data3.first()
never returned a result; I interrupted it after 10 minutes on my standalone setup using 3 cores.
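
For what it's worth, the pivoted schema and the physical plan can still be inspected without kicking off the job, using the standard DataFrame methods:

data3.printSchema()   # lists one column per IndicatorCode2 value
data3.explain()       # shows the physical plan for the pivot aggregation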
My approach using RDDs and aggregateByKey worked well, so I'm not looking for a solution to the problem itself, but rather asking whether pivot with DataFrames can also do the trick.
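
For context, the RDD route looks roughly like this. It is a minimal sketch rather than my exact code: it reuses the data2 DataFrame, the columns list, and the type imports from above, and mirrors the pivot by keeping the max Value per (Year, CountryCode, IndicatorCode2):

idx = {code: i for i, code in enumerate(columns)}

def seq_op(acc, row):
    # Fold one row into the per-key accumulator; PySpark hands each key its
    # own copy of the zero value, so mutating the list in place is safe.
    i = idx[row.IndicatorCode2]
    if row.Value is not None and (acc[i] is None or row.Value > acc[i]):
        acc[i] = row.Value
    return acc

def comb_op(a, b):
    # Merge two partial accumulators slot by slot, keeping the larger value.
    return [x if (y is None or (x is not None and x >= y)) else y
            for x, y in zip(a, b)]

wide = data2.rdd\
    .map(lambda r: ((r.Year, r.CountryCode), r))\
    .aggregateByKey([None] * len(columns), seq_op, comb_op)

# Build the wide DataFrame with an explicit schema, since schema inference
# would choke on indicator columns whose first value happens to be None.
wide_schema = StructType(
    [StructField("Year", IntegerType(), True),
     StructField("CountryCode", StringType(), True)] +
    [StructField(c, DoubleType(), True) for c in columns])

data3_rdd = sqlContext.createDataFrame(
    wide.map(lambda kv: tuple(kv[0]) + tuple(kv[1])), wide_schema)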