I am trying to convert some input into the format I want in a Spark DataFrame. The input I have is a Sequence of the following case class, with up to 10,000,000 instances (or possibly also the JSON string before I convert it to the case class):
case class Element(paramName: String, value: Int, time: Int)
As a result I want a dataframe like this:
|Time | ParamA | ParamB  | ParamC | ... | Param10000  |
|1000 | 432432 | 8768768 | null   | ... | 75675678622 |
|2000 | null   | null    | 734543 | ... | null        |
...
So not every parameter has to be defined for every time slot; missing values should be filled with null. There will probably be around 10,000 parameters and around 1,000 time slots.
The way I do it right now seems very inefficient:
case class Elements(name: String, value: Int, time: Int)
case class GroupedObjects(time: Int, params: (String, Int)*)
//elements contains the Seq of Elements
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
  .groupBy(element => element.time)
  .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
    (element.name, element.value)).toSeq: _*))
//transforming back to a JSON string to get the right format for the DataFrame
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
  "{\"time\":" + obj.time + obj.params.map(tuple =>
    ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)
The problem I see here is definitely the conversion back to a String, only to read it in again in the right format. I would be really glad for any help showing me how to get from the input case class to the desired DataFrame format.
The way I am doing it right now is really slow, and I get a heap-space exception for 10,000,000 input lines.
You might try to build Row objects and define the RDD schema manually, something like the following example:
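Here is a sketch of that idea (untested against your exact data; it assumes the Elements case class, elementsRdd, and sqlContext from your snippet, and boxes the Int values so that missing parameters can be represented as null):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Collect the distinct parameter names once; they become the column names.
val paramNames: Array[String] = elementsRdd.map(_.name).distinct().collect().sorted

// One nullable Integer column per parameter, plus the time column.
val schema = StructType(
  StructField("time", IntegerType, nullable = false) +:
  paramNames.map(name => StructField(name, IntegerType, nullable = true))
)

val rowRDD: RDD[Row] = elementsRdd
  .groupBy(_.time)
  .map { case (time, elems) =>
    val byName = elems.map(e => e.name -> e.value).toMap
    // Box the values so a missing parameter can be null in the Row
    Row.fromSeq(time +: paramNames.map(n => byName.get(n).map(Int.box).orNull))
  }

val df = sqlContext.createDataFrame(rowRDD, schema).orderBy("time")
df.show(10)
```

This avoids the JSON round trip entirely: the schema is computed once, and each time slot becomes exactly one Row.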
I tried this locally with 10,000,000 elements like this:
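The test data was generated roughly like this (the parameter names and value range here are illustrative, not from your data):

```scala
import scala.util.Random

// The question's case class
case class Elements(name: String, value: Int, time: Int)

// Illustrative generator: nParams * nSlots elements in total.
// With nParams = 10000 and nSlots = 1000 this yields 10,000,000 elements.
def generate(nParams: Int, nSlots: Int): Seq[Elements] =
  for {
    slot  <- 0 until nSlots
    param <- 0 until nParams
  } yield Elements(s"Param$param", Random.nextInt(1000000), slot * 1000)
```

Then `val elements = generate(10000, 1000)` feeds the snippet above.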
And it completes successfully in a reasonable time.
As of Spark 1.6 there is a pivot function that works on DataFrames. Since you are using case classes, creating the DataFrame and pivoting it is straightforward:
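A minimal sketch (assuming Spark 1.6+, the Element case class from the question, and an existing SQLContext; note that with ~10,000 distinct parameter names you may need to raise spark.sql.pivotMaxValues, which defaults to 10,000):

```scala
import org.apache.spark.sql.functions.first

// Assumes `sc`, `sqlContext`, and the Seq[Element] named `elements` from the question.
import sqlContext.implicits._

val df = sc.parallelize(elements).toDF()   // columns: paramName, value, time

val pivoted = df
  .groupBy("time")
  .pivot("paramName")        // one column per distinct parameter name
  .agg(first("value"))       // (time, param) pairs that never occur stay null
  .orderBy("time")

pivoted.show(10)
```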
See the documentation for GroupedData for more on pivot(), but this should be more than enough for you to go on.