How to optimize the Spark code below (Scala)?

Posted 2019-06-01 09:59

Question:

I have some huge files (19 GB, 40 GB, etc.). I need to execute the following algorithm on these files:

  1. Read the file.
  2. Sort it on the basis of one column.
  3. Take the first 70% of the data:

    a) Take all the distinct records of the subset of the columns.
    b) Write them to the train file.

  4. Take the last 30% of the data:

    a) Take all the distinct records of the subset of the columns.
    b) Write them to the test file.

I tried running the following code in Spark (using Scala).

    import scala.collection.mutable.ListBuffer
    import java.io.FileWriter
    import org.apache.spark.sql.functions.max

    val offers = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .option("delimiter", ",")
      .load("input.txt")

    val csvOuterJoin = offers.orderBy("utcDate")
    val trainDF = csvOuterJoin.limit((csvOuterJoin.count * 0.7).toInt)
    val maxTimeTrain = trainDF.agg(max("utcDate"))
    val maxTimeStamp = maxTimeTrain.collect()(0).getTimestamp(0)
    val testDF = csvOuterJoin.filter(csvOuterJoin("utcDate") > maxTimeStamp)
    val inputTrain = trainDF.select("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers").distinct
    val inputTest = testDF.select("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers").distinct
    inputTrain.rdd.coalesce(1, false).saveAsTextFile("train.csv")
    inputTest.rdd.coalesce(1, false).saveAsTextFile("test.csv")

This is how I launch spark-shell:

./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 --total-executor-cores 70 --executor-memory 10G --driver-memory 20G

I execute this code on a distributed cluster with one master and many slaves, each having a sufficient amount of RAM. As of now, this code ends up consuming a lot of memory and I get Java heap space errors.

Is there a way to optimize the above code (preferably in Spark)? Any help, however minimal, in optimizing it is appreciated.

Answer 1:

The problem is that you don't distribute at all, and the source is here:

val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count * 0.7).toInt)

The limit operation is not designed for large-scale data: it moves all records to a single partition:

val df = spark.range(0, 10000, 1, 1000)
df.rdd.partitions.size
// Int = 1000

// limit pulls every record into a single partition
df.orderBy($"id").limit(10000).rdd.partitions.size
// Int = 1

You can use the RDD API instead:

val ordered = offers.orderBy($"utcDate")
val cnt = offers.count * 0.7

// zipWithIndex gives each row of the sorted RDD a consecutive index,
// so the 70/30 split becomes a distributed filter instead of a limit.
val train = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i <= cnt
}.map(_._1), ordered.schema)

val test = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i > cnt
}.map(_._1), ordered.schema)
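
A possible refinement (just a sketch, reusing the same ordered, cnt and spark as above): both splits re-evaluate ordered.rdd.zipWithIndex, and with it the sort, so caching the indexed RDD lets the sort run only once, provided the cluster has room to hold it.

// Sort and index once, cache the result, and reuse it for both splits.
val indexed = ordered.rdd.zipWithIndex.cache()

val train = spark.createDataFrame(
  indexed.filter { case (_, i) => i <= cnt }.map(_._1), ordered.schema)

val test = spark.createDataFrame(
  indexed.filter { case (_, i) => i > cnt }.map(_._1), ordered.schema)

For data of this size, persist(StorageLevel.MEMORY_AND_DISK) may be a safer choice than the default cache(), and indexed.unpersist() frees the cached copy once both splits have been written.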


Answer 2:

coalesce(1, false) merges all of the data into a single partition, i.e. it keeps the full 40 GB of data in the memory of one node.
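
A quick check along the lines of the partition-count example in the previous answer shows the effect (spark here is the shell's session):

val df = spark.range(0, 10000, 1, 1000)
df.rdd.coalesce(1, false).partitions.size
// Int = 1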

Never try to get all the data into one file with coalesce(1, false). Instead, just call saveAsTextFile (so the output looks like part-00000, part-00001, etc.) and then merge the partition files outside of Spark. The merge operation depends on your file system; for HDFS you can use http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge
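
As a rough sketch (reusing inputTrain and inputTest from the question, and the spark-csv package the question already loads; the output paths are placeholders):

// Drop the coalesce(1, false): each split is written as a directory of
// part files, so the write stays distributed across the executors.
inputTrain.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("train_out")

inputTest.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("test_out")

On HDFS the part files can then be combined locally, e.g. with hdfs dfs -getmerge train_out train.csv as described in the getmerge link above.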