How to transpose a DataFrame in Spark 1.5 (no pivot)

Posted 2019-01-25 22:29

Question:

I want to transpose the following table using Spark with Scala, without the pivot function.

I am using Spark 1.5.1, which does not support pivot. Please suggest a suitable method to transpose the following table:

Customer Day   Sales
1        Mon    12
1        Tue    10
1        Thu    15
1        Fri     2
2        Sun    10
2        Wed     5
2        Thu     4
2        Fri     3

Output table:

Customer Sun Mon Tue Wed Thu Fri
1          0  12  10   0  15   2
2         10   0   0   5   4   3

The following code does not work because I am using Spark 1.5.1, and the pivot function is only available from Spark 1.6:

    var Trans = Cust_Sales.groupBy("Customer").pivot("Day").sum("Sales")

Answer 1:

I'm not sure how efficient this is, but you can use collect to get all the distinct days, add one column per day, and then use groupBy and sum:

// get distinct days from data (this assumes there are not too many of them):
val days: Array[String] = df.select("Day")
    .distinct()
    .collect()
    .map(_.getAs[String]("Day"))

// add column for each day with the Sale value if days match:
val withDayColumns = days.foldLeft(df) { 
    case (data, day) => data.selectExpr("*", s"IF(Day = '$day', Sales, 0) AS $day")
}

// wrap it up 
val result = withDayColumns
   .drop("Day")
   .drop("Sales")
   .groupBy("Customer")
   .sum(days: _*)

result.show()

Which prints (almost) what you wanted:

+--------+--------+--------+--------+--------+--------+--------+
|Customer|sum(Tue)|sum(Thu)|sum(Sun)|sum(Fri)|sum(Mon)|sum(Wed)|
+--------+--------+--------+--------+--------+--------+--------+
|       1|      10|      15|       0|       2|      12|       0|
|       2|       0|       4|      10|       3|       0|       5|
+--------+--------+--------+--------+--------+--------+--------+

I'll leave it to you to rename / reorder the columns if needed.
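
One way to get named, ordered columns directly is to build one aliased conditional sum per day and pass them all to a single agg call. The sketch below assumes the column names from the question and a local Spark 1.5 setup; `PivotSketch`, `pivotByDay`, and the hard-coded day order are illustrative names that are not part of the original answer, and it uses `when`/`otherwise` in place of the `IF` expression above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, sum, when}

object PivotSketch {
  // Hypothetical helper: one conditional sum per day, aliased to the day name,
  // so the output columns come out already named and in the requested order.
  def pivotByDay(df: DataFrame, dayOrder: Seq[String]): DataFrame = {
    val perDay = dayOrder.map { day =>
      sum(when(col("Day") === day, col("Sales")).otherwise(0)).as(day)
    }
    df.groupBy("Customer").agg(perDay.head, perDay.tail: _*)
  }

  def main(args: Array[String]): Unit = {
    // Local context for illustration only; in spark-shell, sc and sqlContext already exist.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pivot-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Sample data copied from the question.
    val df = Seq(
      (1, "Mon", 12), (1, "Tue", 10), (1, "Thu", 15), (1, "Fri", 2),
      (2, "Sun", 10), (2, "Wed", 5), (2, "Thu", 4), (2, "Fri", 3)
    ).toDF("Customer", "Day", "Sales")

    pivotByDay(df, Seq("Sun", "Mon", "Tue", "Wed", "Thu", "Fri"))
      .orderBy("Customer")
      .show()

    sc.stop()
  }
}
```

If the day names are not known in advance, the `days` array collected above could be passed as `dayOrder` instead, at the cost of an unspecified column order.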



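Answer 2:

If you are working with Python, the code below might help. Say you want to transpose a Spark DataFrame df. Note that toPandas() collects the whole DataFrame onto the driver, so this only suits data that fits in driver memory: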

pandas_df = df.toPandas().transpose().reset_index()
transposed_df = sqlContext.createDataFrame(pandas_df)
transposed_df.show()


Answer 3:

Consider a DataFrame with 6 columns, where we want to group by the first 4 columns and pivot on col5 while aggregating col6 (say, summing it). With Spark 1.6 or later, the built-in pivot function lets you write this as:

val pivotedDf = df_to_pivot
    .groupBy(col1,col2,col3,col4)
    .pivot(col5)
    .agg(sum(col6))

Here is code that produces the same output in Spark 1.5, without the built-in pivot function:

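import scala.collection.SortedMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DateType, DoubleType, StringType, StructField, StructType}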

// Extract the distinct col5 values; each one becomes a new column
val distinctCol5Values = df_to_pivot
    .select(col(col5))
    .distinct
    .sort(col5)  // ensure that the output columns are in a consistent logical order
    .collect()   // bring the (small) set of distinct values to the driver
    .map(_.getString(0))
    .toSeq

//Grouping by the data frame to be pivoted on col1-col4
val pivotedAgg = df_to_pivot.rdd
      .groupBy{row=>(row.getString(col1Index),
      row.getDate(col2Index),
      row.getDate(col3Index),
      row.getString(col4Index))}

// Seed a sorted map with 0.0 for every distinct col5 value, so that each output row
// gets the same columns in the same order
val pivotColListTuple = distinctCol5Values.map(ft => (ft, 0.0))
val distinctCol5ValuesListMap = SortedMap(pivotColListTuple: _*)
// Pivot on col5: fold each group's rows into its own copy of the zeroed map
val pivotedRDD = pivotedAgg.map { groupedRow =>
    // Change this accumulation to whatever aggregation you need
    val sums = groupedRow._2.foldLeft(distinctCol5ValuesListMap) { (acc, row) =>
        val key = row.getString(col5Index)
        acc.updated(key, acc.getOrElse(key, 0.0) + row.getDouble(col6Index))
    }
    Row.fromSeq(Seq(groupedRow._1._1, groupedRow._1._2, groupedRow._1._3, groupedRow._1._4) ++ sums.values.toSeq)
}

//Constructing the StructFields for the new columns
val colTypesStruct = distinctCol5ValuesListMap.map(colName=>StructField(colName._1,DoubleType))
//Adding the first four column structFields with the new columns struct
val opStructType = StructType(Seq(StructField(col1Name,StringType),
                                     StructField(col2Name,DateType),
                                     StructField(col3Name,DateType),
                                     StructField(col4Name,StringType)) ++ colTypesStruct )

//Creating the final data frame
val pivotedDF = sqlContext.createDataFrame(pivotedRDD,opStructType)
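
The per-group accumulation step can be checked without a cluster. The following is a minimal sketch on plain Scala collections, not Spark code: `FoldPivotSketch` and the `Sale` case class are hypothetical stand-ins for the grouped RDD and its rows, and the data is the table from the question.

```scala
import scala.collection.immutable.SortedMap

object FoldPivotSketch {
  // Hypothetical record type standing in for a Spark Row.
  final case class Sale(customer: Int, day: String, amount: Double)

  // Pivot in memory: one entry per customer, one map key per distinct day.
  def pivot(sales: Seq[Sale]): Map[Int, SortedMap[String, Double]] = {
    // Seed every distinct day with 0.0 so that all groups share the same keys.
    val zeros = SortedMap(sales.map(_.day).distinct.map(d => d -> 0.0): _*)
    sales.groupBy(_.customer).map { case (customer, rows) =>
      // Fold this customer's rows into a fresh copy of the zeroed map.
      val sums = rows.foldLeft(zeros) { (acc, s) =>
        acc.updated(s.day, acc(s.day) + s.amount)
      }
      customer -> sums
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Sale(1, "Mon", 12), Sale(1, "Tue", 10), Sale(1, "Thu", 15), Sale(1, "Fri", 2),
      Sale(2, "Sun", 10), Sale(2, "Wed", 5), Sale(2, "Thu", 4), Sale(2, "Fri", 3)
    )
    pivot(data).toSeq.sortBy(_._1).foreach { case (customer, sums) =>
      println(s"$customer -> ${sums.mkString(", ")}")
    }
  }
}
```

Because the keys live in a SortedMap, the pivoted columns come out in alphabetical order (Fri, Mon, Sun, ...) rather than calendar order; reorder them afterwards if that matters.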