Parallelism: rdd.parallelize(…) vs dataSet.map(…)?

Posted 2019-09-17 07:50

Question:

I have implemented a Spark application using both the DataFrame/DataSet API and the RDD API. I submitted the application to Spark 2.1.1 running in my local development environment. My PC has eight CPU cores.

DataFrame/DataSet

import java.time.LocalDate
import org.apache.spark.{SparkConf, SparkContext}

val date: LocalDate = ....
val jdbcSqlConn: String = ....   // SQL Server JDBC connection string

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._
// ItemList.run already returns a Dataset[Int] of ids, so we can map over it directly.
val processed = itemListJob.run(date).map { d =>
  val (a, b) = runJob.run(d, date) // returns a tuple of (Int, java.sql.Date), which are the passed parameters
  s"$a, $b"
}

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._
    sqlContext.read.format("jdbc").options(Map(
      "driver"  -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url"     -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id")
    .as[Int]
  }
}

processed.write.text("c:\\temp\\mpa")

RDD

// Collect the ids back to the driver, then redistribute them with parallelize.
val itemList = itemListJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()

val processed = sc.parallelize(itemList).map { d =>
  runJob.run(d, date) // returns a tuple of (Int, LocalDate), which are the passed parameters
}
processed.saveAsTextFile("c:\\temp\\mpa")

The RDD application split the output and generated eight text files, while the DataFrame/DataSet one generated only a single file. Does this mean the RDD version ran eight runJob.run() calls in parallel, while the DataFrame/DataSet approach ran them one at a time, without concurrency?

I want runJob.run(), which does the main workload and also makes a JDBC call of its own, to run distributed and in parallel.
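For reference, a quick way to check how many tasks each version actually uses is to inspect the partition counts (a minimal sketch reusing the values defined above):

// Number of partitions = number of tasks in the final map/write stage.
println(itemListJob.run(date).rdd.getNumPartitions) // Dataset side: typically 1 for a plain JDBC read
println(sc.parallelize(itemList).getNumPartitions)  // RDD side: defaults to the core count on local[*], here 8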

Answer 1:

Yes. But collecting the data back to the driver only to parallelize it again is not necessary. You can just call Dataset.repartition(...) to split your single partition into multiple ones. An even better way is to use the other DataFrameReader.jdbc overloads to load the data, such as http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,predicates:Array[String],connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame (the overload taking an array of predicates), or the one taking partitionColumn, lowerBound, upperBound and numPartitions, so the read itself is parallel and the shuffle is avoided.
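For example, a minimal sketch of both suggestions; the partition count and the predicates below are illustrative assumptions, not values from the question:

// Option 1: keep the existing single-partition read, but repartition before the expensive map.
val ids = itemListJob.run(date)               // Dataset[Int], typically 1 partition after a plain JDBC read
val processed = ids.repartition(8).map { d => // 8 is illustrative; match it to your cores
  val (a, b) = runJob.run(d, date)
  s"$a, $b"
}

// Option 2: read from JDBC with several partitions from the start, using the
// predicates overload of DataFrameReader.jdbc (each predicate becomes one partition).
import java.util.Properties
val props = new Properties()
props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
val predicates = Array("id % 4 = 0", "id % 4 = 1", "id % 4 = 2", "id % 4 = 3")
val ids2 = sqlContext.read
  .jdbc(jdbcSqlConn, s"dbo.GetList('$date')", predicates, props)
  .select("id")
  .as[Int]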



Answer 2:

Yes, the number of files produced is a good indicator of the parallelism of the last step. (I can think of a few corner cases where this might not hold, but that's not relevant here.)

When running locally, sc.parallelize should split the data according to the number of cores (its default number of partitions is sc.defaultParallelism, which is the core count for local[*]).

However, in both cases you are using only one core to read over the JDBC connection, and in the RDD case you additionally collect() the data back to the driver and then parallelize it back out to the executors.

The preferred approach is to use repartition rather than collect followed by parallelize. Even better is to do everything in parallel from the start: when loading the DataFrame over JDBC, check whether the parameters partitionColumn, lowerBound, upperBound and numPartitions (documented under the JDBC data source options) are applicable, so that the read itself runs in parallel, as sketched below.
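A minimal sketch of that partitioned read, assuming id is a numeric column whose rough range is known; the bounds and partition count below are illustrative:

// Spark issues numPartitions parallel queries, each covering a slice of
// [lowerBound, upperBound] on partitionColumn, so the load itself is distributed.
val ids = sqlContext.read.format("jdbc").options(Map(
    "driver"          -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "url"             -> jdbcSqlConn,
    "dbtable"         -> s"dbo.GetList('$date')",
    "partitionColumn" -> "id",       // assumed numeric column
    "lowerBound"      -> "1",        // illustrative bounds
    "upperBound"      -> "1000000",
    "numPartitions"   -> "8"
  )).load()
  .select("id")
  .as[Int]

// The downstream map (and the JDBC call inside runJob.run) now runs as 8 parallel tasks.
val processed = ids.map { d =>
  val (a, b) = runJob.run(d, date)
  s"$a, $b"
}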