My cluster: 1 master, 11 slaves, each node has 6 GB memory.
My settings:
spark.executor.memory=4g, Dspark.akka.frameSize=512
Here is the problem:
First, I read some data (2.19 GB) from HDFS to RDD:
val imageBundleRDD = sc.newAPIHadoopFile(...)
Second, do something on this RDD:
val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})
Last, output to HDFS:
res.saveAsNewAPIHadoopFile(...)
When I run my program it shows:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space
There are too many tasks?
PS: Every thing is ok when the input data is about 225 MB.
How can I solve this problem?
The location to set the memory heap size (at least in spark-1.0.0) is in conf/spark-env. The relevant variables are
SPARK_EXECUTOR_MEMORY
&SPARK_DRIVER_MEMORY
. More docs are in the deployment guideAlso, don't forget to copy the configuration file to all the slave nodes.
You should increase the driver memory. In your $SPARK_HOME/conf folder you should find the file
spark-defaults.conf
, edit and set thespark.driver.memory 4000m
depending on the memory on your master, I think. This is what fixed the issue for me and everything runs smoothlyHave a look at the start up scripts a Java heap size is set there, it looks like you're not setting this before running Spark worker.
You can find the documentation to deploy scripts here.
You should configure offHeap memory settings as shown below:
Give the driver memory and executor memory as per your machines RAM availability. You can increase the offHeap size if you are still facing the OutofMemory issue.
I suffered from this issue a lot, we use dynamic resource allocation and I thought it will utilize my cluster resources to best fit the application.
But the truth is, the dynamic resource allocation doesn't set the driver memory and it keeps it to its default value which is 1g.
I have resolved it by setting spark.driver.memory to a number that suits my driver's memory (for 32gb ram I set it to 18gb)
you can set it using spark submit command as follows:
Very important note, this property will not be taken into consideration if you set it from code, according to spark documentation:
Broadly speaking, spark Executor JVM memory can be divided into two parts. Spark memory and User memory. This is controlled by property
spark.memory.fraction
- the value is between 0 and 1. When working with images or doing memory intensive processing in spark applications, consider decreasing thespark.memory.fraction
. This will make more memory available to your application work. Spark can spill, so it will still work with less memory share.The second part of the problem is division of work. If possible, partition your data into smaller chunks. Smaller data possibly needs less memory. But if that is not possible, you are sacrifice compute for memory. Typically a single executor will be running multiple cores. Total memory of executors must be enough to handle memory requirements of all concurrent tasks. If increasing executor memory is not a option, you can decrease the cores per executor so that each task gets more memory to work with. Test with 1 core executors which have largest possible memory you can give and then keep increasing cores until you find the best core count.