number of tuples limit in RDD; reading RDD throws

Posted 2019-08-01 12:58

Question:

I tried converting a DataFrame to an RDD for a table containing 25 columns. I then learned that Scala (until 2.11.8) limits a tuple to a maximum of 22 elements.

val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")
rdd: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/myDB.db/myTable/ MapPartitionsRDD[3] at textFile at <console>:24

Sample Data:

[2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]

Accessing each column:

val rdd3 = rdd.map(elements => {
val el = elements.split(",")
(el(0).substring(1,11).toString, el(1).toString ,el(2).toInt, el(3).toInt, el(4).toInt, el(5).sum.toDouble, el(6).sum.toDouble, el(7).sum.toDouble, el(8).sum.toDouble, el(9).sum.toDouble, el(10).sum.toDouble, el(11).sum.toDouble, el(12).sum.toDouble, el(13).sum.toDouble, el(14).sum.toDouble, el(15).sum.toDouble, el(15).sum.toDouble, el(17).sum.toDouble, el(18).sum.toDouble, el(19).sum.toDouble, el(20).sum.toDouble, el(21).sum.toDouble, el(22).sum.toDouble, el(23).sum.toDouble, el(24).sum.toDouble)
}
)

It throws an error:

<console>:1: error: too many elements for tuple: 26, allowed: 22

It's a bug in Scala (https://issues.scala-lang.org/browse/SI-9572), so I created a case class to work around the problem.

case class HandleMaxTuple(col1:String, col2:String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25:Double)

Thus the new rdd definition becomes:

val rdd3 = rdd.map(elements => {
  val el = elements.split(",")
  HandleMaxTuple(el(0).substring(1,11).toString, el(1).toString, el(2).toInt, el(3).toInt, el(4).toInt, el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble, el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble, el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble, el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble)
})

However, when I try to read the contents of the RDD:

rdd3.take(2).foreach(println)

it throws a java.lang.ArrayIndexOutOfBoundsException:

Error Stack:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1

Any idea why it's happening? Any workarounds?

Answer 1:

Hi, I tried to do exactly the same with your data using a case class, and I see two problems. First, here is the working code:

package com.scalaspark.stackoverflow
import org.apache.spark.sql.SparkSession

object StackOverFlow {
  def main(args: Array[String]): Unit = {

    // Parses one comma-separated line into a HandleMaxTuple.
    // Assumes the line has no surrounding brackets, so the date is the first 10 characters.
    def parser(line: String): HandleMaxTuple = {
      val fields = line.split(",")
      val c1 = fields(0).substring(0, 10)
      val c2 = fields(1)
      // Strip whitespace before converting, because each value follows ", ".
      val c3 = fields(2).replaceAll("\\s", "").toInt
      val c4 = fields(3).replaceAll("\\s", "").toInt
      val c5 = fields(4).replaceAll("\\s", "").toInt
      val c6 = fields(5).replaceAll("\\s", "").toDouble
      val c7 = fields(6).replaceAll("\\s", "").toDouble
      val c8 = fields(7).replaceAll("\\s", "").toDouble
      val c9 = fields(8).replaceAll("\\s", "").toDouble
      val c10 = fields(9).replaceAll("\\s", "").toDouble
      val c11 = fields(10).replaceAll("\\s", "").toDouble
      val c12 = fields(11).replaceAll("\\s", "").toDouble
      val c13 = fields(12).replaceAll("\\s", "").toDouble
      val c14 = fields(13).replaceAll("\\s", "").toDouble
      val c15 = fields(14).replaceAll("\\s", "").toDouble
      val c16 = fields(15).replaceAll("\\s", "").toDouble
      val c17 = fields(16).replaceAll("\\s", "").toDouble
      val c18 = fields(17).replaceAll("\\s", "").toDouble
      val c19 = fields(18).replaceAll("\\s", "").toDouble
      val c20 = fields(19).replaceAll("\\s", "").toDouble
      val c21 = fields(20).replaceAll("\\s", "").toDouble
      val c22 = fields(21).replaceAll("\\s", "").toDouble
      val c23 = fields(22).replaceAll("\\s", "").toDouble
      val c24 = fields(23).replaceAll("\\s", "").toDouble
      val c25 = fields(24).replaceAll("\\s", "").toDouble

      HandleMaxTuple(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20, c21, c22, c23, c24, c25)
    }
    val spark = SparkSession
                .builder()
                .appName("number of tuples limit in RDD")
                .master("local[*]")
                .getOrCreate()

    val lines = spark.sparkContext.textFile("C:\\Users\\rajnish.kumar\\Desktop\\sampleData.txt", 1)
    lines.foreach(println)
    val parseddata = lines.map(parser)
    parseddata.foreach(println)
  }

  case class HandleMaxTuple(col1:String, col2:String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25:Double)
}

The first problem is with el(0): you are calling substring(), whose signature according to the Java docs is

String substring(int beginIndex, int endIndex)
Returns a new string that is a substring of this string. 

When I use el(0).substring(1,11) I get java.lang.StringIndexOutOfBoundsException: String index out of range: 11

So use el(0).substring(0,10) instead (indices start from zero, not from one; beginIndex is inclusive and endIndex is exclusive).
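To make the index arithmetic concrete, here is a small sketch of how substring(beginIndex, endIndex) behaves on the date field. The object name SubstringDemo and the two sample values are illustrative assumptions: one is a bare 10-character date, the other carries the leading bracket shown in the question's sample row.

```scala
object SubstringDemo {
  // beginIndex is inclusive, endIndex is exclusive, and both are zero-based.
  val plain = "2017-02-26"      // 10 characters, indices 0..9
  val bracketed = "[2017-02-26" // 11 characters, leading '[' at index 0

  val fromPlain: String = plain.substring(0, 10)         // the whole date
  val fromBracketed: String = bracketed.substring(1, 11) // skips the leading '['

  // On the 10-character string, endIndex 11 exceeds the length, so this throws.
  val outOfRange: Boolean =
    try { plain.substring(1, 11); false }
    catch { case _: StringIndexOutOfBoundsException => true }
}
```

Which pair of indices is right therefore depends on whether the first field still contains the opening bracket.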

The second problem: you are using toInt and toDouble to convert some fields, but as far as I can see every value after the first starts with a space. Beware that the conversion can then fail with a NumberFormatException (toInt in particular rejects surrounding whitespace), just as it does in Java, like this:

scala> val i = "foo".toInt
java.lang.NumberFormatException: For input string: "foo"

For more information, see https://alvinalexander.com/scala/how-cast-string-to-int-in-scala-string-int-conversion. To correct it I used .replaceAll("\\s","") , which removes all whitespace from the field (including the space before each number), and then converted the values to Int and Double.
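As an alternative to letting the exception fail the task, the conversion can be wrapped so that a bad value becomes None. This is only a sketch; the object NumericParsing and its helper names are hypothetical and not part of the code above. It uses trim rather than replaceAll, which is enough when the whitespace is only at the ends of the field.

```scala
import scala.util.Try

object NumericParsing {
  // trim removes leading and trailing whitespace; Try captures a
  // NumberFormatException and toOption turns it into None.
  def toIntOpt(raw: String): Option[Int] = Try(raw.trim.toInt).toOption

  def toDoubleOpt(raw: String): Option[Double] = Try(raw.trim.toDouble).toOption
}
```

With these helpers, NumericParsing.toIntOpt(" 3260") yields Some(3260), while NumericParsing.toIntOpt("foo") yields None instead of throwing.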

Hope this helps. When you run the sample above, you will get this output:

HandleMaxTuple(2017-02-26, 100052-ACC,100052,3260,1005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0)
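One more thing that may be worth checking: the "Caused by: java.lang.ArrayIndexOutOfBoundsException: 1" line in the question's stack trace suggests that, for at least one input line, split(",") returned a single element, so el(1) was out of range. That can happen with an empty line or with a file whose field delimiter is not a comma (Hive text tables use the \u0001 control character by default, though whether that applies to this table is an assumption). A minimal sketch of a guard that reports such lines instead of failing; the names FieldGuard, expectedFields and splitChecked are hypothetical:

```scala
object FieldGuard {
  // Number of columns in the question's table.
  val expectedFields = 25

  // Left describes a malformed line; Right carries the split fields.
  // The -1 limit keeps trailing empty fields that split(",") would drop.
  def splitChecked(line: String): Either[String, Array[String]] = {
    val fields = line.split(",", -1)
    if (fields.length == expectedFields) Right(fields)
    else Left(s"expected $expectedFields fields but found ${fields.length}: $line")
  }
}
```

Mapping the raw lines through FieldGuard.splitChecked and looking at a few of the Left values should show quickly whether the input really is comma-separated.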