StackOverflowError when operating with a large num

2020-02-29 01:56发布

问题:

I have a wide dataframe (130000 rows x 8700 columns) and when I try to sum all columns I´m getting the following error:

Exception in thread "main" java.lang.StackOverflowError at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400) at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88) ...

This is my Scala code:

  val df = spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
    .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")


  val arrayList = df.drop("cups").columns
  var colsList = List[Column]()
  arrayList.foreach { c => colsList :+= col(c) }

  val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))

If I do the same with a few columns it works fine but I´m always getting the same error when i try the reduce operation with a high number of columns.

Can anyone suggest how can I do it? is there any limitation on the number of columns?

Thx!

回答1:

You can use a different reduction method that produces a balanced binary tree of depth O(log(n)) instead of a degenerate linearized BinaryExpression chain of depth O(n):

def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs => {
    val n = xs.size
    val (as, bs) = list.splitAt(n / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
  }
}

Now in your code, you can replace

colsList.reduce(_ + _)

by

balancedReduce(colsList)(_ + _)

A little example to further illustrate what happens with the BinaryExpressions, compilable without any dependencies:

sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  override def toString: String = {
    val lStr = left.toString.split("\n").map("  " + _).mkString("\n")
    val rStr = right.toString.split("\n").map("  " + _).mkString("\n")
    return s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}

println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))

This is what the ordinary reduce does (and this is what essentially happens in your code):

BinOp(
  BinOp(
    BinOp(
      BinOp(
        BinOp(
          BinOp(
            BinOp(
              BinOp(
                BinOp(
                  BinOp(
                    BinOp(
                      BinOp(
                        BinOp(
                          BinOp(
                            BinOp(
                              Leaf
                              Leaf
                            )
                            Leaf
                          )
                          Leaf
                        )
                        Leaf
                      )
                      Leaf
                    )
                    Leaf
                  )
                  Leaf
                )
                Leaf
              )
              Leaf
            )
            Leaf
          )
          Leaf
        )
        Leaf
      )
      Leaf
    )
    Leaf
  )
  Leaf
)

This is what balancedReduce produces:

BinOp(
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
)

The linearized chain is of length O(n), and when Catalyst is trying to evaluate it, it blows the stack. This should not happen with the flat tree of depth O(log(n)).

And while we are talking about asymptotic runtimes: why are you appending to a mutable colsList? This needs O(n^2) time. Why not simply call toList on the output of .columns?