How can I parallelize different SparkSQL executions?

Posted 2019-01-29 13:52



  • Scala
  • Apache Spark: Spark 2.2.1
  • EMR on AWS: emr-5.12.1


I have one large DataFrame, like below:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

There is about 1 TB of JSON files under s3://some_bucket, split into 5000 group_id partitions. I want to convert the data with SparkSQL, and the conversion query differs for each group_id.

The Spark code looks like this:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")

// one of the queries looks like this:
// SELECT
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping

// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

By default, .par runs with a parallelism of Runtime.getRuntime.availableProcessors, which equals the number of cores on the driver.

But this seems odd and not efficient enough, because it has nothing to do with Spark's own parallelization.

What I really want is something like groupBy on scala.collection.Seq.

This is not valid Spark code, but it shows the idea:

df.groupBy(groupId).foreach((groupId, parDF) => {
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})


1) First of all, if your data is already stored in separate files per group id, there is no reason to mix it together and then group it by id again using Spark. It is much simpler and more efficient to load only the relevant files for each group id.

2) Spark itself parallelizes the computation, so in most cases there is no need for external parallelization. But if you feel that Spark doesn't utilize all resources, check which of these cases applies:

a) If each individual computation takes less than a few seconds, task scheduling overhead is comparable to task execution time, so it's possible to get a boost by running a few tasks in parallel.

b) If the computation takes a significant amount of time but resources are still underutilized, you should most probably increase the number of partitions of your dataset.

3) If you do decide to run several tasks in parallel, it can be achieved this way:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => Future {
  //spark stuff here
  0 // placeholder result for this task
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, Duration.Inf)
executor.shutdown() //otherwise jvm will probably not exit
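
To combine points 1) and 3), here is a minimal sketch that builds the path for a single group and runs one piece of work per group on a bounded thread pool. The names PerGroupJobs, groupPath and runBounded are hypothetical helpers made up for this illustration, not part of Spark. The Spark calls appear only in the trailing comment and assume an existing SparkSession named spark and a Seq[String] named groupIds.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PerGroupJobs {
  // Hypothetical helper: the S3 prefix that holds the files of one group.
  def groupPath(basePath: String, groupId: String): String =
    s"${basePath.stripSuffix("/")}/group_id=$groupId/"

  // Hypothetical helper: runs `work` once per item on a fixed-size pool,
  // blocks until everything has finished, and returns results in input order.
  def runBounded[A, B](items: Seq[A], parallelism: Int)(work: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = items.map(item => Future(work(item)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown() // always release the threads so the JVM can exit
    }
  }
}

// Assumed usage (needs a SparkSession `spark` and a Seq[String] `groupIds`):
//   PerGroupJobs.runBounded(groupIds, parallelism = 10) { id =>
//     val groupDF = spark.read
//       .option("basePath", "s3://some_bucket/")
//       .json(PerGroupJobs.groupPath("s3://some_bucket/", id))
//     // convert groupDF and write it out here
//   }
```

Keeping the basePath option should let Spark still derive the group_id column from the directory name even though only one group's files are read. The pool size caps how many Spark jobs the driver submits at once, so it can be tuned separately from the number of driver cores.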