I have written a function that joins a list of data frames on a common column. Below is the code:
def joinByColumn(dfs: List[DataFrame], column: String): DataFrame = {
  // check that all dfs contain the required column
  require(dfs.map(_.columns).forall(_.contains(column)))
  dfs.reduce((df1, df2) => df1.join(df2, Seq(column), "full_outer"))
}
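As far as I understand, the reduce builds a left-deep chain of joins, so the depth of the resulting plan grows linearly with the number of frames:

// For illustration: with dfs = List(df1, df2, df3), the reduce above expands to
//   df1.join(df2, Seq(column), "full_outer")
//      .join(df3, Seq(column), "full_outer")
// i.e. a chain that is (dfs.length - 1) joins deep, so 199 levels when columnNum = 200.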
I have written a test case for this function which works for a small value of columnNum (let's say 4), but when I use a larger value like 200 it throws a StackOverflowError.
test("complicated") {
val base = sqlContext.createDataFrame(
Seq(
(1, 1)
)
).toDF("key", "a")
val columnNum = 200
val dfs = (1 to columnNum)
.map(i => base.toDF("key", s"a$i"))
.toList
val actual = Ex4.joinByColumn(dfs, "key")
actual.explain()
val row = Row.fromSeq(Seq.fill(columnNum + 1)(1))
val rdd = sc.parallelize(Seq(row))
val columns = "key" :: (1 to columnNum).map(i => s"a$i").toList
val schema = StructType(columns.map(c => StructField(c, IntegerType)))
val expected = sqlContext.createDataFrame(rdd, schema)
expected should beEqualTo(actual)
}
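As a stopgap, I can make the test pass by giving the test JVM a bigger thread stack (the recursion happens on the driver side, which here is the JVM running the test). In sbt that would look roughly like this, assuming forked test execution:

// build.sbt (sketch; the 16m value is a guess, not tuned)
Test / fork := true
Test / javaOptions += "-Xss16m"  // enlarge the thread stack so the deep recursion fits

But this only postpones the failure as columnNum grows, so it does not feel like a real fix.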
Please find below the stack trace:
java.lang.StackOverflowError was thrown.
java.lang.StackOverflowError
at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:134)
at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:118)
at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:136)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:442)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)
at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
... (the RDD.doCheckpoint / RDDOperationScope.withScope frames above repeat until the stack is exhausted)
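One workaround I am considering is to truncate the lineage every few joins so the dependency graph never gets deep. A minimal sketch, assuming Spark 2.3+ where Dataset.localCheckpoint() is available (joinByColumnCheckpointed and the every parameter are names I made up):

def joinByColumnCheckpointed(dfs: List[DataFrame], column: String, every: Int = 10): DataFrame = {
  require(dfs.nonEmpty && dfs.map(_.columns).forall(_.contains(column)))
  dfs.tail.zipWithIndex.foldLeft(dfs.head) { case (acc, (df, i)) =>
    val joined = acc.join(df, Seq(column), "full_outer")
    // every `every` joins, materialize locally and drop the accumulated lineage
    if ((i + 1) % every == 0) joined.localCheckpoint() else joined
  }
}

On older versions I believe sqlContext.createDataFrame(joined.rdd, joined.schema) achieves a similar truncation, but I am not sure either of these is the idiomatic answer, hence my questions: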
Can someone help me find the root cause of this? How can I solve the problem in this example, and is there a better approach to joining a huge list of data frames?