KMeans has several training parameters, with the initialization mode defaulting to k-means||. The problem is that the job progresses quickly (in less than 10 minutes) through the first 13 stages, but then hangs completely, without yielding an error!
Minimal example that reproduces the issue (it succeeds if I use 1000 points or random initialization):
from pyspark.context import SparkContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs

if __name__ == "__main__":
    sc = SparkContext(appName='kmeansMinimalExample')
    # same with 10000 points
    data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
    C = KMeans.train(data, 8192, maxIterations=10)
    sc.stop()
The job does nothing (it doesn't succeed, fail, or progress). There are no active or failed tasks in the Executors tab, and the stdout and stderr logs don't show anything particularly interesting.
If I use k=81 instead of 8192, it succeeds.
Notice that the two calls to takeSample() should not be an issue, since takeSample() was also called twice in the random initialization case.
So, what is happening? Is Spark's KMeans unable to scale? Does anybody know? Can you reproduce this?
If it were a memory issue, I would get warnings and errors, as I did before.
Note: placeybordeaux's comments are based on running the job in client mode, where the driver's configuration is invalidated, causing exit code 143 and the like (see edit history). In cluster mode, no error is reported at all; the application just hangs.
From zero323: "Why is Spark Mllib KMeans algorithm extremely slow?" is related, but I think that asker sees some progress, while my job hangs. I did leave a comment there.
I think the 'hanging' is because your executors keep dying. As I mentioned in a side conversation, this code runs fine for me, locally and on a cluster, in PySpark and Scala. However, it takes a lot longer than it should. Almost all of the time is spent in k-means|| initialization.
I opened https://issues.apache.org/jira/browse/SPARK-17389 to track two main improvements, one of which you can use now. Edit: really, see also https://issues.apache.org/jira/browse/SPARK-11560
First, there are some code optimizations that would speed up the init by about 13%.
However, most of the issue is that it defaults to 5 steps of k-means|| init, when it seems that 2 is almost always just as good. You can set the initialization steps to 2 to see a speedup, especially in the stage that's hanging now.
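As a minimal sketch of that suggestion, assuming a PySpark version whose `pyspark.mllib.clustering.KMeans.train` accepts the `initializationSteps` keyword (the helper names `kmeans_parallel_options` and `train_kmeans` are made up for illustration and are not part of Spark):

```python
def kmeans_parallel_options(k, init_steps=2, max_iterations=10):
    """Keyword arguments for KMeans.train with a reduced number of init steps.

    Hypothetical helper; the keys are keyword parameters of
    pyspark.mllib.clustering.KMeans.train (assumed available in your version).
    """
    return {
        "k": k,
        "maxIterations": max_iterations,
        "initializationMode": "k-means||",
        "initializationSteps": init_steps,
    }


def train_kmeans(data, k, init_steps=2, max_iterations=10):
    """Train on an RDD of vectors using k-means|| with `init_steps` init rounds."""
    # Imported lazily so the options helper above can be inspected
    # on a machine that does not have Spark installed.
    from pyspark.mllib.clustering import KMeans

    return KMeans.train(data, **kmeans_parallel_options(k, init_steps, max_iterations))
```

With the `data` RDD from the question, `train_kmeans(data, 8192)` would correspond to the original call with only the initialization steps changed. Passing `"random"` as `initializationMode` instead is the other variant the question reports as succeeding.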
In my (smaller) test on my laptop, init time went from 5:54 to 1:41 with both changes, mostly due to setting init steps.
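A rough back-of-the-envelope count may help explain why the number of init steps and the size of k matter so much. The sketch below assumes, as I understand the MLlib implementation, that each k-means|| step samples about 2 * k new candidate centers on average; the function name and the "one full pass" cost measure are illustrative choices, not measurements.

```python
def kmeans_parallel_init_estimate(n_points, k, init_steps):
    """Rough size of the k-means|| initialization work.

    Assumption: one random seed center, then about 2 * k candidates
    sampled per step. Returns (candidate_count, comparisons), where
    comparisons is the cost of checking every point against every
    candidate once.
    """
    candidates = 1 + init_steps * 2 * k
    comparisons = n_points * candidates
    return candidates, comparisons


if __name__ == "__main__":
    n_points = 10000000  # number of points in the question's example
    for k in (81, 8192):
        for steps in (5, 2):
            candidates, comparisons = kmeans_parallel_init_estimate(n_points, k, steps)
            print("k=%d steps=%d candidates=%d comparisons=%.2e"
                  % (k, steps, candidates, comparisons))
```

Under these assumptions, k=8192 with 5 steps yields 81921 candidate centers versus 32769 with 2 steps, while k=81 with 5 steps yields only 811, which is at least consistent with the small-k job finishing quickly.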
If your RDD is very large, collectAsMap will attempt to copy every single element of the RDD to the single driver program, which then runs out of memory and crashes. Even though you have partitioned the data, collectAsMap sends everything to the driver and your job crashes.
You can make sure the number of elements you return is capped by calling take or takeSample, or perhaps filtering or sampling your RDD.
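A small sketch of capping what comes back to the driver, using the RDD methods `take` and `takeSample`; the helper names and default sizes are hypothetical, and `rdd` is assumed to be any PySpark RDD:

```python
def bounded_preview(rdd, n=100):
    """Return at most n elements to the driver (the first ones found)."""
    return rdd.take(n)


def bounded_sample(rdd, n=100, seed=42):
    """Return a random sample of at most n elements, without replacement."""
    # takeSample(withReplacement, num, seed)
    return rdd.takeSample(False, n, seed)
```

Either call keeps the driver-side result bounded by `n`, regardless of how many elements the RDD holds.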
Similarly, be cautious of these other actions as well unless you are sure your dataset size is small enough to fit in memory:
countByKey,
countByValue,
collect
If you really do need every one of the values in the RDD and the data is too big to fit into memory, you could write the RDD out to files or export it to a database that is large enough to hold all the data. Since you are calling a library API, I don't think you can do that here (short of rewriting the code, or perhaps increasing memory). I think this collectAsMap call in the runAlgorithm method is a really bad thing in KMeans (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html).
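For your own RDDs, where you do control the code, writing out to files instead of collecting might look like the sketch below. The helper name, the tab-separated line format, and the output path are assumptions for illustration; `saveAsTextFile` is the standard RDD method and expects a path that does not exist yet.

```python
def save_pairs_as_text(pair_rdd, output_path):
    """Write (key, value) pairs from the executors instead of collecting them.

    Each pair becomes one tab-separated line; nothing is copied to the driver.
    """
    lines = pair_rdd.map(lambda kv: "%s\t%s" % (kv[0], kv[1]))
    lines.saveAsTextFile(output_path)
```

A call such as `save_pairs_as_text(pairs, "hdfs:///tmp/pairs_out")` (hypothetical path) produces one part file per partition, so the full dataset never has to fit in driver memory.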