Multiple Partitions in Spark RDD

Posted 2019-08-02 12:14

Question:

I am trying to read data from a MySQL database using Spark within a Play/Scala project. Since the number of rows I am trying to retrieve is huge, my aim is to get an Iterator from the Spark RDD. Here are the Spark context and configuration:

  private val configuration = new SparkConf()
    .setAppName("Reporting")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
    .set("spark.akka.timeout", "5")
    .set("spark.driver.allowMultipleContexts", "true")

  val sparkContext = new SparkContext(configuration)

The JdbcRDD and its SQL query are as follows:

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin


val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext,
      driverFactory,
      query,
      rangeMinValue.get,
      rangeMaxValue.get,
      partitionCount,
      rowMapper)
      .persist(StorageLevel.MEMORY_AND_DISK)
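
For context, here is a minimal sketch of how the range between the lower and upper bound gets divided among partitions. As far as I understand JdbcRDD, it takes both bounds as Long values, splits the inclusive range into contiguous sub-ranges, and binds each sub-range to the two ? placeholders of the query, so the column compared in the WHERE clause would need to be numeric. The object and method names below are hypothetical and not part of Spark; the arithmetic only mirrors my reading of what JdbcRDD does.

```scala
object PartitionBoundsSketch {
  // Hypothetical helper, not a Spark API: splits the inclusive range
  // [lower, upper] into numPartitions contiguous inclusive sub-ranges.
  def partitionBounds(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lower <= upper, "lower must not exceed upper")
    // BigInt avoids overflow when the range covers most of the Long domain.
    val length = BigInt(1) + BigInt(upper) - BigInt(lower)
    (0 until numPartitions).map { i =>
      val start = BigInt(lower) + (length * i) / numPartitions
      val end = BigInt(lower) + (length * (i + 1)) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }
}
```

Under this scheme, PartitionBoundsSketch.partitionBounds(1L, 100L, 4) gives the sub-ranges (1, 25), (26, 50), (51, 75) and (76, 100), and each partition would run the query with its own pair of values.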

The data set is too large to fetch all at once. At first, with smaller data sets, it was possible to get an iterator from rdd.toLocalIterator. In this specific case, however, it cannot compute an iterator. So my aim is to have multiple partitions and receive the data part by part. I keep getting errors. What is the correct way of doing this?

Answer 1:

I believe you are running into a heap memory problem while reading your MySQL table.

What I would do in your case is fetch the data from MySQL into files on a storage system (HDFS or local) and then use the Spark context's textFile to read it.

Example:

import java.io.FileWriter
import java.sql.{Connection, DriverManager, ResultSet}
// CSVWriter is assumed to come from a CSV library such as opencsv (import not shown).

object JDBCExample {

  def main(args: Array[String]): Unit = {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"

    var connection: Connection = null

    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // This is the tricky part of reading a huge MySQL table: a forward-only,
      // read-only statement with fetch size Integer.MIN_VALUE makes the MySQL
      // driver stream rows one by one instead of buffering the whole result set.
      val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0) // 0 means no row limit
      statement.setFetchSize(Integer.MIN_VALUE)

      val resultSet = statement.executeQuery("select * from ex_table")

      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, '\t')

      while (resultSet.next()) {
        val entries = List(... // process result here //...)
        writer.writeNext(entries.toArray)
      }
      writer.close()

    } catch {
      case e: Throwable => e.printStackTrace()
    } finally {
      // Close the connection even on failure; it is still null if connecting failed.
      if (connection != null) connection.close()
    }
  }
}

Once your data is stored you can read it:

val data = sc.textFile("output.csv")
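
Each element returned by textFile is one raw line of the file, so the fields still have to be split apart. Below is a minimal sketch of such a parser, assuming two tab-separated columns (id and date, as in the question's query) and assuming the CSV writer may have wrapped each field in double quotes. TsvLine and ItemBid are hypothetical names, and fields containing embedded tabs or escaped quotes are not handled.

```scala
import scala.util.Try

object TsvLine {
  // Hypothetical record type for the two columns selected in the question.
  final case class ItemBid(id: Long, date: String)

  // Splits one tab-separated line, drops double quotes surrounding each field,
  // and returns None when the line does not have exactly two fields or the
  // id is not a valid Long.
  def parse(line: String): Option[ItemBid] = {
    val fields = line.split("\t", -1).map(_.trim.stripPrefix("\"").stripSuffix("\""))
    fields match {
      case Array(id, date) => Try(id.toLong).toOption.map(ItemBid(_, date))
      case _               => None
    }
  }
}
```

With this in place, something like data.flatMap(line => TsvLine.parse(line)) should give an RDD of parsed records with malformed lines skipped silently, which may or may not be what you want for your data.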

PS: I've used some shortcuts in the code (CSVWriter, for example), but you can use it as a skeleton for what you intend to do.