Joining a huge list of data frames causes a stack overflow

Posted 2020-05-05 17:36

Question:

I have written a function that joins a list of data frames on a common column. Below is the code:

def joinByColumn(dfs: List[DataFrame], column: String): DataFrame = {
    //check that all dfs contain the required column
    require(dfs.map(_.columns).forall(_.contains(column)))

    dfs.reduce((df1, df2) => df1.join(df2, Seq(column), "full_outer"))
  }

I have written a test case for this function. It works for small values of columnNum (say, 4), but with a larger value such as 200 it throws a StackOverflowError.

test("complicated") {
    val base = sqlContext.createDataFrame(
      Seq(
        (1, 1)
      )
    ).toDF("key", "a")

    val columnNum = 200

    val dfs = (1 to columnNum)
      .map(i => base.toDF("key", s"a$i"))
      .toList

    val actual = Ex4.joinByColumn(dfs, "key")
    actual.explain()
    val row = Row.fromSeq(Seq.fill(columnNum + 1)(1))
    val rdd = sc.parallelize(Seq(row))

    val columns = "key" :: (1 to columnNum).map(i => s"a$i").toList
    val schema = StructType(columns.map(c => StructField(c, IntegerType)))

    val expected = sqlContext.createDataFrame(rdd, schema)

    expected should beEqualTo(actual)

  }

The stack trace:

java.lang.StackOverflowError was thrown.
java.lang.StackOverflowError
    at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:134)
    at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:118)
    at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:136)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:442)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
    at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
    ... (the doCheckpoint / withScope frames above repeat until the stack is exhausted)

Can someone help me find the root cause? How can this be fixed in this example, and is there a better approach for joining a huge list of data frames?

Answer 1:

I was able to resolve this issue using local checkpointing:

def joinByColumn(dfs: List[DataFrame], column: String): DataFrame = {
    // check that all dfs contain the required column
    require(dfs.map(_.columns).forall(_.contains(column)))

    dfs.reduce((df1, df2) =>
      // localCheckpoint(eager = true) materializes the intermediate result and
      // truncates its lineage, so the plan stays flat instead of nesting
      // deeper with every join
      df1.join(df2, Seq(column), "full_outer").localCheckpoint(true)
    )
  }
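
Why this works: the reduce builds a left-deep chain of roughly 200 nested joins, and Spark walks both the logical plan and the underlying RDD lineage recursively (the repeating doCheckpoint frames in the trace), so the recursion depth grows linearly with the number of joins until it exceeds the JVM thread stack. localCheckpoint(eager = true) materializes each intermediate DataFrame and truncates its lineage, so every reduce step starts from a flat plan. The trade-off is that local checkpoints live on the executors and are not fault-tolerant; after calling sc.setCheckpointDir, df.checkpoint(true) is the durable alternative.

If you want to avoid checkpointing altogether, you can keep the recursion shallow by joining the frames pairwise in rounds, which produces a balanced plan tree of depth about log2(n) instead of a chain of depth n. A minimal sketch, assuming Spark 2.x (joinByColumnBalanced is a hypothetical name, not from the original code):

import org.apache.spark.sql.DataFrame

// Hypothetical alternative: join adjacent pairs in rounds so the plan forms
// a balanced tree (depth ~log2(n)) rather than a left-deep chain (depth n).
def joinByColumnBalanced(dfs: List[DataFrame], column: String): DataFrame = {
    require(dfs.nonEmpty && dfs.forall(_.columns.contains(column)))

    @annotation.tailrec
    def round(level: List[DataFrame]): DataFrame = level match {
      case single :: Nil => single
      case _ =>
        // Join adjacent pairs; an odd frame at the end is carried to the next round.
        val next = level.grouped(2).map {
          case List(a, b) => a.join(b, Seq(column), "full_outer")
          case List(a)    => a
        }.toList
        round(next)
    }

    round(dfs)
  }

With 200 inputs this needs only about 8 levels of nesting, which comfortably fits the default stack, though analyzing a plan with 200 joins can still be slow; for very long lists, combining the tree-shaped joins with an occasional checkpoint is a reasonable middle ground.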