How to convert rdd object to dataframe in spark

2019-01-01 00:23发布

How can I convert an RDD (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) to a Dataframe org.apache.spark.sql.DataFrame. I converted a dataframe to rdd using .rdd. After processing it I want it back in dataframe. How can I do this ?

10条回答
旧时光的记忆
2楼-- · 2019-01-01 00:44

This code works perfectly from Spark 2.x with Scala 2.11

Import necessary classes

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

Create SparkSession Object, Here it's spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

Let's an RDD to make it DataFrame

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

Method 1

Using SparkSession.createDataFrame(RDD obj).

val dfWithoutSchema = spark.createDataFrame(rdd)

dfWithoutSchema.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Method 2

Using SparkSession.createDataFrame(RDD obj) and specifying column names.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")

dfWithSchema.show()
+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Method 3 (Actual answer to question)

This way requires the input rdd should be of type RDD[Row].

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

create the schema

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Now apply both rowsRdd and schema to createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
查看更多
初与友歌
3楼-- · 2019-01-01 00:47

Here is a simple example of converting your List into Spark RDD and then converting that Spark RDD into Dataframe.

Please note that I have used Spark-shell's scala REPL to execute following code, Here sc is an instance of SparkContext which is implicitly available in Spark-shell. Hope it answer your question.

scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)

scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28

scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]

scala> numDF.show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
查看更多
路过你的时光
4楼-- · 2019-01-01 00:48

Method 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Method 2: (Scala)

case class temp(val1: String,val3 : Double) 

val rdd = sc.parallelize(Seq(
  Row("foo",  0.5), Row("bar",  0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()

Method 1: (Python)

from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()

Method 2: (Python)

from pyspark.sql.types import * 
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) , 
StructField("age" , IntegerType(), True)]) 
df3 = sqlContext.createDataFrame(rdd, schema) 
df3.show()

Extracted the value from the row object and then applied the case class to convert rdd to DF

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }

case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._

val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
查看更多
妖精总统
5楼-- · 2019-01-01 00:50

Suppose you have a DataFrame and you want to do some modification on the fields data by converting it to RDD[Row].

val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))

To convert back to DataFrame from RDD we need to define the structure type of the RDD.

If the datatype was Long then it will become as LongType in structure.

If String then StringType in structure.

val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))

Now you can convert the RDD to DataFrame using the createDataFrame method.

val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
查看更多
何处买醉
6楼-- · 2019-01-01 00:51

Assuming your RDD[row] is called rdd, you can use:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()
查看更多
不流泪的眼
7楼-- · 2019-01-01 00:56

Note: This answer was originally posted here

I am posting this answer because I would like to share additional details about the available options that I did not find in the other answers


To create a DataFrame from an RDD of Rows, there are two main options:

1) As already pointed out, you could use toDF() which can be imported by import sqlContext.implicits._. However, this approach only works for the following types of RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(source: Scaladoc of the SQLContext.implicits object)

The last signature actually means that it can work for an RDD of tuples or an RDD of case classes (because tuples and case classes are subclasses of scala.Product).

So, to use this approach for an RDD[Row], you have to map it to an RDD[T <: scala.Product]. This can be done by mapping each row to a custom case class or to a tuple, as in the following code snippets:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

or

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

The main drawback of this approach (in my opinion) is that you have to explicitly set the schema of the resulting DataFrame in the map function, column by column. Maybe this can be done programatically if you don't know the schema in advance, but things can get a little messy there. So, alternatively, there is another option:


2) You can use createDataFrame(rowRDD: RDD[Row], schema: StructType) as in the accepted answer, which is available in the SQLContext object. Example for converting an RDD of an old DataFrame:

val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Note that there is no need to explicitly set any schema column. We reuse the old DF's schema, which is of StructType class and can be easily extended. However, this approach sometimes is not possible, and in some cases can be less efficient than the first one.

查看更多
登录 后发表回答