I tried converting a DataFrame to an RDD for a table containing 25 columns. In doing so I learned that Scala (up to 2.11.8) limits tuples to a maximum of 22 elements.
val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")
rdd: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/myDB.db/myTable/ MapPartitionsRDD[3] at textFile at <console>:24
Sample Data:
[2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]
Accessing each column:
val rdd3 = rdd.map(elements => {
val el = elements.split(",")
(el(0).substring(1,11).toString, el(1).toString, el(2).toInt, el(3).toInt, el(4).toInt, el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble, el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble, el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble, el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble)
}
)
It throws an error:
<console>:1: error: too many elements for tuple: 26, allowed: 22
This is a known limitation in Scala (https://issues.scala-lang.org/browse/SI-9572), so I created a case class to work around the problem.
case class HandleMaxTuple(col1:String, col2:String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25:Double)
Thus the new rdd definition becomes:
val rdd3 = rdd.map(elements => {
val el = elements.split(",")
(HandleMaxTuple(el(0).substring(1,11).toString, el(1).toString, el(2).toInt, el(3).toInt, el(4).toInt, el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble, el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble, el(15).toDouble, el(16).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble, el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble))
}
)
However, when I try to read the contents of the mapped RDD:
rdd3.take(2).foreach(println)
it throws a java.lang.ArrayIndexOutOfBoundsException:
Error Stack:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
Any idea why it's happening? Any workarounds?
Hi, I tried to do exactly the same with your data using a case class, and I see two problems.
The first problem is the substring() call on el(0). When I run
el(0).substring(1,11)
I get java.lang.StringIndexOutOfBoundsException: String index out of range: 11
so use el(0).substring(0,10) instead, since string indices start at zero, not one.
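A quick sketch of the zero-based indexing (assuming the date field is exactly ten characters, with no leading bracket):

```scala
// String indices in Scala (as in Java) are zero-based.
val field = "2017-02-26"           // 10 characters, valid indices 0..9
// field.substring(1, 11)          // would throw StringIndexOutOfBoundsException: 11
val date = field.substring(0, 10)  // takes the characters at indices 0..9
println(date)                      // prints "2017-02-26"
```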
The second problem: you convert some fields with toInt and toDouble, but each of those fields starts with a space, so the conversion fails with a NumberFormatException, just as Integer.parseInt would in Java.
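To illustrate the failure mode, a minimal sketch with a made-up value:

```scala
// toInt delegates to Integer.parseInt, which rejects leading whitespace.
val raw = " 100052"
val parsed = try Some(raw.toInt) catch { case _: NumberFormatException => None }
println(parsed)                             // None: the leading space breaks parsing
println(raw.replaceAll("\\s", "").toInt)    // 100052: stripping whitespace first succeeds
```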
For more information see https://alvinalexander.com/scala/how-cast-string-to-int-in-scala-string-int-conversion. To correct it I used
.replaceAll("\\s","")
which removes the whitespace in front of the numbers before converting them to Int and Double. Hope this helps; with these fixes, running the sample above produces the expected output.
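Putting both fixes together, here is a minimal, self-contained sketch of the corrected parsing logic, run on a single sample line. (A shortened case class stands in for the question's 25-field HandleMaxTuple, purely for illustration; in the real job this function would go inside rdd.map.)

```scala
// Stand-in for the full 25-field case class from the question.
case class Sample(date: String, account: String, id: Int, amounts: Seq[Double])

def parse(line: String): Sample = {
  // Strip all whitespace from every field first, so toInt/toDouble don't
  // throw NumberFormatException on the leading spaces after each comma.
  val el = line.split(",").map(_.replaceAll("\\s", ""))
  Sample(
    el(0).substring(0, 10),             // zero-based substring for the date
    el(1),
    el(2).toInt,
    el.drop(5).map(_.toDouble).toSeq    // remaining numeric columns
  )
}

val line = "2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000"
println(parse(line))
```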