在星火EET多个分区(Multiple Partitions in Spark RDD)

2019-10-22 15:30发布

所以我想从使用播放/斯卡拉项目内星火MySQL数据库获取数据。 因为我尝试接收行的量是巨大的,我的目的是让来自火花RDD一个Iterator。 这里是星火上下文和配置...

  private val configuration = new SparkConf()
    .setAppName("Reporting")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
    .set("spark.akka.timeout", "5")
    .set("spark.driver.allowMultipleContexts", "true")

  val sparkContext = new SparkContext(configuration)

该JDBCRDD是与SQL查询一起如下

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin


val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext,
      driverFactory,
      query,
      rangeMinValue.get,
      rangeMaxValue.get,
      partitionCount,
      rowMapper)
      .persist(StorageLevel.MEMORY_AND_DISK)

该数据是太大得到它一次。 在与较小的数据集开始的时候很可能将得到rdd.toLocalIterator的迭代器。 然而,在这种特殊情况下无法计算的迭代器。 所以我的目的是通过部分有多个分区和recevie数据部分。 我不断收到错误。 什么是这样做的正确方法是什么?

Answer 1:

我相信,你所面临的问题堆读你的MySQL表。

我会在你的情况做的是获取从MySQL(HDFS,本地)数据到存储系统中的文件,然后我会用火花的上下文文本文件来获取吧!

例如:

object JDBCExample {

  def main(args: Array[String]) {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"

    var connection: Connection = null

    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // This is the tricky part of reading a huge MySQL table you'll need to set your sql statement as following :
      val statement = connection.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0)
      statement.setFetchSize(Integer.MIN_VALUE)

      val resultSet = statement.executeQuery("select * from ex_table")

      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, '\t');

      while (resultSet.next()) {
        val entries = List(... // process result here //...)
        writer.writeNext(entries.toArray)
      }
      writer.close();

    } catch {
      case e: Throwable => e.printStackTrace
    }
    connection.close()
  }
}

一旦你的数据存储,你可以阅读:

val data = sc.textFile("output.csv")

PS:我使用的代码中的一些快捷方式(例如每CSVWriter),但你可以使用它作为一个骨架你打算做什么!



文章来源: Multiple Partitions in Spark RDD