I have implemented a Spark application twice, once with the DataFrame/Dataset API and once with the RDD API. I ran both versions in my local development environment on Spark 2.1.1. My PC has eight CPU cores.
DataFrame/Dataset
import java.time.LocalDate
import org.apache.spark.{SparkConf, SparkContext}

val date: LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._

val processed = itemListJob.run(date).map(d => {
  val (a, b) = runJob.run(d, date) // returns a tuple of (Int, java.sql.Date), echoing the parameters passed in
  s"$a, $b"
})
class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
      .select("id")
      .as[Int]
  }
}
processed.write.text("c:\\temp\\mpa")
RDD
val itemList = itemListJob.run(date).collect()

val processed = sc.parallelize(itemList).map(d => {
  runJob.run(d, date) // returns a tuple of (Int, LocalDate), echoing the parameters passed in
})
processed.saveAsTextFile("c:\\temp\\mpa")
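As far as I understand, sc.parallelize without an explicit slice count uses spark.default.parallelism, which on local[*] equals the number of cores, and the slice count can also be given explicitly (numSlices is the standard second parameter of parallelize; this is only an illustration, not code I have verified):

val processedExplicit = sc.parallelize(itemList, 8).map(d => { // 8 slices, matching my eight cores
  runJob.run(d, date)
})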
The RDD version split its output and generated eight text files, while the DataFrame/Dataset version generated only one file. Does that mean the RDD ran eight runJob.run() calls in parallel, while the DataFrame/Dataset approach ran them only one at a time, without any concurrency?
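If it matters, my understanding is that the number of output files corresponds to the number of partitions of the data being written. A quick way to check that for both versions (just a diagnostic sketch against the variables above; getNumPartitions is a standard RDD method):

println(s"Dataset partitions: ${processed.rdd.getNumPartitions}")
println(s"RDD partitions: ${sc.parallelize(itemList).getNumPartitions}")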
I want runJob.run(), which does the main workload and also makes a JDBC call, to run distributed and in parallel.
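Would explicitly repartitioning the Dataset before the map be the right way to get that, along these lines? (This is only a sketch of what I am considering; repartition is a standard Dataset method, and the 8 is just an assumption matching my core count.)

val processedParallel = itemListJob.run(date)
  .repartition(8) // assumption: spread the ids across my eight cores
  .map(d => {
    val (a, b) = runJob.run(d, date)
    s"$a, $b"
  })
processedParallel.write.text("c:\\temp\\mpa")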