I am getting an "OutOfMemoryError: Requested array size exceeds VM limit" error when running my Scala Spark job.
I am running this job on an AWS EMR cluster with the following setup:
Master: 1 m4.4xlarge, 32 CPU cores, 64 GiB memory
Core: 1 r3.4xlarge, 32 CPU cores, 122 GiB memory
The Spark version I am using is 2.2.1, on EMR release label 5.11.0.
I run my job in a spark-shell with the following configuration:
spark-shell --conf spark.driver.memory=40G \
  --conf spark.driver.maxResultSize=25G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer.max=2000 \
  --conf spark.rpc.message.maxSize=2000 \
  --conf spark.dynamicAllocation.enabled=true
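For reference, here is roughly the same configuration expressed through the SparkSession builder (just a sketch: the app name is made up, and spark.driver.memory is left out because, as far as I know, it only takes effect when set before the driver JVM starts, which is why I pass it to spark-shell):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("features-to-single-row")  // hypothetical app name
  .config("spark.driver.maxResultSize", "25G")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "2000")
  .config("spark.rpc.message.maxSize", "2000")
  .config("spark.dynamicAllocation.enabled", "true")
  .getOrCreate()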
What I am trying to do with this job is to convert a DataFrame of objects into a one-row DataFrame containing a list of those objects.
The objects look like this:
case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)
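For concreteness, a single made-up value of this shape looks like the following (the id and coordinates are invented, purely to illustrate the nesting):

val sample = Features(
  `type` = "Feature",
  properties = Properties(id = "feature-1"),
  geometry = Geometry(
    `type` = "Polygon",
    coordinates = Seq(Seq(Seq("0.0", "0.0"), Seq("1.0", "0.0"), Seq("1.0", "1.0")))
  )
)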
And my DataFrame schema looks like this:
root
|-- geometry: struct (nullable = true)
| |-- type: string (nullable = true)
| |-- coordinates: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: string (containsNull = true)
|-- type: string (nullable = false)
|-- properties: struct (nullable = false)
| |-- id: string (nullable = true)
I convert it to a list and add it to a one-row DataFrame like so:
val x = Seq(df.collect.toList)
final_df.withColumn("features", typedLit(x))
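To make those two lines self-contained, here is a toy version of the same pattern on a couple of rows (final_df is not shown above, so I use a hypothetical one-row DataFrame as a stand-in; in spark-shell, spark and its implicits are already available):

import org.apache.spark.sql.functions.typedLit
import spark.implicits._  // already in scope inside spark-shell

// A tiny Dataset of Features values (the contents are invented)
val small = Seq(
  Features("Feature", Properties("a"), Geometry("Polygon", Seq(Seq(Seq("0", "0"), Seq("1", "1"))))),
  Features("Feature", Properties("b"), Geometry("Polygon", Seq(Seq(Seq("2", "2"), Seq("3", "3")))))
).toDS

// Collect every row to the driver and wrap the whole list as a single literal column
val x = Seq(small.collect.toList)
val final_df = Seq("placeholder").toDF("name")  // stand-in for the real final_df
val with_features = final_df.withColumn("features", typedLit(x))
with_features.printSchema()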
I don't run into any problems when creating this list, and it is quite fast. However, there seems to be a limit on the size of this list once I try to write it out in either of the following ways:
final_df.first
final_df.write.json(s"s3a://<PATH>/")
I have also tried converting the list to a DataFrame by doing the following, but it never seems to finish:
val x = Seq(df.collect.toList)
val y = x.toDF
The largest list I have been able to get this to work with had 813318 Features objects, each of which contains a Geometry object holding a list of 33 elements, for a total of 29491869 elements.
Attempting to write pretty much any list larger than that when running my job gives me the following stack trace.
# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
I have tried making about a million configuration changes, including throwing both more driver and executor memory at this job, but to no avail. Is there a way around this? Any ideas?