火花:“请求数组大小超过限制VM”写数据帧时(Spark: 'Requested array

2019-10-29 04:04发布

我遇到一个“OutOfMemoryError异常:要求数组大小超过VM限制”跑我斯卡拉星火作业时的错误。

我正在用下面的化妆AWS EMR集群上这份工作:

主:1 m4.4xlarge 32 CPU核心,64吉布存储器

核心:1个r3.4xlarge 32 CPU核心,122吉布存储器

我使用的是星火的版本是2.2.1 EMR发布标签5.11.0上。

我跑我的以下配置的火花壳工作:

spark-shell --conf spark.driver.memory=40G 
--conf spark.driver.maxResultSize=25G 
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.kryoserializer.buffer.max=2000 
--conf spark.rpc.message.maxSize=2000 
--conf spark.dynamicAllocation.enabled=true

什么我试图用这个工作做的是对象的一列数据帧转换为包含这些对象的列表中的一行数据帧。

这些对象如下:

case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)

而我的数据帧架构如下:

root
 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- type: string (nullable = false)
 |-- properties: struct (nullable = false)
 |    |-- id: string (nullable = true)

我将其转换为一个列表并将其添加到一个行数据帧像这样:

val x = Seq(df.collect.toList)
final_df.withColumn("features", typedLit(x))

创建这个列表的时候,我不遇到任何问题,这是相当快。 然而,似乎是此列表的大小的限制,当我试图通过以下任一方式的写出来:

final_df.first
final_df.write.json(s"s3a://<PATH>/")

我一直在努力,也通过执行以下列表转换为数据帧,但它似乎永远不会结束。

val x = Seq(df.collect.toList)
val y = x.toDF

最大的名单我已经能够得到这个数据帧用过813318点特点的对象,每个都包含一个包含33个元素的列表,共计29491869元的几何对象的工作。

运行我的工作时,试图写入几乎比任何大名单给我下面的堆栈跟踪。

# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)

我试着做一百万的配置更改,包括在这项工作既丢更多的驱动器和执行内存,但无济于事。 有没有办法解决? 有任何想法吗?

Answer 1:

问题就在这里

val x = Seq(df.collect.toList) 

当你收集一个数据帧将发送数据帧到驱动程序的所有数据。 所以,如果你的数据框大,这将导致司机摆脱记忆。

应当指出的是,从所有的内存分配给执行者,堆内存的驱动程序,你可以一般为30%(如果不改变)。 所以,发生了什么驱动程序与数据量窒息由于聚集操作。

现在的事情是,你可能会认为数据帧是在磁盘上的大小小,但就是因为数据被序列化并保存在那里。 当你收集它物化数据框,并使用JVM来存储数据。 这将导致巨大的内存爆炸(一般为5-7X)。

我会建议您删除收集的部分,并使用df直接数据帧。 因为我侦察

val x = Seq(df.collect.toList) and df are essentially same


Answer 2:

那么,有一个数据帧聚合功能,你想要做什么而不做收集的驱动程序。 例如,如果你想通过钥匙收集所有“功能”列: df.groupBy($"key").agg(collect_list("feature"))或者如果你真正想做的事,对于不进行分组整个数据框: df.agg(collect_list("feature"))

但是我不知道为什么你要做到这一点,当它似乎更容易用数据帧的工作与每个对象一行不是包含整个结果一行。 即使使用collect_list聚合函数,我不会感到惊讶,如果你仍然运行内存不足。



文章来源: Spark: 'Requested array size exceeds VM limit' when writing dataframe