I have a spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two columns: lat and long (latitude & longitude) using them as simple values). I want to extract 7 clusters based on just those 2 columns and then I want to attach the cluster asignment to my original dataframe. I've tried:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
But I am getting an error after a while:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?
Thank you very much!
Since, based on another recent question of yours, I guess you are in your very first steps with Spark clustering (you are even importing sqrt
& array
, without ever using them, probably because it is like that in the docs example), let me offer advice in a more general level rather than in the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions, trying to get your cluster assignments back into your dataframe)...
Since
you have your data already in a dataframe
you want to attach the cluster membership back into your initial
dataframe
you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.
Step 0 - make some toy data resembling yours:
spark.version
# u'2.2.0'
df = spark.createDataFrame([[0, 33.3, -17.5],
[1, 40.4, -20.5],
[2, 28., -23.9],
[3, 29.5, -19.0],
[4, 32.8, -18.84]
],
["other","lat", "long"])
df.show()
# +-----+----+------+
# |other| lat| long|
# +-----+----+------+
# | 0|33.3| -17.5|
# | 1|40.4| -20.5|
# | 2|28.0| -23.9|
# | 3|29.5| -19.0|
# | 4|32.8|-18.84|
# +-----+----+------+
Step 1 - assemble your features
In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features
; and it provides a specific method for doing this, VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+
# |other| lat| long| features|
# +-----+----+------+-------------+
# | 0|33.3| -17.5| [33.3,-17.5]|
# | 1|40.4| -20.5| [40.4,-20.5]|
# | 2|28.0| -23.9| [28.0,-23.9]|
# | 3|29.5| -19.0| [29.5,-19.0]|
# | 4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+
As perhaps already guessed, the argument inputCols
serves to tell VectoeAssembler
which particular columns in our dataframe are to be used as features.
Step 2 - fit your KMeans model
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
select('features')
here serves to tell the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat
& long
features are no more directly used.
Step 3 - transform your initial dataframe to include cluster assignments
transformed = model.transform(new_df)
transformed.show()
# +-----+----+------+-------------+----------+
# |other| lat| long| features|prediction|
# +-----+----+------+-------------+----------+
# | 0|33.3| -17.5| [33.3,-17.5]| 0|
# | 1|40.4| -20.5| [40.4,-20.5]| 1|
# | 2|28.0| -23.9| [28.0,-23.9]| 0|
# | 3|29.5| -19.0| [29.5,-19.0]| 0|
# | 4|32.8|-18.84|[32.8,-18.84]| 0|
# +-----+----+------+-------------+----------+
The last column of the transformed
dataframe, prediction
, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.
You can further manipulate the transformed
dataframe with select
statements, or even drop
the features
column (which has now fulfilled its function and may be no longer necessary)...
Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful (and upvotes are always welcome)...
Despite my other general answer, and in case you, for whatever reason, must stick with MLlib & RDDs, here is what causes your error using the same toy df
.
When you select
columns from a dataframe to convert to RDD, as you do, the result is an RDD of Rows:
df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]
which is not suitable as an input to MLlib KMeans. You'll need a map
operation for this to work:
df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]
So, your code should be like this:
from pyspark.mllib.clustering import KMeans, KMeansModel
rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]