Spark: 'Requested array size exceeds VM limit&

2019-08-20 06:39发布

问题:

I am running into a "OutOfMemoryError: Requested array size exceeds VM limit" error when running my Scala Spark job.

I'm running this job on an AWS EMR cluster with the following makeup:

Master: 1 m4.4xlarge 32 vCore, 64 GiB memory

Core: 1 r3.4xlarge 32 vCore, 122 GiB memory

The version of Spark I'm using is 2.2.1 on EMR release label 5.11.0.

I'm running my job in a spark shell with the following configurations:

spark-shell --conf spark.driver.memory=40G 
--conf spark.driver.maxResultSize=25G 
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.kryoserializer.buffer.max=2000 
--conf spark.rpc.message.maxSize=2000 
--conf spark.dynamicAllocation.enabled=true

What I'm attempting to do with this job is to convert a one column dataframe of objects into a one row dataframe that contains a list of those objects.

The objects are as follows:

case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)

And my dataframe schema is as follows:

root
 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- type: string (nullable = false)
 |-- properties: struct (nullable = false)
 |    |-- id: string (nullable = true)

I'm converting it to a list and adding it to a one row dataframe like so:

val x = Seq(df.collect.toList)
final_df.withColumn("features", typedLit(x))

I don't run into any issues when creating this list and it's pretty quick. However, there seems to be a limit to the size of this list when I try to write it out by doing either of the following:

final_df.first
final_df.write.json(s"s3a://<PATH>/")

I've tried to also convert the list to a dataframe by doing the following, but it seems to never end.

val x = Seq(df.collect.toList)
val y = x.toDF

The largest list I've been capable of getting this dataframe to work with had 813318 Features objects, each of which contains a Geometry object that contains a list of 33 elements, for a total of 29491869 elements.

Attempting to write pretty much any list larger than that gives me the following stacktrace when running my job.

# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)

I've tried making a million configuration changes, including throwing both more driver and executor memory at this job, but to no avail. Is there any way around this? Any ideas?

回答1:

The problem is here

val x = Seq(df.collect.toList) 

When you do collect on a dataframe it will send all the data of the dataframe to the driver. So if your dataframe is big this will cause driver to get out of memory.

It is to be noted that out of all the memory you assign to the executor, the heap memory which driver can you is generally 30% (if not changed). So what is happening the driver is choking with the data volume due to the collect operation.

Now the thing is you might think the dataframe is smaller in size on disk but that is because the data is serialized and saved there. When you do collect it materialize the dataframe and uses JVM to store the data. This will cause huge memory explode ( generally 5-7X).

I would recommend you to remove the collect part and use df dataframe directly. Because I recon

val x = Seq(df.collect.toList) and df are essentially same


回答2:

Well, there is a dataframe aggregation function that does what you want without doing a collect on the driver. For example if you wanted to collect all "feature" columns by key: df.groupBy($"key").agg(collect_list("feature")), or if you really wanted to do that for the whole dataframe without grouping: df.agg(collect_list("feature")).

However I wonder why you'd want to do that, when it seems easier to work with a dataframe with one row per object than a single row containing the entire result. Even using the collect_list aggregation function I wouldn't be surprised if you still run out of memory.