apache-spark memory consumption for cache() / pers

Posted 2019-03-11 13:21

Question:

My Spark cluster hangs when I try to cache() or persist(MEMORY_ONLY_SER()) my RDDs. Without cache(), it works great and computes results in about 7 minutes.

I've got 6 c3.xlarge EC2 instances (4 cores, 7.5 GB RAM each), which gives 24 cores and 37.7 GB in total.

I run my application with the following command on the master:

SPARK_MEM=5g MEMORY_FRACTION="0.6" SPARK_HOME="/root/spark" java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis

The data set is about 50GB, partitioned into 24 files. I compressed it and stored it in an S3 bucket as 24 files (each between 7MB and 300MB in size).

I absolutely can't find a reason for this behaviour, but it seems that Spark consumed all available memory and got stuck in a GC loop. When I look at the verbose GC output, I find cycles like the one below:

[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]
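
To make the GC loop easier to see, here is a minimal sketch that summarizes the "Full GC" lines. The function name `summarize_full_gcs` and the regular expression are illustrative assumptions, not part of Spark or the JVM; the sample lines are copied from the output above, including the comma used as the decimal separator.

```python
import re

# Sample lines copied from the GC output above (note the comma decimal separator).
GC_LOG = """\
[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
"""

# Matches "[Full GC <before>K-><after>K(<capacity>K), <secs> secs]".
FULL_GC = re.compile(r"\[Full GC (\d+)K->(\d+)K\((\d+)K\), (\d+)[,.](\d+) secs\]")


def summarize_full_gcs(log_text):
    """Return (count, total_pause_seconds, highest_occupancy_after_gc)."""
    count = 0
    total_pause = 0.0
    highest_occupancy = 0.0
    for match in FULL_GC.finditer(log_text):
        _before_k, after_k, capacity_k, whole, frac = match.groups()
        count += 1
        total_pause += float(f"{whole}.{frac}")
        # Share of the heap still in use right after a full collection.
        highest_occupancy = max(highest_occupancy, int(after_k) / int(capacity_k))
    return count, total_pause, highest_occupancy


count, total_pause, highest_occupancy = summarize_full_gcs(GC_LOG)
print(f"{count} full GCs, {total_pause:.1f}s paused, "
      f"heap up to {highest_occupancy:.2%} full right after a collection")
```

Each full collection takes almost 10 seconds and frees well under 1 MB of a roughly 5 GB heap, so the JVM spends nearly all of its time collecting.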

This eventually leads to messages like:

WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms

...and all progress in the computation stops. It looks as if 100% of the memory was consumed, but I tried machines with more RAM (like 30GB each), and the effect is the same.

What might be the reason for such behaviour? Could anybody help?

Answer 1:

Try using more partitions; you should have 2 - 4 per CPU. In my experience, increasing the number of partitions is often the easiest way to make a program more stable (and often faster).

By default I think your code will use 24 partitions, but for 50 GB of data that is far too few. I'd try a few hundred partitions at least.
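
As a rough illustration of the arithmetic behind this advice, the sketch below uses the figures from the question (24 cores, about 50 GB). The helper names are hypothetical, and treating 1 GB as 1024 MB is an assumption. In a Spark job, the chosen count would typically be applied with something like rdd.repartition(n).

```python
# Figures taken from the question; treating 1 GB as 1024 MB is an assumption.
TOTAL_CORES = 24
DATA_SIZE_MB = 50 * 1024


def partition_range(total_cores, low_per_core=2, high_per_core=4):
    """Rule of thumb from the answer: 2 to 4 partitions per CPU core."""
    return total_cores * low_per_core, total_cores * high_per_core


def mb_per_partition(data_size_mb, num_partitions):
    """Average amount of data each partition has to hold."""
    return data_size_mb / num_partitions


low, high = partition_range(TOTAL_CORES)
print(f"rule of thumb: {low} to {high} partitions")
for n in (24, low, high, 300):
    print(f"{n:>4} partitions -> about {mb_per_partition(DATA_SIZE_MB, n):.0f} MB each")
```

With only 24 partitions each task has to handle about 2 GB at once, while a few hundred partitions bring that down to a couple of hundred MB per task.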

Next, you use SPARK_MEM=5g but say each node has 7.5 GB, so you might as well set SPARK_MEM=7500m.

You could also try increasing the memory fraction, but I think the above is more likely to help.

General points: use HDFS for your files, not S3; it's hugely faster. Ensure you munge your data properly before caching it, e.g. if you have, say, TSV data with 100 columns but only use 10 of the fields, make sure you've extracted those fields before you try to cache.
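
A minimal sketch of that kind of projection, written as a plain function so it runs without Spark. The column positions in `WANTED_COLUMNS` and the name `project_fields` are hypothetical; the answer only says "10 of 100 fields". In PySpark, a function like this would typically be passed to map() before calling cache().

```python
# Hypothetical column positions; the answer only says "10 of 100 fields".
WANTED_COLUMNS = (0, 3, 7, 12, 20, 31, 45, 58, 77, 99)


def project_fields(line, wanted=WANTED_COLUMNS, sep="\t"):
    """Keep only the wanted columns of one TSV line, as a tuple."""
    fields = line.rstrip("\n").split(sep)
    return tuple(fields[i] for i in wanted)


# A fake 100-column record: "c0<TAB>c1<TAB>...<TAB>c99".
sample_line = "\t".join(f"c{i}" for i in range(100))
slim = project_fields(sample_line)
print(len(sample_line.split("\t")), "columns in,", len(slim), "columns out")
print(slim)
```

Caching only the projected tuples means the cache holds a fraction of each record instead of the full line.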



Answer 2:

There is a big difference between 'raw' caching and 'serialized' caching:

  1. Raw caching (rdd.cache() or rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY))

    This will consume roughly 2x-3x the memory, sometimes more. For example, a 100MB RDD can consume 350MB in memory.

  2. Serialized caching (rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER))

    This consumes pretty much the same amount of memory plus some small overhead. For example, 100MB of data will consume 100MB plus a few KB in memory.

However, raw caching is faster during operations. A serialized cache takes longer, because the objects have to be deserialized before they can be computed on.
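
To put these factors next to the numbers in the question, here is a back-of-the-envelope sketch. It rests on several assumptions that the thread does not confirm: that all 6 instances run workers, that MEMORY_FRACTION="0.6" is the share of each 5 GB heap available for cached blocks, and that "about 50GB" is the uncompressed size. The function names are hypothetical.

```python
# Figures from the question; how they combine is an assumption of this sketch.
NODES = 6                # assumes every instance runs a worker
HEAP_GB_PER_NODE = 5.0   # SPARK_MEM=5g
CACHE_FRACTION = 0.6     # MEMORY_FRACTION="0.6", assumed to be the cache share of the heap
DATA_GB = 50.0           # assumes "about 50GB" is the uncompressed size

# Expansion factors quoted in the answer above.
FACTORS = {
    "raw, low estimate (2x)": 2.0,
    "raw, high estimate (3x)": 3.0,
    "serialized (about 1x)": 1.0,
}


def cache_capacity_gb(nodes, heap_gb_per_node, cache_fraction):
    """Total heap across the cluster that is set aside for cached blocks."""
    return nodes * heap_gb_per_node * cache_fraction


def cached_size_gb(data_gb, factor):
    """Estimated in-memory size of the data set for a given expansion factor."""
    return data_gb * factor


capacity = cache_capacity_gb(NODES, HEAP_GB_PER_NODE, CACHE_FRACTION)
print(f"cache capacity: about {capacity:.0f} GB")
for label, factor in FACTORS.items():
    needed = cached_size_gb(DATA_GB, factor)
    verdict = "fits" if needed <= capacity else "does not fit"
    print(f"{label}: about {needed:.0f} GB needed, {verdict}")
```

Under these assumptions, neither storage level fits the whole data set in memory on the original cluster, which would be consistent with the heap staying full in the question's GC output.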

Here is an interesting result from my experiment.