How to convert a DataFrame to a specific RDD?

Posted 2019-08-16 17:15

Question:

I have the following DataFrame in Spark 2.2:

df = 
   v_in   v_out
   123    456
   123    789
   456    789

This df defines the edges of a graph: each row is a pair of vertices. I want to extract an Array of edges in order to create an RDD of edges, as follows:

val edgeArray = Array(
  Edge(2L, 1L, 0.0),
  Edge(2L, 4L, 0.2),
  Edge(3L, 2L, 0.9),
  Edge(3L, 6L, 0.1),
  Edge(4L, 1L, 0.0),
  Edge(5L, 2L, 0.8),
  Edge(5L, 3L, 0.7),
  Edge(5L, 6L, 0.5)
)

val spark = SparkSession.builder()
                        .appName("ES")
                        .master("local[*]")
                        .getOrCreate()

implicit val sparkContext = spark.sparkContext

val edgeRDD: RDD[Edge[Double]] = sparkContext.parallelize(edgeArray)

How can I obtain an edgeArray with the same structure from df? The third value of each Edge can be any random Double between 0 and 1.

UPDATE:

I did it this way, but I am not sure whether this is the optimal solution:

// Collect every row to the driver, then build Edge objects with a fixed 0.0 weight
val edgeArray = df.rdd.collect().map(row => Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, 0.0))
// Ship the local array back to the cluster as an RDD
val edgeRDD: RDD[Edge[Double]] = sparkContext.parallelize(edgeArray)

I would rather not go through an Array, because I might have millions of edges. Can I convert the DataFrame to an RDD more directly?
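
Ideally I'd like something along these lines, mapping over df.rdd directly instead of collecting (just a sketch, with scala.util.Random standing in for the random weight):

import scala.util.Random

// Build the edges in a distributed way, without materializing an Array on the driver.
// Note: the attr values are not stable if the RDD is recomputed.
val edgeRDD: RDD[Edge[Double]] = df.rdd.map(row =>
  Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, Random.nextDouble()))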

Answer 1:

Given

val df = Seq((123, 456), (123, 789), (456, 789)).toDF("v_in", "v_out")

Import

import org.apache.spark.sql.functions.rand
import org.apache.spark.graphx.Edge
import spark.implicits._ // needed for toDF and for the Edge[Double] encoder used by .as

and convert:

val edgeRDD = df.toDF("srcId", "dstId") // rename columns to match Edge's field names
  .withColumn("attr", rand)             // attach a random Double in [0, 1) as the edge attribute
  .as[Edge[Double]]                     // the Int ids are safely upcast to Long by the encoder
  .rdd
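
If the end goal is a GraphX Graph, the resulting edgeRDD can be used directly, for example (a minimal sketch; 0.0 here is just an assumed default vertex attribute):

import org.apache.spark.graphx.Graph

// Vertices are inferred from the edges, and each one gets the default attribute 0.0.
val graph: Graph[Double, Double] = Graph.fromEdges(edgeRDD, 0.0)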

With graphframes (added to the session via

spark.jars.packages graphframes:graphframes:X.X.X-sparkY.Y-s_Z.ZZ

where X.X.X is the package version, Y.Y the Spark version, and Z.ZZ the Scala version), you can create a Graph like this:

GraphFrame.fromEdges(df.toDF("src", "dst")).toGraphX

but it will use Row attributes for both vertices and edges.
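
Putting that together (a minimal sketch, assuming the graphframes package above is on the classpath):

import org.apache.spark.graphx.Graph
import org.apache.spark.sql.Row
import org.graphframes.GraphFrame

// fromEdges expects the edge DataFrame to have "src" and "dst" columns.
val gf = GraphFrame.fromEdges(df.toDF("src", "dst"))

// toGraphX returns Graph[Row, Row]: vertex and edge attributes are untyped Rows.
val g: Graph[Row, Row] = gf.toGraphX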